src/collection.js
"use strict";
import BaseAdapter from "./adapters/base";
import IDB from "./adapters/IDB";
import { waterfall, deepEqual } from "./utils";
import { v4 as uuid4 } from "uuid";
import { omitKeys, RE_RECORD_ID } from "./utils";
const RECORD_FIELDS_TO_CLEAN = ["_status"];
const AVAILABLE_HOOKS = ["incoming-changes"];
/**
* Compare two records omitting local fields and synchronization
* attributes (like _status and last_modified)
* @param {Object} a A record to compare.
* @param {Object} b A record to compare.
* @param {Array} localFields Additional fields to ignore during the comparison
* @return {boolean}
*/
export function recordsEqual(a, b, localFields = []) {
const fieldsToClean = RECORD_FIELDS_TO_CLEAN.concat(["last_modified"]).concat(
localFields
);
const cleanLocal = r => omitKeys(r, fieldsToClean);
return deepEqual(cleanLocal(a), cleanLocal(b));
}
/**
* Synchronization result object.
*/
export class SyncResultObject {
/**
* Object default values.
* @type {Object}
*/
static get defaults() {
return {
ok: true,
lastModified: null,
errors: [],
created: [],
updated: [],
deleted: [],
published: [],
conflicts: [],
skipped: [],
resolved: [],
};
}
/**
* Public constructor.
*/
constructor() {
/**
* Current synchronization result status; becomes `false` when conflicts or
* errors are registered.
* @type {Boolean}
*/
this.ok = true;
Object.assign(this, SyncResultObject.defaults);
}
/**
* Adds entries for a given result type.
*
* @param {String} type The result type.
* @param {Array} entries The result entries.
* @return {SyncResultObject}
*/
add(type, entries) {
if (!Array.isArray(this[type])) {
return;
}
if (!Array.isArray(entries)) {
entries = [entries];
}
// Deduplicate entries by id. If the values don't have `id` attribute, just
// keep all.
const recordsWithoutId = new Set();
const recordsById = new Map();
function addOneRecord(record) {
if (!record.id) {
recordsWithoutId.add(record);
} else {
recordsById.set(record.id, record);
}
}
this[type].forEach(addOneRecord);
entries.forEach(addOneRecord);
this[type] = Array.from(recordsById.values()).concat(
Array.from(recordsWithoutId)
);
this.ok = this.errors.length + this.conflicts.length === 0;
return this;
}
/**
* Reinitializes result entries for a given result type.
*
* @param {String} type The result type.
* @return {SyncResultObject}
*/
reset(type) {
this[type] = SyncResultObject.defaults[type];
this.ok = this.errors.length + this.conflicts.length === 0;
return this;
}
}
export class ServerWasFlushedError extends Error {
constructor(clientTimestamp, serverTimestamp, message) {
super(message);
if (Error.captureStackTrace) {
Error.captureStackTrace(this, ServerWasFlushedError);
}
this.clientTimestamp = clientTimestamp;
this.serverTimestamp = serverTimestamp;
}
}
function createUUIDSchema() {
return {
generate() {
return uuid4();
},
validate(id) {
return typeof id == "string" && RE_RECORD_ID.test(id);
},
};
}
/**
* IDSchema for when using kinto.js as a key-value store.
* Using this IDSchema requires you to set a property as the id.
* This will be the property used to retrieve this record.
*
* @example
* const exampleCollection = db.collection("example", { idSchema: createKeyValueStoreIdSchema() })
* await exampleCollection.create({ title: "How to tie a tie", favoriteColor: "blue", id: "user123" }, { useRecordId: true })
* await exampleCollection.getAny("user123")
*/
export function createKeyValueStoreIdSchema() {
return {
generate() {
throw new Error("createKeyValueStoreIdSchema() does not generate an id");
},
validate() {
return true;
},
};
}
function markStatus(record, status) {
return { ...record, _status: status };
}
function markDeleted(record) {
return markStatus(record, "deleted");
}
function markSynced(record) {
return markStatus(record, "synced");
}
/**
* Import a remote change into the local database.
*
* @param {IDBTransactionProxy} transaction The transaction handler.
* @param {Object} remote The remote change object to import.
* @param {Array<String>} localFields The list of fields that remain local.
* @return {Object}
*/
function importChange(transaction, remote, localFields) {
const local = transaction.get(remote.id);
if (!local) {
// Not found locally but remote change is marked as deleted; skip to
// avoid recreation.
if (remote.deleted) {
return { type: "skipped", data: remote };
}
const synced = markSynced(remote);
transaction.create(synced);
return { type: "created", data: synced };
}
// Compare local and remote, ignoring local fields.
const isIdentical = recordsEqual(local, remote, localFields);
// Apply remote changes on local record.
const synced = { ...local, ...markSynced(remote) };
// Detect or ignore conflicts if record has also been modified locally.
if (local._status !== "synced") {
// Locally deleted, unsynced: scheduled for remote deletion.
if (local._status === "deleted") {
return { type: "skipped", data: local };
}
if (isIdentical) {
// If records are identical, import anyway, so we bump the
// local last_modified value from the server and set record
// status to "synced".
transaction.update(synced);
return { type: "updated", data: { old: local, new: synced } };
}
if (
local.last_modified !== undefined &&
local.last_modified === remote.last_modified
) {
// If our local version has the same last_modified as the remote
// one, this represents an object that corresponds to a resolved
// conflict. Our local version represents the final output, so
// we keep that one. (No transaction operation to do.)
// But if our last_modified is undefined,
// that means we've created the same object locally as one on
// the server, which *must* be a conflict.
return { type: "void" };
}
return {
type: "conflicts",
data: { type: "incoming", local: local, remote: remote },
};
}
// Local record was synced.
if (remote.deleted) {
transaction.delete(remote.id);
return { type: "deleted", data: local };
}
// Import locally.
transaction.update(synced);
// if identical, simply exclude it from all SyncResultObject lists
const type = isIdentical ? "void" : "updated";
return { type, data: { old: local, new: synced } };
}
/**
* Abstracts a collection of records stored in the local database, providing
* CRUD operations and synchronization helpers.
*/
export default class Collection {
/**
* Constructor.
*
* Options:
* - `{BaseAdapter} adapter` The DB adapter (default: `IDB`)
*
* @param {String} bucket The bucket identifier.
* @param {String} name The collection name.
* @param {Api} api The Api instance.
* @param {Object} options The options object.
*/
constructor(bucket, name, api, options = {}) {
this._bucket = bucket;
this._name = name;
this._lastModified = null;
const DBAdapter = options.adapter || IDB;
if (!DBAdapter) {
throw new Error("No adapter provided");
}
const db = new DBAdapter(`${bucket}/${name}`, options.adapterOptions);
if (!(db instanceof BaseAdapter)) {
throw new Error("Unsupported adapter.");
}
// public properties
/**
* The db adapter instance
* @type {BaseAdapter}
*/
this.db = db;
/**
* The Api instance.
* @type {KintoClient}
*/
this.api = api;
/**
* The event emitter instance.
* @type {EventEmitter}
*/
this.events = options.events;
/**
* The IdSchema instance.
* @type {Object}
*/
this.idSchema = this._validateIdSchema(options.idSchema);
/**
* The list of remote transformers.
* @type {Array}
*/
this.remoteTransformers = this._validateRemoteTransformers(
options.remoteTransformers
);
/**
* The list of hooks.
* @type {Object}
*/
this.hooks = this._validateHooks(options.hooks);
/**
* The list of fields names that will remain local.
* @type {Array}
*/
this.localFields = options.localFields || [];
}
/**
* The collection name.
* @type {String}
*/
get name() {
return this._name;
}
/**
* The bucket name.
* @type {String}
*/
get bucket() {
return this._bucket;
}
/**
* The last modified timestamp.
* @type {Number}
*/
get lastModified() {
return this._lastModified;
}
/**
* Synchronization strategies. Available strategies are:
*
* - `MANUAL`: Conflicts will be reported in a dedicated array.
* - `SERVER_WINS`: Conflicts are resolved using remote data.
* - `CLIENT_WINS`: Conflicts are resolved using local data.
*
* @type {Object}
*/
static get strategy() {
return {
CLIENT_WINS: "client_wins",
SERVER_WINS: "server_wins",
MANUAL: "manual",
};
}
/**
* Validates an idSchema.
*
* @param {Object|undefined} idSchema
* @return {Object}
*/
_validateIdSchema(idSchema) {
if (typeof idSchema === "undefined") {
return createUUIDSchema();
}
if (typeof idSchema !== "object") {
throw new Error("idSchema must be an object.");
} else if (typeof idSchema.generate !== "function") {
throw new Error("idSchema must provide a generate function.");
} else if (typeof idSchema.validate !== "function") {
throw new Error("idSchema must provide a validate function.");
}
return idSchema;
}
/**
* Validates a list of remote transformers.
*
* @param {Array|undefined} remoteTransformers
* @return {Array}
*/
_validateRemoteTransformers(remoteTransformers) {
if (typeof remoteTransformers === "undefined") {
return [];
}
if (!Array.isArray(remoteTransformers)) {
throw new Error("remoteTransformers should be an array.");
}
return remoteTransformers.map(transformer => {
if (typeof transformer !== "object") {
throw new Error("A transformer must be an object.");
} else if (typeof transformer.encode !== "function") {
throw new Error("A transformer must provide an encode function.");
} else if (typeof transformer.decode !== "function") {
throw new Error("A transformer must provide a decode function.");
}
return transformer;
});
}
/**
* Validate the passed hook is correct.
*
* @param {Array|undefined} hook.
* @return {Array}
**/
_validateHook(hook) {
if (!Array.isArray(hook)) {
throw new Error("A hook definition should be an array of functions.");
}
return hook.map(fn => {
if (typeof fn !== "function") {
throw new Error("A hook definition should be an array of functions.");
}
return fn;
});
}
/**
* Validates a list of hooks.
*
* @param {Object|undefined} hooks
* @return {Object}
*/
_validateHooks(hooks) {
if (typeof hooks === "undefined") {
return {};
}
if (Array.isArray(hooks)) {
throw new Error("hooks should be an object, not an array.");
}
if (typeof hooks !== "object") {
throw new Error("hooks should be an object.");
}
const validatedHooks = {};
for (const hook in hooks) {
if (!AVAILABLE_HOOKS.includes(hook)) {
throw new Error(
"The hook should be one of " + AVAILABLE_HOOKS.join(", ")
);
}
validatedHooks[hook] = this._validateHook(hooks[hook]);
}
return validatedHooks;
}
/**
* Deletes every records in the current collection and marks the collection as
* never synced.
*
* @return {Promise}
*/
async clear() {
await this.db.clear();
await this.db.saveMetadata(null);
await this.db.saveLastModified(null);
return { data: [], permissions: {} };
}
/**
* Encodes a record.
*
* @param {String} type Either "remote" or "local".
* @param {Object} record The record object to encode.
* @return {Promise}
*/
_encodeRecord(type, record) {
if (!this[`${type}Transformers`].length) {
return Promise.resolve(record);
}
return waterfall(
this[`${type}Transformers`].map(transformer => {
return record => transformer.encode(record);
}),
record
);
}
/**
* Decodes a record.
*
* @param {String} type Either "remote" or "local".
* @param {Object} record The record object to decode.
* @return {Promise}
*/
_decodeRecord(type, record) {
if (!this[`${type}Transformers`].length) {
return Promise.resolve(record);
}
return waterfall(
this[`${type}Transformers`].reverse().map(transformer => {
return record => transformer.decode(record);
}),
record
);
}
/**
* Adds a record to the local database, asserting that none
* already exist with this ID.
*
* Note: If either the `useRecordId` or `synced` options are true, then the
* record object must contain the id field to be validated. If none of these
* options are true, an id is generated using the current IdSchema; in this
* case, the record passed must not have an id.
*
* Options:
* - {Boolean} synced Sets record status to "synced" (default: `false`).
* - {Boolean} useRecordId Forces the `id` field from the record to be used,
* instead of one that is generated automatically
* (default: `false`).
*
* @param {Object} record
* @param {Object} options
* @return {Promise}
*/
create(record, options = { useRecordId: false, synced: false }) {
// Validate the record and its ID (if any), even though this
// validation is also done in the CollectionTransaction method,
// because we need to pass the ID to preloadIds.
const reject = msg => Promise.reject(new Error(msg));
if (typeof record !== "object") {
return reject("Record is not an object.");
}
if (
(options.synced || options.useRecordId) &&
!record.hasOwnProperty("id")
) {
return reject(
"Missing required Id; synced and useRecordId options require one"
);
}
if (
!options.synced &&
!options.useRecordId &&
record.hasOwnProperty("id")
) {
return reject("Extraneous Id; can't create a record having one set.");
}
const newRecord = {
...record,
id:
options.synced || options.useRecordId
? record.id
: this.idSchema.generate(record),
_status: options.synced ? "synced" : "created",
};
if (!this.idSchema.validate(newRecord.id)) {
return reject(`Invalid Id: ${newRecord.id}`);
}
return this.execute(txn => txn.create(newRecord), {
preloadIds: [newRecord.id],
}).catch(err => {
if (options.useRecordId) {
throw new Error(
"Couldn't create record. It may have been virtually deleted."
);
}
throw err;
});
}
/**
* Like {@link CollectionTransaction#update}, but wrapped in its own transaction.
*
* Options:
* - {Boolean} synced: Sets record status to "synced" (default: false)
* - {Boolean} patch: Extends the existing record instead of overwriting it
* (default: false)
*
* @param {Object} record
* @param {Object} options
* @return {Promise}
*/
update(record, options = { synced: false, patch: false }) {
// Validate the record and its ID, even though this validation is
// also done in the CollectionTransaction method, because we need
// to pass the ID to preloadIds.
if (typeof record !== "object") {
return Promise.reject(new Error("Record is not an object."));
}
if (!record.hasOwnProperty("id")) {
return Promise.reject(new Error("Cannot update a record missing id."));
}
if (!this.idSchema.validate(record.id)) {
return Promise.reject(new Error(`Invalid Id: ${record.id}`));
}
return this.execute(txn => txn.update(record, options), {
preloadIds: [record.id],
});
}
/**
* Like {@link CollectionTransaction#upsert}, but wrapped in its own transaction.
*
* @param {Object} record
* @return {Promise}
*/
upsert(record) {
// Validate the record and its ID, even though this validation is
// also done in the CollectionTransaction method, because we need
// to pass the ID to preloadIds.
if (typeof record !== "object") {
return Promise.reject(new Error("Record is not an object."));
}
if (!record.hasOwnProperty("id")) {
return Promise.reject(new Error("Cannot update a record missing id."));
}
if (!this.idSchema.validate(record.id)) {
return Promise.reject(new Error(`Invalid Id: ${record.id}`));
}
return this.execute(txn => txn.upsert(record), { preloadIds: [record.id] });
}
/**
* Like {@link CollectionTransaction#get}, but wrapped in its own transaction.
*
* Options:
* - {Boolean} includeDeleted: Include virtually deleted records.
*
* @param {String} id
* @param {Object} options
* @return {Promise}
*/
get(id, options = { includeDeleted: false }) {
return this.execute(txn => txn.get(id, options), { preloadIds: [id] });
}
/**
* Like {@link CollectionTransaction#getAny}, but wrapped in its own transaction.
*
* @param {String} id
* @return {Promise}
*/
getAny(id) {
return this.execute(txn => txn.getAny(id), { preloadIds: [id] });
}
/**
* Same as {@link Collection#delete}, but wrapped in its own transaction.
*
* Options:
* - {Boolean} virtual: When set to `true`, doesn't actually delete the record,
* update its `_status` attribute to `deleted` instead (default: true)
*
* @param {String} id The record's Id.
* @param {Object} options The options object.
* @return {Promise}
*/
delete(id, options = { virtual: true }) {
return this.execute(
transaction => {
return transaction.delete(id, options);
},
{ preloadIds: [id] }
);
}
/**
* Same as {@link Collection#deleteAll}, but wrapped in its own transaction, execulding the parameter.
*
* @return {Promise}
*/
async deleteAll() {
const { data } = await this.list({}, { includeDeleted: false });
const recordIds = data.map(record => record.id);
return this.execute(
transaction => {
return transaction.deleteAll(recordIds);
},
{ preloadIds: recordIds }
);
}
/**
* The same as {@link CollectionTransaction#deleteAny}, but wrapped
* in its own transaction.
*
* @param {String} id The record's Id.
* @return {Promise}
*/
deleteAny(id) {
return this.execute(txn => txn.deleteAny(id), { preloadIds: [id] });
}
/**
* Lists records from the local database.
*
* Params:
* - {Object} filters Filter the results (default: `{}`).
* - {String} order The order to apply (default: `-last_modified`).
*
* Options:
* - {Boolean} includeDeleted: Include virtually deleted records.
*
* @param {Object} params The filters and order to apply to the results.
* @param {Object} options The options object.
* @return {Promise}
*/
async list(params = {}, options = { includeDeleted: false }) {
params = { order: "-last_modified", filters: {}, ...params };
const results = await this.db.list(params);
let data = results;
if (!options.includeDeleted) {
data = results.filter(record => record._status !== "deleted");
}
return { data, permissions: {} };
}
/**
* Imports remote changes into the local database.
* This method is in charge of detecting the conflicts, and resolve them
* according to the specified strategy.
* @param {SyncResultObject} syncResultObject The sync result object.
* @param {Array} decodedChanges The list of changes to import in the local database.
* @param {String} strategy The {@link Collection.strategy} (default: MANUAL)
* @return {Promise}
*/
async importChanges(
syncResultObject,
decodedChanges,
strategy = Collection.strategy.MANUAL
) {
// Retrieve records matching change ids.
try {
const { imports, resolved } = await this.db.execute(
transaction => {
const imports = decodedChanges.map(remote => {
// Store remote change into local database.
return importChange(transaction, remote, this.localFields);
});
const conflicts = imports
.filter(i => i.type === "conflicts")
.map(i => i.data);
const resolved = this._handleConflicts(
transaction,
conflicts,
strategy
);
return { imports, resolved };
},
{ preload: decodedChanges.map(record => record.id) }
);
// Lists of created/updated/deleted records
imports.forEach(({ type, data }) => syncResultObject.add(type, data));
// Automatically resolved conflicts (if not manual)
if (resolved.length > 0) {
syncResultObject.reset("conflicts").add("resolved", resolved);
}
} catch (err) {
const data = {
type: "incoming",
message: err.message,
stack: err.stack,
};
// XXX one error of the whole transaction instead of per atomic op
syncResultObject.add("errors", data);
}
return syncResultObject;
}
/**
* Imports the responses of pushed changes into the local database.
* Basically it stores the timestamp assigned by the server into the local
* database.
* @param {SyncResultObject} syncResultObject The sync result object.
* @param {Array} toApplyLocally The list of changes to import in the local database.
* @param {Array} conflicts The list of conflicts that have to be resolved.
* @param {String} strategy The {@link Collection.strategy}.
* @return {Promise}
*/
async _applyPushedResults(
syncResultObject,
toApplyLocally,
conflicts,
strategy = Collection.strategy.MANUAL
) {
const toDeleteLocally = toApplyLocally.filter(r => r.deleted);
const toUpdateLocally = toApplyLocally.filter(r => !r.deleted);
const { published, resolved } = await this.db.execute(transaction => {
const updated = toUpdateLocally.map(record => {
const synced = markSynced(record);
transaction.update(synced);
return synced;
});
const deleted = toDeleteLocally.map(record => {
transaction.delete(record.id);
// Amend result data with the deleted attribute set
return { id: record.id, deleted: true };
});
const published = updated.concat(deleted);
// Handle conflicts, if any
const resolved = this._handleConflicts(transaction, conflicts, strategy);
return { published, resolved };
});
syncResultObject.add("published", published);
if (resolved.length > 0) {
syncResultObject
.reset("conflicts")
.reset("resolved")
.add("resolved", resolved);
}
return syncResultObject;
}
/**
* Handles synchronization conflicts according to specified strategy.
*
* @param {SyncResultObject} result The sync result object.
* @param {String} strategy The {@link Collection.strategy}.
* @return {Promise<Array<Object>>} The resolved conflicts, as an
* array of {accepted, rejected} objects
*/
_handleConflicts(transaction, conflicts, strategy) {
if (strategy === Collection.strategy.MANUAL) {
return [];
}
return conflicts.map(conflict => {
const resolution =
strategy === Collection.strategy.CLIENT_WINS
? conflict.local
: conflict.remote;
const rejected =
strategy === Collection.strategy.CLIENT_WINS
? conflict.remote
: conflict.local;
let accepted, status, id;
if (resolution === null) {
// We "resolved" with the server-side deletion. Delete locally.
// This only happens during SERVER_WINS because the local
// version of a record can never be null.
// We can get "null" from the remote side if we got a conflict
// and there is no remote version available; see kinto-http.js
// batch.js:aggregate.
transaction.delete(conflict.local.id);
accepted = null;
// The record was deleted, but that status is "synced" with
// the server, so we don't need to push the change.
status = "synced";
id = conflict.local.id;
} else {
const updated = this._resolveRaw(conflict, resolution);
transaction.update(updated);
accepted = updated;
status = updated._status;
id = updated.id;
}
return { rejected, accepted, id, _status: status };
});
}
/**
* Execute a bunch of operations in a transaction.
*
* This transaction should be atomic -- either all of its operations
* will succeed, or none will.
*
* The argument to this function is itself a function which will be
* called with a {@link CollectionTransaction}. Collection methods
* are available on this transaction, but instead of returning
* promises, they are synchronous. execute() returns a Promise whose
* value will be the return value of the provided function.
*
* Most operations will require access to the record itself, which
* must be preloaded by passing its ID in the preloadIds option.
*
* Options:
* - {Array} preloadIds: list of IDs to fetch at the beginning of
* the transaction
*
* @return {Promise} Resolves with the result of the given function
* when the transaction commits.
*/
execute(doOperations, { preloadIds = [] } = {}) {
for (const id of preloadIds) {
if (!this.idSchema.validate(id)) {
return Promise.reject(Error(`Invalid Id: ${id}`));
}
}
return this.db.execute(
transaction => {
const txn = new CollectionTransaction(this, transaction);
const result = doOperations(txn);
txn.emitEvents();
return result;
},
{ preload: preloadIds }
);
}
/**
* Resets the local records as if they were never synced; existing records are
* marked as newly created, deleted records are dropped.
*
* A next call to {@link Collection.sync} will thus republish the whole
* content of the local collection to the server.
*
* @return {Promise} Resolves with the number of processed records.
*/
async resetSyncStatus() {
const unsynced = await this.list(
{ filters: { _status: ["deleted", "synced"] }, order: "" },
{ includeDeleted: true }
);
await this.db.execute(transaction => {
unsynced.data.forEach(record => {
if (record._status === "deleted") {
// Garbage collect deleted records.
transaction.delete(record.id);
} else {
// Records that were synced become «created».
transaction.update({
...record,
last_modified: undefined,
_status: "created",
});
}
});
});
this._lastModified = null;
await this.db.saveLastModified(null);
return unsynced.data.length;
}
/**
* Returns an object containing two lists:
*
* - `toDelete`: unsynced deleted records we can safely delete;
* - `toSync`: local updates to send to the server.
*
* @return {Promise}
*/
async gatherLocalChanges() {
const unsynced = await this.list({
filters: { _status: ["created", "updated"] },
order: "",
});
const deleted = await this.list(
{ filters: { _status: "deleted" }, order: "" },
{ includeDeleted: true }
);
return await Promise.all(
unsynced.data
.concat(deleted.data)
.map(this._encodeRecord.bind(this, "remote"))
);
}
/**
* Fetch remote changes, import them to the local database, and handle
* conflicts according to `options.strategy`. Then, updates the passed
* {@link SyncResultObject} with import results.
*
* Options:
* - {String} strategy: The selected sync strategy.
* - {String} expectedTimestamp: A timestamp to use as a "cache busting" query parameter.
* - {Array<String>} exclude: A list of record ids to exclude from pull.
* - {Object} headers: The HTTP headers to use in the request.
* - {int} retry: The number of retries to do if the HTTP request fails.
* - {int} lastModified: The timestamp to use in `?_since` query.
*
* @param {KintoClient.Collection} client Kinto client Collection instance.
* @param {SyncResultObject} syncResultObject The sync result object.
* @param {Object} options The options object.
* @return {Promise}
*/
async pullChanges(client, syncResultObject, options = {}) {
if (!syncResultObject.ok) {
return syncResultObject;
}
const since = this.lastModified
? this.lastModified
: await this.db.getLastModified();
options = {
strategy: Collection.strategy.MANUAL,
lastModified: since,
headers: {},
...options,
};
// Optionally ignore some records when pulling for changes.
// (avoid redownloading our own changes on last step of #sync())
let filters;
if (options.exclude) {
// Limit the list of excluded records to the first 50 records in order
// to remain under de-facto URL size limit (~2000 chars).
// http://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers/417184#417184
const exclude_id = options.exclude
.slice(0, 50)
.map(r => r.id)
.join(",");
filters = { exclude_id };
}
if (options.expectedTimestamp) {
filters = {
...filters,
_expected: options.expectedTimestamp,
};
}
// First fetch remote changes from the server
const { data, last_modified } = await client.listRecords({
// Since should be ETag (see https://github.com/Kinto/kinto.js/issues/356)
since: options.lastModified ? `${options.lastModified}` : undefined,
headers: options.headers,
retry: options.retry,
// Fetch every page by default (FIXME: option to limit pages, see #277)
pages: Infinity,
filters,
});
// last_modified is the ETag header value (string).
// For retro-compatibility with first kinto.js versions
// parse it to integer.
const unquoted = last_modified ? parseInt(last_modified, 10) : undefined;
// Check if server was flushed.
// This is relevant for the Kinto demo server
// (and thus for many new comers).
const localSynced = options.lastModified;
const serverChanged = unquoted > options.lastModified;
const emptyCollection = data.length === 0;
if (!options.exclude && localSynced && serverChanged && emptyCollection) {
const e = new ServerWasFlushedError(
localSynced,
unquoted,
"Server has been flushed. Client Side Timestamp: " +
localSynced +
" Server Side Timestamp: " +
unquoted
);
throw e;
}
syncResultObject.lastModified = unquoted;
// Decode incoming changes.
const decodedChanges = await Promise.all(
data.map(change => {
return this._decodeRecord("remote", change);
})
);
// Hook receives decoded records.
const payload = { lastModified: unquoted, changes: decodedChanges };
const afterHooks = await this.applyHook("incoming-changes", payload);
// No change, nothing to import.
if (afterHooks.changes.length > 0) {
// Reflect these changes locally
await this.importChanges(
syncResultObject,
afterHooks.changes,
options.strategy
);
}
return syncResultObject;
}
applyHook(hookName, payload) {
if (typeof this.hooks[hookName] == "undefined") {
return Promise.resolve(payload);
}
return waterfall(
this.hooks[hookName].map(hook => {
return record => {
const result = hook(payload, this);
const resultThenable = result && typeof result.then === "function";
const resultChanges = result && result.hasOwnProperty("changes");
if (!(resultThenable || resultChanges)) {
throw new Error(
`Invalid return value for hook: ${JSON.stringify(
result
)} has no 'then()' or 'changes' properties`
);
}
return result;
};
}),
payload
);
}
/**
* Publish local changes to the remote server and updates the passed
* {@link SyncResultObject} with publication results.
*
* Options:
* - {String} strategy: The selected sync strategy.
* - {Object} headers: The HTTP headers to use in the request.
* - {int} retry: The number of retries to do if the HTTP request fails.
*
* @param {KintoClient.Collection} client Kinto client Collection instance.
* @param {SyncResultObject} syncResultObject The sync result object.
* @param {Object} changes The change object.
* @param {Array} changes.toDelete The list of records to delete.
* @param {Array} changes.toSync The list of records to create/update.
* @param {Object} options The options object.
* @return {Promise}
*/
async pushChanges(client, changes, syncResultObject, options = {}) {
if (!syncResultObject.ok) {
return syncResultObject;
}
const safe =
!options.strategy || options.strategy !== Collection.CLIENT_WINS;
const toDelete = changes.filter(r => r._status == "deleted");
const toSync = changes.filter(r => r._status != "deleted");
// Perform a batch request with every changes.
const synced = await client.batch(
batch => {
toDelete.forEach(r => {
// never published locally deleted records should not be pusblished
if (r.last_modified) {
batch.deleteRecord(r);
}
});
toSync.forEach(r => {
// Clean local fields (like _status) before sending to server.
const published = this.cleanLocalFields(r);
if (r._status === "created") {
batch.createRecord(published);
} else {
batch.updateRecord(published);
}
});
},
{
headers: options.headers,
retry: options.retry,
safe,
aggregate: true,
}
);
// Store outgoing errors into sync result object
syncResultObject.add(
"errors",
synced.errors.map(e => ({ ...e, type: "outgoing" }))
);
// Store outgoing conflicts into sync result object
const conflicts = [];
for (const { type, local, remote } of synced.conflicts) {
// Note: we ensure that local data are actually available, as they may
// be missing in the case of a published deletion.
const safeLocal = (local && local.data) || { id: remote.id };
const realLocal = await this._decodeRecord("remote", safeLocal);
// We can get "null" from the remote side if we got a conflict
// and there is no remote version available; see kinto-http.js
// batch.js:aggregate.
const realRemote = remote && (await this._decodeRecord("remote", remote));
const conflict = { type, local: realLocal, remote: realRemote };
conflicts.push(conflict);
}
syncResultObject.add("conflicts", conflicts);
// Records that must be deleted are either deletions that were pushed
// to server (published) or deleted records that were never pushed (skipped).
const missingRemotely = synced.skipped.map(r => ({ ...r, deleted: true }));
// For created and updated records, the last_modified coming from server
// will be stored locally.
// Reflect publication results locally using the response from
// the batch request.
const published = synced.published.map(c => c.data);
const toApplyLocally = published.concat(missingRemotely);
// Apply the decode transformers, if any
const decoded = await Promise.all(
toApplyLocally.map(record => {
return this._decodeRecord("remote", record);
})
);
// We have to update the local records with the responses of the server
// (eg. last_modified values etc.).
if (decoded.length > 0 || conflicts.length > 0) {
await this._applyPushedResults(
syncResultObject,
decoded,
conflicts,
options.strategy
);
}
return syncResultObject;
}
/**
* Return a copy of the specified record without the local fields.
*
* @param {Object} record A record with potential local fields.
* @return {Object}
*/
cleanLocalFields(record) {
const localKeys = RECORD_FIELDS_TO_CLEAN.concat(this.localFields);
return omitKeys(record, localKeys);
}
/**
* Resolves a conflict, updating local record according to proposed
* resolution — keeping remote record `last_modified` value as a reference for
* further batch sending.
*
* @param {Object} conflict The conflict object.
* @param {Object} resolution The proposed record.
* @return {Promise}
*/
resolve(conflict, resolution) {
return this.db.execute(transaction => {
const updated = this._resolveRaw(conflict, resolution);
transaction.update(updated);
return { data: updated, permissions: {} };
});
}
/**
* @private
*/
_resolveRaw(conflict, resolution) {
const resolved = {
...resolution,
// Ensure local record has the latest authoritative timestamp
last_modified: conflict.remote && conflict.remote.last_modified,
};
// If the resolution object is strictly equal to the
// remote record, then we can mark it as synced locally.
// Otherwise, mark it as updated (so that the resolution is pushed).
const synced = deepEqual(resolved, conflict.remote);
return markStatus(resolved, synced ? "synced" : "updated");
}
/**
* Synchronize remote and local data. The promise will resolve with a
* {@link SyncResultObject}, though will reject:
*
* - if the server is currently backed off;
* - if the server has been detected flushed.
*
* Options:
* - {Object} headers: HTTP headers to attach to outgoing requests.
* - {String} expectedTimestamp: A timestamp to use as a "cache busting" query parameter.
* - {Number} retry: Number of retries when server fails to process the request (default: 1).
* - {Collection.strategy} strategy: See {@link Collection.strategy}.
* - {Boolean} ignoreBackoff: Force synchronization even if server is currently
* backed off.
* - {String} bucket: The remove bucket id to use (default: null)
* - {String} collection: The remove collection id to use (default: null)
* - {String} remote The remote Kinto server endpoint to use (default: null).
*
* @param {Object} options Options.
* @return {Promise}
* @throws {Error} If an invalid remote option is passed.
*/
async sync(
options = {
strategy: Collection.strategy.MANUAL,
headers: {},
retry: 1,
ignoreBackoff: false,
bucket: null,
collection: null,
remote: null,
expectedTimestamp: null,
}
) {
options = {
...options,
bucket: options.bucket || this.bucket,
collection: options.collection || this.name,
};
const previousRemote = this.api.remote;
if (options.remote) {
// Note: setting the remote ensures it's valid, throws when invalid.
this.api.remote = options.remote;
}
if (!options.ignoreBackoff && this.api.backoff > 0) {
const seconds = Math.ceil(this.api.backoff / 1000);
return Promise.reject(
new Error(
`Server is asking clients to back off; retry in ${seconds}s or use the ignoreBackoff option.`
)
);
}
const client = this.api
.bucket(options.bucket)
.collection(options.collection);
const result = new SyncResultObject();
try {
// Fetch collection metadata.
await this.pullMetadata(client, options);
// Fetch last changes from the server.
await this.pullChanges(client, result, options);
const { lastModified } = result;
// Fetch local changes
const toSync = await this.gatherLocalChanges();
// Publish local changes and pull local resolutions
await this.pushChanges(client, toSync, result, options);
// Publish local resolution of push conflicts to server (on CLIENT_WINS)
const resolvedUnsynced = result.resolved.filter(
r => r._status !== "synced"
);
if (resolvedUnsynced.length > 0) {
const resolvedEncoded = await Promise.all(
resolvedUnsynced.map(resolution => {
let record = resolution.accepted;
if (record === null) {
record = { id: resolution.id, _status: resolution._status };
}
return this._encodeRecord("remote", record);
})
);
await this.pushChanges(client, resolvedEncoded, result, options);
}
// Perform a last pull to catch changes that occured after the last pull,
// while local changes were pushed. Do not do it nothing was pushed.
if (result.published.length > 0) {
// Avoid redownloading our own changes during the last pull.
const pullOpts = {
...options,
lastModified,
exclude: result.published,
};
await this.pullChanges(client, result, pullOpts);
}
// Don't persist lastModified value if any conflict or error occured
if (result.ok) {
// No conflict occured, persist collection's lastModified value
this._lastModified = await this.db.saveLastModified(
result.lastModified
);
}
} catch (e) {
this.events.emit("sync:error", { ...options, error: e });
throw e;
} finally {
// Ensure API default remote is reverted if a custom one's been used
this.api.remote = previousRemote;
}
this.events.emit("sync:success", { ...options, result });
return result;
}
/**
* Load a list of records already synced with the remote server.
*
* The local records which are unsynced or whose timestamp is either missing
* or superior to those being loaded will be ignored.
*
* @deprecated Use {@link importBulk} instead.
* @param {Array} records The previously exported list of records to load.
* @return {Promise} with the effectively imported records.
*/
async loadDump(records) {
return this.importBulk(records);
}
/**
* Load a list of records already synced with the remote server.
*
* The local records which are unsynced or whose timestamp is either missing
* or superior to those being loaded will be ignored.
*
* @param {Array} records The previously exported list of records to load.
* @return {Promise} with the effectively imported records.
*/
async importBulk(records) {
if (!Array.isArray(records)) {
throw new Error("Records is not an array.");
}
for (const record of records) {
if (!record.hasOwnProperty("id") || !this.idSchema.validate(record.id)) {
throw new Error("Record has invalid ID: " + JSON.stringify(record));
}
if (!record.last_modified) {
throw new Error(
"Record has no last_modified value: " + JSON.stringify(record)
);
}
}
// Fetch all existing records from local database,
// and skip those who are newer or not marked as synced.
// XXX filter by status / ids in records
const { data } = await this.list({}, { includeDeleted: true });
const existingById = data.reduce((acc, record) => {
acc[record.id] = record;
return acc;
}, {});
const newRecords = records.filter(record => {
const localRecord = existingById[record.id];
const shouldKeep =
// No local record with this id.
localRecord === undefined ||
// Or local record is synced
(localRecord._status === "synced" &&
// And was synced from server
localRecord.last_modified !== undefined &&
// And is older than imported one.
record.last_modified > localRecord.last_modified);
return shouldKeep;
});
return await this.db.importBulk(newRecords.map(markSynced));
}
async pullMetadata(client, options = {}) {
const { expectedTimestamp, headers } = options;
const query = expectedTimestamp
? { query: { _expected: expectedTimestamp } }
: undefined;
const metadata = await client.getData({
...query,
headers,
});
return this.db.saveMetadata(metadata);
}
async metadata() {
return this.db.getMetadata();
}
}
/**
* A Collection-oriented wrapper for an adapter's transaction.
*
* This defines the high-level functions available on a collection.
* The collection itself offers functions of the same name. These will
* perform just one operation in its own transaction.
*/
export class CollectionTransaction {
constructor(collection, adapterTransaction) {
this.collection = collection;
this.adapterTransaction = adapterTransaction;
this._events = [];
}
_queueEvent(action, payload) {
this._events.push({ action, payload });
}
/**
* Emit queued events, to be called once every transaction operations have
* been executed successfully.
*/
emitEvents() {
for (const { action, payload } of this._events) {
this.collection.events.emit(action, payload);
}
if (this._events.length > 0) {
const targets = this._events.map(({ action, payload }) => ({
action,
...payload,
}));
this.collection.events.emit("change", { targets });
}
this._events = [];
}
/**
* Retrieve a record by its id from the local database, or
* undefined if none exists.
*
* This will also return virtually deleted records.
*
* @param {String} id
* @return {Object}
*/
getAny(id) {
const record = this.adapterTransaction.get(id);
return { data: record, permissions: {} };
}
/**
* Retrieve a record by its id from the local database.
*
* Options:
* - {Boolean} includeDeleted: Include virtually deleted records.
*
* @param {String} id
* @param {Object} options
* @return {Object}
*/
get(id, options = { includeDeleted: false }) {
const res = this.getAny(id);
if (
!res.data ||
(!options.includeDeleted && res.data._status === "deleted")
) {
throw new Error(`Record with id=${id} not found.`);
}
return res;
}
/**
* Deletes a record from the local database.
*
* Options:
* - {Boolean} virtual: When set to `true`, doesn't actually delete the record,
* update its `_status` attribute to `deleted` instead (default: true)
*
* @param {String} id The record's Id.
* @param {Object} options The options object.
* @return {Object}
*/
delete(id, options = { virtual: true }) {
// Ensure the record actually exists.
const existing = this.adapterTransaction.get(id);
const alreadyDeleted = existing && existing._status == "deleted";
if (!existing || (alreadyDeleted && options.virtual)) {
throw new Error(`Record with id=${id} not found.`);
}
// Virtual updates status.
if (options.virtual) {
this.adapterTransaction.update(markDeleted(existing));
} else {
// Delete for real.
this.adapterTransaction.delete(id);
}
this._queueEvent("delete", { data: existing });
return { data: existing, permissions: {} };
}
/**
* Soft delete all records from the local database.
*
* @param {Array} ids Array of non-deleted Record Ids.
* @return {Object}
*/
deleteAll(ids) {
const existingRecords = [];
ids.forEach(id => {
existingRecords.push(this.adapterTransaction.get(id));
this.delete(id);
});
this._queueEvent("deleteAll", { data: existingRecords });
return { data: existingRecords, permissions: {} };
}
/**
* Deletes a record from the local database, if any exists.
* Otherwise, do nothing.
*
* @param {String} id The record's Id.
* @return {Object}
*/
deleteAny(id) {
const existing = this.adapterTransaction.get(id);
if (existing) {
this.adapterTransaction.update(markDeleted(existing));
this._queueEvent("delete", { data: existing });
}
return { data: { id, ...existing }, deleted: !!existing, permissions: {} };
}
/**
* Adds a record to the local database, asserting that none
* already exist with this ID.
*
* @param {Object} record, which must contain an ID
* @return {Object}
*/
create(record) {
if (typeof record !== "object") {
throw new Error("Record is not an object.");
}
if (!record.hasOwnProperty("id")) {
throw new Error("Cannot create a record missing id");
}
if (!this.collection.idSchema.validate(record.id)) {
throw new Error(`Invalid Id: ${record.id}`);
}
this.adapterTransaction.create(record);
this._queueEvent("create", { data: record });
return { data: record, permissions: {} };
}
/**
* Updates a record from the local database.
*
* Options:
* - {Boolean} synced: Sets record status to "synced" (default: false)
* - {Boolean} patch: Extends the existing record instead of overwriting it
* (default: false)
*
* @param {Object} record
* @param {Object} options
* @return {Object}
*/
update(record, options = { synced: false, patch: false }) {
if (typeof record !== "object") {
throw new Error("Record is not an object.");
}
if (!record.hasOwnProperty("id")) {
throw new Error("Cannot update a record missing id.");
}
if (!this.collection.idSchema.validate(record.id)) {
throw new Error(`Invalid Id: ${record.id}`);
}
const oldRecord = this.adapterTransaction.get(record.id);
if (!oldRecord) {
throw new Error(`Record with id=${record.id} not found.`);
}
const newRecord = options.patch ? { ...oldRecord, ...record } : record;
const updated = this._updateRaw(oldRecord, newRecord, options);
this.adapterTransaction.update(updated);
this._queueEvent("update", { data: updated, oldRecord });
return { data: updated, oldRecord, permissions: {} };
}
/**
* Lower-level primitive for updating a record while respecting
* _status and last_modified.
*
* @param {Object} oldRecord: the record retrieved from the DB
* @param {Object} newRecord: the record to replace it with
* @return {Object}
*/
_updateRaw(oldRecord, newRecord, { synced = false } = {}) {
const updated = { ...newRecord };
// Make sure to never loose the existing timestamp.
if (oldRecord && oldRecord.last_modified && !updated.last_modified) {
updated.last_modified = oldRecord.last_modified;
}
// If only local fields have changed, then keep record as synced.
// If status is created, keep record as created.
// If status is deleted, mark as updated.
const isIdentical =
oldRecord &&
recordsEqual(oldRecord, updated, this.collection.localFields);
const keepSynced = isIdentical && oldRecord._status == "synced";
const neverSynced =
!oldRecord || (oldRecord && oldRecord._status == "created");
const newStatus =
keepSynced || synced ? "synced" : neverSynced ? "created" : "updated";
return markStatus(updated, newStatus);
}
/**
* Upsert a record into the local database.
*
* This record must have an ID.
*
* If a record with this ID already exists, it will be replaced.
* Otherwise, this record will be inserted.
*
* @param {Object} record
* @return {Object}
*/
upsert(record) {
if (typeof record !== "object") {
throw new Error("Record is not an object.");
}
if (!record.hasOwnProperty("id")) {
throw new Error("Cannot update a record missing id.");
}
if (!this.collection.idSchema.validate(record.id)) {
throw new Error(`Invalid Id: ${record.id}`);
}
let oldRecord = this.adapterTransaction.get(record.id);
const updated = this._updateRaw(oldRecord, record);
this.adapterTransaction.update(updated);
// Don't return deleted records -- pretend they are gone
if (oldRecord && oldRecord._status == "deleted") {
oldRecord = undefined;
}
if (oldRecord) {
this._queueEvent("update", { data: updated, oldRecord });
} else {
this._queueEvent("create", { data: updated });
}
return { data: updated, oldRecord, permissions: {} };
}
}