Home Reference Source Test

src/main/generic/consensus/BaseMiniConsensusAgent.js

/**
 * @abstract
 */
class BaseMiniConsensusAgent extends BaseConsensusAgent {
    /**
     * @param {BaseChain} blockchain
     * @param {NanoMempool} mempool
     * @param {Time} time
     * @param {Peer} peer
     * @param {InvRequestManager} invRequestManager
     * @param {Subscription} [targetSubscription]
     */
    constructor(blockchain, mempool, time, peer, invRequestManager, targetSubscription) {
        super(time, peer, invRequestManager, targetSubscription);

        /** @type {BaseChain} */
        this._blockchain = blockchain;
        /** @type {NanoMempool} */
        this._mempool = mempool;

        this._subscribeTarget();

        // Helper object to keep track of the accounts we're requesting from the peer.
        this._accountsRequest = null;
        this._onToDisconnect(peer.channel, 'accounts-proof', msg => this._onAccountsProof(msg));
    }

    requestMempool() {
        // Request the peer's mempool.
        // XXX Use a random delay here to prevent requests to multiple peers at once.
        const delay = BaseMiniConsensusAgent.MEMPOOL_DELAY_MIN
            + Math.random() * (BaseMiniConsensusAgent.MEMPOOL_DELAY_MAX - BaseMiniConsensusAgent.MEMPOOL_DELAY_MIN);
        setTimeout(() => this._peer.channel.mempool(), delay);
    }


    /**
     * @param {Hash} blockHash
     * @param {Array.<Address>} addresses
     * @returns {Promise.<Array.<Account>>}
     */
    getAccounts(blockHash, addresses) {
        return this._synchronizer.push('getAccounts',
            this._getAccounts.bind(this, blockHash, addresses));
    }

    /**
     * @param {Hash} blockHash
     * @param {Array.<Address>} addresses
     * @returns {Promise.<Array<Account>>}
     * @private
     */
    async _getAccounts(blockHash, addresses) {
        Assert.that(this._accountsRequest === null);

        const block = await this._blockchain.getBlock(blockHash);
        if (!block) {
            throw new Error('Unknown block hash');
        }

        Log.d(BaseMiniConsensusAgent, `Requesting AccountsProof for ${addresses} from ${this._peer.peerAddress}`);

        return new Promise((resolve, reject) => {
            this._accountsRequest = {
                addresses,
                block,
                resolve,
                reject
            };

            // Request AccountsProof from peer.
            this._peer.channel.getAccountsProof(blockHash, addresses);

            // Drop the peer if it doesn't send the accounts proof within the timeout.
            this._peer.channel.expectMessage(Message.Type.ACCOUNTS_PROOF, () => {
                this._peer.channel.close(CloseType.GET_ACCOUNTS_PROOF_TIMEOUT, 'getAccountsProof timeout');
                reject(new Error('Timeout'));
            }, BaseMiniConsensusAgent.ACCOUNTSPROOF_REQUEST_TIMEOUT);
        });
    }

    /**
     * @param {AccountsProofMessage} msg
     * @returns {Promise.<void>}
     * @private
     */
    async _onAccountsProof(msg) {
        Log.d(BaseMiniConsensusAgent, `[ACCOUNTS-PROOF] Received from ${this._peer.peerAddress}: blockHash=${msg.blockHash}, proof=${msg.proof} (${msg.serializedSize} bytes)`);

        // Check if we have requested an accounts proof, discard unsolicited ones.
        if (!this._accountsRequest) {
            Log.w(BaseMiniConsensusAgent, `Unsolicited accounts proof received from ${this._peer.peerAddress}`);
            return;
        }

        // Reset accountsRequest.
        const {addresses, /** @type {Block} */ block, resolve, reject} = this._accountsRequest;
        this._accountsRequest = null;

        if (!msg.hasProof()) {
            reject(new Error('Accounts request was rejected'));
            return;
        }

        // Check that the reference block corresponds to the one we requested.
        const blockHash = block.hash();
        if (!blockHash.equals(msg.blockHash)) {
            Log.w(BaseMiniConsensusAgent, `Received AccountsProof for invalid reference block from ${this._peer.peerAddress}`);
            reject(new Error('Invalid reference block'));
            return;
        }

        // Verify the proof.
        const proof = msg.proof;
        if (!proof.verify()) {
            Log.w(BaseMiniConsensusAgent, `Invalid AccountsProof received from ${this._peer.peerAddress}`);
            this._peer.channel.close(CloseType.INVALID_ACCOUNTS_PROOF, 'Invalid AccountsProof');
            reject(new Error('Invalid AccountsProof'));
            return;
        }

        // Check that the proof root hash matches the accountsHash in the reference block.
        const rootHash = proof.root();
        if (!block.accountsHash.equals(rootHash)) {
            Log.w(BaseMiniConsensusAgent, `Invalid AccountsProof (root hash) received from ${this._peer.peerAddress}`);
            this._peer.channel.close(CloseType.INVALID_ACCOUNTS_PROOF, 'AccountsProof root hash mismatch');
            reject(new Error('AccountsProof root hash mismatch'));
            return;
        }

        // Check that all requested accounts are part of this proof.
        // XXX return a map address -> account instead?
        const accounts = [];
        for (const address of addresses) {
            try {
                const account = proof.getAccount(address);
                accounts.push(account);
            } catch (e) {
                Log.w(BaseMiniConsensusAgent, `Incomplete AccountsProof received from ${this._peer.peerAddress}`);
                this._peer.channel.close(CloseType.INVALID_ACCOUNTS_PROOF, 'Incomplete AccountsProof');
                reject(new Error('Incomplete AccountsProof'));
                return;
            }
        }

        // Return the retrieved accounts.
        resolve(accounts);
    }

    /**
     * @returns {Iterable.<Transaction>}
     * @protected
     * @override
     */
    _getSubscribedMempoolTransactions() {
        switch (this._remoteSubscription.type) {
            case Subscription.Type.ADDRESSES:
                return this._mempool.getTransactionsByAddresses(this._remoteSubscription.addresses, BaseMiniConsensusAgent.MEMPOOL_ENTRIES_MAX);
            case Subscription.Type.MIN_FEE:
                return this._mempool.getTransactions().filter((tx) => tx.feePerByte >= this._remoteSubscription.minFeePerByte);
            case Subscription.Type.ANY:
                return this._mempool.getTransactions(BaseMiniConsensusAgent.MEMPOOL_ENTRIES_MAX);
        }
        return [];
    }
}
/**
 * Maximum time (ms) to wait for accounts-proof after sending out get-accounts-proof before dropping the peer.
 * @type {number}
 */
BaseMiniConsensusAgent.ACCOUNTSPROOF_REQUEST_TIMEOUT = 1000 * 5;
/**
 * Minimum time {ms} to wait before triggering the initial mempool request.
 * @type {number}
 */
BaseMiniConsensusAgent.MEMPOOL_DELAY_MIN = 500; // 0.5 seconds
/**
 * Maximum time {ms} to wait before triggering the initial mempool request.
 * @type {number}
 */
BaseMiniConsensusAgent.MEMPOOL_DELAY_MAX = 1000 * 5; // 5 seconds
/**
 * Number of transaction vectors to send
 * @type {number}
 */
BaseMiniConsensusAgent.MEMPOOL_ENTRIES_MAX = 1000;
Class.register(BaseMiniConsensusAgent);