// es6/Services/Registry.js
import os from 'os'
import path from 'path'
import Promise from 'bluebird'
import thinky from '../thinky'
import Repo from './Repo'
import * as Errors from '../Errors'
import Ensure from '../utils/Ensure'
import Table from '../utils/Table'
import fs from '../utils/fs'
import Events from '../utils/Events'
import * as Time from '../utils/Time'
import Job from '../Queue/Job'
// `type` supplies the schema field builders used in the model definition at the bottom of this file.
// NOTE(review): `r` is not referenced anywhere in the visible part of this file — confirm whether it is still needed.
const {type, r} = thinky
// Pattern the `repo` field of a Repos document must match (see the model schema below).
// It accepts git/ssh/http(s) schemes or a `git@host` prefix and requires a ".git" suffix.
// The pattern is not anchored with ^/$, so it matches anywhere inside the string.
const GIT_REGEX = /((git|ssh|http(s)?)|(git@[\w\.]+))(:(\/\/)?)([\w\.@\:\/\-~]+)(\.git)(\/)?/
/**
 * API that handles the lifecycle for updating and maintaining local copies of a Repository
 *
 * NOTE: `new Registry()` does not evaluate to the instance itself. The constructor returns
 * a Promise that resolves with the instance once the table is ready, the app dir exists,
 * the stored repos are registered and the first changefeed attempt has completed.
 */
export default class Registry extends Table {

  /**
   * Max attempts to retry reconnecting with RethinkDB cluster
   *
   * @property MAX_ATTEMPTS
   * @return {Number} the maximum number of consecutive changefeed attempts
   */
  static get MAX_ATTEMPTS () {
    return 60
  }

  /**
   * generates a path relative to the appDir in $HOME/.wyst
   *
   * @param {...String} segments The path segments appended after $HOME/.wyst
   * @return {String} the joined path
   */
  static appDir (...segments) {
    return path.join(os.homedir(), ".wyst", ...segments)
  }

  /**
   * removes the app dir in $HOME/.wyst
   *
   * @return {Promise} whatever `fs.remove$` returns
   */
  static rmAppDir () {
    return fs.remove$( Registry.appDir() )
  }

  /**
   * ensures the app dir in $HOME/.wyst exists
   *
   * @return {Promise} rejected with a descriptive Error when the directory cannot be created
   */
  static ensureAppDir () {
    return fs.ensureDir$(Registry.appDir()).catch( err => {
      throw new Error(`could not create Registry directory in : ${Registry.appDir()}\n${err.stack}`)
    })
  }

  /**
   * generates a Registry instance
   *
   * @return {Promise} resolves with the Registry instance (see the class note)
   */
  constructor () {
    super()
    // Registered Repo instances keyed by repo name
    this.repos = {}
    // Consecutive changefeed attempts; reset to 0 on a successful connection
    this.attempts = 0
    // Set by kill(); blocks any further (re)subscription
    this.killed = false
    // Handle of the pending reconnect timer, if any, so kill() can cancel it
    this.retryTimer = null
    return this.table.ready()
      .then(Registry.ensureAppDir)
      .bind(this)
      .then(this.members).map( repo => this.register(repo) )
      .then(this.subscribe)
      .return(this)
  }

  /**
   * registers a raw repo document: updates the Repo instance already known under
   * that name, or creates and stores a new one
   *
   * @param {Object} repo The raw repo object
   * @return {Promise} the result of `update` for a known repo, or the newly created Repo
   */
  register (repo) {
    const known = this.repos[ repo.name ]
    if ( known ) {
      this.debug(`: updating repo : `, repo)
      return known.update(repo)
    }
    this.debug(`: new repo : `, repo)
    return Repo.create(repo).then( created => {
      this.repos[ created.name ] = created
      // Bubble up cache invalidation for Worker Threads
      created.on( Repo.Event.invalidate, script => this.emit(Repo.Event.invalidate, script) )
      return created
    })
  }

  /**
   * returns a registered repo object from the Registry
   *
   * @param {Object} repo The raw repo object
   * @return {Repo} the Repo instance, or undefined when no repo is registered under that name
   */
  repo (repo) {
    return this.repos[ repo.name ]
  }

  /**
   * returns the names of all registered repos
   *
   * @return {Array} the names
   */
  get repoNames () {
    return Object.keys(this.repos)
  }

  /**
   * gets all of the available Repos that have been registered on the Registry
   *
   * @return {Array} the available repos
   */
  get available () {
    return this.repoNames
      .map( name => this.repos[name] )
      .filter( repo => repo.available )
  }

  /**
   * returns the IDs of all the available repos
   *
   * @return {Array} the ids
   */
  get availableIds () {
    return this.available.map( repo => repo.id )
  }

  /**
   * subscribes to the Registry table and attempts to reconnect on some sort of Network failure
   *
   * Does nothing once the Registry has been killed.
   *
   * @return {Promise} the promise that will be resolved when a subscription has been established
   */
  subscribe () {
    // A killed Registry must never re-open a changefeed (e.g. from a late retry or cursor error)
    if ( this.killed ) {
      return Promise.resolve()
    }
    this.attempts++
    this.debug(`opening changefeed : { Attempt : ${this.attempts} }`)
    return this.table.changes().then( cursor => {
      // kill() was called while the changefeed was still opening : do not keep it around
      if ( this.killed ) {
        cursor.close()
        return
      }
      // Successfully connected to RethinkDB node, so we should reset our attempt counts
      this.attempts = 0
      this.feed = cursor
      cursor.each( (err, change) => {
        if (err) {
          return this.emitError(err)
        }
        if ( change.isSaved() ) {
          return this.register(change)
            .bind(this)
            .tap( repo => this.emitUpdate(repo) )
            .catch(this.emitError)
        }
        // The document was deleted : drop the local copy, if we ever had one
        const removed = this.repo(change)
        if ( !removed ) {
          // Nothing registered under that name; calling rm() on undefined would throw
          // inside the cursor callback, outside of any promise chain
          this.debug(`: ignoring removal of unregistered repo : `, change)
          return
        }
        // Forget the repo right away so it is no longer listed and a re-add with the
        // same name creates a fresh Repo instead of updating the removed one
        delete this.repos[ change.name ]
        return removed.rm()
          .bind(this)
          .tap(this.emitDestroy)
          .catch(this.emitError)
      })
      // Attempt to reconnect from some sort of network irregularity
      cursor.once("error", this.subscribe.bind(this))
    }).catch( err => {
      if ( this.attempts >= Registry.MAX_ATTEMPTS ) {
        Ensure.throws(err)
      } else if ( !this.killed ) {
        this.debug(`error received : ${err.message} ... attempting recovery`)
        // Give the RethinkDB node some time to figure its life out
        this.retryTimer = setTimeout( this.subscribe.bind(this), Time.seconds(10) )
      }
    })
  }

  /**
   * gets all Repos from the table
   *
   * @return {Promise}
   */
  members () {
    return this.table.filter({}).run()
  }

  /**
   * saves a repo to the Table
   *
   * @param {Object} repo The repo
   * @returns {Promise} resolves with this Registry
   */
  add (repo) {
    return this.table.save(repo).return(this)
  }

  /**
   * kills the Registry instance and removes all listeners to prevent memory leaks
   *
   * Also cancels a pending reconnect attempt and prevents any further subscription.
   */
  kill () {
    this.killed = true
    // A scheduled retry would otherwise re-open a changefeed on a dead Registry
    clearTimeout(this.retryTimer)
    this.retryTimer = null
    this.removeAllListeners()
    this.feed && this.feed.close()
  }
}
/**
* Event Dictionary
*
* Maps short event keys to the event-name strings used for Registry events.
*/
Registry.Event = {
update : "registry:repo:update"
, create : "registry:repo:create"
, destroy : "registry:repo:destroy"
, log : "registry:log"
, error : "registry:error"
}
// NOTE(review): presumably generates the per-event helpers the class relies on
// (emitUpdate, emitDestroy, emitError) — confirm in utils/Events
Events.methodize( Registry, Registry.Event )
// Schema of the 'Repos' model: `repo` must match GIT_REGEX; `concurrency`, `branch`,
// `created_at` and `updated_at` carry defaults.
Registry.createModel('Repos', {
name : type.string()
, description : type.string()
, concurrency : type.number().default(10)
, repo : type.string().regex(GIT_REGEX)
, branch : type.string().default('master')
, created_at : type.date().default(Date.now)
, updated_at : type.date().default(Date.now)
})
// Relations between the Registry and Job tables, joined on Registry `id` / Job `repoId`:
// the Registry side exposes the relation as `jobs`, the Job side as `repo`.
Registry.table.hasMany(Job.table, 'jobs', 'id', 'repoId')
//Registry.table.hasMany(Build, 'builds', 'id', 'repoId')
Job.table.belongsTo(Registry.table, 'repo', 'repoId', 'id')