Home Reference Source Test

src/main/generic/utils/synchronizer/PrioritySynchronizer.js

class PrioritySynchronizer extends Observable {
    /**
     * @param {number} numPriorities
     * @param {?number} [throttleAfter]
     * @param {?number} [throttleWait]
     */
    constructor(numPriorities, throttleAfter, throttleWait) {
        super();

        /** @type {Array.<LinkedList.<object>>} */
        this._queues = [];
        for (let i = 0; i < numPriorities; i++) {
            this._queues[i] = new LinkedList();
        }

        /** @type {boolean} */
        this._working = false;
        /** @type {?number} */
        this._throttleAfter = throttleAfter;
        /** @type {?number} */
        this._throttleWait = throttleWait;
        /** @type {number} */
        this._elapsed = 0;
        /** @type {number} */
        this._totalElapsed = 0;
        /** @type {number} */
        this._totalJobs = 0;
        /** @type {number} */
        this._totalThrottles = 0;
    }

    /**
     * Push function to the Synchronizer for later, synchronous execution
     * @template T
     * @param {number} priority A discrete priority, 0 being highest.
     * @param {function():T} fn Function to be invoked later by this Synchronizer
     * @returns {Promise.<T>}
     */
    push(priority, fn) {
        Assert.that(priority >= 0 && priority < this._queues.length && Number.isInteger(priority), 'Invalid priority');

        return new Promise((resolve, reject) => {
            this._queues[priority].push({fn: fn, resolve: resolve, reject: reject});
            if (!this._working) {
                this.fire('work-start', this);
                this._doWork().catch(Log.w.tag(PrioritySynchronizer));
            }
        });
    }

    /**
     * Reject all jobs in the queue and clear it.
     * @returns {void}
     */
    clear() {
        for (const queue of this._queues) {
            for (const job of queue) {
                if (job.reject) job.reject();
            }
            queue.clear();
        }
    }

    async _doWork() {
        this._working = true;

        for (const queue of this._queues) {
            while (queue.length > 0) {
                const start = Date.now();

                const job = queue.shift();
                try {
                    const result = await job.fn();
                    job.resolve(result);
                } catch (e) {
                    if (job.reject) job.reject(e);
                }

                this._totalJobs++;

                if (this._throttleAfter !== undefined) {
                    this._elapsed += Date.now() - start;
                    if (this._elapsed >= this._throttleAfter) {
                        this._totalElapsed += this._elapsed;
                        this._totalThrottles++;
                        this._elapsed = 0;
                        setTimeout(this._doWork.bind(this), this._throttleWait);
                        return;
                    }
                }
            }
        }

        this._working = false;
        this._totalElapsed += this._elapsed;
        this._elapsed = 0;
        this.fire('work-end', this);
    }

    /** @type {boolean} */
    get working() {
        return this._working;
    }

    /** @type {number} */
    get length() {
        return this._queues.reduce((sum, q) => sum + q.length, 0);
    }

    /** @type {number} */
    get totalElapsed() {
        return this._totalElapsed;
    }

    /** @type {number} */
    get totalJobs() {
        return this._totalJobs;
    }

    /** @type {number} */
    get totalThrottles() {
        return this._totalThrottles;
    }
}
Class.register(PrioritySynchronizer);