Home Reference Source

src/event-queue.js

'use strict';

/**
 * A wrapper class. It is used to raise errors and signal interruptions to normal iteration flow.
 */
class Signal {
    /**
     * Constructor
     * @param {Error} err - an error to be trhown during iteration
     */
    constructor(err) {
        /** an error object to be held */
        this.err = err;
    }

    /**
     * Accessor to wrapped Error instance
     */
    get error() {
        return this.err;
    }
}

/**
 *  Class implements JS asyncIterator protocol to be used in `for-await-of loop`
 */
class Iterator {
    /**
     * Constructor
     * @param {EventQueue} queue
     */
    constructor(queue) {
        /** Reference to queue object */
        this.queue = queue;
    }

    /**
     * returns the result with two optional data members: `value` must be returned if there is more data expected.
     * `done` must be set to true when iteration is finished, in this case `value` is ignored
     */
    async next() {
        let data = await this.queue.receive();
        if (this.queue.isSignal(data)) {
            throw data.error;
        }
        let result = {
            value: data,
            done: (data === this.queue.end)
        };
        return result;
    }

    /**
     * Support for `for-await-of loop` using iterator object
     */
    [Symbol.asyncIterator]() {
        return this.queue[Symbol.asyncIterator]();
    }
}

/**
 * Event queue with support for `async-await` reading by a single consumer
 */
class EventQueue {

    /**
     * Default constructor
     */
    constructor() {
        /** @private array of messages */
        this.queue = [];
        /** @private flag whether the queue is closed */
        this.isDown = false;
        /** @private special object used to mark the last message */
        this.endMarker = {id: 'Queue is terminated'}; // special object
    }

    /**
     * Getter for the end marker. This object is sent when queue is shutdown
     */
    get end() {
        return this.endMarker;
    }

    /**
     * Async method that can be waited on to receive the next message.
     * At most one consumer at a time is expected to be waiting.
     * The method throws an exception after queue was shut down and all pendingmessages were delivered.
     */
    async receive() {
        if (this.queue.length > 0) {
            let val = this.queue.shift();
            return Promise.resolve(val);
        }
        if (this.resolver) {
            throw new Error('Another block of code is already waiting to receive data from this queue.');
        }
        if (this.isDown) {
            throw new Error('Can\'t read from queue. The queue was shutdown.');
        }
        return new Promise((resolver) => {
            /**
             * @private callback to use when the next value is received.
             */
            this.resolver = resolver;
        });
    }

    /**
     * Sends a message to a waiting consumer, or stores it in-memory to be delivered later.
     * Once the queue is shut down, no more messages are accepted for delivery.
     * Pending messages will be delivered to a consumer.
     * @param {*} val - value to be delivered
     */
    send(val) {
        if (!this.isDown) {
            if (this.resolver) {
                this.resolver(val);
                this.resolver = undefined;
            }
            else {
                this.queue.push(val);
            }
        }
        else {
            throw new Error('Can\'t send a message. The queue was shutdown.');
        }
    }

    /**
     * Closes queue from accepting more messages for delivery
     */
    shutdown() {
        this.send(this.endMarker);
        this.isDown = true;
    }

    /**
     * Send a message containing an Error object. Iterator will throw an exception when processing this object
     * @param {Error} err - an error to be thrown
     *
     * @example
     * // Reading from queue using `for await` loop:
     * let queue = new EventQueue();
     * setTimeout(() => {
     *     queue.send(0);
     *     queue.send(1);
     *     queue.send(2);
     *     queue.raiseSignal(new Error('Processing failed'));
     *     queue.send(3);
     *     queue.shutdown();
     * }, 15);
     * let results = [];
     * try {
     *     for await (let data of queue) {
     *         results.push(data);
     *     }
     * }
     * catch(err) {
     *     // err.message is 'Processing failed'
     * }
     * // results contain [0, 1, 2]
     * @example
     * // If for some reason one can't use `for await` loop then a similar loop can be implemented using `while`:
     * let queue = new EventQueue();
     * setTimeout(() => {
     *     queue.send(0);
     *     queue.send(1);
     *     queue.send(2);
     *     queue.raiseSignal(new Error('Processing failed'));
     *     queue.send(3);
     *     queue.shutdown();
     * }, 15);
     * let results = [];
     * let data;
     * // while loop checks whether queue was shutdown by comparing returned value
     * // to a special data member of the queue object. It also checks for signals and throws errors
     * try {
     *     while ((data = await queue.receive()) !== queue.end) {
     *         if (queue.isSignal(res)) {
     *             throw res.error;
     *         }
     *         results.push(data);
     *     }
     * }
     * finally {
     * }
     * // results contain [0, 1, 2, 3]
     */
    raiseSignal(err) {
        if (!(err instanceof Error)) {
            throw new Error('Signal can be raised only by using instances of an Error class');
        }
        this.send(new Signal(err));
    }

    /**
     * Checks whether an object was created by raiseSginal method.
     * @param {*} signal
     */
    isSignal(signal) {
        let result = (signal instanceof Signal);
        return result;
    }

    /**
     * Support for `for-await-of loop`
     */
    [Symbol.asyncIterator]() {
        return new Iterator(this);
    }
}

module.exports = EventQueue;