work on txn, ...

This commit is contained in:
Bruno Windels 2019-02-03 21:17:24 +00:00
parent 6303fb611b
commit b6df30bc27
9 changed files with 277 additions and 96 deletions

90
api.md Normal file
View file

@ -0,0 +1,90 @@
Session
properties:
rooms -> Rooms
# storage
Storage
key...() -> KeyRange
start...Txn() -> Transaction
Transaction
store(name) -> ObjectStore
finish()
rollback()
ObjectStore : QueryTarget
index(name)
Index : QueryTarget
Rooms: EventEmitter, Iterator<RoomSummary>
get(id) -> RoomSummary ?
InternalRoom: EventEmitter
applySync(roomResponse, membership, txn)
- this method updates the room summary
- persists the room summary
- persists room state & timeline with RoomPersister
- updates the OpenRoom if present
applyAndPersistSync(roomResponse, membership, txn) {
this._summary.applySync(roomResponse, membership);
this._summary.persist(txn);
this._roomPersister.persist(roomResponse, membership, txn);
if (this._openRoom) {
this._openRoom.applySync(roomResponse);
}
}
RoomPersister
RoomPersister (persists timeline and room state)
RoomSummary (persists room summary)
RoomSummary : EventEmitter
methods:
async open()
id
name
lastMessage
unreadCount
mentionCount
isEncrypted
isDirectMessage
membership
should this have a custom reducer for custom fields?
events
propChange(fieldName)
OpenRoom : EventEmitter
properties:
timeline
events:
RoomState: EventEmitter
[room_id, event_type, state_key] -> [sort_key, event]
Timeline: EventEmitter
// should have a cache of recently lookup sender members?
// can we disambiguate members like this?
methods:
lastEvents(amount)
firstEvents(amount)
eventsAfter(sortKey, amount)
eventsBefore(sortKey, amount)
events:
eventsApppended
RoomMembers : EventEmitter, Iterator
// no order, but need to be able to get all members somehow, needs to map to a ReactiveMap or something
events:
added(ids, values)
removed(ids, values)
changed(id, fieldName)
RoomMember: EventEmitter
properties:
id
name
powerLevel
membership
avatar
events:
propChange(fieldName)

View file

@ -21,6 +21,10 @@ all in one database per stored session:
- unread_message_count ? - unread_message_count ?
- unread_message_with_mention ? - unread_message_with_mention ?
- roomstate_{room_id} - roomstate_{room_id}
we only really need historical roomstate for historical display names?
so we can get away without doing this to begin with ...
how about every state event gets a revision number how about every state event gets a revision number
for each state event, we store the min and max revision number where they form part of the room state for each state event, we store the min and max revision number where they form part of the room state
then we "just" do a where revision_range includes revision, and every state event event/gap in the timeline we store the revision number, and we have an index on it? so we can easily look for the nearest one then we "just" do a where revision_range includes revision, and every state event event/gap in the timeline we store the revision number, and we have an index on it? so we can easily look for the nearest one

47
src/room/persister.js Normal file
View file

@ -0,0 +1,47 @@
class RoomPersister {
constructor(roomId) {
this._roomId = roomId;
this._lastSortKey = null;
}
async loadFromStorage(storage) {
const lastEvent = await storage.timeline.lastEvents(1);
if (lastEvent) {
this._lastSortKey = lastEvent.sortKey;
} else {
this._lastSortKey = new GapSortKey();
}
}
async persistGapFill(...) {
}
async persistSync(roomResponse, txn) {
// persist state
const state = roomResponse.state;
if (state.events) {
const promises = state.events.map((event) => txn.state.setStateEventAt(this._lastSortKey, event));
await Promise.all(promises);
}
let nextKey;
const timeline = roomResponse.timeline;
// is limited true for initial sync???? or do we need to handle that as a special case?
if (timeline.limited) {
nextKey = this._lastSortKey.nextKeyWithGap();
txn.timeline.appendGap(this._roomId, nextKey, {prev_batch: timeline.prev_batch});
}
nextKey = this._lastSortKey.nextKey();
if (timeline.events) {
for(const event of timeline.events) {
txn.timeline.appendEvent(this._roomId, nextKey, event);
nextKey = nextKey.nextKey();
}
}
// what happens here when the txn fails?
this._lastSortKey = nextKey;
}
}

View file

@ -12,14 +12,9 @@ class Database {
this._syncTxn = null; this._syncTxn = null;
} }
startSyncTxn() { async startSyncTxn() {
if (this._syncTxn) { const txn = this._db.transaction(SYNC_STORES, "readwrite");
return txnAsPromise(this._syncTxn); return new Transaction(txn, SYNC_STORES);
}
this._syncTxn = this._db.transaction(SYNC_STORES, "readwrite");
this._syncTxn.addEventListener("complete", () => this._syncTxn = null);
this._syncTxn.addEventListener("abort", () => this._syncTxn = null);
return txnAsPromise(this._syncTxn);
} }
startReadOnlyTxn(storeName) { startReadOnlyTxn(storeName) {
@ -43,6 +38,42 @@ class Database {
} }
} }
class Transaction {
constructor(txn, allowedStoreNames) {
this._txn = txn;
this._stores = {
sync: null,
summary: null,
timeline: null,
state: null,
};
this._allowedStoreNames = allowedStoreNames;
}
_idbStore(name) {
if (!this._allowedStoreNames.includes(name)) {
throw new Error(`Invalid store for transaction: ${name}, only ${this._allowedStoreNames.join(", ")} are allowed.`);
}
return new ObjectStore(this._txn.getObjectStore(name));
}
get timeline() {
if (!this._stores.timeline) {
const idbStore = this._idbStore("timeline");
this._stores.timeline = new TimelineStore(idbStore);
}
return this._stores.timeline;
}
complete() {
return txnAsPromise(this._txn);
}
abort() {
this._txn.abort();
}
}
class QueryTarget { class QueryTarget {
reduce(range, reducer, initialValue) { reduce(range, reducer, initialValue) {
return this._reduce(range, reducer, initialValue, "next"); return this._reduce(range, reducer, initialValue, "next");
@ -69,7 +100,7 @@ class QueryTarget {
} }
selectAll(range) { selectAll(range) {
const cursor = this._getIdbQueryTarget().openCursor(range, direction); const cursor = this._queryTarget().openCursor(range, direction);
const results = []; const results = [];
return iterateCursor(cursor, (value) => { return iterateCursor(cursor, (value) => {
results.push(value); results.push(value);
@ -95,7 +126,7 @@ class QueryTarget {
_reduce(range, reducer, initialValue, direction) { _reduce(range, reducer, initialValue, direction) {
let reducedValue = initialValue; let reducedValue = initialValue;
const cursor = this._getIdbQueryTarget().openCursor(range, direction); const cursor = this._queryTarget().openCursor(range, direction);
return iterateCursor(cursor, (value) => { return iterateCursor(cursor, (value) => {
reducedValue = reducer(reducedValue, value); reducedValue = reducer(reducedValue, value);
return true; return true;
@ -109,7 +140,7 @@ class QueryTarget {
} }
_selectWhile(range, predicate, direction) { _selectWhile(range, predicate, direction) {
const cursor = this._getIdbQueryTarget().openCursor(range, direction); const cursor = this._queryTarget().openCursor(range, direction);
const results = []; const results = [];
return iterateCursor(cursor, (value) => { return iterateCursor(cursor, (value) => {
results.push(value); results.push(value);
@ -118,7 +149,7 @@ class QueryTarget {
} }
async _find(range, predicate, direction) { async _find(range, predicate, direction) {
const cursor = this._getIdbQueryTarget().openCursor(range, direction); const cursor = this._queryTarget().openCursor(range, direction);
let result; let result;
const found = await iterateCursor(cursor, (value) => { const found = await iterateCursor(cursor, (value) => {
if (predicate(value)) { if (predicate(value)) {
@ -131,45 +162,31 @@ class QueryTarget {
return result; return result;
} }
_getIdbQueryTarget() { _queryTarget() {
throw new Error("override this"); throw new Error("override this");
} }
} }
class ObjectStore extends QueryTarget { class ObjectStore extends QueryTarget {
constructor(db, storeName) { constructor(store) {
this._db = db; this._store = store;
this._storeName = storeName;
} }
_getIdbQueryTarget() { _queryTarget() {
this._db return this._store;
.startReadOnlyTxn(this._storeName)
.getObjectStore(this._storeName);
}
_readWriteTxn() {
this._db
.startReadWriteTxn(this._storeName)
.getObjectStore(this._storeName);
} }
index(indexName) { index(indexName) {
return new Index(this._db, this._storeName, indexName); return new Index(this._store.index(indexName));
} }
} }
class Index extends QueryTarget { class Index extends QueryTarget {
constructor(db, storeName, indexName) { constructor(index) {
this._db = db; this._index = index;
this._storeName = storeName;
this._indexName = indexName;
} }
_getIdbQueryTarget() { _queryTarget() {
this._db return this._index;
.startReadOnlyTxn(this._storeName)
.getObjectStore(this._storeName)
.index(this._indexName);
} }
} }

View file

@ -6,16 +6,16 @@ function createSessionsStore(db) {
function createStores(db) { function createStores(db) {
db.createObjectStore("sync"); //sync token db.createObjectStore("sync"); //sync token
db.createObjectStore("summary", "room_id"); db.createObjectStore("summary", "room_id", {unique: true});
const timeline = db.createObjectStore("timeline", ["room_id", "event_id"]); const timeline = db.createObjectStore("timeline", ["room_id", "sort_key"]);
timeline.createIndex("by_sort_key", ["room_id", "sort_key"], {unique: true}); timeline.createIndex("by_event_id", ["room_id", "event.event_id"], {unique: true});
// how to get the first/last x events for a room? // how to get the first/last x events for a room?
// we don't want to specify the sort key, but would need an index for the room_id? // we don't want to specify the sort key, but would need an index for the room_id?
// take sort_key as primary key then and have index on event_id? // take sort_key as primary key then and have index on event_id?
// still, you also can't have a PK of [room_id, sort_key] and get the last or first events with just the room_id? the only thing that changes it that the PK will provide an inherent sorting that you inherit in an index that only has room_id as keyPath??? There must be a better way, need to write a prototype test for this. // still, you also can't have a PK of [room_id, sort_key] and get the last or first events with just the room_id? the only thing that changes it that the PK will provide an inherent sorting that you inherit in an index that only has room_id as keyPath??? There must be a better way, need to write a prototype test for this.
// SOLUTION: with numeric keys, you can just us a min/max value to get first/last // SOLUTION: with numeric keys, you can just us a min/max value to get first/last
db.createObjectStore("members", ["room_id", "state_key"]); // db.createObjectStore("members", ["room_id", "state_key"]);
const state = db.createObjectStore("state", ["room_id", "type", "state_key"]); const state = db.createObjectStore("state", ["event.room_id", "event.type", "event.state_key"]);
} }
class Sessions { class Sessions {

View file

@ -1,37 +1,27 @@
import GapSortKey from "./gapsortkey"; import GapSortKey from "./gapsortkey";
import {select} from "./utils"; import {select} from "./utils";
const TIMELINE_STORE = "timeline";
class TimelineStore { class TimelineStore {
// create with transaction for sync???? constructor(timelineStore) {
constructor(db, roomId) { this._timelineStore = timelineStore;
this._db = db;
this._roomId = roomId;
} }
async lastEvents(amount) { async lastEvents(roomId, amount) {
return this.eventsBefore(GapSortKey.maxKey()); return this.eventsBefore(roomId, GapSortKey.maxKey());
} }
async firstEvents(amount) { async firstEvents(roomId, amount) {
return this.eventsAfter(GapSortKey.minKey()); return this.eventsAfter(roomId, GapSortKey.minKey());
} }
eventsAfter(sortKey, amount) { eventsAfter(roomId, sortKey, amount) {
const range = IDBKeyRange.lowerBound([this._roomId, sortKey], true); const range = IDBKeyRange.lowerBound([roomId, sortKey], true);
return this._db return this._timelineStore.selectLimit(range, amount);
.store(TIMELINE_STORE)
.index("by_sort_key")
.selectLimit(range, amount);
} }
async eventsBefore(sortKey, amount) { async eventsBefore(roomId, sortKey, amount) {
const range = IDBKeyRange.upperBound([this._roomId, sortKey], true); const range = IDBKeyRange.upperBound([roomId, sortKey], true);
const events = await this._db const events = await this._timelineStore.selectLimitReverse(range, amount);
.store(TIMELINE_STORE)
.index("by_sort_key")
.selectLimitReverse(range, amount);
events.reverse(); // because we fetched them backwards events.reverse(); // because we fetched them backwards
return events; return events;
} }
@ -43,19 +33,30 @@ class TimelineStore {
// - new members // - new members
// - new room state // - new room state
// - updated/new account data // - updated/new account data
async addEvents(events) {
const txn = this._db.startReadWriteTxn(TIMELINE_STORE); appendGap(roomId, sortKey, gap) {
const timeline = txn.objectStore(TIMELINE_STORE); this._timelineStore.add({
events.forEach(event => timeline.add(event)); room_id: roomId,
return txnAsPromise(txn); sort_key: sortKey,
content: {
event: null,
gap: gap,
},
});
} }
// used to close gaps (gaps are also inserted as fake events)
// delete old events and add new ones in one transaction appendEvent(roomId, sortKey, event) {
async replaceEvents(oldEventIds, newEvents) { this._timelineStore.add({
const txn = this._db.startReadWriteTxn(TIMELINE_STORE); room_id: roomId,
const timeline = txn.objectStore(TIMELINE_STORE); sort_key: sortKey,
oldEventIds.forEach(event_id => timeline.delete([this._roomId, event_id])); content: {
events.forEach(event => timeline.add(event)); event: event,
return txnAsPromise(txn); gap: null,
},
});
}
async removeEvent(roomId, sortKey) {
this._timelineStore.delete([roomId, sortKey]);
} }
} }

View file

@ -2,24 +2,28 @@ import {parseRooms} from "./common";
import {RequestAbortError} from "../network"; import {RequestAbortError} from "../network";
import {HomeServerError} from "../error"; import {HomeServerError} from "../error";
const TIMEOUT = 30; const INCREMENTAL_TIMEOUT = 30;
export class IncrementalSync { export class IncrementalSync {
constructor(network, session, roomCreator) { constructor(network, session, storage) {
this._network = network; this._network = network;
this._session = session; this._session = session;
this._roomCreator = roomCreator; this._storage = storage;
this._isSyncing = false; this._isSyncing = false;
this._currentRequest = null; this._currentRequest = null;
} }
// returns when initial sync is done
start() { async start() {
if (this._isSyncing) { if (this._isSyncing) {
return; return;
} }
this._isSyncing = true; this._isSyncing = true;
try { try {
this._syncLoop(session.syncToken); let syncToken = session.syncToken;
// do initial sync if needed
if (!syncToken) {
syncToken = await this._syncRequest();
}
} catch(err) { } catch(err) {
//expected when stop is called //expected when stop is called
if (err instanceof RequestAbortError) { if (err instanceof RequestAbortError) {
@ -30,31 +34,49 @@ export class IncrementalSync {
// something threw something // something threw something
} }
} }
this._syncLoop(syncToken);
} }
async _syncLoop(syncToken) { async _syncLoop(syncToken) {
// if syncToken is falsy, it will first do an initial sync ...
while(this._isSyncing) { while(this._isSyncing) {
this._currentRequest = this._network.sync(TIMEOUT, syncToken); try {
syncToken = await this._syncRequest(INCREMENTAL_TIMEOUT, syncToken);
} catch (err) {
this.emit("error", err);
}
}
}
async _syncRequest(timeout, syncToken) {
this._currentRequest = this._network.sync(timeout, syncToken);
const response = await this._currentRequest.response; const response = await this._currentRequest.response;
syncToken = response.next_batch; syncToken = response.next_batch;
const txn = session.startSyncTransaction(); const txn = this._storage.startSyncTxn();
const sessionPromise = session.applySync(syncToken, response.account_data); try {
session.applySync(syncToken, response.account_data, txn);
// to_device // to_device
// presence // presence
const roomPromises = parseRooms(response.rooms, async (roomId, roomResponse, membership) => { parseRooms(response.rooms, async (roomId, roomResponse, membership) => {
let room = session.getRoom(roomId); let room = session.getRoom(roomId);
if (!room) { if (!room) {
room = await session.createRoom(roomId); room = session.createRoom(roomId, txn);
} }
return room.applyIncrementalSync(roomResponse, membership); room.applySync(roomResponse, membership, txn);
}); });
} catch(err) {
// avoid corrupting state by only
// storing the sync up till the point
// the exception occurred
txn.abort();
throw err;
}
try { try {
await txn; await txn.complete();
} catch (err) { } catch (err) {
throw new StorageError("unable to commit sync tranaction", err); throw new StorageError("unable to commit sync tranaction", err);
} }
await Promise.all(roomPromises.concat(sessionPromise)); return syncToken;
}
} }
stop() { stop() {