Home Reference Source Repository

es6/Queue/Thread.js

import cluster          from 'cluster'
import net              from 'net'
import path             from 'path'
import debug            from 'debug'
import which            from 'which'
import Promise          from 'bluebird'

import Events           from '../utils/Events'
import {Timer, minutes} from '../utils/Time'
import * as Errors      from '../Errors'
import Fabric           from './Fabric'

/**
 * exposes a useful API for managing the state of processes created from `cluster.fork()`
 * @class Thread
 */

export default class Thread {

  /**
   * Generates a new Thread instance.
   *
   * The constructor throws when `cluster.isMaster` is true, so this is only
   * usable from inside a forked worker.
   *
   * @class      Thread
   * @method     create
   * @returns    {Thread}
   */
  static create () {
    return new Thread()
  }

  /**
   * the property name under which the busy state of a <Process> is stored
   *
   * @class     Thread
   * @getter    BUSY_SWITCH
   * @returns   {String}
   */
  static get BUSY_SWITCH () {
    return "_busy"
  }

  /**
   * sets the `BUSY_SWITCH` to true for a given <Process>
   *
   * @method     reserve
   * @param      {Process}  thread  The thread to reserve
   * @returns    {Process}  the same thread, for chaining
   */
  static reserve (thread) {
    thread[Thread.BUSY_SWITCH] = true
    return thread
  }

  /**
   * sets the `BUSY_SWITCH` to false for a given <Process>
   *
   * @method     free
   * @param      {Process}  thread  The thread to free
   * @return     {Process}  the same thread, for chaining
   */
  static free (thread) {
    thread[Thread.BUSY_SWITCH] = false
    return thread
  }

  /**
   * forks a new Thread for the cluster.Master <Process:Main> to use
   *
   * The worker is reserved while it boots and freed once it reports "online".
   * Every `{ <event>: <payload> }` packet the worker sends is re-emitted on
   * `fabric`; a `done` or `error` packet also frees the worker.
   *
   * @method    spin
   * @param     {Fabric}   fabric  the emitter that receives the worker's events
   * @return    {Promise}  resolves with the worker once it is online, rejects
   *                       with the exit code if it exits first
   */
  static spin (fabric) {
    return new Promise( (resolve, reject) => {
      // make sure we are forking the correct settings
      cluster.setupMaster({ exec: __filename, args: [] })
      const thread = cluster.fork()

      function onUp () {
        resolve( Thread.free(thread) )
      }

      function onDown (code) {
        reject(code)
      }

      Thread.reserve(thread)
      thread.fabric = fabric
      thread.once("online" , onUp)
      thread.once("exit"   , onDown)
      thread.on("message", (packet)=> {
        // anything that is not a `{ <event>: <payload> }` object is not ours
        if (packet === null || typeof packet !== "object") return

        const evt = Object.keys(packet)[0]
        if (evt === undefined) return

        if (evt === Thread.Event.done || evt === Thread.Event.error) {
          Thread.free(thread)
        }

        // `Thread.kill` deletes `thread.fabric`; a message that arrives after
        // that has nowhere to go and must not throw inside the master
        if (!thread.fabric) return

        thread.fabric.emit(evt, packet[evt])
      })
    })
  }

  /**
   * Used by the Main process to kill a Thread
   *
   * @method     kill
   * @param      {Process}  thread  The thread to kill
   * @return     {Promise}  resolves `true` once the thread has exited
   */
  static kill (thread) {
    return new Promise( (resolve) =>  {
      delete thread.fabric

      // a worker that has already exited never emits "exit" again, so waiting
      // for the event would leave this promise pending forever
      if (typeof thread.isDead === "function" && thread.isDead()) {
        return resolve(true)
      }

      thread.once( "exit", () => resolve(true) )
      thread.kill()
    })
  }

  /**
   * Determine if a Thread is available.
   *
   * @method     isAvailable
   * @param      {Process}   thread  The thread
   * @return     {boolean}  True if available, False otherwise.
   */
  static isAvailable (thread) {
    return !thread[Thread.BUSY_SWITCH]
  }

  /**
   * generates a new Thread instance when `cluster.isWorker` is true
   *
   * Subscribes to the process "message" event and routes `job` packets to
   * `run` and `cache:invalidate` packets to `invalidate`.
   *
   * @method     constructor
   * @throws     {Errors.Thread_Error}  when constructed in the master process
   * @return     {Thread}
   */
  constructor () {
    if (cluster.isMaster) throw new Errors.Thread_Error()
    this.debug = debug(`wyst:Thread:${process.pid}`)

    process.on('message',
      Events.namespace({
          job                : this.run
        , "cache:invalidate" : this.invalidate
      }, this)
    )
  }

  /**
   * Checks the integrity of a <Job> before a <Thread> attempts to process it
   *
   * A key counts as missing only when its value is `null` or `undefined`;
   * falsy values such as `0`, `""` or `false` are valid. When something is
   * missing a Data_Integrity_Error is reported through `error`.
   *
   * @method     verify
   * @param      {Object}     data    The data to verify
   * @param      {...String}  keys    The keys to verify exist for <data>
   * @return     {boolean}    Whether <data> is valid or not
   */
  verify (data, ...keys) {
    // a null/undefined job is treated as "every key missing" instead of throwing
    const source  = data == null ? {} : data
    const missing = keys.filter( (key)=> source[key] == null )

    if (missing.length) {
      this.error(source.id, new Errors.Data_Integrity_Error(`missing required key {${missing.join(', ')}}`))
      return false
    }

    return true
  }

  /**
   * invalidates the local require cache for a particular script
   *
   * @method     invalidate
   * @param      {String}  script  The absolute path of the script
   */
  invalidate (script) {
    this.debug(`invalidate:cache :: ${script}`)
    // try to invalidate the script, but don't cause a fatal error if this Thread has never encountered this script
    try {
      delete require.cache[ require.resolve( script ) ]
    } catch (err) {
      // best effort only: leave a trace for debugging and carry on
      this.debug(`invalidate:cache :: skipped ${script} (${err && err.message})`)
    }
  }

  /**
   * Emits a namespaced <Event> back to <Process:Main>
   *
   * @method     emit
   * @param      {String}  namespace  The namespace
   * @param      {Object}  data       The data
   * @return     {Thread}
   */
  emit ( namespace, data ) {
    const packet = {}
    packet[namespace] = data
    process.send(packet)
    return this
  }

  /**
   * Convenience wrapper for Thread.prototype.emit(Thread.Event.error, <Error>)
   *
   * Uses `err.toJSON()` when the error provides it, otherwise sends the
   * `message` and `stack` of the error. Thrown primitives (strings, numbers,
   * `null`, `undefined`) are reported with their string form as the message.
   *
   * @method     error
   * @param      {String}  id      the <Job:id> that should be associated with the <Error>
   * @param      {Error}   err     The <Error> that occurred
   * @return     {Thread}
   */
  error (id, err) {
    const isObject = err !== null && (typeof err === "object" || typeof err === "function")
    let message

    if (!isObject) {
      // anything can be thrown; do not crash while reporting it
      message = { message: String(err), stack: undefined }
    } else if (typeof err.toJSON === "function") {
      message = err.toJSON()
    } else {
      message = { message: err.message, stack: err.stack }
    }

    this.emit(Thread.Event.error, Object.assign({id: id}, message))
    return this
  }

  /**
   * Convenience wrapper for emitting successful <Job> completion
   *
   * @method     done
   * @param      {String}  id      the <Job:id> that should be associated with the <Result>
   * @param      {*}       result  The result of <Job>
   */
  done (id, result) {
    this.emit(Thread.Event.done, {
        id     : id
      , result : result
    })
  }

  /**
   * The runtime wrapper for a <Job>
   *
   * Verifies the job, loads `job.script` and calls it with `job.data` and a
   * completion callback, with `this.touch()` available to the script. A timer
   * (`job.max_run_time`, default 5 minutes) reports a Timeout_Error and throws
   * if it fires. Failures are reported through `error`.
   *
   * @method     run
   * @param      {Object}  job     The <Job> to run
   * @return     {Thread}
   */
  run (job) {
    const thread = this

    Promise.try( function runtime () {
      if (!thread.verify(job, "script", "id", "data")) return

      if (!path.isAbsolute(job.script)) {
        return thread.error(
            job.id
          , new Errors.Thread_Error(`job.script [${job.script}] was not an absolute path`)
        )
      }

      thread.debug( `received job ${ JSON.stringify(job) }` )

      // declare halting function here so stack traces match it to Thread
      function halt () {
        const err = new Errors.Timeout_Error()
        thread.error(job.id, err)
        throw err // we have to throw this so the thread can be killed and restarted by Fabric
      }

      const timer = Timer.create({
          max_run_time : job.max_run_time || minutes(5)
        , name         : `Thread:${process.pid}`
        , after        : halt.bind(thread)
      })

      // cancel the timer at most once, whichever path finishes the job first
      let stopped = false
      function stop () {
        if (stopped) return
        stopped = true
        timer.cancel()
      }

      const ctx = {
        touch: function touch () { timer.touch() }
      }

      try {
        const SCRIPT = require(job.script)

        if ( typeof SCRIPT !== 'function' ) {
          throw new Errors.Thread_Error(`${job.script} was not a function`)
        }

        SCRIPT.call(
            ctx
          , job.data
          , function done (result) {
            stop()
            // Emit the error, but don't slay the thread
            if (result instanceof Error) {
              thread.error( job.id, result )
            } else {
              thread.done( job.id, result )
            }
          })
      } catch (err) {
        // the job failed synchronously and is over: without this the timer
        // would still fire later, report a bogus timeout and throw from `halt`
        stop()
        throw err
      }

    }).catch( function (err) {
      thread.error(job == null ? undefined : job.id, err)
    })

    return this
  }
}

/**
 * Names of the events a Thread sends back to the main process.
 *
 * @property {Object} Event         dictionary for our fabric events
 * @property {String} Event.done    emitted when a Thread has finished processing a Job
 * @property {String} Event.error   emitted when a Thread has hit an Error
 */
Thread.Event = {
  done: "job:done",
  error: "job:error",
}

/**
 * forking logic
 *
 * `Thread.spin` forks workers with this file as their `exec` script, so when
 * the file is loaded inside a worker it creates the Thread for that process.
 */
if (cluster.isWorker) Thread.create()