src/WebChannel.js
import ServiceFactory, {WEB_RTC, WEB_SOCKET, MESSAGE_BUILDER} from 'ServiceFactory'
import Channel from 'Channel'
import SignalingGate from 'SignalingGate'
import Util from 'Util'
/**
 * Maximum identifier number for {@link WebChannel#generateId} function
 * (4294967295 is 2^32 - 1).
 * @type {number}
 */
const MAX_ID = 4294967295
/**
 * Timeout for pinging the `WebChannel`, in milliseconds. When it elapses,
 * {@link WebChannel#ping} resolves with this value.
 * @type {number}
 */
const PING_TIMEOUT = 5000
/**
 * Delay in milliseconds after which an identifier produced by
 * {@link WebChannel#generateId} is removed from the set of recently
 * generated identifiers.
 * @type {number}
 */
const ID_TIMEOUT = 10000
/**
 * One of the internal message types. It's a peer (user) message: its content is
 * handed to the `WebChannel#onMessage` handler.
 * @ignore
 * @type {number}
 */
const USER_DATA = 1
/**
 * One of the internal message types. This message should be treated by a
 * specific service class (looked up through its `serviceId`).
 * @type {number}
 */
const INNER_DATA = 2
/**
 * One of the internal message types. Initialization message: it is sent over a
 * freshly added channel and carries the topology service id (`manager`) and
 * the `WebChannel` id (`wcId`).
 * @type {number}
 */
const INITIALIZATION = 3
/**
 * One of the internal message types. Ping message.
 * @type {number}
 */
const PING = 4
/**
 * One of the internal message types. Pong message, response to the ping message.
 * @type {number}
 */
const PONG = 5
/**
 * This class is an API starting point. It represents a group of collaborators
 * also called peers. Each peer can send/receive broadcast as well as personal
 * messages. Every peer in the `WebChannel` can invite another person to join
 * the `WebChannel` and also possesses enough information to be able to add it
 * while preserving the current `WebChannel` structure (network topology).
 */
class WebChannel {
  /**
   * @param {WebChannelSettings} settings Web channel settings
   */
  constructor (settings) {
    /**
     * @private
     * @type {WebChannelSettings}
     */
    this.settings = settings

    /**
     * Channels through which this peer is connected with other peers. This
     * attribute depends on the `WebChannel` topology. E. g. in fully connected
     * `WebChannel` you are connected to each other peer in the group, however
     * in the star structure this attribute contains only the connection to
     * the central peer.
     * @private
     * @type {external:Set}
     */
    this.channels = new Set()

    /**
     * This event handler is used to resolve *Promise* in {@link WebChannel#join}.
     * @private
     */
    this.onJoin = () => {}

    /**
     * `WebChannel` topology.
     * @private
     * @type {Service}
     */
    this.manager = ServiceFactory.get(this.settings.topology)

    /**
     * Message builder service instance.
     *
     * @private
     * @type {MessageBuilderService}
     */
    this.msgBld = ServiceFactory.get(MESSAGE_BUILDER)

    /**
     * An array of all peer ids except this.
     * @type {number[]}
     */
    this.members = []

    /**
     * Identifiers recently returned by {@link WebChannel#generateId}. Each one
     * is removed again after `ID_TIMEOUT` milliseconds.
     * @private
     * @type {Set<number>}
     */
    this.generatedIds = new Set()

    /**
     * Timestamp (`Date.now()`) at which the ping in progress was started, or
     * `0` when no ping is in progress.
     * @private
     * @type {number}
     */
    this.pingTime = 0

    /**
     * Longest delay in milliseconds observed so far for the ping in progress.
     * @private
     * @type {number}
     */
    this.maxTime = 0

    /**
     * Settles the promise returned by {@link WebChannel#ping}.
     * @private
     * @type {function(delay: number)}
     */
    this.pingFinish = () => {}

    /**
     * Number of pong messages received for the ping in progress.
     * @private
     * @type {number}
     */
    this.pongNb = 0

    /**
     * The `WebChannel` gate.
     * @private
     * @type {SignalingGate}
     */
    this.gate = new SignalingGate(this)

    /**
     * Unique `WebChannel` identifier. Its value is the same for all `WebChannel` members.
     * @type {number}
     */
    this.id = this.generateId()

    /**
     * Unique peer identifier of you in this `WebChannel`. After each `join` function call
     * this id will change, because it is up to the `WebChannel` to assign it when
     * you join.
     * @type {number}
     */
    this.myId = this.generateId()

    /**
     * Is the event handler called when a new peer has joined the `WebChannel`.
     * @type {function(id: number)}
     */
    this.onPeerJoin = () => {}

    /**
     * Is the event handler called when a peer has left the `WebChannel`.
     * @type {function(id: number)}
     */
    this.onPeerLeave = () => {}

    /**
     * Is the event handler called when a message is available on the `WebChannel`.
     * @type {function(id: number, msg: UserMessage, isBroadcast: boolean)}
     */
    this.onMessage = () => {}

    /**
     * Is the event handler called when the `WebChannel` has been closed.
     * @type {function(closeEvt: CloseEvent)}
     */
    this.onClose = () => {}
  }

  /**
   * Join the `WebChannel`.
   *
   * When a key is given, a socket is opened to `url`, the key is sent to the
   * server and, depending on the answer, either that socket itself or a
   * channel established through the `WEB_RTC` service is initialized. When a
   * `WebSocket` is given, it is initialized directly.
   *
   * The returned promise is resolved through `this.onJoin`, which is not
   * invoked by this class itself.
   *
   * @param {string|WebSocket} keyOrSocket The key provided by one of the `WebChannel` members or a socket
   * @param {string} [url=this.settings.signalingURL] Server URL
   * @returns {Promise<undefined,string>} It resolves once you became a `WebChannel` member.
   */
  join (keyOrSocket, url = this.settings.signalingURL) {
    return new Promise((resolve, reject) => {
      this.onJoin = resolve

      if (keyOrSocket.constructor.name === 'WebSocket') {
        this.initChannel(keyOrSocket).catch(reject)
        return
      }
      if (!Util.isURL(url)) {
        reject(`${url} is not a valid URL`)
        return
      }

      ServiceFactory.get(WEB_SOCKET).connect(url)
        .then(ws => {
          ws.onclose = closeEvt => reject(closeEvt.reason)
          ws.onmessage = evt => {
            try {
              const msg = JSON.parse(evt.data)
              if (!('isKeyOk' in msg)) {
                reject(`Unknown message from the server ${url}: ${evt.data}`)
              } else if (!msg.isKeyOk) {
                reject(`The key "${keyOrSocket}" was not found`)
              } else if ('useThis' in msg && msg.useThis) {
                // The server asks to keep using this very socket as the channel
                this.initChannel(ws).catch(reject)
              } else {
                ServiceFactory.get(WEB_RTC, this.settings.iceServers).connectOverSignaling(ws, keyOrSocket)
                  .then(channel => {
                    // The socket is no longer needed: detach the rejecting
                    // close handler before closing it on purpose
                    ws.onclose = null
                    ws.close()
                    return this.initChannel(channel)
                  })
                  .catch(reject)
              }
            } catch (err) { reject(err.message) }
          }
          ws.send(JSON.stringify({join: keyOrSocket}))
        })
        .catch(reject)
    })
  }

  /**
   * Invite a peer to join the `WebChannel`.
   *
   * @param {string|WebSocket} keyOrSocket Either the URL to connect to (the
   * `WebChannel` id is then sent over the new socket) or an existing socket
   *
   * @returns {Promise<undefined,string>} Rejected with a message when the
   * argument is neither a valid URL nor a `WebSocket` (including `null` and
   * `undefined`)
   */
  invite (keyOrSocket) {
    if (typeof keyOrSocket === 'string' || keyOrSocket instanceof String) {
      if (!Util.isURL(keyOrSocket)) {
        return Promise.reject(`${keyOrSocket} is not a valid URL`)
      }
      return ServiceFactory.get(WEB_SOCKET).connect(keyOrSocket)
        .then(ws => {
          ws.send(JSON.stringify({wcId: this.id}))
          return this.addChannel(ws)
        })
    }

    // Guard against null/undefined (and constructor-less objects) so that an
    // invalid argument yields a rejected promise instead of a synchronous throw
    const isSocket = keyOrSocket != null &&
      keyOrSocket.constructor != null &&
      keyOrSocket.constructor.name === 'WebSocket'
    if (isSocket) {
      return this.addChannel(keyOrSocket)
    }
    return Promise.reject(`${keyOrSocket} is not a valid URL`)
  }

  /**
   * Enable other peers to join the `WebChannel` with your help as an
   * intermediary peer.
   * @param {Object} [options] Any available connection service options.
   * `url` defaults to `this.settings.signalingURL` and `key` to `null`
   * @returns {Promise} It is resolved once the `WebChannel` is open. The
   * callback function take a parameter of type {@link SignalingGate~AccessData}.
   * Rejected with a message when the URL is not valid.
   */
  open (options) {
    const defaultSettings = {
      url: this.settings.signalingURL,
      key: null
    }
    const settings = Object.assign({}, defaultSettings, options)
    if (!Util.isURL(settings.url)) {
      return Promise.reject(`${settings.url} is not a valid URL`)
    }
    return this.gate.open(settings.url, dataCh => this.addChannel(dataCh), settings.key)
  }

  /**
   * Prevent clients from joining the `WebChannel` even if they possess a key.
   */
  close () {
    this.gate.close()
  }

  /**
   * If the `WebChannel` is open, the clients can join it through you, otherwise
   * it is not possible.
   * @returns {boolean} True if the `WebChannel` is open, false otherwise
   */
  isOpen () {
    return this.gate.isOpen()
  }

  /**
   * Get the data allowing to join the `WebChannel`. It is the same data which
   * {@link WebChannel#open} callback function provides.
   * @returns {OpenData|null} - Data to join the `WebChannel` or null if the `WebChannel` is closed
   */
  getOpenData () {
    return this.gate.getOpenData()
  }

  /**
   * Leave the `WebChannel`. No longer can receive and send messages to the group.
   * Does nothing when there is no channel.
   */
  leave () {
    if (this.channels.size !== 0) {
      this.members = []
      this.pingTime = 0
      // this.gate.close()
      this.manager.leave(this)
    }
  }

  /**
   * Send the message to all `WebChannel` members. Does nothing when there is
   * no channel.
   * @param {UserMessage} data - Message
   */
  send (data) {
    if (this.channels.size !== 0) {
      this.msgBld.handleUserMessage(data, this.myId, null, dataChunk => {
        this.manager.broadcast(this, dataChunk)
      })
    }
  }

  /**
   * Send the message to a particular peer in the `WebChannel`. Does nothing
   * when there is no channel.
   * @param {number} id - Id of the recipient peer
   * @param {UserMessage} data - Message
   */
  sendTo (id, data) {
    if (this.channels.size !== 0) {
      this.msgBld.handleUserMessage(data, this.myId, id, dataChunk => {
        this.manager.sendTo(id, this, dataChunk)
      }, false)
    }
  }

  /**
   * Get the ping of the `WebChannel`. It is an amount in milliseconds which
   * corresponds to the longest ping to each `WebChannel` member.
   *
   * Resolves immediately with `0` when there are no members or when another
   * ping is already in progress. Resolves with `PING_TIMEOUT` when not every
   * member has answered within `PING_TIMEOUT` milliseconds.
   * @returns {Promise<number>}
   */
  ping () {
    if (this.members.length === 0 || this.pingTime !== 0) {
      return Promise.resolve(0)
    }
    return new Promise(resolve => {
      this.pingTime = Date.now()
      this.maxTime = 0
      this.pongNb = 0

      let timer = null
      const finish = delay => {
        // All pongs arrived: the timeout is no longer needed
        clearTimeout(timer)
        resolve(delay)
      }
      this.pingFinish = finish
      timer = setTimeout(() => {
        // Release the "ping in progress" state, otherwise every later call
        // would short-circuit to 0. Skip it when a newer ping has replaced
        // this one in the meantime (e.g. after `leave`).
        if (this.pingFinish === finish) this.pingTime = 0
        resolve(PING_TIMEOUT)
      }, PING_TIMEOUT)

      this.manager.broadcast(this, this.msgBld.msg(PING, this.myId))
    })
  }

  /**
   * Initialize the given connection, send the `INITIALIZATION` message over it
   * (topology service id and `WebChannel` id) and hand the channel to the
   * topology service.
   * @private
   * @param {WebSocket|RTCDataChannel} channel
   *
   * @returns {Promise<undefined,string>}
   */
  addChannel (channel) {
    return this.initChannel(channel)
      .then(initialized => {
        const msg = this.msgBld.msg(INITIALIZATION, this.myId, initialized.peerId, {
          manager: this.manager.id,
          wcId: this.id
        })
        initialized.send(msg)
        return this.manager.add(initialized)
      })
  }

  /**
   * Register a new member and notify the `onPeerJoin` handler.
   * @private
   * @param {number} peerId
   */
  onPeerJoin$ (peerId) {
    this.members.push(peerId)
    this.onPeerJoin(peerId)
  }

  /**
   * Unregister a member (if it is known) and notify the `onPeerLeave` handler.
   * @private
   * @param {number} peerId
   */
  onPeerLeave$ (peerId) {
    const index = this.members.indexOf(peerId)
    // An unknown id gives -1, and `splice(-1, 1)` would drop the last member
    if (index !== -1) this.members.splice(index, 1)
    this.onPeerLeave(peerId)
  }

  /**
   * Send a message to a service of the same peer, joining peer or any peer in
   * the `WebChannel`.
   * @private
   * @param {number|Channel} recepient - Identifier of the recipient peer, or
   * the channel to send the message through directly
   * @param {string} serviceId - Service id
   * @param {Object} data - Message to send (an already built message when
   * `forward` is true)
   * @param {boolean} [forward=false] - Should the message be forwarded as is?
   */
  sendInnerTo (recepient, serviceId, data, forward = false) {
    if (forward) {
      this.manager.sendInnerTo(recepient, this, data)
      return
    }
    if (Number.isInteger(recepient)) {
      const msg = this.msgBld.msg(INNER_DATA, this.myId, recepient, {serviceId, data})
      this.manager.sendInnerTo(recepient, this, msg)
    } else {
      recepient.send(this.msgBld.msg(INNER_DATA, this.myId, recepient.peerId, {serviceId, data}))
    }
  }

  /**
   * Send an inner message without a particular recipient through the topology
   * service.
   * @private
   * @param {number} serviceId
   * @param {Object} data
   */
  sendInner (serviceId, data) {
    this.manager.sendInner(this, this.msgBld.msg(INNER_DATA, this.myId, null, {serviceId, data}))
  }

  /**
   * Message event handler (`WebChannel` mediator). All messages arrive here first.
   * @private
   * @param {Channel} channel - The channel the message came from
   * @param {external:ArrayBuffer} data - Message
   * @throws {Error} When the message type code is unknown
   */
  onChannelMessage (channel, data) {
    const header = this.msgBld.readHeader(data)
    if (header.code === USER_DATA) {
      this.msgBld.readUserMessage(this, header.senderId, data, (fullData, isBroadcast) => {
        this.onMessage(header.senderId, fullData, isBroadcast)
      })
      return
    }

    const msg = this.msgBld.readInternalMessage(data)
    switch (header.code) {
      case INITIALIZATION: {
        // Adopt the topology and the identifiers assigned by the sender
        this.settings.topology = msg.manager
        this.manager = ServiceFactory.get(this.settings.topology)
        this.myId = header.recepientId
        this.id = msg.wcId
        channel.peerId = header.senderId
        break
      }
      case INNER_DATA: {
        if (header.recepientId === 0 || this.myId === header.recepientId) {
          this.getService(msg.serviceId).onMessage(
            channel,
            header.senderId,
            header.recepientId,
            msg.data
          )
        } else {
          // Not for this peer: pass the raw message on
          this.sendInnerTo(header.recepientId, null, data, true)
        }
        break
      }
      case PING:
        this.manager.sendTo(header.senderId, this, this.msgBld.msg(PONG, this.myId))
        break
      case PONG: {
        // A pong arriving while no ping is in progress (late answer after a
        // timeout or after `leave`) must not alter the counters
        if (this.pingTime === 0) break
        const now = Date.now()
        this.pongNb++
        this.maxTime = Math.max(this.maxTime, now - this.pingTime)
        if (this.pongNb >= this.members.length) {
          this.pingFinish(this.maxTime)
          this.pingTime = 0
        }
        break
      }
      default:
        throw new Error(`Unknown message type code: "${header.code}"`)
    }
  }

  /**
   * Initialize channel. The *Channel* object is a facade for *WebSocket* and
   * *RTCDataChannel*.
   * @private
   * @param {external:WebSocket|external:RTCDataChannel} ch - Channel to
   * initialize
   * @param {number} [id] - Assign an id to this channel. It would be generated
   * if not provided
   * @returns {Promise} - Resolved with the initialized `Channel`
   */
  initChannel (ch, id = -1) {
    const peerId = id === -1 ? this.generateId() : id
    const channel = new Channel(ch)
    channel.peerId = peerId
    channel.webChannel = this
    channel.onMessage = data => this.onChannelMessage(channel, data)
    channel.onClose = closeEvt => this.manager.onChannelClose(closeEvt, channel)
    channel.onError = evt => this.manager.onChannelError(evt, channel)
    return Promise.resolve(channel)
  }

  /**
   * Get a service instance by its id. The `WEB_RTC` service is requested with
   * the ICE servers from the settings.
   * @private
   * @param {MESSAGE_BUILDER|WEB_RTC|WEB_SOCKET|FULLY_CONNECTED|CHANNEL_BUILDER} id
   *
   * @returns {Service}
   */
  getService (id) {
    if (id === WEB_RTC) {
      return ServiceFactory.get(WEB_RTC, this.settings.iceServers)
    }
    return ServiceFactory.get(id)
  }

  /**
   * Generate random id for a `WebChannel` or a new peer. The id is an integer
   * in `[1, MAX_ID]` which differs from `myId`, from every member id and from
   * every id generated during the last `ID_TIMEOUT` milliseconds.
   * @private
   * @returns {number} - Generated id
   */
  generateId () {
    while (true) {
      // `1 + floor(...)` can never produce 0, which `onChannelMessage` treats
      // as a wildcard recipient for inner messages
      const id = 1 + Math.floor(Math.random() * MAX_ID)
      const isTaken = id === this.myId ||
        this.members.includes(id) ||
        this.generatedIds.has(id)
      if (!isTaken) {
        this.generatedIds.add(id)
        setTimeout(() => this.generatedIds.delete(id), ID_TIMEOUT)
        return id
      }
    }
  }
}
export default WebChannel
export {USER_DATA}