look for transaction_id in /messages response to delete pending events
This commit is contained in:
parent
6d68ec1bac
commit
f02641c808
2 changed files with 49 additions and 40 deletions
|
@ -68,14 +68,35 @@ export default class Room extends EventEmitter {
|
||||||
limit: amount,
|
limit: amount,
|
||||||
filter: {lazy_load_members: true}
|
filter: {lazy_load_members: true}
|
||||||
}).response();
|
}).response();
|
||||||
const gapWriter = new GapWriter({
|
|
||||||
roomId: this._roomId,
|
const txn = await this._storage.readWriteTxn([
|
||||||
storage: this._storage,
|
this._storage.storeNames.pendingEvents,
|
||||||
fragmentIdComparer: this._fragmentIdComparer
|
this._storage.storeNames.timelineEvents,
|
||||||
});
|
this._storage.storeNames.timelineFragments,
|
||||||
const newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response);
|
]);
|
||||||
|
let removedPendingEvents;
|
||||||
|
let newEntries;
|
||||||
|
try {
|
||||||
|
// detect remote echos of pending messages in the gap
|
||||||
|
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn);
|
||||||
|
// write new events into gap
|
||||||
|
const gapWriter = new GapWriter({
|
||||||
|
roomId: this._roomId,
|
||||||
|
storage: this._storage,
|
||||||
|
fragmentIdComparer: this._fragmentIdComparer
|
||||||
|
});
|
||||||
|
newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
|
||||||
|
} catch (err) {
|
||||||
|
txn.abort();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
await txn.complete();
|
||||||
|
// once txn is committed, emit events
|
||||||
|
if (removedPendingEvents) {
|
||||||
|
this._sendQueue.emitRemovals(removedPendingEvents);
|
||||||
|
}
|
||||||
if (this._timeline) {
|
if (this._timeline) {
|
||||||
this._timeline.addGapEntries(newEntries)
|
this._timeline.addGapEntries(newEntries);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ export default class GapWriter {
|
||||||
txn.timelineFragments.update(fragmentEntry.fragment);
|
txn.timelineFragments.update(fragmentEntry.fragment);
|
||||||
}
|
}
|
||||||
|
|
||||||
async writeFragmentFill(fragmentEntry, response) {
|
async writeFragmentFill(fragmentEntry, response, txn) {
|
||||||
const {fragmentId, direction} = fragmentEntry;
|
const {fragmentId, direction} = fragmentEntry;
|
||||||
// chunk is in reverse-chronological order when backwards
|
// chunk is in reverse-chronological order when backwards
|
||||||
const {chunk, start, end} = response;
|
const {chunk, start, end} = response;
|
||||||
|
@ -89,40 +89,28 @@ export default class GapWriter {
|
||||||
throw new Error("Invalid end token in response");
|
throw new Error("Invalid end token in response");
|
||||||
}
|
}
|
||||||
|
|
||||||
const txn = await this._storage.readWriteTxn([
|
// make sure we have the latest fragment from the store
|
||||||
this._storage.storeNames.timelineEvents,
|
const fragment = await txn.timelineFragments.get(this._roomId, fragmentId);
|
||||||
this._storage.storeNames.timelineFragments,
|
if (!fragment) {
|
||||||
]);
|
throw new Error(`Unknown fragment: ${fragmentId}`);
|
||||||
|
|
||||||
try {
|
|
||||||
// make sure we have the latest fragment from the store
|
|
||||||
const fragment = await txn.timelineFragments.get(this._roomId, fragmentId);
|
|
||||||
if (!fragment) {
|
|
||||||
throw new Error(`Unknown fragment: ${fragmentId}`);
|
|
||||||
}
|
|
||||||
fragmentEntry = fragmentEntry.withUpdatedFragment(fragment);
|
|
||||||
// check that the request was done with the token we are aware of (extra care to avoid timeline corruption)
|
|
||||||
if (fragmentEntry.token !== start) {
|
|
||||||
throw new Error("start is not equal to prev_batch or next_batch");
|
|
||||||
}
|
|
||||||
// find last event in fragment so we get the eventIndex to begin creating keys at
|
|
||||||
let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn);
|
|
||||||
// find out if any event in chunk is already present using findFirstOrLastOccurringEventId
|
|
||||||
const {
|
|
||||||
nonOverlappingEvents,
|
|
||||||
neighbourFragmentEntry
|
|
||||||
} = await this._findOverlappingEvents(fragmentEntry, chunk, txn);
|
|
||||||
|
|
||||||
// create entries for all events in chunk, add them to entries
|
|
||||||
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
|
||||||
await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
|
||||||
} catch (err) {
|
|
||||||
txn.abort();
|
|
||||||
throw err;
|
|
||||||
}
|
}
|
||||||
|
fragmentEntry = fragmentEntry.withUpdatedFragment(fragment);
|
||||||
|
// check that the request was done with the token we are aware of (extra care to avoid timeline corruption)
|
||||||
|
if (fragmentEntry.token !== start) {
|
||||||
|
throw new Error("start is not equal to prev_batch or next_batch");
|
||||||
|
}
|
||||||
|
// find last event in fragment so we get the eventIndex to begin creating keys at
|
||||||
|
let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn);
|
||||||
|
// find out if any event in chunk is already present using findFirstOrLastOccurringEventId
|
||||||
|
const {
|
||||||
|
nonOverlappingEvents,
|
||||||
|
neighbourFragmentEntry
|
||||||
|
} = await this._findOverlappingEvents(fragmentEntry, chunk, txn);
|
||||||
|
|
||||||
await txn.complete();
|
// create entries for all events in chunk, add them to entries
|
||||||
|
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
||||||
|
await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
||||||
|
|
||||||
return entries;
|
return entries;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Reference in a new issue