src/main/generic/network/connection/ConnectionPool.js
class ConnectionPool extends Observable {
/**
* @constructor
* @param {PeerAddressBook} peerAddresses
* @param {NetworkConfig} networkConfig
* @param {IBlockchain} blockchain
* @listens WebSocketConnector#connection
* @listens WebSocketConnector#error
* @listens WebRtcConnector#connection
* @listens WebRtcConnector#error
*/
constructor(peerAddresses, networkConfig, blockchain) {
super();
/**
* @type {PeerAddressBook}
* @private
*/
this._addresses = peerAddresses;
/**
* @type {NetworkConfig}
* @private
*/
this._networkConfig = networkConfig;
/**
* @type {IBlockchain}
* @private
*/
this._blockchain = blockchain;
/**
* HashMap from peerAddresses to connections.
* @type {HashMap.<PeerAddress, PeerConnection>}
* @private
*/
this._connectionsByPeerAddress = new HashMap();
/**
* HashMap from netAddresses to connections.
* @type {HashMap.<NetAddress, Array.<PeerConnection>>}
* @private
*/
this._connectionsByNetAddress = new HashMap();
/**
* HashMap from subnet addresses to connections.
* @type {HashMap.<NetAddress, Array.<PeerConnection>>}
* @private
*/
this._connectionsBySubnet = new HashMap();
// Total bytes sent/received on past connections.
/** @type {number} */
this._bytesSent = 0;
/** @type {number} */
this._bytesReceived = 0;
/** @type {WebSocketConnector} */
this._wssConnector = new WebSocketConnector(Protocol.WSS, 'wss', this._networkConfig);
this._wssConnector.on('connection', conn => this._onConnection(conn));
this._wssConnector.on('error', (peerAddr, e) => this._onConnectError(peerAddr, e));
/** @type {WebSocketConnector} */
this._wsConnector = new WebSocketConnector(Protocol.WS, 'ws', this._networkConfig);
this._wsConnector.on('connection', conn => this._onConnection(conn));
this._wsConnector.on('error', (peerAddr, e) => this._onConnectError(peerAddr, e));
/** @type {WebRtcConnector} */
this._rtcConnector = new WebRtcConnector(this._networkConfig);
this._rtcConnector.on('connection', conn => this._onConnection(conn));
this._rtcConnector.on('error', (peerAddr, reason) => this._onConnectError(peerAddr, reason));
// Various counters for established connections.
/** @type {number} */
this._peerCountWs = 0;
/** @type {number} */
this._peerCountWss = 0;
/** @type {number} */
this._peerCountRtc = 0;
/** @type {number} */
this._peerCountDumb = 0;
/** @type {number} */
this._peerCountFull = 0;
/** @type {number} */
this._peerCountLight = 0;
/** @type {number} */
this._peerCountNano = 0;
/** @type {number} */
this._peerCountOutbound = 0;
/** @type {number} */
this._peerCountFullWsOutbound = 0;
/**
* Number of ongoing outbound connection attempts.
* @type {number}
* @private
*/
this._connectingCount = 0;
/**
* Number of not established inbound connections.
* @type {number}
* @private
*/
this._inboundCount = 0;
/** @type {SignalProcessor} */
this._signalProcessor = new SignalProcessor(peerAddresses, networkConfig, this._rtcConnector);
// When true, send a signal to network to close an established connection for a incoming one
/** @type {boolean} */
this._allowInboundExchange = false;
// Whether we allow inbound connections. Does not apply to WebRTC connections.
/** @type {boolean} */
this._allowInboundConnections = false;
/** @type {HashMap.<NetAddress, number>} */
this._bannedIPv4IPs = new HashMap();
/** @type {HashMap.<Uint8Array, number>} */
this._bannedIPv6IPs = new HashMap();
setInterval(() => this._checkUnbanIps(), ConnectionPool.UNBAN_IPS_INTERVAL);
}
/**
* @returns {Array.<PeerConnection>}
*/
values() {
return Array.from(this._connectionsByPeerAddress.values());
}
/**
* @returns {Iterator.<PeerConnection>}
*/
valueIterator() {
return this._connectionsByPeerAddress.valueIterator();
}
/**
* @param {PeerAddress} peerAddress
* @returns {?PeerConnection}
*/
getConnectionByPeerAddress(peerAddress) {
return this._connectionsByPeerAddress.get(peerAddress);
}
/**
* @param {NetAddress} netAddress
* @returns {Array.<PeerConnection>}
*/
getConnectionsByNetAddress(netAddress) {
return this._connectionsByNetAddress.get(netAddress) || [];
}
/**
* @param {NetAddress} netAddress
* @returns {Array.<PeerConnection>}
*/
getConnectionsBySubnet(netAddress) {
return this._connectionsBySubnet.get(this._getSubnetAddress(netAddress)) || [];
}
/**
* @param {NetAddress} netAddress
* @returns {Array.<PeerConnection>}
*/
getOutboundConnectionsBySubnet(netAddress) {
return (this._connectionsBySubnet.get(this._getSubnetAddress(netAddress)) || [])
.filter(/** @type {PeerConnection} */ peerConnection => peerConnection.networkConnection.outbound);
}
/**
* @param {NetAddress} netAddress
* @returns {NetAddress}
*/
_getSubnetAddress(netAddress) {
return netAddress.subnet(netAddress.isIPv4() ? Network.IPV4_SUBNET_MASK : Network.IPV6_SUBNET_MASK);
}
/**
* @param {PeerConnection} peerConnection
* @returns {void}
* @private
*/
_add(peerConnection) {
if (peerConnection.peerAddress) {
this._connectionsByPeerAddress.put(peerConnection.peerAddress, peerConnection);
}
}
/**
* @param {PeerConnection} peerConnection
* @returns {void}
* @private
*/
_remove(peerConnection) {
if (peerConnection.peerAddress) {
this._connectionsByPeerAddress.remove(peerConnection.peerAddress);
}
if (peerConnection.networkConnection && peerConnection.networkConnection.netAddress) {
this._removeNetAddress(peerConnection, peerConnection.networkConnection.netAddress);
}
}
/**
* @param {PeerConnection} peerConnection
* @param {NetAddress} netAddress
* @returns {void}
* @private
*/
_addNetAddress(peerConnection, netAddress) {
// Only add reliable netAddresses.
if (netAddress.isPseudo() || !netAddress.reliable) {
return;
}
if (this._connectionsByNetAddress.contains(netAddress)) {
this._connectionsByNetAddress.get(netAddress).push(peerConnection);
} else {
this._connectionsByNetAddress.put(netAddress, [peerConnection]);
}
const subnetAddress = this._getSubnetAddress(netAddress);
if (this._connectionsBySubnet.contains(subnetAddress)) {
this._connectionsBySubnet.get(subnetAddress).push(peerConnection);
} else {
this._connectionsBySubnet.put(subnetAddress, [peerConnection]);
}
}
/**
* @param {PeerConnection} peerConnection
* @param {NetAddress} netAddress
* @returns {void}
* @private
*/
_removeNetAddress(peerConnection, netAddress) {
if (netAddress.isPseudo() || !netAddress.reliable) {
return;
}
if (this._connectionsByNetAddress.contains(netAddress)) {
const peerConnections = this._connectionsByNetAddress.get(netAddress);
const index = peerConnections.indexOf(peerConnection);
if (index >= 0) {
peerConnections.splice(index, 1);
}
if (peerConnections.length === 0) {
this._connectionsByNetAddress.remove(netAddress);
}
}
const subnetAddress = this._getSubnetAddress(netAddress);
if (this._connectionsBySubnet.contains(subnetAddress)) {
const peerConnections = this._connectionsBySubnet.get(subnetAddress);
const index = peerConnections.indexOf(peerConnection);
if (index >= 0) {
peerConnections.splice(index, 1);
}
if (peerConnections.length === 0) {
this._connectionsBySubnet.remove(subnetAddress);
}
}
}
/**
* @param {PeerAddress} peerAddress
* @returns {boolean}
*/
_checkOutboundConnectionRequest(peerAddress) {
if (peerAddress === null) {
return false;
}
if (peerAddress.protocol !== Protocol.WS && peerAddress.protocol !== Protocol.WSS && peerAddress.protocol !== Protocol.RTC) {
Log.e(ConnectionPool, `Cannot connect to ${peerAddress} - unsupported protocol`);
return false;
}
if (this._addresses.isBanned(peerAddress)){
Log.e(ConnectionPool, `Connecting to banned address ${peerAddress}`);
return false;
}
const peerConnection = this.getConnectionByPeerAddress(peerAddress);
if (peerConnection) {
Log.e(ConnectionPool, `Duplicate connection to ${peerAddress}`);
return false;
}
// Forbid connection if we have too many connections to the peer's IP address.
if (peerAddress.netAddress && peerAddress.netAddress.reliable) {
if (this.getConnectionsByNetAddress(peerAddress.netAddress).length >= Network.PEER_COUNT_PER_IP_MAX) {
Log.e(ConnectionPool, `connection limit per ip (${Network.PEER_COUNT_PER_IP_MAX}) reached`);
return false;
}
if (this.getOutboundConnectionsBySubnet(peerAddress.netAddress).length >= Network.OUTBOUND_PEER_COUNT_PER_SUBNET_MAX) {
Log.e(ConnectionPool, `connection limit per ip (${Network.OUTBOUND_PEER_COUNT_PER_SUBNET_MAX}) reached`);
return false;
}
}
return true;
}
/**
* @param {PeerAddress} peerAddress
* @returns {boolean}
*/
connectOutbound(peerAddress) {
// all checks in one step
if (!this._checkOutboundConnectionRequest(peerAddress)) {
return false;
}
// Connection request accepted.
// create fresh PeerConnection instance
const peerConnection = PeerConnection.getOutbound(peerAddress);
this._add(peerConnection);
// choose connector type and call
let connecting = false;
if (peerAddress.protocol === Protocol.WSS) {
connecting = this._wssConnector.connect(peerAddress);
} else if (peerAddress.protocol === Protocol.WS) {
connecting = this._wsConnector.connect(peerAddress);
} else {
const signalChannel = this._addresses.getChannelByPeerId(peerAddress.peerId);
connecting = this._rtcConnector.connect(peerAddress, signalChannel);
}
if (connecting) {
this._connectingCount++;
} else {
this._remove(peerConnection);
Log.d(Network, () => `Outbound attempt not connecting: ${peerAddress}`);
return false;
}
return true;
}
/**
* @param {PeerConnection} peerConnection
* @returns {boolean}
* @private
*/
_checkConnection(peerConnection) {
/** @type {NetworkConnection} */
const conn = peerConnection.networkConnection;
// Close connection if we currently do not allow inbound connections. WebRTC connections are exempt.
if (conn.inbound && !this._allowInboundConnections && conn.protocol !== Protocol.RTC) {
conn.close(CloseType.INBOUND_CONNECTIONS_BLOCKED, 'inbound connections are blocked temporarily');
return false;
}
if (conn.netAddress && !conn.netAddress.isPseudo() && conn.netAddress.reliable) {
// Close connection if peer's IP is banned.
if (this._isIpBanned(conn.netAddress)) {
conn.close(CloseType.IP_BANNED, `connection with banned IP ${conn.netAddress}`);
return false;
}
// Close connection if we have too many connections to the peer's IP address.
if (!conn.netAddress.isPrivate() && this.getConnectionsByNetAddress(conn.netAddress).length >= Network.PEER_COUNT_PER_IP_MAX) {
conn.close(CloseType.CONNECTION_LIMIT_PER_IP, `connection limit per IP (${Network.PEER_COUNT_PER_IP_MAX}) reached`);
return false;
}
// Close connection if we have too many connections to the peer's subnet.
if (!conn.netAddress.isPrivate() && this.getConnectionsBySubnet(conn.netAddress).length >= Network.INBOUND_PEER_COUNT_PER_SUBNET_MAX) {
conn.close(CloseType.CONNECTION_LIMIT_PER_IP, `connection limit per subnet (${Network.INBOUND_PEER_COUNT_PER_SUBNET_MAX}) reached`);
return false;
}
}
// Reject peer if we have reached max peer count.
if (this.peerCount >= Network.PEER_COUNT_MAX
&& !conn.outbound
&& !(conn.inbound && this._allowInboundExchange)) {
conn.close(CloseType.MAX_PEER_COUNT_REACHED, `max peer count reached (${Network.PEER_COUNT_MAX})`);
return false;
}
return true;
}
/**
* @listens PeerChannel#signal
* @listens NetworkAgent#handshake
* @listens NetworkAgent#close
* @fires ConnectionPool#connection
* @param {NetworkConnection} conn
* @returns {void}
* @private
*/
_onConnection(conn) {
/** @type {PeerConnection} */
let peerConnection;
if (conn.outbound) {
peerConnection = this.getConnectionByPeerAddress(conn.peerAddress);
if (!peerConnection) {
conn.close(CloseType.INVALID_CONNECTION_STATE, `No PeerConnection present for outgoing connection (${conn.peerAddress})`);
return;
} else if (peerConnection.state !== PeerConnectionState.CONNECTING) {
conn.close(CloseType.INVALID_CONNECTION_STATE, `PeerConnection state not CONNECTING, but ${peerConnection.state} (${conn.peerAddress})`);
return;
}
this._connectingCount--;
Assert.that(this._connectingCount >= 0, 'connectingCount < 0');
} else {
peerConnection = PeerConnection.getInbound(conn);
this._inboundCount++;
}
// Set peerConnection to CONNECTED state.
peerConnection.networkConnection = conn;
// Register close listener early to clean up correctly in case _checkConnection() closes the connection.
conn.on('close', (type, reason) => this._onClose(peerConnection, type, reason));
if (!this._checkConnection(peerConnection)) {
return;
}
// Connection accepted.
if (conn.netAddress && !conn.netAddress.isPseudo()) {
this._addNetAddress(peerConnection, conn.netAddress);
}
const connType = conn.inbound ? 'inbound' : 'outbound';
Log.d(ConnectionPool, () => `Connection established (${connType}) #${conn.id} ${conn.netAddress || conn.peerAddress || '<pending>'}`);
// Let listeners know about this connection.
this.fire('connection', conn);
// Create peer channel.
const channel = new PeerChannel(conn);
peerConnection.peerChannel = channel;
// Create network agent.
const agent = new NetworkAgent(this._blockchain, this._addresses, this._networkConfig, channel);
agent.on('version', peer => this._checkHandshake(peerConnection, peer));
agent.on('handshake', peer => this._onHandshake(peerConnection, peer));
peerConnection.networkAgent = agent;
// Initiate handshake with the peer.
agent.handshake();
}
/**
* @param {PeerConnection} peerConnection
* @param {Peer} peer
* @returns {boolean}
* @private
*/
_checkHandshake(peerConnection, peer) {
// Close connection if peer's address is banned.
if (this._addresses.isBanned(peer.peerAddress)) {
peerConnection.peerChannel.close(CloseType.PEER_BANNED,
`connection with banned address ${peer.peerAddress} (post version)`);
return false;
}
// Duplicate/simultaneous connection check (post version):
const storedConnection = this.getConnectionByPeerAddress(peer.peerAddress);
if (storedConnection && storedConnection.id !== peerConnection.id) {
// If we already have an established connection to this peer, close this connection.
if (storedConnection.state === PeerConnectionState.ESTABLISHED) {
peerConnection.peerChannel.close(CloseType.DUPLICATE_CONNECTION,
'duplicate connection (post version)');
return false;
}
}
// Close connection if we have too many dumb connections.
if (peer.peerAddress.protocol === Protocol.DUMB && this.peerCountDumb >= Network.PEER_COUNT_DUMB_MAX) {
peerConnection.peerChannel.close(CloseType.CONNECTION_LIMIT_DUMB,
`connection limit for dumb peers (${Network.PEER_COUNT_DUMB_MAX}) reached`);
return false;
}
// Set peerConnection to NEGOTIATING state.
peerConnection.negotiating();
return true;
}
/**
* Handshake with this peer was successful.
* @fires ConnectionPool#peer-joined
* @fires ConnectionPool#peers-changed
* @fires ConnectionPool#recyling-request
* @param {PeerConnection} peerConnection
* @param {Peer} peer
* @returns {void}
* @private
*/
_onHandshake(peerConnection, peer) {
if (peerConnection.networkConnection.inbound) {
// Re-check allowInboundExchange as it might have changed.
if (this.peerCount >= Network.PEER_COUNT_MAX && !this._allowInboundExchange) {
peerConnection.peerChannel.close(CloseType.MAX_PEER_COUNT_REACHED,
`max peer count reached (${Network.PEER_COUNT_MAX})`);
return;
}
// Duplicate/simultaneous connection check (post handshake):
const storedConnection = this.getConnectionByPeerAddress(peer.peerAddress);
if (storedConnection && storedConnection.id !== peerConnection.id) {
switch (storedConnection.state) {
case PeerConnectionState.CONNECTING:
// Abort the stored connection attempt and accept this connection.
Assert.that(peer.peerAddress.protocol === Protocol.WSS || peer.peerAddress.protocol === Protocol.WS, 'Duplicate connection to non-WS node');
Log.d(ConnectionPool, () => `Aborting connection attempt to ${peer.peerAddress}, simultaneous inbound connection succeeded`);
if (peer.peerAddress.protocol === Protocol.WSS) {
this._wssConnector.abort(peer.peerAddress);
} else {
this._wsConnector.abort(peer.peerAddress);
}
Assert.that(!this.getConnectionByPeerAddress(peer.peerAddress), 'PeerConnection not removed');
break;
case PeerConnectionState.ESTABLISHED:
// If we have another established connection to this peer, close this connection.
peerConnection.peerChannel.close(CloseType.DUPLICATE_CONNECTION,
'duplicate connection (post handshake)');
return;
case PeerConnectionState.NEGOTIATING:
// The peer with the lower peerId accepts this connection and closes his stored connection.
if (this._networkConfig.peerAddress.peerId.compare(peer.peerAddress.peerId) < 0) {
storedConnection.peerChannel.close(CloseType.SIMULTANEOUS_CONNECTION,
'simultaneous connection (post handshake) - lower peerId');
Assert.that(!this.getConnectionByPeerAddress(peer.peerAddress), 'PeerConnection not removed');
}
// The peer with the higher peerId closes this connection and keeps his stored connection.
else {
peerConnection.peerChannel.close(CloseType.SIMULTANEOUS_CONNECTION,
'simultaneous connection (post handshake) - higher peerId');
return;
}
break;
default:
// Accept this connection and close the stored connection.
storedConnection.peerChannel.close(CloseType.SIMULTANEOUS_CONNECTION,
`simultaneous connection (post handshake) - state ${storedConnection.state}`);
Assert.that(!this.getConnectionByPeerAddress(peer.peerAddress), 'PeerConnection not removed');
}
}
Assert.that(!this.getConnectionByPeerAddress(peer.peerAddress), `PeerConnection ${peer.peerAddress} already exists`);
peerConnection.peerAddress = peer.peerAddress;
this._add(peerConnection);
this._inboundCount--;
Assert.that(this._inboundCount >= 0, 'inboundCount < 0');
}
// Handshake accepted.
// Check if we need to recycle a connection.
if (this.peerCount >= Network.PEER_COUNT_MAX) {
this.fire('recycling-request');
}
// Set peerConnection to ESTABLISHED state.
peerConnection.peer = peer;
if (peer.netAddress && !peer.netAddress.isPseudo() && this.getConnectionsByNetAddress(peer.netAddress).indexOf(peerConnection) < 0) {
this._addNetAddress(peerConnection, peer.netAddress);
}
this._updateConnectedPeerCount(peerConnection, 1);
// Setup signal forwarding.
if (Network.SIGNALING_ENABLED) {
peerConnection.peerChannel.on('signal', msg => this._signalProcessor.onSignal(peerConnection.peerChannel, msg));
}
// Mark address as established.
this._addresses.established(peer.channel, peer.peerAddress);
Log.d(ConnectionPool, () => `[PEER-JOINED] ${peer.peerAddress} ${peer.netAddress} (version=${peer.version}, services=${peer.peerAddress.services}, userAgent=${peer.userAgent || '<unknown>'}, headHash=${peer.headHash.toBase64()})`);
// Let listeners know about this peer.
this.fire('peer-joined', peer);
// Let listeners know that the peers changed.
this.fire('peers-changed');
}
/**
* This peer channel was closed.
* @param {PeerConnection} peerConnection
* @param {number} type
* @param {string} reason
* @fires ConnectionPool#peer-left
* @fires ConnectionPool#peers-changed
* @fires ConnectionPool#close
* @returns {void}
* @private
*/
_onClose(peerConnection, type, reason) {
// Update total bytes sent/received.
this._bytesSent += peerConnection.networkConnection.bytesSent;
this._bytesReceived += peerConnection.networkConnection.bytesReceived;
// Only propagate the close type (i.e. track fails/bans) if the peerAddress is set.
// This is true for
// - all outbound connections
// - inbound connections post handshake (peerAddress is verified)
if (peerConnection.peerAddress) {
this._addresses.close(peerConnection.peerChannel, peerConnection.peerAddress, type);
}
this._remove(peerConnection);
// Check if the handshake with this peer has completed.
if (peerConnection.state === PeerConnectionState.ESTABLISHED) {
// If closing is due to a ban, also ban the IP
if (CloseType.isBanningType(type) && peerConnection.peer.netAddress){
this._banIp(peerConnection.peer.netAddress);
}
this._updateConnectedPeerCount(peerConnection, -1);
const kbTransferred = ((peerConnection.networkConnection.bytesSent
+ peerConnection.networkConnection.bytesReceived) / 1000).toFixed(2);
Log.d(ConnectionPool, () => `[PEER-LEFT] ${peerConnection.peerAddress} ${peerConnection.peer.netAddress} `
+ `(transferred=${kbTransferred} kB, closeType=${type} ${reason})`);
// Tell listeners that this peer has gone away.
this.fire('peer-left', peerConnection.peer);
// Let listeners know that the peers changed.
this.fire('peers-changed');
} else {
if (peerConnection.networkConnection.inbound) {
this._inboundCount--;
Log.d(ConnectionPool, () => `Inbound connection #${peerConnection.networkConnection.id} closed pre-handshake: ${reason} (${type})`);
} else {
Log.d(ConnectionPool, () => `Connection #${peerConnection.networkConnection.id} to ${peerConnection.peerAddress} closed pre-handshake: ${reason} (${type})`);
this.fire('connect-error', peerConnection.peerAddress, `${reason} (${type})`);
}
}
// Let listeners know about this closing.
this.fire('close', peerConnection, type, reason);
// Set the peer connection to closed state.
peerConnection.close();
}
/**
* @param {NetAddress} netAddress
* @returns {void}
* @private
*/
_banIp(netAddress) {
if (!netAddress.isPseudo() && netAddress.reliable) {
Log.w(ConnectionPool, `Banning IP ${netAddress}`);
if (netAddress.isIPv4()) {
this._bannedIPv4IPs.put(netAddress, Date.now() + ConnectionPool.DEFAULT_BAN_TIME);
} else if (netAddress.isIPv6()) {
// Ban IPv6 IPs prefix based
this._bannedIPv6IPs.put(netAddress.ip.subarray(0,8), Date.now() + ConnectionPool.DEFAULT_BAN_TIME);
}
}
}
/**
* @param {NetAddress} netAddress
* @returns {boolean}
* @private
*/
_isIpBanned(netAddress) {
if (netAddress.isPseudo()) return false;
if (netAddress.isIPv4()) {
return this._bannedIPv4IPs.contains(netAddress);
} else if (netAddress.isIPv6()) {
const prefix = netAddress.ip.subarray(0, 8);
return this._bannedIPv6IPs.contains(prefix);
}
return false;
}
/**
* @returns {void}
* @private
*/
_checkUnbanIps() {
const now = Date.now();
for (const netAddress of this._bannedIPv4IPs.keys()) {
if (this._bannedIPv4IPs.get(netAddress) < now) {
this._bannedIPv4IPs.remove(netAddress);
}
}
for (const prefix of this._bannedIPv6IPs.keys()) {
if (this._bannedIPv6IPs.get(prefix) < now) {
this._bannedIPv6IPs.remove(prefix);
}
}
}
/**
* Connection to this peer address failed.
* @param {PeerAddress} peerAddress
* @param {string|*} [reason]
* @fires ConnectionPool#connect-error
* @returns {void}
* @private
*/
_onConnectError(peerAddress, reason) {
Log.d(ConnectionPool, () => `Connection to ${peerAddress} failed` + (typeof reason === 'string' ? ` - ${reason}` : ''));
const peerConnection = this.getConnectionByPeerAddress(peerAddress);
Assert.that(!!peerConnection, `PeerAddress not stored ${peerAddress}`);
if (peerConnection.state !== PeerConnectionState.CONNECTING) {
Log.e(ConnectionPool, `PeerConnection state not CONNECTING, but ${peerConnection.state} (${peerAddress})`);
return;
}
this._remove(peerConnection);
this._connectingCount--;
Assert.that(this._connectingCount >= 0, 'connectingCount < 0');
this._addresses.close(null, peerAddress, CloseType.CONNECTION_FAILED);
this.fire('connect-error', peerAddress, reason);
}
/**
* @param {PeerConnection} peerConnection
* @param {number} delta
* @returns {void}
* @private
*/
_updateConnectedPeerCount(peerConnection, delta) {
const peerAddress = peerConnection.peerAddress;
switch (peerAddress.protocol) {
case Protocol.WS:
this._peerCountWs += delta;
Assert.that(this._peerCountWs >= 0, 'peerCountWs < 0');
break;
case Protocol.WSS:
this._peerCountWss += delta;
Assert.that(this._peerCountWss >= 0, 'peerCountWs < 0');
break;
case Protocol.RTC:
this._peerCountRtc += delta;
Assert.that(this._peerCountRtc >= 0, 'peerCountRtc < 0');
break;
case Protocol.DUMB:
this._peerCountDumb += delta;
Assert.that(this._peerCountDumb >= 0, 'peerCountDumb < 0');
break;
default:
Log.w(PeerAddressBook, `Unknown protocol ${peerAddress.protocol}`);
}
if (Services.isFullNode(peerAddress.services)) {
this._peerCountFull += delta;
Assert.that(this._peerCountFull >= 0, 'peerCountFull < 0');
} else if (Services.isLightNode(peerAddress.services)) {
this._peerCountLight += delta;
Assert.that(this._peerCountLight >= 0, 'peerCountLight < 0');
} else {
this._peerCountNano += delta;
Assert.that(this._peerCountNano >= 0, 'peerCountNano < 0');
}
if (peerConnection.networkConnection.outbound) {
this._peerCountOutbound += delta;
if (Services.isFullNode(peerAddress.services) && (peerAddress.protocol === Protocol.WSS || peerAddress.protocol === Protocol.WS)) {
this._peerCountFullWsOutbound += delta;
}
}
}
/**
* @param {string|*} reason
* @returns {void}
*/
disconnect(reason) {
// Close all active connections.
for (const connection of this.valueIterator()) {
if (connection.peerChannel) {
connection.peerChannel.close(CloseType.MANUAL_NETWORK_DISCONNECT, reason || 'manual network disconnect');
}
}
}
// XXX For testing
disconnectWebSocket() {
// Close all websocket connections.
for (const connection of this.valueIterator()) {
if (connection.peerChannel && connection.peerAddress && (connection.peerAddress.protocol === Protocol.WSS || connection.peerAddress.protocol === Protocol.WS)) {
connection.peerChannel.close(CloseType.MANUAL_WEBSOCKET_DISCONNECT, 'manual websocket disconnect');
}
}
}
/** @type {number} */
get peerCountWs() {
return this._peerCountWs;
}
/** @type {number} */
get peerCountWss() {
return this._peerCountWss;
}
/** @type {number} */
get peerCountRtc() {
return this._peerCountRtc;
}
/** @type {number} */
get peerCountDumb() {
return this._peerCountDumb;
}
/** @type {number} */
get peerCount() {
return this._peerCountWs + this._peerCountWss + this._peerCountRtc + this._peerCountDumb;
}
/** @type {number} */
get peerCountFull() {
return this._peerCountFull;
}
/** @type {number} */
get peerCountLight() {
return this._peerCountLight;
}
/** @type {number} */
get peerCountNano() {
return this._peerCountNano;
}
/** @type {number} */
get peerCountOutbound() {
return this._peerCountOutbound;
}
/** @type {number} */
get peerCountFullWsOutbound() {
return this._peerCountFullWsOutbound;
}
/** @type {number} */
get connectingCount() {
return this._connectingCount;
}
/** @type {number} */
get count() {
return this._connectionsByPeerAddress.length + this._inboundCount;
}
/** @type {number} */
get bytesSent() {
let bytesSent = this._bytesSent;
for (const peerConnection of this.valueIterator()) {
if (peerConnection.networkConnection) {
bytesSent += peerConnection.networkConnection.bytesSent;
}
}
return bytesSent;
}
/** @type {number} */
get bytesReceived() {
let bytesReceived = this._bytesReceived;
for (const peerConnection of this.valueIterator()) {
if (peerConnection.networkConnection) {
bytesReceived += peerConnection.networkConnection.bytesReceived;
}
}
return bytesReceived;
}
/** @param {boolean} value */
set allowInboundExchange(value) {
this._allowInboundExchange = value;
}
/** @type {boolean} */
get allowInboundConnections() {
return this._allowInboundConnections;
}
/** @param {boolean} value */
set allowInboundConnections(value) {
this._allowInboundConnections = value;
}
}
ConnectionPool.DEFAULT_BAN_TIME = 1000 * 60 * 10; // 10 minutes
ConnectionPool.UNBAN_IPS_INTERVAL = 1000 * 60; // 1 minute
Class.register(ConnectionPool);