Home Reference Source

js-junk-bucket-repo/streams.js

const Future = require("./future");

const {Readable, Transform, Writable} = require("stream");

function promisePiped( from, to ){
	const future = new Future();

	function remove() {
		from.removeListener("end", accept);
		from.removeListener("error", reject);
		to.removeListener("error", reject);
	}
	function reject(e){
		remove();
		future.reject(e);
	}
	function accept(v){
		remove();
		future.accept(v);
	}

	from.on("error", reject);
	to.on("error", reject);

	to.on("finish", accept);
	from.pipe(to);
	return future.promised;
}

class EchoOnReceive extends Transform {
	constructor( log = console ){
		this.log = log;
	}
	_transform( chunk, encoding, cb ) {
		this.log.log("Chunk", chunk);
		cb(null,chunk);
	}

	_final(cb){
		this.log.log("Final called");
		cb();
	}
}

/**
 * A readable which will provide a given buffer on the first read attempt then appear closed on all further attempts.
 */
class MemoryReadable extends Readable {
	/**
	 *
	 * @param source {Buffer} bytes to be provided
	 * @param props additional properties to be passed to Readable
	 */
	constructor(source, props) {
		super(props);
		this.bytes = source;
		this.pushed = false;
	}

	_read( size ){
		if( !this.pushed ) {
			this.pushed = true;
			this.push(this.bytes);
		} else {
			this.readable = false;
			this.push(null);
		}
	}
}

/**
 * Accumulates bytes written to the sink.  This sink is never finished or finalized.
 */
class MemoryWritable extends Writable {
	/**
	 * Initializes an empty buffer with the provided properties.
	 *
	 * @param props properties to provide to the writable
	 */
	constructor(props) {
		super(props);
		this.bytes = Buffer.alloc(0);
	}

	_write(chunk, encoding, callback) {
		try {
			this.bytes = Buffer.concat([this.bytes, chunk]);
			callback(null);
		}catch(e){
			callback(e);
		}
	}
}

module.exports = {
	promisePiped,
	EchoOnReceive,
	MemoryReadable,
	MemoryWritable
};