es6/Queue/Job.js
import os from 'os'
import path from 'path'
import Promise from 'bluebird'
import thinky from '../thinky'
import Ensure from '../utils/Ensure'
import Table from '../utils/Table'
import Events from '../utils/Events'
import * as Time from '../utils/Time'
// `type` supplies the field-type builders used by the model schema in this file.
// `r` is presumably thinky's handle on the underlying RethinkDB query API — confirm in ../thinky.
const {type, r} = thinky
/**
* API that handles the lifecycle for updating and maintaining local copies of a Job
*/
export default class Job extends Table {
  /**
   * Writes a state transition for a job to the table.
   *
   * The `job` object passed in is mutated in place: its `repo` property is
   * removed when truthy, and `state` / `updated_at` are overwritten before
   * the write. A job that already has an `id` is updated; otherwise it is
   * inserted.
   *
   * @param {State} state The state (as defined in Job.States)
   * @param {Job} job The job
   * @return {Promise} whatever the table query's `run()` yields
   */
  static changeState (state, job) {
    // `repo` is what `getJoin({ repo : true })` attaches to fetched jobs;
    // presumably it is stripped so the joined document is not written back.
    if (job.repo) {
      delete job.repo
    }
    // Object.assign returns `job` itself, so `record` and `job` are the same object.
    const record = Object.assign(job, {
      state : state
    , updated_at : Date.now()
    })
    return record.id
      ? this.table.get(job.id).update(record).run()
      : this.table.insert(job).run()
  }

  /**
   * Moves a job to the `completed` state.
   *
   * @param {Job} job The job (mutated, see changeState)
   * @return {Promise}
   */
  static complete (job) {
    return Job.changeState( Job.States.completed, job )
  }

  /**
   * Moves a job to the `queued` state (inserting it when it has no id).
   *
   * @param {Job} job The job (mutated, see changeState)
   * @return {Promise}
   */
  static enqueue (job) {
    return Job.changeState( Job.States.queued, job )
  }

  /**
   * Moves a job to the `failed` state.
   *
   * @param {Job} job The job (mutated, see changeState)
   * @return {Promise}
   */
  static fail (job) {
    return Job.changeState( Job.States.failed, job )
  }

  /**
   * Moves a job to the `locked` state.
   *
   * @param {Job} job The job (mutated, see changeState)
   * @return {Promise}
   */
  static lock (job) {
    return Job.changeState( Job.States.locked, job )
  }

  /**
   * Fetches up to `n` queued jobs that are due and belong to one of the
   * given repos, joined with their `repo` and ordered by `created_at`.
   *
   * @param {Number} n Maximum number of jobs to return
   * @param {Array} ids Repo ids; only jobs whose `repoId` is in this list match
   * @return {Promise} whatever the table query's `run()` yields
   */
  static nextBatch (n, ids) {
    // Single timestamp so the default and the comparison use the same instant.
    // NOTE(review): `run_at` is declared as type.date() in the schema while
    // `now` is epoch milliseconds — confirm both sides compare as the same type.
    const now = Date.now()
    return this.table
      .filter( function (row) {
        return row('state')
          .eq(Job.States.queued)
          .and(
            row('run_at').default(now).le(now)
              // Membership has to be a query term evaluated per row: a plain
              // `ids.indexOf(row('repoId'))` runs once in JavaScript against
              // the term object and can never match.
              .and( r.expr(ids).contains(row('repoId')) )
          )
      })
      .getJoin({ repo : true })
      .orderBy("created_at")
      .limit(n)
      .run()
  }
}
/**
 * The states a job can be in. Keys and values are identical strings; the
 * values are what gets stored in a job's `state` field.
 */
Job.States = {
  locked : "locked"
, queued : "queued"
, failed : "failed"
, completed : "completed"
}

// Flat list of the state values, in declaration order.
const states = Object.keys(Job.States).map( key => Job.States[key] )

/**
 * Attach one static fetcher per state (`Job.locked`, `Job.queued`,
 * `Job.failed`, `Job.completed`). Each returns up to `n` jobs (default 10)
 * in that state, joined with their `repo`.
 *
 * `state` is always drawn from `states`, so no validity check is needed
 * inside the fetcher.
 */
states.forEach( state => {
  Job[state] = function enumStateFetcher (n = 10) {
    // `this` is resolved at call time, so the fetcher must be invoked as a
    // method of the object that owns `table` (e.g. `Job.queued()`).
    return this.table
      .filter({ state : state })
      .getJoin({ repo : true })
      .limit(n)
      .run()
  }
})
// Field definitions for the 'Jobs' model.
const jobSchema = {
  // Free-form payload and outcome of a job.
  data: type.any(),
  result: type.any(),
  error: type.object(),
  // Restricted to the known state values; defaults to `queued`.
  state: type.string().enum(states).default(Job.States.queued),
  // All three timestamps default to the value of Date.now when not supplied.
  created_at: type.date().default(Date.now),
  run_at: type.date().default(Date.now),
  updated_at: type.date().default(Date.now)
}

// Register the model; `Job.table` is used right after this call, so
// presumably createModel is what sets it up — confirm in ../utils/Table.
Job.createModel('Jobs', jobSchema)

Job.table.ensureIndex('created_at')