Home Identifier Source

src/aggregator.js

import Generator from "./generator";

/**
 * Routes incoming events into buckets produced by a Generator and, whenever
 * a bucket is closed, runs the processor over it and hands the result to the
 * observer.
 *
 * Only one bucket is held at a time: asking for a bucket whose index differs
 * from the current one closes (emits) the current bucket and replaces it.
 */
export default class Aggregator {

    /**
     * @param size Passed unchanged to the Generator constructor
     *     (presumably the bucket window size -- confirm against Generator).
     * @param processor Passed unchanged as the first argument of
     *     bucket.aggregate() whenever a bucket is emitted.
     * @param {function} [observer] Optional emit callback, invoked as
     *     observer(bucketIndex, aggregatedEvent). May also be set later
     *     with onEmit().
     */
    constructor(size, processor, observer) {
        this._generator = new Generator(size);
        this._processor = processor;
        this._bucket = null;
        this._observer = observer;
    }

    /**
     * Gets the current bucket or returns a new one.
     *
     * If a new bucket is generated the result of the old bucket is emitted
     * automatically.
     *
     * @param d Value handed to the generator to determine the bucket
     *     (addEvent() passes the event's timestamp).
     * @returns The bucket that d belongs to.
     */
    bucket(d) {
        const thisBucketIndex = this._generator.bucketIndex(d);
        const currentBucketIndex = this._bucket
            ? this._bucket.index().asString()
            : "";
        if (thisBucketIndex !== currentBucketIndex) {
            if (this._bucket) {
                this._emitBucket(this._bucket);
            }
            this._bucket = this._generator.bucket(d);
        }
        return this._bucket;
    }

    /**
     * Forces the current bucket to emit. Does nothing if there is no
     * current bucket.
     *
     * Once the emit has completed the current bucket is cleared, unless it
     * has already been replaced by a newer bucket in the meantime.
     */
    done() {
        const closing = this._bucket;
        if (!closing) {
            return;
        }
        this._emitBucket(closing, () => {
            // Only clear the slot if it still holds the bucket we closed;
            // a newer bucket created while aggregation was pending must
            // not be thrown away.
            if (this._bucket === closing) {
                this._bucket = null;
            }
        });
    }

    /**
     * Add an event, which will be assigned to a bucket
     *
     * @param event Event to add; must provide a timestamp() method.
     * @param {function} [cb] Optional callback, invoked as cb(err) once the
     *     bucket has finished adding the event.
     */
    addEvent(event, cb) {
        const t = event.timestamp();
        const bucket = this.bucket(t);

        //
        // Adding the value to the bucket. This could be an async operation
        // so the passed in callback to this function will be called when this
        // is done.
        //

        bucket.addEvent(event, err => {
            if (err) {
                console.error("Could not add value to bucket:", err);
            }
            if (cb) {
                cb(err);
            }
        });
    }

    /**
     * Set the emit callback after the constructor
     *
     * @param {function} cb Invoked as cb(bucketIndex, aggregatedEvent);
     *     replaces any previously set observer.
     */
    onEmit(cb) {
        this._observer = cb;
    }

    /**
     * Aggregates the given bucket and reports the result to the observer.
     *
     * The bucket is bound as a parameter rather than re-read from
     * this._bucket inside the callback, because this._bucket may already
     * point at a different bucket by the time aggregate() calls back.
     *
     * @param bucket The bucket being closed.
     * @param {function} [afterEmit] Optional hook run after the observer
     *     has been notified.
     * @private
     */
    _emitBucket(bucket, afterEmit) {
        bucket.aggregate(this._processor, event => {
            if (this._observer) {
                this._observer(bucket.index(), event);
            }
            if (afterEmit) {
                afterEmit();
            }
        });
    }
}