Merge pull request #118 from vector-im/bwindels/idb-promises-txn

Fix TransactionInactiveError on Safari
This commit is contained in:
Bruno Windels 2020-10-01 14:26:42 +00:00 committed by GitHub
commit 912b332c87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 544 additions and 162 deletions

73
doc/INDEXEDDB.md Normal file
View file

@ -0,0 +1,73 @@
## Promises, async/await and indexedDB
Doesn't indexedDB close your transaction if you don't queue more requests from an idb event handler?
So wouldn't that mean that you can't use promises and async/await when using idb?
It used to be like this, and for IE11 on Win7 (not on Windows 10 strangely enough), it still is like this.
Here we manually flush the promise queue synchronously at the end of an idb event handler.
In modern browsers, indexedDB transactions should only be closed after flushing the microtask queue of the event loop,
which is where promises run.
Keep in mind that indexedDB events, just like any other DOM event, are fired as macro tasks.
Promises queue micro tasks, of which the queue is drained before proceeding to the next macro task.
This also means that if a transaction is completed, you will only receive the event once you are ready to process the next macro tasks.
That doesn't prevent any placed request from throwing TransactionInactiveError though.
## TransactionInactiveError in Safari
Safari doesn't fully follow the rules above, in that if you open a transaction,
you need to "use" (not sure if this means getting a store or actually placing a request) it straight away,
without waiting for any *micro*tasks. See comments about Safari at https://github.com/dfahlander/Dexie.js/issues/317#issue-178349994.
Another failure mode perceived in Hydrogen on Safari is that when the (readonly) prepareTxn in sync wasn't awaited to be completed before opening and using the syncTxn.
I haven't found any documentation online about this at all. Awaiting prepareTxn.complete() fixed the issue below. It's strange though the put does not fail.
What is happening below is:
- in the sync loop:
- we first open a readonly txn on inboundGroupSessions, which we don't use in the example below
- we then open a readwrite txn on session, ... (does not overlap with first txn)
- first the first incremental sync on a room (!YxKeAxtNcDZDrGgaMF:matrix.org) it seems to work well
- on a second incremental sync for that same room, the first get throws TransactionInactiveError for some reason.
- the put in the second incremental sync somehow did not throw.
So it looks like safari doesn't like (some) transactions still being active while a second one is being openened, even with non-overlapping stores.
For now I haven't awaited every read txn in the app, as this was the only place it fails, but if this pops up again in safari, we might have to do that.
Keep in mind that the `txn ... inactive` logs are only logged when the "complete" or "abort" events are processed,
which happens in a macro task, as opposed to all of our promises, which run in a micro task.
So the transaction is likely to have closed before it appears in the logs.
```
[Log] txn 4504181722375185 active on inboundGroupSessions
[Log] txn 861052256474256 active on session, roomSummary, roomState, roomMembers, timelineEvents, timelineFragments, pendingEvents, userIdentities, groupSessionDecryptions, deviceIdentities, outboundGroupSessions, operations, accountData
[Info] hydrogen_session_5286139994689036.session.put({"key":"sync","value":{"token":"s1572540047_757284957_7660701_602588550_435736037_1567300_101589125_347651623_132704","filterId":"2"}})
[Info] hydrogen_session_5286139994689036.userIdentities.get("@bwindels:matrix.org")
[Log] txn 4504181722375185 inactive
[Log] * applying sync response to room !YxKeAxtNcDZDrGgaMF:matrix.org ...
[Info] hydrogen_session_5286139994689036.roomMembers.put({"roomId":"!YxKeAxtNcDZDrGgaMF:matrix.org","userId":"@bwindels:matrix.org","membership":"join","avatarUrl":"mxc://matrix.org/aerWVfICBMcyFcEyREcivLuI","displayName":"Bruno","key":"!YxKeAxtNcDZDrGgaMF:matrix.org|@bwindels:matrix.org"})
[Info] hydrogen_session_5286139994689036.roomMembers.get("!YxKeAxtNcDZDrGgaMF:matrix.org|@bwindels:matrix.org")
[Info] hydrogen_session_5286139994689036.timelineEvents.add({"fragmentId":0,"eventIndex":2147483658,"roomId":"!YxKeAxtNcDZDrGgaMF:matrix.org","event":{"content":{"body":"haha","msgtype":"m.text"},"origin_server_ts":1601457573756,"sender":"@bwindels:matrix.org","type":"m.room.message","unsigned":{"age":8360},"event_id":"$eD9z73-lCpXBVby5_fKqzRZzMVHiPzKbE_RSZzqRKx0"},"displayName":"Bruno","avatarUrl":"mxc://matrix.org/aerWVfICBMcyFcEyREcivLuI","key":"!YxKeAxtNcDZDrGgaMF:matrix.org|00000000|8000000a","eventIdKey":"!YxKeAxtNcDZDrGgaMF:matrix.org|$eD9z73-lCpXBVby5_fKqzRZzMVHiPzKbE_RSZzqRKx0"})
[Info] hydrogen_session_5286139994689036.roomSummary.put({"roomId":"!YxKeAxtNcDZDrGgaMF:matrix.org","name":"!!!test8!!!!!!","lastMessageBody":"haha","lastMessageTimestamp":1601457573756,"isUnread":true,"encryption":null,"lastDecryptedEventKey":null,"isDirectMessage":false,"membership":"join","inviteCount":0,"joinCount":2,"heroes":null,"hasFetchedMembers":false,"isTrackingMembers":false,"avatarUrl":null,"notificationCount":5,"highlightCount":0,"tags":{"m.lowpriority":{}}})
[Log] txn 861052256474256 inactive
[Info] syncTxn committed!!
... two more unrelated sync responses ...
[Log] starting sync request with since s1572540191_757284957_7660742_602588567_435736063_1567300_101589126_347651632_132704 ...
[Log] txn 8104296957004707 active on inboundGroupSessions
[Log] txn 2233038992157489 active on session, roomSummary, roomState, roomMembers, timelineEvents, timelineFragments, pendingEvents, userIdentities, groupSessionDecryptions, deviceIdentities, outboundGroupSessions, operations, accountData
[Info] hydrogen_session_5286139994689036.session.put({"key":"sync","value":{"token":"s1572540223_757284957_7660782_602588579_435736078_1567300_101589130_347651633_132704","filterId":"2"}})
[Log] * applying sync response to room !YxKeAxtNcDZDrGgaMF:matrix.org ...
[Info] hydrogen_session_5286139994689036.roomMembers.get("!YxKeAxtNcDZDrGgaMF:matrix.org|@bwindels:matrix.org")
[Warning] stopping sync because of error
[Error] StorageError: get("!YxKeAxtNcDZDrGgaMF:matrix.org|@bwindels:matrix.org") failed on txn with stores accountData, deviceIdentities, groupSessionDecryptions, operations, outboundGroupSessions, pendingEvents, roomMembers, roomState, roomSummary, session, timelineEvents, timelineFragments, userIdentities on hydrogen_session_5286139994689036.roomMembers: (name: TransactionInactiveError) (code: 0) Failed to execute 'get' on 'IDBObjectStore': The transaction is inactive or finished.
(anonymous function)
asyncFunctionResume
(anonymous function)
promiseReactionJobWithoutPromise
promiseReactionJob
[Log] newStatus "SyncError"
[Log] txn 8104296957004707 inactive
[Log] txn 2233038992157489 inactive
```

View file

@ -0,0 +1,112 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<script type="text/javascript" src="promifill.js"></script>
<!-- <script src="https://cdn.jsdelivr.net/npm/promise-polyfill@8/dist/polyfill.min.js"></script> -->
<script type="text/javascript">
//window.Promise = Promifill;
function reqAsPromise(req) {
return new Promise(function (resolve, reject) {
req.onsuccess = function() {
resolve(req);
Promise.flushQueue && Promise.flushQueue();
};
req.onerror = function(e) {
reject(new Error("IDB request failed: " + e.target.error.message));
Promise.flushQueue && Promise.flushQueue();
};
});
}
function Storage(databaseName) {
this._databaseName = databaseName;
this._database = null;
}
Storage.prototype = {
open: function() {
const req = window.indexedDB.open(this._databaseName);
const self = this;
req.onupgradeneeded = function(ev) {
const db = ev.target.result;
const oldVersion = ev.oldVersion;
self._createStores(db, oldVersion);
};
return reqAsPromise(req).then(function() {
self._database = req.result;
});
},
openTxn: function(mode, storeName) {
const txn = this._database.transaction([storeName], mode);
const store = txn.objectStore(storeName);
return Promise.resolve(store);
},
_createStores: function(db) {
db.createObjectStore("foos", {keyPath: ["id"]});
}
};
function getAll(store) {
const request = store.openCursor();
const results = [];
return new Promise(function(resolve, reject) {
request.onsuccess = function(event) {
const cursor = event.target.result;
if(cursor) {
results.push(cursor.value);
cursor.continue();
} else {
resolve(results);
Promise.flushQueue && Promise.flushQueue();
}
};
request.onerror = function(e) {
reject(new Error("IDB request failed: " + e.target.error.message));
Promise.flushQueue && Promise.flushQueue();
};
});
}
async function main() {
try {
let storage = new Storage("idb-promises");
await storage.open();
const store = await storage.openTxn("readwrite", "foos");
store.clear();
store.add({id: 5, name: "foo"});
store.add({id: 6, name: "bar"});
console.log("all1", await getAll(store));
store.add({id: 7, name: "bazzz"});
console.log("all2", await getAll(store));
} catch(err) {
console.error(err.message + ": " + err.stack);
};
}
main();
/*
we basically want something like this for IE11/Win7:
return new Promise(function (resolve, reject) {
req.onsuccess = function() {
resolve(req);
Promise?.flushQueue();
};
req.onerror = function(e) {
reject(new Error("IDB request failed: " + e.target.error.message));
Promise?.flushQueue();
};
});
we don't have this problem on platforms with a native promise implementation, so we can just have our own (forked) promise polyfill?
*/
</script>
</body>
</html>

View file

@ -0,0 +1,169 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<script type="text/javascript">
function reqAsPromise(req) {
return new Promise(function (resolve, reject) {
req.onsuccess = function() {
resolve(req.result);
};
req.onerror = function(e) {
reject(new Error("IDB request failed: " + req.error));
};
});
}
function txnAsPromise(txn) {
return new Promise(function (resolve, reject) {
txn.oncomplete = function() {
resolve(txn);
};
txn.onabort = function(e) {
reject(new Error("Transaction got aborted: " + txn.error));
};
});
}
const BrowserMutationObserver = window.MutationObserver || window.WebKitMutationObserver;
function useMutationObserver(flush) {
let iterations = 0;
const observer = new BrowserMutationObserver(flush);
const node = document.createTextNode('');
observer.observe(node, { characterData: true });
return () => {
node.data = (iterations = ++iterations % 2);
};
}
const wait = (function() {
let resolve = null;
const trigger = useMutationObserver(() => {
resolve();
});
return () => {
return new Promise(r => {
resolve = r;
trigger();
});
};
})();
var _resolve = Promise.resolve.bind(Promise);
var _then = Promise.prototype.then;
async function delay() {
return Promise.resolve();
// two consecutive macro tasks
//await new Promise(r => setImmediate(r));
// the next macro task will now be the complete event of the txn,
// so schedule another macro task to execute after that
//await new Promise(r => setImmediate(r));
//return;
// for (let i = 0; i < 1000; i+=1) {
// console.log("await...");
// await wait();
// }
let p = _resolve(0);
for (let i=0;i<10;++i) {
p = _then.call(p, x => x + 1);
}
let result = await p;
console.log("Result: "+ result + " (should be 10)");
}
class Storage {
constructor(databaseName) {
this._databaseName = databaseName;
this._database = null;
}
open() {
const req = window.indexedDB.open(this._databaseName);
const self = this;
req.onupgradeneeded = function(ev) {
const db = ev.target.result;
const oldVersion = ev.oldVersion;
self._createStores(db, oldVersion);
};
return reqAsPromise(req).then(function() {
self._database = req.result;
});
}
openTxn(mode, storeName) {
const txn = this._database.transaction([storeName], mode);
txn.addEventListener("complete", () => {
console.info(`transaction ${mode} for ${storeName} completed`);
});
txn.addEventListener("abort", e => {
console.warn(`transaction ${mode} for ${storeName} aborted`, e.target.error);
});
return txn;
}
_createStores(db) {
db.createObjectStore("foos", {keyPath: "id"});
}
}
async function getAll(store, depth = 0) {
if (depth < 15) {
return await getAll(store, depth + 1);
}
const request = store.openCursor();
const results = [];
return await new Promise(function(resolve, reject) {
request.onsuccess = function(event) {
const cursor = event.target.result;
if(cursor) {
results.push(cursor.value);
cursor.continue();
} else {
resolve(results);
Promise.flushQueue && Promise.flushQueue();
}
};
request.onerror = function(e) {
reject(new Error("IDB request failed: " + e.target.error.message));
Promise.flushQueue && Promise.flushQueue();
};
});
}
async function main() {
try {
let storage = new Storage("idb-promises");
await storage.open();
//await reqAsPromise(storage.openTxn("readwrite", "foos").objectStore("foos").clear());
for (let i = 0; i < 10; i += 1) {
storage.openTxn("readonly", "foos").objectStore("foos").get(5);
//console.log("from readtxn", await reqAsPromise(storage.openTxn("readonly", "foos").objectStore("foos").get(5)));
const txn = storage.openTxn("readwrite", "foos");
const store = txn.objectStore("foos");
console.log("writing the foos");
store.put({id: 5, name: "foo"});
store.put({id: 6, name: "bar"});
store.put({id: 7, name: "bazzz"});
await delay();
console.log("reading the foos");
console.log("5", await reqAsPromise(store.get(5)));
console.log("6", await reqAsPromise(store.get(6)));
console.log("7", await reqAsPromise(store.get(7)));
// await txnAsPromise(txn);
}
} catch(err) {
console.error(err);
};
}
main();
</script>
</body>
</html>

View file

@ -15,6 +15,14 @@ limitations under the License.
*/
// polyfills needed for IE11
import Promise from "../lib/es6-promise/index.js";
import {checkNeedsSyncPromise} from "./matrix/storage/idb/utils.js";
if (typeof window.Promise === "undefined") {
window.Promise = Promise;
// TODO: should be awaited before opening any session in the picker
checkNeedsSyncPromise();
}
import "core-js/stable";
import "regenerator-runtime/runtime";
import "mdn-polyfills/Element.prototype.closest";
@ -24,14 +32,6 @@ import "mdn-polyfills/Element.prototype.closest";
// it will also include the file supporting *all* the encodings,
// weighing a good extra 500kb :-(
import "text-encoding";
import {checkNeedsSyncPromise} from "./matrix/storage/idb/utils.js";
import Promise from "../lib/es6-promise/index.js";
if (typeof window.Promise === "undefined") {
window.Promise = Promise;
// TODO: should be awaited before opening any session in the picker
checkNeedsSyncPromise();
}
// TODO: contribute this to mdn-polyfills
if (!Element.prototype.remove) {

View file

@ -80,7 +80,7 @@ export class DeviceMessageHandler {
if (!this._olmDecryption) {
return;
}
const readTxn = await this._storage.readTxn([this._storage.storeNames.session]);
const readTxn = this._storage.readTxn([this._storage.storeNames.session]);
const pendingEvents = await this._getPendingEvents(readTxn);
if (pendingEvents.length === 0) {
return;
@ -91,7 +91,7 @@ export class DeviceMessageHandler {
for (const err of decryptChanges.errors) {
console.warn("decryption failed for event", err, err.event);
}
const txn = await this._storage.readWriteTxn([
const txn = this._storage.readWriteTxn([
// both to remove the pending events and to modify the olm account
this._storage.storeNames.session,
this._storage.storeNames.olmSessions,

View file

@ -164,13 +164,13 @@ export class Session {
}
const key = await ssssKeyFromCredential(type, credential, this._storage, this._cryptoDriver, this._olm);
// and create session backup, which needs to read from accountData
const readTxn = await this._storage.readTxn([
const readTxn = this._storage.readTxn([
this._storage.storeNames.accountData,
]);
await this._createSessionBackup(key, readTxn);
// only after having read a secret, write the key
// as we only find out if it was good if the MAC verification succeeds
const writeTxn = await this._storage.readWriteTxn([
const writeTxn = this._storage.readWriteTxn([
this._storage.storeNames.session,
]);
try {
@ -217,7 +217,7 @@ export class Session {
await this._e2eeAccount.uploadKeys(this._storage);
await this._deviceMessageHandler.decryptPending(this.rooms);
const txn = await this._storage.readTxn([
const txn = this._storage.readTxn([
this._storage.storeNames.session,
this._storage.storeNames.accountData,
]);
@ -231,7 +231,7 @@ export class Session {
}
async load() {
const txn = await this._storage.readTxn([
const txn = this._storage.readTxn([
this._storage.storeNames.session,
this._storage.storeNames.roomSummary,
this._storage.storeNames.roomMembers,
@ -276,7 +276,7 @@ export class Session {
async start(lastVersionResponse) {
if (lastVersionResponse) {
// store /versions response
const txn = await this._storage.readWriteTxn([
const txn = this._storage.readWriteTxn([
this._storage.storeNames.session
]);
txn.session.set("serverVersions", lastVersionResponse);
@ -284,7 +284,7 @@ export class Session {
await txn.complete();
}
const opsTxn = await this._storage.readWriteTxn([
const opsTxn = this._storage.readWriteTxn([
this._storage.storeNames.operations
]);
const operations = await opsTxn.operations.getAll();
@ -341,17 +341,18 @@ export class Session {
deviceMessageDecryptionPending: false
};
const syncToken = syncResponse.next_batch;
const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
if (this._e2eeAccount && deviceOneTimeKeysCount) {
changes.e2eeAccountChanges = this._e2eeAccount.writeSync(deviceOneTimeKeysCount, txn);
}
if (syncToken !== this.syncToken) {
const syncInfo = {token: syncToken, filterId: syncFilterId};
// don't modify `this` because transaction might still fail
txn.session.set("sync", syncInfo);
changes.syncInfo = syncInfo;
}
const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
if (this._e2eeAccount && deviceOneTimeKeysCount) {
changes.e2eeAccountChanges = this._e2eeAccount.writeSync(deviceOneTimeKeysCount, txn);
}
if (this._deviceTracker) {
const deviceLists = syncResponse.device_lists;
if (deviceLists) {

View file

@ -184,19 +184,23 @@ export class Sync {
const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
await this._prepareRooms(roomStates);
let sessionChanges;
const syncTxn = await this._openSyncTxn();
const syncTxn = this._openSyncTxn();
try {
sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
await Promise.all(roomStates.map(async rs => {
console.log(` * applying sync response to room ${rs.room.id} ...`);
rs.changes = await rs.room.writeSync(
rs.roomResponse, isInitialSync, rs.preparation, syncTxn);
}));
sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
} catch(err) {
// avoid corrupting state by only
// storing the sync up till the point
// the exception occurred
try {
syncTxn.abort();
} catch (abortErr) {
console.error("Could not abort sync transaction, the sync response was probably only partially written and may have put storage in a inconsistent state.", abortErr);
}
throw err;
}
try {
@ -221,24 +225,26 @@ export class Sync {
};
}
async _openPrepareSyncTxn() {
_openPrepareSyncTxn() {
const storeNames = this._storage.storeNames;
return await this._storage.readTxn([
return this._storage.readTxn([
storeNames.inboundGroupSessions,
]);
}
async _prepareRooms(roomStates) {
const prepareTxn = await this._openPrepareSyncTxn();
const prepareTxn = this._openPrepareSyncTxn();
await Promise.all(roomStates.map(async rs => {
rs.preparation = await rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn);
}));
// This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
await prepareTxn.complete();
await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation)));
}
async _openSyncTxn() {
_openSyncTxn() {
const storeNames = this._storage.storeNames;
return await this._storage.readWriteTxn([
return this._storage.readWriteTxn([
storeNames.session,
storeNames.roomSummary,
storeNames.roomState,
@ -250,7 +256,7 @@ export class Sync {
storeNames.groupSessionDecryptions,
storeNames.deviceIdentities,
// to discard outbound session when somebody leaves a room
// and to create room key messages when somebody leaves
// and to create room key messages when somebody joins
storeNames.outboundGroupSessions,
storeNames.operations,
storeNames.accountData,

View file

@ -45,7 +45,7 @@ export class Account {
}
const pickledAccount = account.pickle(pickleKey);
const areDeviceKeysUploaded = false;
const txn = await storage.readWriteTxn([
const txn = storage.readWriteTxn([
storage.storeNames.session
]);
try {
@ -212,7 +212,7 @@ export class Account {
}
async _updateSessionStorage(storage, callback) {
const txn = await storage.readWriteTxn([
const txn = storage.readWriteTxn([
storage.storeNames.session
]);
try {

View file

@ -68,7 +68,7 @@ export class DeviceTracker {
}
const memberList = await room.loadMemberList();
try {
const txn = await this._storage.readWriteTxn([
const txn = this._storage.readWriteTxn([
this._storage.storeNames.roomSummary,
this._storage.storeNames.userIdentities,
]);
@ -149,7 +149,7 @@ export class DeviceTracker {
}).response();
const verifiedKeysPerUser = this._filterVerifiedDeviceKeys(deviceKeyResponse["device_keys"]);
const txn = await this._storage.readWriteTxn([
const txn = this._storage.readWriteTxn([
this._storage.storeNames.userIdentities,
this._storage.storeNames.deviceIdentities,
]);
@ -252,7 +252,7 @@ export class DeviceTracker {
* @return {[type]} [description]
*/
async devicesForTrackedRoom(roomId, hsApi) {
const txn = await this._storage.readTxn([
const txn = this._storage.readTxn([
this._storage.storeNames.roomMembers,
this._storage.storeNames.userIdentities,
]);
@ -268,7 +268,7 @@ export class DeviceTracker {
}
async devicesForRoomMembers(roomId, userIds, hsApi) {
const txn = await this._storage.readTxn([
const txn = this._storage.readTxn([
this._storage.storeNames.userIdentities,
]);
return await this._devicesForUserIds(roomId, userIds, txn, hsApi);
@ -298,7 +298,7 @@ export class DeviceTracker {
queriedDevices = await this._queryKeys(outdatedIdentities.map(i => i.userId), hsApi);
}
const deviceTxn = await this._storage.readTxn([
const deviceTxn = this._storage.readTxn([
this._storage.storeNames.deviceIdentities,
]);
const devicesPerUser = await Promise.all(upToDateIdentities.map(identity => {

View file

@ -183,7 +183,7 @@ export class RoomEncryption {
console.warn("Got session key back from backup with different sender key, ignoring", {session, senderKey});
return;
}
const txn = await this._storage.readWriteTxn([this._storage.storeNames.inboundGroupSessions]);
const txn = this._storage.readWriteTxn([this._storage.storeNames.inboundGroupSessions]);
let roomKey;
try {
roomKey = await this._megolmDecryption.addRoomKeyFromBackup(
@ -251,6 +251,7 @@ export class RoomEncryption {
await this._deviceTracker.trackRoom(this._room);
const megolmResult = await this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams);
if (megolmResult.roomKeyMessage) {
// TODO: should we await this??
this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi);
}
return {
@ -273,7 +274,7 @@ export class RoomEncryption {
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
// store operation for room key share, in case we don't finish here
const writeOpTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
const writeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
let operationId;
try {
operationId = this._writeRoomKeyShareOperation(roomKeyMessage, userIds, writeOpTxn);
@ -290,7 +291,7 @@ export class RoomEncryption {
await this._sendRoomKey(roomKeyMessage, devices, hsApi);
// remove the operation
const removeOpTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
const removeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
try {
removeOpTxn.operations.remove(operationId);
} catch (err) {
@ -329,7 +330,7 @@ export class RoomEncryption {
this._isFlushingRoomKeyShares = true;
try {
if (!operations) {
const txn = await this._storage.readTxn([this._storage.storeNames.operations]);
const txn = this._storage.readTxn([this._storage.storeNames.operations]);
operations = await txn.operations.getAllByTypeAndScope("share_room_key", this._room.id);
}
for (const operation of operations) {
@ -339,7 +340,7 @@ export class RoomEncryption {
}
const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi);
await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi);
const removeTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
try {
removeTxn.operations.remove(operation.id);
} catch (err) {

View file

@ -54,7 +54,7 @@ export class Encryption {
async encrypt(roomId, type, content, encryptionParams) {
let session = new this._olm.OutboundGroupSession();
try {
const txn = await this._storage.readWriteTxn([
const txn = this._storage.readWriteTxn([
this._storage.storeNames.inboundGroupSessions,
this._storage.storeNames.outboundGroupSessions,
]);

View file

@ -67,7 +67,7 @@ export class Decryption {
return this._senderKeyLock.takeLock(senderKey);
}));
try {
const readSessionsTxn = await this._storage.readTxn([this._storage.storeNames.olmSessions]);
const readSessionsTxn = this._storage.readTxn([this._storage.storeNames.olmSessions]);
// decrypt events for different sender keys in parallel
const senderKeyOperations = await Promise.all(Array.from(eventsPerSenderKey.entries()).map(([senderKey, events]) => {
return this._decryptAllForSenderKey(senderKey, events, timestamp, readSessionsTxn);

View file

@ -98,7 +98,7 @@ export class Encryption {
}
async _findExistingSessions(devices) {
const txn = await this._storage.readTxn([this._storage.storeNames.olmSessions]);
const txn = this._storage.readTxn([this._storage.storeNames.olmSessions]);
const sessionIdsForDevice = await Promise.all(devices.map(async device => {
return await txn.olmSessions.getSessionIds(device.curve25519Key);
}));
@ -213,7 +213,7 @@ export class Encryption {
}
async _loadSessions(encryptionTargets) {
const txn = await this._storage.readTxn([this._storage.storeNames.olmSessions]);
const txn = this._storage.readTxn([this._storage.storeNames.olmSessions]);
// given we run loading in parallel, there might still be some
// storage requests that will finish later once one has failed.
// those should not allocate a session anymore.
@ -239,7 +239,7 @@ export class Encryption {
}
async _storeSessions(encryptionTargets, timestamp) {
const txn = await this._storage.readWriteTxn([this._storage.storeNames.olmSessions]);
const txn = this._storage.readWriteTxn([this._storage.storeNames.olmSessions]);
try {
for (const target of encryptionTargets) {
const sessionEntry = createSessionEntry(

View file

@ -82,7 +82,7 @@ export class Room extends EventEmitter {
let retryEntries;
if (retryEventIds) {
retryEntries = [];
txn = await this._storage.readTxn(stores);
txn = this._storage.readTxn(stores);
for (const eventId of retryEventIds) {
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
if (storageEntry) {
@ -99,7 +99,7 @@ export class Room extends EventEmitter {
// check we have not already decrypted the most recent event in the room
// otherwise we know that the messages for this room key will not update the room summary
if (!sinceEventKey || !sinceEventKey.equals(this._syncWriter.lastMessageKey)) {
txn = await this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments));
txn = this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments));
const candidateEntries = await this._readRetryDecryptCandidateEntries(sinceEventKey, txn);
retryEntries = this._roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries);
}
@ -138,7 +138,7 @@ export class Room extends EventEmitter {
_decryptEntries(source, entries, inboundSessionTxn = null) {
const request = new DecryptionRequest(async r => {
if (!inboundSessionTxn) {
inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
inboundSessionTxn = this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
}
if (r.cancelled) return;
const events = entries.filter(entry => {
@ -155,7 +155,7 @@ export class Room extends EventEmitter {
// read to fetch devices if timeline is open
stores.push(this._storage.storeNames.deviceIdentities);
}
const writeTxn = await this._storage.readWriteTxn(stores);
const writeTxn = this._storage.readWriteTxn(stores);
let decryption;
try {
decryption = await changes.write(writeTxn);
@ -387,7 +387,7 @@ export class Room extends EventEmitter {
}
}).response();
const txn = await this._storage.readWriteTxn([
const txn = this._storage.readWriteTxn([
this._storage.storeNames.pendingEvents,
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
@ -490,7 +490,7 @@ export class Room extends EventEmitter {
async _getLastEventId() {
const lastKey = this._syncWriter.lastMessageKey;
if (lastKey) {
const txn = await this._storage.readTxn([
const txn = this._storage.readTxn([
this._storage.storeNames.timelineEvents,
]);
const eventEntry = await txn.timelineEvents.get(this._roomId, lastKey);
@ -511,7 +511,7 @@ export class Room extends EventEmitter {
async clearUnread() {
if (this.isUnread || this.notificationCount) {
const txn = await this._storage.readWriteTxn([
const txn = this._storage.readWriteTxn([
this._storage.storeNames.roomSummary,
]);
let data;

View file

@ -272,7 +272,7 @@ export class RoomSummary {
if (data === this._data) {
return false;
}
const txn = await storage.readWriteTxn([
const txn = storage.readWriteTxn([
storage.storeNames.roomSummary,
]);
try {

View file

@ -18,7 +18,7 @@ limitations under the License.
import {RoomMember} from "./RoomMember.js";
async function loadMembers({roomId, storage}) {
const txn = await storage.readTxn([
const txn = storage.readTxn([
storage.storeNames.roomMembers,
]);
const memberDatas = await txn.roomMembers.getAll(roomId);
@ -33,7 +33,7 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan
const memberResponse = await hsApi.members(roomId, {at: syncToken}).response();
const txn = await storage.readWriteTxn([
const txn = storage.readWriteTxn([
storage.storeNames.roomSummary,
storage.storeNames.roomMembers,
]);

View file

@ -130,7 +130,7 @@ export class SendQueue {
}
async _tryUpdateEvent(pendingEvent) {
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
console.log("_tryUpdateEvent: got txn");
try {
// pendingEvent might have been removed already here
@ -152,7 +152,7 @@ export class SendQueue {
async _createAndStoreEvent(eventType, content) {
console.log("_createAndStoreEvent");
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
let pendingEvent;
try {
const pendingEventsStore = txn.pendingEvents;

View file

@ -104,6 +104,8 @@ export class SyncWriter {
const memberChange = new MemberChange(this._roomId, event);
const {member} = memberChange;
if (member) {
// TODO: can we avoid writing redundant members here by checking
// if this is not a limited sync and the state is not in the timeline?
txn.roomMembers.set(member.serialize());
return memberChange;
}

View file

@ -108,14 +108,14 @@ export class TimelineReader {
readFrom(eventKey, direction, amount) {
return new ReaderRequest(async r => {
const txn = await this._openTxn();
const txn = this._openTxn();
return await this._readFrom(eventKey, direction, amount, r, txn);
});
}
readFromEnd(amount) {
return new ReaderRequest(async r => {
const txn = await this._openTxn();
const txn = this._openTxn();
const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
let entries;
// room hasn't been synced yet

View file

@ -19,7 +19,7 @@ import {keyFromPassphrase} from "./passphrase.js";
import {keyFromRecoveryKey} from "./recoveryKey.js";
async function readDefaultKeyDescription(storage) {
const txn = await storage.readTxn([
const txn = storage.readTxn([
storage.storeNames.accountData
]);
const defaultKeyEvent = await txn.accountData.get("m.secret_storage.default_key");

View file

@ -38,29 +38,12 @@ export const STORE_MAP = Object.freeze(STORE_NAMES.reduce((nameMap, name) => {
}, {}));
export class StorageError extends Error {
constructor(message, cause, value) {
let fullMessage = message;
if (cause) {
fullMessage += ": ";
if (typeof cause.name === "string") {
fullMessage += `(name: ${cause.name}) `;
}
if (typeof cause.code === "number") {
fullMessage += `(code: ${cause.code}) `;
}
}
if (value) {
fullMessage += `(value: ${JSON.stringify(value)}) `;
}
if (cause) {
fullMessage += cause.message;
}
super(fullMessage);
constructor(message, cause) {
super(message);
if (cause) {
this.errcode = cause.name;
}
this.cause = cause;
this.value = value;
}
get name() {

View file

@ -34,7 +34,7 @@ export class Storage {
}
}
async readTxn(storeNames) {
readTxn(storeNames) {
this._validateStoreNames(storeNames);
try {
const txn = this._db.transaction(storeNames, "readonly");
@ -44,7 +44,7 @@ export class Storage {
}
}
async readWriteTxn(storeNames) {
readWriteTxn(storeNames) {
this._validateStoreNames(storeNames);
try {
const txn = this._db.transaction(storeNames, "readwrite");

View file

@ -15,8 +15,15 @@ limitations under the License.
*/
import {QueryTarget} from "./QueryTarget.js";
import { reqAsPromise } from "./utils.js";
import { StorageError } from "../common.js";
import {IDBRequestAttemptError} from "./error.js";
const LOG_REQUESTS = false;
function logRequest(method, params, source) {
const storeName = source?.name;
const databaseName = source?.transaction?.db?.name;
console.info(`${databaseName}.${storeName}.${method}(${params.map(p => JSON.stringify(p)).join(", ")})`);
}
class QueryTargetWrapper {
constructor(qt) {
@ -36,62 +43,70 @@ class QueryTargetWrapper {
}
openKeyCursor(...params) {
try {
// not supported on Edge 15
if (!this._qt.openKeyCursor) {
LOG_REQUESTS && logRequest("openCursor", params, this._qt);
return this.openCursor(...params);
}
try {
LOG_REQUESTS && logRequest("openKeyCursor", params, this._qt);
return this._qt.openKeyCursor(...params);
} catch(err) {
throw new StorageError("openKeyCursor failed", err);
throw new IDBRequestAttemptError("openKeyCursor", this._qt, err, params);
}
}
openCursor(...params) {
try {
LOG_REQUESTS && logRequest("openCursor", params, this._qt);
return this._qt.openCursor(...params);
} catch(err) {
throw new StorageError("openCursor failed", err);
throw new IDBRequestAttemptError("openCursor", this._qt, err, params);
}
}
put(...params) {
try {
LOG_REQUESTS && logRequest("put", params, this._qt);
return this._qt.put(...params);
} catch(err) {
throw new StorageError("put failed", err);
throw new IDBRequestAttemptError("put", this._qt, err, params);
}
}
add(...params) {
try {
LOG_REQUESTS && logRequest("add", params, this._qt);
return this._qt.add(...params);
} catch(err) {
throw new StorageError("add failed", err);
throw new IDBRequestAttemptError("add", this._qt, err, params);
}
}
get(...params) {
try {
LOG_REQUESTS && logRequest("get", params, this._qt);
return this._qt.get(...params);
} catch(err) {
throw new StorageError("get failed", err);
throw new IDBRequestAttemptError("get", this._qt, err, params);
}
}
getKey(...params) {
try {
LOG_REQUESTS && logRequest("getKey", params, this._qt);
return this._qt.getKey(...params);
} catch(err) {
throw new StorageError("getKey failed", err);
throw new IDBRequestAttemptError("getKey", this._qt, err, params);
}
}
delete(...params) {
try {
LOG_REQUESTS && logRequest("delete", params, this._qt);
return this._qt.delete(...params);
} catch(err) {
throw new StorageError("delete failed", err);
throw new IDBRequestAttemptError("delete", this._qt, err, params);
}
}
@ -99,14 +114,16 @@ class QueryTargetWrapper {
try {
return this._qt.index(...params);
} catch(err) {
throw new StorageError("index failed", err);
// TODO: map to different error? this is not a request
throw new IDBRequestAttemptError("index", this._qt, err, params);
}
}
}
export class Store extends QueryTarget {
constructor(idbStore) {
constructor(idbStore, transaction) {
super(new QueryTargetWrapper(idbStore));
this._transaction = transaction;
}
get _idbStore() {
@ -117,31 +134,27 @@ export class Store extends QueryTarget {
return new QueryTarget(new QueryTargetWrapper(this._idbStore.index(indexName)));
}
async put(value) {
try {
return await reqAsPromise(this._idbStore.put(value));
} catch(err) {
const originalErr = err.cause;
throw new StorageError(`put on ${err.databaseName}.${err.storeName} failed`, originalErr, value);
}
put(value) {
// If this request fails, the error will bubble up to the transaction and abort it,
// which is the behaviour we want. Therefore, it is ok to not create a promise for this
// request and await it.
//
// Perhaps at some later point, we will want to handle an error (like ConstraintError) for
// individual write requests. In that case, we should add a method that returns a promise (e.g. putAndObserve)
// and call preventDefault on the event to prevent it from aborting the transaction
//
// Note that this can still throw synchronously, like it does for TransactionInactiveError,
// see https://www.w3.org/TR/IndexedDB-2/#transaction-lifetime-concept
this._idbStore.put(value);
}
async add(value) {
try {
return await reqAsPromise(this._idbStore.add(value));
} catch(err) {
const originalErr = err.cause;
throw new StorageError(`add on ${err.databaseName}.${err.storeName} failed`, originalErr, value);
}
}
async delete(keyOrKeyRange) {
try {
return await reqAsPromise(this._idbStore.delete(keyOrKeyRange));
} catch(err) {
const originalErr = err.cause;
throw new StorageError(`delete on ${err.databaseName}.${err.storeName} failed`, originalErr, keyOrKeyRange);
add(value) {
// ok to not monitor result of request, see comment in `put`.
this._idbStore.add(value);
}
delete(keyOrKeyRange) {
// ok to not monitor result of request, see comment in `put`.
this._idbStore.delete(keyOrKeyRange);
}
}

View file

@ -0,0 +1,55 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { StorageError } from "../common.js";
export class IDBError extends StorageError {
constructor(message, source, cause) {
const storeName = source?.name || "<unknown store>";
const databaseName = source?.transaction?.db?.name || "<unknown db>";
let fullMessage = `${message} on ${databaseName}.${storeName}`;
if (cause) {
fullMessage += ": ";
if (typeof cause.name === "string") {
fullMessage += `(name: ${cause.name}) `;
}
if (typeof cause.code === "number") {
fullMessage += `(code: ${cause.code}) `;
}
}
if (cause) {
fullMessage += cause.message;
}
super(fullMessage, cause);
this.storeName = storeName;
this.databaseName = databaseName;
}
}
export class IDBRequestError extends IDBError {
constructor(request, message = "IDBRequest failed") {
const source = request?.source;
const cause = request.error;
super(message, source, cause);
}
}
export class IDBRequestAttemptError extends IDBError {
constructor(method, source, cause, params) {
super(`${method}(${params.map(p => JSON.stringify(p)).join(", ")}) failed`, source, cause);
}
}

View file

@ -58,11 +58,11 @@ export class PendingEventStore {
add(pendingEvent) {
pendingEvent.key = encodeKey(pendingEvent.roomId, pendingEvent.queueIndex);
return this._eventStore.add(pendingEvent);
this._eventStore.add(pendingEvent);
}
update(pendingEvent) {
return this._eventStore.put(pendingEvent);
this._eventStore.put(pendingEvent);
}
getAll() {

View file

@ -27,11 +27,11 @@ export class SessionStore {
}
set(key, value) {
return this._sessionStore.put({key, value});
this._sessionStore.put({key, value});
}
add(key, value) {
return this._sessionStore.add({key, value});
this._sessionStore.add({key, value});
}
remove(key) {

View file

@ -238,7 +238,7 @@ export class TimelineEventStore {
entry.key = encodeKey(entry.roomId, entry.fragmentId, entry.eventIndex);
entry.eventIdKey = encodeEventIdKey(entry.roomId, entry.event.event_id);
// TODO: map error? or in idb/store?
return this._timelineStore.add(entry);
this._timelineStore.add(entry);
}
/** Updates the entry into the store with the given [roomId, eventKey] combination.
@ -247,17 +247,12 @@ export class TimelineEventStore {
* @return {Promise<>} a promise resolving to undefined if the operation was successful, or a StorageError if not.
*/
update(entry) {
return this._timelineStore.put(entry);
this._timelineStore.put(entry);
}
get(roomId, eventKey) {
return this._timelineStore.get(encodeKey(roomId, eventKey.fragmentId, eventKey.eventIndex));
}
// returns the entries as well!! (or not always needed? I guess not always needed, so extra method)
removeRange(roomId, range) {
// TODO: read the entries!
return this._timelineStore.delete(range.asIDBKeyRange(roomId));
}
getByEventId(roomId, eventId) {
return this._timelineStore.index("byEventId").get(encodeEventIdKey(roomId, eventId));

View file

@ -62,11 +62,11 @@ export class TimelineFragmentStore {
// like give them meaning depending on range. not for now probably ...
add(fragment) {
fragment.key = encodeKey(fragment.roomId, fragment.id);
return this._store.add(fragment);
this._store.add(fragment);
}
update(fragment) {
return this._store.put(fragment);
this._store.put(fragment);
}
get(roomId, fragmentId) {

View file

@ -15,6 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import { IDBRequestError } from "./error.js";
import { StorageError } from "../common.js";
let needsSyncPromise = false;
@ -47,17 +48,6 @@ export async function checkNeedsSyncPromise() {
return needsSyncPromise;
}
class IDBRequestError extends StorageError {
constructor(request) {
const source = request?.source;
const storeName = source?.name || "<unknown store>";
const databaseName = source?.transaction?.db?.name || "<unknown db>";
super(`Failed IDBRequest on ${databaseName}.${storeName}`, request.error);
this.storeName = storeName;
this.databaseName = databaseName;
}
}
// storage keys are defined to be unsigned 32bit numbers in WebPlatform.js, which is assumed by idb
export function encodeUint32(n) {
const hex = n.toString(16);
@ -161,17 +151,6 @@ export async function select(db, storeName, toCursor, isDone) {
return await fetchResults(cursor, isDone);
}
export async function updateSingletonStore(db, storeName, value) {
const tx = db.transaction([storeName], "readwrite");
const store = tx.objectStore(storeName);
const cursor = await reqAsPromise(store.openCursor());
if (cursor) {
return reqAsPromise(cursor.update(storeName));
} else {
return reqAsPromise(store.add(value));
}
}
export async function findStoreValue(db, storeName, toCursor, matchesValue) {
if (!matchesValue) {
matchesValue = () => true;

View file

@ -234,11 +234,4 @@ export class RoomTimelineStore extends Store {
const event = count ? this._timeline[startIndex] : undefined;
return Promise.resolve(event);
}
removeRange(roomId, range) {
this.assertWritable();
const {startIndex, count} = range.project(roomId);
const removedEntries = this._timeline.splice(startIndex, count);
return Promise.resolve(removedEntries);
}
}