Home Reference Source Test

src/main/generic/network/connection/PeerChannel.js

class PeerChannel extends Observable {
    /**
     * @listens NetworkConnection#message
     * @param {NetworkConnection} connection
     */
    constructor(connection) {
        super();
        this._conn = connection;
        this._conn.on('message', msg => this._onMessage(msg));

        // Forward specified events on the connection to listeners of this Observable.
        this.bubble(this._conn, 'close', 'error');
    }

    /**
     * @param {Uint8Array} rawMsg
     * @private
     */
    async _onMessage(rawMsg) {
        const start = Date.now();
        let msg = null, type = null;

        try {
            const buf = new SerialBuffer(rawMsg);
            type = MessageFactory.peekType(buf);
            msg = MessageFactory.parse(buf);
        } catch(e) {
            Log.d(PeerChannel, () => `Failed to parse '${PeerChannel.Event[type]}' message from ${this.peerAddress || this.netAddress}`, e.message || e);

            // Confirm that message arrived but could not be parsed successfully.
            this._conn.confirmExpectedMessage(type, false);

            // From the Bitcoin Reference:
            //  "Be careful of reject message feedback loops where two peers
            //   each don’t understand each other’s reject messages and so keep
            //   sending them back and forth forever."

            // If the message does not make sense at a whole or we fear to get into a reject loop,
            // we ban the peer instead.
            if (type === null || type === Message.Type.REJECT) {
                this.close(CloseType.FAILED_TO_PARSE_MESSAGE_TYPE, 'Failed to parse message type');
                return;
            }

            // Otherwise inform other node and ignore message.
            this.reject(type, RejectMessage.Code.REJECT_MALFORMED, e.message || e);
            return;
        }

        if (!msg) return;

        // Confirm that message was successfully parsed.
        this._conn.confirmExpectedMessage(type, true);

        try {
            await this.fire(PeerChannel.Event[msg.type], msg, this);
            this.fire('message-log', msg, this, Date.now() - start, rawMsg.byteLength);
        } catch (e) {
            Log.w(PeerChannel, `Error while processing '${PeerChannel.Event[msg.type]}' message from ${this.peerAddress || this.netAddress}: ${e}`);
        }
    }

    /**
     * @param {Message.Type|Array.<Message.Type>} types
     * @param {function()} timeoutCallback
     * @param {number} [msgTimeout]
     * @param {number} [chunkTimeout]
     */
    expectMessage(types, timeoutCallback, msgTimeout, chunkTimeout) {
        this._conn.expectMessage(types, timeoutCallback, msgTimeout, chunkTimeout);
    }

    /**
     * @param {Message.Type} type
     * @returns {boolean}
     */
    isExpectingMessage(type) {
        return this._conn.isExpectingMessage(type);
    }

    /**
     * @param {Message} msg
     * @return {boolean}
     * @private
     */
    _send(msg) {
        return this._conn.send(msg.serialize());
    }

    /**
     * @param {number} [type]
     * @param {string} [reason]
     */
    close(type, reason) {
        this._conn.close(type, reason);
        this._offAll();
    }

    /**
     * @param {PeerAddress} peerAddress
     * @param {Hash} headHash
     * @param {Uint8Array} challengeNonce
     * @param {string} [appAgent]
     * @return {boolean}
     */
    version(peerAddress, headHash, challengeNonce, appAgent) {
        return this._send(new VersionMessage(Version.CODE, peerAddress, GenesisConfig.GENESIS_HASH, headHash, challengeNonce, Version.createUserAgent(appAgent)));
    }

    /**
     * @param {PublicKey} publicKey
     * @param {Signature} signature
     * @returns {boolean}
     */
    verack(publicKey, signature) {
        return this._send(new VerAckMessage(publicKey, signature));
    }

    /**
     * @param {Array.<InvVector>} vectors
     * @return {boolean}
     */
    inv(vectors) {
        return this._send(new InvMessage(vectors));
    }

    /**
     * @param {Array.<InvVector>} vectors
     * @return {boolean}
     */
    notFound(vectors) {
        return this._send(new NotFoundMessage(vectors));
    }

    /**
     * @param {Array.<InvVector>} vectors
     * @return {boolean}
     */
    getData(vectors) {
        return this._send(new GetDataMessage(vectors));
    }

    /**
     * @param {Array.<InvVector>} vectors
     * @return {boolean}
     */
    getHeader(vectors) {
        return this._send(new GetHeaderMessage(vectors));
    }

    /**
     * @param {Block} block
     * @return {boolean}
     */
    block(block) {
        return this._send(new BlockMessage(block));
    }

    /**
     * @param {Uint8Array} block
     * @return {boolean}
     */
    rawBlock(block) {
        return this._send(new RawBlockMessage(block));
    }

    /**
     * @param {BlockHeader} header
     * @return {boolean}
     */
    header(header) {
        return this._send(new HeaderMessage(header));
    }

    /**
     * @param {Transaction} transaction
     * @param {?AccountsProof} [accountsProof]
     * @return {boolean}
     */
    tx(transaction, accountsProof) {
        return this._send(new TxMessage(transaction, accountsProof));
    }

    /**
     * @param {Array.<Hash>} locators
     * @param {number} maxInvSize
     * @param {boolean} [ascending]
     * @return {boolean}
     */
    getBlocks(locators, maxInvSize=BaseInventoryMessage.VECTORS_MAX_COUNT, ascending=true) {
        return this._send(new GetBlocksMessage(locators, maxInvSize, ascending ? GetBlocksMessage.Direction.FORWARD : GetBlocksMessage.Direction.BACKWARD));
    }

    /**
     * @return {boolean}
     */
    mempool() {
        return this._send(new MempoolMessage());
    }

    /**
     * @param {Message.Type} messageType
     * @param {RejectMessage.Code} code
     * @param {string} reason
     * @param {Uint8Array} [extraData]
     * @return {boolean}
     */
    reject(messageType, code, reason, extraData) {
        return this._send(new RejectMessage(messageType, code, reason, extraData));
    }

    /**
     * @param {Subscription} subscription
     * @returns {boolean}
     */
    subscribe(subscription) {
        return this._send(new SubscribeMessage(subscription));
    }

    /**
     * @param {Array.<PeerAddress>} addresses
     * @return {boolean}
     */
    addr(addresses) {
        return this._send(new AddrMessage(addresses));
    }

    /**
     * @param {number} protocolMask
     * @param {number} serviceMask
     * @param {number} maxResults
     * @return {boolean}
     */
    getAddr(protocolMask, serviceMask, maxResults) {
        return this._send(new GetAddrMessage(protocolMask, serviceMask, maxResults));
    }

    /**
     * @param {number} nonce
     * @return {boolean}
     */
    ping(nonce) {
        return this._send(new PingMessage(nonce));
    }

    /**
     * @param {number} nonce
     * @return {boolean}
     */
    pong(nonce) {
        return this._send(new PongMessage(nonce));
    }

    /**
     * @param {PeerId} senderId
     * @param {PeerId} recipientId
     * @param {number} nonce
     * @param {number} ttl
     * @param {SignalMessage.Flag|number} flags
     * @param {Uint8Array} [payload]
     * @param {PublicKey} [senderPubKey]
     * @param {Signature} [signature]
     * @return {boolean}
     */
    signal(senderId, recipientId, nonce, ttl, flags, payload, senderPubKey, signature) {
        return this._send(new SignalMessage(senderId, recipientId, nonce, ttl, flags, payload, senderPubKey, signature));
    }

    /**
     * @param {Hash} blockHash
     * @param {Array.<Address>} addresses
     * @return {boolean}
     */
    getAccountsProof(blockHash, addresses) {
        return this._send(new GetAccountsProofMessage(blockHash, addresses));
    }

    /**
     * @param {Hash} blockHash
     * @param {AccountsProof} [proof]
     * @return {boolean}
     */
    accountsProof(blockHash, proof) {
        return this._send(new AccountsProofMessage(blockHash, proof));
    }

    /**
     * @return {boolean}
     */
    getChainProof() {
        return this._send(new GetChainProofMessage());
    }

    /**
     * @param {ChainProof} proof
     * @return {boolean}
     */
    chainProof(proof) {
        return this._send(new ChainProofMessage(proof));
    }

    /**
     * @param {Hash} blockHash
     * @param {string} startPrefix
     * @return {boolean}
     */
    getAccountsTreeChunk(blockHash, startPrefix) {
        return this._send(new GetAccountsTreeChunkMessage(blockHash, startPrefix));
    }

    /**
     * @param {Hash} blockHash
     * @param {AccountsTreeChunk} [chunk]
     * @return {boolean}
     */
    accountsTreeChunk(blockHash, chunk) {
        return this._send(new AccountsTreeChunkMessage(blockHash, chunk));
    }

    /**
     * @param {Hash} blockHash
     * @param {Array.<Address>} addresses
     * @return {boolean}
     * @deprecated
     */
    getTransactionsProof(blockHash, addresses) {
        return this.getTransactionsProofByAddresses(blockHash, addresses);
    }

    /**
     * @param {Hash} blockHash
     * @param {Array.<Address>} addresses
     * @return {boolean}
     */
    getTransactionsProofByAddresses(blockHash, addresses) {
        return this._send(new GetTransactionsProofByAddressesMessage(blockHash, addresses));
    }

    /**
     * @param {Hash} blockHash
     * @param {Array.<Hash>} hashes
     * @return {boolean}
     */
    getTransactionsProofByHashes(blockHash, hashes) {
        return this._send(new GetTransactionsProofByHashesMessage(blockHash, hashes));
    }

    /**
     * @param {Hash} blockHash
     * @param {TransactionsProof} [proof]
     * @return {boolean}
     */
    transactionsProof(blockHash, proof) {
        return this._send(new TransactionsProofMessage(blockHash, proof));
    }


    /**
     * @param {Address} address
     * @returns {boolean}
     * @deprecated
     */
    getTransactionReceipts(address) {
        return this.getTransactionReceiptsByAddress(address);
    }

    /**
     * @param {Address} address
     * @returns {boolean}
     */
    getTransactionReceiptsByAddress(address) {
        return this._send(new GetTransactionReceiptsByAddressMessage(address));
    }

    /**
     * @param {Array.<Hash>} hashes
     * @returns {boolean}
     */
    getTransactionReceiptsByHashes(hashes) {
        return this._send(new GetTransactionReceiptsByHashesMessage(hashes));
    }

    /**
     * @param {?Array.<TransactionReceipt>} transactionReceipts
     * @returns {boolean}
     */
    transactionReceipts(transactionReceipts) {
        return this._send(new TransactionReceiptsMessage(transactionReceipts));
    }

    /**
     * @param {Hash} blockHashToProve
     * @param {Hash} knownBlockHash
     * @returns {boolean}
     */
    getBlockProof(blockHashToProve, knownBlockHash) {
        return this._send(new GetBlockProofMessage(blockHashToProve, knownBlockHash));
    }

    /**
     * @param {number} blockHeightToProve
     * @param {Hash} knownBlockHash
     * @returns {boolean}
     */
    getBlockProofAt(blockHeightToProve, knownBlockHash) {
        return this._send(new GetBlockProofAtMessage(blockHeightToProve, knownBlockHash));
    }

    /**
     * @param {BlockChain} [proof]
     * @returns {boolean}
     */
    blockProof(proof) {
        return this._send(new BlockProofMessage(proof));
    }

    /**
     * @returns {boolean}
     */
    getHead() {
        return this._send(new GetHeadMessage());
    }

    /**
     * @param {BlockHeader} header
     * @returns {boolean}
     */
    head(header) {
        return this._send(new HeadMessage(header));
    }

    /**
     * @param {PeerChannel} o
     * @return {boolean}
     */
    equals(o) {
        return o instanceof PeerChannel
            && this._conn.equals(o.connection);
    }

    /**
     * @returns {string}
     */
    hashCode() {
        return this._conn.hashCode();
    }

    /**
     * @return {string}
     */
    toString() {
        return `PeerChannel{conn=${this._conn}}`;
    }

    /** @type {NetworkConnection} */
    get connection() {
        return this._conn;
    }

    /** @type {number} */
    get id() {
        return this._conn.id;
    }

    /** @type {number} */
    get protocol() {
        return this._conn.protocol;
    }

    /** @type {PeerAddress} */
    get peerAddress() {
        return this._conn.peerAddress;
    }

    /** @type {PeerAddress} */
    set peerAddress(value) {
        this._conn.peerAddress = value;
    }

    /** @type {NetAddress} */
    get netAddress() {
        return this._conn.netAddress;
    }

    /** @type {NetAddress} */
    set netAddress(value) {
        this._conn.netAddress = value;
    }

    /** @type {boolean} */
    get closed() {
        return this._conn.closed;
    }

    /** @type {number} */
    get lastMessageReceivedAt() {
        return this._conn.lastMessageReceivedAt;
    }
}
Class.register(PeerChannel);

PeerChannel.Event = {};
PeerChannel.Event[Message.Type.VERSION] = 'version';
PeerChannel.Event[Message.Type.INV] = 'inv';
PeerChannel.Event[Message.Type.GET_DATA] = 'get-data';
PeerChannel.Event[Message.Type.GET_HEADER] = 'get-header';
PeerChannel.Event[Message.Type.NOT_FOUND] = 'not-found';
PeerChannel.Event[Message.Type.GET_BLOCKS] = 'get-blocks';
PeerChannel.Event[Message.Type.BLOCK] = 'block';
PeerChannel.Event[Message.Type.HEADER] = 'header';
PeerChannel.Event[Message.Type.TX] = 'tx';
PeerChannel.Event[Message.Type.MEMPOOL] = 'mempool';
PeerChannel.Event[Message.Type.REJECT] = 'reject';
PeerChannel.Event[Message.Type.SUBSCRIBE] = 'subscribe';
PeerChannel.Event[Message.Type.ADDR] = 'addr';
PeerChannel.Event[Message.Type.GET_ADDR] = 'get-addr';
PeerChannel.Event[Message.Type.PING] = 'ping';
PeerChannel.Event[Message.Type.PONG] = 'pong';
PeerChannel.Event[Message.Type.SIGNAL] = 'signal';
PeerChannel.Event[Message.Type.GET_CHAIN_PROOF] = 'get-chain-proof';
PeerChannel.Event[Message.Type.CHAIN_PROOF] = 'chain-proof';
PeerChannel.Event[Message.Type.GET_ACCOUNTS_PROOF] = 'get-accounts-proof';
PeerChannel.Event[Message.Type.ACCOUNTS_PROOF] = 'accounts-proof';
PeerChannel.Event[Message.Type.GET_ACCOUNTS_TREE_CHUNK] = 'get-accounts-tree-chunk';
PeerChannel.Event[Message.Type.ACCOUNTS_TREE_CHUNK] = 'accounts-tree-chunk';
PeerChannel.Event[Message.Type.GET_TRANSACTIONS_PROOF_BY_ADDRESSES] = 'get-transactions-proof';
PeerChannel.Event[Message.Type.TRANSACTIONS_PROOF] = 'transactions-proof';
PeerChannel.Event[Message.Type.GET_TRANSACTION_RECEIPTS_BY_ADDRESS] = 'get-transaction-receipts';
PeerChannel.Event[Message.Type.TRANSACTION_RECEIPTS] = 'transaction-receipts';
PeerChannel.Event[Message.Type.GET_BLOCK_PROOF] = 'get-block-proof';
PeerChannel.Event[Message.Type.BLOCK_PROOF] = 'block-proof';
PeerChannel.Event[Message.Type.GET_TRANSACTIONS_PROOF_BY_HASHES] = 'get-transactions-proof-by-hashes';
PeerChannel.Event[Message.Type.GET_TRANSACTION_RECEIPTS_BY_HASHES] = 'get-transaction-receipts-by-hashes';
PeerChannel.Event[Message.Type.GET_BLOCK_PROOF_AT] = 'get-block-proof-at';
PeerChannel.Event[Message.Type.GET_HEAD] = 'get-head';
PeerChannel.Event[Message.Type.HEAD] = 'head';
PeerChannel.Event[Message.Type.VERACK] = 'verack';