Home Reference Source Repository

es6/Services/Lock.js

import thinky      from '../thinky'
import * as Time   from '../utils/Time'
import Promise     from 'bluebird'
import * as Errors from '../Errors'

const { Timer } = Time
const {type, r} = thinky

/**
 * our locking class
 * 
 * @class Lock
 */
export default class Lock {

  /**
   * gets the next closests expiry timestamp in MS
   * 
   * @property {Integer} nextExpiry
   */
  static get nextExpiry () {
    return Date.now() + Time.seconds(5)
  }

  /**
   * creates a new Lock instance
   * 
   * @method create
   * @return {Promise}
   */
  static create () {
    return new Lock(...arguments)
  }

  /**
   * constructor for a Lock class
   *
   * @param      {<type>}  key      The lock key
   * @param      {<type>}  options  options for the lock
   */
  constructor (key, options={}) {
    this.key     = key
    this.locked  = false
    this.options = options

    if (options.instance_token) {
      this.instance_token = options.instance_token
      return Promise.resolve(this)
    }

    return Lock.table.ready()
      .bind(this)
      .then(this.genId)      
  }

  genId () {
    return r.uuid().run()
      .then( id => this.instance_token = id )
      .return(this)
  }

  /**
   * base Query we inherit from
   * 
   * @method get
   * @returns {Thinky:Query}
   */
  get () {
    return r.table("Locks").get(this.key)
  }

  /**
   * returns attributes for a given Lock update
   *
   * @return     {Object}  { description_of_the_return_value }
   */
  attributes () {
    return {
        name           : this.key
      , expires_at     : Lock.nextExpiry
      , instance_token : this.instance_token
    }
  }

  /**
   * update Query for a lock update REQL operation
   *
   * @param      {Function}  updater  function to use as a REQL update
   */
  update (updater) {
    return this.get().update(updater).run()
  }

  /**
   * replace Query for replacing a lock instance in REQL
   *
   * @param      {Function}  replacer  function that implements the REQL replace
   */
  replace (replacer) {
    return this.get().replace(replacer).run()
  }

  /**
   * tries to attain a Lock on the current instance
   * 
   * @method tryLock
   * @return {Promise}
   */
  tryLock () {
    return new Promise( (resolve, reject) => {
      this.replace( (lock) => {
        return r.branch( lock.eq(null).or( lock("expires_at").lt(Date.now()) )
          , this.attributes()
          , lock
        )
      }).then( result => {
        if (result.inserted + result.replaced === 1) {
          this.locked = true
        }
        resolve([this, result])
      })
    })
  }

  /**
   * releases the lock on the current instance
   * 
   * @method release
   * @return {Promise}
   */
  release () {
    return new Promise( (resolve, reject) => {
      this.throwUnlessLocked()

      this.replace( (lock) => {
        return r.branch( lock('instance_token').eq(this.instance_token)
          , null
          , lock
        )
      }).then( result => {

        if (result.deleted !== 1) this.throwLockLost()

        this.locked = false

        resolve([this, result])
      })
    })
  }

  /**
   * polls until the instance obtains a lock
   * 
   * @method lock
   * @return {Promise}
   */
  acquire () {
    const self = this
    return new Promise( (resolve, reject)=> {
      function _poll () {
        self.tryLock().then( ([lock, result])=> {
          if (lock.locked) return resolve(lock)
          setTimeout(_poll, 100)
        })
      }
      _poll()
    })
  }

  /**
   * obtains a lock and then runs an unary function that receives a `done` callback
   *
   * @param      {Function}  fn      the function to run after a lock is obtained
   * @return     {Promise}   
   */
  synchronize (fn) {
    return this
      .acquire()
      .bind(this)
      .then( _ => {
        return new Promise( (resolve, reject)=> {
          const result = fn(resolve) 

          if (result && result.then) {
            return result
              .then(resolve)
              .catch(reject)
          } 

        })
      })
      .then(this.release)
      .return([this])
  }

  /**
   * touches an acquired lock
   * 
   * @method touch
   * @return {Promise}
   */
  touch () {
    return new Promise( (resolve, reject) => {
      this.throwUnlessLocked()

      this.update( (lock) => {
        return r.branch( lock("instance_token").default(null).eq(this.instance_token)
          , { expires_at: Lock.nextExpiry }
          , null
        )
      }).then( result => {
        this.locked = false
        
        if (result.replaced !== 1) {
          return reject( new Errors.Lock_Lost_Error() )
        }

        this.locked = true
        resolve([this, result])
      })
    })
  }

  /**
   * logical validators
   */
  throwUnlessLocked () {
    if (!this.locked) {
      throw new Errors.Lock_Not_Owned_Error()
    }
  }

  throwLockLost () {
    throw new Errors.Lock_Lost_Error()
  }

}

Lock.table = thinky.createModel('Locks', {
    key            : type.string()
  , instance_token : type.string().uuid(5)
  , expires_at     : type.date().default(Date.now)
}, { pk: 'name' })