src/transmission.js
// Copyright 2016 Hound Technology, Inc. All rights reserved.
// Use of this source code is governed by the Apache License 2.0
// license that can be found in the LICENSE file.
/* global window, global */
/**
* @module
*/
import superagent from "superagent";
import proxy from "superagent-proxy";
import urljoin from "urljoin";
// Base User-Agent value; the version placeholder is presumably substituted at
// build time (TODO confirm against the build scripts).
const USER_AGENT = "libhoney-js/<@LIBHONEY_JS_VERSION@>";

// The host's global object: `window` when it exists, otherwise `global`,
// otherwise undefined. Used to reach setTimeout/clearTimeout.
const _global = (() => {
  if (typeof window !== "undefined") {
    return window;
  }
  if (typeof global !== "undefined") {
    return global;
  }
  return undefined;
})();

// Default batching thresholds. A batch is flushed as soon as either one trips:
//  - the queue holds at least this many events, or
const batchSizeTrigger = 50;
//  - this many milliseconds have elapsed since the flush timer was armed.
const batchTimeTrigger = 100;

// Default upper bound on batches in flight at the same time.
const maxConcurrentBatches = 10;

// Default number of events allowed to wait in the queue before new ones are dropped.
const pendingWorkCapacity = 10000;

// Default response callback: deliberately does nothing.
const emptyResponseCallback = () => {};
/**
 * Calls `iteratorFn` on every element of `arr` strictly in order, waiting for
 * the promise (or value) produced for one element before starting the next.
 *
 * @param {Array} arr items to visit
 * @param {function(*): *} iteratorFn invoked with each item; may return a promise
 * @returns {Promise} settles with the outcome of the last call (resolves with
 *   undefined for an empty array); rejects at the first failing call, in which
 *   case later items are never visited
 */
const eachPromise = (arr, iteratorFn) => {
  let chain = Promise.resolve();
  arr.forEach(item => {
    chain = chain.then(() => iteratorFn(item));
  });
  return chain;
};
/**
 * Groups the elements of `arr` by a computed key.
 *
 * @param {Array} arr items to group
 * @param {function(*): string} keyfn computes the group key of an item
 * @param {function(*): *} createfn builds a new group from the first item seen for a key
 * @param {function(*, *): void} addfn adds a later item to an already existing group
 * @returns {Object} prototype-less object mapping each key to its group
 */
const partition = (arr, keyfn, createfn, addfn) =>
  arr.reduce((groups, item) => {
    const groupKey = keyfn(item);
    const existing = groups[groupKey];
    if (existing) {
      addfn(existing, item);
    } else {
      groups[groupKey] = createfn(item);
    }
    return groups;
  }, Object.create(null));
/**
 * Splits a list of events into per-destination batches (one per unique
 * apiHost / writeKey / dataset combination) and serializes a batch's events.
 */
class BatchEndpointAggregator {
  /**
   * @param {Array} events events to group; each is read for `apiHost`,
   *   `writeKey` and `dataset`
   */
  constructor(events) {
    const destinationKey = ev => `${ev.apiHost}_${ev.writeKey}_${ev.dataset}`;
    const startBatch = ev => {
      const { apiHost, writeKey, dataset } = ev;
      return { apiHost, writeKey, dataset, events: [ev] };
    };
    const appendToBatch = (batch, ev) => batch.events.push(ev);

    this.batches = partition(events, destinationKey, startBatch, appendToBatch);
  }

  /**
   * Serializes events into a JSON array literal.
   *
   * Events whose `toJSON()` throws are left out of the output and get the
   * thrown value stored on their `encodeError` property.
   *
   * @param {Array} events events exposing a `toJSON()` that returns JSON text
   * @returns {{encoded: string, numEncoded: number}} the array text and the
   *   number of events it contains
   */
  encodeBatchEvents(events) {
    const pieces = [];
    let numEncoded = 0;

    events.forEach(ev => {
      try {
        // toJSON() is called directly rather than through JSON.stringify,
        // which would wrap the already-encoded text in another set of quotes.
        const json = ev.toJSON();
        numEncoded += 1;
        // plain string concatenation keeps the coercion rules of the
        // accumulate-by-"+" formulation for non-string results
        pieces.push("" + json);
      } catch (e) {
        ev.encodeError = e;
      }
    });

    return { encoded: `[${pieces.join(",")}]`, numEncoded };
  }
}
/**
 * An event in the form handed to a transmission: plain fields plus a
 * `toJSON()` that yields the event's wire representation.
 *
 * @private
 */
export class ValidatedEvent {
  /**
   * @param {Object} fields
   * @param {*} fields.timestamp emitted as "time" when truthy
   * @param {string} fields.apiHost
   * @param {string} fields.postData already-serialized JSON text, emitted
   *   verbatim as "data" when truthy
   * @param {string} fields.writeKey
   * @param {string} fields.dataset
   * @param {number} fields.sampleRate emitted as "samplerate" when truthy
   * @param {*} fields.metadata
   */
  constructor({
    timestamp,
    apiHost,
    postData,
    writeKey,
    dataset,
    sampleRate,
    metadata
  }) {
    Object.assign(this, {
      timestamp,
      apiHost,
      postData,
      writeKey,
      dataset,
      sampleRate,
      metadata
    });
  }

  /**
   * Builds the JSON object text for this event. Note that this returns a
   * string, not a plain object.
   *
   * @returns {string} JSON text holding whichever of "time", "samplerate" and
   *   "data" have truthy source fields, in that order
   */
  toJSON() {
    const { timestamp, sampleRate, postData } = this;
    const candidates = [
      timestamp ? `"time":${JSON.stringify(timestamp)}` : null,
      sampleRate ? `"samplerate":${JSON.stringify(sampleRate)}` : null,
      // postData is spliced in as-is
      postData ? `"data":${postData}` : null
    ];
    const present = candidates.filter(part => part !== null);
    return `{${present.join(",")}}`;
  }
}
/**
 * Transmission stand-in that sends nothing: it remembers the options it was
 * built with and every event handed to it so they can be inspected later.
 */
export class MockTransmission {
  /**
   * @param {*} options stored untouched on `constructorArg`
   */
  constructor(options) {
    Object.assign(this, { constructorArg: options, events: [] });
  }

  /**
   * Records an event.
   * @param {*} ev
   */
  sendEvent(ev) {
    const { events } = this;
    events.push(ev);
  }

  /**
   * Records an event; no different from `sendEvent` for the mock.
   * @param {*} ev
   */
  sendPresampledEvent(ev) {
    const { events } = this;
    events.push(ev);
  }

  /**
   * Forgets the constructor argument and swaps in a fresh, empty event list.
   */
  reset() {
    Object.assign(this, { constructorArg: null, events: [] });
  }
}
/**
 * Transmission that prints each event to the console as JSON instead of
 * sending it anywhere.
 */
export class WriterTransmission {
  /**
   * Logs the JSON form of an event.
   * @param {*} ev
   */
  sendEvent(ev) {
    const line = JSON.stringify(ev);
    console.log(line);
  }

  /**
   * Logs the JSON form of an event; no different from `sendEvent` here.
   * @param {*} ev
   */
  sendPresampledEvent(ev) {
    const line = JSON.stringify(ev);
    console.log(line);
  }
}
/**
 * Transmission that discards every event.
 */
export class NullTransmission {
  /**
   * Does nothing with the event.
   * @param {*} _ev ignored
   */
  sendEvent(_ev) {
    // intentionally empty
  }

  /**
   * Does nothing with the event.
   * @param {*} _ev ignored
   */
  sendPresampledEvent(_ev) {
    // intentionally empty
  }
}
/**
 * Batching HTTP transmission. Events are queued, cut into batches by size or
 * by timer, grouped per destination (apiHost / writeKey / dataset) and POSTed
 * to `<apiHost>/1/batch/<dataset>`. The outcome of every event, including
 * events dropped before sending, is reported through the response callback.
 *
 * @private
 */
export class Transmission {
  /**
   * @param {Object} options
   * @param {function(Array<Object>)} [options.responseCallback] receives an
   *   array of per-event results (`metadata`, `error`, and for sent events
   *   `status_code` and `duration` in ms)
   * @param {number} [options.batchSizeTrigger] queue length that triggers a
   *   send (clamped to at least 1)
   * @param {number} [options.batchTimeTrigger] milliseconds before a partial
   *   batch is sent
   * @param {number} [options.maxConcurrentBatches] maximum batches in flight
   *   (clamped to at least 1 so the queue can always drain)
   * @param {number} [options.pendingWorkCapacity] queue length at which new
   *   events are dropped
   * @param {string} [options.userAgentAddition] text appended to the User-Agent
   * @param {*} [options.proxy] when truthy, handed to superagent-proxy together
   *   with each request
   */
  constructor(options) {
    this._responseCallback = emptyResponseCallback;
    this._batchSizeTrigger = batchSizeTrigger;
    this._batchTimeTrigger = batchTimeTrigger;
    this._maxConcurrentBatches = maxConcurrentBatches;
    this._pendingWorkCapacity = pendingWorkCapacity;

    this._sendTimeoutId = -1; // -1 means "no flush timer armed"
    this._eventQueue = [];
    this._batchCount = 0; // batches currently in flight

    if (typeof options.responseCallback === "function") {
      this._responseCallback = options.responseCallback;
    }
    if (typeof options.batchSizeTrigger === "number") {
      this._batchSizeTrigger = Math.max(options.batchSizeTrigger, 1);
    }
    if (typeof options.batchTimeTrigger === "number") {
      this._batchTimeTrigger = options.batchTimeTrigger;
    }
    if (typeof options.maxConcurrentBatches === "number") {
      this._maxConcurrentBatches = Math.max(options.maxConcurrentBatches, 1);
    }
    if (typeof options.pendingWorkCapacity === "number") {
      this._pendingWorkCapacity = options.pendingWorkCapacity;
    }

    this._userAgentAddition = options.userAgentAddition || "";
    this._proxy = options.proxy;

    // Included for testing; to stub out randomness and verify that an event
    // was dropped.
    this._randomFn = Math.random;
  }

  /**
   * Reports a single event as dropped.
   * @param {Object} ev the dropped event (its `metadata` is echoed back)
   * @param {string} reason message of the Error passed to the response callback
   */
  _droppedCallback(ev, reason) {
    this._responseCallback([
      {
        metadata: ev.metadata,
        error: new Error(reason)
      }
    ]);
  }

  /**
   * Applies sampling to an event, then queues it if it was selected.
   * @param {Object} ev
   */
  sendEvent(ev) {
    // bail early if we aren't sampling this event
    if (!this._shouldSendEvent(ev)) {
      this._droppedCallback(ev, "event dropped due to sampling");
      return;
    }
    this.sendPresampledEvent(ev);
  }

  /**
   * Queues an event without sampling. Drops it (reporting "queue overflow")
   * when the queue is already at capacity.
   * @param {Object} ev
   */
  sendPresampledEvent(ev) {
    if (this._eventQueue.length >= this._pendingWorkCapacity) {
      this._droppedCallback(ev, "queue overflow");
      return;
    }
    this._eventQueue.push(ev);
    if (this._eventQueue.length >= this._batchSizeTrigger) {
      this._sendBatch();
    } else {
      this._ensureSendTimeout();
    }
  }

  /**
   * Takes up to `_batchSizeTrigger` events off the queue and sends them, one
   * request per destination, sequentially. Does nothing when the configured
   * number of batches is already in flight.
   */
  _sendBatch() {
    // Honor the per-instance limit (the module default only seeds it).
    if (this._batchCount >= this._maxConcurrentBatches) {
      // don't start up another concurrent batch. the next timeout/sendEvent or batch completion
      // will cause us to send another
      return;
    }

    this._clearSendTimeout();
    this._batchCount++;

    const events = this._eventQueue.splice(0, this._batchSizeTrigger);
    const batchAgg = new BatchEndpointAggregator(events);

    const finishBatch = () => {
      this._batchCount--;
      const queueLength = this._eventQueue.length;
      if (queueLength > 0) {
        if (queueLength >= this._batchSizeTrigger) {
          this._sendBatch();
        } else {
          this._ensureSendTimeout();
        }
      }
    };

    const batches = Object.keys(batchAgg.batches).map(k => batchAgg.batches[k]);
    eachPromise(batches, batch => {
      const url = urljoin(batch.apiHost, "/1/batch", batch.dataset);
      let req = superagent.post(url);
      if (this._proxy) {
        req = proxy(req, this._proxy);
      }

      const { encoded, numEncoded } = batchAgg.encodeBatchEvents(batch.events);
      return new Promise(resolve => {
        // if we failed to encode any of the events, no point in sending anything to honeycomb
        if (numEncoded === 0) {
          this._responseCallback(
            batch.events.map(ev => ({
              metadata: ev.metadata,
              error: ev.encodeError
            }))
          );
          resolve();
          return;
        }

        let userAgent = USER_AGENT;
        const trimmedAddition = this._userAgentAddition.trim();
        if (trimmedAddition) {
          userAgent = `${USER_AGENT} ${trimmedAddition}`;
        }

        const start = Date.now();
        req
          .set("X-Hny-Team", batch.writeKey)
          .set("User-Agent", userAgent)
          .type("json")
          .send(encoded)
          .end((err, res) => {
            const duration = Date.now() - start;
            try {
              this._reportBatchResponse(batch.events, err, res, duration);
            } finally {
              // we resolve unconditionally to continue the iteration in
              // eachPromise, even if the response callback threw; otherwise
              // this batch slot would never be released.
              resolve();
            }
          });
      });
      // Two-argument then(): finishBatch runs exactly once whether the chain
      // fulfilled or rejected.
    }).then(finishBatch, finishBatch);
  }

  /**
   * Translates the outcome of one HTTP request into per-event results and
   * passes them to the response callback (one call, one entry per event, in
   * event order).
   *
   * @param {Array<Object>} events events of the request, including those that
   *   failed to encode (marked by `encodeError`) and therefore were not sent
   * @param {?Object} err request error, if any (its `status` is reported)
   * @param {?Object} res response; `res.text` is expected to hold a JSON array
   *   with one entry per sent event
   * @param {number} duration request duration in milliseconds
   */
  _reportBatchResponse(events, err, res, duration) {
    // Result for an event when the request as a whole produced nothing usable.
    const requestFailed = (ev, statusCode, error) => ({
      status_code: ev.encodeError ? undefined : statusCode,
      duration,
      metadata: ev.metadata,
      error: ev.encodeError || error
    });

    if (err) {
      this._responseCallback(
        events.map(ev => requestFailed(ev, err.status, err))
      );
      return;
    }

    let entries;
    try {
      entries = JSON.parse(res.text);
    } catch (parseError) {
      // A missing or non-JSON body (e.g. an HTML page from an intermediary)
      // is reported per event instead of escaping from the request callback.
      const statusCode = res ? res.status : undefined;
      this._responseCallback(
        events.map(ev => requestFailed(ev, statusCode, parseError))
      );
      return;
    }
    if (!Array.isArray(entries)) {
      entries = [];
    }

    // Entries correspond only to events that were actually sent, so events
    // with an encodeError do not consume one.
    let nextEntry = 0;
    this._responseCallback(
      events.map(ev => {
        if (ev.encodeError) {
          return {
            duration,
            metadata: ev.metadata,
            error: ev.encodeError
          };
        }
        const entry = entries[nextEntry++];
        if (!entry) {
          return {
            status_code: res.status,
            duration,
            metadata: ev.metadata,
            error: new Error("no result for event in batch response")
          };
        }
        return {
          status_code: entry.status,
          duration,
          metadata: ev.metadata,
          // NOTE(review): `err` is the field this code has always read; the
          // server appears to report per-event failures under `error`, so that
          // is used as a fallback. Confirm against the batch API docs.
          error: entry.err || entry.error
        };
      })
    );
  }

  /**
   * Sampling decision for an event.
   * @param {Object} ev event; `ev.sampleRate` N means "keep roughly 1 in N"
   * @returns {boolean} true when the event should be sent
   */
  _shouldSendEvent(ev) {
    const { sampleRate } = ev;
    // Anything that is not a number greater than 1 (including a missing or NaN
    // rate) means "no sampling": always send rather than silently drop.
    if (!(sampleRate > 1)) {
      return true;
    }
    return this._randomFn() < 1 / sampleRate;
  }

  /**
   * Arms the flush timer unless one is already pending.
   */
  _ensureSendTimeout() {
    if (this._sendTimeoutId === -1) {
      this._sendTimeoutId = _global.setTimeout(() => {
        // The timer has fired, so its id is stale. Reset it before sending:
        // _sendBatch may return early (concurrency limit) without clearing it,
        // which would otherwise prevent any later timer from being armed.
        this._sendTimeoutId = -1;
        this._sendBatch();
      }, this._batchTimeTrigger);
    }
  }

  /**
   * Cancels the pending flush timer, if any.
   */
  _clearSendTimeout() {
    if (this._sendTimeoutId !== -1) {
      _global.clearTimeout(this._sendTimeoutId);
      this._sendTimeoutId = -1;
    }
  }
}