diff --git a/src/matrix/Sync3.ts b/src/matrix/Sync3.ts index 7062f72b..85c74db6 100644 --- a/src/matrix/Sync3.ts +++ b/src/matrix/Sync3.ts @@ -80,11 +80,22 @@ type RoomSubscriptionsRequest = { [roomId: string]: RoomSubscriptionRequest }; +type ExtensionsRequest = { + to_device?: ToDeviceExtensionRequest; +}; + +type ToDeviceExtensionRequest = { + enabled: boolean; + since?: string; + limit?: number; +}; + interface Sync3RequestBody { session_id: string; lists: Sync3List[]; room_subscriptions?: RoomSubscriptionsRequest; unsubscribe_rooms?: string[]; + extensions?: ExtensionsRequest; }; // Response types are below: See api.md for what these fields mean. @@ -95,6 +106,7 @@ interface Sync3Response { ops: Op[]; counts: number[]; pos: number; + extensions: ExtensionsResponse; }; interface Op { @@ -119,6 +131,15 @@ interface RoomResponse { highlight_count: number; }; +type ExtensionsResponse = { + to_device?: ToDeviceExtensionResponse; +}; + +type ToDeviceExtensionResponse = { + next_batch: string; + events: any[]; +}; + // Some internal data structure types type IndexToRoomId = { @@ -322,6 +343,7 @@ export class Sync3 { // track the number of times we've failed to work out how long to wait between requests let backoffCounter = 0; + let toDeviceSince; // TODO: load/save this? while (this.status.get() !== SyncStatus.Stopped) { let isFirstSync = this.status.get() === SyncStatus.InitialSync; @@ -338,6 +360,13 @@ export class Sync3 { let requestBody: Sync3RequestBody = { session_id: sessionId, lists: [list], + extensions: { + to_device: { + enabled: true, + limit: 100, + since: toDeviceSince, + }, + }, }; if (this.nextRoomSubscriptions.length > 0) { // we may have been interruped before to update the room subscriptions, if this array @@ -352,6 +381,9 @@ export class Sync3 { let resp: Sync3Response; this.currentRequest = await this.hsApi.sync3(requestBody, pos); resp = await this.currentRequest.response(); + if (resp.extensions?.to_device?.next_batch) { + toDeviceSince = resp.extensions?.to_device?.next_batch; + } backoffCounter = 0; // regardless of whether we process the sync response without error, update the room subs // to reflect the new reality. @@ -409,8 +441,23 @@ export class Sync3 { await this.logger.run("sync3", async log => { const syncTxn = await this.openSyncTxn(); try { - // session.prepareSync // E2EE decrypts room keys - // this.session.writeSync() // write account data, device lists, etc. + // E2EE decrypts room keys + const v2DeviceResponse = { + to_device: { + events: resp.extensions?.to_device?.events, + }, + }; + const lock = await this.session.obtainSyncLock(v2DeviceResponse); + const sessionPreparation = await this.session.prepareSync(v2DeviceResponse, lock, syncTxn, log); + // TODO: + // response.device_one_time_keys_count + // response.device_lists + // response.account_data + const changes = await this.session.writeSync( + v2DeviceResponse, null, sessionPreparation, syncTxn, log + ); + await this.session.afterSync(changes, log); + // TODO add any rooms with new keys but no sync response to the list of rooms to be synced await Promise.all(updates.map(async (roomResponse) => { await log.wrap("room", async () => { // get or create a room