Home Reference Source Repository

es6/Queue/Fabric.js

import os             from 'os'
import Promise        from 'bluebird'
import cluster        from 'cluster'
import debug          from 'debug'

import Expressive     from '../utils/Expressive'
import Thread         from './Thread'
/**
 * Exposes a way to manage our underlying Threads. The threads themselves are
 * read from `cluster.workers`; Fabric keeps no separate list of them.
 *
 * @class Fabric
 * @extends Expressive
 */
export default class Fabric extends Expressive {
  /**
   * Creates a new Fabric and spins up one Thread per entry of
   * `Fabric.THREAD_COUNT`.
   *
   * NOTE: the constructor returns a Promise, so `new Fabric()` evaluates to
   * that Promise (resolving with the Fabric instance once every Thread has been
   * spun up) and not to the instance itself.
   *
   * @method     constructor
   * @return     {Promise<Fabric>}
   */

  constructor () {
    super()

    this.debug(`generating Fabric with thread count of ${Fabric.THREAD_COUNT.length}...`)

    // flipped to true by kill(); while false, every exited thread is replaced
    this._dead = false

    cluster.on('exit', (thread, code, signal) => {
      this.debug(`Thread:${thread.process.pid} died with ${code || signal} : regen ${!this._dead}`)

      if ( this._dead ) {
        // we are shutting down: do not regenerate, and once the last thread
        // is gone drop the cluster listeners and announce our death
        if (!this.threads.length) {
          cluster.removeAllListeners()
          this.emit(Fabric.Event.death, this)
        }
        return
      }

      // still alive: replace the thread that just exited.
      // `.bind(this)` makes the following handlers run with the Fabric as
      // their context. `proxy` is not defined in this class -- presumably
      // inherited from Expressive (TODO confirm).
      Thread.spin(this)
        .bind(this)
        .tap(this.proxy)
        .then( thread => this.emit(Fabric.Event.regen, thread) )
    })

    return Promise
      .all(Fabric.THREAD_COUNT.map( _ => Thread.spin(this) ))
      .then( _ => this.debug("fabric has begun threading...") )
      .return(this)
  }
  /**
   * Kills all threads in a Fabric and stops regenerating them.
   *
   * @method     kill
   * @return     {Promise}  resolves with `true` when the Fabric was already
   *                        killed, otherwise with the results of `Thread.kill`
   *                        for every current thread
   */

  kill () {
    if (this._dead) return Promise.resolve(true)

    // must be set before the threads exit so the 'exit' handler registered in
    // the constructor does not respawn them
    this._dead = true

    return Promise.all(this.threads.map(Thread.kill))

  }

  /**
   * Sends a job to a random available Thread for processing
   *
   * @method     schedule
   * @param      {Object}  job     The job
   * @return     {Fabric}  this Fabric, for chaining
   * @throws     {Error}   when no Thread is available (see `randomAvailableThread`)
   */

  schedule (job) {
    this.randomAvailableThread.send({ job: job })
    return this
  }

  /**
   * invalidates a script across all running Threads by sending each of them a
   * `{ "cache:invalidate": script }` message
   *
   * @method     invalidate
   * @param      {*}  script  The script; forwarded to the threads as-is
   */

  invalidate (script) {
    this.threads.forEach( thread => {
      thread.send({ "cache:invalidate" : script })
    })
  }

  /**
   * getter for all of our underlying thread <Process>, i.e. the current values
   * of `cluster.workers`
   *
   * @method     threads
   * @return     {Array<Process>}
   */
  get threads () {
    return Object.keys(cluster.workers).map( position => cluster.workers[position] )
  }

  /**
   * returns an Object that allows us to fetch <Process> by pids
   *
   * @method     byPid
   * @return     {Object}  key, value pairs of pid, <Process> respectively
   */
  get byPid () {
    return this.threads.reduce( (acc, thread)=> {
      acc[ thread.process.pid ] = thread
      return acc
    },  {})
  }

  /**
   * returns all Threads for which `Thread.isAvailable` is truthy
   *
   * @method     availableThreads
   * @return     {Array<Process>}
   */
  get availableThreads () {
    return this.threads.filter(Thread.isAvailable)
  }

  /**
   * Picks a random available thread and passes it through `reserve`.
   *
   * Despite being a getter this has side effects: the chosen thread is
   * reserved, and `Fabric.Event.busy` is emitted when the thread handed out
   * was the only one still available.
   *
   * @method     randomAvailableThread
   * @return     {Process}  whatever `reserve` returns for the chosen thread
   * @throws     {Error}    when there is no available thread
   */
  get randomAvailableThread () {
    const possibilities = this.availableThreads

    if ( possibilities.length === 0 ) {
      // `err` has to be declared: ES modules are strict, so assigning to an
      // undeclared name would raise a ReferenceError and mask this error
      const err = new Error('Tried to reserve a thread whilst flooded')
      // NOTE(review): every other notification in this class uses `this.emit`;
      // `is` is not defined here -- confirm Expressive provides it
      this.is(Fabric.Event.down, err)
      throw err
    }

    if (possibilities.length === 1) {
      // handing out the last free thread
      this.emit(Fabric.Event.busy)
      return this.reserve(possibilities[0])
    }

    const randomThread = possibilities[ Math.floor( possibilities.length * Math.random() ) ]

    return this.reserve(randomThread)
  }

  /**
   * Reserves a thread by delegating to `Thread.reserve`
   *
   * @method     reserve
   * @param      {Process}  thread  The thread to reserve
   * @return     {Process}  the return value of `Thread.reserve(thread)`
   */
  reserve (thread) {
    return Thread.reserve(thread)
  }

}

/**
 * @property {Array<Number>} THREAD_COUNT   zero-filled array holding two slots per
 *                                          CPU reported by `os.cpus()`; only its
 *                                          length matters -- it is mapped over to
 *                                          spin up one Process per slot
 */
Fabric.THREAD_COUNT = Array.from({ length: os.cpus().length * 2 }, () => 0)

/**
 * Names of the events a Fabric deals in.
 *
 * @property {Object} Event         dictionary for our fabric events
 * @property {String} Event.up      the event that will occur when Fabric is done bringing our Threads online
 * @property {String} Event.down    the event that will occur when Fabric goes offline; also signalled when a thread is requested but none is available
 * @property {String} Event.busy    emitted when the last available thread is handed out
 * @property {String} Event.ready   not referenced in this file -- presumably signals readiness (TODO confirm)
 * @property {String} Event.death   emitted once the Fabric has been killed and no threads remain
 * @property {String} Event.regen   emitted after a replacement thread has been spun up for one that exited
 */
Fabric.Event = {
  up: 'fabric:up',
  down: 'fabric:down',
  busy: 'fabric:busy',
  ready: 'fabric:ready',
  death: 'fabric:death',
  regen: 'fabric:regen'
}