Merge pull request #494 from vector-im/DanilaFe/backfill-changes

Unit tests for GapWriter, using a new timeline mock utility
Bruno Windels 2021-09-23 10:15:37 +02:00 committed by GitHub
commit 45917eae1d
8 changed files with 463 additions and 15 deletions
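
In short: the new TimelineMock (added at the bottom of this diff as src/mocks/TimelineMock.ts) simulates a homeserver timeline, so the GapWriter tests can drive /sync and /messages responses against mock storage. A minimal usage sketch, assuming the mock's API exactly as added below; the import path and sender string are illustrative:

import {TimelineMock} from "./src/mocks/TimelineMock";

const timeline = new TimelineMock("@alice:hs.tdl");
timeline.append(30);                        // creates $event0 ... $event29
const sync = timeline.sync(undefined, 10);  // like /sync: the 10 newest events, limited = true
// like /messages?dir=b: paginate backwards from the sync response's prev_batch token
const msgs = timeline.messages(sync.timeline.prev_batch, undefined, "b", 10);
// msgs.chunk now holds $event19 down to $event10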

View File

@@ -75,6 +75,8 @@ export class NullLogItem {
refDetached() {}
ensureRefId() {}
get level() {
return LogLevel;
}

View File

@@ -253,3 +253,176 @@ export class GapWriter {
return {entries, updatedEntries, fragments};
}
}
import {FragmentIdComparer} from "../FragmentIdComparer.js";
import {EventKey} from "../EventKey.js";
import {RelationWriter} from "./RelationWriter.js";
import {createMockStorage} from "../../../../mocks/Storage.js";
import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js";
import {NullLogItem} from "../../../../logging/NullLogger.js";
import {TimelineMock, eventIds, eventId} from "../../../../mocks/TimelineMock.ts";
import {SyncWriter} from "./SyncWriter.js";
import {MemberWriter} from "./MemberWriter.js";
import {KeyLimits} from "../../../storage/common";
export function tests() {
const roomId = "!room:hs.tdl";
const alice = "@alice:hs.tdl";
const logger = new NullLogItem();
async function createGapFillTxn(storage) {
return storage.readWriteTxn([
storage.storeNames.roomMembers,
storage.storeNames.pendingEvents,
storage.storeNames.timelineEvents,
storage.storeNames.timelineRelations,
storage.storeNames.timelineFragments,
]);
}
async function setup() {
const storage = await createMockStorage();
const txn = await createGapFillTxn(storage);
const fragmentIdComparer = new FragmentIdComparer([]);
const relationWriter = new RelationWriter({
roomId, fragmentIdComparer, ownUserId: alice,
});
const gapWriter = new GapWriter({
roomId, storage, fragmentIdComparer, relationWriter
});
const memberWriter = new MemberWriter(roomId);
const syncWriter = new SyncWriter({
roomId,
fragmentIdComparer,
memberWriter,
relationWriter
});
return { storage, txn, fragmentIdComparer, gapWriter, syncWriter, timelineMock: new TimelineMock(alice) };
}
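// Feeds one mock /sync response through SyncWriter; returns the response and a boundary entry for the gap at the start of the live fragment.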
async function syncAndWrite(mocks, { previous, limit } = {}) {
const {txn, timelineMock, syncWriter, fragmentIdComparer} = mocks;
const syncResponse = timelineMock.sync(previous?.next_batch, limit);
const {newLiveKey} = await syncWriter.writeSync(syncResponse, false, false, txn, logger);
syncWriter.afterSync(newLiveKey);
return {
syncResponse,
fragmentEntry: newLiveKey ? FragmentBoundaryEntry.start(
await txn.timelineFragments.get(roomId, newLiveKey.fragmentId),
fragmentIdComparer,
) : null,
};
}
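// Fetches a mock /messages chunk at the fragment's gap token and lets GapWriter write it into storage.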
async function backfillAndWrite(mocks, fragmentEntry, limit) {
const {txn, timelineMock, gapWriter} = mocks;
const messageResponse = timelineMock.messages(fragmentEntry.token, undefined, fragmentEntry.direction.asApiString(), limit);
await gapWriter.writeFragmentFill(fragmentEntry, messageResponse, txn, logger);
}
async function allFragmentEvents(mocks, fragmentId) {
const {txn} = mocks;
const entries = await txn.timelineEvents.eventsAfter(roomId, new EventKey(fragmentId, KeyLimits.minStorageKey));
return entries.map(e => e.event);
}
async function fetchFragment(mocks, fragmentId) {
const {txn} = mocks;
return txn.timelineFragments.get(roomId, fragmentId);
}
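// A filled link: both fragments point at each other and their gap tokens are cleared.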
function assertFilledLink(assert, fragment1, fragment2) {
assert.equal(fragment1.nextId, fragment2.id);
assert.equal(fragment2.previousId, fragment1.id);
assert.equal(fragment1.nextToken, null);
assert.equal(fragment2.previousToken, null);
}
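// A gap link: the fragments are connected, but fragment2 keeps its previousToken, so a gap remains.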
function assertGapLink(assert, fragment1, fragment2) {
assert.equal(fragment1.nextId, fragment2.id);
assert.equal(fragment2.previousId, fragment1.id);
assert.notEqual(fragment2.previousToken, null);
}
return {
"Backfilling after one sync": async assert => {
const mocks = await setup();
const { timelineMock } = mocks;
timelineMock.append(30);
const {fragmentEntry} = await syncAndWrite(mocks);
await backfillAndWrite(mocks, fragmentEntry, 10);
const events = await allFragmentEvents(mocks, fragmentEntry.fragmentId);
assert.deepEqual(events.map(e => e.event_id), eventIds(10, 30));
await mocks.txn.complete();
},
"Backfilling a fragment that is expected to close a gap, and does": async assert => {
const mocks = await setup();
const { timelineMock } = mocks;
timelineMock.append(10);
const {syncResponse, fragmentEntry: firstFragmentEntry} = await syncAndWrite(mocks, { limit: 10 });
timelineMock.append(15);
const {fragmentEntry: secondFragmentEntry} = await syncAndWrite(mocks, { previous: syncResponse, limit: 10 });
await backfillAndWrite(mocks, secondFragmentEntry, 10);
const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId);
const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId);
assertFilledLink(assert, firstFragment, secondFragment);
const firstEvents = await allFragmentEvents(mocks, firstFragmentEntry.fragmentId);
assert.deepEqual(firstEvents.map(e => e.event_id), eventIds(0, 10));
const secondEvents = await allFragmentEvents(mocks, secondFragmentEntry.fragmentId);
assert.deepEqual(secondEvents.map(e => e.event_id), eventIds(10, 25));
await mocks.txn.complete();
},
"Backfilling a fragment that is expected to close a gap, but doesn't yet": async assert => {
const mocks = await setup();
const { timelineMock } = mocks;
timelineMock.append(10);
const {syncResponse, fragmentEntry: firstFragmentEntry} = await syncAndWrite(mocks, { limit: 10 });
timelineMock.append(20);
const {fragmentEntry: secondFragmentEntry} = await syncAndWrite(mocks, { previous: syncResponse, limit: 10 });
await backfillAndWrite(mocks, secondFragmentEntry, 10);
const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId);
const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId);
assertGapLink(assert, firstFragment, secondFragment);
const firstEvents = await allFragmentEvents(mocks, firstFragmentEntry.fragmentId);
assert.deepEqual(firstEvents.map(e => e.event_id), eventIds(0, 10));
const secondEvents = await allFragmentEvents(mocks, secondFragmentEntry.fragmentId);
assert.deepEqual(secondEvents.map(e => e.event_id), eventIds(10, 30));
await mocks.txn.complete();
},
"Receiving a sync with the same events as the current fragment does not create infinite link": async assert => {
const mocks = await setup();
const { txn, timelineMock } = mocks;
timelineMock.append(10);
const {syncResponse, fragmentEntry} = await syncAndWrite(mocks, { limit: 10 });
// Mess with the saved token to receive old events in backfill
fragmentEntry.token = syncResponse.next_batch;
txn.timelineFragments.update(fragmentEntry.fragment);
await backfillAndWrite(mocks, fragmentEntry, 10);
const fragment = await fetchFragment(mocks, fragmentEntry.fragmentId);
assert.notEqual(fragment.nextId, fragment.id);
assert.notEqual(fragment.previousId, fragment.id);
await mocks.txn.complete();
},
"An event received by sync does not interrupt backfilling": async assert => {
const mocks = await setup();
const { timelineMock } = mocks;
timelineMock.append(10);
const {syncResponse, fragmentEntry: firstFragmentEntry} = await syncAndWrite(mocks, { limit: 10 });
timelineMock.append(11);
const {fragmentEntry: secondFragmentEntry} = await syncAndWrite(mocks, { previous: syncResponse, limit: 10 });
timelineMock.insertAfter(eventId(9), 5);
await backfillAndWrite(mocks, secondFragmentEntry, 10);
const firstEvents = await allFragmentEvents(mocks, firstFragmentEntry.fragmentId);
assert.deepEqual(firstEvents.map(e => e.event_id), eventIds(0, 10));
const secondEvents = await allFragmentEvents(mocks, secondFragmentEntry.fragmentId);
assert.deepEqual(secondEvents.map(e => e.event_id), [...eventIds(21,26), ...eventIds(10, 21)]);
const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId);
const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId);
assertFilledLink(assert, firstFragment, secondFragment);
await mocks.txn.complete();
}
}
}

View File

@@ -15,6 +15,7 @@ limitations under the License.
*/
import {iterateCursor, DONE, NOT_DONE, reqAsPromise} from "./utils";
import {Transaction} from "./Transaction";
type Reducer<A,B> = (acc: B, val: A) => B
@@ -31,9 +32,19 @@ interface QueryTargetInterface<T> {
export class QueryTarget<T> {
protected _target: QueryTargetInterface<T>;
+ protected _transaction: Transaction;
- constructor(target: QueryTargetInterface<T>) {
+ constructor(target: QueryTargetInterface<T>, transaction: Transaction) {
this._target = target;
+ this._transaction = transaction;
}
+ get idbFactory(): IDBFactory {
+ return this._transaction.idbFactory;
+ }
+ get IDBKeyRange(): typeof IDBKeyRange {
+ return this._transaction.IDBKeyRange;
+ }
_openCursor(range?: IDBQuery, direction?: IDBCursorDirection): IDBRequest<IDBCursorWithValue | null> {
@@ -155,11 +166,11 @@ export class QueryTarget<T> {
*/
async findExistingKeys(keys: IDBValidKey[], backwards: boolean, callback: (key: IDBValidKey, found: boolean) => boolean): Promise<void> {
const direction = backwards ? "prev" : "next";
- const compareKeys = (a, b) => backwards ? -indexedDB.cmp(a, b) : indexedDB.cmp(a, b);
+ const compareKeys = (a, b) => backwards ? -this.idbFactory.cmp(a, b) : this.idbFactory.cmp(a, b);
const sortedKeys = keys.slice().sort(compareKeys);
const firstKey = backwards ? sortedKeys[sortedKeys.length - 1] : sortedKeys[0];
const lastKey = backwards ? sortedKeys[0] : sortedKeys[sortedKeys.length - 1];
- const cursor = this._target.openKeyCursor(IDBKeyRange.bound(firstKey, lastKey), direction);
+ const cursor = this._target.openKeyCursor(this.IDBKeyRange.bound(firstKey, lastKey), direction);
let i = 0;
let consumerDone = false;
await iterateCursor(cursor, (value, key) => {

View File

@@ -24,12 +24,15 @@ const WEBKITEARLYCLOSETXNBUG_BOGUS_KEY = "782rh281re38-boguskey";
export class Storage {
private _db: IDBDatabase;
private _hasWebkitEarlyCloseTxnBug: boolean;
readonly logger: BaseLogger;
+ readonly idbFactory: IDBFactory;
readonly IDBKeyRange: typeof IDBKeyRange;
readonly storeNames: typeof StoreNames;
- constructor(idbDatabase: IDBDatabase, _IDBKeyRange: typeof IDBKeyRange, hasWebkitEarlyCloseTxnBug: boolean, logger: BaseLogger) {
+ constructor(idbDatabase: IDBDatabase, idbFactory: IDBFactory, _IDBKeyRange: typeof IDBKeyRange, hasWebkitEarlyCloseTxnBug: boolean, logger: BaseLogger) {
this._db = idbDatabase;
+ this.idbFactory = idbFactory;
this.IDBKeyRange = _IDBKeyRange;
this._hasWebkitEarlyCloseTxnBug = hasWebkitEarlyCloseTxnBug;
this.storeNames = StoreNames;

View File

@@ -71,7 +71,7 @@ export class StorageFactory {
const hasWebkitEarlyCloseTxnBug = await detectWebkitEarlyCloseTxnBug(this._idbFactory);
const db = await openDatabaseWithSessionId(sessionId, this._idbFactory, log);
- return new Storage(db, this._IDBKeyRange, hasWebkitEarlyCloseTxnBug, log.logger);
+ return new Storage(db, this._idbFactory, this._IDBKeyRange, hasWebkitEarlyCloseTxnBug, log.logger);
}
delete(sessionId: string): Promise<IDBDatabase> {

View File

@@ -133,16 +133,8 @@ class QueryTargetWrapper<T> {
}
export class Store<T> extends QueryTarget<T> {
- private _transaction: Transaction;
constructor(idbStore: IDBObjectStore, transaction: Transaction) {
- super(new QueryTargetWrapper<T>(idbStore));
- this._transaction = transaction;
- }
- get IDBKeyRange() {
- // @ts-ignore
- return this._transaction.IDBKeyRange;
+ super(new QueryTargetWrapper<T>(idbStore), transaction);
}
get _idbStore(): QueryTargetWrapper<T> {
@@ -150,7 +142,7 @@ export class Store<T> extends QueryTarget<T> {
}
index(indexName: string): QueryTarget<T> {
- return new QueryTarget<T>(new QueryTargetWrapper<T>(this._idbStore.index(indexName)));
+ return new QueryTarget<T>(new QueryTargetWrapper<T>(this._idbStore.index(indexName)), this._transaction);
}
put(value: T, log?: LogItem): void {

View File

@@ -65,6 +65,10 @@ export class Transaction {
this._writeErrors = [];
}
get idbFactory(): IDBFactory {
return this._storage.idbFactory;
}
get IDBKeyRange(): typeof IDBKeyRange {
return this._storage.IDBKeyRange;
}

src/mocks/TimelineMock.ts (new file, 263 lines)
View File

@@ -0,0 +1,263 @@
import {createEvent, withTextBody, withSender} from "./event.js";
import {TimelineEvent} from "../matrix/storage/types";
export const TIMELINE_START_TOKEN = "timeline_start";
export function eventId(i: number): string {
return `$event${i}`;
}
/** `from` is included, `to` is excluded */
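// e.g. eventIds(2, 4) === ["$event2", "$event3"]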
export function eventIds(from: number, to: number): string[] {
return [...Array(to-from).keys()].map(i => eventId(i + from));
}
export class TimelineMock {
private _counter: number;
private _dagOrder: TimelineEvent[];
private _syncOrder: TimelineEvent[];
private _defaultSender: string;
constructor(defaultSender: string) {
this._counter = 0;
this._dagOrder = [];
this._syncOrder = [];
this._defaultSender = defaultSender;
}
_defaultEvent(id: string): TimelineEvent {
return withTextBody(`This is event ${id}`, withSender(this._defaultSender, createEvent("m.room.message", id)));
}
_createEvent(func?: (eventId: string) => TimelineEvent): TimelineEvent {
const id = eventId(this._counter++);
return func ? func(id) : this._defaultEvent(id);
}
_createEvents(n: number, func?: (eventId: string) => TimelineEvent) {
const events: TimelineEvent[] = [];
for (let i = 0; i < n; i++) {
events.push(this._createEvent(func));
}
return events;
}
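// Inserts events into the middle of the DAG order (like a delayed federation backfill); in sync order they still arrive last.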
insertAfter(token: string, n: number, func?: (eventId: string) => TimelineEvent) {
const events = this._createEvents(n, func);
const index = this._findIndex(token, "f", this._dagOrder);
this._dagOrder.splice(index, 0, ...events);
this._syncOrder.push(...events);
return events[events.length - 1]?.event_id;
}
append(n: number, func?: (eventId: string) => TimelineEvent) {
const events = this._createEvents(n, func);
this._dagOrder.push(...events);
this._syncOrder.push(...events);
return events[events.length - 1]?.event_id;
}
_getStep(direction: "f" | "b") : 1 | -1 {
return direction === "f" ? 1 : -1;
}
_findIndex(token: string, direction: "f" | "b", eventOrdering: TimelineEvent[]): number {
const step = this._getStep(direction);
if (token === TIMELINE_START_TOKEN) {
const firstSyncEvent = this._syncOrder[0];
if (!firstSyncEvent) {
// We have no events at all. Wherever you start looking,
// you'll stop looking right away. Zero works as well as anything else.
return 0;
}
const orderIndex = eventOrdering.findIndex(e => e.event_id === firstSyncEvent.event_id);
return orderIndex;
}
// All other tokens are (non-inclusive) event indices
const index = eventOrdering.findIndex(e => e.event_id === token);
if (index === -1) {
// We didn't find this event token at all. What are we
// even looking at?
throw new Error("Invalid token passed to TimelineMock");
}
return index + step;
}
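// Emulates /messages: walks the DAG order from `begin` (exclusive) in `direction`, collecting up to `limit` events or until `end` is reached.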
messages(begin: string, end: string | undefined, direction: "f" | "b", limit: number = 10) {
const step = this._getStep(direction);
let index = this._findIndex(begin, direction, this._dagOrder);
const chunk: TimelineEvent[] = [];
for (; limit > 0 && index >= 0 && index < this._dagOrder.length; index += step, limit--) {
if (this._dagOrder[index].event_id === end) {
break;
}
chunk.push(this._dagOrder[index]);
}
return {
start: begin,
end: chunk[chunk.length - 1]?.event_id || begin,
chunk,
state: []
};
}
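// Emulates /context: alternately collects events before and after the anchor event until `limit` events are gathered.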
context(eventId: string, limit: number = 10) {
if (limit <= 0) {
throw new Error("Cannot fetch zero or less events!");
}
let eventIndex = this._dagOrder.findIndex(e => e.event_id === eventId);
if (eventIndex === -1) {
throw new Error("Fetching context for unknown event");
}
const event = this._dagOrder[eventIndex];
let offset = 1;
const eventsBefore: TimelineEvent[] = [];
const eventsAfter: TimelineEvent[] = [];
while (limit !== 0 && (eventIndex - offset >= 0 || eventIndex + offset < this._dagOrder.length)) {
if (eventIndex - offset >= 0) {
eventsBefore.push(this._dagOrder[eventIndex - offset]);
limit--;
}
if (limit !== 0 && eventIndex + offset < this._dagOrder.length) {
eventsAfter.push(this._dagOrder[eventIndex + offset]);
limit--;
}
offset++;
}
return {
start: eventsBefore[eventsBefore.length - 1]?.event_id || eventId,
end: eventsAfter[eventsAfter.length - 1]?.event_id || eventId,
event,
events_before: eventsBefore,
events_after: eventsAfter,
state: []
};
}
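// Emulates /sync: returns up to `limit` of the newest events in sync order after `since`, marking the response as limited if older events were skipped.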
sync(since?: string, limit: number = 10) {
const startAt = since ? this._findIndex(since, "f", this._syncOrder) : 0;
const index = Math.max(this._syncOrder.length - limit, startAt);
const limited = this._syncOrder.length - startAt > limit;
const events: TimelineEvent[] = [];
for(let i = index; i < this._syncOrder.length; i++) {
events.push(this._syncOrder[i]);
}
return {
next_batch: events[events.length - 1]?.event_id || since || TIMELINE_START_TOKEN,
timeline: {
prev_batch: events[0]?.event_id || since || TIMELINE_START_TOKEN,
events,
limited
}
}
}
}
export function tests() {
const SENDER = "@alice:hs.tdl";
return {
"Append events are returned via sync": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(10);
const syncResponse = timeline.sync();
const events = syncResponse.timeline.events.map(e => e.event_id);
assert.deepEqual(events, eventIds(0, 10));
assert.equal(syncResponse.timeline.limited, false);
},
"Limiting a sync properly limits the returned events": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(20);
const syncResponse = timeline.sync(undefined, 10);
const events = syncResponse.timeline.events.map(e => e.event_id);
assert.deepEqual(events, eventIds(10, 20));
assert.equal(syncResponse.timeline.limited, true);
},
"The context endpoint returns messages in DAG order around an event": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(30);
const context = timeline.context(eventId(15));
assert.equal(context.event.event_id, eventId(15));
assert.deepEqual(context.events_before.map(e => e.event_id).reverse(), eventIds(10, 15));
assert.deepEqual(context.events_after.map(e => e.event_id), eventIds(16, 21));
},
"The context endpoint returns the proper number of messages": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(30);
for (const i of new Array(29).keys()) {
const middleFetch = timeline.context(eventId(15), i + 1);
assert.equal(middleFetch.events_before.length + middleFetch.events_after.length, i + 1);
const startFetch = timeline.context(eventId(1), i + 1);
assert.equal(startFetch.events_before.length + startFetch.events_after.length, i + 1);
const endFetch = timeline.context(eventId(28), i + 1);
assert.equal(endFetch.events_before.length + endFetch.events_after.length, i + 1);
}
},
"The previous batch from a sync returns the previous events": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(20);
const sync = timeline.sync(undefined, 10);
const messages = timeline.messages(sync.timeline.prev_batch, undefined, "b");
const events = messages.chunk.map(e => e.event_id).reverse();
assert.deepEqual(events, eventIds(0, 10));
},
"Two consecutive message fetches are continuous if no new events are inserted": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(30);
const sync = timeline.sync(undefined, 10);
const messages1 = timeline.messages(sync.timeline.prev_batch, undefined, "b");
const events1 = messages1.chunk.map(e => e.event_id).reverse();
assert.deepEqual(events1, eventIds(10, 20));
const messages2 = timeline.messages(messages1.end, undefined, "b");
const events2 = messages2.chunk.map(e => e.event_id).reverse();
assert.deepEqual(events2, eventIds(0, 10));
},
"Two consecutive message fetches detect newly inserted event": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(30);
const messages1 = timeline.messages(eventId(20), undefined, "b", 10);
const events1 = messages1.chunk.map(e => e.event_id).reverse();
assert.deepEqual(events1, eventIds(10, 20));
timeline.insertAfter(eventId(9), 1);
const messages2 = timeline.messages(eventId(10), undefined, "b", 10);
const events2 = messages2.chunk.map(e => e.event_id).reverse();
const expectedEvents2 = eventIds(1, 10);
expectedEvents2.push(eventId(30));
assert.deepEqual(events2, expectedEvents2);
},
"A sync that receives no events has the same next batch as it started with": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(10);
const sync1 = timeline.sync();
const sync2 = timeline.sync(sync1.next_batch);
assert.equal(sync1.next_batch, sync2.next_batch);
},
"An event inserted at the staart still shows up in a sync": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(30);
const sync1 = timeline.sync(undefined, 10);
const sync2 = timeline.sync(sync1.next_batch, 10);
assert.deepEqual(sync2.timeline.events, []);
assert.equal(sync2.timeline.limited, false);
timeline.insertAfter(TIMELINE_START_TOKEN, 1);
const sync3 = timeline.sync(sync2.next_batch, 10);
const events = sync3.timeline.events.map(e => e.event_id);
assert.deepEqual(events, [eventId(30)]);
},
"An event inserted at the start does not show up in a non-overlapping message fetch": assert => {
const timeline = new TimelineMock(SENDER);
timeline.append(30);
const sync1 = timeline.sync(undefined, 10);
const messages1 = timeline.messages(sync1.timeline.prev_batch, undefined, "f", 10);
timeline.insertAfter(TIMELINE_START_TOKEN, 1);
const messages2 = timeline.messages(sync1.timeline.prev_batch, undefined, "f", 10);
assert.deepEqual(messages1.chunk, messages2.chunk);
},
}
}