// es6/Queue/Thread.js
import cluster from 'cluster'
import net from 'net'
import path from 'path'
import debug from 'debug'
import which from 'which'
import Promise from 'bluebird'
import Events from '../utils/Events'
import {Timer, minutes} from '../utils/Time'
import * as Errors from '../Errors'
import Fabric from './Fabric'
/**
 * Exposes a useful API for managing the state of processes created from `cluster.fork()`.
 *
 * The static methods operate on worker handles (reserve/free/spin/kill); an
 * instance of this class can only be constructed outside the master process,
 * where it listens for packets sent over the process message channel.
 *
 * @class Thread
 */
export default class Thread {
  /**
   * Generates a new Thread instance
   *
   * @class Thread
   * @method create
   * @returns {Thread}
   */
  static create () {
    return new Thread()
  }

  /**
   * the name of the property that stores whether a <Process> is busy
   *
   * @class Thread
   * @getter BUSY_SWITCH
   * @returns {String}
   */
  static get BUSY_SWITCH () {
    return "_busy"
  }

  /**
   * sets the `BUSY_SWITCH` to true for a given <Process>
   *
   * @method reserve
   * @param {Process} thread The thread to reserve
   * @returns {Process} the same thread, for chaining
   */
  static reserve (thread) {
    thread[Thread.BUSY_SWITCH] = true
    return thread
  }

  /**
   * sets the `BUSY_SWITCH` to false for a given <Process>
   *
   * @method free
   * @param {Process} thread The thread to free
   * @return {Process} the same thread, for chaining
   */
  static free (thread) {
    thread[Thread.BUSY_SWITCH] = false
    return thread
  }

  /**
   * forks a new Thread for the cluster.Master <Process:Main> to use
   *
   * The worker is reserved while it boots and freed once it reports "online".
   * Every packet the worker sends is re-emitted on `fabric`, and the worker is
   * freed again whenever it reports `Thread.Event.done` or `Thread.Event.error`.
   *
   * @method spin
   * @param {Object} fabric The emitter that receives the worker's events
   * @return {Promise} resolves with the worker once it is online, rejects with
   *                   the exit code if it exits before coming online
   */
  static spin (fabric) {
    return new Promise( (resolve, reject) => {
      // make sure we are forking the correct settings
      cluster.setupMaster({ exec: __filename, args: [] })
      const thread = cluster.fork()

      function onUp () {
        // the boot phase is over, so the boot-failure listener is no longer needed
        thread.removeListener("exit", onDown)
        resolve( Thread.free(thread) )
      }

      function onDown (code) {
        thread.removeListener("online", onUp)
        reject(code)
      }

      Thread.reserve(thread)
      thread.fabric = fabric
      thread.once("online", onUp)
      thread.once("exit", onDown)
      thread.on("message", (packet) => {
        // only keyed packets (see Thread.prototype.emit) carry an event
        if (packet === null || typeof packet !== "object") return
        const evt = Object.keys(packet)[0]
        if (evt === Thread.Event.done || evt === Thread.Event.error) {
          Thread.free(thread)
        }
        // Thread.kill() deletes `fabric`; a packet that arrives between the
        // kill request and the actual exit has nowhere to go, so drop it
        // instead of throwing inside the master process
        if (!thread.fabric) return
        thread.fabric.emit(evt, packet[evt])
      })
    })
  }

  /**
   * Used by the Main process to kill a Thread
   *
   * @method kill
   * @param {Process} thread The process to kill
   * @return {Promise} resolves with `true` once the process has exited
   */
  static kill (thread) {
    return new Promise( (resolve) => {
      delete thread.fabric
      // a worker that is already dead never emits "exit" again, so waiting
      // for the event would leave this promise pending forever
      if (typeof thread.isDead === "function" && thread.isDead()) {
        resolve(true)
        return
      }
      thread.once( "exit", () => resolve(true) )
      thread.kill()
    })
  }

  /**
   * Determine if a Thread is available.
   *
   * @method isAvailable
   * @param {Process} thread The thread
   * @return {boolean} True if available, False otherwise.
   */
  static isAvailable (thread) {
    return !thread[Thread.BUSY_SWITCH]
  }

  /**
   * generates a new Thread instance when `cluster.isWorker` is true
   *
   * @method constructor
   * @throws {Errors.Thread_Error} when constructed inside the master process
   * @return {Thread}
   */
  constructor () {
    if (cluster.isMaster) throw new Errors.Thread_Error()
    this.debug = debug(`wyst:Thread:${process.pid}`)
    // NOTE(review): Events.namespace presumably routes each incoming packet to
    // the handler registered under its key, bound to this instance - confirm
    // against utils/Events
    process.on('message',
      Events.namespace({
        job : this.run
      , "cache:invalidate" : this.invalidate
      }, this)
    )
  }

  /**
   * Checks the integrity of a <Job> before a <Thread> attempts to process it
   *
   * A key counts as missing only when it is `undefined` or `null`; falsy but
   * present values such as `0`, `false` or `""` are accepted. When keys are
   * missing a Data_Integrity_Error is reported through `this.error`.
   *
   * @method verify
   * @param {Object} data The data to verify
   * @param {Array} keys The keys to verify exist for <data>
   * @return {boolean} Whether <data> is valid or not
   */
  verify (data, ...keys) {
    // a null/undefined job is treated as a job with every key missing
    const record = data == null ? {} : data
    const missing = keys.filter( (key) => record[key] === undefined || record[key] === null )
    if (missing.length) {
      this.error(record.id, new Errors.Data_Integrity_Error(`missing required key {${missing.join(', ')}}`))
      return false
    }
    return true
  }

  /**
   * invalidates the local require cache for a particular script
   *
   * @method invalidate
   * @param {String} script The absolute path of the script
   */
  invalidate (script) {
    this.debug(`invalidate:cache :: ${script}`)
    // try to invalidate the script, but don't cause a fatal error if this Thread has never encountered this script
    try {
      delete require.cache[ require.resolve( script ) ]
    } catch (err) {
      // best effort only: record why nothing was invalidated and carry on
      this.debug(`invalidate:cache :: skipped ${script} (${err && err.message})`)
    }
  }

  /**
   * Emits a namespaced <Event> back to <Process:Main>
   *
   * @method emit
   * @param {String} namespace The namespace, used as the single key of the packet
   * @param {Object} data The data
   * @return {Thread}
   */
  emit ( namespace, data ) {
    process.send({ [namespace]: data })
    return this
  }

  /**
   * Convenience wrapper for Thread.prototype.emit(Thread.Event.error, <Error>)
   *
   * Uses `err.toJSON()` when the error provides it, otherwise sends the
   * `message` and `stack` of the error. Values that are not objects (a thrown
   * string, `null`, ...) are reported by their string form.
   *
   * @method error
   * @param {String} id the <Job:id> that should be associated with the <Error>
   * @param {Error} err The <Error> that occurred
   * @return {Thread}
   */
  error (id, err) {
    let message
    if (err && typeof err.toJSON === "function") {
      message = err.toJSON()
    } else if (err !== null && typeof err === "object") {
      message = { message: err.message, stack: err.stack }
    } else {
      // nothing to read `message`/`stack` from, so keep at least the value itself
      message = { message: String(err), stack: undefined }
    }
    this.emit(Thread.Event.error, Object.assign({id: id}, message))
    return this
  }

  /**
   * Convenience wrapper for emitting successful <Job> completion
   *
   * @method done
   * @param {String} id the <Job:id> that should be associated with the <Result>
   * @param {*} result The result of <Job>
   */
  done (id, result) {
    this.emit(Thread.Event.done, {
      id : id
    , result : result
    })
  }

  /**
   * The runtime wrapper for a <Job>
   *
   * Verifies the job, requires `job.script` (which must be an absolute path
   * and export a function) and calls it with `job.data` and a completion
   * callback. The script's `this` exposes `touch()`, which forwards to the
   * job timer. Failures are reported through `this.error`.
   *
   * @method run
   * @param {Object} job The <Job> to run
   * @return {Thread}
   */
  run (job) {
    const thread = this
    // read once so the failure handler below cannot throw on a null job
    const jobId = job == null ? undefined : job.id
    let timer = null
    let timerStopped = false

    // cancels the job timer at most once, and only if it was created
    function stopTimer () {
      if (timer === null || timerStopped) return
      timerStopped = true
      timer.cancel()
    }

    Promise.try( function runtime () {
      if (!thread.verify(job, "script", "id", "data")) return
      if (!path.isAbsolute(job.script)) {
        return thread.error(
          job.id
        , new Errors.Thread_Error(`job.script [${job.script}] was not an absolute path`)
        )
      }
      thread.debug( `received job ${ JSON.stringify(job) }` )

      // declare halting function here so stack traces match it to Thread
      function halt () {
        const err = new Errors.Timeout_Error()
        thread.error(job.id, err)
        throw err // we have to throw this so the thread can be killed and restarted by Fabric
      }

      timer = Timer.create({
        max_run_time : job.max_run_time || minutes(5)
      , name : `Thread:${process.pid}`
      , after : halt
      })

      const ctx = {
        touch: function touch () { timer.touch() }
      }

      const SCRIPT = require(job.script)
      if ( typeof SCRIPT != 'function' ) {
        throw new Errors.Thread_Error(`${job.script} was not a function`)
      }

      SCRIPT.call(
        ctx
      , job.data
      , function done (result) {
          stopTimer()
          // Emit the error, but don't slay the thread
          if (result instanceof Error) {
            thread.error( job.id, result )
          } else {
            thread.done( job.id, result )
          }
        })
    }).catch( function (err) {
      // the job failed before completing (script failed to load, was not a
      // function, or threw); without this the timer would fire `halt` later
      // and bring down a thread that may already be running another job
      stopTimer()
      thread.error(jobId, err)
    })
    return this
  }
}
/**
 * Names of the events a Thread reports back over the fabric.
 *
 * @property {Object} Event lookup table of fabric event names
 * @property {String} Event.done emitted when a Thread is done processing a Job
 * @property {String} Event.error emitted when a Thread has an Error
 */
Thread.Event = {
  done: "job:done",
  error: "job:error"
}
/**
 * Worker bootstrap: when this module is loaded inside a cluster worker,
 * create the Thread instance for that process.
 */
if (cluster.isWorker) Thread.create()