Home Reference Source Test

src/main/generic/consensus/full/FullConsensusAgent.js

class FullConsensusAgent extends BaseConsensusAgent {
    /**
     * @param {FullChain} blockchain
     * @param {Mempool} mempool
     * @param {Time} time
     * @param {Peer} peer
     * @param {InvRequestManager} invRequestManager
     * @param {Subscription} targetSubscription
     */
    constructor(blockchain, mempool, time, peer, invRequestManager, targetSubscription) {
        super(time, peer, invRequestManager, targetSubscription);
        /** @type {FullChain} */
        this._blockchain = blockchain;
        /** @type {Mempool} */
        this._mempool = mempool;

        // Flag indicating that we are currently syncing our blockchain with the peer's.
        /** @type {boolean} */
        this._syncing = false;

        // The number of blocks that extended our blockchain since the last requestBlocks().
        /** @type {number} */
        this._numBlocksExtending = -1;
        // The number of blocks that forked our blockchain since the last requestBlocks().
        /** @type {number} */
        this._numBlocksForking = -1;
        // The last fork block the peer has sent us.
        /** @type {Block} */
        this._forkHead = null;

        // The number of failed blockchain sync attempts.
        /** @type {number} */
        this._failedSyncs = 0;

        // The block hash that we want to learn to consider the sync complete.
        /** @type {Hash} */
        this._syncTarget = peer.headHash;

        /** @type {RateLimit} */
        this._chainProofLimit = new RateLimit(FullConsensusAgent.CHAIN_PROOF_RATE_LIMIT);
        /** @type {RateLimit} */
        this._accountsProofLimit = new RateLimit(FullConsensusAgent.ACCOUNTS_PROOF_RATE_LIMIT);
        /** @type {RateLimit} */
        this._accountsTreeChunkLimit = new RateLimit(FullConsensusAgent.ACCOUNTS_TREE_CHUNK_RATE_LIMIT);
        /** @type {RateLimit} */
        this._transactionsProofLimit = new RateLimit(FullConsensusAgent.TRANSACTION_PROOF_RATE_LIMIT);
        /** @type {RateLimit} */
        this._transactionReceiptsLimit = new RateLimit(FullConsensusAgent.TRANSACTION_RECEIPTS_RATE_LIMIT);
        /** @type {RateLimit} */
        this._blockProofLimit = new RateLimit(FullConsensusAgent.BLOCK_PROOF_RATE_LIMIT);
        /** @type {RateLimit} */
        this._getBlocksLimit = new RateLimit(FullConsensusAgent.GET_BLOCKS_RATE_LIMIT);

        // Listen to consensus messages from the peer.
        peer.channel.on('get-blocks', msg => this._onGetBlocks(msg));
        peer.channel.on('get-chain-proof', msg => this._onGetChainProof(msg));
        peer.channel.on('get-accounts-proof', msg => this._onGetAccountsProof(msg));
        peer.channel.on('get-accounts-tree-chunk', msg => this._onGetAccountsTreeChunk(msg));
        peer.channel.on('get-transactions-proof', msg => this._onGetTransactionsProof(msg));
        peer.channel.on('get-transaction-receipts', msg => this._onGetTransactions(msg));
        peer.channel.on('get-block-proof', msg => this._onGetBlockProof(msg));
        peer.channel.on('mempool', msg => this._onMempool(msg));
    }

    async syncBlockchain() {
        this._syncing = true;

        // We only sync with other full nodes.
        if (!Services.isFullNode(this._peer.peerAddress.services)) {
            this._syncFinished();
            return;
        }

        // Wait for all objects to arrive.
        if (!this._objectsInFlight.isEmpty()) {
            Log.v(FullConsensusAgent, `Waiting for ${this._objectsInFlight.length} objects to arrive ...`);
            return;
        }

        // Wait for all objects to be processed.
        if (!this._objectsProcessing.isEmpty()) {
            Log.v(FullConsensusAgent, `Waiting for ${this._objectsProcessing.length} objects to be processed ...`);
            return;
        }

        // If we know our sync target block, the sync process is finished.
        const head = await this._blockchain.getBlock(this._syncTarget, /*includeForks*/ true);
        if (head) {
            this._syncFinished();
            return;
        }

        // If the peer didn't send us any blocks that extended our chain, count it as a failed sync attempt.
        // This sets a maximum length for forks that the full client will accept:
        //   FullConsensusAgent.SYNC_ATTEMPTS_MAX * BaseInvectoryMessage.VECTORS_MAX_COUNT
        if (this._numBlocksExtending === 0 && ++this._failedSyncs >= FullConsensusAgent.SYNC_ATTEMPTS_MAX) {
            this._peer.channel.close(CloseType.BLOCKCHAIN_SYNC_FAILED, 'blockchain sync failed');
            return;
        }

        // We don't know the peer's head block, request blocks from it.
        this._requestBlocks().catch(Log.w.tag(FullConsensusAgent));
    }

    _syncFinished() {
        // Subscribe to all announcements from the peer.
        this._subscribeTarget();

        // Request the peer's mempool.
        // XXX Use a random delay here to prevent requests to multiple peers at once.
        const delay = FullConsensusAgent.MEMPOOL_DELAY_MIN
            + Math.random() * (FullConsensusAgent.MEMPOOL_DELAY_MAX - FullConsensusAgent.MEMPOOL_DELAY_MIN);
        setTimeout(() => this._peer.channel.mempool(), delay);

        this._syncing = false;
        this._synced = true;

        this._numBlocksExtending = 0;
        this._numBlocksForking = 0;
        this._forkHead = null;
        this._failedSyncs = 0;

        this.fire('sync');
    }

    async _requestBlocks(maxInvSize) {
        // Only one getBlocks request at a time.
        if (this._peer.channel.isExpectingMessage(Message.Type.INV)) {
            return;
        }

        // Drop the peer if it doesn't start sending InvVectors for its chain within the timeout.
        // Set timeout early to prevent re-entering the method.
        this._peer.channel.expectMessage(Message.Type.INV, () => {
            this._peer.channel.close(CloseType.GET_BLOCKS_TIMEOUT, 'getBlocks timeout');
        }, BaseConsensusAgent.REQUEST_TIMEOUT);

        // Check if the peer is sending us a fork.
        const onFork = this._forkHead && this._numBlocksExtending === 0 && this._numBlocksForking > 0;

        /** @type {Array.<Hash>} */
        let locators;
        if (onFork) {
            // Only send the fork head as locator if the peer is sending us a fork.
            locators = [this._forkHead.hash()];
        } else {
            locators = await this._blockchain.getBlockLocators();
        }

        // Reset block counters.
        this._numBlocksExtending = 0;
        this._numBlocksForking = 0;

        // Request blocks from peer.
        this._peer.channel.getBlocks(locators, maxInvSize);
    }

    /**
     * @param {InvMessage} msg
     * @returns {Promise}
     * @protected
     * @override
     */
    _onInv(msg) {
        return super._onInv(msg);
    }

    /**
     * @param {InvVector} vector
     * @returns {boolean}
     * @protected
     * @override
     */
    _shouldRequestData(vector) {
        // Ignore block announcements from nano clients as they will ignore our getData requests anyways (they only know headers).
        return !(Services.isNanoNode(this._peer.peerAddress.services) && vector.type === InvVector.Type.BLOCK);
    }

    /**
     * @param {Hash} hash
     * @param {boolean} [includeForks]
     * @param {boolean} [includeBody]
     * @returns {Promise.<?Block>}
     * @protected
     * @override
     */
    _getBlock(hash, includeForks = false, includeBody = false) {
        return this._blockchain.getBlock(hash, includeForks, includeBody);
    }

    /**
     * @param {Hash} hash
     * @param {boolean} [includeForks]
     * @returns {Promise.<?Uint8Array>}
     * @protected
     * @override
     */
    _getRawBlock(hash, includeForks = false) {
        return this._blockchain.getRawBlock(hash, includeForks);
    }

    /**
     * @param {Hash} hash
     * @returns {Promise.<?Transaction>}
     * @protected
     * @override
     */
    _getTransaction(hash) {
        return Promise.resolve(this._mempool.getTransaction(hash));
    }

    /**
     * @param {Hash} hash
     * @param {Block} block
     * @returns {void}
     * @protected
     * @override
     */
    async _onKnownBlockAnnounced(hash, block) {
        if (!this._syncing) return;

        this._numBlocksForking++;
        this._forkHead = block;
    }

    /**
     * @returns {void}
     * @protected
     * @override
     */
    _onNoUnknownObjects() {
        // The peer does not have any new inv vectors for us.
        if (this._syncing) {
            this.syncBlockchain().catch(Log.w.tag(FullConsensusAgent));
        }
    }

    /**
     * @protected
     * @override
     */
    _onAllObjectsReceived() {
        // If all objects have been received, request more if we're syncing the blockchain.
        if (this._syncing) {
            this.syncBlockchain().catch(Log.w.tag(FullConsensusAgent));
        }
    }

    /**
     * @param {HeaderMessage} msg
     * @return {Promise.<void>}
     * @protected
     * @override
     */
    _onHeader(msg) {
        // Ignore header messages.
        Log.w(FullConsensusAgent, `Unsolicited header message received from ${this._peer.peerAddress}, discarding`);
    }

    /**
     * @param {Hash} hash
     * @param {Block} block
     * @returns {Promise.<void>}
     * @protected
     * @override
     */
    async _processBlock(hash, block) {
        // TODO send reject message if we don't like the block
        const status = await this._blockchain.pushBlock(block);
        switch (status) {
            case FullChain.ERR_INVALID:
                this._peer.channel.close(CloseType.RECEIVED_INVALID_BLOCK, 'received invalid block');
                break;

            case FullChain.OK_EXTENDED:
            case FullChain.OK_REBRANCHED:
                if (this._syncing) this._numBlocksExtending++;
                break;

            case FullChain.OK_FORKED:
                if (this._syncing) {
                    this._numBlocksForking++;
                    this._forkHead = block;
                }
                break;

            case FullChain.ERR_ORPHAN:
                this._onOrphanBlock(hash, block);
                break;

            case FullChain.OK_KNOWN:
                Log.v(FullConsensusAgent, `Received known block ${hash} (height=${block.height}, prevHash=${block.prevHash}) from ${this._peer.peerAddress}`);
                break;
        }
    }

    /**
     * @param {Hash} hash
     * @param {Block} block
     * @protected
     */
    _onOrphanBlock(hash, block) {
        // Ignore orphan blocks if we're not synced yet. This shouldn't happen.
        if (!this._synced) {
            Log.w(FullConsensusAgent, `Received orphan block ${hash} (height=${block.height}, prevHash=${block.prevHash}) while syncing`);
            return;
        }

        // The peer has announced an orphaned block after the initial sync. We're probably out of sync.
        Log.d(FullConsensusAgent, `Received orphan block ${hash} (height=${block.height}, prevHash=${block.prevHash}) from ${this._peer.peerAddress}`);

        // Disable announcements from the peer once.
        if (!this._timers.timeoutExists('outOfSync')) {
            this._subscribe(Subscription.NONE);
        }

        // Set the orphaned block as the new sync target.
        this._syncTarget = hash;

        // Wait a short time for:
        // - our (un-)subscribe message to be sent
        // - potentially more orphaned blocks to arrive
        this._timers.resetTimeout('outOfSync', () => this._outOfSync(), FullConsensusAgent.RESYNC_THROTTLE);
    }

    /**
     * @private
     */
    _outOfSync() {
        this._timers.clearTimeout('outOfSync');

        this._synced = false;

        this.fire('out-of-sync');
    }

    /**
     * @param {Hash} hash
     * @param {Transaction} transaction
     * @returns {Promise.<boolean>}
     * @protected
     * @override
     */
    async _processTransaction(hash, transaction) {
        const result = await this._mempool.pushTransaction(transaction);
        switch (result) {
            case Mempool.ReturnCode.ACCEPTED:
                return true;
            case Mempool.ReturnCode.KNOWN:
                return false;
            case Mempool.ReturnCode.FEE_TOO_LOW:
                this.peer.channel.reject(Message.Type.TX, RejectMessage.Code.REJECT_INSUFFICIENT_FEE,
                    'Sender has too many free transactions', transaction.hash().serialize());
                return false;
            case Mempool.ReturnCode.INVALID:
                this.peer.channel.reject(Message.Type.TX, RejectMessage.Code.REJECT_INVALID, 'Invalid transaction',
                    transaction.hash().serialize());
                return false;
            default:
                return false;
        }
    }

    /**
     * @protected
     * @override
     */
    _onAllObjectsProcessed() {
        // If all objects have been processed, request more if we're syncing the blockchain.
        if (this._syncing) {
            this.syncBlockchain().catch(Log.w.tag(FullConsensusAgent));
        }
    }


    /* Request endpoints */

    /**
     * @param {GetBlocksMessage} msg
     * @return {Promise}
     * @private
     */
    async _onGetBlocks(msg) {
        if (!this._getBlocksLimit.note()) {
            Log.w(FullConsensusAgent, 'Rejecting GetBlocks message - rate-limit exceeded');
            return;
        }
        Log.v(FullConsensusAgent, `[GETBLOCKS] ${msg.locators.length} block locators maxInvSize ${msg.maxInvSize} received from ${this._peer.peerAddress}`);

        // A peer has requested blocks. Check all requested block locator hashes
        // in the given order and pick the first hash that is found on our main
        // chain, ignore the rest. If none of the requested hashes is found,
        // pick the genesis block hash. Send the main chain starting from the
        // picked hash back to the peer.
        let startBlock = GenesisConfig.GENESIS_BLOCK;
        for (const locator of msg.locators) {
            const block = await this._blockchain.getBlock(locator);
            if (block) {
                // We found a block, ignore remaining block locator hashes.
                startBlock = block;
                break;
            }
        }

        // Collect up to GETBLOCKS_VECTORS_MAX inventory vectors for the blocks starting right
        // after the identified block on the main chain.
        const blocks = await this._blockchain.getBlocks(startBlock.hash(),
            Math.min(msg.maxInvSize, FullConsensusAgent.GETBLOCKS_VECTORS_MAX),
            msg.direction === GetBlocksMessage.Direction.FORWARD);
        const vectors = [];
        for (const block of blocks) {
            vectors.push(InvVector.fromBlock(block));
        }

        // Send the vectors back to the requesting peer.
        this._peer.channel.inv(vectors);
    }

    /**
     * @param {GetChainProofMessage} msg
     * @private
     */
    async _onGetChainProof(msg) {
        if (!this._chainProofLimit.note()) {
            Log.w(FullConsensusAgent, 'Rejecting GetChainProof message - rate-limit exceeded');
            this._peer.channel.close(CloseType.RATE_LIMIT_EXCEEDED, 'rate-limit exceeded');
            return;
        }
        const proof = await this._blockchain.getChainProof();
        this._peer.channel.chainProof(proof);
    }

    /**
     * @param {GetBlockProofMessage} msg
     * @private
     */
    async _onGetBlockProof(msg) {
        if (!this._blockProofLimit.note()) {
            Log.w(FullConsensusAgent, 'Rejecting GetBlockProof message - rate-limit exceeded');
            this._peer.channel.blockProof(null);
            return;
        }
        const blockToProve = await this._blockchain.getBlock(msg.blockHashToProve);
        const knownBlock = await this._blockchain.getBlock(msg.knownBlockHash);
        if (!blockToProve || !knownBlock) {
            this._peer.channel.blockProof();
            return;
        }

        const proof = await this._blockchain.getBlockProof(blockToProve, knownBlock);
        this._peer.channel.blockProof(proof);
    }

    /**
     * @param {GetAccountsProofMessage} msg
     * @private
     */
    async _onGetAccountsProof(msg) {
        if (!this._accountsProofLimit.note()) {
            Log.w(FullConsensusAgent, 'Rejecting GetAccountsProof message - rate-limit exceeded');
            this._peer.channel.accountsProof(msg.blockHash, null);
            return;
        }
        const proof = await this._blockchain.getAccountsProof(msg.blockHash, msg.addresses);
        this._peer.channel.accountsProof(msg.blockHash, proof);
    }

    /**
     * @param {GetTransactionsProofMessage} msg
     * @private
     */
    async _onGetTransactionsProof(msg) {
        if (!this._transactionsProofLimit.note()) {
            Log.w(FullConsensusAgent, 'Rejecting GetTransactionsProof message - rate-limit exceeded');
            this._peer.channel.transactionsProof(msg.blockHash, null);
            return;
        }
        const proof = await this._blockchain.getTransactionsProof(msg.blockHash, msg.addresses);
        this._peer.channel.transactionsProof(msg.blockHash, proof);
    }

    /**
     * @param {GetAccountsTreeChunkMessage} msg
     * @private
     */
    async _onGetAccountsTreeChunk(msg) {
        if (!this._accountsTreeChunkLimit.note()) {
            Log.w(FullConsensusAgent, 'Rejecting GetAccountsTreeChunk message - rate-limit exceeded');
            this._peer.channel.accountsTreeChunk(msg.blockHash, null);
            return;
        }
        const chunk = await this._blockchain.getAccountsTreeChunk(msg.blockHash, msg.startPrefix);
        this._peer.channel.accountsTreeChunk(msg.blockHash, chunk);
    }

    /**
     * @param {GetTransactionReceiptsMessage} msg
     * @private
     */
    async _onGetTransactions(msg) {
        if (!this._transactionReceiptsLimit.note()) {
            Log.w(FullConsensusAgent, 'Rejecting GetTransactionReceipts message - rate-limit exceeded');
            this._peer.channel.transactionReceipts(null);
            return;
        }

        const receipts = await this._blockchain.getTransactionReceiptsByAddress(msg.address, TransactionReceiptsMessage.RECEIPTS_MAX_COUNT);
        this._peer.channel.transactionReceipts(receipts);
    }

    /**
     * @param {MempoolMessage} msg
     * @return {Promise}
     * @private
     */
    async _onMempool(msg) {
        // Query mempool for transactions
        let transactions = [];
        switch (this._remoteSubscription.type) {
            case Subscription.Type.ADDRESSES:
                transactions = this._mempool.getTransactionsByAddresses(this._remoteSubscription.addresses, FullConsensusAgent.MEMPOOL_ENTRIES_MAX);
                break;
            case Subscription.Type.MIN_FEE:
                transactions = new LimitIterable(this._mempool.transactionGenerator(/*maxSize*/ undefined, this._remoteSubscription.minFeePerByte), FullConsensusAgent.MEMPOOL_ENTRIES_MAX);
                break;
            case Subscription.Type.ANY:
                transactions = new LimitIterable(this._mempool.transactionGenerator(), FullConsensusAgent.MEMPOOL_ENTRIES_MAX);
                break;
        }

        // Send an InvVector for each transaction in the mempool.
        // Split into multiple Inv messages if the mempool is large.
        let vectors = [];
        for (const tx of transactions) {
            vectors.push(InvVector.fromTransaction(tx));

            if (vectors.length >= BaseInventoryMessage.VECTORS_MAX_COUNT) {
                this._peer.channel.inv(vectors);
                vectors = [];
                await new Promise((resolve) => setTimeout(resolve, FullConsensusAgent.MEMPOOL_THROTTLE));
            }
        }

        if (vectors.length > 0) {
            this._peer.channel.inv(vectors);
        }
    }

    /** @type {boolean} */
    get syncing() {
        return this._syncing;
    }
}
/**
 * Maximum number of blockchain sync retries before closing the connection.
 * XXX If the peer is on a long fork, it will count as a failed sync attempt
 * if our blockchain doesn't switch to the fork within 500 (max InvVectors returned by getBlocks)
 * blocks.
 * @type {number}
 */
FullConsensusAgent.SYNC_ATTEMPTS_MAX = 25;
/**
 * Maximum number of inventory vectors to sent in the response for onGetBlocks.
 * @type {number}
 */
FullConsensusAgent.GETBLOCKS_VECTORS_MAX = 500;
/**
 * Time {ms} to wait before triggering a blockchain re-sync with the peer.
 * @type {number}
 */
FullConsensusAgent.RESYNC_THROTTLE = 1000 * 3; // 3 seconds
/**
 * Minimum time {ms} to wait before triggering the initial mempool request.
 * @type {number}
 */
FullConsensusAgent.MEMPOOL_DELAY_MIN = 1000 * 2; // 2 seconds
/**
 * Maximum time {ms} to wait before triggering the initial mempool request.
 * @type {number}
 */
FullConsensusAgent.MEMPOOL_DELAY_MAX = 1000 * 20; // 20 seconds
/**
 * Time {ms} to wait between sending full inv vectors of transactions during Mempool request
 * @type {number}
 */
FullConsensusAgent.MEMPOOL_THROTTLE = 1000;
/**
 * Number of transaction vectors to send
 * @type {number}
 */
FullConsensusAgent.MEMPOOL_ENTRIES_MAX = 10000;
FullConsensusAgent.CHAIN_PROOF_RATE_LIMIT = 3; // per minute
FullConsensusAgent.ACCOUNTS_PROOF_RATE_LIMIT = 60; // per minute
FullConsensusAgent.ACCOUNTS_TREE_CHUNK_RATE_LIMIT = 120; // per minute
FullConsensusAgent.TRANSACTION_PROOF_RATE_LIMIT = 60; // per minute
FullConsensusAgent.TRANSACTION_RECEIPTS_RATE_LIMIT = 30; // per minute
FullConsensusAgent.BLOCK_PROOF_RATE_LIMIT = 60; // per minute
FullConsensusAgent.GET_BLOCKS_RATE_LIMIT = 30; // per minute
Class.register(FullConsensusAgent);