Home Reference Source Repository

src/lib/log-watcher.js

/**
 * @file The file that does the log watching.
 * @author willyb321
 * @copyright MIT
 */
'use strict';
const events = require('events');
const os = require('os');
const path = require('path');
const fs = require('fs-extra');
const debug = require('debug')('wotch');

/** Delay, in milliseconds, between two polls of the watched directory. */
const POLL_INTERVAL = 1000;

/**
 * Directory watched when the caller does not supply one:
 * `<home>/Saved Games/Frontier Developments/Elite Dangerous`.
 */
const DEFAULT_SAVE_DIR = path.join(os.homedir(), 'Saved Games', 'Frontier Developments', 'Elite Dangerous');
/**
 * Polls a directory for log files and emits the JSON entries appended to them.
 *
 * Work is organised as a FIFO queue of operations (`_ops`). `_loop` runs one
 * operation at a time; when the queue is empty it waits `POLL_INTERVAL` ms and
 * queues a new directory poll. A poll queues one stat per candidate file, a
 * stat queues a `_process`, and `_process` queues a `_read` when there is new
 * data.
 *
 * Events:
 *  - `data` (Object[]): the parsed JSON lines of one chunk of appended text.
 *  - `finished`: emitted right after `data`, but only when `data` had at least
 *    one listener.
 *  - `error` (Error): an operation failed, or a file could not be read/parsed.
 *  - `stopped`: the watcher has halted after `stop()`.
 *
 * @class
 */
class LogWatcher extends events.EventEmitter {
	/**
	 * Construct the log watcher and start polling.
	 * @param {string} [dirpath] Directory to watch; falls back to `DEFAULT_SAVE_DIR` when falsy.
	 * @param {number} [maxfiles] Maximum number of files to follow at once; falls back to 3 when falsy.
	 */
	constructor(dirpath, maxfiles) {
		super();

		this._dirpath = dirpath || DEFAULT_SAVE_DIR;
		this._filter = isCommanderLog;
		this._maxfiles = maxfiles || 3;
		// filename -> {ino, mtime, size, watermark, tombstoned}
		this._logDetailMap = {};
		// Pending operations; each one is a function taking a node-style callback.
		this._ops = [];
		// Operation currently in flight, or null when none is running.
		this._op = null;
		// Handle of the idle timer armed by _loop(), or null.
		this._timer = null;
		// Set by stop(); tells the loop to halt.
		this._die = false;

		this._loop();
	}

	/**
	 * Mark a tracked file as tombstoned so that it is ignored from now on.
	 * Unknown file names are ignored.
	 * @param {string} filename Key of the file in the detail map.
	 */
	bury(filename) {
		debug('bury', {filename});
		const info = this._logDetailMap[filename];
		if (info) {
			info.tombstoned = true;
		}
	}

	/**
	 * Stop running. Pending operations are discarded. `stopped` is emitted
	 * immediately when no operation is in flight, otherwise as soon as the
	 * in-flight operation completes.
	 */
	stop() {
		debug('stop');

		// Always raise the flag so that an already scheduled _loop() call
		// cannot restart the polling after `stopped` was emitted.
		this._die = true;
		this._ops.length = 0;

		if (this._op === null) {
			clearTimeout(this._timer);
			this._timer = null;
			this.emit('stopped');
		}
	}

	/**
	 * The main loop: run the next queued operation, or arm the idle timer
	 * when the queue is empty.
	 *
	 * NOTE: when an operation reports an error through its callback, the
	 * error is emitted and the loop is not rescheduled.
	 */
	_loop() {
		debug('_loop', {opcount: this._ops.length});

		this._op = null;

		if (this._die) {
			// stop() was called; it (or the last operation) already emitted `stopped`.
			return;
		}

		if (this._ops.length === 0) {
			this._timer = setTimeout(() => {
				this._timer = null;
				this._ops.push(callback => this._poll(callback));
				setImmediate(() => this._loop());
			}, POLL_INTERVAL);
			return;
		}

		this._op = this._ops.shift();

		try {
			this._op(err => {
				// The operation is over: let stop() see that nothing is in flight.
				this._op = null;
				if (err) {
					this.emit('error', err);
				} else if (this._die) {
					this.emit('stopped');
				} else {
					setImmediate(() => this._loop());
				}
			});
		} catch (err) {
			this._op = null;
			this.emit('error', err);
			// assumption: it crashed BEFORE an async wait
			// otherwise, we'll end up with more simultaneous
			// activity
			if (this._die) {
				this.emit('stopped');
			} else {
				setImmediate(() => this._loop());
			}
		}
	}

	/**
	 * Poll the logs directory for new/updated files.
	 *
	 * Entries accepted by `_filter` are sorted by name and the last
	 * `_maxfiles` of them are queued for a stat (presumably the most recent
	 * ones, if file names embed a sortable timestamp - confirm). Tracked files
	 * that are no longer among them are buried.
	 * @param {Function} callback Called with an error when the directory cannot be listed, otherwise with null.
	 */
	_poll(callback) {
		debug('_poll');

		// Every live tracked file is "unseen" until the listing proves otherwise.
		const unseen = {};
		Object.keys(this._logDetailMap).forEach(filename => {
			if (!this._logDetailMap[filename].tombstoned) {
				unseen[filename] = true;
			}
		});

		fs.readdir(this._dirpath, (err, filenames) => {
			if (err) {
				callback(err);
				return;
			}

			// Filter BEFORE truncating to `_maxfiles`, so that unrelated files
			// in the directory cannot push log files out of the window.
			const candidates = filenames
				.map(name => path.join(this._dirpath, name))
				.filter(fpath => this._filter(fpath))
				.sort();

			// Clamp the start index: a negative start would count from the end
			// and drop files when fewer than `_maxfiles` are present.
			const first = Math.max(0, candidates.length - this._maxfiles);
			candidates.slice(first).forEach(filename => {
				delete unseen[filename];
				this._ops.push(cb => this._statfile(filename, cb));
			});

			Object.keys(unseen).forEach(filename => {
				this.bury(filename);
			});

			callback(null);
		});
	}

	/**
	 * Stat one candidate file and queue its processing.
	 * A file that vanished (ENOENT) is buried if it was tracked and is not an error.
	 * @param {string} filename Full path of the file.
	 * @param {Function} callback Called with the stat error, if any, otherwise with null.
	 */
	_statfile(filename, callback) {
		debug('_statfile', {filename});

		fs.stat(filename, (err, stats) => {
			if (err && err.code === 'ENOENT') {
				if (this._logDetailMap[filename]) {
					this.bury(filename);
				}
				callback(null); // file deleted
			} else if (err) {
				callback(err);
			} else {
				this._ops.push(cb => this._process(filename, stats, cb));
				callback(null);
			}
		});
	}

	/**
	 * Compare fresh stats with what is known about a file and queue a read
	 * when the file is new or has grown.
	 * @param {string} filename Full path of the file.
	 * @param {fs.Stats} stats Result of the preceding stat.
	 * @param {Function} callback Always called asynchronously with null.
	 */
	_process(filename, stats, callback) {
		debug('_process', {filename, stats});
		// Scheduled up front; it runs after the synchronous work below.
		setImmediate(callback, null);
		const info = this._logDetailMap[filename];

		if (info === undefined) {
			// First sighting: start tracking and read from the beginning.
			this._logDetailMap[filename] = {
				ino: stats.ino,
				mtime: stats.mtime,
				size: stats.size,
				watermark: 0,
				tombstoned: false
			};
			this._ops.push(cb => this._read(filename, cb));
			return;
		}

		if (info.tombstoned) {
			return;
		}

		if (info.ino !== stats.ino) {
			// file replaced... can't trust it any more
			// if the client API supported replay from scratch, we could do that
			// but we can't yet, so:
			this.bury(filename);
		} else if (stats.size > info.size) {
			// file not replaced; got longer... assume append
			this._ops.push(cb => this._read(filename, cb));
		}
		// Same inode and same size: even if mtime is different, treat it as
		// unchanged (e.g. ^Z when COPY CON to a fake log) and queue no read.
		// NOTE(review): a file that shrank is not handled specially.

		info.mtime = stats.mtime;
		info.size = stats.size;
	}

	/**
	 * Read the bytes between the file's watermark and its last known size,
	 * emit the complete lines as parsed JSON and advance the watermark past
	 * them. A trailing partial line is left for a later read.
	 * On a read or parse error the error is emitted and the file is buried.
	 * @param {string} filename Full path of a tracked file.
	 * @param {Function} callback Always called asynchronously with null.
	 */
	_read(filename, callback) {
		const info = this._logDetailMap[filename];
		const {watermark, size} = info;
		debug('_read', {filename, watermark, size});

		if (watermark >= size) {
			// Nothing to read; also avoids an invalid stream range.
			setImmediate(callback, null);
			return;
		}

		// Bytes of an incomplete line carried over from the previous chunk.
		let leftover = Buffer.alloc(0);
		let finished = false;

		const s = fs.createReadStream(filename, {
			flags: 'r',
			start: watermark,
			// `end` is inclusive
			end: size - 1
		});
		const finish = err => {
			if (finished) {
				return;
			}
			finished = true;
			if (err) {
				// On any error, bury the file, stop reading and emit the error.
				this.bury(filename);
				s.destroy();
				this.emit('error', err);
			}
			setImmediate(callback, null);
		};
		s.once('error', finish);

		s.once('end', () => finish());

		s.on('data', chunk => {
			const idx = chunk.lastIndexOf('\n');
			if (idx < 0) {
				// No line end in this chunk: keep accumulating.
				leftover = Buffer.concat([leftover, chunk]);
				return;
			}

			const complete = Buffer.concat([leftover, chunk.subarray(0, idx + 1)]);
			let obs;
			try {
				obs = complete
					.toString('utf8')
					.split(/[\r\n]+/)
					.filter(l => l.length > 0)
					.map(l => JSON.parse(l));
			} catch (err) {
				finish(err);
				return;
			}

			// Count the carried-over bytes too, otherwise the watermark drifts
			// whenever a line straddles two chunks.
			info.watermark += complete.length;
			leftover = chunk.subarray(idx + 1);
			setImmediate(() => this.emit('data', obs) && this.emit('finished'));
		});
	}
}
/**
 * Tell whether a path designates a commander journal log, i.e. a file whose
 * base name starts with `Journal.` and whose extension is `.log`.
 * @param {string} fpath Path (or bare name) of the file to test.
 * @returns {boolean} True when the path matches the journal naming scheme.
 */
function isCommanderLog(fpath) {
	const name = path.basename(fpath);
	if (!name.startsWith('Journal.')) {
		return false;
	}
	return path.extname(fpath) === '.log';
}

// Public API: the watcher class, plus the predicate it installs as its
// default file filter.
module.exports = {
	LogWatcher,
	isCommanderLog
};

// Stand-alone mode: when this file is the program's entry point, watch the
// default directory and print every journal entry to the console.
// `require.main === module` replaces the deprecated `module.parent` test,
// which is also unset when the file is loaded from an ES module.
if (require.main === module) {
	process.on('uncaughtException', err => {
		console.error(err.stack || err);
		// Rethrow the original error (not a copy built from its stack text);
		// throwing from this handler terminates the process.
		throw err;
	});

	const watcher = new LogWatcher(DEFAULT_SAVE_DIR);
	watcher.on('error', err => {
		// Let the uncaughtException handler above log it and end the process.
		throw err;
	});
	watcher.on('data', obs => {
		obs.forEach(ob => {
			console.log('\n' + ob.timestamp, ob.event);
			// Print the remaining fields in key order without mutating the
			// emitted record.
			Object.keys(ob)
				.filter(k => k !== 'timestamp' && k !== 'event')
				.sort()
				.forEach(k => {
					console.log('\t' + k, ob[k]);
				});
		});
	});
}