es6/Queue/Fabric.js
import os from 'os'
import Promise from 'bluebird'
import cluster from 'cluster'
import debug from 'debug'
import Expressive from '../utils/Expressive'
import Thread from './Thread'
/**
* exposes a way to manage our underlying Threads
* @class Fabric
*/
export default class Fabric extends Expressive {
/**
* Creates a new Fabric with the number of Threads equal to `require('os').cpus().length * 2`
*
* @method constructor
*/
constructor () {
super()
this.debug(`generating Fabric with thread count of ${Fabric.THREAD_COUNT.length}...`)
this._dead = false
cluster.on('exit', (thread, code, signal) => {
this.debug(`Thread:${thread.process.pid} died with ${code || signal} : regen ${!this._dead}`)
if ( this._dead ) {
if (!this.threads.length) {
cluster.removeAllListeners()
this.emit(Fabric.Event.death, this)
}
return
}
Thread.spin(this)
.bind(this)
.tap(this.proxy)
.then( thread => this.emit(Fabric.Event.regen, thread) )
})
return Promise
.all(Fabric.THREAD_COUNT.map( _ => Thread.spin(this) ))
.then( _ => this.debug("fabric has begun threading...") )
.return(this)
}
/**
* Kills all threads in a Fabric
*
* @method kill
* @return {Promise}
*/
kill () {
if (this._dead) return Promise.resolve(true)
this._dead = true
return Promise.all(this.threads.map(Thread.kill))
}
/**
* Sends a job to a random available Thread for processing
*
* @param {Object} job The job
*/
schedule (job) {
this.randomAvailableThread.send({ job: job })
return this
}
/**
* invalidates a script across all running Threads
*
* @param {<type>} script The script
*/
invalidate (script) {
this.threads.forEach( thread => {
thread.send({ "cache:invalidate" : script })
})
}
/**
* getter for all of our underlying thread <Process>
*
* @method threads
* @return {Array<Process>}
*/
get threads () {
return Object.keys(cluster.workers).map( position => cluster.workers[position] )
}
/**
* returns an Object that allows us to fetch <Process> by pids
*
* @method byPid
* @return {Object} key, value pairs of pid, <Process> respectively
*/
get byPid () {
return this.threads.reduce( (acc, thread)=> {
acc[ thread.process.pid ] = thread
return acc
}, {})
}
/**
* returns all Threads that are not currently working
*
* @method availableThreads
* @return {Array<Process>}
*/
get availableThreads () {
return this.threads.filter(Thread.isAvailable)
}
/**
* returns the Process reference to a random available thread
*
* @method randomAvailableThread
* @return {Process}
*/
get randomAvailableThread () {
const possibilities = this.availableThreads
if ( possibilities.length === 0 ) {
err = new Error('Tried to reserve a thread whilst flooded')
this.is(Fabric.Event.down, err)
throw err
return
}
if (possibilities.length === 1) {
this.emit(Fabric.Event.busy)
return this.reserve(possibilities[0])
}
const randomThread = possibilities[ Math.floor( possibilities.length * Math.random() ) ]
return this.reserve(randomThread)
}
reserve (thread) {
return Thread.reserve(thread)
}
}
/**
* @property THREAD_COUNT an array that we can map our Processes we spin up into
*/
Fabric.THREAD_COUNT = Array(os.cpus().length * 2).fill(0)
/**
* @property {Object} Event dictionary for our fabric events
* @property {String} Event.up the event that will occur when Fabric is done bringing our Threads online
* @property {String} Event.down the event that will occur when Fabric goes offline
*/
Fabric.Event = {
up : 'fabric:up'
, down : 'fabric:down'
, busy : 'fabric:busy'
, ready : 'fabric:ready'
, death : 'fabric:death'
, regen : 'fabric:regen'
}