Home Reference Source Repository

es6/Services/Registry.js

import os         from 'os'
import path       from 'path'
import Promise    from 'bluebird'

import thinky     from '../thinky'
import Repo       from './Repo'
import * as Errors from '../Errors'


import Ensure     from '../utils/Ensure'
import Table      from '../utils/Table'
import fs         from '../utils/fs'
import Events     from '../utils/Events'
import * as Time  from '../utils/Time' 

import Job        from '../Queue/Job'

// Handles exposed by the shared thinky instance: `type` is used for the schema
// definitions in this file; `r` is pulled out alongside it.
const type = thinky.type
const r    = thinky.r

/**
 * Pattern used to validate the `repo` field of a Repos document.
 *
 * Accepts a `git`, `ssh`, `http` or `https` scheme, or a `git@host` prefix, followed
 * by `:` (optionally `://`), a path, a mandatory `.git` suffix and an optional
 * trailing slash. The pattern is not anchored, so it matches anywhere in a string.
 */
const GIT_REGEX = /((git|ssh|http(s)?)|(git@[\w\.]+))(:(\/\/)?)([\w\.@\:\/\-~]+)(\.git)(\/)?/

/**
 * API that handles the lifecycle for updating and maintaining local copies of a Repository
 *
 * A Registry keeps an in-memory map of Repo instances keyed by repo name (`this.repos`)
 * and keeps that map in sync with the backing table through a changefeed.
 */
export default class Registry extends Table {

  /**
   * Max attempts to retry reconnecting with RethinkDB cluster
   *
   * The attempt counter is reset every time a changefeed is opened successfully, so
   * this is a limit on consecutive failures.
   *
   * @return {number} the maximum number of consecutive attempts
   */
  static get MAX_ATTEMPTS () {
    return 60
  }

  /**
   * generates a path relative to the appDir in $HOME/.wyst
   *
   * @param      {...string}  segments  The segments appended after $HOME/.wyst
   * @return     {string}  the joined path
   */
  static appDir (...segments) {
    return path.join(os.homedir(), ".wyst", ...segments)
  }

  /**
   * removes the app dir in $HOME/.wyst
   *
   * @return the result of `fs.remove$` (presumably a Promise - confirm against utils/fs)
   */
  static rmAppDir () {
    return fs.remove$( Registry.appDir() )
  }

  /**
   * ensures the app dir in $HOME/.wyst exists
   *
   * @return {Promise} rejects with an Error naming the directory (and carrying the
   *                   original stack in its message) when the directory cannot be created
   */
  static ensureAppDir () {
    return fs.ensureDir$(Registry.appDir()).catch( err => {
      throw new Error(`could not create Registry directory in : ${Registry.appDir()}\n${err.stack}`)
    })
  }

  /**
   * generates a Registry instance
   *
   * NOTE: the constructor returns a promise chain instead of the instance itself, so
   * `new Registry()` must be treated as a Promise that resolves with the Registry once
   * the table is ready, the app dir exists, every stored repo has been registered and
   * the first changefeed subscription attempt has finished.
   *
   * @return {Promise<Registry>}
   */
  constructor () {
    super()

    this.repos      = {}
    this.attempts   = 0
    // set by kill(); stops any further (re)subscription
    this.killed     = false
    // handle of the pending reconnect timer scheduled by subscribe(), if any
    this.retryTimer = null

    // the chain relies on bluebird helpers (.bind / .map / .return), so
    // this.table.ready() is assumed to return a bluebird promise
    return this.table.ready()
      .then(Registry.ensureAppDir)
      .bind(this)
      .then(this.members).map( repo => this.register(repo) )
      .then(this.subscribe)
      .return(this)
  }

  /**
   * registers a raw repo object: updates the Repo already stored under the same name,
   * or creates a new Repo and stores it
   *
   * @param      {Object}  repo    The raw repo object (must carry a `name`)
   * @return     {Promise}  the result of `Repo#update` for a known repo, otherwise a
   *                        promise for the newly created Repo
   */
  register (repo) {
    const known = this.repos[ repo.name ]

    if ( known ) {
      this.debug(`: updating repo : `, repo)
      return known.update(repo)
    }

    this.debug(`: new repo : `, repo)
    return Repo.create(repo).then( created => {
      this.repos[ created.name ] = created
      // Bubble up cache invalidation for Worker Threads
      created.on( Repo.Event.invalidate, script => this.emit(Repo.Event.invalidate, script) )
      return created
    })
  }

  /**
   * returns a registered repo object from the Registry
   *
   * @param      {Object}  repo    The raw repo object
   * @return     {Repo}  the Repo instance, or undefined when no repo with that name is registered
   */
  repo (repo) {
    return this.repos[ repo.name ]
  }

  /**
   * returns the names of all registered repos
   *
   * @return {Array} the names
   */
  get repoNames () {
    return Object.keys(this.repos)
  }

  /**
   * gets all of the available Repos that have been registered on the Registry
   *
   * @return {Array} the registered repos whose `available` property is truthy
   */
  get available () {
    return this.repoNames.filter( name => this.repos[name].available ).map( name => this.repos[name] )
  }

  /**
   * returns the IDs of all the available repos
   *
   * @return {Array} the ids
   */
  get availableIds () {
    return this.available.map( repo => repo.id )
  }

  /**
   * subscribes to the Registry table and attempts to reconnect on some sort of Network failure
   *
   * Does nothing once the Registry has been killed.
   *
   * @return {Promise} the promise that will be resolved when a subscription has been established
   *                   (or when a retry has been scheduled after a failed attempt)
   */
  subscribe () {
    if ( this.killed ) {
      return Promise.resolve()
    }

    this.retryTimer = null
    this.attempts++

    this.debug(`opening changefeed : { Attempt : ${this.attempts} }`)

    return this.table.changes().then( cursor => {
      if ( this.killed ) {
        // kill() ran while the changefeed was still being opened : do not keep it around
        cursor.close()
        return
      }

      // Successfully connected to RethinkDB node, so we should reset our attempt counts
      this.attempts = 0

      this.feed = cursor

      cursor.each( (err, change) => this.handleChange(err, change) )

      // Attempt to reconnect from some sort of network irregularity
      cursor.once("error", this.subscribe.bind(this))

    }).catch( err => {
      if ( this.killed ) {
        return
      }

      if ( this.attempts >= Registry.MAX_ATTEMPTS ) {
        Ensure.throws(err)
      } else {
        this.debug(`error received : ${err.message} ... attempting recovery`)
        // Give the RethinkDB node some time to figure its life out
        // (the handle is kept so kill() can cancel the pending retry)
        this.retryTimer = setTimeout( this.subscribe.bind(this),  Time.seconds(10) )
      }
    })
  }

  /**
   * handles a single changefeed notification
   *
   * Saved documents are (re)registered and announced through `emitUpdate`. Any other
   * document is treated as a deletion: the matching Repo is removed with `rm()`,
   * dropped from the Registry and announced through `emitDestroy`. Failures are
   * reported through `emitError`.
   *
   * @private
   * @param      {Error}   err     The changefeed error, if any
   * @param      {Object}  change  The changed document (exposes `isSaved()` and `name`)
   * @return     {Promise|undefined}  settles once the change has been processed
   */
  handleChange (err, change) {
    if (err) {
      return this.emitError(err)
    }

    if ( change.isSaved() ) {
      return this.register(change)
        .then( repo => this.emitUpdate(repo) )
        .catch( error => this.emitError(error) )
    }

    const repo = this.repo(change)

    // A delete can arrive for a repo that never made it into the Registry
    // (e.g. its registration failed) : there is nothing to remove locally
    if ( !repo ) {
      this.debug(`: ignoring delete for unregistered repo : `, change)
      return
    }

    return repo.rm()
      .then( removed => {
        // Forget the repo so it is no longer reported as registered / available.
        // The identity check protects a Repo re-registered under the same name
        // while the removal was still in flight.
        if ( this.repos[ change.name ] === repo ) {
          delete this.repos[ change.name ]
        }

        return this.emitDestroy(removed)
      })
      .catch( error => this.emitError(error) )
  }

  /**
   * gets all Repos from the table
   *
   * @return {Promise}
   */
  members () {
    return this.table.filter({}).run()
  }

  /**
   * saves a repo to the Table
   *
   * @param      {Object}  repo    The repo
   * @returns    {Promise}  resolves with this Registry
   */
  add (repo) {
    return this.table.save(repo).return(this)
  }


  /**
   * kills the Registry instance and removes all listeners to prevent memory leaks
   *
   * Also cancels a pending reconnect attempt and prevents any further subscription,
   * so a killed Registry never reopens a changefeed.
   */
  kill () {
    this.killed = true

    clearTimeout(this.retryTimer)
    this.retryTimer = null

    this.removeAllListeners()

    if ( this.feed ) {
      this.feed.close()
    }
  }
}

/**
 * Event Dictionary
 *
 * Maps a short event key to the namespaced event name used by the Registry.
 */
Registry.Event = {
  update  : "registry:repo:update",
  create  : "registry:repo:create",
  destroy : "registry:repo:destroy",
  log     : "registry:log",
  error   : "registry:error"
}

// NOTE(review): presumably this is what generates the emitUpdate / emitDestroy /
// emitError helpers the class calls - confirm against utils/Events
Events.methodize( Registry, Registry.Event )

// Schema of a document in the 'Repos' table
const repoSchema = {
  name        : type.string(),
  description : type.string(),
  concurrency : type.number().default(10),
  // must match GIT_REGEX
  repo        : type.string().regex(GIT_REGEX),
  branch      : type.string().default('master'),
  created_at  : type.date().default(Date.now),
  updated_at  : type.date().default(Date.now)
}

Registry.createModel('Repos', repoSchema)

// Relations : a repo has many jobs (Job.repoId -> Repo.id) ...
Registry.table.hasMany(Job.table, 'jobs', 'id', 'repoId')

//Registry.table.hasMany(Build, 'builds', 'id', 'repoId')

// ... and every job belongs to exactly one repo
Job.table.belongsTo(Registry.table, 'repo', 'repoId', 'id')