src/main/generic/ObjectStore.js
/**
* This is the main implementation of an object store.
* It uses a specified backend (which itself implements the very same interface)
* and builds upon this backend to answer queries.
* The main task of this object store is to manage transactions
* and ensure read isolation on these transactions.
* @implements {IObjectStore}
* @implements {ICommittable}
*/
class ObjectStore {
/**
* Creates a new object store based on a backend and an underlying database.
* The database is only used to determine the connection status.
* @param {IBackend} backend The backend underlying this object store.
* @param {JungleDB} db The database underlying the backend.
* @param {string} [name] The name of the object store if existent.
*/
constructor(backend, db, name) {
this._backend = backend;
this._db = db;
this._name = name;
/** @type {Array.<TransactionInfo>} */
this._stateStack = [];
this._backendInfo = new TransactionInfo(this._backend, null);
/**
* Maps transactions to their TransactionInfo objects.
* @type {Map.<number|string,TransactionInfo>}
*/
this._transactions = new Map();
this._transactions.set(ObjectStore.BACKEND_ID, this._backendInfo);
/**
* The set of currently open snapshots.
* @type {Set.<Snapshot>}
*/
this._snapshotManager = new SnapshotManager();
this._synchronizer = new Synchronizer();
}
/** @type {JungleDB} */
get jungleDB() {
return this._db;
}
/** @type {boolean} */
get connected() {
return this._backend.connected;
}
/** @type {IObjectStore} */
get _currentState() {
return this._stateStack.length > 0 ? this._stateStack[this._stateStack.length - 1].transaction : this._backend;
}
/** @type {TransactionInfo} */
get _currentStateInfo() {
return this._stateStack.length > 0 ? this._stateStack[this._stateStack.length - 1] : this._backendInfo;
}
/** @type {number|string} */
get _currentStateId() {
return this._stateStack.length > 0 ? this._stateStack[this._stateStack.length - 1].id : ObjectStore.BACKEND_ID;
}
/**
* A map of index names to indices.
* The index names can be used to access an index.
* @type {Map.<string,IIndex>}
*/
get indices() {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
return this._currentState.indices;
}
/**
* Returns a promise of the object stored under the given primary key.
* Resolves to undefined if the key is not present in the object store.
* @param {string} key The primary key to look for.
* @param {RetrievalConfig} [options] Advanced retrieval options.
* @returns {Promise.<*>} A promise of the object stored under the given key, or undefined if not present.
*/
get(key, options = {}) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
return this._currentState.get(key, options);
}
/**
* Inserts or replaces a key-value pair.
* Implicitly creates a transaction for this operation and commits it.
* @param {string} key The primary key to associate the value with.
* @param {*} value The value to write.
* @returns {Promise.<boolean>} A promise of the success outcome.
*/
async put(key, value) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
const tx = this.transaction();
try {
await tx.put(key, value);
} catch (err) {
await tx.abort();
throw err;
}
return tx.commit();
}
/**
* Removes the key-value pair of the given key from the object store.
* Implicitly creates a transaction for this operation and commits it.
* @param {string} key The primary key to delete along with the associated object.
* @returns {Promise.<boolean>} A promise of the success outcome.
*/
async remove(key) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
const tx = this.transaction();
try {
await tx.remove(key);
} catch (err) {
await tx.abort();
throw err;
}
return tx.commit();
}
/**
* Returns the object stored under the given primary key.
* Resolves to undefined if the key is not present in the object store.
* @param {string} key The primary key to look for.
* @param {SyncRetrievalConfig} [options] Advanced retrieval options.
* @returns {*} The object stored under the given key, or undefined if not present.
*/
getSync(key, options = {}) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
if (!this._currentState.isSynchronous()) throw new Error('Only works on synchronous backends');
return this._currentState.getSync(key, options);
}
/**
* A check whether a certain key is cached.
* @param {string} key The key to check.
* @return {boolean} A boolean indicating whether the key is already in the cache.
*/
isCached(key) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
if (!this._currentState.isSynchronous()) throw new Error('Only works on synchronous backends');
return this._currentState.isCached(key);
}
/**
* Returns a promise of a set of keys fulfilling the given query.
* If the optional query is not given, it returns all keys in the object store.
* If the query is of type KeyRange, it returns all keys of the object store being within this range.
* If the query is of type Query, it returns all keys fulfilling the query.
* @param {Query|KeyRange} [query] Optional query to check keys against.
* @param {number} [limit] Limits the number of results if given.
* @returns {Promise.<Set.<string>>} A promise of the set of keys relevant to the query.
*/
keys(query = null, limit = null) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
if (query !== null && query instanceof Query) {
return query.keys(this._currentState, limit);
}
return this._currentState.keys(query, limit);
}
/**
* Returns a promise of an array of objects whose primary keys fulfill the given query.
* If the optional query is not given, it returns all objects in the object store.
* If the query is of type KeyRange, it returns all objects whose primary keys are within this range.
* If the query is of type Query, it returns all objects whose primary keys fulfill the query.
* @param {Query|KeyRange} [query] Optional query to check keys against.
* @param {number} [limit] Limits the number of results if given.
* @returns {Promise.<Array.<*>>} A promise of the array of objects relevant to the query.
*/
values(query = null, limit = null) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
if (query !== null && query instanceof Query) {
return query.values(this._currentState, limit);
}
return this._currentState.values(query, limit);
}
/**
* Iterates over the keys in a given range and direction.
* The callback is called for each primary key fulfilling the query
* until it returns false and stops the iteration.
* @param {function(key:string):boolean} callback A predicate called for each key until returning false.
* @param {boolean} ascending Determines the direction of traversal.
* @param {KeyRange} query An optional KeyRange to narrow down the iteration space.
* @returns {Promise} The promise resolves after all elements have been streamed.
*/
keyStream(callback, ascending=true, query=null) {
return this._currentState.keyStream(callback, ascending, query);
}
/**
* Iterates over the keys and values in a given range and direction.
* The callback is called for each value and primary key fulfilling the query
* until it returns false and stops the iteration.
* @param {function(value:*, key:string):boolean} callback A predicate called for each value and key until returning false.
* @param {boolean} ascending Determines the direction of traversal.
* @param {KeyRange} query An optional KeyRange to narrow down the iteration space.
* @returns {Promise} The promise resolves after all elements have been streamed.
*/
valueStream(callback, ascending=true, query=null) {
return this._currentState.valueStream(callback, ascending, query);
}
/**
* Returns a promise of the object whose primary key is maximal for the given range.
* If the optional query is not given, it returns the object whose key is maximal.
* If the query is of type KeyRange, it returns the object whose primary key is maximal for the given range.
* @param {KeyRange} [query] Optional query to check keys against.
* @returns {Promise.<*>} A promise of the object relevant to the query.
*/
maxValue(query=null) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
return this._currentState.maxValue(query);
}
/**
* Returns a promise of the key being maximal for the given range.
* If the optional query is not given, it returns the maximal key.
* If the query is of type KeyRange, it returns the key being maximal for the given range.
* @param {KeyRange} [query] Optional query to check keys against.
* @returns {Promise.<string>} A promise of the key relevant to the query.
*/
maxKey(query=null) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
return this._currentState.maxKey(query);
}
/**
* Returns a promise of the key being minimal for the given range.
* If the optional query is not given, it returns the minimal key.
* If the query is of type KeyRange, it returns the key being minimal for the given range.
* @param {KeyRange} [query] Optional query to check keys against.
* @returns {Promise.<string>} A promise of the key relevant to the query.
*/
minKey(query=null) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
return this._currentState.minKey(query);
}
/**
* Returns a promise of the object whose primary key is minimal for the given range.
* If the optional query is not given, it returns the object whose key is minimal.
* If the query is of type KeyRange, it returns the object whose primary key is minimal for the given range.
* @param {KeyRange} [query] Optional query to check keys against.
* @returns {Promise.<*>} A promise of the object relevant to the query.
*/
minValue(query=null) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
return this._currentState.minValue(query);
}
/**
* Returns the count of entries in the given range.
* If the optional query is not given, it returns the count of entries in the object store.
* If the query is of type KeyRange, it returns the count of entries within the given range.
* @param {KeyRange} [query]
* @returns {Promise.<number>}
*/
count(query=null) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
return this._currentState.count(query);
}
/**
* This method is only used by transactions internally to commit themselves to the corresponding object store.
* Thus, the tx argument is non-optional.
* A call to this method checks whether the given transaction can be applied and pushes it to
* the stack of applied transactions. When there is no other transaction requiring to enforce
* read isolation, the state will be flattened and all transactions will be applied to the backend.
* @param {Transaction} tx The transaction to be applied.
* @returns {Promise.<boolean>} A promise of the success outcome.
* @protected
*/
async commit(tx) {
if (!this._isCommittable(tx)) {
await this.abort(tx);
return false;
}
await this._commitInternal(tx);
return true;
}
/**
* Is used to probe whether a transaction can be committed.
* This, for example, includes a check whether another transaction has already been committed.
* @protected
* @param {Transaction} tx The transaction to be applied.
* @returns {boolean} Whether a commit will be successful.
*/
_isCommittable(tx) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
if (!(tx instanceof Transaction) || tx.state !== Transaction.STATE.OPEN || !this._transactions.has(tx.id)) {
throw new Error('Can only commit open transactions');
}
const info = this._transactions.get(tx.id);
// Another transaction was already committed.
return info.isCommittable();
}
/**
* Commits the transaction to the backend.
* @returns {Promise.<boolean>} A promise of the success outcome.
* @protected
*/
async _commitBackend() {
throw new Error('Cannot commit object stores');
}
/**
* Is used to commit the transaction.
* @protected
* @param {Transaction} tx The transaction to be applied.
* @returns {Promise} A promise that resolves upon successful application of the transaction.
*/
async _commitInternal(tx) {
const info = this._transactions.get(tx.id);
// Create new layer on stack (might be immediately removed by a state flattening).
if (this._stateStack.length >= ObjectStore.MAX_STACK_SIZE) {
Log.e(ObjectStore, `Transaction stack size exceeded ${this.toStringFull()}`);
throw new Error('Transaction stack size exceeded');
}
this._stateStack.push(info);
info.close();
// If this is the last transaction, we push our changes to the underlying layer.
// This only works if the given transaction does not have dependencies or the current state is the backend.
if (info.isFlushable()) {
// The underlying layer *has to be* the last one in our stack.
await this._flattenState(tx);
}
}
/**
* Allows to change the backend of a Transaction when the state has been flushed.
* @param parent
* @protected
*/
_setParent(parent) {
throw new Error('Unsupported operation');
}
/**
* This method is only used by transactions internally to abort themselves at the corresponding object store.
* Thus, the tx argument is non-optional.
* @param {Transaction} tx The transaction to be aborted.
* @returns {Promise.<boolean>} A promise of the success outcome.
* @protected
*/
async abort(tx) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
if (tx instanceof Snapshot) {
return this._snapshotManager.abortSnapshot(tx);
}
if (!(tx instanceof Transaction) || tx.state !== Transaction.STATE.OPEN || !this._transactions.has(tx.id)) {
throw new Error('Can only abort open transactions');
}
const info = this._transactions.get(tx.id);
info.abort();
// If this abortion resolves a conflict, try flattening the state.
if (info.parent && info.parent.numOpenChildren === 0) {
await this._flattenState();
}
this._transactions.delete(tx.id);
return true;
}
/**
* This internal method applies a transaction to the current state
* and tries flattening the stack of transactions.
* @param {Transaction} [tx] An optional transaction to apply to the current state.
* @returns {Promise.<boolean>} If a tx is given, this boolean indicates whether the state has been merged.
* If tx is not given, the return value is false and does not convey a meaning.
* @private
*/
_flattenState(tx) {
return this._synchronizer.push(() => this._flattenStateInternal(tx));
}
/**
* This internal method applies a transaction to the current state
* and tries flattening the stack of transactions.
* @param {Transaction} [tx] An optional transaction to apply to the current state.
* @returns {Promise.<boolean>} If a tx is given, this boolean indicates whether the state has been merged.
* If tx is not given, the return value is false and does not convey a meaning.
* @private
*/
async _flattenStateInternal(tx) {
// If there is a tx argument, merge it with the current state.
if (tx && (tx instanceof Transaction)) {
// Check whether the state can be flattened.
// For this, the following conditions have to hold:
// 1. the base state does not have open transactions
const info = this._transactions.get(tx.id);
if (!info.isFlushable()) {
return false;
}
// Applying is possible.
// We apply it first and upon successful application, we update transactions.
// This way, we ensure that intermediate reads still work and that transactions
// are still consistent even if the application fails.
const backend = info.parent.transaction;
const cleanup = () => {
// Change pointers in child transactions.
info.flush();
this._transactions.delete(tx.id);
// Look for tx on stack and remove it.
const statePosition = this._stateStack.indexOf(info);
if (statePosition >= 0) {
this._stateStack.splice(statePosition, 1);
}
this._flattenState().catch(Log.w.tag(ObjectStore));
};
if (tx.dependency === null) {
// If we apply to the backend, update the snapshots.
if (info.parent.isBackend()) {
await this._snapshotManager.applyTx(tx, backend);
}
try {
await backend._apply(tx);
} catch (err) {
// Change pointers in child transactions.
info.abort();
this._transactions.delete(tx.id);
// Look for tx on stack and remove it.
const statePosition = this._stateStack.indexOf(info);
if (statePosition >= 0) {
this._stateStack.splice(statePosition, 1);
}
tx._setAborted();
Log.e(ObjectStore, 'Error while applying transaction', err);
}
cleanup();
return true;
} else {
// We apply to the backend, so also update snapshots before the flush.
return await tx.dependency.onFlushable(tx, cleanup, () => this._snapshotManager.applyTx(tx, backend));
}
} else {
// Check both ends of the stack.
// Start with the easy part: The last state.
// Start flattening at the end.
while (this._stateStack.length > 0) {
if (!(await this._flattenStateInternal(this._currentState))) {
break;
}
}
// Then try flattening from the start.
while (this._stateStack.length > 0) {
if (!(await this._flattenStateInternal(this._stateStack[0].transaction))) {
break;
}
}
return false;
}
}
/**
* Returns the index of the given name.
* If the index does not exist, it returns undefined.
* @param {string} indexName The name of the requested index.
* @returns {IIndex} The index associated with the given name.
*/
index(indexName) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
return this._currentState.index(indexName);
}
/**
* Creates a new secondary index on the object store.
* Currently, all secondary indices are non-unique.
* They are defined by a key within the object or alternatively a path through the object to a specific subkey.
* For example, ['a', 'b'] could be used to use 'key' as the key in the following object:
* { 'a': { 'b': 'key' } }
* Secondary indices may be multiEntry, i.e., if the keyPath resolves to an iterable object, each item within can
* be used to find this entry.
* If a new object does not possess the key path associated with that index, it is simply ignored.
*
* This function may only be called before the database is connected.
* Moreover, it is only executed on database version updates or on first creation.
* @param {string} indexName The name of the index.
* @param {string|Array.<string>} [keyPath] The path to the key within the object. May be an array for multiple levels.
* @param {IndexConfig} [options] An options object.
*/
createIndex(indexName, keyPath, options = {}) {
return this._backend.createIndex(indexName, keyPath, options);
}
/**
* Deletes a secondary index from the object store.
* @param indexName
* @param {{upgradeCondition:?boolean|?function(oldVersion:number, newVersion:number):boolean}} [options]
*/
deleteIndex(indexName, options = {}) {
return this._backend.deleteIndex(indexName, options);
}
/**
* Creates a new transaction, ensuring read isolation
* on the most recently successfully committed state.
* @param {boolean} [enableWatchdog]
* @returns {Transaction} The transaction object.
*/
transaction(enableWatchdog=true) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
// Prefer synchronous transactions.
if (this._backend.isSynchronous()) return this.synchronousTransaction(enableWatchdog);
const tx = new Transaction(this, this._currentState, this, enableWatchdog);
this._transactions.set(tx.id, new TransactionInfo(tx, this._currentStateInfo));
return tx;
}
/**
* Creates a new synchronous transaction, ensuring read isolation
* on the most recently successfully committed state.
*
* WARNING: If not all required key-value-pairs are preloaded, the results of any call on a synchronous transaction
* might be wrong. Only use synchronous transactions, if unavoidable.
* @param {boolean} [enableWatchdog]
* @returns {SynchronousTransaction} The synchronous transaction object.
*/
synchronousTransaction(enableWatchdog=true) {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
const tx = new SynchronousTransaction(this, this._currentState, this, enableWatchdog);
this._transactions.set(tx.id, new TransactionInfo(tx, this._currentStateInfo));
return tx;
}
/**
* Checks whether an object store implements the ISynchronousObjectStore interface.
* @returns {boolean} The transaction object.
*/
isSynchronous() {
return this._backend.isSynchronous();
}
/**
* Creates an in-memory snapshot of the current state.
* This snapshot only maintains the differences between the state at the time of the snapshot
* and the current state.
* To stop maintaining the snapshot, it has to be aborted.
* @returns {Snapshot}
*/
snapshot() {
if (this._currentStateId !== ObjectStore.BACKEND_ID) {
return this._currentState.snapshot();
}
return this._snapshotManager.createSnapshot(this, this._currentState);
}
/**
* An object store is strongly connected to a backend.
* Hence, it does not store anything by itself and the _apply method is not supported.
* @param {Transaction} tx
* @returns {Promise.<boolean>}
* @protected
*/
async _apply(tx) {
throw new Error('Unsupported operation');
}
/**
* Empties the object store.
* @returns {Promise} The promise resolves after emptying the object store.
*/
async truncate() {
if (!this._backend.connected) throw new Error('JungleDB is not connected');
const tx = this.transaction();
await tx.truncate();
return tx.commit();
}
/**
* Closes the object store and potential connections.
* @returns {Promise} The promise resolves after closing the object store.
*/
close() {
// TODO perhaps use a different strategy here
if (this._stateStack.length > 0) {
throw new Error('Cannot close database while transactions are active');
}
return this._backend.close();
}
/**
* Method called to decode a single value.
* @param {*} value Value to be decoded.
* @param {string} key Key corresponding to the value.
* @returns {*} The decoded value.
*/
decode(value, key) {
return this._backend.decode(value, key);
}
/**
* Method called to encode a single value.
* @param {*} value Value to be encoded.
* @returns {*} The encoded value.
*/
encode(value) {
return this._backend.encode(value);
}
toStringFull() {
return `ObjectStore{
stack=[${this._stateStack.map(tx => `{tx=${tx}, open=${this._openTransactions.get(tx.id) ? this._openTransactions.get(tx.id).size : 0}}`)}],
db=${this._db}/${this._name ? this._name : 'unnamed'}
}`;
}
toString() {
return `ObjectStore{stackSize=${this._stateStack.length}, db=${this._db}/${this._name ? this._name : 'unnamed'}}`;
}
}
/** @type {number} The maximum number of states to stack. */
ObjectStore.MAX_STACK_SIZE = 10;
ObjectStore.BACKEND_ID = 'backend';
Class.register(ObjectStore);
class TransactionInfo {
/**
* @param {Transaction} transaction
* @param {TransactionInfo} parentInfo
* @param {Array.<TransactionInfo>} children
*/
constructor(transaction, parentInfo, children = []) {
this.transaction = transaction;
this.children = children;
this._parentInfo = parentInfo;
this._open = true;
if (this._parentInfo) {
this._parentInfo.addChild(this);
}
}
/**
* @param {TransactionInfo} transaction
*/
addChild(transaction) {
this.children.push(transaction);
}
/**
* @param {TransactionInfo} transaction
*/
removeChild(transaction) {
const i = this.children.indexOf(transaction);
if (i >= 0) {
this.children.splice(i, 1);
}
}
flush() {
if (!this.isBackend()) {
const parent = this.parent;
this.parent.removeChild(this);
for (const /** @type {TransactionInfo} */ child of this.children.slice()) {
child.parent = parent;
}
this.children = [];
this._parentInfo = null;
}
}
abort() {
if (!this.isBackend()) {
this.parent.removeChild(this);
}
}
close() {
this._open = false;
}
/** @type {TransactionInfo} */
get parent() {
return this._parentInfo;
}
/**
* @param {TransactionInfo} parent
*/
set parent(parent) {
this.parent.removeChild(this);
this._parentInfo = parent;
this.parent.addChild(this);
this.transaction._setParent(parent.transaction);
}
/** @type {number} */
get id() {
return this.isBackend() ? ObjectStore.BACKEND_ID : this.transaction.id;
}
/**
* @returns {boolean}
*/
isBackend() {
return this._parentInfo === null;
}
/**
* @returns {boolean}
*/
isOpen() {
return this._open;
}
/**
* @type {number}
*/
get numOpenChildren() {
return this.children.filter(child => child.isOpen()).length;
}
/**
* @returns {*|boolean}
*/
isCommittable() {
return this._parentInfo && this._parentInfo.children.every(child => child.isOpen());
}
/**
* @returns {boolean}
*/
isFlushable() {
return this.parent && this.parent.numOpenChildren === 0 && (this.transaction.dependency === null || this.parent.isBackend());
}
toString() {
return this.transaction.toStringShort();
}
}