// src/lib/log-watcher.js
/**
* @file The file that does the log watching.
* @author willyb321
* @copyright MIT
*/
'use strict';
const events = require('events');
const os = require('os');
const path = require('path');
const fs = require('fs-extra');
const debug = require('debug')('wotch');
// Delay in milliseconds before the next directory poll is queued when no work is pending.
const POLL_INTERVAL = 1000;
// Directory watched when the caller does not pass one: a fixed sub-path of the user's home directory.
const DEFAULT_SAVE_DIR = path.join(os.homedir(), ...[
  'Saved Games',
  'Frontier Developments',
  'Elite Dangerous'
]);
/**
 * Polls a directory for journal log files and emits their newly appended
 * lines as parsed JSON.
 *
 * Work is serialised through a queue of "ops" (`this._ops`): each op is a
 * function taking a node-style callback, and `_loop` runs them one at a
 * time. When the queue is empty a poll is queued after `POLL_INTERVAL`.
 *
 * Events emitted:
 *  - `data`     (Array<Object>) objects parsed from complete lines of a chunk
 *  - `finished` emitted right after `data`, only when `data` had a listener
 *  - `error`    (Error) an op failed, or a file could not be read/parsed
 *  - `stopped`  the watcher has halted after `stop()`
 *
 * @class
 */
class LogWatcher extends events.EventEmitter {
  /**
   * Construct the log watcher and start its polling loop.
   * @param {string} [dirpath] - Directory to watch; falls back to DEFAULT_SAVE_DIR.
   * @param {number} [maxfiles] - How many of the last matching files to watch; falls back to 3.
   */
  constructor(dirpath, maxfiles) {
    super();
    this._dirpath = dirpath || DEFAULT_SAVE_DIR;
    this._filter = isCommanderLog;
    this._maxfiles = maxfiles || 3;
    // filename -> {ino, mtime, size, watermark, tombstoned}
    this._logDetailMap = {};
    // Pending ops, and the op currently in flight (null when idle).
    this._ops = [];
    this._op = null;
    this._timer = null;
    this._die = false;
    this._loop();
  }

  /**
   * Bury a file: mark it tombstoned so it is never processed again.
   * Unknown filenames are ignored.
   * @param {string} filename - Key of the file in the detail map.
   */
  bury(filename) {
    debug('bury', {filename});
    const info = this._logDetailMap[filename];
    if (info !== undefined) {
      info.tombstoned = true;
    }
  }

  /**
   * Stop running. Pending ops are discarded. If no op is in flight,
   * 'stopped' is emitted immediately; otherwise it is emitted when the
   * in-flight op completes.
   */
  stop() {
    debug('stop');
    // Always raise the flag so an already-scheduled _loop() turns into a no-op.
    this._die = true;
    this._ops.length = 0;
    if (this._op === null) {
      clearTimeout(this._timer);
      this._timer = null;
      this.emit('stopped');
    }
  }

  /**
   * The main loop: run the next queued op, or schedule a poll when idle.
   * An op that reports an error through its callback halts the loop after
   * 'error' is emitted.
   */
  _loop() {
    debug('_loop', {opcount: this._ops.length});
    this._op = null;
    if (this._die) {
      // stop() was called while this iteration was pending.
      return;
    }
    if (this._ops.length === 0) {
      this._timer = setTimeout(() => {
        this._timer = null;
        this._ops.push(callback => this._poll(callback));
        setImmediate(() => this._loop());
      }, POLL_INTERVAL);
      return;
    }
    this._op = this._ops.shift();
    try {
      this._op(err => {
        // The op is finished; clear it now so stop() sees an idle watcher
        // even before the next _loop() iteration runs.
        this._op = null;
        if (err) {
          this.emit('error', err);
          if (this._die) {
            this.emit('stopped');
          }
        } else if (this._die) {
          this.emit('stopped');
        } else {
          setImmediate(() => this._loop());
        }
      });
    } catch (err) {
      this._op = null;
      this.emit('error', err);
      // assumption: it crashed BEFORE an async wait
      // otherwise, we'll end up with more simultaneous
      // activity
      setImmediate(() => this._loop());
    }
  }

  /**
   * Poll the logs directory for new/updated files. Queues a stat op for
   * each watched file and buries known files that are no longer among them.
   * @param {Function} callback - Called with an error from readdir, or null.
   */
  _poll(callback) {
    debug('_poll');
    // Every live file we know about is "unseen" until the listing shows it.
    const unseen = {};
    Object.keys(this._logDetailMap).forEach(filename => {
      if (!this._logDetailMap[filename].tombstoned) {
        unseen[filename] = true;
      }
    });
    fs.readdir(this._dirpath, (err, filenames) => {
      if (err) {
        callback(err);
        return;
      }
      // Filter BEFORE taking the tail so unrelated directory entries cannot
      // use up the maxfiles slots. slice(-n) also copes with fewer than n
      // entries. readdir order is not guaranteed, hence the sort.
      // NOTE(review): assumes journal names sort oldest-to-newest - confirm.
      const watched = filenames
        .map(name => path.join(this._dirpath, name))
        .filter(fullpath => this._filter(fullpath))
        .sort()
        .slice(-this._maxfiles);
      watched.forEach(fullpath => {
        delete unseen[fullpath];
        this._ops.push(cb => this._statfile(fullpath, cb));
      });
      Object.keys(unseen).forEach(filename => this.bury(filename));
      callback(null);
    });
  }

  /**
   * Stat a file found by the poll and queue it for processing.
   * A file that has disappeared (ENOENT) is buried if it was known.
   * @param {string} filename - Full path of the file.
   * @param {Function} callback - Called with a non-ENOENT stat error, or null.
   */
  _statfile(filename, callback) {
    debug('_statfile', {filename});
    fs.stat(filename, (err, stats) => {
      if (err && err.code === 'ENOENT') {
        if (this._logDetailMap[filename]) {
          this.bury(filename);
        }
        callback(null); // file deleted
        return;
      }
      if (err) {
        callback(err);
        return;
      }
      this._ops.push(cb => this._process(filename, stats, cb));
      callback(null);
    });
  }

  /**
   * Compare fresh stats with what is recorded for the file and queue a
   * read when the file is new or has grown.
   * @param {string} filename - Full path of the file.
   * @param {fs.Stats} stats - Result of fs.stat for the file.
   * @param {Function} callback - Always called with null (asynchronously).
   */
  _process(filename, stats, callback) {
    debug('_process', {filename, stats});
    setImmediate(callback, null);
    const info = this._logDetailMap[filename];
    if (info === undefined) {
      // First sighting: record it and read from the beginning.
      this._logDetailMap[filename] = {
        ino: stats.ino,
        mtime: stats.mtime,
        size: stats.size,
        watermark: 0,
        tombstoned: false
      };
      this._ops.push(cb => this._read(filename, cb));
      return;
    }
    if (info.tombstoned) {
      return;
    }
    if (info.ino !== stats.ino) {
      // file replaced... can't trust it any more
      // if the client API supported replay from scratch, we could do that
      // but we can't yet, so:
      this.bury(filename);
    } else if (stats.size > info.size) {
      // file not replaced; got longer... assume append
      this._ops.push(cb => this._read(filename, cb));
    }
    // Same inode and same size: treated as unchanged even if mtime differs,
    // so no read is queued. A smaller size also queues nothing; only the
    // recorded size/mtime below are refreshed.
    info.mtime = stats.mtime;
    info.size = stats.size;
  }

  /**
   * Read the file from its watermark and emit the complete lines found.
   * The watermark only ever advances past complete (newline-terminated)
   * lines, so a trailing partial line is re-read next time.
   * On any stream or parse error the error is emitted and the file buried.
   * @param {string} filename - Full path of the file.
   * @param {Function} callback - Called once with null when reading is over.
   */
  _read(filename, callback) {
    const {watermark, size} = this._logDetailMap[filename];
    debug('_read', {filename, watermark, size});
    // Bytes received since the last newline; not yet counted in the watermark.
    let leftover = Buffer.alloc(0);
    let done = false;
    // `end` is inclusive, so one byte beyond the recorded size may be read;
    // that is harmless because of the complete-lines-only watermark.
    const s = fs.createReadStream(filename, {
      flags: 'r',
      start: watermark,
      end: size
    });
    const finish = err => {
      if (done) {
        return;
      }
      done = true;
      if (err) {
        // On any error, emit the error and bury the file.
        this.emit('error', err);
        this.bury(filename);
        // Stop delivering further chunks for a file we no longer trust.
        s.destroy();
      }
      setImmediate(callback, null);
    };
    s.once('error', finish);
    s.once('end', finish);
    s.on('data', chunk => {
      if (done) {
        return;
      }
      const idx = chunk.lastIndexOf('\n');
      if (idx < 0) {
        leftover = Buffer.concat([leftover, chunk]);
        return;
      }
      const complete = Buffer.concat([leftover, chunk.slice(0, idx + 1)]);
      leftover = chunk.slice(idx + 1);
      let obs;
      try {
        obs = complete
          .toString('utf8')
          .split(/[\r\n]+/)
          .filter(l => l.length > 0)
          .map(l => JSON.parse(l));
      } catch (err) {
        finish(err);
        return;
      }
      // Count the carried-over bytes too, not just this chunk's share.
      this._logDetailMap[filename].watermark += complete.length;
      setImmediate(() => {
        // 'finished' follows only when 'data' had at least one listener.
        if (this.emit('data', obs)) {
          this.emit('finished');
        }
      });
    });
  }
}
/**
 * Tell whether a path names a commander journal log.
 * @param {string} fpath - Path (or bare filename) to test.
 * @returns {boolean} True when the basename starts with `Journal.` and the
 * extension is `.log`.
 */
function isCommanderLog(fpath) {
  const hasJournalPrefix = path.basename(fpath).startsWith('Journal.');
  if (!hasJournalPrefix) {
    return false;
  }
  return path.extname(fpath) === '.log';
}
module.exports = {
LogWatcher,
isCommanderLog
};
if (!module.parent) {
process.on('uncaughtException', err => {
console.error(err.stack || err);
throw new Error(err.stack || err);
});
const watcher = new LogWatcher(DEFAULT_SAVE_DIR);
watcher.on('error', err => {
console.error(err.stack || err);
throw new Error(err.stack || err);
});
watcher.on('data', obs => {
obs.forEach(ob => {
const {timestamp, event} = ob;
console.log('\n' + timestamp, event);
delete ob.timestamp;
delete ob.event;
Object.keys(ob).sort().forEach(k => {
console.log('\t' + k, ob[k]);
});
});
});
}