Home Reference Source Test

src/main/generic/consensus/BaseConsensus.js

/**
 * @abstract
 */
class BaseConsensus extends Observable {
    /**
     * @param {BaseChain} blockchain
     * @param {Observable} mempool
     * @param {Network} network
     */
    constructor(blockchain, mempool, network) {
        super();
        /** @type {BaseChain} */
        this._blockchain = blockchain;
        /** @type {Network} */
        this._network = network;

        /** @type {HashMap.<Peer,BaseConsensusAgent>} */
        this._agents = new HashMap();

        /** @type {Timers} */
        this._timers = new Timers();

        /**
         * @type {boolean}
         * @protected
         */
        this._established = false;

        /** @type {Peer} */
        this._syncPeer = null;

        /** @type {Subscription} */
        this._subscription = Subscription.ANY;

        /** @type {InvRequestManager} */
        this._invRequestManager = new InvRequestManager();

        /** @type {Set.<{obj: Observable, type: string, id: number}>} */
        this._listenersToDisconnect = new Set();

        this._onToDisconnect(network, 'peer-joined', peer => this._onPeerJoined(peer));
        this._onToDisconnect(network, 'peer-left', peer => this._onPeerLeft(peer));

        // Notify peers when our blockchain head changes.
        this._onToDisconnect(blockchain, 'head-changed', head => this._onHeadChanged(head));
        this._onToDisconnect(blockchain, 'rebranched', (revertBlocks, forkBlocks, blockHash) => this._onRebranched(blockHash, revertBlocks, forkBlocks));
        this._onToDisconnect(blockchain, 'extended', (blockHash) => this._onExtended(blockHash));
        this._onToDisconnect(blockchain, 'block', (blockHash) => this.fire('block', blockHash));

        // Relay new (verified) transactions to peers.
        this._onToDisconnect(mempool,'transaction-added', tx => this._onTransactionAdded(tx));
        this._onToDisconnect(mempool,'transaction-removed', tx => this._onTransactionRemoved(tx));
    }

    //
    // Public consensus interface
    //

    /**
     * @returns {Promise.<Hash>}
     */
    async getHeadHash() { // eslint-disable-line require-await
        return this._blockchain.headHash;
    }

    /**
     * @returns {Promise.<number>}
     */
    async getHeadHeight() { // eslint-disable-line require-await
        return this._blockchain.height;
    }

    /**
     * @param {Hash} hash
     * @param {boolean} [includeBody = true]
     * @param {boolean} [includeBodyFromLocal]
     * @param {number} [blockHeight]
     * @returns {Promise.<Block>}
     */
    async getBlock(hash, includeBody = true, includeBodyFromLocal = includeBody, blockHeight) {
        let block = await this._blockchain.getBlock(hash, true, includeBody || includeBodyFromLocal);
        // XXX: Fetches full blocks if no peer supports protocol version 2 and it would be full from local.
        includeBody = includeBody || (includeBodyFromLocal && !this._hasPeersWithVersion(2));
        if (!block || (includeBody && !block.isFull())) {
            block = await this._requestBlock(hash, includeBody, block ? block.height : blockHeight, !!block) || block;
        }
        return block;
    }

    /**
     * @param {number} height
     * @param {boolean} [includeBody = true]
     * @returns {Promise.<Block>}
     */
    async getBlockAt(height, includeBody = true) {
        if (height > this._blockchain.height || height < 1) {
            throw new Error('Invalid height');
        }
        let block = await this._blockchain.getBlockAt(height, includeBody);
        if (!block) {
            block = await this._requestBlockAt(height, includeBody);
        } else if (block && includeBody && !block.isFull()) {
            block = await this._requestBlock(block.hash(), includeBody, height, true) || block;
        }
        return block;
    }

    /**
     * @param {Address} minerAddress
     * @param {Uint8Array} [extraData]
     * @returns {Promise.<Block>}
     */
    async getBlockTemplate(minerAddress, extraData) { // eslint-disable-line require-await, no-unused-vars
        throw new Error('not implemented: getBlockTemplate');
    }

    /**
     * @param {Block} block
     * @returns {Promise.<boolean>}
     */
    async submitBlock(block) { // eslint-disable-line require-await, no-unused-vars
        throw new Error('not implemented: submitBlock');
    }

    /**
     * @param {Array.<Address>} addresses
     * @returns {Promise.<Array.<Account>>}
     * @abstract
     */
    async getAccounts(addresses) { // eslint-disable-line require-await, no-unused-vars
        throw new Error('not implemented: getAccounts');
    }

    /**
     * @param {Array.<Hash>} hashes
     * @returns {Promise.<Array.<Transaction>>}
     */
    getPendingTransactions(hashes) {
        return this._requestPendingTransactions(hashes);
    }

    /**
     * @param {Address} address
     * @returns {Promise.<Array.<Transaction>>}
     */
    async getPendingTransactionsByAddress(address) { // eslint-disable-line require-await, no-unused-vars
        throw new Error('not implemented: getPendingTransactionsByAddress');
    }

    /**
     * @param {Array.<Hash>} hashes
     * @param {Hash} blockHash
     * @param {number} [blockHeight]
     * @param {Block} [block]
     * @returns {Promise.<Array.<Transaction>>}
     */
    async getTransactionsFromBlock(hashes, blockHash, blockHeight, block) {
        if (!block) {
            block = await this.getBlock(blockHash, false, true, blockHeight);
        }
        if (block && block.isFull()) {
            // Just search the block
            return block.transactions.filter(tx => hashes.find(hash => hash.equals(tx.hash())));
        } else {
            return this._requestTransactionsByHashes(hashes, block);
        }
    }

    /**
     * @param {Array.<Address>} addresses
     * @param {Hash} blockHash
     * @param {number} [blockHeight]
     * @returns {Promise.<Array.<Transaction>>}
     */
    async getTransactionsFromBlockByAddresses(addresses, blockHash, blockHeight) {
        let block = await this._blockchain.getBlock(blockHash, false, true);
        if (!block) {
            block = this._requestBlock(blockHash, false, blockHeight);
        }
        if (block && block.isFull()) {
            // Just search the block
            return block.transactions.filter(tx => !!addresses.find(a => a.equals(tx.sender) || a.equals(tx.recipient)));
        } else {
            return this._requestTransactionsByAddresses(addresses, block);
        }
    }

    /**
     * @param {Address} address
     * @returns {Promise.<Array.<TransactionReceipt>>}
     */
    getTransactionReceiptsByAddress(address) {
        return this._requestTransactionReceiptsByAddress(address);
    }

    /**
     * @param {Array.<Hash>} hashes
     * @returns {Promise.<Array.<TransactionReceipt>>}
     */
    getTransactionReceiptsByHashes(hashes) {
        return this._requestTransactionReceiptsByHashes(hashes);
    }

    /**
     * @param {Transaction} tx
     * @returns {Promise.<BaseConsensus.SendTransactionResult>}
     * @abstract
     */
    async sendTransaction(tx) { // eslint-disable-line no-unused-vars, require-await
        throw new Error('not implemented: sendTransaction');
    }

    /**
     * @returns {Array.<Transaction>}
     */
    getMempoolContents() {
        return [];
    }

    //
    //

    /**
     * @param {Observable} obj
     * @param {string} type
     * @param {function} callback
     * @protected
     */
    _onToDisconnect(obj, type, callback) {
        const id = obj.on(type, callback);
        this._listenersToDisconnect.add({obj, type, id});
    }

    /**
     * @protected
     */
    _disconnectListeners() {
        for (const listener of this._listenersToDisconnect) {
            listener.obj.off(listener.type, listener.id);
        }
        this._offAll();
    }

    /**
     * @param {BaseConsensus} consensus
     * @returns {BaseConsensus}
     */
    handoverTo(consensus) {
        this._disconnectListeners();
        for (const agent of this._agents.valueIterator()) {
            const peer = agent.peer;
            agent.shutdown();
            this._onPeerLeft(peer);
            consensus._onPeerJoined(peer);
        }
        return consensus;
    }

    /**
     * @param {number} version
     * @returns {boolean}
     * @private
     */
    _hasPeersWithVersion(version) {
        for (const agent of this._agents.valueIterator()) {
            if (agent.peer.version >= version) {
                return true;
            }
        }
        return false;
    }

    /**
     * @param {Hash} hash
     * @param {boolean} [includeBody = false]
     * @param {?number} [blockHeight]
     * @param {boolean} [proven = false]
     * @returns {Promise.<?Block>}
     */
    async _requestBlock(hash, includeBody = false, blockHeight, proven) {
        /** @type {Block} */
        let block = null;
        if (includeBody || !blockHeight) {
            /** @type {Array.<BaseConsensusAgent>} */
            const agents = [];
            const requiresHistory = !blockHeight || blockHeight < this._blockchain.height - Policy.NUM_BLOCKS_VERIFICATION;
            for (const agent of this._agents.valueIterator()) {
                if (agent.synced && agent.providesServices(Services.FULL_BLOCKS) && (!requiresHistory || agent.providesServices(Services.BLOCK_HISTORY))) {
                    agents.push(agent);
                }
            }

            // Try agents first that (we think) know the block hash.
            agents.sort((a, b) =>
                a.knowsBlock(hash) !== b.knowsBlock(hash)
                    ? -a.knowsBlock(hash) + 0.5
                    : Math.random() - 0.5);

            for (const agent of agents) {
                try {
                    block = await agent.requestBlock(hash); // eslint-disable-line no-await-in-loop
                    if (block) break;
                } catch (e) {
                    Log.w(BaseConsensus, `Failed to retrieve block for ${hash} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                    // Try the next peer.
                }
            }
            if (!block) {
                throw new Error(`Failed to retrieve block for ${hash}`);
            }
            if (!proven) await this._requestBlockProof(hash, block.height);
            return block;
        } else {
            // TODO: Should block be proven?
            return this._requestBlockProof(hash, blockHeight);
        }
    }

    /**
     * @param {number} blockHeight
     * @param {boolean} [includeBody=false]
     * @returns {Promise.<?Block>}
     */
    async _requestBlockAt(blockHeight, includeBody) {
        /** @type {Block} */
        const block = await this._requestBlockProofAt(blockHeight);
        if (includeBody && !block.isFull()) {
            const hash = block.hash();
            /** @type {Array.<BaseConsensusAgent>} */
            const agents = [];
            const requiresHistory = blockHeight < this._blockchain.height - Policy.NUM_BLOCKS_VERIFICATION;
            for (const agent of this._agents.valueIterator()) {
                if (agent.synced && agent.providesServices(Services.FULL_BLOCKS) && (!requiresHistory || agent.providesServices(Services.BLOCK_HISTORY))) {
                    agents.push(agent);
                }
            }

            // Try agents first that (we think) know the block hash.
            agents.sort((a, b) =>
                a.knowsBlock(hash) !== b.knowsBlock(hash)
                    ? -a.knowsBlock(hash) + 0.5
                    : Math.random() - 0.5);

            for (const agent of agents) {
                try {
                    return await agent.requestBlock(hash); // eslint-disable-line no-await-in-loop
                } catch (e) {
                    Log.w(BaseConsensus, `Failed to retrieve block for ${hash}@${blockHeight} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                    // Try the next peer.
                }
            }
            throw new Error(`Failed to retrieve block for ${hash}@${blockHeight}`);
        }
        return block;
    }

    /**
     * @param {Array.<Hash>} hashes
     * @returns {Promise.<Array.<Transaction>>}
     */
    _requestPendingTransactions(hashes) {
        return Promise.all(hashes.map(hash => this._requestPendingTransaction(hash)));
    }

    /**
     * @param {Hash} hash
     * @return {Promise.<?Transaction>}
     * @private
     */
    async _requestPendingTransaction(hash) {
        /** @type {Array.<BaseConsensusAgent>} */
        const agents = [];
        for (const agent of this._agents.valueIterator()) {
            if (agent.synced && agent.providesServices(Services.MEMPOOL)) {
                agents.push(agent);
            }
        }

        // Try agents first that (we think) know the transaction hash.
        agents.sort((a, b) =>
            a.knowsTransaction(hash) !== b.knowsTransaction(hash)
                ? -a.knowsTransaction(hash) + 0.5
                : Math.random() - 0.5);

        for (const agent of agents) {
            try {
                const tx = await agent.requestTransaction(hash); // eslint-disable-line no-await-in-loop
                if (tx) return tx;
            } catch (e) {
                Log.w(BaseConsensus, `Failed to retrieve pending transaction for ${hash} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                // Try the next peer.
            }
        }

        // No peer supplied the requested transaction, fail.
        throw new Error(`Failed to retrieve pending transaction for ${hash}`);
    }

    /**
     * @param {Array.<Hash>} hashes
     * @returns {Promise.<Array.<TransactionReceipt>>}
     */
    async _requestTransactionReceiptsByHashes(hashes) {
        /** @type {Array.<BaseConsensusAgent>} */
        const agents = [];
        for (const agent of this._agents.valueIterator()) {
            if (agent.synced && agent.providesServices(Services.TRANSACTION_INDEX) && agent.peer.version >= 2) {
                agents.push(agent);
            }
        }

        for (const agent of agents) {
            try {
                return await agent.getTransactionReceiptsByHashes(hashes); // eslint-disable-line no-await-in-loop
            } catch (e) {
                Log.w(BaseConsensus, `Failed to retrieve transaction receipts for ${hashes} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                // Try the next peer.
            }
        }

        // No peer supplied the requested transaction receipts, fail.
        throw new Error(`Failed to retrieve transaction receipts for ${hashes}`);
    }

    /**
     * @param {Array.<Hash>} hashes
     * @param {Block} block
     * @returns {Promise.<Array.<Transaction>>}
     */
    async _requestTransactionsByHashes(hashes, block) {
        // TODO: Use the agent that provided the receipt
        /** @type {Array.<BaseConsensusAgent>} */
        const agents = [];
        const requiresHistory = block.height < this._blockchain.height - Policy.NUM_BLOCKS_VERIFICATION;
        for (const agent of this._agents.valueIterator()) {
            if (agent.synced && agent.providesServices(Services.BODY_PROOF) && (!requiresHistory || agent.providesServices(Services.BLOCK_HISTORY)) && agent.peer.version >= 2) {
                agents.push(agent);
            }
        }

        // Try agents first that (we think) know the reference block hash.
        const knownBlockHash = block.hash();
        agents.sort((a, b) =>
            a.knowsBlock(knownBlockHash) !== b.knowsBlock(knownBlockHash)
                ? -a.knowsBlock(knownBlockHash) + 0.5
                : Math.random() - 0.5);

        for (const agent of agents) {
            try {
                return await agent.getTransactionsProofByHashes(block, hashes); // eslint-disable-line no-await-in-loop
            } catch (e) {
                Log.w(BaseConsensus, `Failed to retrieve transactions for ${hashes} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                // Try the next peer.
            }
        }

        // No peer supplied the requested transactions, fail.
        throw new Error(`Failed to retrieve transactions for ${hashes}`);
    }

    /**
     * @param {Subscription} subscription
     */
    subscribe(subscription) {
        this._subscription = subscription;
        for (const /** @type {BaseConsensusAgent} */ agent of this._agents.valueIterator()) {
            agent.subscribe(subscription);
        }
    }

    /**
     * @returns {Subscription}
     */
    getSubscription() {
        return this._subscription;
    }

    /**
     * @param {Peer} peer
     * @returns {BaseConsensusAgent}
     * @protected
     */
    _newConsensusAgent(peer) { // eslint-disable-line no-unused-vars
        throw new Error('not implemented');
    }

    /**
     * @param {Peer} peer
     * @returns {BaseConsensusAgent}
     * @protected
     */
    _onPeerJoined(peer) {
        // Create a ConsensusAgent for each peer that connects.
        const agent = this._newConsensusAgent(peer);
        this._agents.put(peer.id, agent);

        // Register agent event listeners.
        agent.on('close', () => this._onPeerLeft(agent.peer));
        agent.on('sync', () => this._onPeerSynced(agent.peer));
        agent.on('out-of-sync', () => this._onPeerOutOfSync(agent.peer));
        this.bubble(agent, 'transaction-relayed');

        // If no more peers connect within the specified timeout, start syncing.
        this._timers.resetTimeout('sync', this._syncBlockchain.bind(this), BaseConsensus.SYNC_THROTTLE);

        return agent;
    }

    /**
     * @param {Peer} peer
     * @protected
     */
    _onPeerLeft(peer) {
        // Reset syncPeer if it left during the sync.
        if (peer.equals(this._syncPeer)) {
            Log.d(BaseConsensus, `Peer ${peer.peerAddress} left during sync`);
            this._syncPeer = null;
            this.fire('sync-failed', peer.peerAddress);
        }

        this._agents.remove(peer.id);
        this._syncBlockchain();
    }

    /**
     * @protected
     */
    _syncBlockchain() {
        const candidates = [];
        let numSyncedFullNodes = 0;
        for (const /** @type {BaseConsensusAgent} */ agent of this._agents.valueIterator()) {
            if (!agent.synced) {
                candidates.push(agent);
            } else if (Services.isFullNode(agent.peer.peerAddress.services)) {
                numSyncedFullNodes++;
            }
        }

        // Report consensus-lost if we are synced with less than the minimum number of full nodes or have no connections at all.
        if (this._established && (numSyncedFullNodes < BaseConsensus.MIN_FULL_NODES || this._agents.length === 0)) {
            this._established = false;
            this.fire('lost');
        }

        // Wait for ongoing sync to finish.
        if (this._syncPeer) {
            return;
        }

        // Choose a random peer which we aren't sync'd with yet.
        const agent = ArrayUtils.randomElement(candidates);
        if (!agent) {
            // We are synced with all connected peers.

            // Report consensus-established if we are connected to the minimum number of full nodes.
            if (this._hasEnoughPeers(numSyncedFullNodes, this._agents.length)) {
                if (!this._established) {
                    Log.i(BaseConsensus, `Synced with all connected peers (${this._agents.length}), consensus established.`);
                    Log.d(BaseConsensus, `Blockchain: height=${this._blockchain.height}, headHash=${this._blockchain.headHash}`);

                    // Report consensus-established.
                    this._established = true;
                    this.fire('established');

                    // Allow inbound network connections after establishing consensus.
                    this._network.allowInboundConnections = true;
                }
            }
            // Otherwise, wait until more peer connections are established.
            else {
                this.fire('waiting');
            }

            return;
        }

        this._syncPeer = agent.peer;

        // Notify listeners when we start syncing and have not established consensus yet.
        if (!this._established) {
            this.fire('syncing');
        }

        Log.v(BaseConsensus, `Syncing blockchain with peer ${agent.peer.peerAddress}`);
        agent.syncBlockchain().catch(Log.e.tag(BaseConsensusAgent));
    }

    /**
     * @param {number} numSyncedFullNodes
     * @param {number} numSyncedNodes
     * @return {boolean}
     * @protected
     */
    _hasEnoughPeers(numSyncedFullNodes, numSyncedNodes) { // eslint-disable-line no-unused-vars
        return numSyncedFullNodes >= BaseConsensus.MIN_FULL_NODES;
    }

    /**
     * @param {Peer} peer
     * @protected
     */
    _onPeerSynced(peer) {
        // Reset syncPeer if we finished syncing with it.
        if (peer.equals(this._syncPeer)) {
            Log.v(BaseConsensus, `Finished sync with peer ${peer.peerAddress}`);
            this._syncPeer = null;
        }
        this._syncBlockchain();
    }

    /**
     * @param {Peer} peer
     * @protected
     */
    _onPeerOutOfSync(peer) {
        Log.w(BaseConsensus, `Peer ${peer.peerAddress} out of sync, resyncing`);
        this._syncBlockchain();
    }

    /**
     * @param {Block} head
     * @protected
     */
    _onHeadChanged(head) {
        // Don't announce head changes if we are not synced yet.
        if (!this._established) return;

        for (const agent of this._agents.valueIterator()) {
            agent.relayBlock(head);
        }
    }

    /**
     * @param {Hash} blockHash
     * @param {Array.<Block>} revertBlocks
     * @param {Array.<Block>} forkBlocks
     * @private
     */
    async _onRebranched(blockHash, revertBlocks, forkBlocks) {
        await this.fire('head-changed', blockHash, 'rebranched', revertBlocks, forkBlocks);
    }

    /**
     * @param {Block} block
     * @private
     */
    async _onExtended(block) {
        await this.fire('head-changed', block.hash(), 'extended', [], [block]);
    }

    /**
     * @param {Transaction} tx
     * @protected
     */
    _onTransactionAdded(tx) {
        this.fire('transaction-added', tx);

        // Don't relay transactions if we are not synced yet.
        if (!this._established) return;

        for (const agent of this._agents.valueIterator()) {
            agent.relayTransaction(tx);
        }
    }

    /**
     * @param {Transaction} tx
     * @protected
     */
    _onTransactionRemoved(tx) {
        this.fire('transaction-removed', tx);

        for (const agent of this._agents.valueIterator()) {
            agent.removeTransaction(tx);
        }
    }

    /**
     * @param {Hash} blockHashToProve
     * @param {number} blockHeightToProve
     * @returns {Promise.<Block>}
     * @protected
     */
    async _requestBlockProof(blockHashToProve, blockHeightToProve) {
        /** @type {Block} */
        const knownBlock = await this._blockchain.getNearestBlockAt(blockHeightToProve, /*lower*/ false);
        if (!knownBlock) {
            throw new Error('No suitable reference block found for block proof');
        }

        if (blockHashToProve.equals(knownBlock.hash())) {
            return knownBlock;
        }

        /** @type {Array.<BaseConsensusAgent>} */
        const agents = [];
        const requiresHistory = blockHeightToProve < this._blockchain.height - Policy.NUM_BLOCKS_VERIFICATION ||
            knownBlock.height < this._blockchain.height - Policy.NUM_BLOCKS_VERIFICATION;
        for (const agent of this._agents.valueIterator()) {
            if (agent.synced && agent.providesServices(Services.BLOCK_PROOF) && (!requiresHistory || agent.providesServices(Services.BLOCK_HISTORY))) {
                agents.push(agent);
            }
        }

        // Try agents first that (we think) know the reference block hash.
        const knownBlockHash = knownBlock.hash();
        agents.sort((a, b) =>
            a.knowsBlock(knownBlockHash) !== b.knowsBlock(knownBlockHash)
                ? -a.knowsBlock(knownBlockHash) + 0.5
                : Math.random() - 0.5);

        for (const agent of agents) {
            try {
                return await agent.getBlockProof(blockHashToProve, knownBlock); // eslint-disable-line no-await-in-loop
            } catch (e) {
                Log.w(BaseConsensus, `Failed to retrieve block proof for ${blockHashToProve}@${blockHeightToProve} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                // Try the next peer.
            }
        }

        // No peer supplied the requested block proof, fail.
        throw new Error(`Failed to retrieve block proof for ${blockHashToProve}`);
    }

    /**
     * @param {number} blockHeightToProve
     * @returns {Promise.<Block>}
     * @protected
     */
    async _requestBlockProofAt(blockHeightToProve) {
        /** @type {Block} */
        const knownBlock = await this._blockchain.getNearestBlockAt(blockHeightToProve, /*lower*/ false);
        if (!knownBlock) {
            throw new Error('No suitable reference block found for block proof');
        }

        if (blockHeightToProve === knownBlock.height) {
            return knownBlock;
        }

        /** @type {Array.<BaseConsensusAgent>} */
        const agents = [];
        const requiresHistory = blockHeightToProve < this._blockchain.height - Policy.NUM_BLOCKS_VERIFICATION ||
            knownBlock.height < this._blockchain.height - Policy.NUM_BLOCKS_VERIFICATION;
        for (const agent of this._agents.valueIterator()) {
            if (agent.synced && agent.providesServices(Services.BLOCK_PROOF) && (!requiresHistory || agent.providesServices(Services.BLOCK_HISTORY)) && agent.peer.version >= 2) {
                agents.push(agent);
            }
        }

        // Try agents first that (we think) know the reference block hash.
        const knownBlockHash = knownBlock.hash();
        agents.sort((a, b) =>
            a.knowsBlock(knownBlockHash) !== b.knowsBlock(knownBlockHash)
                ? -a.knowsBlock(knownBlockHash) + 0.5
                : Math.random() - 0.5);

        for (const agent of agents) {
            try {
                return await agent.getBlockProofAt(blockHeightToProve, knownBlock); // eslint-disable-line no-await-in-loop
            } catch (e) {
                Log.w(BaseConsensus, `Failed to retrieve block proof at ${blockHeightToProve} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                // Try the next peer.
            }
        }

        // No peer supplied the requested block proof, fail.
        throw new Error(`Failed to retrieve block proof at ${blockHeightToProve}`);
    }

    /**
     * @param {Array.<Address>} addresses
     * @param {Block} [block]
     * @returns {Promise.<Array<Transaction>>}
     * @protected
     */
    async _requestTransactionsByAddresses(addresses, block = this._blockchain.head) {
        if (addresses.length === 0) {
            return [];
        }

        /** @type {Array.<BaseConsensusAgent>} */
        const agents = [];
        const requiresHistory = block.height < this._blockchain.height - Policy.NUM_BLOCKS_VERIFICATION;
        for (const agent of this._agents.valueIterator()) {
            if (agent.synced && agent.providesServices(Services.BODY_PROOF) && (!requiresHistory || agent.providesServices(Services.BLOCK_HISTORY))) {
                agents.push(agent);
            }
        }

        // Try agents first that (we think) know the reference block hash.
        const blockHash = block.hash();
        agents.sort((a, b) =>
            a.knowsBlock(blockHash) !== b.knowsBlock(blockHash)
                ? -a.knowsBlock(blockHash) + 0.5
                : Math.random() - 0.5);

        for (const agent of agents) {
            try {
                return await agent.getTransactionsProofByAddresses(block, addresses); // eslint-disable-line no-await-in-loop
            } catch (e) {
                Log.w(BaseConsensus, `Failed to retrieve transactions proof for ${addresses} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                // Try the next peer.
            }
        }

        // No peer supplied the requested transactions proof, fail.
        throw new Error(`Failed to retrieve transactions proof for ${addresses}`);
    }

    /**
     * @param {Address} address
     * @returns {Promise.<Array.<TransactionReceipt>>}
     * @protected
     */
    async _requestTransactionReceiptsByAddress(address) {
        /** @type {Array.<BaseConsensusAgent>} */
        const agents = [];
        for (const agent of this._agents.valueIterator()) {
            if (agent.synced && agent.providesServices(Services.TRANSACTION_INDEX)) {
                agents.push(agent);
            }
        }
        agents.sort(() => Math.random() - 0.5);

        for (const agent of agents) {
            try {
                return await agent.getTransactionReceiptsByAddress(address); // eslint-disable-line no-await-in-loop
            } catch (e) {
                Log.w(BaseConsensus, `Failed to retrieve transaction receipts for ${address} from ${agent.peer.peerAddress}: ${e && e.message || e}`);
                // Try the next peer.
            }
        }

        // No peer supplied the requested receipts, fail.
        throw new Error(`Failed to retrieve transaction receipts for ${address}`);
    }

    /**
     * @param {Address} address
     * @returns {Promise.<Array.<{transaction: Transaction, header: BlockHeader}>>}
     * @protected
     * @deprecated
     */
    async _requestTransactionHistory(address) {
        // 1. Get transaction receipts.
        const receipts = await this._requestTransactionReceiptsByAddress(address);

        // 2. Request proofs for missing blocks.
        /** @type {Array.<Promise.<Block>>} */
        const blockRequests = [];
        let lastBlockHash = null;
        for (const receipt of receipts) {
            if (!receipt.blockHash.equals(lastBlockHash)) {
                // eslint-disable-next-line no-await-in-loop
                const block = await this._blockchain.getBlock(receipt.blockHash);
                if (block) {
                    blockRequests.push(Promise.resolve(block));
                } else {
                    const request = this._requestBlockProof(receipt.blockHash, receipt.blockHeight)
                        .catch(e => Log.e(BaseConsensus, `Failed to retrieve proof for block ${receipt.blockHash}`
                            + ` (${e}) - transaction history may be incomplete`));
                    blockRequests.push(request);
                }

                lastBlockHash = receipt.blockHash;
            }
        }
        const blocks = await Promise.all(blockRequests);

        // 3. Request transaction proofs.
        const transactionRequests = [];
        for (const block of blocks) {
            if (!block) continue;

            const request = this._requestTransactionsByAddresses([address], block)
                .then(txs => txs.map(tx => ({ transaction: tx, header: block.header })))
                .catch(e => Log.e(BaseConsensus, `Failed to retrieve transactions for block ${block.hash()}`
                    + ` (${e}) - transaction history may be incomplete`));
            transactionRequests.push(request);
        }

        const transactions = await Promise.all(transactionRequests);
        return transactions
            .reduce((flat, it) => it ? flat.concat(it) : flat, [])
            .sort((a, b) => a.header.height - b.header.height);
    }

    /** @type {boolean} */
    get established() {
        return this._established;
    }

    /** @type {Network} */
    get network() {
        return this._network;
    }

    /** @type {InvRequestManager} */
    get invRequestManager() {
        return this._invRequestManager;
    }
}
BaseConsensus.MAX_ATTEMPTS_TO_FETCH = 5;
BaseConsensus.SYNC_THROTTLE = 1500; // ms
BaseConsensus.MIN_FULL_NODES = 1;
BaseConsensus.TRANSACTION_RELAY_TIMEOUT = 10000;
BaseConsensus.SendTransactionResult = {
    REJECTED_LOCAL: -4,
    EXPIRED: -3,
    ALREADY_MINED: -2,
    INVALID: -1,
    NONE: 0,
    RELAYED: 1,
    KNOWN: 2,
    PENDING_LOCAL: 3,
};
Class.register(BaseConsensus);