Home Reference Source Repository

es6/Queue/Job.js

import os         from 'os'
import path       from 'path'
import Promise    from 'bluebird'

import thinky     from '../thinky'
import Ensure     from '../utils/Ensure'
import Table      from '../utils/Table'
import Events     from '../utils/Events'
import * as Time  from '../utils/Time' 

const {type, r} = thinky

/**
 * API that handles the lifecycle for updating and maintaining local copies of a Job
 */
export default class Job extends Table {

  /**
   * Writes a job to the table with a new state.
   *
   * When the job already carries an `id` the matching row is updated,
   * otherwise the job is inserted as a new row.
   *
   * NOTE: `job` is modified in place: its `repo` property is removed (when
   * truthy) and its `state` / `updated_at` properties are overwritten.
   *
   * @param      {State}    state   The state (as defined in Job.States)
   * @param      {Job}      job     The job
   * @return     {Promise}          whatever `run()` yields for the update/insert query
   */
  static changeState (state, job) {
    // Strip the `repo` property before writing — presumably the joined repo
    // document attached by getJoin, which should not be stored on the job row.
    if (job.repo) {
      delete job.repo
    }

    job.state      = state
    job.updated_at = Date.now()

    if (job.id) {
      return this.table.get(job.id).update(job).run()
    }
    return this.table.insert(job).run()
  }

  /**
   * Marks a job as completed.
   *
   * @param      {Job}      job     The job
   * @return     {Promise}
   */
  static complete (job) {
    return Job.changeState( Job.States.completed, job )
  }

  /**
   * Marks a job as queued.
   *
   * @param      {Job}      job     The job
   * @return     {Promise}
   */
  static enqueue (job) {
    return Job.changeState( Job.States.queued, job )
  }

  /**
   * Marks a job as failed.
   *
   * @param      {Job}      job     The job
   * @return     {Promise}
   */
  static fail (job) {
    return Job.changeState( Job.States.failed, job )
  }

  /**
   * Marks a job as locked.
   *
   * @param      {Job}      job     The job
   * @return     {Promise}
   */
  static lock (job) {
    return Job.changeState( Job.States.locked, job )
  }

  /**
   * Fetches up to `n` queued jobs that are due to run and whose `repoId` is
   * one of `ids`, with their `repo` joined, ordered by `created_at`.
   *
   * @param      {number}   n       Maximum number of jobs to return
   * @param      {Array}    ids     Repo ids a job's `repoId` must be one of
   * @return     {Promise}
   */
  static nextBatch (n, ids) {
    return this.table
      .filter( function (row) {
        const isQueued = row('state').eq(Job.States.queued)

        // Rows without a `run_at` fall back to "now" and therefore count as due.
        // NOTE(review): the model declares `run_at` as a date while this compares
        // against a millisecond number — confirm the stored representation makes
        // this comparison meaningful.
        const isDue = row('run_at').default(Date.now()).le(Date.now())

        // Membership has to be expressed in ReQL so it is evaluated per row on
        // the server. A plain `ids.indexOf(row('repoId'))` runs once in
        // JavaScript against the query term object and never matches.
        const isWantedRepo = r.expr(ids).contains(row('repoId'))

        return isQueued.and(isDue).and(isWantedRepo)
      })
      .getJoin({ repo  : true  })
      .orderBy("created_at")
      .limit(n)
      .run()
  }
}

// Every state a job can be in. Each key maps to the string that is written
// to (and filtered on) the job's `state` field.
Job.States = {
  locked: "locked",
  queued: "queued",
  failed: "failed",
  completed: "completed",
}

// Flat list of every state value declared in Job.States.
const states = Object.keys(Job.States).map( key => Job.States[key] )

// Attach one static fetcher per state value, named after that value
// (Job.locked, Job.queued, Job.failed, Job.completed).
states.forEach( state => {
  /**
   * Fetches up to `n` jobs currently in this state, with their `repo` joined.
   *
   * Declared as a regular function (not an arrow) so that `this` is the
   * object the fetcher is called on, e.g. `Job.queued()`.
   *
   * @param      {number}   n       Maximum number of jobs to return (default 10)
   * @return     {Promise}
   */
  Job[state] = function enumStateFetcher (n = 10) {
    // `state` always comes from `states`, so no membership check is needed here.
    return this.table
      .filter({  state : state })
      .getJoin({ repo  : true  })
      .limit(n)
      .run()
  }
})

// Declare the 'Jobs' model schema.
// NOTE(review): createModel comes from the Table base class; presumably it
// registers the thinky model that backs Job.table — confirm there.
Job.createModel('Jobs', {
  data: type.any(),
  result: type.any(),
  error: type.object(),
  // Restricted to the known state values; defaults to queued.
  state: type.string().enum(states).default(Job.States.queued),
  // All three timestamps default to the value of Date.now().
  created_at: type.date().default(Date.now),
  run_at: type.date().default(Date.now),
  updated_at: type.date().default(Date.now),
})

// nextBatch orders jobs by created_at.
Job.table.ensureIndex('created_at')