diff --git a/src/matrix/room/room.js b/src/matrix/room/room.js index 7c17b3b1..42d3eef1 100644 --- a/src/matrix/room/room.js +++ b/src/matrix/room/room.js @@ -3,9 +3,10 @@ import RoomSummary from "./summary.js"; import SyncWriter from "./timeline/persistence/SyncWriter.js"; import Timeline from "./timeline/Timeline.js"; import FragmentIdComparer from "./timeline/FragmentIdComparer.js"; +import SendQueue from "./sending/SendQueue.js"; export default class Room extends EventEmitter { - constructor({roomId, storage, hsApi, emitCollectionChange}) { + constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents}) { super(); this._roomId = roomId; this._storage = storage; @@ -14,16 +15,21 @@ export default class Room extends EventEmitter { this._fragmentIdComparer = new FragmentIdComparer([]); this._syncWriter = new SyncWriter({roomId, storage, fragmentIdComparer: this._fragmentIdComparer}); this._emitCollectionChange = emitCollectionChange; + this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents}); this._timeline = null; } async persistSync(roomResponse, membership, txn) { const summaryChanged = this._summary.applySync(roomResponse, membership, txn); const newTimelineEntries = await this._syncWriter.writeSync(roomResponse, txn); - return {summaryChanged, newTimelineEntries}; + let removedPendingEvents; + if (roomResponse.timeline && roomResponse.timeline.events) { + removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn); + } + return {summaryChanged, newTimelineEntries, removedPendingEvents}; } - emitSync({summaryChanged, newTimelineEntries}) { + emitSync({summaryChanged, newTimelineEntries, removedPendingEvents}) { if (summaryChanged) { this.emit("change"); this._emitCollectionChange(this); @@ -31,6 +37,9 @@ export default class Room extends EventEmitter { if (this._timeline) { this._timeline.appendLiveEntries(newTimelineEntries); } + if (removedPendingEvents) { + this._sendQueue.emitRemovals(removedPendingEvents); + } } load(summary, txn) { @@ -38,6 +47,10 @@ export default class Room extends EventEmitter { return this._syncWriter.load(txn); } + sendEvent(eventType, content) { + this._sendQueue.enqueueEvent(eventType, content); + } + get name() { return this._summary.name; } @@ -55,6 +68,7 @@ export default class Room extends EventEmitter { storage: this._storage, hsApi: this._hsApi, fragmentIdComparer: this._fragmentIdComparer, + pendingEvents: this._sendQueue.pendingEvents, closeCallback: () => this._timeline = null, }); await this._timeline.load(); diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 300e271e..d1666c02 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -1,6 +1,5 @@ import SortedArray from "../../../observable/list/SortedArray.js"; import {NetworkError} from "../../error.js"; -import {StorageError} from "../../storage/common.js"; import PendingEvent from "./PendingEvent.js"; function makeTxnId() { @@ -10,10 +9,11 @@ function makeTxnId() { } export default class SendQueue { - constructor({roomId, storage, scheduler, pendingEvents}) { + constructor({roomId, storage, sendScheduler, pendingEvents}) { + pendingEvents = pendingEvents || []; this._roomId = roomId; this._storage = storage; - this._scheduler = scheduler; + this._sendScheduler = sendScheduler; this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex); this._pendingEvents.setManySorted(pendingEvents.map(data => new PendingEvent(data))); this._isSending = false; @@ -30,7 +30,7 @@ export default class SendQueue { if (pendingEvent.remoteId) { continue; } - const response = await this._scheduler.request(hsApi => { + const response = await this._sendScheduler.request(hsApi => { return hsApi.send( pendingEvent.roomId, pendingEvent.eventType, @@ -50,14 +50,29 @@ export default class SendQueue { } } + removeRemoteEchos(events, txn) { + const removed = []; + for (const event of events) { + const txnId = event.unsigned && event.unsigned.transaction_id; + if (txnId) { + const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId); + if (idx !== -1) { + const pendingEvent = this._pendingEvents.get(idx); + txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); + removed.push(pendingEvent); + } + } + } + return removed; + } - async receiveRemoteEcho(txnId) { - const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId); - if (idx !== 0) { - const pendingEvent = this._pendingEvents.get(idx); - this._amountSent -= 1; - this._pendingEvents.remove(idx); - await this._removeEvent(pendingEvent); + emitRemovals(pendingEvents) { + for (const pendingEvent of pendingEvents) { + const idx = this._pendingEvents.array.indexOf(pendingEvent); + if (idx !== -1) { + this._amountSent -= 1; + this._pendingEvents.remove(idx); + } } } @@ -95,17 +110,6 @@ export default class SendQueue { await txn.complete(); } - async _removeEvent(pendingEvent) { - const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); - try { - txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); - } catch (err) { - txn.abort(); - throw err; - } - await txn.complete(); - } - async _createAndStoreEvent(eventType, content) { const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); let pendingEvent; diff --git a/src/matrix/session.js b/src/matrix/session.js index a6b7b354..15b45181 100644 --- a/src/matrix/session.js +++ b/src/matrix/session.js @@ -1,5 +1,6 @@ import Room from "./room/room.js"; import { ObservableMap } from "../observable/index.js"; +import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js"; export default class Session { // sessionInfo contains deviceId, userId and homeServer @@ -9,6 +10,7 @@ export default class Session { this._session = null; this._sessionInfo = sessionInfo; this._rooms = new ObservableMap(); + this._sendScheduler = new SendScheduler({hsApi, backoff: new RateLimitingBackoff()}); this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params); } @@ -19,6 +21,7 @@ export default class Session { this._storage.storeNames.roomState, this._storage.storeNames.timelineEvents, this._storage.storeNames.timelineFragments, + this._storage.storeNames.pendingEvents, ]); // restore session object this._session = await txn.session.get(); @@ -26,24 +29,40 @@ export default class Session { this._session = {}; return; } + const pendingEventsByRoomId = await this._getPendingEventsByRoom(txn); // load rooms const rooms = await txn.roomSummary.getAll(); await Promise.all(rooms.map(summary => { - const room = this.createRoom(summary.roomId); + const room = this.createRoom(summary.roomId, pendingEventsByRoomId[summary.roomId]); return room.load(summary, txn); })); } + async _getPendingEventsByRoom(txn) { + const pendingEvents = await txn.pendingEvents.getAll(); + return pendingEvents.reduce((groups, pe) => { + const group = groups.get(pe.roomId); + if (group) { + group.push(pe); + } else { + groups.set(pe.roomId, [pe]); + } + return groups; + }, new Map()); + } + get rooms() { return this._rooms; } - createRoom(roomId) { + createRoom(roomId, pendingEvents) { const room = new Room({ roomId, storage: this._storage, emitCollectionChange: this._roomUpdateCallback, hsApi: this._hsApi, + sendScheduler: this._sendScheduler, + pendingEvents, }); this._rooms.add(roomId, room); return room; diff --git a/src/matrix/storage/idb/stores/PendingEventStore.js b/src/matrix/storage/idb/stores/PendingEventStore.js index fa54848c..d73b649e 100644 --- a/src/matrix/storage/idb/stores/PendingEventStore.js +++ b/src/matrix/storage/idb/stores/PendingEventStore.js @@ -49,7 +49,7 @@ export default class PendingEventStore { return this._eventStore.put(pendingEvent); } - getAllEvents() { + getAll() { return this._eventStore.selectAll(); } } diff --git a/src/matrix/sync.js b/src/matrix/sync.js index fd0d2bf6..270d72e3 100644 --- a/src/matrix/sync.js +++ b/src/matrix/sync.js @@ -77,6 +77,7 @@ export default class Sync extends EventEmitter { storeNames.roomState, storeNames.timelineEvents, storeNames.timelineFragments, + storeNames.pendingEvents, ]); const roomChanges = []; try {