// Home Reference Source Repository

// es6/Queue/Queue.js

import Promise               from 'bluebird'

/** utils **/
import * as Time             from '../utils/Time'
import Expressive            from '../utils/Expressive'

/** Services **/
import Lock                  from '../Services/Lock'
import Loop                  from '../Services/Loop'
import Registry              from '../Services/Registry'
import Repo                  from '../Services/Repo'
import Advertiser            from '../Services/Advertiser'

import Fabric                from './Fabric'
import Job                   from './Job'
import Thread                from './Thread'
import * as Errors           from '../Errors'

/**
 * Coordinates job processing: on every Loop tick it pulls a batch of jobs
 * through `Job`, hands them to the Fabric for execution and records each
 * outcome back through `Job.complete` / `Job.fail`.
 */
export default class Queue extends Expressive {
  /**
   * Creates a new instance of Queue
   *
   * Arguments are forwarded to the constructor, which currently declares no
   * parameters.
   *
   * @return {Promise}  resolves to the instance of Queue
   */
  static create (...args) {
    return new Queue(...args)
  }

  /**
   * Cut-off timestamp for checking deadspace: the current time minus
   * `Time.seconds(5)`.
   *
   * NOTE(review): assumes `Time.seconds` returns milliseconds, which would
   * make this an epoch-ms value five seconds in the past - confirm.
   *
   * @property    expiry
   */
  static get expiry () {
    return Date.now() - Time.seconds(5)
  }

  /**
   * creates a new Queue
   *
   * The constructor returns a Promise instead of the instance, so
   * `new Queue()` evaluates to that Promise. The lock, fabric, registry,
   * loop and advertiser are created together via `Promise.props`, copied
   * onto the instance, and `bringOnline()` is called before the Promise
   * resolves.
   *
   * @return {Promise}  resolves to the instance of Queue
   */
  constructor () {
    super()

    return Promise.props({
          lock       : Lock.create('queue')
        , fabric     : Fabric.create()
        , registry   : Registry.create()
        , loop       : Loop.create()
        , advertiser : Advertiser.create()
      })
    .then( extensions => {
        // every created service becomes a property of the Queue (this.lock, this.fabric, ...)
        Object.assign(this, extensions)
        this.bringOnline()
      })
    .return(this)
  }

  /**
   * attaches all relevant listeners and brings the Queue online
   *
   * @method bringOnline
   * @return {*}  whatever `this.is(Queue.Event.up)` returns
   */
  bringOnline () {
    this.loop.on( Loop.Event.tick, free => {
      this.work()
        .then(free, err => {
          // A rejected pass must still call `free`; without this handler the
          // callback would never be invoked for this tick and the rejection
          // would go unhandled. `null` is the same value the idle path of
          // work() resolves with.
          this.debug(`work failed : `, err)
          return free(null)
        })
    })
    this.fabric.on( Fabric.Event.down,  err    => this.loop.pause()     )
    this.fabric.on( Thread.Event.done,  result => this.complete(result) )
    this.fabric.on( Thread.Event.error, err    => {
      // normalise the thread error into the shape handed to Job.fail
      this.error({
          id    : err.id
        , error : {
            message : err.message
          , stack   : err.stack
          , type    : err.type || "Runtime"
        }
      })
    })

    // Rotates an absolute script out of the `require` cache in a Thread
    this.registry.on( Repo.Event.invalidate, script => {
      this.fabric.invalidate(script)
    })

    this.loop.start()
    return this.is(Queue.Event.up)
  }

  /**
   * finds jobs and schedules them.  Triggered by every `Loop.Event.tick`
   *
   * Does nothing when the fabric has no available threads. Otherwise, inside
   * `lock.synchronize`, it asks `Job.nextBatch` for a batch sized by the
   * number of available threads, locks each returned job and passes it to
   * `schedule`.
   *
   * @return     {Promise}  resolves to null when no thread is available,
   *                        otherwise the result of `lock.synchronize`
   */
  work () {
    if (!this.fabric.availableThreads.length) {
      return Promise.resolve(null)
    }

    return this.lock.synchronize( done => {
      // NOTE(review): this chain is not returned, so a rejection here only
      // reaches `done` through `finally` and is otherwise unhandled - confirm
      // whether `synchronize` expects errors to be reported another way.
      Job
        .nextBatch(this.fabric.availableThreads.length, this.registry.availableIds )
        .map( job => Job.lock({ id : job.id }).return(job) )
        .bind(this)
        .map(this.schedule)
        .finally(done)
    })
  }

  /**
   * Schedules a job to run in an available thread
   *
   * A job without a repo, or whose repo the registry does not know, is
   * recorded as errored instead of being scheduled.
   *
   * @param      {Job}  job     The job to run
   * @return     {*}    the result of `fabric.schedule`, or of `error` when
   *                    the job is invalid
   */
  schedule (job) {
    if (!job.repo) {
      return this.error({
          id    : job.id
        , error : (new Errors.Data_Integrity_Error(`invalid job: was created without a valid Repo`)).toJSON()
      })
    }

    // single registry lookup, reused for both the check and the script
    const repo = this.registry.repo(job.repo)

    if (!repo) {
      return this.error({
          id    : job.id
        , error : (new Errors.Data_Integrity_Error(`invalid registry member : ${job.repo.name}`)).toJSON()
      })
    }

    return this.fabric.schedule({
        id     : job.id
      , script : repo.script
      , data   : job.data
    })
  }

  /**
   * Inserts a job to the Job table for processing later, then emits
   * `Queue.Event.enqueue` with the value `Job.enqueue` resolved to
   *
   * @param      {Job}  job     The job to insert
   * @return     {Promise}  settles after the event has been emitted
   */
  enqueue (job) {
    this.debug(`enqueue : `, job)
    return Job.enqueue(job).then( job => this.emit(Queue.Event.enqueue, job) )
  }

  /**
   * Completes a job in the Job table, then emits `Queue.Event.complete`
   *
   * Only `id` and `result` are passed to `Job.complete`; the event carries
   * the full object that was passed in.
   *
   * @param      {Job}  job     The job
   * @return     {Promise}  settles after the event has been emitted
   */
  complete (job) {
    this.debug(`result : `, job)
    return Job
      .complete({
          id     : job.id
        , result : job.result
      })
      .then( _ => this.emit(Queue.Event.complete, job) )
  }

  /**
   * Stores a job as errored in the Job table, then emits `Queue.Event.error`
   *
   * @param      {Job}  job     The job
   * @return     {Promise}  settles after the event has been emitted
   */
  error (job) {
    this.debug(`error : `, job)
    return Job.fail(job).then( _ =>  this.emit(Queue.Event.error, job) )
  }

  /**
   * kills the Queue and removes anything that may cause a memory leak
   *
   * Removes the Queue's own listeners and kills the loop, fabric, registry
   * and advertiser. The lock is not touched here.
   */
  kill () {
    this.removeAllListeners()
    this.loop.kill()
    this.fabric.kill()
    this.registry.kill()
    this.advertiser.kill()
  }

}

// Event name constants exposed on Queue
Queue.Event = {
  up: 'queue:up',
  down: 'queue:down',
  flood: 'queue:flood',
  enqueue: 'queue:job:enqueue',
  error: 'queue:job:error',
  complete: 'queue:job:complete',
}