Home Reference Source Test

src/main/generic/consensus/base/mempool/Mempool.js

class Mempool extends Observable {
    /**
     * @param {IBlockchain} blockchain
     * @param {Accounts} accounts
     */
    constructor(blockchain, accounts) {
        super();
        /** @type {IBlockchain} */
        this._blockchain = blockchain;
        /** @type {Accounts} */
        this._accounts = accounts;

        // Our pool of transactions.
        /** @type {SortedList.<Transaction>} */
        this._transactionsByFeePerByte = new SortedList(); // uses Transaction.compare, by fee descending
        /** @type {HashMap.<Hash, Transaction>} */
        this._transactionsByHash = new HashMap();
        /** @type {HashMap.<Address, MempoolTransactionSet>} */
        this._transactionSetBySender = new HashMap();
        /** @type {HashMap.<Address, HashSet.<Hash>>} */
        this._transactionSetByRecipient = new HashMap();
        /** @type {Synchronizer} */
        this._synchronizer = new Synchronizer();

        // Listen for changes in the blockchain head to evict transactions that have become invalid.
        blockchain.on('head-changed', () => this._evictTransactions());
        blockchain.on('block-reverted', (block) => this._restoreTransactions(block));
    }

    /**
     * @param {Transaction} transaction
     * @fires Mempool#transaction-added
     * @returns {Promise.<Mempool.ReturnCode>}
     */
    pushTransaction(transaction) {
        return this._synchronizer.push(() => this._pushTransaction(transaction));
    }

    /**
     * @param {Transaction} transaction
     * @returns {Promise.<Mempool.ReturnCode>}
     * @private
     */
    async _pushTransaction(transaction) {
        // Check if we already know this transaction.
        const hash = transaction.hash();
        if (this._transactionsByHash.contains(hash)) {
            return Mempool.ReturnCode.KNOWN;
        }

        const set = this._transactionSetBySender.get(transaction.sender) || new MempoolTransactionSet();
        // Check limit for free transactions.
        if (transaction.fee / transaction.serializedSize < Mempool.TRANSACTION_RELAY_FEE_MIN
            && set.numBelowFeePerByte(Mempool.TRANSACTION_RELAY_FEE_MIN) >= Mempool.FREE_TRANSACTIONS_PER_SENDER_MAX) {
            return Mempool.ReturnCode.FEE_TOO_LOW;
        }

        // Intrinsic transaction verification
        if (!transaction.verify()) {
            return Mempool.ReturnCode.INVALID;
        }

        // Retrieve recipient account and test incoming transaction.
        /** @type {Account} */
        let recipientAccount;
        try {
            recipientAccount = await this._accounts.get(transaction.recipient);
            recipientAccount.withIncomingTransaction(transaction, this._blockchain.height + 1);
        } catch (e) {
            Log.d(Mempool, () => `Rejected transaction from ${transaction.sender} - ${e.message}`);
            return Mempool.ReturnCode.INVALID;
        }

        // Retrieve sender account.
        /** @type {Account} */
        let senderAccount;
        try {
            senderAccount = await this._accounts.get(transaction.sender, transaction.senderType);
        } catch (e) {
            Log.d(Mempool, () => `Rejected transaction from ${transaction.sender} - ${e.message}`);
            return Mempool.ReturnCode.INVALID;
        }

        // Add new transaction to the sender's pending transaction set. Then re-check all transactions in the set
        // in fee/byte order against the sender account state. Adding high fee transactions may thus invalidate
        // low fee transactions in the set.
        const transactions = [];
        let tmpAccount = senderAccount;
        for (const tx of set.copyAndAdd(transaction).transactions) {
            let error = 'transactions per sender exceeded';
            try {
                if (transactions.length < Mempool.TRANSACTIONS_PER_SENDER_MAX) {
                    tmpAccount = tmpAccount.withOutgoingTransaction(tx, this._blockchain.height + 1, this._blockchain.transactionCache);
                    transactions.push(tx);

                    // Transaction ok, move to next one.
                    continue;
                }
            } catch (e) {
                error = e.message;
            }

            // An error occurred processing this transaction.
            // If the rejected transaction is the one we're pushing, fail.
            // Otherwise, evict the rejected transaction from the mempool.
            if (tx.equals(transaction)) {
                Log.d(Mempool, () => `Rejected transaction from ${transaction.sender} - ${error}`);
                return Mempool.ReturnCode.INVALID;
            } else {
                // Remove transaction
                this._removeTransaction(tx);
            }
        }

        if (this._transactionsByFeePerByte.length >= Mempool.SIZE_MAX) {
            this._popLowFeeTransaction();
        }

        // Transaction is valid, add it to the mempool.
        this._transactionsByFeePerByte.add(transaction);
        this._transactionsByHash.put(hash, transaction);
        this._transactionSetBySender.put(transaction.sender, new MempoolTransactionSet(transactions));
        /** @type {HashSet.<Hash>} */
        const byRecipient = this._transactionSetByRecipient.get(transaction.recipient) || new HashSet();
        byRecipient.add(transaction.hash());
        this._transactionSetByRecipient.put(transaction.recipient, byRecipient);

        // Tell listeners about the new valid transaction we received.
        this.fire('transaction-added', transaction);

        return Mempool.ReturnCode.ACCEPTED;
    }

    /**
     * @private
     */
    _popLowFeeTransaction() {
        // Remove transaction
        const transaction = this._transactionsByFeePerByte.pop();

        /** @type {MempoolTransactionSet} */
        const set = this._transactionSetBySender.get(transaction.sender);
        set.remove(transaction);

        /** @type {HashSet.<Hash>} */
        const byRecipient = this._transactionSetByRecipient.get(transaction.recipient);
        if (byRecipient) {
            if (byRecipient.length === 1) {
                this._transactionSetByRecipient.remove(transaction.recipient);
            } else {
                byRecipient.remove(transaction.hash());
            }
        } else {
            Log.e(Mempool, `Invalid state: no transactionsByRecipient for ${transaction}`);
        }

        this._transactionsByHash.remove(transaction.hash());
        this.fire('transaction-removed', transaction);
    }

    /**
     * Does *not* remove transaction from transactionsBySender!
     * @param {Transaction} transaction
     * @private
     */
    _removeTransaction(transaction) {
        this._transactionsByHash.remove(transaction.hash());

        // TODO: Optimise remove from this._transactionsByMinFee.
        this._transactionsByFeePerByte.remove(transaction);

        /** @type {HashSet.<Hash>} */
        const byRecipient = this._transactionSetByRecipient.get(transaction.recipient);
        if (byRecipient) {
            if (byRecipient.length === 1) {
                this._transactionSetByRecipient.remove(transaction.recipient);
            } else {
                byRecipient.remove(transaction.hash());
            }
            this.fire('transaction-removed', transaction);
        } else {
            Log.e(Mempool, `Invalid state: no transactionsByRecipient for ${transaction}`);
        }
    }

    /**
     * @param {Hash} hash
     * @returns {Transaction}
     */
    getTransaction(hash) {
        return this._transactionsByHash.get(hash);
    }

    /**
     * @param {number} [maxSize]
     * @param {number} [minFeePerByte]
     * @returns {Array.<Transaction>}
     */
    *transactionGenerator(maxSize = Infinity, minFeePerByte = 0) {
        let size = 0;
        for (const /** @type {Transaction} */ tx of this._transactionsByFeePerByte) {
            const txSize = tx.serializedSize;
            if (size + txSize >= maxSize) continue;
            if (tx.feePerByte < minFeePerByte) break;

            yield tx;
            size += txSize;
        }
    }

    /**
     * @param {number} [maxSize]
     * @param {number} [minFeePerByte]
     * @returns {Array.<Transaction>}
     */
    getTransactions(maxSize = Infinity, minFeePerByte = 0) {
        return Array.from(this.transactionGenerator(maxSize, minFeePerByte));
    }

    /**
     * @param {number} maxSize
     * @returns {Promise.<Array.<Transaction>>}
     */
    async getTransactionsForBlock(maxSize) {
        const transactions = this.getTransactions(maxSize);
        const prunedAccounts = await this._accounts.gatherToBePrunedAccounts(transactions, this._blockchain.height + 1, this._blockchain.transactionCache);
        const prunedAccountsSize = prunedAccounts.reduce((sum, acc) => sum + acc.serializedSize, 0);

        let size = prunedAccountsSize + transactions.reduce((sum, tx) => sum + tx.serializedSize, 0);
        while (size > maxSize) {
            size -= transactions.pop().serializedSize;
        }

        transactions.sort((a, b) => a.compareBlockOrder(b));
        return transactions;
    }

    /**
     * @param {Address} address
     * @return {Array.<Transaction>}
     */
    getPendingTransactions(address) {
        return this.getTransactionsBySender(address);
    }

    /**
     * @param {Address} address
     * @return {Array.<Transaction>}
     */
    getTransactionsBySender(address) {
        /** @type {MempoolTransactionSet} */
        const set = this._transactionSetBySender.get(address);
        return set ? set.transactions : [];
    }

    /**
     * @param {Address} address
     * @return {Array.<Transaction>}
     */
    getTransactionsByRecipient(address) {
        /** @type {HashSet.<Hash>} */
        const set = this._transactionSetByRecipient.get(address);
        if (!set) {
            return [];
        }

        /** @type {Array.<Transaction>} */
        const transactions = [];
        for (const hash of set.valueIterator()) {
            const tx = this._transactionsByHash.get(hash);
            Assert.that(!!tx);
            transactions.push(tx);
        }
        return transactions;
    }

    /**
     * @param {Array.<Address>} addresses
     * @param {number} [maxTransactions]
     * @return {Array.<Transaction>}
     */
    getTransactionsByAddresses(addresses, maxTransactions = Infinity) {
        const transactions = [];
        for (const address of addresses) {
            // Fetch transactions by sender first
            /** @type {Array.<Transaction>} */
            const bySender = this.getTransactionsBySender(address);
            for (const tx of bySender) {
                if (transactions.length >= maxTransactions) return transactions;
                transactions.push(tx);
            }

            // Fetch transactions by recipient second
            /** @type {Array.<Transaction>} */
            const byRecipient = this.getTransactionsByRecipient(address);
            for (const tx of byRecipient) {
                if (transactions.length >= maxTransactions) return transactions;
                transactions.push(tx);
            }
        }
        return transactions;
    }

    /**
     * @param {number} minFeePerByte
     */
    evictBelowMinFeePerByte(minFeePerByte) {
        /** @type {Transaction} */
        let transaction = this._transactionsByFeePerByte.peekLast();
        while (transaction && transaction.feePerByte < minFeePerByte) {
            this._transactionsByFeePerByte.pop();

            this._transactionsByHash.remove(transaction.hash());

            /** @type {MempoolTransactionSet} */
            const bySender = this._transactionSetBySender.get(transaction.sender);
            if (bySender.length === 1) {
                this._transactionSetBySender.remove(transaction.sender);
            } else {
                bySender.remove(transaction);
            }
            /** @type {HashSet.<Hash>} */
            const byRecipient = this._transactionSetByRecipient.get(transaction.recipient);
            if (byRecipient.length === 1) {
                this._transactionSetByRecipient.remove(transaction.recipient);
            } else {
                byRecipient.remove(transaction.hash());
            }

            transaction = this._transactionsByFeePerByte.peekLast();
        }
    }

    /**
     * @param {Block} block
     * @returns {Promise}
     * @private
     */
    _restoreTransactions(block) {
        return this._synchronizer.push(async () => {
            for (const tx of block.transactions) {
                await this._pushTransaction(tx);
            }
        });
    }

    /**
     * @fires Mempool#transactions-ready
     * @returns {Promise}
     * @private
     */
    _evictTransactions() {
        return this._synchronizer.push(() => this.__evictTransactions());
    }

    /**
     * @fires Mempool#transactions-ready
     * @returns {Promise}
     * @private
     */
    async __evictTransactions() {
        // Evict all transactions from the pool that have become invalid due
        // to changes in the account state (i.e. typically because the were included
        // in a newly mined block). No need to re-check signatures.
        for (const sender of this._transactionSetBySender.keys()) {
            /** @type {MempoolTransactionSet} */
            const set = this._transactionSetBySender.get(sender);

            try {
                const senderAccount = await this._accounts.get(set.sender, set.senderType);

                // If a transaction in the set is not valid anymore,
                // we try to construct a new set based on the heuristic of including
                // high fee/byte transactions first.
                const transactions = [];
                let account = senderAccount;
                for (const tx of set.transactions) {
                    try {
                        const tmpAccount = account.withOutgoingTransaction(tx, this._blockchain.height + 1, this._blockchain.transactionCache);

                        const recipientAccount = await this._accounts.get(tx.recipient);
                        recipientAccount.withIncomingTransaction(tx, this._blockchain.height + 1);

                        transactions.push(tx);
                        account = tmpAccount;
                    } catch (e) {
                        // Remove transaction
                        this._removeTransaction(tx);
                    }
                }
                if (transactions.length === 0) {
                    this._transactionSetBySender.remove(sender);
                } else {
                    this._transactionSetBySender.put(sender, new MempoolTransactionSet(transactions));
                }
            } catch (e) {
                // In case of an error, remove all transactions of this set.
                for (const tx of set.transactions) {
                    this._removeTransaction(tx);
                }
                this._transactionSetBySender.remove(sender);
            }
        }

        // Tell listeners that the pool has updated after a blockchain head change.
        /**
         * @event Mempool#transactions-ready
         */
        this.fire('transactions-ready');
    }

    /** @type {number} */
    get length() {
        return this._transactionsByHash.length;
    }

    /** @type {Synchronizer} */
    get queue() {
        return this._synchronizer;
    }
}

/**
 * Fee threshold in sat/byte below which transactions are considered "free".
 * @type {number}
 */
Mempool.TRANSACTION_RELAY_FEE_MIN = 1;
/**
 * Maximum number of transactions per sender.
 * @type {number}
 */
Mempool.TRANSACTIONS_PER_SENDER_MAX = 500;
/**
 * Maximum number of "free" transactions per sender.
 * @type {number}
 */
Mempool.FREE_TRANSACTIONS_PER_SENDER_MAX = 10;
/**
 * Maximum number of transactions in the mempool.
 * @type {number}
 */
Mempool.SIZE_MAX = 100000;

/** @enum {number} */
Mempool.ReturnCode = {
    FEE_TOO_LOW: -2,
    INVALID: -1,

    ACCEPTED: 1,
    KNOWN: 2
};

Class.register(Mempool);