forked from mystiq/hydrogen-web
Read to-device messages and pass them through to H E2EE bits
This now allows you to read incoming encrypted events. This is VERY flakey due to not having device lists, OTK counts and a fudged session sync lifecycle callbacks but it does appear to work empirically!
This commit is contained in:
parent
6a0923a333
commit
b9af4a585c
1 changed files with 49 additions and 2 deletions
|
@ -80,11 +80,22 @@ type RoomSubscriptionsRequest = {
|
||||||
[roomId: string]: RoomSubscriptionRequest
|
[roomId: string]: RoomSubscriptionRequest
|
||||||
};
|
};
|
||||||
|
|
||||||
|
type ExtensionsRequest = {
|
||||||
|
to_device?: ToDeviceExtensionRequest;
|
||||||
|
};
|
||||||
|
|
||||||
|
type ToDeviceExtensionRequest = {
|
||||||
|
enabled: boolean;
|
||||||
|
since?: string;
|
||||||
|
limit?: number;
|
||||||
|
};
|
||||||
|
|
||||||
interface Sync3RequestBody {
|
interface Sync3RequestBody {
|
||||||
session_id: string;
|
session_id: string;
|
||||||
lists: Sync3List[];
|
lists: Sync3List[];
|
||||||
room_subscriptions?: RoomSubscriptionsRequest;
|
room_subscriptions?: RoomSubscriptionsRequest;
|
||||||
unsubscribe_rooms?: string[];
|
unsubscribe_rooms?: string[];
|
||||||
|
extensions?: ExtensionsRequest;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Response types are below: See api.md for what these fields mean.
|
// Response types are below: See api.md for what these fields mean.
|
||||||
|
@ -95,6 +106,7 @@ interface Sync3Response {
|
||||||
ops: Op[];
|
ops: Op[];
|
||||||
counts: number[];
|
counts: number[];
|
||||||
pos: number;
|
pos: number;
|
||||||
|
extensions: ExtensionsResponse;
|
||||||
};
|
};
|
||||||
|
|
||||||
interface Op {
|
interface Op {
|
||||||
|
@ -119,6 +131,15 @@ interface RoomResponse {
|
||||||
highlight_count: number;
|
highlight_count: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
type ExtensionsResponse = {
|
||||||
|
to_device?: ToDeviceExtensionResponse;
|
||||||
|
};
|
||||||
|
|
||||||
|
type ToDeviceExtensionResponse = {
|
||||||
|
next_batch: string;
|
||||||
|
events: any[];
|
||||||
|
};
|
||||||
|
|
||||||
// Some internal data structure types
|
// Some internal data structure types
|
||||||
|
|
||||||
type IndexToRoomId = {
|
type IndexToRoomId = {
|
||||||
|
@ -322,6 +343,7 @@ export class Sync3 {
|
||||||
|
|
||||||
// track the number of times we've failed to work out how long to wait between requests
|
// track the number of times we've failed to work out how long to wait between requests
|
||||||
let backoffCounter = 0;
|
let backoffCounter = 0;
|
||||||
|
let toDeviceSince; // TODO: load/save this?
|
||||||
|
|
||||||
while (this.status.get() !== SyncStatus.Stopped) {
|
while (this.status.get() !== SyncStatus.Stopped) {
|
||||||
let isFirstSync = this.status.get() === SyncStatus.InitialSync;
|
let isFirstSync = this.status.get() === SyncStatus.InitialSync;
|
||||||
|
@ -338,6 +360,13 @@ export class Sync3 {
|
||||||
let requestBody: Sync3RequestBody = {
|
let requestBody: Sync3RequestBody = {
|
||||||
session_id: sessionId,
|
session_id: sessionId,
|
||||||
lists: [list],
|
lists: [list],
|
||||||
|
extensions: {
|
||||||
|
to_device: {
|
||||||
|
enabled: true,
|
||||||
|
limit: 100,
|
||||||
|
since: toDeviceSince,
|
||||||
|
},
|
||||||
|
},
|
||||||
};
|
};
|
||||||
if (this.nextRoomSubscriptions.length > 0) {
|
if (this.nextRoomSubscriptions.length > 0) {
|
||||||
// we may have been interruped before to update the room subscriptions, if this array
|
// we may have been interruped before to update the room subscriptions, if this array
|
||||||
|
@ -352,6 +381,9 @@ export class Sync3 {
|
||||||
let resp: Sync3Response;
|
let resp: Sync3Response;
|
||||||
this.currentRequest = await this.hsApi.sync3(requestBody, pos);
|
this.currentRequest = await this.hsApi.sync3(requestBody, pos);
|
||||||
resp = await this.currentRequest.response();
|
resp = await this.currentRequest.response();
|
||||||
|
if (resp.extensions?.to_device?.next_batch) {
|
||||||
|
toDeviceSince = resp.extensions?.to_device?.next_batch;
|
||||||
|
}
|
||||||
backoffCounter = 0;
|
backoffCounter = 0;
|
||||||
// regardless of whether we process the sync response without error, update the room subs
|
// regardless of whether we process the sync response without error, update the room subs
|
||||||
// to reflect the new reality.
|
// to reflect the new reality.
|
||||||
|
@ -409,8 +441,23 @@ export class Sync3 {
|
||||||
await this.logger.run("sync3", async log => {
|
await this.logger.run("sync3", async log => {
|
||||||
const syncTxn = await this.openSyncTxn();
|
const syncTxn = await this.openSyncTxn();
|
||||||
try {
|
try {
|
||||||
// session.prepareSync // E2EE decrypts room keys
|
// E2EE decrypts room keys
|
||||||
// this.session.writeSync() // write account data, device lists, etc.
|
const v2DeviceResponse = {
|
||||||
|
to_device: {
|
||||||
|
events: resp.extensions?.to_device?.events,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const lock = await this.session.obtainSyncLock(v2DeviceResponse);
|
||||||
|
const sessionPreparation = await this.session.prepareSync(v2DeviceResponse, lock, syncTxn, log);
|
||||||
|
// TODO:
|
||||||
|
// response.device_one_time_keys_count
|
||||||
|
// response.device_lists
|
||||||
|
// response.account_data
|
||||||
|
const changes = await this.session.writeSync(
|
||||||
|
v2DeviceResponse, null, sessionPreparation, syncTxn, log
|
||||||
|
);
|
||||||
|
await this.session.afterSync(changes, log);
|
||||||
|
// TODO add any rooms with new keys but no sync response to the list of rooms to be synced
|
||||||
await Promise.all(updates.map(async (roomResponse) => {
|
await Promise.all(updates.map(async (roomResponse) => {
|
||||||
await log.wrap("room", async () => {
|
await log.wrap("room", async () => {
|
||||||
// get or create a room
|
// get or create a room
|
||||||
|
|
Loading…
Reference in a new issue