src/main/generic/network/webrtc/WebRtcConnector.js
class WebRtcConnector extends Observable {
/**
* @constructor
* @param {NetworkConfig} networkConfig
*/
constructor(networkConfig) {
super();
/** @type {NetworkConfig} */
this._networkConfig = networkConfig;
/** @type {HashMap.<PeerId,PeerConnector>} */
this._connectors = new HashMap();
/** @type {Timers} */
this._timers = new Timers();
}
/**
* @param {PeerAddress} peerAddress
* @param {PeerChannel} signalChannel
* @returns {boolean}
*/
connect(peerAddress, signalChannel) {
if (peerAddress.protocol !== Protocol.RTC) throw new Error('Malformed peerAddress');
const peerId = peerAddress.peerId;
if (this._connectors.contains(peerId)) {
return false;
}
// Check connector limit.
if (this._connectors.length >= WebRtcConnector.CONNECTORS_MAX) {
return false;
}
const connector = new OutboundPeerConnector(this._networkConfig, peerAddress, signalChannel);
connector.on('connection', conn => this._onConnection(conn, peerId));
this._connectors.put(peerId, connector);
this._timers.setTimeout(`connect_${peerId}`, () => {
this._connectors.remove(peerId);
this._timers.clearTimeout(`connect_${peerId}`);
connector.close();
this.fire('error', peerAddress, 'timeout');
}, WebRtcConnector.CONNECT_TIMEOUT);
return true;
}
isValidSignal(msg) {
return this._connectors.contains(msg.senderId) && this._connectors.get(msg.senderId).nonce === msg.nonce;
}
/**
* @param {PeerChannel} channel
* @param {SignalMessage} msg
*/
onSignal(channel, msg) {
// Check if we received an unroutable/ttl exceeded response from one of the signaling peers.
if (msg.isUnroutable() || msg.isTtlExceeded()) {
// Clear the timeout early if we initiated the connection.
if (this.isValidSignal(msg) && this._connectors.get(msg.senderId) instanceof OutboundPeerConnector) {
const connector = this._connectors.get(msg.senderId);
const peerAddress = connector.peerAddress;
this._connectors.remove(msg.senderId);
this._timers.clearTimeout(`connect_${msg.senderId}`);
connector.close();
// XXX Reason needs to be adapted when more flags are added.
const reason = msg.isUnroutable() ? 'unroutable' : 'ttl exceeded';
this.fire('error', peerAddress, reason);
}
return;
}
let payload;
try {
payload = JSON.parse(BufferUtils.toAscii(msg.payload));
} catch (e) {
Log.e(WebRtcConnector, `Failed to parse signal payload from ${msg.senderId}`);
return;
}
if (!payload) {
Log.d(WebRtcConnector, `Discarding signal from ${msg.senderId} - empty payload`);
return;
}
if (payload.type === 'offer') {
// Check if we have received an offer on an ongoing connection.
// This can happen if two peers initiate connections to one another
// simultaneously. Resolve this by having the peer with the higher
// peerId discard the offer while the one with the lower peerId
// accepts it.
/** @type {PeerConnector} */
let connector = this._connectors.get(msg.senderId);
if (connector) {
if (msg.recipientId.compare(msg.senderId) > 0) {
// Discard the offer.
Log.d(WebRtcConnector, `Simultaneous connection, discarding offer from ${msg.senderId} (<${msg.recipientId})`);
return;
} else if (connector instanceof InboundPeerConnector) {
// We have already seen an offer from this peer. Forward it to the existing connector.
Log.w(WebRtcConnector, `Duplicate offer received from ${msg.senderId}`);
connector.onSignal(payload);
return;
} else {
// We are going to accept the offer. Clear the connect timeout
// from our previous outbound connection attempt to this peer.
Log.d(WebRtcConnector, `Simultaneous connection, accepting offer from ${msg.senderId} (>${msg.recipientId})`);
this._timers.clearTimeout(`connect_${msg.senderId}`);
// Abort the outbound connection attempt.
connector.close();
// Let listeners know that the connection attempt was aborted.
this.fire('error', connector.peerAddress, 'simultaneous inbound connection');
}
}
// Check connector limit.
if (this._connectors.length >= WebRtcConnector.CONNECTORS_MAX) {
Log.d(WebRtcConnector, `Rejecting offer from ${msg.senderId} - max connectors exceeded`);
return;
}
// Check inbound connector limit.
if (this._connectors.length >= WebRtcConnector.INBOUND_CONNECTORS_MAX) {
let numInboundConnectors = 0;
for (const c of this._connectors.valueIterator()) {
if (c instanceof InboundPeerConnector) {
numInboundConnectors++;
}
}
if (numInboundConnectors >= WebRtcConnector.INBOUND_CONNECTORS_MAX) {
Log.d(WebRtcConnector, `Rejecting offer from ${msg.senderId} - max inbound connectors exceeded`);
return;
}
}
// Accept the offer.
connector = new InboundPeerConnector(this._networkConfig, channel, msg.senderId, payload);
connector.on('connection', conn => this._onConnection(conn, msg.senderId));
this._connectors.put(msg.senderId, connector);
this._timers.setTimeout(`connect_${msg.senderId}`, () => {
this._timers.clearTimeout(`connect_${msg.senderId}`);
this._connectors.remove(msg.senderId);
connector.close();
}, WebRtcConnector.CONNECT_TIMEOUT);
}
// If we are already establishing a connection with the sender of this
// signal, forward it to the corresponding connector.
else if (this._connectors.contains(msg.senderId)) {
this._connectors.get(msg.senderId).onSignal(payload);
}
// If none of the above conditions is met, the signal is invalid and we discard it.
}
_onConnection(conn, peerId) {
// Clear the connect timeout.
this._timers.clearTimeout(`connect_${peerId}`);
// Clean up when this connection closes.
conn.on('close', () => this._onClose(peerId));
// Tell listeners about the new connection.
this.fire('connection', conn);
}
_onClose(peerId) {
this._connectors.remove(peerId);
this._timers.clearTimeout(`connect_${peerId}`);
}
}
WebRtcConnector.CONNECT_TIMEOUT = 8000; // ms
WebRtcConnector.CONNECTORS_MAX = 6;
WebRtcConnector.INBOUND_CONNECTORS_MAX = 3;
Class.register(WebRtcConnector);
class PeerConnector extends Observable {
/**
* @param {NetworkConfig} networkConfig
* @param {PeerChannel} signalChannel
* @param {PeerId} peerId
* @param {PeerAddress} peerAddress
*/
constructor(networkConfig, signalChannel, peerId, peerAddress) {
super();
/** @type {NetworkConfig} */
this._networkConfig = networkConfig;
/** @type {PeerChannel} */
this._signalChannel = signalChannel;
/** @type {PeerId} */
this._peerId = peerId;
/** @type {PeerAddress} */
this._peerAddress = peerAddress; // null for inbound connections
/** @type {number} */
this._nonce = NumberUtils.randomUint32();
/** @type {RTCPeerConnection} */
this._rtcConnection = WebRtcFactory.newPeerConnection(this._networkConfig.rtcConfig);
this._rtcConnection.onicecandidate = e => this._onIceCandidate(e);
this._rtcConnection.onconnectionstatechange = e => this._onConnectionStateChange(e);
this._rtcConnection.onicegatheringstatechange = e => this._onIceGatheringStateChange(e);
this._lastIceCandidate = null;
this._iceCandidateQueue = [];
this._localIceCandidates = [];
this._timers = new Timers();
}
/**
* @param {*} signal
*/
onSignal(signal) {
if (!this._rtcConnection) return;
if (signal.sdp) {
this._rtcConnection.setRemoteDescription(WebRtcFactory.newSessionDescription(signal))
.then(() => {
if (signal.type === 'offer') {
this._rtcConnection.createAnswer()
.then(description => this._onDescription(description))
.catch(Log.e.tag(PeerConnector));
}
this._handleCandidateQueue().catch(Log.w.tag(PeerConnector));
})
.catch(Log.e.tag(PeerConnector));
} else if (signal.candidate) {
// Parse other candidates if present and keep original order.
if (signal.otherCandidates) {
for (const iceCandidate of signal.otherCandidates) {
this._addIceCandidate(iceCandidate).catch(Log.w.tag(PeerConnector));
}
}
this._addIceCandidate(signal).catch(Log.w.tag(PeerConnector));
}
}
_onConnectionStateChange(e) {
if (!this._rtcConnection) return;
switch (this._rtcConnection.connectionState) {
case 'failed':
case 'disconnected':
case 'closed':
this.close();
}
}
close() {
if (!this._rtcConnection) return;
this._rtcConnection.onicecandidate = null;
this._rtcConnection.onconnectionstatechange = null;
this._rtcConnection.onicegatheringstatechange = null;
this._rtcConnection.close();
this._rtcConnection = null;
this._signalChannel = null;
this._timers.clearAll();
this._offAll();
}
/**
* @param {*} signal
* @returns {Promise}
* @private
*/
_addIceCandidate(signal) {
if (!this._rtcConnection) return Promise.resolve();
this._lastIceCandidate = WebRtcFactory.newIceCandidate(signal);
// Do not try to add ICE candidates before the remote description is set.
if (!this._rtcConnection.remoteDescription || !this._rtcConnection.remoteDescription.type) {
this._iceCandidateQueue.push(signal);
return Promise.resolve();
}
return this._rtcConnection.addIceCandidate(this._lastIceCandidate)
.catch(Log.e.tag(PeerConnector));
}
async _handleCandidateQueue() {
if (!this._rtcConnection) return;
// Handle ICE candidates if they already arrived.
for (const candidate of this._iceCandidateQueue) {
await this._addIceCandidate(candidate);
}
this._iceCandidateQueue = [];
}
_signal(signal) {
if (!this._rtcConnection) return;
const payload = BufferUtils.fromAscii(JSON.stringify(signal));
const keyPair = this._networkConfig.keyPair;
const peerId = this._networkConfig.peerId;
this._signalChannel.signal(
peerId,
this._peerId,
this._nonce,
Network.SIGNAL_TTL_INITIAL,
0, /*flags*/
payload,
keyPair.publicKey,
Signature.create(keyPair.privateKey, keyPair.publicKey, payload)
);
}
_onIceCandidate(event) {
if (!this._rtcConnection) return;
if (event.candidate !== null) {
this._localIceCandidates.push(event.candidate);
if (!this._timers.timeoutExists('ice-gathering')) {
this._timers.setTimeout('ice-gathering', this._sendIceCandidates.bind(this),
PeerConnector.ICE_GATHERING_TIMEOUT);
}
}
}
_onIceGatheringStateChange(event) {
if (!this._rtcConnection) return;
if (this._rtcConnection.iceGatheringState === 'complete') {
this._sendIceCandidates();
}
}
_sendIceCandidates() {
this._timers.clearTimeout('ice-gathering');
if (this._localIceCandidates.length > 0) {
// Build backwards compatible structure:
// We assume the last ice candidate to be the most promising one for old clients.
let lastIceCandidate = this._localIceCandidates.pop();
// Not all browsers support toJSON.
lastIceCandidate = lastIceCandidate.toJSON ? lastIceCandidate.toJSON() : JSON.parse(JSON.stringify(lastIceCandidate));
// Embed other candidates in this one ice candidate.
lastIceCandidate.otherCandidates = this._localIceCandidates;
// Send ice candidate.
this._signal(lastIceCandidate);
// Reset local candidates.
this._localIceCandidates = [];
}
}
_onDescription(description) {
if (!this._rtcConnection) return;
this._rtcConnection.setLocalDescription(description)
.then(() => this._signal(this._rtcConnection.localDescription))
.catch(Log.e.tag(PeerConnector));
}
_onDataChannel(event) {
if (!this._rtcConnection) return;
const channel = new WebRtcDataChannel(event.channel || event.target);
// Make sure to close the corresponding RTCPeerConnection when the RTCDataChannel is closed
channel.on('close', () => {
if (!this._rtcConnection) return;
this.close();
});
// There is no API to get the remote IP address. As a crude heuristic, we parse the IP address
// from the last ICE candidate seen before the connection was established.
let netAddress = null;
if (this._lastIceCandidate) {
try {
netAddress = WebRtcUtils.candidateToNetAddress(this._lastIceCandidate);
} catch (e) {
Log.w(PeerConnector, `Failed to parse IP from ICE candidate: ${this._lastIceCandidate}`);
}
} else {
Log.d(PeerConnector, 'No ICE candidate seen for RTC connection');
}
const conn = new NetworkConnection(channel, Protocol.RTC, netAddress, this._peerAddress);
this.fire('connection', conn);
}
get nonce() {
return this._nonce;
}
get peerAddress() {
return this._peerAddress;
}
}
PeerConnector.ICE_GATHERING_TIMEOUT = 1000;
PeerConnector.CONNECTION_OPEN_DELAY = 200;
Class.register(PeerConnector);
class OutboundPeerConnector extends PeerConnector {
constructor(webRtcConfig, peerAddress, signalChannel) {
super(webRtcConfig, signalChannel, peerAddress.peerId, peerAddress);
this._peerAddress = peerAddress;
// Create offer.
this._channel = this._rtcConnection.createDataChannel('data-channel');
this._channel.binaryType = 'arraybuffer';
this._channel.onopen = e => this._onDataChannel(e);
this._rtcConnection.createOffer()
.then(description => this._onDescription(description))
.catch(Log.e.tag(OutboundPeerConnector));
}
close() {
super.close();
if (!this._channel) return;
this._channel.onopen = null;
this._channel = null;
}
}
Class.register(OutboundPeerConnector);
class InboundPeerConnector extends PeerConnector {
constructor(webRtcConfig, signalChannel, peerId, offer) {
super(webRtcConfig, signalChannel, peerId, null);
this._rtcConnection.ondatachannel = event => {
event.channel.onopen = e => this._onDataChannel(e);
};
this.onSignal(offer);
}
}
Class.register(InboundPeerConnector);