Home Reference Source Test

src/main/generic/consensus/BaseConsensusAgent.js

/**
 * @abstract
 */
class BaseConsensusAgent extends Observable {
    /**
     * @param {Time} time
     * @param {Peer} peer
     * @param {InvRequestManager} invRequestManager
     * @param {Subscription} [targetSubscription]
     */
    constructor(time, peer, invRequestManager, targetSubscription) {
        super();
        /** @type {Time} */
        this._time = time;
        /** @type {Peer} */
        this._peer = peer;

        // Flag indicating that have synced our blockchain with the peer's.
        /** @type {boolean} */
        this._synced = false;

        // Set of all objects (InvVectors) that we think the remote peer knows.
        /** @type {LimitInclusionHashSet.<InvVector>} */
        this._knownObjects = new LimitInclusionHashSet(BaseConsensusAgent.KNOWN_OBJECTS_COUNT_MAX);
        this._knownObjects.add(new InvVector(InvVector.Type.BLOCK, peer.headHash));

        // InvVectors we want to request via getData are collected here and
        // periodically requested.
        /** @type {UniqueQueue.<InvVector>} */
        this._blocksToRequest = new UniqueQueue();
        /** @type {ThrottledQueue.<InvVector>} */
        this._txsToRequest = new ThrottledQueue(
            BaseConsensusAgent.TRANSACTIONS_AT_ONCE + BaseConsensusAgent.FREE_TRANSACTIONS_AT_ONCE,
            BaseConsensusAgent.TRANSACTIONS_PER_SECOND + BaseConsensusAgent.FREE_TRANSACTIONS_PER_SECOND,
            1000, BaseConsensusAgent.REQUEST_TRANSACTIONS_WAITING_MAX);

        // Objects that are currently being requested from the peer.
        /** @type {HashSet.<InvVector>} */
        this._objectsInFlight = new HashSet();

        // All objects that were requested from the peer but not received yet.
        /** @type {HashSet.<InvVector>} */
        this._objectsThatFlew = new HashSet();

        // Objects that are currently being processed by the blockchain/mempool.
        /** @type {HashSet.<InvVector>} */
        this._objectsProcessing = new HashSet();

        // A Subscription object specifying which objects should be announced to the peer.
        // Initially, we don't announce anything to the peer until it tells us otherwise.
        /** @type {Subscription} */
        this._remoteSubscription = Subscription.NONE;
        // Subscribe to all announcements from the peer.
        /** @type {Subscription} */
        this._localSubscription = Subscription.NONE;
        this._lastSubscriptionChange = null;
        /** @type {Subscription} */
        this._targetSubscription = targetSubscription || Subscription.ANY;

        // Helper object to keep track of timeouts & intervals.
        /** @type {Timers} */
        this._timers = new Timers();

        // Queue of transaction inv vectors waiting to be sent out
        /** @type {ThrottledQueue.<InvVector>} */
        this._waitingInvVectors = new ThrottledQueue(
            BaseConsensusAgent.TRANSACTIONS_AT_ONCE,
            BaseConsensusAgent.TRANSACTIONS_PER_SECOND,
            1000, BaseConsensusAgent.REQUEST_TRANSACTIONS_WAITING_MAX);
        this._timers.setInterval('invVectors', () => this._sendWaitingInvVectors(), BaseConsensusAgent.TRANSACTION_RELAY_INTERVAL);

        // Queue of "free" transaction inv vectors waiting to be sent out
        /** @type {ThrottledQueue.<FreeTransactionVector>} */
        this._waitingFreeInvVectors = new ThrottledQueue(
            BaseConsensusAgent.FREE_TRANSACTIONS_AT_ONCE,
            BaseConsensusAgent.FREE_TRANSACTIONS_PER_SECOND,
            1000, BaseConsensusAgent.REQUEST_TRANSACTIONS_WAITING_MAX);
        this._timers.setInterval('freeInvVectors', () => this._sendFreeWaitingInvVectors(), BaseConsensusAgent.FREE_TRANSACTION_RELAY_INTERVAL);

        // Helper object to keep track of block proofs we're requesting.
        this._blockProofRequest = null;

        // Helper object to keep track of transaction proofs we're requesting.
        this._transactionsProofRequest = null;

        // Helper object to keep track of transaction receipts we're requesting.
        this._transactionReceiptsRequest = null;

        /** @type {MultiSynchronizer} */
        this._synchronizer = new MultiSynchronizer();

        /** @type {InvRequestManager} */
        this._invRequestManager = invRequestManager;

        // Listen to consensus messages from the peer.
        peer.channel.on('inv', msg => this._onInv(msg));
        peer.channel.on('block', msg => this._onBlock(msg));
        peer.channel.on('header', msg => this._onHeader(msg));
        peer.channel.on('tx', msg => this._onTx(msg));
        peer.channel.on('not-found', msg => this._onNotFound(msg));

        peer.channel.on('subscribe', msg => this._onSubscribe(msg));
        peer.channel.on('get-data', msg => this._onGetData(msg));
        peer.channel.on('get-header', msg => this._onGetHeader(msg));

        peer.channel.on('block-proof', msg => this._onBlockProof(msg));
        peer.channel.on('transactions-proof', msg => this._onTransactionsProof(msg));
        peer.channel.on('transaction-receipts', msg => this._onTransactionReceipts(msg));

        peer.channel.on('get-head', msg => this._onGetHead(msg));
        peer.channel.on('head', msg => this._onHead(msg));

        // Clean up when the peer disconnects.
        peer.channel.on('close', () => this._onClose());

        this._requestHead();
    }

    _requestHead() {
        this._peer.channel.getHead();
    }

    onHeadUpdated() {
        this._timers.resetTimeout('get-next-head', () => this._requestHead(), BaseConsensusAgent.HEAD_REQUEST_INTERVAL);
    }

    /**
     * @param {GetHeadMessage} msg
     * @private
     */
    _onGetHead(msg) {
        this._peer.channel.head(this._blockchain.head.header);
    }

    /**
     * @param {HeadMessage} msg
     */
    _onHead(msg) {
        this._peer.head = msg.header;
        this.onHeadUpdated();
    }

    /**
     * @param {Subscription} subscription
     */
    subscribe(subscription) {
        this._targetSubscription = subscription;
        this._subscribe(subscription);
    }

    _subscribeTarget() {
        this._subscribe(this._targetSubscription);
    }

    /**
     * @param {Subscription} subscription
     */
    _subscribe(subscription) {
        this._localSubscription = subscription;
        this._lastSubscriptionChange = Date.now();
        this._peer.channel.subscribe(this._localSubscription);
    }

    /**
     * @param {Block} block
     * @returns {boolean}
     */
    relayBlock(block) {
        // Don't relay block if have not synced with the peer yet.
        if (!this._synced) {
            return false;
        }

        // Only relay block if it matches the peer's subscription.
        if (!this._remoteSubscription.matchesBlock(block)) {
            return false;
        }

        // Create InvVector.
        const vector = InvVector.fromBlock(block);

        // Don't relay block to this peer if it already knows it.
        if (this._knownObjects.contains(vector)) {
            return false;
        }

        // Relay block to peer.
        this._peer.channel.inv([vector, ...this._waitingInvVectors.dequeueMulti(BaseInventoryMessage.VECTORS_MAX_COUNT - 1)]);

        // Assume that the peer knows this block now.
        this._knownObjects.add(vector);

        return true;
    }

    _sendWaitingInvVectors() {
        const invVectors = this._waitingInvVectors.dequeueMulti(BaseInventoryMessage.VECTORS_MAX_COUNT);
        if (invVectors.length > 0) {
            this._peer.channel.inv(invVectors);
            Log.v(BaseConsensusAgent, () => `[INV] Sent ${invVectors.length} vectors to ${this._peer.peerAddress}`);
        }
    }

    _sendFreeWaitingInvVectors() {
        const invVectors = [];
        let size = 0;
        while (invVectors.length <= BaseInventoryMessage.VECTORS_MAX_COUNT && this._waitingFreeInvVectors.length > 0
            && size < BaseConsensusAgent.FREE_TRANSACTION_SIZE_PER_INTERVAL) {
            const freeTransaction = this._waitingFreeInvVectors.dequeue();
            invVectors.push(freeTransaction.inv);
            size += freeTransaction.serializedSize;
        }
        if (invVectors.length > 0) {
            this._peer.channel.inv(invVectors);
            Log.v(BaseConsensusAgent, () => `[INV] Sent ${invVectors.length} vectors to ${this._peer.peerAddress}`);
        }
    }

    /**
     * @param {Transaction} transaction
     * @return {boolean}
     */
    relayTransaction(transaction) {
        // Only relay transaction if it matches the peer's subscription.
        if (!this._remoteSubscription.matchesTransaction(transaction)) {
            return false;
        }

        // Create InvVector.
        const vector = InvVector.fromTransaction(transaction);

        // Don't relay transaction to this peer if it already knows it.
        if (this._knownObjects.contains(vector)) {
            return false;
        }

        // Relay transaction to peer later.
        const serializedSize = transaction.serializedSize;
        if (transaction.fee / serializedSize < BaseConsensusAgent.TRANSACTION_RELAY_FEE_MIN) {
            this._waitingFreeInvVectors.enqueue(new FreeTransactionVector(vector, serializedSize));
        } else {
            this._waitingInvVectors.enqueue(vector);
        }

        // Assume that the peer knows this transaction now.
        this._knownObjects.add(vector);

        return true;
    }

    /**
     * @param {Transaction} transaction
     */
    removeTransaction(transaction) {
        // Create InvVector.
        const vector = InvVector.fromTransaction(transaction);

        // Remove transaction from relay queues.
        this._waitingFreeInvVectors.remove(vector); // InvVector and FreeTransactionVector have the same hashCode.
        this._waitingInvVectors.remove(vector);
    }

    /**
     * @param {Hash} blockHash
     * @returns {boolean}
     */
    knowsBlock(blockHash) {
        const vector = new InvVector(InvVector.Type.BLOCK, blockHash);
        return this._knownObjects.contains(vector);
    }

    /**
     * @param {SubscribeMessage} msg
     * @protected
     */
    _onSubscribe(msg) {
        this._remoteSubscription = msg.subscription;
    }

    /**
     * @param {InvMessage} msg
     * @returns {Promise.<void>}
     * @protected
     */
    async _onInv(msg) {
        // Keep track of the objects the peer knows.
        for (const vector of msg.vectors) {
            this._knownObjects.add(vector);
            this._waitingInvVectors.remove(vector);
            this._waitingFreeInvVectors.remove(vector); // The inv vector has the same hashCode as a FreeTransactionVector
        }

        // Check which of the advertised objects we know
        // Request unknown objects, ignore known ones.
        const unknownBlocks = [];
        const unknownTxs = [];
        for (const vector of msg.vectors) {
            // Ignore objects that we are currently requesting / processing.
            if (this._objectsInFlight.contains(vector) || this._objectsProcessing.contains(vector)) {
                continue;
            }

            // Filter out objects that we are not interested in.
            if (!this._shouldRequestData(vector)) {
                continue;
            }

            switch (vector.type) {
                case InvVector.Type.BLOCK: {
                    const block = await this._getBlock(vector.hash, /*includeForks*/ true); // eslint-disable-line no-await-in-loop
                    if (!block) {
                        unknownBlocks.push(vector);
                        this._onNewBlockAnnounced(vector.hash);
                    } else {
                        this._onKnownBlockAnnounced(vector.hash, block);
                    }
                    break;
                }
                case InvVector.Type.TRANSACTION: {
                    const transaction = await this._getTransaction(vector.hash); // eslint-disable-line no-await-in-loop
                    if (!transaction) {
                        unknownTxs.push(vector);
                        this._onNewTransactionAnnounced(vector.hash);
                    } else {
                        this._onKnownTransactionAnnounced(vector.hash, transaction);
                    }
                    break;
                }
                default:
                    throw `Invalid inventory type: ${vector.type}`;
            }
        }

        Log.v(BaseConsensusAgent, () => `[INV] ${msg.vectors.length} vectors (${unknownBlocks.length} new blocks, ${unknownTxs.length} new txs) received from ${this._peer.peerAddress}`);

        if (unknownBlocks.length > 0 || unknownTxs.length > 0) {
            for (const vector of unknownBlocks) {
                this._invRequestManager.askToRequestVector(this, vector);
            }
            for (const vector of unknownTxs) {
                this._invRequestManager.askToRequestVector(this, vector);
            }
        } else {
            this._onNoUnknownObjects();
        }
    }

    /**
     * @param {InvVector} vector
     */
    requestVector(...vector) {
        // Store unknown vectors in objectsToRequest.
        this._blocksToRequest.enqueueAll(vector.filter(v => v.type === InvVector.Type.BLOCK));
        this._txsToRequest.enqueueAll(vector.filter(v => v.type === InvVector.Type.TRANSACTION));

        // Clear the request throttle timeout.
        this._timers.clearTimeout('inv');

        // If there are enough objects queued up, send out a getData request.
        if (this._blocksToRequest.length + this._txsToRequest.available >= BaseConsensusAgent.REQUEST_THRESHOLD) {
            this._requestData();
        }
        // Otherwise, wait a short time for more inv messages to arrive, then request.
        else {
            this._timers.setTimeout('inv', () => this._requestData(), BaseConsensusAgent.REQUEST_THROTTLE);
        }
    }

    /**
     * @param {InvVector} vector
     * @returns {boolean}
     * @protected
     */
    _shouldRequestData(vector) {
        return true;
    }

    /**
     * @param {Hash} hash
     * @param {boolean} [includeForks]
     * @param {boolean} [includeBody]
     * @returns {Promise.<?Block>}
     * @protected
     * @abstract
     */
    _getBlock(hash, includeForks = false, includeBody = false) {
        // MUST be implemented by subclasses.
        throw new Error('not implemented');
    }

    /**
     * @param {Hash} hash
     * @param {boolean} [includeForks]
     * @returns {Promise.<?Uint8Array>}
     * @protected
     * @abstract
     */
    _getRawBlock(hash, includeForks = false) {
        // MUST be implemented by subclasses.
        throw new Error('not implemented');
    }

    /**
     * @param {Hash} hash
     * @returns {Promise.<?Transaction>}
     * @protected
     * @abstract
     */
    _getTransaction(hash) {
        // MUST be implemented by subclasses.
        throw new Error('not implemented');
    }

    /**
     * @param {Hash} hash
     * @returns {void}
     * @protected
     */
    _onNewBlockAnnounced(hash) {
    }
    /**
     * @param {Hash} hash
     * @param {Block} block
     * @returns {void}
     * @protected
     */
    _onKnownBlockAnnounced(hash, block) {
    }
    /**
     * @param {Hash} hash
     * @returns {void}
     * @protected
     */
    _onNewTransactionAnnounced(hash) {
    }
    /**
     * @param {Hash} hash
     * @param {Transaction} transaction
     * @returns {void}
     * @protected
     */
    _onKnownTransactionAnnounced(hash, transaction) {
    }

    /**
     * @returns {void}
     * @protected
     */
    _requestData() {
        // Only one request at a time.
        if (!this._objectsInFlight.isEmpty()) return;

        // Don't do anything if there are no objects queued to request.
        if (this._blocksToRequest.isEmpty() && !this._txsToRequest.isAvailable()) return;

        // Request queued objects from the peer. Only request up to VECTORS_MAX_COUNT objects at a time.
        const vectorsMaxCount = BaseInventoryMessage.VECTORS_MAX_COUNT;
        /** @type {Array.<InvVector>} */
        let vectors = this._blocksToRequest.dequeueMulti(vectorsMaxCount);
        if (vectors.length < vectorsMaxCount) {
            vectors = vectors.concat(this._txsToRequest.dequeueMulti(vectorsMaxCount - vectors.length));
        }

        // Mark the requested objects as in-flight.
        this._objectsInFlight.addAll(vectors);

        // Request data from peer.
        this._doRequestData(vectors);

        // Set timer to detect end of request / missing objects
        this._timers.setTimeout('getData', () => this._noMoreData(), BaseConsensusAgent.REQUEST_TIMEOUT);
    }

    /**
     * @param {Array.<InvVector>} vectors
     * @returns {void}
     * @protected
     */
    _doRequestData(vectors) {
        this._peer.channel.getData(vectors);
    }

    /**
     * @param {BlockMessage} msg
     * @return {Promise.<void>}
     * @protected
     */
    async _onBlock(msg) {
        const hash = msg.block.hash();

        // Check if we have requested this block.
        const vector = new InvVector(InvVector.Type.BLOCK, hash);
        if (!this._objectsInFlight.contains(vector) && !this._objectsThatFlew.contains(vector)) {
            Log.w(BaseConsensusAgent, `Unsolicited block ${hash} received from ${this._peer.peerAddress}, discarding`);
            return;
        }

        // Reuse already known (verified) transactions
        const transactions = msg.block.isFull() ? msg.block.body.transactions : [];
        const transactionPromises = transactions.map(t => this._getTransaction(t.hash()));
        for (let i = 0; i < transactions.length; i++) {
            const transaction = await transactionPromises[i]; // eslint-disable-line no-await-in-loop
            if (transaction) {
                transactions[i] = transaction;
            }
        }

        if ((!this._peer.head && this._peer.headHash.equals(hash)) || (this._peer.head && this._peer.head.height < msg.block.height)) {
            this._peer.head = msg.block.header;
            this.onHeadUpdated();
        }

        // Mark object as received.
        this._onObjectReceived(vector);

        // Process block.
        this._objectsProcessing.add(vector);
        await this._processBlock(hash, msg.block);

        // Mark object as processed.
        this._onObjectProcessed(vector);

        this._invRequestManager.noteVectorReceived(InvVector.fromBlock(msg.block));
    }

    /**
     * @param {Hash} hash
     * @param {Block} block
     * @returns {Promise.<void>}
     * @protected
     */
    async _processBlock(hash, block) {
    }

    /**
     * @param {HeaderMessage} msg
     * @return {Promise.<void>}
     * @protected
     */
    async _onHeader(msg) {
        const hash = msg.header.hash();

        // Check if we have requested this header.
        const vector = new InvVector(InvVector.Type.BLOCK, hash);
        if (!this._objectsInFlight.contains(vector) && !this._objectsThatFlew.contains(vector)) {
            Log.w(BaseConsensusAgent, `Unsolicited header ${hash} received from ${this._peer.peerAddress}, discarding`);
            return;
        }

        if ((!this._peer.head && this._peer.headHash.equals(hash)) || (this._peer.head && this._peer.head.height < msg.header.height)) {
            this._peer.head = msg.header;
            this.onHeadUpdated();
        }

        // Mark object as received.
        this._onObjectReceived(vector);

        // Process header.
        this._objectsProcessing.add(vector);
        await this._processHeader(hash, msg.header);

        // Mark object as processed.
        this._onObjectProcessed(vector);
    }

    /**
     * @param {Hash} hash
     * @param {BlockHeader} header
     * @returns {Promise.<void>}
     * @protected
     */
    async _processHeader(hash, header) {
    }

    /**
     * @param {TxMessage} msg
     * @return {Promise}
     * @protected
     */
    async _onTx(msg) {
        const hash = msg.transaction.hash();
        //Log.d(BaseConsensusAgent, () => `[TX] Received transaction ${hash} from ${this._peer.peerAddress}`);

        // Check if we have requested this transaction.
        const vector = new InvVector(InvVector.Type.TRANSACTION, hash);
        if (!this._objectsInFlight.contains(vector) && !this._objectsThatFlew.contains(vector)) {
            Log.w(BaseConsensusAgent, `Unsolicited transaction ${hash} received from ${this._peer.peerAddress}, discarding`);
            return;
        }

        this._invRequestManager.noteVectorReceived(InvVector.fromTransaction(msg.transaction));

        // Mark object as received.
        this._onObjectReceived(vector);

        // Process transaction.
        this._objectsProcessing.add(vector);

        // Check whether we subscribed for this transaction.
        if (this._localSubscription.matchesTransaction(msg.transaction)) {
            await this._processTransaction(hash, msg.transaction);
        } else if (this._lastSubscriptionChange + BaseConsensusAgent.SUBSCRIPTION_CHANGE_GRACE_PERIOD > Date.now()) {
            this._peer.channel.close(CloseType.RECEIVED_TRANSACTION_NOT_MATCHING_OUR_SUBSCRIPTION, 'received transaction not matching our subscription');
        }

        // Mark object as processed.
        this._onObjectProcessed(vector);
    }

    /**
     * @param {Hash} hash
     * @param {Transaction} transaction
     * @returns {Promise.<void>}
     * @protected
     */
    async _processTransaction(hash, transaction) {
    }

    /**
     * @param {NotFoundMessage} msg
     * @returns {void}
     * @protected
     */
    _onNotFound(msg) {
        Log.d(BaseConsensusAgent, `[NOTFOUND] ${msg.vectors.length} unknown objects received from ${this._peer.peerAddress}`);

        // Remove unknown objects from in-flight list.
        for (const vector of msg.vectors) {
            if (!this._objectsInFlight.contains(vector)) {
                continue;
            }
            this._invRequestManager.noteVectorNotReceived(this, vector);

            // Mark object as received.
            this._onObjectReceived(vector);
        }
    }

    /**
     * @param {InvVector} vector
     * @returns {void}
     * @protected
     */
    _onObjectReceived(vector) {
        if (this._objectsInFlight.isEmpty()) return;

        // Remove the vector from objectsInFlight.
        this._objectsInFlight.remove(vector);

        // Reset the request timeout if we expect more objects to come.
        if (!this._objectsInFlight.isEmpty()) {
            this._timers.resetTimeout('getData', () => this._noMoreData(), BaseConsensusAgent.REQUEST_TIMEOUT);
        } else {
            this._noMoreData();
        }
    }

    /**
     * @returns {void}
     * @protected
     */
    _noMoreData() {
        // Cancel the request timeout timer.
        this._timers.clearTimeout('getData');

        for(const vector of this._objectsInFlight.values()) {
            this._invRequestManager.noteVectorNotReceived(this, vector);
        }

        // Reset objects in flight.
        this._objectsThatFlew.addAll(this._objectsInFlight.values());
        this._objectsInFlight.clear();

        // If there are more objects to request, request them.
        if (!this._blocksToRequest.isEmpty() || this._txsToRequest.isAvailable()) {
            this._requestData();
        } else {
            this._onAllObjectsReceived();
        }
    }

    /**
     * @returns {void}
     * @protected
     */
    _onNoUnknownObjects() {
    }

    /**
     * @returns {void}
     * @protected
     */
    _onAllObjectsReceived() {
    }

    /**
     * @param {InvVector} vector
     * @returns {void}
     * @protected
     */
    _onObjectProcessed(vector) {
        // Remove the vector from objectsProcessing.
        this._objectsProcessing.remove(vector);

        if (this._objectsProcessing.isEmpty()) {
            this._onAllObjectsProcessed();
        }
    }

    /**
     * @returns {void}
     * @protected
     */
    _onAllObjectsProcessed() {
    }

    /**
     * @param {GetDataMessage} msg
     * @returns {Promise}
     * @protected
     */
    async _onGetData(msg) {
        // Keep track of the objects the peer knows.
        for (const vector of msg.vectors) {
            this._knownObjects.add(vector);
        }

        // Check which of the requested objects we know.
        // Send back all known objects.
        // Send notFound for unknown objects.
        const unknownObjects = [];
        for (const vector of msg.vectors) {
            switch (vector.type) {
                case InvVector.Type.BLOCK: {
                    const block = await this._getRawBlock(vector.hash, /*includeForks*/ false); // eslint-disable-line no-await-in-loop
                    if (block) {
                        // We have found a requested block, send it back to the sender.
                        this._peer.channel.rawBlock(block);
                    } else {
                        // Requested block is unknown.
                        unknownObjects.push(vector);
                    }
                    break;
                }
                case InvVector.Type.TRANSACTION: {
                    const tx = await this._getTransaction(vector.hash); // eslint-disable-line no-await-in-loop
                    if (tx) {
                        // We have found a requested transaction, send it back to the sender.
                        this._peer.channel.tx(tx);
                        this.fire('transaction-relayed', tx);
                    } else {
                        // Requested transaction is unknown.
                        unknownObjects.push(vector);
                    }
                    break;
                }
                default:
                    throw `Invalid inventory type: ${vector.type}`;
            }
        }

        // Report any unknown objects back to the sender.
        if (unknownObjects.length) {
            this._peer.channel.notFound(unknownObjects);
        }
    }

    /**
     * @param {GetHeaderMessage} msg
     * @returns {Promise}
     * @protected
     */
    async _onGetHeader(msg) {
        // Keep track of the objects the peer knows.
        for (const vector of msg.vectors) {
            this._knownObjects.add(vector);
        }

        // Check which of the requested objects we know.
        // Send back all known objects.
        // Send notFound for unknown objects.
        const unknownObjects = [];
        for (const vector of msg.vectors) {
            switch (vector.type) {
                case InvVector.Type.BLOCK: {
                    const block = await this._getBlock(vector.hash); // eslint-disable-line no-await-in-loop
                    if (block) {
                        // We have found a requested block, send it back to the sender.
                        this._peer.channel.header(block.header);
                    } else {
                        // Requested block is unknown.
                        unknownObjects.push(vector);
                    }
                    break;
                }
                case InvVector.Type.TRANSACTION:
                default:
                    throw `Invalid inventory type: ${vector.type}`;
            }
        }

        // Report any unknown objects back to the sender.
        if (unknownObjects.length) {
            this._peer.channel.notFound(unknownObjects);
        }
    }

    /**
     * @param {Hash} blockHashToProve
     * @param {Block} knownBlock
     * @returns {Promise.<Block>}
     */
    getBlockProof(blockHashToProve, knownBlock) {
        return this._synchronizer.push('getBlockProof',
            this._getBlockProof.bind(this, blockHashToProve, knownBlock));
    }

    /**
     * @param {Hash} blockHashToProve
     * @param {Block} knownBlock
     * @returns {Promise.<Block>}
     * @private
     */
    _getBlockProof(blockHashToProve, knownBlock) {
        Assert.that(this._blockProofRequest === null);

        Log.v(BaseConsensusAgent, () => `Requesting BlockProof for ${blockHashToProve} from ${this._peer.peerAddress}`);

        return new Promise((resolve, reject) => {
            this._blockProofRequest = {
                blockHashToProve,
                knownBlock,
                resolve,
                reject
            };

            // Request BlockProof from peer.
            this._peer.channel.getBlockProof(blockHashToProve, knownBlock.hash());

            this._peer.channel.expectMessage(Message.Type.BLOCK_PROOF, () => {
                reject(new Error('timeout'));
            }, BaseConsensusAgent.BLOCK_PROOF_REQUEST_TIMEOUT);
        });
    }

    /**
     * @param {BlockProofMessage} msg
     * @returns {Promise.<void>}
     * @private
     */
    async _onBlockProof(msg) {
        Log.v(BaseConsensusAgent, () => `[BLOCK-PROOF] Received from ${this._peer.peerAddress}: proof=${msg.proof} (${msg.serializedSize} bytes)`);

        // Check if we have requested a header proof, reject unsolicited ones.
        if (!this._blockProofRequest) {
            Log.w(BaseConsensusAgent, `Unsolicited header proof received from ${this._peer.peerAddress}`);
            // TODO close/ban?
            return;
        }

        const { blockHashToProve, /** @type {Block} */ knownBlock, resolve, reject } = this._blockProofRequest;
        this._blockProofRequest = null;

        if (!msg.hasProof() || msg.proof.length === 0) {
            reject(new Error('Block proof request was rejected'));
            return;
        }

        // Check that the tail of the proof corresponds to the requested block.
        const proof = msg.proof;
        if (!blockHashToProve.equals(proof.tail.hash())) {
            Log.w(BaseConsensusAgent, `Received BlockProof with invalid tail block from ${this._peer.peerAddress}`);
            reject(new Error('Invalid tail block'));
            return;
        }

        // Check that the proof links up to our reference block.
        if (!(await knownBlock.isInterlinkSuccessorOf(proof.head))) {
            Log.w(BaseConsensusAgent, `Received BlockProof with invalid head block from ${this._peer.peerAddress}`);
            reject(new Error('Invalid head block'));
            return;
        }

        // Verify the proof.
        if (!(await proof.verify())) {
            Log.w(BaseConsensusAgent, `Invalid BlockProof received from ${this._peer.peerAddress}`);
            // TODO ban instead?
            this._peer.channel.close(CloseType.INVALID_BLOCK_PROOF, 'Invalid BlockProof');
            reject(new Error('Invalid BlockProof'));
            return;
        }

        // Verify individual blocks.
        const verificationResults = await Promise.all(proof.blocks.map(block => block.verify(this._time)));
        if (!verificationResults.every(result => result)) {
            Log.w(BaseConsensusAgent, `Invalid BlockProof received from ${this._peer.peerAddress}`);
            // TODO ban instead?
            this._peer.channel.close(CloseType.INVALID_BLOCK_PROOF, 'Invalid BlockProof');
            reject(new Error('Invalid BlockProof'));
            return;
        }

        // Return the proven block.
        resolve(proof.tail);
    }

    /**
     * @param {Block} block
     * @param {Array.<Address>} addresses
     * @returns {Promise.<Array.<Transaction>>}
     */
    getTransactionsProof(block, addresses) {
        return this._synchronizer.push('getTransactionsProof',
            this._getTransactionsProof.bind(this, block, addresses));
    }

    /**
     * @param {Block} block
     * @param {Array.<Address>} addresses
     * @returns {Promise.<Array.<Transaction>>}
     * @private
     */
    _getTransactionsProof(block, addresses) {
        Assert.that(this._transactionsProofRequest === null);

        Log.v(BaseConsensusAgent, () => `Requesting TransactionsProof for ${addresses}@${block.height} from ${this._peer.peerAddress}`);

        return new Promise((resolve, reject) => {
            this._transactionsProofRequest = {
                addresses,
                block,
                resolve,
                reject,
            };

            // Request TransactionProof from peer.
            this._peer.channel.getTransactionsProof(block.hash(), addresses);

            // Drop the peer if it doesn't send the TransactionProof within the timeout.
            this._peer.channel.expectMessage(Message.Type.TRANSACTIONS_PROOF, () => {
                this._peer.channel.close(CloseType.GET_TRANSACTIONS_PROOF_TIMEOUT, 'getTransactionsProof timeout');
                reject(new Error('timeout'));
            }, BaseConsensusAgent.TRANSACTIONS_PROOF_REQUEST_TIMEOUT);
        });
    }

    /**
     * @param {TransactionsProofMessage} msg
     * @returns {void}
     * @private
     */
    _onTransactionsProof(msg) {
        Log.v(BaseConsensusAgent, () => `[TRANSACTIONS-PROOF] Received from ${this._peer.peerAddress}:`
            + ` blockHash=${msg.blockHash}, proof=${msg.proof} (${msg.serializedSize} bytes)`);

        // Check if we have requested a transactions proof, reject unsolicited ones.
        if (!this._transactionsProofRequest) {
            Log.w(BaseConsensusAgent, `Unsolicited transactions proof received from ${this._peer.peerAddress}`);
            // TODO close/ban?
            return;
        }

        const {/** @type {Block} */ block, resolve, reject} = this._transactionsProofRequest;
        this._transactionsProofRequest = null;

        if (!msg.hasProof()) {
            Log.w(BaseConsensusAgent, `TransactionsProof request was rejected by ${this._peer.peerAddress}`);
            reject(new Error('TransactionsProof request was rejected'));
            return;
        }

        // Check that the reference block corresponds to the one we requested.
        if (!block.hash().equals(msg.blockHash)) {
            Log.w(BaseConsensusAgent, `Received TransactionsProof for invalid reference block from ${this._peer.peerAddress}`);
            reject(new Error('Invalid reference block'));
            return;
        }

        // Verify the proof.
        const proof = msg.proof;
        if (!block.bodyHash.equals(proof.root())) {
            Log.w(BaseConsensusAgent, `Invalid TransactionsProof received from ${this._peer.peerAddress}`);
            this._peer.channel.close(CloseType.INVALID_TRANSACTION_PROOF, 'Invalid TransactionsProof');
            reject(new Error('Invalid TransactionsProof'));
            return;
        }

        // TODO Verify that the proof only contains transactions that match the given addresses.

        // Return the retrieved transactions.
        resolve(proof.transactions);
    }

    /**
     * @param {Address} address
     * @returns {Promise.<Array.<TransactionReceipt>>}
     */
    getTransactionReceipts(address) {
        return this._synchronizer.push('getTransactionReceipts',
            this._getTransactionReceipts.bind(this, address));
    }

    /**
     * @param {Address} address
     * @returns {Promise.<Array.<TransactionReceipt>>}
     * @private
     */
    _getTransactionReceipts(address) {
        Assert.that(this._transactionReceiptsRequest === null);

        return new Promise((resolve, reject) => {
            this._transactionReceiptsRequest = {
                address,
                resolve,
                reject
            };

            this._peer.channel.getTransactionReceipts(address);

            this._peer.channel.expectMessage(Message.Type.TRANSACTION_RECEIPTS, () => {
                this._peer.channel.close(CloseType.GET_TRANSACTION_RECEIPTS_TIMEOUT, 'getTransactionReceipts timeout');
                reject(new Error('timeout'));
            }, BaseConsensusAgent.TRANSACTION_RECEIPTS_REQUEST_TIMEOUT);
        });
    }

    /**
     * @param {TransactionReceiptsMessage} msg
     * @returns {void}
     * @private
     */
    _onTransactionReceipts(msg) {
        Log.v(BaseConsensusAgent, () => `[TRANSACTION-RECEIPTS] Received from ${this._peer.peerAddress}:`
            + ` ${msg.hasReceipts() ? msg.receipts.length : '<rejected>'}`);

        // Check if we have requested transaction receipts, reject unsolicited ones.
        // TODO: How about more than one transactionReceipts message?
        if (!this._transactionReceiptsRequest) {
            Log.w(BaseConsensusAgent, `Unsolicited transaction receipts received from ${this._peer.peerAddress}`);
            // TODO close/ban?
            return;
        }

        const {resolve, reject} = this._transactionReceiptsRequest;
        this._transactionReceiptsRequest = null;

        if (!msg.hasReceipts()) {
            Log.w(BaseConsensusAgent, `TransactionReceipts request was rejected by ${this._peer.peerAddress}`);
            reject(new Error('TransactionReceipts request was rejected'));
            return;
        }

        // TODO Verify that the transaction receipts match the given address.

        resolve(msg.receipts);
    }

    /**
     * @returns {void}
     * @protected
     */
    _onClose() {
        this._synchronizer.clear();

        // Clear all timers and intervals when the peer disconnects.
        this._timers.clearAll();
        this._txsToRequest.stop();
        this._waitingInvVectors.stop();
        this._waitingFreeInvVectors.stop();

        // Notify listeners that the peer has disconnected.
        this.fire('close', this);
    }

    /** @type {Peer} */
    get peer() {
        return this._peer;
    }

    /** @type {boolean} */
    get synced() {
        return this._synced;
    }

    /** @type {boolean} */
    get syncing() {
        return false;
    }
}
/**
 * Number of InvVectors in invToRequest pool to automatically trigger a get-data request.
 * @type {number}
 */
BaseConsensusAgent.REQUEST_THRESHOLD = 50;
/**
 * Time (ms) to wait after the last received inv message before sending get-data.
 * @type {number}
 */
BaseConsensusAgent.REQUEST_THROTTLE = 500;
/**
 * Maximum time (ms) to wait after sending out get-data or receiving the last object for this request.
 * @type {number}
 */
BaseConsensusAgent.REQUEST_TIMEOUT = 1000 * 10;
BaseConsensusAgent.REQUEST_TRANSACTIONS_WAITING_MAX = 5000;
BaseConsensusAgent.REQUEST_BLOCKS_WAITING_MAX = 5000;
/**
 * Maximum time (ms) to wait for block-proof.
 * @type {number}
 */
BaseConsensusAgent.BLOCK_PROOF_REQUEST_TIMEOUT = 1000 * 10;
/**
 * Maximum time (ms) to wait for transactions-proof.
 * @type {number}
 */
BaseConsensusAgent.TRANSACTIONS_PROOF_REQUEST_TIMEOUT = 1000 * 10;
/**
 * Maximum time (ms) to wait for transactions-receipts.
 * @type {number}
 */
BaseConsensusAgent.TRANSACTION_RECEIPTS_REQUEST_TIMEOUT = 1000 * 15;
/**
 * Time interval (ms) to wait between sending out transactions.
 * @type {number}
 */
BaseConsensusAgent.TRANSACTION_RELAY_INTERVAL = 5000;
BaseConsensusAgent.TRANSACTIONS_AT_ONCE = 100;
BaseConsensusAgent.TRANSACTIONS_PER_SECOND = 10;
/**
 * Time interval (ms) to wait between sending out "free" transactions.
 * @type {number}
 */
BaseConsensusAgent.FREE_TRANSACTION_RELAY_INTERVAL = 6000;
BaseConsensusAgent.FREE_TRANSACTIONS_AT_ONCE = 10;
BaseConsensusAgent.FREE_TRANSACTIONS_PER_SECOND = 1;
/**
 * Soft limit for the total size (bytes) of free transactions per relay interval.
 * @type {number}
 */
BaseConsensusAgent.FREE_TRANSACTION_SIZE_PER_INTERVAL = 15000; // ~100 legacy transactions
/**
 * Minimum fee per byte (sat/byte) such that a transaction is not considered free.
 * @type {number}
 */
BaseConsensusAgent.TRANSACTION_RELAY_FEE_MIN = 1;
/**
 * Number of ms the peer may send non-matching transactions/blocks after a subscription change.
 * @type {number}
 */
BaseConsensusAgent.SUBSCRIPTION_CHANGE_GRACE_PERIOD = 1000 * 2;
BaseConsensusAgent.HEAD_REQUEST_INTERVAL = 100 * 1000; // 100 seconds, give client time to announce new head without request

BaseConsensusAgent.KNOWN_OBJECTS_COUNT_MAX = 40000;
Class.register(BaseConsensusAgent);

class FreeTransactionVector {
    /**
     * @param {InvVector} inv
     * @param {number} serializedSize
     */
    constructor(inv, serializedSize) {
        this._inv = inv;
        this._serializedSize = serializedSize;
    }

    /**
     * @returns {string}
     */
    hashCode() {
        return this._inv.hashCode();
    }

    /**
     * @returns {string}
     */
    toString() {
        return this._inv.toString();
    }

    /** @type {InvVector} */
    get inv() {
        return this._inv;
    }

    /** @type {number} */
    get serializedSize() {
        return this._serializedSize;
    }
}