From b6df30bc277aacd5562f18c731f49a58fc11feaf Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Sun, 3 Feb 2019 21:17:24 +0000 Subject: [PATCH] work on txn, ... --- api.md | 90 ++++++++++++++++++++++++++++++++ matrix.mjs | 4 ++ src/room/persister.js | 47 +++++++++++++++++ {room => src/room}/room.js | 0 {room => src/room}/summary.js | 0 src/storage/idb/db.js | 89 ++++++++++++++++++------------- src/storage/idb/session-index.js | 10 ++-- src/storage/idb/timeline.js | 71 ++++++++++++------------- src/sync/incremental.js | 62 +++++++++++++++------- 9 files changed, 277 insertions(+), 96 deletions(-) create mode 100644 api.md create mode 100644 src/room/persister.js rename {room => src/room}/room.js (100%) rename {room => src/room}/summary.js (100%) diff --git a/api.md b/api.md new file mode 100644 index 00000000..89e03639 --- /dev/null +++ b/api.md @@ -0,0 +1,90 @@ +Session + properties: + rooms -> Rooms + +# storage +Storage + key...() -> KeyRange + start...Txn() -> Transaction +Transaction + store(name) -> ObjectStore + finish() + rollback() +ObjectStore : QueryTarget + index(name) +Index : QueryTarget + + +Rooms: EventEmitter, Iterator + get(id) -> RoomSummary ? +InternalRoom: EventEmitter + applySync(roomResponse, membership, txn) + - this method updates the room summary + - persists the room summary + - persists room state & timeline with RoomPersister + - updates the OpenRoom if present + + + applyAndPersistSync(roomResponse, membership, txn) { + this._summary.applySync(roomResponse, membership); + this._summary.persist(txn); + this._roomPersister.persist(roomResponse, membership, txn); + if (this._openRoom) { + this._openRoom.applySync(roomResponse); + } + } + +RoomPersister + RoomPersister (persists timeline and room state) + RoomSummary (persists room summary) +RoomSummary : EventEmitter + methods: + async open() + id + name + lastMessage + unreadCount + mentionCount + isEncrypted + isDirectMessage + membership + + should this have a custom reducer for custom fields? + + events + propChange(fieldName) + +OpenRoom : EventEmitter + properties: + timeline + events: + + +RoomState: EventEmitter + [room_id, event_type, state_key] -> [sort_key, event] +Timeline: EventEmitter + // should have a cache of recently lookup sender members? + // can we disambiguate members like this? + methods: + lastEvents(amount) + firstEvents(amount) + eventsAfter(sortKey, amount) + eventsBefore(sortKey, amount) + events: + eventsApppended + +RoomMembers : EventEmitter, Iterator + // no order, but need to be able to get all members somehow, needs to map to a ReactiveMap or something + events: + added(ids, values) + removed(ids, values) + changed(id, fieldName) +RoomMember: EventEmitter + properties: + id + name + powerLevel + membership + avatar + events: + propChange(fieldName) \ No newline at end of file diff --git a/matrix.mjs b/matrix.mjs index 963fa703..9838c365 100644 --- a/matrix.mjs +++ b/matrix.mjs @@ -21,6 +21,10 @@ all in one database per stored session: - unread_message_count ? - unread_message_with_mention ? - roomstate_{room_id} + + we only really need historical roomstate for historical display names? + so we can get away without doing this to begin with ... + how about every state event gets a revision number for each state event, we store the min and max revision number where they form part of the room state then we "just" do a where revision_range includes revision, and every state event event/gap in the timeline we store the revision number, and we have an index on it? so we can easily look for the nearest one diff --git a/src/room/persister.js b/src/room/persister.js new file mode 100644 index 00000000..fe4f7563 --- /dev/null +++ b/src/room/persister.js @@ -0,0 +1,47 @@ +class RoomPersister { + constructor(roomId) { + this._roomId = roomId; + this._lastSortKey = null; + + } + + async loadFromStorage(storage) { + const lastEvent = await storage.timeline.lastEvents(1); + if (lastEvent) { + this._lastSortKey = lastEvent.sortKey; + } else { + this._lastSortKey = new GapSortKey(); + } + } + + async persistGapFill(...) { + + } + + async persistSync(roomResponse, txn) { + // persist state + const state = roomResponse.state; + if (state.events) { + const promises = state.events.map((event) => txn.state.setStateEventAt(this._lastSortKey, event)); + await Promise.all(promises); + } + + let nextKey; + const timeline = roomResponse.timeline; + // is limited true for initial sync???? or do we need to handle that as a special case? + if (timeline.limited) { + nextKey = this._lastSortKey.nextKeyWithGap(); + txn.timeline.appendGap(this._roomId, nextKey, {prev_batch: timeline.prev_batch}); + } + nextKey = this._lastSortKey.nextKey(); + + if (timeline.events) { + for(const event of timeline.events) { + txn.timeline.appendEvent(this._roomId, nextKey, event); + nextKey = nextKey.nextKey(); + } + } + // what happens here when the txn fails? + this._lastSortKey = nextKey; + } +} \ No newline at end of file diff --git a/room/room.js b/src/room/room.js similarity index 100% rename from room/room.js rename to src/room/room.js diff --git a/room/summary.js b/src/room/summary.js similarity index 100% rename from room/summary.js rename to src/room/summary.js diff --git a/src/storage/idb/db.js b/src/storage/idb/db.js index 71b155b0..38d258cf 100644 --- a/src/storage/idb/db.js +++ b/src/storage/idb/db.js @@ -12,14 +12,9 @@ class Database { this._syncTxn = null; } - startSyncTxn() { - if (this._syncTxn) { - return txnAsPromise(this._syncTxn); - } - this._syncTxn = this._db.transaction(SYNC_STORES, "readwrite"); - this._syncTxn.addEventListener("complete", () => this._syncTxn = null); - this._syncTxn.addEventListener("abort", () => this._syncTxn = null); - return txnAsPromise(this._syncTxn); + async startSyncTxn() { + const txn = this._db.transaction(SYNC_STORES, "readwrite"); + return new Transaction(txn, SYNC_STORES); } startReadOnlyTxn(storeName) { @@ -43,6 +38,42 @@ class Database { } } +class Transaction { + constructor(txn, allowedStoreNames) { + this._txn = txn; + this._stores = { + sync: null, + summary: null, + timeline: null, + state: null, + }; + this._allowedStoreNames = allowedStoreNames; + } + + _idbStore(name) { + if (!this._allowedStoreNames.includes(name)) { + throw new Error(`Invalid store for transaction: ${name}, only ${this._allowedStoreNames.join(", ")} are allowed.`); + } + return new ObjectStore(this._txn.getObjectStore(name)); + } + + get timeline() { + if (!this._stores.timeline) { + const idbStore = this._idbStore("timeline"); + this._stores.timeline = new TimelineStore(idbStore); + } + return this._stores.timeline; + } + + complete() { + return txnAsPromise(this._txn); + } + + abort() { + this._txn.abort(); + } +} + class QueryTarget { reduce(range, reducer, initialValue) { return this._reduce(range, reducer, initialValue, "next"); @@ -69,7 +100,7 @@ class QueryTarget { } selectAll(range) { - const cursor = this._getIdbQueryTarget().openCursor(range, direction); + const cursor = this._queryTarget().openCursor(range, direction); const results = []; return iterateCursor(cursor, (value) => { results.push(value); @@ -95,7 +126,7 @@ class QueryTarget { _reduce(range, reducer, initialValue, direction) { let reducedValue = initialValue; - const cursor = this._getIdbQueryTarget().openCursor(range, direction); + const cursor = this._queryTarget().openCursor(range, direction); return iterateCursor(cursor, (value) => { reducedValue = reducer(reducedValue, value); return true; @@ -109,7 +140,7 @@ class QueryTarget { } _selectWhile(range, predicate, direction) { - const cursor = this._getIdbQueryTarget().openCursor(range, direction); + const cursor = this._queryTarget().openCursor(range, direction); const results = []; return iterateCursor(cursor, (value) => { results.push(value); @@ -118,7 +149,7 @@ class QueryTarget { } async _find(range, predicate, direction) { - const cursor = this._getIdbQueryTarget().openCursor(range, direction); + const cursor = this._queryTarget().openCursor(range, direction); let result; const found = await iterateCursor(cursor, (value) => { if (predicate(value)) { @@ -131,45 +162,31 @@ class QueryTarget { return result; } - _getIdbQueryTarget() { + _queryTarget() { throw new Error("override this"); } } class ObjectStore extends QueryTarget { - constructor(db, storeName) { - this._db = db; - this._storeName = storeName; + constructor(store) { + this._store = store; } - _getIdbQueryTarget() { - this._db - .startReadOnlyTxn(this._storeName) - .getObjectStore(this._storeName); - } - - _readWriteTxn() { - this._db - .startReadWriteTxn(this._storeName) - .getObjectStore(this._storeName); + _queryTarget() { + return this._store; } index(indexName) { - return new Index(this._db, this._storeName, indexName); + return new Index(this._store.index(indexName)); } } class Index extends QueryTarget { - constructor(db, storeName, indexName) { - this._db = db; - this._storeName = storeName; - this._indexName = indexName; + constructor(index) { + this._index = index; } - _getIdbQueryTarget() { - this._db - .startReadOnlyTxn(this._storeName) - .getObjectStore(this._storeName) - .index(this._indexName); + _queryTarget() { + return this._index; } } \ No newline at end of file diff --git a/src/storage/idb/session-index.js b/src/storage/idb/session-index.js index 9134826b..095660f8 100644 --- a/src/storage/idb/session-index.js +++ b/src/storage/idb/session-index.js @@ -6,16 +6,16 @@ function createSessionsStore(db) { function createStores(db) { db.createObjectStore("sync"); //sync token - db.createObjectStore("summary", "room_id"); - const timeline = db.createObjectStore("timeline", ["room_id", "event_id"]); - timeline.createIndex("by_sort_key", ["room_id", "sort_key"], {unique: true}); + db.createObjectStore("summary", "room_id", {unique: true}); + const timeline = db.createObjectStore("timeline", ["room_id", "sort_key"]); + timeline.createIndex("by_event_id", ["room_id", "event.event_id"], {unique: true}); // how to get the first/last x events for a room? // we don't want to specify the sort key, but would need an index for the room_id? // take sort_key as primary key then and have index on event_id? // still, you also can't have a PK of [room_id, sort_key] and get the last or first events with just the room_id? the only thing that changes it that the PK will provide an inherent sorting that you inherit in an index that only has room_id as keyPath??? There must be a better way, need to write a prototype test for this. // SOLUTION: with numeric keys, you can just us a min/max value to get first/last - db.createObjectStore("members", ["room_id", "state_key"]); - const state = db.createObjectStore("state", ["room_id", "type", "state_key"]); + // db.createObjectStore("members", ["room_id", "state_key"]); + const state = db.createObjectStore("state", ["event.room_id", "event.type", "event.state_key"]); } class Sessions { diff --git a/src/storage/idb/timeline.js b/src/storage/idb/timeline.js index be4bc916..f50e7378 100644 --- a/src/storage/idb/timeline.js +++ b/src/storage/idb/timeline.js @@ -1,37 +1,27 @@ import GapSortKey from "./gapsortkey"; import {select} from "./utils"; -const TIMELINE_STORE = "timeline"; - class TimelineStore { - // create with transaction for sync???? - constructor(db, roomId) { - this._db = db; - this._roomId = roomId; + constructor(timelineStore) { + this._timelineStore = timelineStore; } - async lastEvents(amount) { - return this.eventsBefore(GapSortKey.maxKey()); + async lastEvents(roomId, amount) { + return this.eventsBefore(roomId, GapSortKey.maxKey()); } - async firstEvents(amount) { - return this.eventsAfter(GapSortKey.minKey()); + async firstEvents(roomId, amount) { + return this.eventsAfter(roomId, GapSortKey.minKey()); } - eventsAfter(sortKey, amount) { - const range = IDBKeyRange.lowerBound([this._roomId, sortKey], true); - return this._db - .store(TIMELINE_STORE) - .index("by_sort_key") - .selectLimit(range, amount); + eventsAfter(roomId, sortKey, amount) { + const range = IDBKeyRange.lowerBound([roomId, sortKey], true); + return this._timelineStore.selectLimit(range, amount); } - async eventsBefore(sortKey, amount) { - const range = IDBKeyRange.upperBound([this._roomId, sortKey], true); - const events = await this._db - .store(TIMELINE_STORE) - .index("by_sort_key") - .selectLimitReverse(range, amount); + async eventsBefore(roomId, sortKey, amount) { + const range = IDBKeyRange.upperBound([roomId, sortKey], true); + const events = await this._timelineStore.selectLimitReverse(range, amount); events.reverse(); // because we fetched them backwards return events; } @@ -43,19 +33,30 @@ class TimelineStore { // - new members // - new room state // - updated/new account data - async addEvents(events) { - const txn = this._db.startReadWriteTxn(TIMELINE_STORE); - const timeline = txn.objectStore(TIMELINE_STORE); - events.forEach(event => timeline.add(event)); - return txnAsPromise(txn); + + appendGap(roomId, sortKey, gap) { + this._timelineStore.add({ + room_id: roomId, + sort_key: sortKey, + content: { + event: null, + gap: gap, + }, + }); } - // used to close gaps (gaps are also inserted as fake events) - // delete old events and add new ones in one transaction - async replaceEvents(oldEventIds, newEvents) { - const txn = this._db.startReadWriteTxn(TIMELINE_STORE); - const timeline = txn.objectStore(TIMELINE_STORE); - oldEventIds.forEach(event_id => timeline.delete([this._roomId, event_id])); - events.forEach(event => timeline.add(event)); - return txnAsPromise(txn); + + appendEvent(roomId, sortKey, event) { + this._timelineStore.add({ + room_id: roomId, + sort_key: sortKey, + content: { + event: event, + gap: null, + }, + }); + } + + async removeEvent(roomId, sortKey) { + this._timelineStore.delete([roomId, sortKey]); } } diff --git a/src/sync/incremental.js b/src/sync/incremental.js index 21c5ad22..96c14f12 100644 --- a/src/sync/incremental.js +++ b/src/sync/incremental.js @@ -2,24 +2,28 @@ import {parseRooms} from "./common"; import {RequestAbortError} from "../network"; import {HomeServerError} from "../error"; -const TIMEOUT = 30; +const INCREMENTAL_TIMEOUT = 30; export class IncrementalSync { - constructor(network, session, roomCreator) { + constructor(network, session, storage) { this._network = network; this._session = session; - this._roomCreator = roomCreator; + this._storage = storage; this._isSyncing = false; this._currentRequest = null; } - - start() { + // returns when initial sync is done + async start() { if (this._isSyncing) { return; } this._isSyncing = true; try { - this._syncLoop(session.syncToken); + let syncToken = session.syncToken; + // do initial sync if needed + if (!syncToken) { + syncToken = await this._syncRequest(); + } } catch(err) { //expected when stop is called if (err instanceof RequestAbortError) { @@ -30,31 +34,49 @@ export class IncrementalSync { // something threw something } } + this._syncLoop(syncToken); } async _syncLoop(syncToken) { + // if syncToken is falsy, it will first do an initial sync ... while(this._isSyncing) { - this._currentRequest = this._network.sync(TIMEOUT, syncToken); - const response = await this._currentRequest.response; - syncToken = response.next_batch; - const txn = session.startSyncTransaction(); - const sessionPromise = session.applySync(syncToken, response.account_data); + try { + syncToken = await this._syncRequest(INCREMENTAL_TIMEOUT, syncToken); + } catch (err) { + this.emit("error", err); + } + } + } + + async _syncRequest(timeout, syncToken) { + this._currentRequest = this._network.sync(timeout, syncToken); + const response = await this._currentRequest.response; + syncToken = response.next_batch; + const txn = this._storage.startSyncTxn(); + try { + session.applySync(syncToken, response.account_data, txn); // to_device // presence - const roomPromises = parseRooms(response.rooms, async (roomId, roomResponse, membership) => { + parseRooms(response.rooms, async (roomId, roomResponse, membership) => { let room = session.getRoom(roomId); if (!room) { - room = await session.createRoom(roomId); + room = session.createRoom(roomId, txn); } - return room.applyIncrementalSync(roomResponse, membership); + room.applySync(roomResponse, membership, txn); }); - try { - await txn; - } catch (err) { - throw new StorageError("unable to commit sync tranaction", err); - } - await Promise.all(roomPromises.concat(sessionPromise)); + } catch(err) { + // avoid corrupting state by only + // storing the sync up till the point + // the exception occurred + txn.abort(); + throw err; } + try { + await txn.complete(); + } catch (err) { + throw new StorageError("unable to commit sync tranaction", err); + } + return syncToken; } stop() {