diff --git a/src/logging/NullLogger.js b/src/logging/NullLogger.js index da04f16b..060212bd 100644 --- a/src/logging/NullLogger.js +++ b/src/logging/NullLogger.js @@ -75,6 +75,8 @@ export class NullLogItem { refDetached() {} + ensureRefId() {} + get level() { return LogLevel; } diff --git a/src/matrix/room/timeline/persistence/GapWriter.js b/src/matrix/room/timeline/persistence/GapWriter.js index ec23db5a..d51f4a18 100644 --- a/src/matrix/room/timeline/persistence/GapWriter.js +++ b/src/matrix/room/timeline/persistence/GapWriter.js @@ -253,3 +253,176 @@ export class GapWriter { return {entries, updatedEntries, fragments}; } } + +import {FragmentIdComparer} from "../FragmentIdComparer.js"; +import {RelationWriter} from "./RelationWriter.js"; +import {createMockStorage} from "../../../../mocks/Storage.js"; +import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js"; +import {NullLogItem} from "../../../../logging/NullLogger.js"; +import {TimelineMock, eventIds, eventId} from "../../../../mocks/TimelineMock.ts"; +import {SyncWriter} from "./SyncWriter.js"; +import {MemberWriter} from "./MemberWriter.js"; +import {KeyLimits} from "../../../storage/common"; + +export function tests() { + const roomId = "!room:hs.tdl"; + const alice = "alice@hs.tdl"; + const logger = new NullLogItem(); + + async function createGapFillTxn(storage) { + return storage.readWriteTxn([ + storage.storeNames.roomMembers, + storage.storeNames.pendingEvents, + storage.storeNames.timelineEvents, + storage.storeNames.timelineRelations, + storage.storeNames.timelineFragments, + ]); + } + + async function setup() { + const storage = await createMockStorage(); + const txn = await createGapFillTxn(storage); + const fragmentIdComparer = new FragmentIdComparer([]); + const relationWriter = new RelationWriter({ + roomId, fragmentIdComparer, ownUserId: alice, + }); + const gapWriter = new GapWriter({ + roomId, storage, fragmentIdComparer, relationWriter + }); + const memberWriter = new MemberWriter(roomId); + const syncWriter = new SyncWriter({ + roomId, + fragmentIdComparer, + memberWriter, + relationWriter + }); + return { storage, txn, fragmentIdComparer, gapWriter, syncWriter, timelineMock: new TimelineMock() }; + } + + async function syncAndWrite(mocks, { previous, limit } = {}) { + const {txn, timelineMock, syncWriter, fragmentIdComparer} = mocks; + const syncResponse = timelineMock.sync(previous?.next_batch, limit); + const {newLiveKey} = await syncWriter.writeSync(syncResponse, false, false, txn, logger); + syncWriter.afterSync(newLiveKey); + return { + syncResponse, + fragmentEntry: newLiveKey ? FragmentBoundaryEntry.start( + await txn.timelineFragments.get(roomId, newLiveKey.fragmentId), + fragmentIdComparer, + ) : null, + }; + } + + async function backfillAndWrite(mocks, fragmentEntry, limit) { + const {txn, timelineMock, gapWriter} = mocks; + const messageResponse = timelineMock.messages(fragmentEntry.token, undefined, fragmentEntry.direction.asApiString(), limit); + await gapWriter.writeFragmentFill(fragmentEntry, messageResponse, txn, logger); + } + + async function allFragmentEvents(mocks, fragmentId) { + const {txn} = mocks; + const entries = await txn.timelineEvents.eventsAfter(roomId, new EventKey(fragmentId, KeyLimits.minStorageKey)); + return entries.map(e => e.event); + } + + async function fetchFragment(mocks, fragmentId) { + const {txn} = mocks; + return txn.timelineFragments.get(roomId, fragmentId); + } + + function assertFilledLink(assert, fragment1, fragment2) { + assert.equal(fragment1.nextId, fragment2.id); + assert.equal(fragment2.previousId, fragment1.id); + assert.equal(fragment1.nextToken, null); + assert.equal(fragment2.previousToken, null); + } + + function assertGapLink(assert, fragment1, fragment2) { + assert.equal(fragment1.nextId, fragment2.id); + assert.equal(fragment2.previousId, fragment1.id); + assert.notEqual(fragment2.previousToken, null); + } + + return { + "Backfilling after one sync": async assert => { + const mocks = await setup(); + const { timelineMock } = mocks; + timelineMock.append(30); + const {fragmentEntry} = await syncAndWrite(mocks); + await backfillAndWrite(mocks, fragmentEntry, 10); + const events = await allFragmentEvents(mocks, fragmentEntry.fragmentId); + assert.deepEqual(events.map(e => e.event_id), eventIds(10, 30)); + await mocks.txn.complete(); + }, + "Backfilling a fragment that is expected to close a gap, and does": async assert => { + const mocks = await setup(); + const { timelineMock } = mocks; + timelineMock.append(10); + const {syncResponse, fragmentEntry: firstFragmentEntry} = await syncAndWrite(mocks, { limit: 10 }); + timelineMock.append(15); + const {fragmentEntry: secondFragmentEntry} = await syncAndWrite(mocks, { previous: syncResponse, limit: 10 }); + await backfillAndWrite(mocks, secondFragmentEntry, 10); + + const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId); + const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId); + assertFilledLink(assert, firstFragment, secondFragment) + const firstEvents = await allFragmentEvents(mocks, firstFragmentEntry.fragmentId); + assert.deepEqual(firstEvents.map(e => e.event_id), eventIds(0, 10)); + const secondEvents = await allFragmentEvents(mocks, secondFragmentEntry.fragmentId); + assert.deepEqual(secondEvents.map(e => e.event_id), eventIds(10, 25)); + await mocks.txn.complete(); + }, + "Backfilling a fragment that is expected to close a gap, but doesn't yet": async assert => { + const mocks = await setup(); + const { timelineMock } = mocks; + timelineMock.append(10); + const {syncResponse, fragmentEntry: firstFragmentEntry} = await syncAndWrite(mocks, { limit: 10 }); + timelineMock.append(20); + const {fragmentEntry: secondFragmentEntry} = await syncAndWrite(mocks, { previous: syncResponse, limit: 10 }); + await backfillAndWrite(mocks, secondFragmentEntry, 10); + + const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId); + const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId); + assertGapLink(assert, firstFragment, secondFragment) + const firstEvents = await allFragmentEvents(mocks, firstFragmentEntry.fragmentId); + assert.deepEqual(firstEvents.map(e => e.event_id), eventIds(0, 10)); + const secondEvents = await allFragmentEvents(mocks, secondFragmentEntry.fragmentId); + assert.deepEqual(secondEvents.map(e => e.event_id), eventIds(10, 30)); + await mocks.txn.complete(); + }, + "Receiving a sync with the same events as the current fragment does not create infinite link": async assert => { + const mocks = await setup(); + const { txn, timelineMock } = mocks; + timelineMock.append(10); + const {syncResponse, fragmentEntry: fragmentEntry} = await syncAndWrite(mocks, { limit: 10 }); + // Mess with the saved token to receive old events in backfill + fragmentEntry.token = syncResponse.next_batch; + txn.timelineFragments.update(fragmentEntry.fragment); + await backfillAndWrite(mocks, fragmentEntry, 10); + + const fragment = await fetchFragment(mocks, fragmentEntry.fragmentId); + assert.notEqual(fragment.nextId, fragment.id); + assert.notEqual(fragment.previousId, fragment.id); + await mocks.txn.complete(); + }, + "An event received by sync does not interrupt backfilling": async assert => { + const mocks = await setup(); + const { timelineMock } = mocks; + timelineMock.append(10); + const {syncResponse, fragmentEntry: firstFragmentEntry} = await syncAndWrite(mocks, { limit: 10 }); + timelineMock.append(11); + const {fragmentEntry: secondFragmentEntry} = await syncAndWrite(mocks, { previous: syncResponse, limit: 10 }); + timelineMock.insertAfter(eventId(9), 5); + await backfillAndWrite(mocks, secondFragmentEntry, 10); + + const firstEvents = await allFragmentEvents(mocks, firstFragmentEntry.fragmentId); + assert.deepEqual(firstEvents.map(e => e.event_id), eventIds(0, 10)); + const secondEvents = await allFragmentEvents(mocks, secondFragmentEntry.fragmentId); + assert.deepEqual(secondEvents.map(e => e.event_id), [...eventIds(21,26), ...eventIds(10, 21)]); + const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId); + const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId); + assertFilledLink(assert, firstFragment, secondFragment) + await mocks.txn.complete(); + } + } +} diff --git a/src/matrix/storage/idb/QueryTarget.ts b/src/matrix/storage/idb/QueryTarget.ts index e3b77810..3ab33a6b 100644 --- a/src/matrix/storage/idb/QueryTarget.ts +++ b/src/matrix/storage/idb/QueryTarget.ts @@ -15,6 +15,7 @@ limitations under the License. */ import {iterateCursor, DONE, NOT_DONE, reqAsPromise} from "./utils"; +import {Transaction} from "./Transaction"; type Reducer = (acc: B, val: A) => B @@ -31,9 +32,19 @@ interface QueryTargetInterface { export class QueryTarget { protected _target: QueryTargetInterface; + protected _transaction: Transaction; - constructor(target: QueryTargetInterface) { + constructor(target: QueryTargetInterface, transaction: Transaction) { this._target = target; + this._transaction = transaction; + } + + get idbFactory(): IDBFactory { + return this._transaction.idbFactory; + } + + get IDBKeyRange(): typeof IDBKeyRange { + return this._transaction.IDBKeyRange; } _openCursor(range?: IDBQuery, direction?: IDBCursorDirection): IDBRequest { @@ -155,11 +166,11 @@ export class QueryTarget { */ async findExistingKeys(keys: IDBValidKey[], backwards: boolean, callback: (key: IDBValidKey, found: boolean) => boolean): Promise { const direction = backwards ? "prev" : "next"; - const compareKeys = (a, b) => backwards ? -indexedDB.cmp(a, b) : indexedDB.cmp(a, b); + const compareKeys = (a, b) => backwards ? -this.idbFactory.cmp(a, b) : this.idbFactory.cmp(a, b); const sortedKeys = keys.slice().sort(compareKeys); const firstKey = backwards ? sortedKeys[sortedKeys.length - 1] : sortedKeys[0]; const lastKey = backwards ? sortedKeys[0] : sortedKeys[sortedKeys.length - 1]; - const cursor = this._target.openKeyCursor(IDBKeyRange.bound(firstKey, lastKey), direction); + const cursor = this._target.openKeyCursor(this.IDBKeyRange.bound(firstKey, lastKey), direction); let i = 0; let consumerDone = false; await iterateCursor(cursor, (value, key) => { diff --git a/src/matrix/storage/idb/Storage.ts b/src/matrix/storage/idb/Storage.ts index 6f7f4c90..72be55ce 100644 --- a/src/matrix/storage/idb/Storage.ts +++ b/src/matrix/storage/idb/Storage.ts @@ -24,12 +24,15 @@ const WEBKITEARLYCLOSETXNBUG_BOGUS_KEY = "782rh281re38-boguskey"; export class Storage { private _db: IDBDatabase; private _hasWebkitEarlyCloseTxnBug: boolean; + readonly logger: BaseLogger; + readonly idbFactory: IDBFactory readonly IDBKeyRange: typeof IDBKeyRange; readonly storeNames: typeof StoreNames; - constructor(idbDatabase: IDBDatabase, _IDBKeyRange: typeof IDBKeyRange, hasWebkitEarlyCloseTxnBug: boolean, logger: BaseLogger) { + constructor(idbDatabase: IDBDatabase, idbFactory: IDBFactory, _IDBKeyRange: typeof IDBKeyRange, hasWebkitEarlyCloseTxnBug: boolean, logger: BaseLogger) { this._db = idbDatabase; + this.idbFactory = idbFactory; this.IDBKeyRange = _IDBKeyRange; this._hasWebkitEarlyCloseTxnBug = hasWebkitEarlyCloseTxnBug; this.storeNames = StoreNames; diff --git a/src/matrix/storage/idb/StorageFactory.ts b/src/matrix/storage/idb/StorageFactory.ts index 38b48a17..59c988a0 100644 --- a/src/matrix/storage/idb/StorageFactory.ts +++ b/src/matrix/storage/idb/StorageFactory.ts @@ -71,7 +71,7 @@ export class StorageFactory { const hasWebkitEarlyCloseTxnBug = await detectWebkitEarlyCloseTxnBug(this._idbFactory); const db = await openDatabaseWithSessionId(sessionId, this._idbFactory, log); - return new Storage(db, this._IDBKeyRange, hasWebkitEarlyCloseTxnBug, log.logger); + return new Storage(db, this._idbFactory, this._IDBKeyRange, hasWebkitEarlyCloseTxnBug, log.logger); } delete(sessionId: string): Promise { diff --git a/src/matrix/storage/idb/Store.ts b/src/matrix/storage/idb/Store.ts index 84e02e60..9c350b98 100644 --- a/src/matrix/storage/idb/Store.ts +++ b/src/matrix/storage/idb/Store.ts @@ -133,16 +133,8 @@ class QueryTargetWrapper { } export class Store extends QueryTarget { - private _transaction: Transaction; - constructor(idbStore: IDBObjectStore, transaction: Transaction) { - super(new QueryTargetWrapper(idbStore)); - this._transaction = transaction; - } - - get IDBKeyRange() { - // @ts-ignore - return this._transaction.IDBKeyRange; + super(new QueryTargetWrapper(idbStore), transaction); } get _idbStore(): QueryTargetWrapper { @@ -150,7 +142,7 @@ export class Store extends QueryTarget { } index(indexName: string): QueryTarget { - return new QueryTarget(new QueryTargetWrapper(this._idbStore.index(indexName))); + return new QueryTarget(new QueryTargetWrapper(this._idbStore.index(indexName)), this._transaction); } put(value: T, log?: LogItem): void { diff --git a/src/matrix/storage/idb/Transaction.ts b/src/matrix/storage/idb/Transaction.ts index cd68457c..9de4caf2 100644 --- a/src/matrix/storage/idb/Transaction.ts +++ b/src/matrix/storage/idb/Transaction.ts @@ -65,6 +65,10 @@ export class Transaction { this._writeErrors = []; } + get idbFactory(): IDBFactory { + return this._storage.idbFactory; + } + get IDBKeyRange(): typeof IDBKeyRange { return this._storage.IDBKeyRange; } diff --git a/src/mocks/TimelineMock.ts b/src/mocks/TimelineMock.ts new file mode 100644 index 00000000..d38e37cc --- /dev/null +++ b/src/mocks/TimelineMock.ts @@ -0,0 +1,263 @@ +import {createEvent, withTextBody, withSender} from "./event.js"; +import {TimelineEvent} from "../matrix/storage/types"; + +export const TIMELINE_START_TOKEN = "timeline_start"; + +export function eventId(i: number): string { + return `$event${i}`; +} + +/** `from` is included, `to` is excluded */ +export function eventIds(from: number, to: number): string[] { + return [...Array(to-from).keys()].map(i => eventId(i + from)); +} + +export class TimelineMock { + private _counter: number; + private _dagOrder: TimelineEvent[]; + private _syncOrder: TimelineEvent[]; + private _defaultSender: string; + + constructor(defaultSender: string) { + this._counter = 0; + this._dagOrder = []; + this._syncOrder = []; + this._defaultSender = defaultSender; + } + + _defaultEvent(id: string): TimelineEvent { + return withTextBody(`This is event ${id}`, withSender(this._defaultSender, createEvent("m.room.message", id))); + } + + _createEvent(func?: (eventId: string) => TimelineEvent): TimelineEvent { + const id = eventId(this._counter++); + return func ? func(id) : this._defaultEvent(id); + } + + _createEvents(n: number, func?: (eventId: string) => TimelineEvent) { + const events: TimelineEvent[] = []; + for (let i = 0; i < n; i++) { + events.push(this._createEvent(func)); + } + return events; + } + + insertAfter(token: string, n: number, func?: (eventId: string) => TimelineEvent) { + const events = this._createEvents(n, func); + const index = this._findIndex(token, "f", this._dagOrder); + this._dagOrder.splice(index, 0, ...events); + this._syncOrder.push(...events); + return events[events.length - 1]?.event_id; + } + + append(n: number, func?: (eventId: string) => TimelineEvent) { + const events = this._createEvents(n, func); + this._dagOrder.push(...events); + this._syncOrder.push(...events); + return events[events.length - 1]?.event_id; + } + + _getStep(direction: "f" | "b") : 1 | -1 { + return direction === "f" ? 1 : -1; + } + + _findIndex(token: string, direction: "f" | "b", eventOrdering: TimelineEvent[]): number { + const step = this._getStep(direction); + if (token === TIMELINE_START_TOKEN) { + const firstSyncEvent = this._syncOrder[0]; + if (!firstSyncEvent) { + // We have no events at all. Wherever you start looking, + // you'll stop looking right away. Zero works as well as anything else. + return 0; + } + const orderIndex = eventOrdering.findIndex(e => e.event_id === firstSyncEvent.event_id); + return orderIndex; + } + // All other tokens are (non-inclusive) event indices + const index = eventOrdering.findIndex(e => e.event_id === token); + if (index === -1) { + // We didn't find this event token at all. What are we + // even looking at? + throw new Error("Invalid token passed to TimelineMock"); + } + return index + step; + } + + messages(begin: string, end: string | undefined, direction: "f" | "b", limit: number = 10) { + const step = this._getStep(direction); + let index = this._findIndex(begin, direction, this._dagOrder); + const chunk: TimelineEvent[] = []; + for (; limit > 0 && index >= 0 && index < this._dagOrder.length; index += step, limit--) { + if (this._dagOrder[index].event_id === end) { + break; + } + chunk.push(this._dagOrder[index]); + } + return { + start: begin, + end: chunk[chunk.length - 1]?.event_id || begin, + chunk, + state: [] + }; + } + + context(eventId: string, limit: number = 10) { + if (limit <= 0) { + throw new Error("Cannot fetch zero or less events!"); + } + let eventIndex = this._dagOrder.findIndex(e => e.event_id === eventId); + if (eventIndex === -1) { + throw new Error("Fetching context for unknown event"); + } + const event = this._dagOrder[eventIndex]; + let offset = 1; + const eventsBefore: TimelineEvent[] = []; + const eventsAfter: TimelineEvent[] = []; + while (limit !== 0 && (eventIndex - offset >= 0 || eventIndex + offset < this._dagOrder.length)) { + if (eventIndex - offset >= 0) { + eventsBefore.push(this._dagOrder[eventIndex - offset]); + limit--; + } + if (limit !== 0 && eventIndex + offset < this._dagOrder.length) { + eventsAfter.push(this._dagOrder[eventIndex + offset]); + limit--; + } + offset++; + } + return { + start: eventsBefore[eventsBefore.length - 1]?.event_id || eventId, + end: eventsAfter[eventsAfter.length - 1]?.event_id || eventId, + event, + events_before: eventsBefore, + events_after: eventsAfter, + state: [] + }; + } + + sync(since?: string, limit: number = 10) { + const startAt = since ? this._findIndex(since, "f", this._syncOrder) : 0; + const index = Math.max(this._syncOrder.length - limit, startAt); + const limited = this._syncOrder.length - startAt > limit; + const events: TimelineEvent[] = []; + for(let i = index; i < this._syncOrder.length; i++) { + events.push(this._syncOrder[i]); + } + return { + next_batch: events[events.length - 1]?.event_id || since || TIMELINE_START_TOKEN, + timeline: { + prev_batch: events[0]?.event_id || since || TIMELINE_START_TOKEN, + events, + limited + } + } + } +} + +export function tests() { + const SENDER = "@alice:hs.tdl"; + + return { + "Append events are returned via sync": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(10); + const syncResponse = timeline.sync(); + const events = syncResponse.timeline.events.map(e => e.event_id); + assert.deepEqual(events, eventIds(0, 10)); + assert.equal(syncResponse.timeline.limited, false); + }, + "Limiting a sync properly limits the returned events": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(20); + const syncResponse = timeline.sync(undefined, 10); + const events = syncResponse.timeline.events.map(e => e.event_id); + assert.deepEqual(events, eventIds(10, 20)); + assert.equal(syncResponse.timeline.limited, true); + }, + "The context endpoint returns messages in DAG order around an event": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(30); + const context = timeline.context(eventId(15)); + assert.equal(context.event.event_id, eventId(15)); + assert.deepEqual(context.events_before.map(e => e.event_id).reverse(), eventIds(10, 15)); + assert.deepEqual(context.events_after.map(e => e.event_id), eventIds(16, 21)); + }, + "The context endpoint returns the proper number of messages": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(30); + for (const i of new Array(29).keys()) { + const middleFetch = timeline.context(eventId(15), i + 1); + assert.equal(middleFetch.events_before.length + middleFetch.events_after.length, i + 1); + const startFetch = timeline.context(eventId(1), i + 1); + assert.equal(startFetch.events_before.length + startFetch.events_after.length, i + 1); + const endFetch = timeline.context(eventId(28), i + 1); + assert.equal(endFetch.events_before.length + endFetch.events_after.length, i + 1); + } + }, + "The previous batch from a sync returns the previous events": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(20); + const sync = timeline.sync(undefined, 10); + const messages = timeline.messages(sync.timeline.prev_batch, undefined, "b"); + const events = messages.chunk.map(e => e.event_id).reverse(); + assert.deepEqual(events, eventIds(0, 10)); + }, + "Two consecutive message fetches are continuous if no new events are inserted": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(30); + + const sync = timeline.sync(undefined, 10); + const messages1 = timeline.messages(sync.timeline.prev_batch, undefined, "b"); + const events1 = messages1.chunk.map(e => e.event_id).reverse(); + assert.deepEqual(events1, eventIds(10, 20)); + + const messages2 = timeline.messages(messages1.end, undefined, "b"); + const events2 = messages2.chunk.map(e => e.event_id).reverse(); + assert.deepEqual(events2, eventIds(0, 10)); + }, + "Two consecutive message fetches detect newly inserted event": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(30); + + const messages1 = timeline.messages(eventId(20), undefined, "b", 10); + const events1 = messages1.chunk.map(e => e.event_id).reverse(); + assert.deepEqual(events1, eventIds(10, 20)); + timeline.insertAfter(eventId(9), 1); + + const messages2 = timeline.messages(eventId(10), undefined, "b", 10); + const events2 = messages2.chunk.map(e => e.event_id).reverse(); + const expectedEvents2 = eventIds(1, 10); + expectedEvents2.push(eventId(30)); + assert.deepEqual(events2, expectedEvents2); + }, + "A sync that receives no events has the same next batch as it started with": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(10); + const sync1 = timeline.sync(); + const sync2 = timeline.sync(sync1.next_batch); + assert.equal(sync1.next_batch, sync2.next_batch); + }, + "An event inserted at the staart still shows up in a sync": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(30); + const sync1 = timeline.sync(undefined, 10); + const sync2 = timeline.sync(sync1.next_batch, 10) + assert.deepEqual(sync2.timeline.events, []); + assert.equal(sync2.timeline.limited, false); + + timeline.insertAfter(TIMELINE_START_TOKEN, 1); + const sync3 = timeline.sync(sync2.next_batch, 10) + const events = sync3.timeline.events.map(e => e.event_id); + assert.deepEqual(events, [eventId(30)]); + }, + "An event inserted at the start does not show up in a non-overlapping message fetch": assert => { + const timeline = new TimelineMock(SENDER); + timeline.append(30); + const sync1 = timeline.sync(undefined, 10); + const messages1 = timeline.messages(sync1.timeline.prev_batch, undefined, "f", 10); + + timeline.insertAfter(TIMELINE_START_TOKEN, 1); + const messages2 = timeline.messages(sync1.timeline.prev_batch, undefined, "f", 10); + assert.deepEqual(messages1.chunk, messages2.chunk); + }, + } +}