Compare commits
20 commits
master
...
context-ap
Author | SHA1 | Date | |
---|---|---|---|
|
6d524384e9 | ||
|
668fb37da9 | ||
|
906f95baf9 | ||
|
4f22c23589 | ||
|
39f141820a | ||
|
1d71665c48 | ||
|
fdfea95d22 | ||
|
57c4070505 | ||
|
2501bad4b4 | ||
|
5b868a9064 | ||
|
889f4dd104 | ||
|
7f91e0c1bd | ||
|
3ce2d0777d | ||
|
021844bf0a | ||
|
1b6fea6e4d | ||
|
e06abcc399 | ||
|
8592fcf8c7 | ||
|
299abe3e7e | ||
|
ae6e211150 | ||
|
8d7c12fd59 |
7 changed files with 274 additions and 66 deletions
|
@ -107,6 +107,10 @@ export class HomeServerApi {
|
||||||
return this._get("/sync", {since, timeout, filter}, null, options);
|
return this._get("/sync", {since, timeout, filter}, null, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
context(roomId, eventId, params, options = null) {
|
||||||
|
return this._get(`/rooms/${encodeURIComponent(roomId)}/context/${encodeURIComponent(eventId)}`, params, null, options);
|
||||||
|
}
|
||||||
|
|
||||||
// params is from, dir and optionally to, limit, filter.
|
// params is from, dir and optionally to, limit, filter.
|
||||||
messages(roomId, params, options = null) {
|
messages(roomId, params, options = null) {
|
||||||
return this._get(`/rooms/${encodeURIComponent(roomId)}/messages`, params, null, options);
|
return this._get(`/rooms/${encodeURIComponent(roomId)}/messages`, params, null, options);
|
||||||
|
|
|
@ -266,6 +266,67 @@ export class BaseRoom extends EventEmitter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async _fetchEvents(callback, log) {
|
||||||
|
const txn = await this._storage.readWriteTxn([
|
||||||
|
this._storage.storeNames.pendingEvents,
|
||||||
|
this._storage.storeNames.timelineEvents,
|
||||||
|
this._storage.storeNames.timelineRelations,
|
||||||
|
this._storage.storeNames.timelineFragments,
|
||||||
|
]);
|
||||||
|
let extraGapFillChanges;
|
||||||
|
let gapResult;
|
||||||
|
try {
|
||||||
|
const relationWriter = new RelationWriter({
|
||||||
|
roomId: this._roomId,
|
||||||
|
fragmentIdComparer: this._fragmentIdComparer,
|
||||||
|
ownUserId: this._user.id,
|
||||||
|
});
|
||||||
|
const gapWriter = new GapWriter({
|
||||||
|
roomId: this._roomId,
|
||||||
|
storage: this._storage,
|
||||||
|
fragmentIdComparer: this._fragmentIdComparer,
|
||||||
|
relationWriter
|
||||||
|
});
|
||||||
|
const callbackResult = await callback(txn, gapWriter);
|
||||||
|
extraGapFillChanges = callbackResult.extraGapFillChanges;
|
||||||
|
gapResult = callbackResult.gapResult;
|
||||||
|
} catch (err) {
|
||||||
|
txn.abort();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
await txn.complete();
|
||||||
|
if (this._roomEncryption) {
|
||||||
|
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log);
|
||||||
|
await decryptRequest.complete();
|
||||||
|
}
|
||||||
|
// once txn is committed, update in-memory state & emit events
|
||||||
|
this._fragmentIdComparer.add(...gapResult.fragments);
|
||||||
|
if (extraGapFillChanges) {
|
||||||
|
this._applyGapFill(extraGapFillChanges);
|
||||||
|
}
|
||||||
|
if (this._timeline) {
|
||||||
|
// these should not be added if not already there
|
||||||
|
this._timeline.replaceEntries(gapResult.updatedEntries);
|
||||||
|
this._timeline.addEntries(gapResult.entries);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async _fetchContext(eventId, log = null) {
|
||||||
|
const response = await this._hsApi.context(this._roomId, eventId, {}, {log}).response();
|
||||||
|
let contextEvent = null;
|
||||||
|
await this._fetchEvents(async (txn, gapWriter) => {
|
||||||
|
// Just in case we somehow receive remote echoes during event fetch
|
||||||
|
// Keep events in order just in case.
|
||||||
|
const allEvents = response.events_before.slice().reverse();
|
||||||
|
allEvents.push(response.event, ...response.events_after);
|
||||||
|
const extraGapFillChanges = await this._writeGapFill(allEvents, txn, log);
|
||||||
|
const gapResult = await gapWriter.writeContext(response, txn, log);
|
||||||
|
contextEvent = gapResult.contextEvent;
|
||||||
|
return { extraGapFillChanges, gapResult };
|
||||||
|
}, log);
|
||||||
|
return contextEvent;
|
||||||
|
}
|
||||||
|
|
||||||
/** @public */
|
/** @public */
|
||||||
fillGap(fragmentEntry, amount, log = null) {
|
fillGap(fragmentEntry, amount, log = null) {
|
||||||
// TODO move some/all of this out of BaseRoom
|
// TODO move some/all of this out of BaseRoom
|
||||||
|
@ -287,51 +348,12 @@ export class BaseRoom extends EventEmitter {
|
||||||
}
|
}
|
||||||
}, {log}).response();
|
}, {log}).response();
|
||||||
|
|
||||||
const txn = await this._storage.readWriteTxn([
|
await this._fetchEvents(async (txn, gapWriter) => {
|
||||||
this._storage.storeNames.pendingEvents,
|
|
||||||
this._storage.storeNames.timelineEvents,
|
|
||||||
this._storage.storeNames.timelineRelations,
|
|
||||||
this._storage.storeNames.timelineFragments,
|
|
||||||
]);
|
|
||||||
let extraGapFillChanges;
|
|
||||||
let gapResult;
|
|
||||||
try {
|
|
||||||
// detect remote echos of pending messages in the gap
|
// detect remote echos of pending messages in the gap
|
||||||
extraGapFillChanges = await this._writeGapFill(response.chunk, txn, log);
|
const extraGapFillChanges = await this._writeGapFill(response.chunk, txn, log);
|
||||||
// write new events into gap
|
const gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log);
|
||||||
const relationWriter = new RelationWriter({
|
return { extraGapFillChanges, gapResult };
|
||||||
roomId: this._roomId,
|
}, log);
|
||||||
fragmentIdComparer: this._fragmentIdComparer,
|
|
||||||
ownUserId: this._user.id,
|
|
||||||
});
|
|
||||||
const gapWriter = new GapWriter({
|
|
||||||
roomId: this._roomId,
|
|
||||||
storage: this._storage,
|
|
||||||
fragmentIdComparer: this._fragmentIdComparer,
|
|
||||||
relationWriter
|
|
||||||
});
|
|
||||||
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log);
|
|
||||||
} catch (err) {
|
|
||||||
txn.abort();
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
await txn.complete();
|
|
||||||
if (this._roomEncryption) {
|
|
||||||
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log);
|
|
||||||
await decryptRequest.complete();
|
|
||||||
}
|
|
||||||
// once txn is committed, update in-memory state & emit events
|
|
||||||
for (const fragment of gapResult.fragments) {
|
|
||||||
this._fragmentIdComparer.add(fragment);
|
|
||||||
}
|
|
||||||
if (extraGapFillChanges) {
|
|
||||||
this._applyGapFill(extraGapFillChanges);
|
|
||||||
}
|
|
||||||
if (this._timeline) {
|
|
||||||
// these should not be added if not already there
|
|
||||||
this._timeline.replaceEntries(gapResult.updatedEntries);
|
|
||||||
this._timeline.addEntries(gapResult.entries);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -533,8 +555,13 @@ export class BaseRoom extends EventEmitter {
|
||||||
const observable = this._observedEvents.observe(eventId, entry);
|
const observable = this._observedEvents.observe(eventId, entry);
|
||||||
if (!entry) {
|
if (!entry) {
|
||||||
// update in the background
|
// update in the background
|
||||||
this._readEventById(eventId).then(entry => {
|
this._readEventById(eventId).then(async entry => {
|
||||||
observable.update(entry);
|
if (entry) {
|
||||||
|
observable.update(entry);
|
||||||
|
} else {
|
||||||
|
const fectchedEntry = await this._fetchContext(eventId);
|
||||||
|
observable.update(fectchedEntry);
|
||||||
|
}
|
||||||
}).catch(err => {
|
}).catch(err => {
|
||||||
console.warn(`could not load event ${eventId} from storage`, err);
|
console.warn(`could not load event ${eventId} from storage`, err);
|
||||||
});
|
});
|
||||||
|
|
|
@ -25,11 +25,6 @@ export class EventKey {
|
||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
|
||||||
nextFragmentKey(): EventKey {
|
|
||||||
// could take MIN_EVENT_INDEX here if it can't be paged back
|
|
||||||
return new EventKey(this.fragmentId + 1, KeyLimits.middleStorageKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
nextKeyForDirection(direction: Direction): EventKey {
|
nextKeyForDirection(direction: Direction): EventKey {
|
||||||
if (direction.isForward) {
|
if (direction.isForward) {
|
||||||
return this.nextKey();
|
return this.nextKey();
|
||||||
|
|
|
@ -171,9 +171,11 @@ export class FragmentIdComparer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** use for fragments coming out of persistence, not newly created ones, or also fragments for a new island (like for a permalink) */
|
/** use for fragments coming out of persistence, not newly created ones, or also fragments for a new island (like for a permalink) */
|
||||||
add(fragment) {
|
add(...fragments) {
|
||||||
const copy = new Fragment(fragment.id, fragment.previousId, fragment.nextId);
|
for (const fragment of fragments) {
|
||||||
this._fragmentsById.set(fragment.id, copy);
|
const copy = new Fragment(fragment.id, fragment.previousId, fragment.nextId);
|
||||||
|
this._fragmentsById.set(fragment.id, copy);
|
||||||
|
}
|
||||||
this.rebuild(this._fragmentsById.values());
|
this.rebuild(this._fragmentsById.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,8 @@ limitations under the License.
|
||||||
|
|
||||||
import {EventKey} from "../EventKey";
|
import {EventKey} from "../EventKey";
|
||||||
import {EventEntry} from "../entries/EventEntry.js";
|
import {EventEntry} from "../entries/EventEntry.js";
|
||||||
|
import {Direction} from "../Direction";
|
||||||
|
import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js";
|
||||||
import {createEventEntry, directionalAppend} from "./common.js";
|
import {createEventEntry, directionalAppend} from "./common.js";
|
||||||
import {RoomMember, EVENT_TYPE as MEMBER_EVENT_TYPE} from "../../members/RoomMember.js";
|
import {RoomMember, EVENT_TYPE as MEMBER_EVENT_TYPE} from "../../members/RoomMember.js";
|
||||||
|
|
||||||
|
@ -27,10 +29,10 @@ export class GapWriter {
|
||||||
this._relationWriter = relationWriter;
|
this._relationWriter = relationWriter;
|
||||||
}
|
}
|
||||||
// events is in reverse-chronological order (last event comes at index 0) if backwards
|
// events is in reverse-chronological order (last event comes at index 0) if backwards
|
||||||
async _findOverlappingEvents(fragmentEntry, events, txn, log) {
|
async _findOverlappingEventsFor(currentFragmentId, linkedFragmentId, direction, events, txn, log) {
|
||||||
let expectedOverlappingEventId;
|
let expectedOverlappingEventId;
|
||||||
if (fragmentEntry.hasLinkedFragment) {
|
if (linkedFragmentId !== null) {
|
||||||
expectedOverlappingEventId = await this._findExpectedOverlappingEventId(fragmentEntry, txn);
|
expectedOverlappingEventId = await this._findExpectedOverlappingEventId(linkedFragmentId, direction, txn);
|
||||||
}
|
}
|
||||||
let remainingEvents = events;
|
let remainingEvents = events;
|
||||||
let nonOverlappingEvents = [];
|
let nonOverlappingEvents = [];
|
||||||
|
@ -54,7 +56,7 @@ export class GapWriter {
|
||||||
// get neighbour fragment to link it up later on
|
// get neighbour fragment to link it up later on
|
||||||
const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, duplicateEventId);
|
const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, duplicateEventId);
|
||||||
const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId);
|
const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId);
|
||||||
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
neighbourFragmentEntry = new FragmentBoundaryEntry(neighbourFragment, direction.isForward, this._fragmentIdComparer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If more events remain, or if this wasn't the expected overlapping event,
|
// If more events remain, or if this wasn't the expected overlapping event,
|
||||||
|
@ -67,18 +69,23 @@ export class GapWriter {
|
||||||
remainingEvents = null;
|
remainingEvents = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (neighbourFragmentEntry?.fragmentId === fragmentEntry.fragmentId) {
|
if (neighbourFragmentEntry?.fragmentId === currentFragmentId) {
|
||||||
log.log("hit #160, prevent fragment linking to itself", log.level.Warn);
|
log.log("hit #160, prevent fragment linking to itself", log.level.Warn);
|
||||||
neighbourFragmentEntry = null;
|
neighbourFragmentEntry = null;
|
||||||
}
|
}
|
||||||
return {nonOverlappingEvents, neighbourFragmentEntry};
|
return {nonOverlappingEvents, neighbourFragmentEntry};
|
||||||
}
|
}
|
||||||
|
|
||||||
async _findExpectedOverlappingEventId(fragmentEntry, txn) {
|
async _findOverlappingEvents(fragmentEntry, events, txn, log) {
|
||||||
|
const linkedFragmentId = fragmentEntry.hasLinkedFragment ? fragmentEntry.linkedFragmentId : null;
|
||||||
|
return this._findOverlappingEventsFor(fragmentEntry.fragmentId, linkedFragmentId, fragmentEntry.direction, events, txn, log);
|
||||||
|
}
|
||||||
|
|
||||||
|
async _findExpectedOverlappingEventId(linkedFragmentId, direction, txn) {
|
||||||
const eventEntry = await this._findFragmentEdgeEvent(
|
const eventEntry = await this._findFragmentEdgeEvent(
|
||||||
fragmentEntry.linkedFragmentId,
|
linkedFragmentId,
|
||||||
// reverse because it's the oppose edge of the linked fragment
|
// reverse because it's the oppose edge of the linked fragment
|
||||||
fragmentEntry.direction.reverse(),
|
direction.reverse(),
|
||||||
txn);
|
txn);
|
||||||
if (eventEntry) {
|
if (eventEntry) {
|
||||||
return eventEntry.event.event_id;
|
return eventEntry.event.event_id;
|
||||||
|
@ -205,6 +212,90 @@ export class GapWriter {
|
||||||
return changedFragments;
|
return changedFragments;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If searching for overlapping entries in two directions,
|
||||||
|
* combine the results of the two searches.
|
||||||
|
*
|
||||||
|
* @param mainOverlap the result of a search that located an existing fragment.
|
||||||
|
* @param otherOverlap the result of a search in the opposite direction to mainOverlap.
|
||||||
|
* @param event the event from which the two-directional search occured.
|
||||||
|
* @param token the new pagination token for mainOverlap.
|
||||||
|
*/
|
||||||
|
async _linkOverlapping(mainOverlap, otherOverlap, event, token, state, txn, log) {
|
||||||
|
const fragmentEntry = mainOverlap.neighbourFragmentEntry;
|
||||||
|
const otherEntry = otherOverlap.neighbourFragmentEntry;
|
||||||
|
|
||||||
|
// We're filling the entry from the opposite direction that the search occured
|
||||||
|
// (e.g. searched up, filling down). Thus, the events need to be added in the opposite
|
||||||
|
// order.
|
||||||
|
const allEvents = mainOverlap.nonOverlappingEvents.reverse();
|
||||||
|
allEvents.push(event, ...otherOverlap.nonOverlappingEvents);
|
||||||
|
|
||||||
|
// TODO Very important: can the 'up' and 'down' entries be the same? If that's
|
||||||
|
// the case, we can end up with a self-link (and thus infinite loop).
|
||||||
|
|
||||||
|
let lastKey = await this._findFragmentEdgeEventKey(fragmentEntry, txn);
|
||||||
|
const {entries, updatedEntries} = await this._storeEvents(allEvents, lastKey, fragmentEntry.direction, state, txn, log);
|
||||||
|
const fragments = await this._updateFragments(fragmentEntry, otherEntry, token, entries, txn);
|
||||||
|
const contextEvent = entries.find(e => e.id === event.event_id) || null;
|
||||||
|
return { entries, updatedEntries, fragments, contextEvent };
|
||||||
|
}
|
||||||
|
|
||||||
|
async _createNewFragment(txn) {
|
||||||
|
const maxFragmentKey = await txn.timelineFragments.getMaxFragmentId(this._roomId);
|
||||||
|
const newFragment = {
|
||||||
|
roomId: this._roomId,
|
||||||
|
id: maxFragmentKey + 1,
|
||||||
|
previousId: null,
|
||||||
|
nextId: null,
|
||||||
|
previousToken: null,
|
||||||
|
nextToken: null
|
||||||
|
};
|
||||||
|
txn.timelineFragments.add(newFragment);
|
||||||
|
return newFragment;
|
||||||
|
}
|
||||||
|
|
||||||
|
async writeContext(response, txn, log) {
|
||||||
|
const {
|
||||||
|
events_before: eventsBefore,
|
||||||
|
events_after: eventsAfter,
|
||||||
|
event, state, start, end
|
||||||
|
} = response;
|
||||||
|
|
||||||
|
if (!Array.isArray(eventsBefore) || !Array.isArray(eventsAfter)) {
|
||||||
|
throw new Error("Invalid chunks in response");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!start || !end) {
|
||||||
|
throw new Error("Context call did not receive start and end tokens");
|
||||||
|
}
|
||||||
|
|
||||||
|
const eventEntry = await txn.timelineEvents.getByEventId(this._roomId, event.event_id);
|
||||||
|
if (eventEntry) {
|
||||||
|
// If we have the current event, eary return.
|
||||||
|
return { entries: [], updatedEntries: [], fragments: [], contextEvent: new EventEntry(eventEntry, this._fragmentIdComparer) }
|
||||||
|
}
|
||||||
|
|
||||||
|
const overlapUp = await this._findOverlappingEventsFor(null, null, Direction.Backward, eventsBefore, txn, log);
|
||||||
|
const overlapDown = await this._findOverlappingEventsFor(null, null, Direction.Forward, eventsAfter, txn, log);
|
||||||
|
if (overlapUp.neighbourFragmentEntry) {
|
||||||
|
return this._linkOverlapping(overlapUp, overlapDown, event, end, state, txn, log);
|
||||||
|
} else if (overlapDown.neighbourFragmentEntry) {
|
||||||
|
return this._linkOverlapping(overlapDown, overlapUp, event, start, state, txn, log);
|
||||||
|
}
|
||||||
|
|
||||||
|
// No overlapping fragments found.
|
||||||
|
const newFragment = await this._createNewFragment(txn);
|
||||||
|
newFragment.nextToken = end;
|
||||||
|
newFragment.previousToken = start;
|
||||||
|
|
||||||
|
// Pretend that we did find an overlapping entry above, and that this entry is for the new fragment.
|
||||||
|
const newEntry = FragmentBoundaryEntry.end(newFragment, this._fragmentIdComparer);
|
||||||
|
overlapUp.neighbourFragmentEntry = newEntry;
|
||||||
|
const linkResult = await this._linkOverlapping(overlapUp, overlapDown, event, end, state, txn, log);
|
||||||
|
linkResult.fragments.push(newFragment);
|
||||||
|
return linkResult;
|
||||||
|
}
|
||||||
|
|
||||||
async writeFragmentFill(fragmentEntry, response, txn, log) {
|
async writeFragmentFill(fragmentEntry, response, txn, log) {
|
||||||
const {fragmentId, direction} = fragmentEntry;
|
const {fragmentId, direction} = fragmentEntry;
|
||||||
// chunk is in reverse-chronological order when backwards
|
// chunk is in reverse-chronological order when backwards
|
||||||
|
@ -258,7 +349,6 @@ export class GapWriter {
|
||||||
import {FragmentIdComparer} from "../FragmentIdComparer.js";
|
import {FragmentIdComparer} from "../FragmentIdComparer.js";
|
||||||
import {RelationWriter} from "./RelationWriter.js";
|
import {RelationWriter} from "./RelationWriter.js";
|
||||||
import {createMockStorage} from "../../../../mocks/Storage.js";
|
import {createMockStorage} from "../../../../mocks/Storage.js";
|
||||||
import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js";
|
|
||||||
import {NullLogItem} from "../../../../logging/NullLogger.js";
|
import {NullLogItem} from "../../../../logging/NullLogger.js";
|
||||||
import {TimelineMock, eventIds, eventId} from "../../../../mocks/TimelineMock.ts";
|
import {TimelineMock, eventIds, eventId} from "../../../../mocks/TimelineMock.ts";
|
||||||
import {SyncWriter} from "./SyncWriter.js";
|
import {SyncWriter} from "./SyncWriter.js";
|
||||||
|
@ -320,6 +410,12 @@ export function tests() {
|
||||||
await gapWriter.writeFragmentFill(fragmentEntry, messageResponse, txn, logger);
|
await gapWriter.writeFragmentFill(fragmentEntry, messageResponse, txn, logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function contextAndWrite(mocks, eventId) {
|
||||||
|
const {txn, timelineMock, gapWriter} = mocks;
|
||||||
|
const contextResponse = timelineMock.context(eventId);
|
||||||
|
return await gapWriter.writeContext(contextResponse, txn, logger);
|
||||||
|
}
|
||||||
|
|
||||||
async function allFragmentEvents(mocks, fragmentId) {
|
async function allFragmentEvents(mocks, fragmentId) {
|
||||||
const {txn} = mocks;
|
const {txn} = mocks;
|
||||||
const entries = await txn.timelineEvents.eventsAfter(roomId, new EventKey(fragmentId, KeyLimits.minStorageKey));
|
const entries = await txn.timelineEvents.eventsAfter(roomId, new EventKey(fragmentId, KeyLimits.minStorageKey));
|
||||||
|
@ -419,6 +515,76 @@ export function tests() {
|
||||||
const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId);
|
const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId);
|
||||||
const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId);
|
const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId);
|
||||||
assertDeepLink(assert, firstFragment, secondFragment)
|
assertDeepLink(assert, firstFragment, secondFragment)
|
||||||
}
|
},
|
||||||
|
"Context sync before anything else just creates a new fragment": async assert => {
|
||||||
|
const mocks = await setup();
|
||||||
|
const { timelineMock } = mocks;
|
||||||
|
timelineMock.append(10);
|
||||||
|
const {fragments} = await contextAndWrite(mocks, eventId(5));
|
||||||
|
|
||||||
|
assert.equal(fragments.length, 1);
|
||||||
|
const events = await allFragmentEvents(mocks, fragments[0].id);
|
||||||
|
assert.deepEqual(events.map(e => e.event_id), eventIds(0, 10));
|
||||||
|
},
|
||||||
|
"Context sync after a single live frament just creates a new fragment.": async assert => {
|
||||||
|
const mocks = await setup();
|
||||||
|
const { timelineMock } = mocks;
|
||||||
|
timelineMock.append(20);
|
||||||
|
const {fragmentEntry} = await syncAndWrite(mocks);
|
||||||
|
const {fragments} = await contextAndWrite(mocks, eventId(5));
|
||||||
|
|
||||||
|
assert.equal(fragments.length, 1);
|
||||||
|
const contextEvents = await allFragmentEvents(mocks, fragments[0].id);
|
||||||
|
assert.deepEqual(contextEvents.map(e => e.event_id), eventIds(0, 10));
|
||||||
|
const syncEvents = await allFragmentEvents(mocks, fragmentEntry.fragmentId);
|
||||||
|
assert.deepEqual(syncEvents.map(e => e.event_id), eventIds(10, 20));
|
||||||
|
|
||||||
|
const contextFragment = await fetchFragment(mocks, fragments[0].id);
|
||||||
|
const syncFragment = await fetchFragment(mocks, fragmentEntry.fragmentId);
|
||||||
|
assert.equal(contextFragment.nextId, null);
|
||||||
|
assert.equal(syncFragment.previousId, null);
|
||||||
|
},
|
||||||
|
"Context sync after another fragment appends to that fragment.": async assert => {
|
||||||
|
const mocks = await setup();
|
||||||
|
const { timelineMock } = mocks;
|
||||||
|
timelineMock.append(20);
|
||||||
|
const {fragments: firstFragments} = await contextAndWrite(mocks, eventId(5));
|
||||||
|
const {fragments: secondFragments} = await contextAndWrite(mocks, eventId(11));
|
||||||
|
|
||||||
|
assert.equal(firstFragments.length, 1);
|
||||||
|
assert.equal(secondFragments.length, 0);
|
||||||
|
const events = await allFragmentEvents(mocks, firstFragments[0].id);
|
||||||
|
assert.deepEqual(events.map(e => e.event_id), eventIds(0, 16));
|
||||||
|
},
|
||||||
|
"Context sync before another fragment prepends to that fragment.": async assert => {
|
||||||
|
const mocks = await setup();
|
||||||
|
const { timelineMock } = mocks;
|
||||||
|
timelineMock.append(20);
|
||||||
|
const {fragments: firstFragments} = await contextAndWrite(mocks, eventId(11));
|
||||||
|
const {fragments: secondFragments} = await contextAndWrite(mocks, eventId(5));
|
||||||
|
|
||||||
|
assert.equal(firstFragments.length, 1);
|
||||||
|
assert.equal(secondFragments.length, 0);
|
||||||
|
const events = await allFragmentEvents(mocks, firstFragments[0].id);
|
||||||
|
assert.deepEqual(events.map(e => e.event_id), eventIds(0, 16));
|
||||||
|
},
|
||||||
|
"Context sync between two fragments links the two fragments, and fills one of them.": async assert => {
|
||||||
|
const mocks = await setup();
|
||||||
|
const { timelineMock } = mocks;
|
||||||
|
timelineMock.append(30);
|
||||||
|
// [0][(1)][2][3][4][5][6][7][8][9] | [10][(11)][12] | [13][14][15][16][17][(18)][19][20][21][22]
|
||||||
|
const {fragments: firstFragments} = await contextAndWrite(mocks, eventId(1));
|
||||||
|
const {fragments: secondFragments} = await contextAndWrite(mocks, eventId(18));
|
||||||
|
const {fragments: thirdFragments} = await contextAndWrite(mocks, eventId(11));
|
||||||
|
|
||||||
|
assert.equal(firstFragments.length, 1);
|
||||||
|
assert.equal(secondFragments.length, 1);
|
||||||
|
assert.equal(thirdFragments.length, 2); // Linked fragments
|
||||||
|
|
||||||
|
const firstEvents = await allFragmentEvents(mocks, firstFragments[0].id);
|
||||||
|
assert.deepEqual(firstEvents.map(e => e.event_id), eventIds(0, 13));
|
||||||
|
const secondEvents = await allFragmentEvents(mocks, secondFragments[0].id);
|
||||||
|
assert.deepEqual(secondEvents.map(e => e.event_id), eventIds(13,23));
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,7 +124,8 @@ export class SyncWriter {
|
||||||
} else if (timeline.limited) {
|
} else if (timeline.limited) {
|
||||||
// replace live fragment for limited sync, *only* if we had a live fragment already
|
// replace live fragment for limited sync, *only* if we had a live fragment already
|
||||||
const oldFragmentId = currentKey.fragmentId;
|
const oldFragmentId = currentKey.fragmentId;
|
||||||
currentKey = currentKey.nextFragmentKey();
|
const maxFragmentId = await txn.timelineFragments.getMaxFragmentId(this._roomId);
|
||||||
|
currentKey = EventKey.defaultFragmentKey(maxFragmentId + 1);
|
||||||
const {oldFragment, newFragment} = await this._replaceLiveFragment(oldFragmentId, currentKey.fragmentId, timeline.prev_batch, txn);
|
const {oldFragment, newFragment} = await this._replaceLiveFragment(oldFragmentId, currentKey.fragmentId, timeline.prev_batch, txn);
|
||||||
entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer));
|
entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer));
|
||||||
entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer));
|
entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer));
|
||||||
|
|
|
@ -16,7 +16,7 @@ limitations under the License.
|
||||||
|
|
||||||
import { StorageError } from "../../common";
|
import { StorageError } from "../../common";
|
||||||
import {KeyLimits} from "../../common";
|
import {KeyLimits} from "../../common";
|
||||||
import { encodeUint32 } from "../utils";
|
import { encodeUint32, decodeUint32 } from "../utils";
|
||||||
import {Store} from "../Store";
|
import {Store} from "../Store";
|
||||||
|
|
||||||
interface Fragment {
|
interface Fragment {
|
||||||
|
@ -34,6 +34,11 @@ function encodeKey(roomId: string, fragmentId: number): string {
|
||||||
return `${roomId}|${encodeUint32(fragmentId)}`;
|
return `${roomId}|${encodeUint32(fragmentId)}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function decodeKey(key) {
|
||||||
|
const [roomId, fragmentId] = key.split("|");
|
||||||
|
return { roomId, fragmentId: decodeUint32(fragmentId) };
|
||||||
|
}
|
||||||
|
|
||||||
export class TimelineFragmentStore {
|
export class TimelineFragmentStore {
|
||||||
private _store: Store<FragmentEntry>;
|
private _store: Store<FragmentEntry>;
|
||||||
|
|
||||||
|
@ -71,6 +76,14 @@ export class TimelineFragmentStore {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getMaxFragmentId(roomId) {
|
||||||
|
const maxKey = await this._store.findMaxKey(this._allRange(roomId));
|
||||||
|
if (!maxKey) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return decodeKey(maxKey).fragmentId;
|
||||||
|
}
|
||||||
|
|
||||||
// should generate an id an return it?
|
// should generate an id an return it?
|
||||||
// depends if we want to do anything smart with fragment ids,
|
// depends if we want to do anything smart with fragment ids,
|
||||||
// like give them meaning depending on range. not for now probably ...
|
// like give them meaning depending on range. not for now probably ...
|
||||||
|
|
Reference in a new issue