// Home Reference Source Test
//
// src/main/generic/consensus/light/LightConsensusAgent.js

/**
 * Consensus agent that synchronizes a LightChain with a single peer.
 *
 * Two sync modes are used (see syncBlockchain()):
 *  - catchup: the regular FullConsensusAgent block sync, chosen when the sync target is
 *    within Policy.NUM_BLOCKS_VERIFICATION blocks of our own height;
 *  - proof sync: a PartialLightChain is driven through its states
 *    (PROVE_CHAIN -> PROVE_ACCOUNTS_TREE -> PROVE_BLOCKS -> COMPLETE) and committed at the end.
 */
class LightConsensusAgent extends FullConsensusAgent {
    /**
     * @param {LightChain} blockchain
     * @param {Mempool} mempool
     * @param {Time} time
     * @param {Peer} peer
     * @param {InvRequestManager} invRequestManager
     * @param {Subscription} targetSubscription
     */
    constructor(blockchain, mempool, time, peer, invRequestManager, targetSubscription) {
        super(blockchain, mempool, time, peer, invRequestManager, targetSubscription);
        /** @type {LightChain} */
        this._blockchain = blockchain;
        /** @type {PartialLightChain} */
        this._partialChain = null;

        /** @type {boolean} */
        this._syncing = false;

        // Flag indicating whether we do a full catchup or request a proof.
        /** @type {boolean} */
        this._catchup = false;

        // Flag indicating whether we believe to be on the main chain of the client.
        /** @type {boolean} */
        this._onMainChain = false;

        // Blocks that could not be attached during a proof sync; replayed after commit.
        /** @type {Array.<Block>} */
        this._orphanedBlocks = [];

        // Set while a sync stage is waiting for the peer (or for a commit) so that
        // re-entrant syncBlockchain() calls do not start the same stage twice.
        /** @type {boolean} */
        this._busy = false;

        // Helper object to keep track of the accounts we're requesting from the peer.
        this._accountsRequest = null;

        // Flag to track chain proof requests.
        this._requestedChainProof = false;

        // Number of weak proofs we have received from the peer.
        this._numWeakProofs = 0;

        // Listen to consensus messages from the peer.
        this._onToDisconnect(peer.channel, 'chain-proof', msg => this._onChainProof(msg));
        this._onToDisconnect(peer.channel, 'accounts-tree-chunk', msg => this._onAccountsTreeChunk(msg));
    }

    /**
     * Drives the sync state machine one step. Called initially and again whenever a stage
     * has finished (see _onChainProof(), _onAccountsTreeChunk(), _processBlock()).
     *
     * @returns {Promise.<void>}
     * @override
     */
    async syncBlockchain() {
        // We don't sync with nano nodes.
        if (!this.providesServices(Services.FULL_BLOCKS, Services.CHAIN_PROOF, Services.ACCOUNTS_CHUNKS, Services.MEMPOOL)) {
            this._syncFinished();
            return;
        }

        // Wait for all objects to arrive.
        if (!this._objectsInFlight.isEmpty()) {
            Log.v(LightConsensusAgent, `Waiting for ${this._objectsInFlight.length} objects to arrive ...`);
            return;
        }

        // Wait for all objects to be processed.
        if (!this._objectsProcessing.isEmpty()) {
            Log.v(LightConsensusAgent, `Waiting for ${this._objectsProcessing.length} objects to be processed ...`);
            return;
        }

        // Ban peer if the sync failed more often than allowed.
        if (this._failedSyncs >= LightConsensusAgent.SYNC_ATTEMPTS_MAX) {
            this._peer.channel.close(CloseType.BLOCKCHAIN_SYNC_FAILED, 'blockchain sync failed');
            if (this._partialChain) {
                await this._partialChain.abort();
                this._partialChain = null;
            }
            return;
        }

        // Check if we know head block.
        const block = await this._blockchain.getBlock(this._syncTarget, /*includeForks*/ true);

        /*
         * Three cases:
         * 1) We know block and are not yet syncing: All is done.
         * 2) We don't know the block and are not yet syncing: Start syncing.
         *    and determine sync mode (full catchup or not).
         * 3) We are syncing. Behave differently based on sync mode.
         *    Note that we can switch from catchup to proof if we notice that
         *    we're on a fork and get an INV vector starting from the genesis block.
         */

        // Case 1: We're up to date.
        if (block && !this._syncing) {
            this._syncFinished();
            return;
        }

        // Case 2: Check header.
        if (!block && !this._syncing) {
            this._syncing = true;
            this._onMainChain = false;

            let header;
            try {
                header = await this.getHeader(this._syncTarget);
            } catch (e) {
                this._peer.channel.close(CloseType.ABORTED_SYNC, 'Failed to retrieve sync target header');
                return;
            }

            // Check how to sync based on heuristic:
            // Allow catchup sync in a window of NUM_BLOCKS_VERIFICATION in both directions
            this._catchup = Math.abs(header.height - this._blockchain.height) <= Policy.NUM_BLOCKS_VERIFICATION;
            Log.d(LightConsensusAgent, `Start syncing, catchup mode: ${this._catchup}`);
        }

        // Case 3: We are syncing.
        if (this._syncing && !this._busy) {
            if (this._catchup) {
                await FullConsensusAgent.prototype.syncBlockchain.call(this);
            } else {
                // Initialize partial chain on first call.
                if (!this._partialChain) {
                    await this._initChainProofSync();
                }

                switch (this._partialChain.state) {
                    case PartialLightChain.State.PROVE_CHAIN:
                        this._requestChainProof();
                        this.fire('sync-chain-proof', this._peer.peerAddress);
                        break;
                    case PartialLightChain.State.PROVE_ACCOUNTS_TREE:
                        this._requestAccountsTree();
                        this.fire('sync-accounts-tree', this._peer.peerAddress);
                        break;
                    case PartialLightChain.State.PROVE_BLOCKS:
                        this._requestProofBlocks();
                        this.fire('verify-accounts-tree', this._peer.peerAddress);
                        break;
                    case PartialLightChain.State.COMPLETE:
                        // Commit state on success.
                        this.fire('sync-finalize', this._peer.peerAddress);
                        this._busy = true;
                        await this._partialChain.commit();
                        await this._applyOrphanedBlocks();
                        this._syncFinished();
                        break;
                    case PartialLightChain.State.ABORTED:
                        this._peer.channel.close(CloseType.ABORTED_SYNC, 'aborted sync');
                        break;
                    case PartialLightChain.State.WEAK_PROOF:
                        Log.d(LightConsensusAgent, `Not syncing with ${this._peer.peerAddress} - weaker proof`);
                        this._numWeakProofs++;
                        if (this._numWeakProofs >= LightConsensusAgent.WEAK_PROOFS_MAX) {
                            this._peer.channel.close(CloseType.BLOCKCHAIN_SYNC_FAILED, 'too many weak proofs');
                        } else {
                            this._syncFinished();
                        }
                        break;
                }
            }
        }
    }

    /**
     * Switches the agent into proof-sync mode: aborts a partial chain that may still be
     * around and creates a fresh one from the underlying LightChain.
     *
     * @returns {Promise.<void>}
     * @private
     */
    async _initChainProofSync() {
        // Subscribe to all announcements from the peer.
        this._subscribeTarget();

        this._syncing = true;
        this._synced = false;
        this._catchup = false;
        this._onMainChain = true;

        if (this._partialChain) {
            await this._partialChain.abort();
        }

        this._partialChain = await this._blockchain.partialChain();
    }

    /**
     * Drops the reference to the partial chain, clears the busy flag and lets the
     * parent class finish the sync.
     *
     * @returns {void}
     * @private
     */
    _syncFinished() {
        // _partialChain is only ever null or a chain instance, so an unconditional reset is equivalent.
        this._partialChain = null;
        this._busy = false;
        super._syncFinished();
    }

    /**
     * Pushes the blocks collected in _orphanedBlocks into the blockchain, in the order they
     * were collected. Stops at the first invalid block (closing the channel) and clears the
     * list in any case.
     *
     * @returns {Promise.<void>}
     * @private
     */
    async _applyOrphanedBlocks() {
        for (const block of this._orphanedBlocks) {
            const status = await this._blockchain.pushBlock(block);
            if (status === LightChain.ERR_INVALID) {
                this._peer.channel.close(CloseType.INVALID_BLOCK, 'received invalid block');
                break;
            }
        }
        this._orphanedBlocks = [];
    }

    // Syncing stages.
    // Stage 1: Chain proof.
    /**
     * Sends get-chain-proof to the peer and arms the response timeout.
     *
     * @returns {void}
     * @private
     */
    _requestChainProof() {
        Assert.that(this._partialChain && this._partialChain.state === PartialLightChain.State.PROVE_CHAIN);
        Assert.that(!this._requestedChainProof);
        this._busy = true;

        // Request ChainProof from peer.
        this._peer.channel.getChainProof();
        this._requestedChainProof = true;

        // Drop the peer if it doesn't send the chain proof within the timeout.
        this._peer.channel.expectMessage(Message.Type.CHAIN_PROOF, () => {
            this._peer.channel.close(CloseType.GET_CHAIN_PROOF_TIMEOUT, 'getChainProof timeout');
        }, LightConsensusAgent.CHAINPROOF_REQUEST_TIMEOUT, LightConsensusAgent.CHAINPROOF_CHUNK_TIMEOUT);
    }

    /**
     * Handles a chain-proof message: unsolicited proofs are discarded, solicited ones are
     * pushed into the partial chain and the sync state machine is advanced.
     *
     * @param {ChainProofMessage} msg
     * @returns {Promise.<void>}
     * @private
     */
    async _onChainProof(msg) {
        Log.d(LightConsensusAgent, `[CHAIN-PROOF] Received from ${this._peer.peerAddress}: ${msg.proof}`);

        // Check if we have requested an interlink chain, discard unsolicited ones.
        // This check has to come before the state assertion below: a peer may send a chain
        // proof at any time, including when no partial chain exists, and that must not throw.
        if (!this._requestedChainProof) {
            Log.w(LightConsensusAgent, `Unsolicited chain proof received from ${this._peer.peerAddress}`);
            return;
        }

        Assert.that(this._partialChain && this._partialChain.state === PartialLightChain.State.PROVE_CHAIN);
        this._requestedChainProof = false;

        if (this._syncing) {
            this.fire('verify-chain-proof', this._peer.peerAddress);
        }

        // Push the proof into the LightChain.
        if (!(await this._partialChain.pushProof(msg.proof))) {
            Log.w(LightConsensusAgent, `Invalid chain proof received from ${this._peer.peerAddress} - verification failed`);
            this._peer.channel.close(CloseType.INVALID_CHAIN_PROOF, 'Invalid chain proof');
            return;
        }

        // TODO add all blocks from the chain proof to knownObjects.

        this._busy = false;
        this.syncBlockchain().catch(Log.e.tag(LightConsensusAgent));
    }

    // Stage 2: Request AccountsTree.
    /**
     * Requests the next accounts tree chunk (starting at the first prefix the partial chain
     * is still missing) for the partial chain's head block and arms the response timeout.
     *
     * @returns {void}
     * @private
     */
    _requestAccountsTree() {
        Assert.that(this._partialChain && this._partialChain.state === PartialLightChain.State.PROVE_ACCOUNTS_TREE);
        Assert.that(!this._accountsRequest);
        this._busy = true;

        const startPrefix = this._partialChain.getMissingAccountsPrefix();
        const headHash = this._partialChain.headHash;
        Log.d(LightConsensusAgent, `Requesting AccountsTreeChunk starting at ${startPrefix} from ${this._peer.peerAddress}`);

        // Remember what we asked for so that the response can be validated against it.
        this._accountsRequest = {
            startPrefix,
            blockHash: headHash
        };

        // Request AccountsProof from peer.
        this._peer.channel.getAccountsTreeChunk(headHash, startPrefix);

        // Drop the peer if it doesn't send the accounts proof within the timeout.
        this._peer.channel.expectMessage(Message.Type.ACCOUNTS_TREE_CHUNK, () => {
            this._accountsRequest = null;
            this._peer.channel.close(CloseType.GET_ACCOUNTS_TREE_CHUNK_TIMEOUT, 'getAccountsTreeChunk timeout');
        }, LightConsensusAgent.ACCOUNTS_TREE_CHUNK_REQUEST_TIMEOUT);
    }

    /**
     * Handles an accounts-tree-chunk message: validates it against the outstanding request,
     * verifies the chunk and its root hash, and pushes it into the partial chain.
     * Any validation failure closes the channel.
     *
     * NOTE(review): the close type used below is spelled INVALID_ACCOUNTS_TREE_CHUNCK -
     * confirm that this matches the identifier actually defined on CloseType.
     *
     * @param {AccountsTreeChunkMessage} msg
     * @returns {Promise.<void>}
     * @private
     */
    async _onAccountsTreeChunk(msg) {
        Log.d(LightConsensusAgent, `[ACCOUNTS-TREE-CHUNK] Received from ${this._peer.peerAddress}: blockHash=${msg.blockHash}, proof=${msg.chunk}`);

        // Check if we have requested an accounts proof, discard unsolicited ones.
        if (!this._accountsRequest) {
            Log.w(LightConsensusAgent, `Unsolicited accounts tree chunk received from ${this._peer.peerAddress}`);
            return;
        }

        Assert.that(this._partialChain && this._partialChain.state === PartialLightChain.State.PROVE_ACCOUNTS_TREE);

        const {startPrefix, blockHash} = this._accountsRequest;
        this._accountsRequest = null;

        // The peer answered without a chunk: drop the partial chain and count a failed sync.
        if (!msg.hasChunk()) {
            // Restart syncing.
            await this._partialChain.abort();
            this._partialChain = null;
            this._busy = false;
            this._failedSyncs++;
            return;
        }

        // Check that we know the reference block.
        if (!blockHash.equals(msg.blockHash) || msg.chunk.head.prefix <= startPrefix) {
            Log.w(LightConsensusAgent, `Received AccountsTreeChunk for block != head or wrong start prefix from ${this._peer.peerAddress}`);
            this._peer.channel.close(CloseType.UNEXPECTED_ACCOUNTS_TREE_CHUNK, 'Unexpected AccountsTreeChunk');
            return;
        }

        // Verify the proof.
        const chunk = msg.chunk;
        if (!chunk.verify()) {
            Log.w(LightConsensusAgent, `Invalid AccountsTreeChunk received from ${this._peer.peerAddress}`);
            this._peer.channel.close(CloseType.INVALID_ACCOUNTS_TREE_CHUNCK, 'Invalid AccountsTreeChunk');
            return;
        }

        // Check that the proof root hash matches the accountsHash in the reference block.
        const rootHash = chunk.root();
        const block = await this._partialChain.getBlock(blockHash);
        if (!block.accountsHash.equals(rootHash)) {
            Log.w(LightConsensusAgent, `Invalid AccountsTreeChunk (root hash) received from ${this._peer.peerAddress}`);
            this._peer.channel.close(CloseType.INVALID_ACCOUNTS_TREE_CHUNCK, 'AccountsTreeChunk root hash mismatch');
            return;
        }

        // Return the retrieved accounts.
        const result = await this._partialChain.pushAccountsTreeChunk(chunk);

        // Something went wrong! The channel is closed, so do not continue syncing with this peer.
        if (result < 0) {
            Log.e(LightConsensusAgent, `AccountsTree sync failed with error code ${result} from ${this._peer.peerAddress}`);
            this._peer.channel.close(CloseType.INVALID_ACCOUNTS_TREE_CHUNCK, 'Failed to push AccountsTreeChunk');
            return;
        }

        this._busy = false;
        this.syncBlockchain().catch(Log.e.tag(LightConsensusAgent));
    }

    // Stage 3: Request proof blocks.
    /**
     * Requests the full block for the current proof head plus the following blocks the
     * partial chain still needs. Counts a failed sync if the proof head did not move since
     * the previous call.
     *
     * @returns {void}
     * @private
     */
    _requestProofBlocks() {
        Assert.that(this._partialChain && this._partialChain.state === PartialLightChain.State.PROVE_BLOCKS);

        // If nothing happend since the last request, increase failed syncs.
        if (this._lastChainHeight === this._partialChain.proofHeadHeight) {
            this._failedSyncs++;
        }
        this._lastChainHeight = this._partialChain.proofHeadHeight;

        // XXX Only one getBlocks request at a time.
        if (this._peer.channel.isExpectingMessage(Message.Type.INV)) {
            Log.e(LightConsensusAgent, 'Duplicate _requestProofBlocks()');
            return;
        }

        // Drop the peer if it doesn't start sending InvVectors for its chain within the timeout.
        this._peer.channel.expectMessage(Message.Type.INV, () => {
            this._peer.channel.close(CloseType.GET_BLOCKS_TIMEOUT, 'getBlocks timeout');
        }, BaseConsensusAgent.REQUEST_TIMEOUT);

        // Request the full block for our proof head.
        const locators = this._partialChain.getBlockLocators();
        this.requestVector(new InvVector(InvVector.Type.BLOCK, locators[0]));

        // Request blocks from peer.
        this._peer.channel.getBlocks(locators, this._partialChain.numBlocksNeeded(), false);
    }

    // Block processing.
    /**
     * Delegates to the parent implementation, limiting the request to a single block while
     * we are syncing and not yet sure to be on the peer's main chain.
     *
     * @returns {Promise.<void>}
     * @private
     */
    _requestBlocks() {
        // If we are syncing and not yet sure whether our blocks are on the main chain, just sync one block for now.
        if (this._syncing && !this._onMainChain) {
            return super._requestBlocks(1);
        }
        return super._requestBlocks();
    }

    /**
     * Tells whether the given block lies more than Policy.NUM_BLOCKS_VERIFICATION blocks
     * below the head of the chain we currently operate on while we are not in the
     * PROVE_BLOCKS stage of a proof sync. Synchronous on purpose so that callers keep
     * their original await points.
     *
     * @param {Block} block
     * @returns {boolean}
     * @private
     */
    _isOnDistantFork(block) {
        return block.height < this._chain.height - Policy.NUM_BLOCKS_VERIFICATION
            && (!this._partialChain || this._partialChain.state !== PartialLightChain.State.PROVE_BLOCKS);
    }

    /**
     * Pushes a received block into the current chain (partial chain during proof sync, the
     * LightChain otherwise) and updates the sync counters according to the push result.
     * Restarts with a proof sync instead if the block indicates a distant fork.
     *
     * @param {Hash} hash
     * @param {Block} block
     * @returns {Promise.<void>}
     * @protected
     * @override
     */
    async _processBlock(hash, block) {
        // If we find that we are on a fork far away from our chain, resync.
        if (this._isOnDistantFork(block)) {
            this._onMainChain = false;
            await this._initChainProofSync();
            this.syncBlockchain().catch(Log.e.tag(LightConsensusAgent));
            return;
        }
        this._onMainChain = true;

        // Put block into blockchain.
        const status = await this._chain.pushBlock(block);

        switch (status) {
            case FullChain.ERR_INVALID:
                this._peer.channel.close(CloseType.INVALID_BLOCK, 'received invalid block');
                break;

            case FullChain.OK_EXTENDED:
            case FullChain.OK_REBRANCHED:
                if (this._syncing) this._numBlocksExtending++;
                break;

            case FullChain.OK_FORKED:
                if (this._syncing) {
                    this._numBlocksForking++;
                    this._forkHead = block;
                }
                break;

            case LightChain.ERR_ORPHAN:
                this._onOrphanBlock(hash, block);
                break;
        }
    }

    /**
     * Only acts while syncing in catchup mode: restarts with a proof sync if the announced
     * block indicates a distant fork, otherwise forwards to the FullConsensusAgent handler.
     *
     * @param {Hash} hash
     * @param {Block} block
     * @returns {Promise.<void>}
     * @protected
     * @override
     */
    async _onKnownBlockAnnounced(hash, block) {
        if (this._syncing && this._catchup) {
            // If we find that we are on a fork far away from our chain, resync.
            if (this._isOnDistantFork(block)) {
                this._onMainChain = false;
                await this._initChainProofSync();
                this.syncBlockchain().catch(Log.e.tag(LightConsensusAgent));
                return;
            }
            this._onMainChain = true;

            FullConsensusAgent.prototype._onKnownBlockAnnounced.call(this, hash, block);
        }
    }

    /**
     * During a proof sync orphan blocks are buffered (and replayed after the commit, see
     * _applyOrphanedBlocks()); otherwise the parent implementation handles them.
     *
     * @param {Hash} hash
     * @param {Block} block
     * @private
     * @override
     */
    _onOrphanBlock(hash, block) {
        if (this._syncing && !this._catchup) {
            this._orphanedBlocks.push(block);
        } else {
            super._onOrphanBlock(hash, block);
        }
    }

    // Header processing.
    /**
     * Requests the header of the block with the given hash from the peer. Only one header
     * request may be outstanding at a time. The promise is settled by _onHeader() or
     * rejected when the timeout fires.
     *
     * @param {Hash} hash
     * @return {Promise.<BlockHeader>}
     */
    getHeader(hash) {
        Assert.that(!this._headerRequest);

        return new Promise((resolve, reject) => {
            const vector = new InvVector(InvVector.Type.BLOCK, hash);
            this._headerRequest = {
                hash,
                resolve,
                reject
            };

            this._peer.channel.getHeader([vector]);

            // Drop the peer if it doesn't send the header within the timeout.
            this._peer.channel.expectMessage(Message.Type.HEADER, () => {
                this._headerRequest = null;
                this._peer.channel.close(CloseType.GET_HEADER_TIMEOUT, 'getHeader timeout');
                reject(new Error('Timeout'));
            }, BaseConsensusAgent.REQUEST_TIMEOUT);
        });
    }

    /**
     * Settles the outstanding getHeader() request: resolves it with the received header if
     * the hash matches, otherwise closes the channel and rejects. Unsolicited headers are
     * logged and dropped.
     *
     * @param {HeaderMessage} msg
     * @return {void}
     * @protected
     * @override
     */
    _onHeader(msg) {
        const header = msg.header;
        const hash = header.hash();

        // Check if we have requested this block.
        if (!this._headerRequest) {
            Log.w(LightConsensusAgent, `Unsolicited header ${hash} received from ${this._peer.peerAddress}, discarding`);
            return;
        }

        const {hash: requestedHash, resolve, reject} = this._headerRequest;
        this._headerRequest = null;

        // Check that it is the correct hash.
        if (!requestedHash.equals(hash)) {
            Log.w(LightConsensusAgent, `Received wrong header from ${this._peer.peerAddress}`);
            this._peer.channel.close(CloseType.UNEXPECTED_HEADER, 'Received wrong header');
            reject(new Error('Received wrong header'));
            return;
        }

        resolve(header);
    }

    /**
     * Aborts a partial chain that is still in progress before running the parent's close handling.
     *
     * @returns {void}
     * @protected
     * @override
     */
    _onClose() {
        if (this._partialChain) {
            // Fire and forget: a failing abort is only logged.
            this._partialChain.abort().catch(Log.w.tag(LightConsensusAgent));
        }

        super._onClose();
    }

    /**
     * The chain blocks are currently pushed into: the partial chain while a proof sync is
     * running, the underlying LightChain otherwise.
     *
     * @type {LightChain}
     */
    get _chain() {
        if (this._syncing && !this._catchup && this._partialChain) {
            return this._partialChain;
        }
        return this._blockchain;
    }

    /** @type {boolean} */
    get syncing() {
        return this._syncing;
    }
}
/**
 * Time (ms) the peer has to start answering a get-chain-proof request with a
 * chain-proof message before it is dropped.
 * @type {number}
 */
LightConsensusAgent.CHAINPROOF_REQUEST_TIMEOUT = 45 * 1000;
/**
 * Time (ms) allowed to pass between two consecutive chain-proof chunks before
 * the peer is dropped.
 * @type {number}
 */
LightConsensusAgent.CHAINPROOF_CHUNK_TIMEOUT = 10 * 1000;
/**
 * Time (ms) the peer has to answer a get-accounts-tree-chunk request with an
 * accounts-tree-chunk message before it is dropped.
 * @type {number}
 */
LightConsensusAgent.ACCOUNTS_TREE_CHUNK_REQUEST_TIMEOUT = 8 * 1000;
/**
 * Number of failed blockchain sync attempts at which the connection is closed.
 * @type {number}
 */
LightConsensusAgent.SYNC_ATTEMPTS_MAX = 5;
/**
 * Maximum number of inventory vectors to send in the response for onGetBlocks.
 * @type {number}
 */
LightConsensusAgent.GETBLOCKS_VECTORS_MAX = 500;
/**
 * Number of weak proofs received from the peer at which the connection is closed.
 * @type {number}
 */
LightConsensusAgent.WEAK_PROOFS_MAX = 3;

// Hand the class to the project's Class helper.
// NOTE(review): presumably this makes LightConsensusAgent available to other modules/bundles -
// confirm against the implementation of Class.register.
Class.register(LightConsensusAgent);