// es6/Services/Lock.js
import thinky from '../thinky'
import * as Time from '../utils/Time'
import Promise from 'bluebird'
import * as Errors from '../Errors'
// Timer helper from the Time utilities (not referenced in the visible part of this file).
const Timer = Time.Timer
// thinky's schema type builders and its ReQL query handle.
const type = thinky.type
const r = thinky.r
/**
 * Expiring lock stored as a single document in the "Locks" table.
 *
 * The document's primary key is the lock key, `instance_token` identifies the
 * current owner and `expires_at` is the moment after which another instance
 * may take the lock over (see `tryLock`).
 *
 * NOTE: the constructor returns a Promise, so both `new Lock(...)` and
 * `Lock.create(...)` evaluate to a Promise that resolves with the instance,
 * never to the instance itself.
 *
 * @class Lock
 */
export default class Lock {
  /**
   * Expiry timestamp a lock written right now should carry: the current time
   * plus `Time.seconds(5)`.
   *
   * @property {Integer} nextExpiry
   */
  static get nextExpiry () {
    return Date.now() + Time.seconds(5)
  }

  /**
   * Creates a new Lock; a thin wrapper around `new Lock(...)`.
   *
   * @method create
   * @param {String} key the lock key
   * @param {Object} [options] see the constructor
   * @return {Promise<Lock>} resolves with the ready-to-use instance
   */
  static create (...args) {
    return new Lock(...args)
  }

  /**
   * Builds the instance and returns a Promise for it.
   *
   * When `options.instance_token` is given it is used as-is and the promise
   * resolves immediately; otherwise the constructor waits for the Locks table
   * to be ready and asks the database for a fresh token.
   *
   * @param {String} key the lock key (written as the document's `name`)
   * @param {Object} [options] options for the lock
   * @param {String} [options.instance_token] pre-assigned owner token
   * @return {Promise<Lock>}
   */
  constructor (key, options = {}) {
    this.key = key
    this.locked = false
    this.options = options

    if (options.instance_token) {
      this.instance_token = options.instance_token
      return Promise.resolve(this)
    }

    // `.bind(this)` makes bluebird invoke `genId` with this instance as `this`.
    return Lock.table.ready()
      .bind(this)
      .then(this.genId)
  }

  /**
   * Fetches a UUID from the database and stores it as `instance_token`.
   *
   * @method genId
   * @return {Promise<Lock>} resolves with this instance
   */
  genId () {
    return r.uuid().run()
      .then(id => { this.instance_token = id })
      .return(this)
  }

  /**
   * Base query every other operation builds on: the row for this lock's key.
   *
   * @method get
   * @returns {Thinky:Query}
   */
  get () {
    return r.table("Locks").get(this.key)
  }

  /**
   * Document written when this instance takes the lock.
   *
   * @method attributes
   * @return {Object} `{ name, expires_at, instance_token }`
   */
  attributes () {
    return {
      name: this.key,
      expires_at: Lock.nextExpiry,
      instance_token: this.instance_token
    }
  }

  /**
   * Runs a REQL `update` against this lock's row.
   *
   * @method update
   * @param {Function} updater function to use as a REQL update
   * @return {Promise} the write result
   */
  update (updater) {
    return this.get().update(updater).run()
  }

  /**
   * Runs a REQL `replace` against this lock's row.
   *
   * @method replace
   * @param {Function} replacer function that implements the REQL replace
   * @return {Promise} the write result
   */
  replace (replacer) {
    return this.get().replace(replacer).run()
  }

  /**
   * Makes a single attempt to take the lock. The row is written only when it
   * does not exist yet or its `expires_at` is already in the past.
   *
   * The promise resolves whether or not the lock was obtained; inspect
   * `locked` on the instance. It rejects when the query itself fails.
   *
   * @method tryLock
   * @return {Promise<Array>} `[this, result]`
   */
  tryLock () {
    // Resolving with the query promise adopts its outcome, so a failed query
    // rejects the returned promise instead of leaving it pending forever.
    return new Promise(resolve => {
      const query = this.replace(lock => r.branch(
        lock.eq(null).or(lock("expires_at").lt(Date.now())),
        this.attributes(),
        lock
      ))

      resolve(query.then(result => {
        if (result.inserted + result.replaced === 1) {
          this.locked = true
        }
        return [this, result]
      }))
    })
  }

  /**
   * Releases the lock by deleting its row, provided the row still carries
   * this instance's token.
   *
   * Rejects with `Lock_Not_Owned_Error` when this instance does not hold the
   * lock, with `Lock_Lost_Error` when the row was not deleted, and with the
   * underlying error when the query fails.
   *
   * @method release
   * @return {Promise<Array>} `[this, result]`
   */
  release () {
    return new Promise(resolve => {
      // Throwing inside the executor rejects the returned promise.
      this.throwUnlessLocked()

      const query = this.replace(lock => r.branch(
        lock('instance_token').eq(this.instance_token),
        null,
        lock
      ))

      resolve(query.then(result => {
        if (result.deleted !== 1) this.throwLockLost()
        this.locked = false
        return [this, result]
      }))
    })
  }

  /**
   * Polls `tryLock` every 100ms until the lock is obtained.
   *
   * Rejects as soon as an attempt fails, rather than silently stopping the
   * poll loop and never settling.
   *
   * @method acquire
   * @return {Promise<Lock>} resolves with this instance once it holds the lock
   */
  acquire () {
    return new Promise((resolve, reject) => {
      const poll = () => {
        this.tryLock()
          .then(([lock]) => {
            if (lock.locked) return resolve(lock)
            setTimeout(poll, 100)
          })
          .catch(reject)
      }
      poll()
    })
  }

  /**
   * Obtains the lock, runs `fn`, then releases the lock.
   *
   * `fn` receives a `done` callback; the work counts as finished when `done`
   * is called or when the thenable returned by `fn` settles. The lock is
   * released on failure too, after which the original error is re-thrown.
   *
   * @method synchronize
   * @param {Function} fn the function to run after a lock is obtained
   * @return {Promise<Array>} `[this]`
   */
  synchronize (fn) {
    return this
      .acquire()
      .bind(this)
      .then(() => {
        const work = new Promise((resolve, reject) => {
          const result = fn(resolve)
          if (result && typeof result.then === 'function') {
            result.then(resolve, reject)
          }
        })

        // A failure while releasing must not mask the error raised by `fn`.
        const rethrow = err => () => { throw err }
        return work.then(
          () => this.release(),
          err => this.release().then(rethrow(err), rethrow(err))
        )
      })
      .return([this])
  }

  /**
   * Pushes `expires_at` forward on a lock this instance still owns.
   *
   * Rejects with `Lock_Not_Owned_Error` when this instance does not hold the
   * lock, with `Lock_Lost_Error` (and `locked` reset to false) when the row
   * was not updated, and with the underlying error when the query fails.
   *
   * @method touch
   * @return {Promise<Array>} `[this, result]`
   */
  touch () {
    return new Promise(resolve => {
      this.throwUnlessLocked()

      const query = this.update(lock => r.branch(
        lock("instance_token").default(null).eq(this.instance_token),
        { expires_at: Lock.nextExpiry },
        null
      ))

      resolve(query.then(result => {
        this.locked = result.replaced === 1
        if (!this.locked) this.throwLockLost()
        return [this, result]
      }))
    })
  }

  /**
   * logical validators
   */

  /**
   * @method throwUnlessLocked
   * @throws {Errors.Lock_Not_Owned_Error} when `locked` is false
   */
  throwUnlessLocked () {
    if (!this.locked) {
      throw new Errors.Lock_Not_Owned_Error()
    }
  }

  /**
   * @method throwLockLost
   * @throws {Errors.Lock_Lost_Error} always
   */
  throwLockLost () {
    throw new Errors.Lock_Lost_Error()
  }
}
// Thinky model for the "Locks" table. The Lock constructor waits on
// `Lock.table.ready()` before generating an instance token; the lock
// operations themselves go through `r.table("Locks")` directly.
// NOTE(review): Lock#attributes() writes `name` (the primary key declared
// below), not `key`, and stores `expires_at` as `Date.now() + ...` (a number)
// — confirm the declared fields match what is actually persisted.
// NOTE(review): tokens come from `r.uuid()`; verify `uuid(5)` is the intended
// version constraint.
Lock.table = thinky.createModel(
  'Locks',
  {
    key: type.string(),
    instance_token: type.string().uuid(5),
    expires_at: type.date().default(Date.now)
  },
  { pk: 'name' }
)