more work on idb sync transaction, other storage stuff

Bruno Windels 2019-01-09 11:06:09 +01:00
parent 25a84d41a5
commit 12bb3a7147
16 changed files with 786 additions and 7 deletions

@@ -3,3 +3,9 @@ goal:
to write a minimal matrix client that shows you all your rooms, allows you to pick one, and read and write messages in it.
on the technical side, the goal is to go low-memory, and test the performance of storing every event individually in indexeddb.
nice properties of this approach:
 - easy to delete the oldest events when the db reaches a certain size or becomes full (do we need a new pagination token after deleting the oldest events? how to do that?)
 - a sync response is persisted in one transaction, so you always have state at some sync_token
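a rough sketch of what that one-transaction sync could look like (store names as used elsewhere in this commit; db, event and response are assumed to be in scope):

    // one readwrite transaction over every store a sync response touches:
    // either everything at the new sync_token is written, or nothing is
    const txn = db.transaction(["sync", "summary", "timeline", "members", "state"], "readwrite");
    txn.objectStore("timeline").add(event); // repeated per room, per event
    txn.objectStore("sync").put({syncToken: response.next_batch}, 0); // "sync" store uses an out-of-line key
    txn.oncomplete = () => { /* storage is now consistent at response.next_batch */ };
    txn.onabort = () => { /* nothing from this sync got persisted */ };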

partialkey.html (new file, 194 lines)

@@ -0,0 +1,194 @@
<html>
<head><meta charset="utf-8"></head>
<body>
<ul id="messages"></ul>
<script type="text/javascript">
class Key {
constructor() {
this._keys = new Int32Array(2);
}
get gapKey() {
return this._keys[0];
}
set gapKey(value) {
this._keys[0] = value;
}
get eventKey() {
return this._keys[1];
}
set eventKey(value) {
this._keys[1] = value;
}
buffer() {
return this._keys.buffer;
}
nextKeyWithGap() {
const k = new Key();
k.gapKey = this.gapKey + 1;
k.eventKey = 0;
return k;
}
nextKey() {
const k = new Key();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey + 1;
return k;
}
previousKey() {
const k = new Key();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey - 1;
return k;
}
clone() {
const k = new Key();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey;
return k;
}
}
function reqAsPromise(req) {
return new Promise((resolve, reject) => {
req.onsuccess = () => resolve(req);
req.onerror = (err) => reject(err);
});
}
function fetchResults(cursor, isDone, resultMapper) {
return new Promise((resolve, reject) => {
const results = [];
cursor.onerror = (event) => {
reject(new Error("Query failed: " + event.target.errorCode));
};
// collect results
cursor.onsuccess = (event) => {
console.log("got a result");
const cursor = event.target.result;
if (!cursor) {
resolve(results);
return; // end of results
}
results.push(resultMapper(cursor));
if (!isDone(results)) {
cursor.continue();
} else {
resolve(results);
}
};
});
}
class Storage {
constructor(databaseName) {
this._databaseName = databaseName;
this._database = null;
}
async open() {
const req = window.indexedDB.open(this._databaseName);
req.onupgradeneeded = (ev) => {
const db = ev.target.result;
const oldVersion = ev.oldVersion;
this._createStores(db, oldVersion);
};
await reqAsPromise(req);
this._database = req.result;
}
_createStores(db) {
db.createObjectStore("timeline", {keyPath: ["roomId", "sortKey"]});
}
async insert(value) {
const tx = this._database.transaction(["timeline"], "readwrite");
const store = tx.objectStore("timeline");
await reqAsPromise(store.add(value));
}
async selectLast(roomId, amount) {
const tx = this._database.transaction(["timeline"], "readonly");
const store = tx.objectStore("timeline");
const maxKey = new Key();
maxKey.gapKey = Number.MAX_SAFE_INTEGER;
maxKey.eventKey = Number.MAX_SAFE_INTEGER;
const range = IDBKeyRange.upperBound([roomId, maxKey.buffer()]);
const cursor = store.openCursor(range, "prev");
const events = await fetchResults(cursor,
(results) => results.length === amount,
(cursor) => cursor.value);
events.reverse();
return events;
}
async selectFirst(roomId, amount) {
const tx = this._database.transaction(["timeline"], "readonly");
const store = tx.objectStore("timeline");
const minKey = new Key();
const range = IDBKeyRange.lowerBound([roomId, minKey.buffer()]);
const cursor = store.openCursor(range, "next");
return await fetchResults(cursor,
(results) => results.length === amount,
(cursor) => cursor.value);
}
}
(async () => {
const initialSortKey = new Key();
initialSortKey.gapKey = 1000;
const roomId = "!abc:hs.tld";
const storage = new Storage("mysession");
await storage.open();
let records = await storage.selectFirst(roomId, 15);
if (!records.length) {
// insert first batch backwards,
// to see we're not assuming insertion order to sort
let sortKey = initialSortKey.clone();
sortKey.eventKey = 10;
for (var i = 10; i > 0; i--) {
await storage.insert({
roomId,
sortKey: sortKey.buffer(),
message: `message ${i} before gap`
});
sortKey = sortKey.previousKey();
}
sortKey = sortKey.nextKeyWithGap();
await storage.insert({
roomId,
sortKey: sortKey.buffer(),
message: `event to represent gap!`
});
for (var i = 1; i <= 10; i++) {
sortKey = sortKey.nextKey();
await storage.insert({
roomId,
sortKey: sortKey.buffer(),
message: `message ${i} after gap`
});
}
records = await storage.selectFirst(roomId, 15);
}
console.log(records, "records");
const nodes = records.map(r => {
const li = document.createElement("li");
li.appendChild(document.createTextNode(r.message));
return li;
});
const parentNode = document.getElementById("messages");
nodes.forEach(n => parentNode.appendChild(n));
})();
</script>
</body>
</html>

@@ -1,9 +1,9 @@
class Room {
constructor(roomId, storage) {
constructor(roomId, storage, storedSummary) {
this._roomId = roomId;
this._storage = storage;
this._summary = new RoomSummary(this._roomId, this._storage);
this._summary = new RoomSummary(this._roomId, this._storage, storedSummary);
}
async applyInitialSync(roomResponse, membership) {

@@ -8,7 +8,7 @@ class Request {
this._controller.abort();
}
response() {
get response() {
return this._promise;
}
}
@@ -52,7 +52,7 @@ export class Network {
return new Request(promise, controller);
}
sync(timeout = 0, since = null) {
sync(timeout = 0, since = undefined) {
return this._request("GET", "/sync", {since, timeout});
}
}

src/storage/idb/db.js (new file, 175 lines)

@@ -0,0 +1,175 @@
import {iterateCursor, txnAsPromise} from "./utils";
const SYNC_STORES = [
"sync",
"summary",
"timeline",
"members",
"state"
];
class Database {
constructor(idbDatabase) {
this._db = idbDatabase;
this._syncTxn = null;
}
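// while a sync transaction is open, all stores in SYNC_STORES share it
// (see startReadOnlyTxn/startReadWriteTxn below), so everything written
// for one /sync response commits or aborts together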
startSyncTxn() {
if (this._syncTxn) {
return txnAsPromise(this._syncTxn);
}
this._syncTxn = this._db.transaction(SYNC_STORES, "readwrite");
this._syncTxn.addEventListener("complete", () => this._syncTxn = null);
this._syncTxn.addEventListener("abort", () => this._syncTxn = null);
return txnAsPromise(this._syncTxn);
}
startReadOnlyTxn(storeName) {
if (this._syncTxn && SYNC_STORES.includes(storeName)) {
return this._syncTxn;
} else {
return this._db.transaction([storeName], "readonly");
}
}
startReadWriteTxn(storeName) {
if (this._syncTxn && SYNC_STORES.includes(storeName)) {
return this._syncTxn;
} else {
return this._db.transaction([storeName], "readwrite");
}
}
store(storeName) {
return new ObjectStore(this, storeName);
}
}
class QueryTarget {
reduce(range, reducer, initialValue) {
return this._reduce(range, reducer, initialValue, "next");
}
reduceReverse(range, reducer, initialValue) {
return this._reduce(range, reducer, initialValue, "prev");
}
selectLimit(range, amount) {
return this._selectLimit(range, amount, "next");
}
selectLimitReverse(range, amount) {
return this._selectLimit(range, amount, "prev");
}
selectWhile(range, predicate) {
return this._selectWhile(range, predicate, "next");
}
selectWhileReverse(range, predicate) {
return this._selectWhile(range, predicate, "prev");
}
async selectAll(range) {
const cursor = this._getIdbQueryTarget().openCursor(range, "next");
const results = [];
await iterateCursor(cursor, (value) => {
results.push(value);
return false; // never stop early, collect the whole range
});
return results;
}
selectFirst(range) {
return this._find(range, () => true, "next");
}
selectLast(range) {
return this._find(range, () => true, "prev");
}
find(range, predicate) {
return this._find(range, predicate, "next");
}
findReverse(range, predicate) {
return this._find(range, predicate, "prev");
}
async _reduce(range, reducer, initialValue, direction) {
let reducedValue = initialValue;
const cursor = this._getIdbQueryTarget().openCursor(range, direction);
await iterateCursor(cursor, (value) => {
reducedValue = reducer(reducedValue, value);
return false; // keep iterating, reduce over the whole range
});
return reducedValue;
}
_selectLimit(range, amount, direction) {
return this._selectWhile(range, (results) => {
return results.length === amount;
}, direction);
}
async _selectWhile(range, predicate, direction) {
const cursor = this._getIdbQueryTarget().openCursor(range, direction);
const results = [];
await iterateCursor(cursor, (value) => {
results.push(value);
return predicate(results); // stop once the predicate says we have enough
});
return results;
}
async _find(range, predicate, direction) {
const cursor = this._getIdbQueryTarget().openCursor(range, direction);
let result;
const found = await iterateCursor(cursor, (value) => {
if (predicate(value)) {
result = value;
return true; // found a match, stop iterating
}
return false;
});
if (!found) {
throw new Error("not found");
}
return result;
}
_getIdbQueryTarget() {
throw new Error("override this");
}
}
class ObjectStore extends QueryTarget {
constructor(db, storeName) {
super();
this._db = db;
this._storeName = storeName;
}
_getIdbQueryTarget() {
return this._db
.startReadOnlyTxn(this._storeName)
.objectStore(this._storeName);
}
_readWriteTxn() {
return this._db
.startReadWriteTxn(this._storeName)
.objectStore(this._storeName);
}
index(indexName) {
return new Index(this._db, this._storeName, indexName);
}
}
class Index extends QueryTarget {
constructor(db, storeName, indexName) {
super();
this._db = db;
this._storeName = storeName;
this._indexName = indexName;
}
_getIdbQueryTarget() {
return this._db
.startReadOnlyTxn(this._storeName)
.objectStore(this._storeName)
.index(this._indexName);
}
}
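a short usage sketch of the query API above (hypothetical; the store and index names match the createStores function later in this commit, GapSortKey is the sort key class added here, and idbDatabase/roomId are assumed to be in scope of an async function):

    const db = new Database(idbDatabase);
    // while a sync transaction is open this reuses it, otherwise a fresh readonly txn is started
    const newestFirst = await db
        .store("timeline")
        .index("by_sort_key")
        .selectLimitReverse(IDBKeyRange.upperBound([roomId, GapSortKey.maxKey.buffer()]), 20);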

@@ -0,0 +1,67 @@
export default class GapSortKey {
constructor() {
this._keys = new Int32Array(2);
}
get gapKey() {
return this._keys[0];
}
set gapKey(value) {
this._keys[0] = value;
}
get eventKey() {
return this._keys[1];
}
set eventKey(value) {
this._keys[1] = value;
}
buffer() {
return this._keys.buffer;
}
nextKeyWithGap() {
const k = new GapSortKey();
k.gapKey = this.gapKey + 1;
k.eventKey = 0;
return k;
}
nextKey() {
const k = new GapSortKey();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey + 1;
return k;
}
previousKey() {
const k = new GapSortKey();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey - 1;
return k;
}
clone() {
const k = new GapSortKey();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey;
return k;
}
static get maxKey() {
const maxKey = new GapSortKey();
maxKey.gapKey = Number.MAX_SAFE_INTEGER;
maxKey.eventKey = Number.MAX_SAFE_INTEGER;
return maxKey;
}
static get minKey() {
const minKey = new GapSortKey();
minKey.gapKey = 0;
minKey.eventKey = 0;
return minKey;
}
}

src/storage/idb/member.js (new file, 18 lines)

@@ -0,0 +1,18 @@
// no historical members for now
class MemberStore {
async getMember(roomId, userId) {
}
/* async getMemberAtSortKey(roomId, userId, sortKey) {
} */
// multiple members here? does it happen at same sort key?
async setMembers(roomId, members) {
}
async getSortedMembers(roomId, offset, amount) {
}
}

src/storage/idb/room.js (new file, 27 lines)

@@ -0,0 +1,27 @@
class RoomStore {
constructor(summary, db, syncTxn) {
this._summary = summary;
}
getSummary() {
return Promise.resolve(this._summary);
}
async setSummary(summary) {
this._summary = summary;
//...
}
get timelineStore() {
}
get memberStore() {
}
get stateStore() {
}
}

@@ -0,0 +1,39 @@
import {openDatabase, select} from "./utils";
function createSessionsStore(db) {
db.createObjectStore("sessions", "id");
}
function createStores(db) {
db.createObjectStore("sync"); //sync token
db.createObjectStore("summary", "room_id");
const timeline = db.createObjectStore("timeline", ["room_id", "event_id"]);
timeline.createIndex("by_sort_key", ["room_id", "sort_key"], {unique: true});
// how to get the first/last x events for a room?
// we don't want to specify the sort key, but would need an index for the room_id?
// take sort_key as primary key then and have an index on event_id?
// still, you also can't have a PK of [room_id, sort_key] and get the last or first events with just the room_id?
// the only thing that changes is that the PK provides an inherent sorting that you inherit in an index
// that only has room_id as keyPath??? There must be a better way, need to write a prototype test for this — see partialkey.html
// SOLUTION: with numeric keys, you can just use a min/max value to get the first/last events (sketch below)
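// a sketch of that min/max range query (MIN_SORT_KEY / MAX_SORT_KEY stand for the
// encoded minimum/maximum sort key, cf. GapSortKey.minKey/maxKey):
//   const range = IDBKeyRange.bound([roomId, MIN_SORT_KEY], [roomId, MAX_SORT_KEY]);
//   timeline.index("by_sort_key").openCursor(range, "prev"); // walks back from the last event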
db.createObjectStore("members", ["room_id", "state_key"]);
const state = db.createObjectStore("state", ["room_id", "type", "state_key"]);
}
class Sessions {
constructor(databaseName, idToSessionDbName) {
this._databaseName = databaseName;
this._idToSessionDbName = idToSessionDbName;
}
async getSessions(sessionsDbName) {
const db = await openDatabase(this._databaseName, db => createSessionsStore(db));
const sessions = await select(db, "sessions");
db.close();
return sessions;
}
async openSessionStore(session) {
const db = await openDatabase(this._idToSessionDbName(session.id), db => createStores(db));
return new SessionStore(session, db);
}
}

@@ -0,0 +1,52 @@
class SessionStore {
constructor(session, db) {
this._session = session;
this._db = new Database(db);
}
get session() {
return this._session;
}
// or dedicated set sync_token method?
async setAvatar(avatar) {
}
async setDisplayName(displayName) {
}
getSyncStatus() {
return this._db.store("sync").selectFirst();
}
setSyncStatus(syncToken, lastSynced) {
return this._db.store("sync").updateFirst({sync_token: syncToken, last_synced: lastSynced});
// return updateSingletonStore(this._db, "sync", {sync_token: syncToken, last_synced: lastSynced});
}
setAccessToken(accessToken) {
}
async addRoom(room) {
}
async removeRoom(roomId) {
}
async getRoomStores() {
}
async getRoomStore(roomId) {
}
async startSyncTransaction() {
return this._db.startSyncTxn();
}
}

src/storage/idb/state.js (new file, 17 lines)

@@ -0,0 +1,17 @@
class StateStore {
constructor(roomId, db) {
}
async getEvents(type) {
}
async getEventsForKey(type, stateKey) {
}
async setState(events) {
}
}

src/storage/idb/sync.js (new file, 13 lines)

@@ -0,0 +1,13 @@
class SyncTransaction {
setSyncToken(syncToken, lastSynced) {
}
getRoomStore(roomId) {
return new RoomStore(new Database(null, this._txn), roomId);
}
result() {
return txnAsPromise(this._txn);
}
}

@@ -0,0 +1,61 @@
import GapSortKey from "./gapsortkey";
import {select, txnAsPromise} from "./utils";
const TIMELINE_STORE = "timeline";
class TimelineStore {
// create with transaction for sync????
constructor(db, roomId) {
this._db = db;
this._roomId = roomId;
}
async lastEvents(amount) {
return this.eventsBefore(GapSortKey.maxKey, amount);
}
async firstEvents(amount) {
return this.eventsAfter(GapSortKey.minKey, amount);
}
eventsAfter(sortKey, amount) {
const range = IDBKeyRange.lowerBound([this._roomId, sortKey.buffer()], true);
return this._db
.store(TIMELINE_STORE)
.index("by_sort_key")
.selectLimit(range, amount);
}
async eventsBefore(sortKey, amount) {
const range = IDBKeyRange.upperBound([this._roomId, sortKey.buffer()], true);
const events = await this._db
.store(TIMELINE_STORE)
.index("by_sort_key")
.selectLimitReverse(range, amount);
events.reverse(); // because we fetched them backwards
return events;
}
// should this happen as part of a transaction that stores all synced in changes?
// e.g.:
// - timeline events for all rooms
// - latest sync token
// - new members
// - new room state
// - updated/new account data
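// with Database.startSyncTxn() from db.js that is roughly what happens: while
// the sync transaction is open, startReadWriteTxn() below returns that same
// transaction, so these adds commit (or abort) together with the summary,
// member, state and sync-token writes of the same /sync response.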
async addEvents(events) {
const txn = this._db.startReadWriteTxn(TIMELINE_STORE);
const timeline = txn.objectStore(TIMELINE_STORE);
events.forEach(event => timeline.add(event));
return txnAsPromise(txn);
}
// used to close gaps (gaps are also inserted as fake events)
// delete old events and add new ones in one transaction
async replaceEvents(oldEventIds, newEvents) {
const txn = this._db.startReadWriteTxn(TIMELINE_STORE);
const timeline = txn.objectStore(TIMELINE_STORE);
oldEventIds.forEach(event_id => timeline.delete([this._roomId, event_id]));
newEvents.forEach(event => timeline.add(event));
return txnAsPromise(txn);
}
}

src/storage/idb/utils.js (new file, 103 lines)

@@ -0,0 +1,103 @@
export function openDatabase(name, createObjectStore, version = undefined) {
const req = window.indexedDB.open(name, version);
req.onupgradeneeded = (ev) => {
const db = ev.target.result;
const oldVersion = ev.oldVersion;
createObjectStore(db, oldVersion, version);
};
return reqAsPromise(req);
}
export function reqAsPromise(req) {
return new Promise((resolve, reject) => {
req.addEventListener("success", event => resolve(event.target.result));
req.addEventListener("error", event => reject(event.target.error));
});
}
export function txnAsPromise(txn) {
return new Promise((resolve, reject) => {
txn.addEventListener("complete", resolve);
txn.addEventListener("abort", reject);
});
}
export function iterateCursor(cursorRequest, processValue) {
// openCursor() returns an IDBRequest, not a cursor; the cursor only becomes
// available through the request's success events, one event per matched record
return new Promise((resolve, reject) => {
cursorRequest.onerror = (event) => {
reject(new Error("Query failed: " + event.target.errorCode));
};
// collect results
cursorRequest.onsuccess = (event) => {
const cursor = event.target.result;
if (!cursor) {
resolve(false);
return; // end of results
}
const isDone = processValue(cursor.value);
if (isDone) {
resolve(true);
} else {
cursor.continue();
}
};
});
}
export async function fetchResults(cursor, isDone) {
const results = [];
await iterateCursor(cursor, (value) => {
results.push(value);
return isDone(results);
});
return results;
}
export async function select(db, storeName, toCursor, isDone) {
if (!isDone) {
isDone = () => false;
}
if (!toCursor) {
toCursor = store => store.openCursor();
}
const tx = db.transaction([storeName], "readonly");
const store = tx.objectStore(storeName);
const cursor = toCursor(store);
return await fetchResults(cursor, isDone);
}
export async function updateSingletonStore(db, storeName, value) {
const tx = db.transaction([storeName], "readwrite");
const store = tx.objectStore(storeName);
const cursor = await reqAsPromise(store.openCursor());
if (cursor) {
return reqAsPromise(cursor.update(value));
} else {
return reqAsPromise(store.add(value));
}
}
export async function findStoreValue(db, storeName, toCursor, matchesValue) {
if (!matchesValue) {
matchesValue = () => true;
}
if (!toCursor) {
toCursor = store => store.openCursor();
}
const tx = db.transaction([storeName], "readwrite");
const store = tx.objectStore(storeName);
const cursor = toCursor(store);
let match;
const matched = await iterateCursor(cursor, (value) => {
if (matchesValue(value)) {
match = value;
return true;
}
});
if (!matched) {
throw new Error("Value not found");
}
return match;
}

@@ -35,8 +35,9 @@ export class IncrementalSync {
async _syncLoop(syncToken) {
while(this._isSyncing) {
this._currentRequest = this._network.sync(TIMEOUT, syncToken);
const response = await this._currentRequest.response();
const response = await this._currentRequest.response;
syncToken = response.next_batch;
const txn = session.startSyncTransaction();
const sessionPromise = session.applySync(syncToken, response.account_data);
// to_device
// presence
@@ -47,6 +48,11 @@
}
return room.applyIncrementalSync(roomResponse, membership);
});
try {
await txn;
} catch (err) {
throw new StorageError("unable to commit sync transaction", err);
}
await Promise.all(roomPromises.concat(sessionPromise));
}
}

@@ -1,4 +1,5 @@
<html>
<head><meta charset="utf-8"></head>
<body>
<ul id="messages"></ul>
<script type="text/javascript">
@@ -95,7 +96,7 @@
async open() {
const req = window.indexedDB.open(this._databaseName);
req.onupgradeneeded = async (ev) => {
req.onupgradeneeded = (ev) => {
const db = ev.target.result;
const oldVersion = ev.oldVersion;
this._createStores(db, oldVersion);