Home Reference Source Test

src/main/generic/utils/IWorker.js

/**
 * @interface
 */
class IWorker {
    static async createProxy(clazz, name, worker) {
        return new (IWorker.Proxy(clazz))(worker, name);
    }

    static async startWorkerForProxy(clazz, name, workerScript) {
        if (!IWorker._workersSupported) {
            await IWorker._workerImplementation[clazz.name].init(name);
            return IWorker._workerImplementation[clazz.name];
        } else {
            if (!workerScript) {
                workerScript = `${Nimiq._path}worker.js`;
            }
            return IWorker.createProxy(clazz, name, new Worker(URL.createObjectURL(new Blob([`Nimiq = {_path: '${Nimiq._path}'}; importScripts('${workerScript.replace(/'/g, '')}');`]))));
        }
    }

    static async startWorkerPoolForProxy(clazz, name, size, workerScript) {
        return (new (IWorker.Pool(clazz))((name) => IWorker.startWorkerForProxy(clazz, name, workerScript), name, size)).start();
    }

    static async stubBaseOnMessage(msg) {
        try {
            if (msg.data.command === 'init') {
                if (IWorker._workerImplementation[msg.data.args[0]]) {
                    const res = await IWorker._workerImplementation[msg.data.args[0]].init(msg.data.args[1]);
                    self.postMessage({status: 'OK', result: res, id: msg.data.id});
                } else {
                    self.postMessage({status: 'error', result: 'Unknown worker!', id: msg.data.id});
                }
            } else {
                self.postMessage({status: 'error', result: 'Worker not yet initialized!', id: msg.data.id});
            }
        } catch (e) {
            self.postMessage({status: 'error', result: e, id: msg.data.id});
        }
    }

    static get _workersSupported() {
        return typeof Worker !== 'undefined';
    }

    static get areWorkersAsync() {
        return IWorker._workersSupported;
    }

    static get _insideWebWorker() {
        return typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope;
    }

    static get _global() {
        return typeof global !== 'undefined' ? global : typeof window !== 'undefined' ? window : typeof self !== 'undefined' ? self : null;
    }

    static prepareForWorkerUse(baseClazz, impl) {
        if (IWorker._insideWebWorker) {
            // Only inside WebWorker
            self.onmessage = IWorker.stubBaseOnMessage;
        }
        IWorker._workerImplementation = IWorker._workerImplementation || {};
        IWorker._workerImplementation[baseClazz.name] = impl;
    }

    static _loadBrowserScript(url, resolve) {
        // Adding the script tag to the head as suggested before
        const head = document.getElementsByTagName('head')[0];
        const script = document.createElement('script');
        script.type = 'text/javascript';
        script.src = url;

        // Then bind the event to the callback function.
        // There are several events for cross browser compatibility.
        // These events might occur before processing, so delay them a bit.
        const ret = () => setTimeout(resolve, 100);
        script.onreadystatechange = ret;
        script.onload = ret;

        // Fire the loading
        head.appendChild(script);
    }

    static Proxy(clazz) {
        const proxyClass = class extends clazz {
            /**
             * @param {Worker} worker
             * @param {string} [name]
             */
            constructor(worker, name) {
                super();
                this._name = name;
                this._messageId = 0;
                this._worker = worker;
                this._worker.onmessage = this._receive.bind(this);
                /** @type {Map.<number,{resolve:Function,error:Function}>} */
                this._waiting = new Map();
                return this._invoke('init', [clazz.name, name]).then(() => { return this; });
            }

            _receive(msg) {
                const cb = this._waiting.get(msg.data.id);
                if (!cb) {
                    Log.w(WorkerProxy, 'Unknown reply', msg);
                } else {
                    this._waiting.delete(msg.data.id);
                    if (msg.data.status === 'OK') {
                        cb.resolve(msg.data.result);
                    } else if (msg.data.status === 'error') {
                        cb.error(msg.data.result);
                    }
                }
            }

            /**
             * @param {string} command
             * @param {object[]} [args]
             * @returns {Promise}
             * @private
             */
            _invoke(command, args = []) {
                return new Promise((resolve, error) => {
                    const obj = {command: command, args: args, id: this._messageId++};
                    this._waiting.set(obj.id, {resolve, error});
                    this._worker.postMessage(obj);
                });
            }

            destroy() {
                return this._invoke('destroy');
            }
        };
        for (const funcName of Object.getOwnPropertyNames(clazz.prototype)) {
            if (typeof clazz.prototype[funcName] === 'function' && funcName !== 'constructor') {
                proxyClass.prototype[funcName] = function (...args) {
                    return this._invoke(funcName, args);
                };
            }
        }
        return proxyClass;
    }

    /**
     * @param {object} clazz
     * @return {Stub}
     * @constructor
     */
    static Stub(clazz) {
        const Stub = class extends clazz {
            constructor() {
                super();
            }

            _result(msg, status, result) {
                self.postMessage({status, result, id: msg.data.id});
            }

            _onmessage(msg) {
                try {
                    const res = this._invoke(msg.data.command, msg.data.args);
                    if (res instanceof Promise) {
                        res.then((finalRes) => { this._result(msg, 'OK', finalRes); });
                    } else {
                        this._result(msg, 'OK', res);
                    }
                } catch (e) {
                    this._result(msg, 'error', e.message || e);
                }
            }

            init(name) {
                this._name = name;
                if (IWorker._insideWebWorker) {
                    self.name = name;
                    self.onmessage = (msg) => this._onmessage(msg);
                }
            }

            _invoke(command, args) {
                return this[command].apply(this, args);
            }

            destroy() {
                if (IWorker._insideWebWorker) {
                    self.close();
                }
            }
        };
        for (const funcName of Object.getOwnPropertyNames(clazz.prototype)) {
            if (typeof clazz.prototype[funcName] === 'function' && funcName !== 'constructor') {
                Stub.prototype[funcName] = function () {
                    throw `Not implemented in IWorker Stub: ${funcName}`;
                };
            }
        }
        return Stub;
    }

    static Pool(clazz) {
        const poolClass = class extends clazz {
            /**
             *
             * @param {function(string):Promise} proxyInitializer
             * @param {string} [name]
             * @param {number} [size] Number of workers in this pool.
             */
            constructor(proxyInitializer, name = 'pool', size = 1) {
                super();
                /** @type {function(string):Promise} */
                this._proxyInitializer = proxyInitializer;
                /** @type {string} */
                this._name = name;
                /** @type {number} */
                this._poolSize = size;
                /** @type {Array} */
                this._workers = [];
                /** @type {Array} */
                this._freeWorkers = [];
                /** @type {Array.<{name:string, args:Array, resolve:function, error:function}>} */
                this._waitingCalls = [];
            }

            async start() {
                await this._updateToSize();

                return this;
            }

            get poolSize() {
                return this._poolSize;
            }

            set poolSize(_size) {
                this._poolSize = _size;
                this._updateToSize().catch(Log.w.tag(IWorker));
            }

            destroy() {
                this._poolSize = 0;
                return this._updateToSize();
            }

            /**
             * @param {string} name Name of the function to call on a worker
             * @param {Array} args Arguments to pass to the function
             * @returns {Promise}
             */
            _invoke(name, args) {
                if (IWorker._workersSupported) {
                    return new Promise((resolve, error) => {
                        this._waitingCalls.push({name, args, resolve, error});
                        const worker = this._freeWorkers.shift();
                        if (worker) {
                            this._step(worker).catch(Log.w.tag(IWorker));
                        }
                    });
                } else {
                    return this._workers[0][name].apply(this._workers[0], args);
                }
            }

            /**
             * @param worker
             * @returns {Promise.<void>}
             * @private
             */
            async _step(worker) {
                let call = this._waitingCalls.shift();
                while (call) {
                    try {
                        // eslint-disable-next-line no-await-in-loop
                        call.resolve(await worker[call.name].apply(worker, call.args));
                    } catch (e) {
                        call.error(e);
                    }
                    if (this._workers.indexOf(worker) === -1) {
                        worker.destroy();
                        return;
                    }
                    call = this._waitingCalls.shift();
                }
                this._freeWorkers.push(worker);
            }

            async _updateToSize() {
                if (typeof Worker === 'undefined' && this._poolSize > 1) {
                    Log.d(IWorker, 'Pool of size larger than 1 requires WebWorker support.');
                    this._poolSize = 1;
                }

                const workerPromises = [];
                while (this._workers.length + workerPromises.length < this._poolSize) {
                    workerPromises.push(this._proxyInitializer(`${this._name}#${this._workers.length + workerPromises.length}`));
                }
                const createdWorkers = await Promise.all(workerPromises);
                for (const worker of createdWorkers) {
                    this._workers.push(worker);
                    this._step(worker).catch(Log.w.tag(IWorker));
                }

                while (this._workers.length > this._poolSize) {
                    const worker = this._freeWorkers.shift() || this._workers.pop();
                    const idx = this._workers.indexOf(worker);
                    if (idx >= 0) {
                        // This was a free worker, also remove it from the worker list and destroy it now.
                        this._workers.splice(idx, 1);
                        worker.destroy();
                    }
                }
                return this;
            }
        };
        for (const funcName of Object.getOwnPropertyNames(clazz.prototype)) {
            if (typeof clazz.prototype[funcName] === 'function' && funcName !== 'constructor') {
                poolClass.prototype[funcName] = function (...args) {
                    return this._invoke(funcName, args);
                };
            }
        }
        return poolClass;
    }
}

IWorker._moduleLoadedCallbacks = {};
IWorker._workerImplementation = {};
Class.register(IWorker);