Home Reference Source Test

src/main/generic/network/connection/NetworkAgent.js

class NetworkAgent extends Observable {
    /**
     * @param {IBlockchain} blockchain
     * @param {PeerAddressBook} addresses
     * @param {NetworkConfig} networkConfig
     * @param {PeerChannel} channel
     *
     * @listens PeerChannel#version
     * @listens PeerChannel#verack
     * @listens PeerChannel#addr
     * @listens PeerChannel#getAddr
     * @listens PeerChannel#ping
     * @listens PeerChannel#pong
     * @listens PeerChannel#close
     */
    constructor(blockchain, addresses, networkConfig, channel) {
        super();
        /** @type {IBlockchain} */
        this._blockchain = blockchain;
        /** @type {PeerAddressBook} */
        this._addresses = addresses;
        /** @type {NetworkConfig} */
        this._networkConfig = networkConfig;
        /** @type {PeerChannel} */
        this._channel = channel;

        /**
         * The peer object we create after the handshake completes.
         * @type {Peer}
         * @private
         */
        this._peer = null;

        /**
         * Helper object to keep track of timeouts & intervals.
         * @type {Timers}
         * @private
         */
        this._timers = new Timers();

        /**
         * True if we have received the peer's version message.
         * @type {boolean}
         * @private
         */
        this._versionReceived = false;

        /**
         * True if we have received the peer's verack message.
         * @type {boolean}
         * @private
         */
        this._verackReceived = false;

        /**
         * True if we have successfully sent our version message.
         * @type {boolean}
         * @private
         */
        this._versionSent = false;

        /**
         * True if we have successfully sent our verack message.
         * @type {boolean}
         * @private
         */
        this._verackSent = false;

        /**
         * Number of times we have tried to send out the version message.
         * @type {number}
         * @private
         */
        this._versionAttempts = 0;

        /**
         * @type {boolean}
         * @private
         */
        this._peerAddressVerified = false;

        /**
         * @type {Uint8Array}
         * @private
         */
        this._peerChallengeNonce = null;

        /**
         * @type {Map.<number, number>}
         * @private
         */
        this._pingTimes = new Map();

        /**
         * @type {{maxResults:number}}
         * @private
         */
        this._addressRequest = null;

        /**
         * @type {RateLimit}
         * @private
         */
        this._getAddrLimit = new RateLimit(NetworkAgent.GETADDR_RATE_LIMIT);

        /** @type {Uint8Array} */
        this._challengeNonce = new Uint8Array(VersionMessage.CHALLENGE_SIZE);
        CryptoWorker.lib.getRandomValues(this._challengeNonce);

        // Listen to network/control messages from the peer.
        channel.on('version', msg => this._onVersion(msg));
        channel.on('verack', msg => this._onVerAck(msg));
        channel.on('addr', msg => this._onAddr(msg));
        channel.on('get-addr', msg => this._onGetAddr(msg));
        channel.on('ping', msg => this._onPing(msg));
        channel.on('pong', msg => this._onPong(msg));

        // Clean up when the peer disconnects.
        channel.on('close', () => this._onClose());
    }

    /* Handshake */

    handshake() {
        if (this._versionSent) {
            // Version already sent, no need to handshake again.
            return;
        }

        // Kick off the handshake by telling the peer our version, network address & blockchain head hash.
        // Some browsers (Firefox, Safari) send the data-channel-open event too early, so sending the version message might fail.
        // Try again in this case.
        if (!this._channel.version(this._networkConfig.peerAddress, this._blockchain.headHash, this._challengeNonce, this._networkConfig.appAgent)) {
            this._versionAttempts++;
            if (this._versionAttempts >= NetworkAgent.VERSION_ATTEMPTS_MAX || this._channel.closed) {
                this._channel.close(CloseType.SENDING_OF_VERSION_MESSAGE_FAILED, 'sending of version message failed');
                return;
            }

            setTimeout(this.handshake.bind(this), NetworkAgent.VERSION_RETRY_DELAY);
            return;
        }

        this._versionSent = true;

        // Drop the peer if it doesn't send us a version message.
        // Only do this if we haven't received the peer's version message already.
        if (!this._versionReceived) {
            this._timers.setTimeout('version', () => {
                this._timers.clearTimeout('version');
                this._channel.close(CloseType.VERSION_TIMEOUT, 'version timeout');
            }, NetworkAgent.HANDSHAKE_TIMEOUT);
        } else if (this._peerAddressVerified) {
            this._sendVerAck();
        }

        this._timers.setTimeout('verack', () => {
            this._timers.clearTimeout('verack');
            this._channel.close(CloseType.VERACK_TIMEOUT, 'verack timeout');
        }, NetworkAgent.HANDSHAKE_TIMEOUT * 2);
    }

    /**
     * @param {VersionMessage} msg
     * @private
     */
    _onVersion(msg) {
        Log.v(NetworkAgent, () => `[VERSION] ${msg.peerAddress} ${msg.headHash.toBase64()}`);

        const now = Date.now();

        // Make sure this is a valid message in our current state.
        if (!this._canAcceptMessage(msg)) {
            return;
        }

        // Ignore duplicate version messages.
        if (this._versionReceived) {
            Log.d(NetworkAgent, () => `Ignoring duplicate version message from ${this._channel.peerAddress}`);
            return;
        }

        // Clear the version timeout.
        this._timers.clearTimeout('version');

        // Check if the peer is running a compatible version.
        if (!Version.isCompatible(msg.version)) {
            this._channel.reject(Message.Type.VERSION, RejectMessage.Code.REJECT_OBSOLETE, `incompatible version (ours=${Version.CODE}, theirs=${msg.version})`);
            this._channel.close(CloseType.INCOMPATIBLE_VERSION, `incompatible version (ours=${Version.CODE}, theirs=${msg.version})`);
            return;
        }

        // Check if the peer is working on the same genesis block.
        if (!GenesisConfig.GENESIS_HASH.equals(msg.genesisHash)) {
            this._channel.close(CloseType.DIFFERENT_GENESIS_BLOCK, `different genesis block (${msg.genesisHash})`);
            return;
        }

        // Check that the given peerAddress is correctly signed.
        if (!msg.peerAddress.verifySignature()) {
            this._channel.close(CloseType.INVALID_PEER_ADDRESS_IN_VERSION_MESSAGE, 'invalid peerAddress in version message');
            return;
        }

        // TODO check services?

        // Check that the given peerAddress matches the one we expect.
        // In case of inbound WebSocket connections, this is the first time we
        // see the remote peer's peerAddress.
        const peerAddress = msg.peerAddress;
        if (this._channel.peerAddress) {
            if (!this._channel.peerAddress.equals(peerAddress)) {
                this._channel.close(CloseType.UNEXPECTED_PEER_ADDRESS_IN_VERSION_MESSAGE, 'unexpected peerAddress in version message');
                return;
            }
            this._peerAddressVerified = true;
        }

        // The client might not send its netAddress. Set it from our address database if we have it.
        if (!peerAddress.netAddress) {
            /** @type {PeerAddress} */
            const storedAddress = this._addresses.get(peerAddress);
            if (storedAddress && storedAddress.netAddress) {
                peerAddress.netAddress = storedAddress.netAddress;
            }
        }

        // Set/update the channel's peer address.
        this._channel.peerAddress = peerAddress;

        // Create peer object. Since the initial version message received from the
        // peer contains their local timestamp, we can use it to calculate their
        // offset to our local timestamp and store it for later (last argument).
        this._peer = new Peer(
            this._channel,
            msg.version,
            msg.headHash,
            peerAddress.timestamp - now,
            msg.userAgent
        );

        this._peerChallengeNonce = msg.challengeNonce;
        this._versionReceived = true;

        // Tell listeners that we received this peer's version information.
        // Listeners registered to this event might close the connection to this peer.
        this.fire('version', this._peer, this);

        // Abort handshake if the connection was closed.
        if (this._channel.closed) {
            return;
        }

        if (!this._versionSent) {
            this.handshake();
            return;
        }

        if (this._peerAddressVerified) {
            this._sendVerAck();
        }

        if (this._verackReceived) {
            this._finishHandshake();
        }
    }

    _sendVerAck() {
        Assert.that(this._peerAddressVerified);

        const data = BufferUtils.concatTypedArrays(this._channel.peerAddress.peerId.serialize(), this._peerChallengeNonce);
        const signature = Signature.create(this._networkConfig.keyPair.privateKey, this._networkConfig.keyPair.publicKey, data);
        this._channel.verack(this._networkConfig.keyPair.publicKey, signature);

        this._verackSent = true;
    }

    /**
     * @param {VerAckMessage} msg
     * @private
     */
    _onVerAck(msg) {
        Log.v(NetworkAgent, () => `[VERACK] from ${this._channel.peerAddress}`);

        // Make sure this is a valid message in our current state.
        if (!this._canAcceptMessage(msg)) {
            return;
        }

        // Ignore duplicate verack messages.
        if (this._verackReceived) {
            Log.d(NetworkAgent, () => `Ignoring duplicate verack message from ${this._channel.peerAddress}`);
            return;
        }

        // Clear the verack timeout.
        this._timers.clearTimeout('verack');

        // Verify public key
        if (!msg.publicKey.toPeerId().equals(this._channel.peerAddress.peerId)) {
            this._channel.close(CloseType.INVALID_PUBLIC_KEY_IN_VERACK_MESSAGE, 'Invalid public key in verack message');
            return;
        }

        // Verify signature
        const data = BufferUtils.concatTypedArrays(this._networkConfig.peerAddress.peerId.serialize(), this._challengeNonce);
        if (!msg.signature.verify(msg.publicKey, data)) {
            this._channel.close(CloseType.INVALID_SIGNATURE_IN_VERACK_MESSAGE, 'Invalid signature in verack message');
            return;
        }

        if (!this._peerAddressVerified) {
            this._peerAddressVerified = true;
            this._sendVerAck();
        }

        this._verackReceived = true;

        if (this._verackSent) {
            this._finishHandshake();
        }
    }

    _finishHandshake() {
        // Setup regular connectivity check.
        // TODO randomize interval?
        this._timers.setInterval('connectivity',
            () => this._checkConnectivity(),
            NetworkAgent.CONNECTIVITY_CHECK_INTERVAL);

        // Regularly announce our address.
        this._timers.setInterval('announce-addr',
            () => this._channel.addr([this._networkConfig.peerAddress]),
            NetworkAgent.ANNOUNCE_ADDR_INTERVAL);

        // Tell listeners that the handshake with this peer succeeded.
        this.fire('handshake', this._peer, this);

        // Request new network addresses from the peer.
        this.requestAddresses();
    }


    /* Addresses */

    requestAddresses(maxResults = NetworkAgent.NUM_ADDR_PER_REQUEST) {
        Log.d(Network, () => `Requesting addresses from ${this._peer.peerAddress}`);

        this._addressRequest = {
            maxResults
        };

        // Request only legacy services from older peers
        let acceptedServices = this._networkConfig.services.accepted;
        if (this._peer.version < 2) {
            acceptedServices &= Services.ALL_LEGACY;
        }

        // Request addresses from peer.
        this._channel.getAddr(this._networkConfig.protocolMask, acceptedServices, maxResults);

        // We don't use a timeout here. The peer will not respond with an addr message if
        // it doesn't have any new addresses.
    }

    /**
     * @param {AddrMessage} msg
     * @private
     */
    _onAddr(msg) {
        // Make sure this is a valid message in our current state.
        if (!this._canAcceptMessage(msg)) {
            return;
        }

        // Reject unsolicited address messages unless it is the peer's own address.
        const isOwnAddress = msg.addresses.length === 1 && this._peer.peerAddress.equals(msg.addresses[0]);
        if (!this._addressRequest && !isOwnAddress) {
            return;
        }

        const { maxResults = NetworkAgent.MAX_ADDR_PER_REQUEST } = this._addressRequest || {};
        if (!isOwnAddress) {
            this._addressRequest = null;
        }

        // Reject messages that contain more than 1000 addresses, ban peer (bitcoin).
        if (msg.addresses.length > NetworkAgent.MAX_ADDR_PER_MESSAGE) {
            Log.w(NetworkAgent, 'Rejecting addr message - too many addresses');
            this._channel.close(CloseType.ADDR_MESSAGE_TOO_LARGE, 'addr message too large');
            return;
        }

        Log.v(NetworkAgent, () => `[ADDR] ${msg.addresses.length} addresses from ${this._peer.peerAddress}`);

        // XXX Discard any addresses beyond the ones we requested.
        // TODO reject addr messages not matching our request.
        const addresses = msg.addresses.slice(0, maxResults);

        // Check the addresses the peer send us.
        for (const addr of addresses) {
            if (!addr.verifySignature()) {
                this._channel.close(CloseType.INVALID_ADDR, 'invalid addr');
                return;
            }

            if ((addr.protocol === Protocol.WS || addr.protocol === Protocol.WSS) && !addr.globallyReachable()) {
                this._channel.close(CloseType.ADDR_NOT_GLOBALLY_REACHABLE, 'addr not globally reachable');
                return;
            }
        }

        // Put the new addresses in the address pool.
        this._addresses.add(this._channel, addresses);

        // Update peer with new address.
        if (isOwnAddress) {
            this._peer.peerAddress = addresses[0];
        }

        // Tell listeners that we have received new addresses.
        this.fire('addr', addresses, this);
    }

    /**
     * @private
     * @param {GetAddrMessage} msg
     * @return {void}
     */
    _onGetAddr(msg) {
        // Make sure this is a valid message in our current state.
        if (!this._canAcceptMessage(msg)) {
            return;
        }

        if (!this._getAddrLimit.note()) {
            Log.w(NetworkAgent, 'Rejecting getaddr message - rate limit exceeded');
            return;
        }

        // Find addresses that match the given protocolMask & serviceMask.
        const numResults = Math.min(msg.maxResults, NetworkAgent.MAX_ADDR_PER_REQUEST);
        const addresses = this._addresses.query(msg.protocolMask, msg.serviceMask, numResults);
        this._channel.addr(addresses);
    }


    /* Connectivity Check */

    _checkConnectivity() {
        // Generate random nonce.
        const nonce = NumberUtils.randomUint32();

        // Send ping message to peer.
        // If sending the ping message fails, assume the connection has died.
        if (!this._channel.ping(nonce)) {
            this._channel.close(CloseType.SENDING_PING_MESSAGE_FAILED, 'sending ping message failed');
            return;
        }

        // Save ping timestamp to detect the speed of the connection.
        const startTime = Date.now();
        this._pingTimes.set(nonce, startTime);

        // Expect the peer to answer with a pong message if we haven't heard anything from it
        // within the last CONNECTIVITY_CHECK_INTERVAL. Drop the peer otherwise.
        if (this._channel.lastMessageReceivedAt < startTime - NetworkAgent.CONNECTIVITY_CHECK_INTERVAL) {
            this._timers.setTimeout(`ping_${nonce}`, () => {
                this._timers.clearTimeout(`ping_${nonce}`);
                this._pingTimes.delete(nonce);
                this._channel.close(CloseType.PING_TIMEOUT, 'ping timeout');
            }, NetworkAgent.PING_TIMEOUT);
        }
    }

    /**
     * @param {PingMessage} msg
     * @private
     */
    _onPing(msg) {
        // Make sure this is a valid message in our current state.
        if (!this._canAcceptMessage(msg)) {
            return;
        }

        // Respond with a pong message
        this._channel.pong(msg.nonce);
    }

    /**
     * @param {PongMessage} msg
     * @fires NetworkAgent#ping-pong
     * @private
     */
    _onPong(msg) {
        // Clear the ping timeout for this nonce.
        this._timers.clearTimeout(`ping_${msg.nonce}`);

        /** @type {number} */
        const startTime = this._pingTimes.get(msg.nonce);
        if (startTime) {
            const delta = Date.now() - startTime;
            if (delta > 0) {
                this.fire('ping-pong', delta);
            }
            this._pingTimes.delete(msg.nonce);
        }
    }

    /**
     * @private
     */
    _onClose() {
        // Clear all timers and intervals when the peer disconnects.
        this._timers.clearAll();
    }

    /**
     * @param {Message} msg
     * @return {boolean}
     * @private
     */
    _canAcceptMessage(msg) {
        // The first message must be the version message.
        if (!this._versionReceived && msg.type !== Message.Type.VERSION) {
            Log.w(NetworkAgent, `Discarding '${PeerChannel.Event[msg.type] || msg.type}' message from ${this._channel.peerAddress || this._channel.netAddress}`
                + ' - no version message received previously');
            return false;
        }
        if (this._versionReceived && !this._verackReceived && msg.type !== Message.Type.VERACK) {
            Log.w(NetworkAgent, `Discarding '${PeerChannel.Event[msg.type] || msg.type}' message from ${this._channel.peerAddress || this._channel.netAddress}`
                + ' - no verack message received previously');
            return false;
        }
        return true;
    }

    /** @type {PeerChannel} */
    get channel() {
        return this._channel;
    }

    /** @type {Peer} */
    get peer() {
        return this._peer;
    }
}
NetworkAgent.HANDSHAKE_TIMEOUT = 1000 * 4; // 4 seconds
NetworkAgent.PING_TIMEOUT = 1000 * 10; // 10 seconds
NetworkAgent.CONNECTIVITY_CHECK_INTERVAL = 1000 * 60; // 1 minute
NetworkAgent.ANNOUNCE_ADDR_INTERVAL = 1000 * 60 * 10; // 10 minutes
NetworkAgent.VERSION_ATTEMPTS_MAX = 10;
NetworkAgent.VERSION_RETRY_DELAY = 500; // 500 ms
NetworkAgent.GETADDR_RATE_LIMIT = 3; // per minute
NetworkAgent.MAX_ADDR_PER_MESSAGE = 1000;
NetworkAgent.MAX_ADDR_PER_REQUEST = 500;
NetworkAgent.NUM_ADDR_PER_REQUEST = 200;
Class.register(NetworkAgent);