lib/pubsub.js
/**
* Copyright 2015 Oursky Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const _ = require('lodash');
const _ws = require('websocket');
let WebSocket = null;
if (_ws) {
WebSocket = _ws.w3cwebsocket;
} else {
WebSocket = window.WebSocket; //eslint-disable-line
}
const url = require('url');
const ee = require('event-emitter');
import {EventHandle} from './util';
const ON_OPEN = 'onOpen';
const ON_CLOSE = 'onClose';
export default class Pubsub {
constructor(container, internal = false) {
this._container = container;
this._ws = null;
this._internal = internal;
this._queue = [];
this.ee = ee({});
this._handlers = {};
this._reconnectWait = 5000;
this._retryCount = 0;
}
onOpen(listener) {
this.ee.on(ON_OPEN, listener);
return new EventHandle(this.ee, ON_OPEN, listener);
}
onClose(listener) {
this.ee.on(ON_CLOSE, listener);
return new EventHandle(this.ee, ON_CLOSE, listener);
}
_pubsubUrl(internal = false) {
let parsedUrl = url.parse(this._container.endPoint);
let protocol = parsedUrl.protocol === 'https:' ? 'wss:' : 'ws:';
let path = internal ? '/_/pubsub' : '/pubsub';
var queryString = '?api_key=' + this._container.apiKey;
return protocol + '//' + parsedUrl.host + path + queryString;
}
_hasCredentials() {
return !!this._container.apiKey;
}
reconfigure() {
if (!this._hasCredentials()) {
this.close();
return;
}
this.connect();
}
_onopen() {
let self = this;
// Trigger registed onOpen callback
this.ee.emit(ON_OPEN, true);
// Resubscribe previously subscribed channels
_.forEach(this._handlers, function (handlers, channel) {
self._sendSubscription(channel);
});
// Flushed queued messages to the server
_.forEach(this._queue, function (data) {
self._ws.send(JSON.stringify(data));
});
this._queue = [];
}
_onmessage(data) {
_.forEach(this._handlers[data.channel], function (handler) {
handler(data.data);
});
}
on(channel, callback) {
return this.subscribe(channel, callback);
}
publish(channel, data) {
if (!channel) {
throw new Error('Missing channel to publish');
}
if (!data) {
throw new Error('Missing data to publish');
}
let publishData = {
action: 'pub',
channel,
data
};
if (this.connected) {
this._ws.send(JSON.stringify(publishData));
} else {
this._queue.push(publishData);
}
}
_sendSubscription(channel) {
if (this.connected) {
let data = {
action: 'sub',
channel: channel
};
this._ws.send(JSON.stringify(data));
}
}
_sendRemoveSubscription(channel) {
if (this.connected) {
let data = {
action: 'unsub',
channel: channel
};
this._ws.send(JSON.stringify(data));
}
}
off(channel, callback = null) {
this.unsubscribe(channel, callback);
}
subscribe(channel, handler) {
if (!channel) {
throw new Error('Missing channel to subscribe');
}
let alreadyExists = this.hasHandlers(channel);
this._register(channel, handler);
if (!alreadyExists) {
this._sendSubscription(channel);
}
return handler;
}
unsubscribe(channel, handler = null) {
if (!channel) {
throw new Error('Missing channel to unsubscribe');
}
if (!this.hasHandlers(channel)) {
return;
}
var handlersToRemove;
if (handler) {
handlersToRemove = [handler];
} else {
handlersToRemove = this._handlers[channel];
}
let self = this;
_.forEach(handlersToRemove, function (handlerToRemove) {
self._unregister(channel, handlerToRemove);
});
if (!this.hasHandlers(channel)) {
this._sendRemoveSubscription(channel);
}
}
hasHandlers(channel) {
let handlers = this._handlers[channel];
return handlers ? handlers.length > 0 : false;
}
_register(channel, handler) {
if (!this._handlers[channel]) {
this._handlers[channel] = [];
}
this._handlers[channel].push(handler);
}
_unregister(channel, handler) {
let handlers = this._handlers[channel];
handlers = _.reject(handlers, function (item) {
return item === handler;
});
if (handlers.length > 0) {
this._handlers[channel] = handlers;
} else {
delete this._handlers[channel];
}
}
_reconnect() {
let self = this;
let interval = _.min([this._reconnectWait * this._retryCount, 60000]);
_.delay(function () {
self._retryCount += 1;
self.connect();
}, interval);
}
get connected() {
return this._ws && this._ws.readyState === 1;
}
reset() {
this.close();
this._handlers = {};
}
close() {
if (this._ws) {
this._ws.close();
this._ws = null;
}
}
get WebSocket() {
return WebSocket;
}
_setWebSocket(ws) {
const emitter = this.ee;
this._ws = ws;
if (!this._ws) {
return;
}
let self = this;
this._ws.onopen = function () {
self._retryCount = 0;
self._onopen();
};
this._ws.onclose = function () {
emitter.emit(ON_CLOSE, false);
self._reconnect();
};
this._ws.onmessage = function (evt) {
var message;
try {
message = JSON.parse(evt.data);
} catch (e) {
console.log('Got malformed websocket data:', evt.data);
return;
}
self._onmessage(message);
};
}
connect() {
if (!this._hasCredentials() || this.connected) {
return;
}
let pubsubUrl = this._pubsubUrl(this._internal);
let ws = new this.WebSocket(pubsubUrl);
this._setWebSocket(ws);
}
}