// es6/Queue/Queue.js
import Promise from 'bluebird'
/** utils **/
import * as Time from '../utils/Time'
import Expressive from '../utils/Expressive'
/** Services **/
import Lock from '../Services/Lock'
import Loop from '../Services/Loop'
import Registry from '../Services/Registry'
import Repo from '../Services/Repo'
import Advertiser from '../Services/Advertiser'
import Fabric from './Fabric'
import Job from './Job'
import Thread from './Thread'
import * as Errors from '../Errors'
export default class Queue extends Expressive {

  /**
   * Creates a new instance of Queue.
   *
   * Any arguments are forwarded to the constructor. Because the constructor
   * returns a Promise (see below), so does this factory.
   *
   * @return {Promise} resolves to the instance of Queue
   */
  static create (...args) {
    return new Queue(...args)
  }

  /**
   * The cut-off timestamp used for checking deadspace: the current time
   * (`Date.now()`) minus `Time.seconds(5)`.
   *
   * @property expiry
   * @return {Number} a timestamp (presumably in milliseconds — confirm that
   *                  `Time.seconds` returns milliseconds)
   */
  static get expiry () {
    return Date.now() - Time.seconds(5)
  }

  /**
   * Creates a new Queue.
   *
   * The lock, fabric, registry, loop and advertiser are created through
   * `Promise.props`; once all of them are available they are assigned onto
   * the instance and the queue is brought online.
   *
   * NOTE: the constructor returns a Promise. Returning an object from a
   * constructor replaces the result of `new`, so `new Queue()` evaluates to
   * that Promise rather than to the instance itself.
   *
   * @return {Promise} resolves to the Queue instance
   */
  constructor () {
    super()

    return Promise.props({
        lock       : Lock.create('queue')
      , fabric     : Fabric.create()
      , registry   : Registry.create()
      , loop       : Loop.create()
      , advertiser : Advertiser.create()
    })
    .then( extensions => {
      Object.assign(this, extensions)
      this.bringOnline()
    })
    .return(this)
  }

  /**
   * Attaches all relevant listeners and brings the Queue online.
   *
   * - every loop tick runs `work()` and then releases the tick via `free`
   * - a fabric `down` event pauses the loop
   * - a thread `done` event completes the job, a thread `error` event fails it
   * - a repo `invalidate` event is forwarded to the fabric
   *
   * @method bringOnline
   * @return {*} the result of `this.is(Queue.Event.up)`
   */
  bringOnline () {
    this.loop.on( Loop.Event.tick, free => {
      this.work()
        .then(free, err => {
          // Release the tick even when the work cycle fails; previously a
          // rejection skipped `free` entirely. The error is rethrown so it
          // is still surfaced exactly as it was before.
          free()
          throw err
        })
    })

    this.fabric.on( Fabric.Event.down, err => this.loop.pause() )
    this.fabric.on( Thread.Event.done, result => this.complete(result) )
    this.fabric.on( Thread.Event.error, err => {
      // Normalise the thread error into the shape `error()` stores,
      // defaulting the type when the thread did not provide one.
      this.error({
          id    : err.id
        , error : {
              message : err.message
            , stack   : err.stack
            , type    : err.type || "Runtime"
          }
      })
    })

    // Rotates an absolute script out of the `require` cache in a Thread
    this.registry.on( Repo.Event.invalidate, script => {
      this.fabric.invalidate(script)
    })

    this.loop.start()

    return this.is(Queue.Event.up)
  }

  /**
   * Finds jobs and schedules them. Triggered by each `Loop.Event.tick`
   * (see `bringOnline`).
   *
   * When the fabric has no available threads nothing is fetched. Otherwise,
   * inside `lock.synchronize`, up to one job per available thread is
   * requested from `Job.nextBatch`, each job is locked and then handed to
   * `schedule`. `done` is called once that chain settles, whether it
   * succeeded or failed.
   *
   * @return {Promise} resolves to `null` when no threads are available;
   *                   otherwise whatever `this.lock.synchronize` returns
   *                   (presumably a Promise — confirm against Lock)
   */
  work () {
    const available = this.fabric.availableThreads.length

    if (!available) {
      return Promise.resolve(null)
    }

    return this.lock.synchronize( done => {
      // NOTE(review): this chain is not returned from the callback and has
      // no rejection handler of its own; a failure here only reaches `done`
      // through `.finally`.
      Job
        .nextBatch(available, this.registry.availableIds )
        .map( job => Job.lock({ id : job.id }).return(job) )
        .bind(this)
        .map(this.schedule)
        .finally(done)
    })
  }

  /**
   * Schedules a job to run in an available thread.
   *
   * A job without a repo, or whose repo is not known to the registry, is
   * stored as errored instead of being scheduled.
   *
   * @param  {Job} job The job to run
   * @return {*} the result of `this.error(...)` for an invalid job,
   *             otherwise the result of `this.fabric.schedule(...)`
   */
  schedule (job) {
    if (!job.repo) {
      return this.error({
          id    : job.id
        , error : (new Errors.Data_Integrity_Error(`invalid job: was created without a valid Repo`)).toJSON()
      })
    }

    // Look the repo up once and reuse it for both the check and the script.
    const repo = this.registry.repo(job.repo)

    if (!repo) {
      return this.error({
          id    : job.id
        , error : (new Errors.Data_Integrity_Error(`invalid registry member : ${job.repo.name}`)).toJSON()
      })
    }

    return this.fabric.schedule({
        id     : job.id
      , script : repo.script
      , data   : job.data
    })
  }

  /**
   * Inserts a job to the Job table for processing later, then emits
   * `Queue.Event.enqueue` with the value `Job.enqueue` resolved to.
   *
   * @param  {Job} job The job to insert
   * @return {Promise} settles after the enqueue event has been emitted
   */
  enqueue (job) {
    this.debug(`enqueue : `, job)

    return Job.enqueue(job).then( job => this.emit(Queue.Event.enqueue, job) )
  }

  /**
   * Completes a job in the Job table, then emits `Queue.Event.complete`
   * with the job that was passed in.
   *
   * @param  {Job} job The job; only `id` and `result` are persisted
   * @return {Promise} settles after the complete event has been emitted
   */
  complete (job) {
    this.debug(`result : `, job)

    return Job
      .complete({
          id     : job.id
        , result : job.result
      })
      .then( _ => this.emit(Queue.Event.complete, job) )
  }

  /**
   * Stores a job as errored in the Job table, then emits
   * `Queue.Event.error` with the job that was passed in.
   *
   * @param  {Object} job The failed job, as `{ id, error }`
   * @return {Promise} settles after the error event has been emitted
   */
  error (job) {
    this.debug(`error : `, job)

    return Job.fail(job).then( _ => this.emit(Queue.Event.error, job) )
  }

  /**
   * Kills the Queue and removes anything that may cause a memory leak:
   * all listeners on the queue are removed and the loop, fabric, registry
   * and advertiser are killed.
   *
   * NOTE(review): `this.lock` is not torn down here — confirm whether Lock
   * needs any cleanup.
   */
  kill () {
    this.removeAllListeners()
    this.loop.kill()
    this.fabric.kill()
    this.registry.kill()
    this.advertiser.kill()
  }
}
/**
 * Names of the events associated with a Queue.
 *
 * `enqueue`, `error` and `complete` are emitted by the corresponding Queue
 * methods; `up` is passed to `this.is(...)` when the queue is brought online.
 */
Queue.Event = {
  up: 'queue:up',
  down: 'queue:down',
  flood: 'queue:flood',
  enqueue: 'queue:job:enqueue',
  error: 'queue:job:error',
  complete: 'queue:job:complete'
}