better handle remote echos and hookup in session and room

This commit is contained in:
Bruno Windels 2019-07-26 22:33:33 +02:00
parent ccb722d766
commit 707988f806
5 changed files with 66 additions and 28 deletions

View file

@ -3,9 +3,10 @@ import RoomSummary from "./summary.js";
import SyncWriter from "./timeline/persistence/SyncWriter.js";
import Timeline from "./timeline/Timeline.js";
import FragmentIdComparer from "./timeline/FragmentIdComparer.js";
import SendQueue from "./sending/SendQueue.js";
export default class Room extends EventEmitter {
constructor({roomId, storage, hsApi, emitCollectionChange}) {
constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents}) {
super();
this._roomId = roomId;
this._storage = storage;
@ -14,16 +15,21 @@ export default class Room extends EventEmitter {
this._fragmentIdComparer = new FragmentIdComparer([]);
this._syncWriter = new SyncWriter({roomId, storage, fragmentIdComparer: this._fragmentIdComparer});
this._emitCollectionChange = emitCollectionChange;
this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents});
this._timeline = null;
}
async persistSync(roomResponse, membership, txn) {
const summaryChanged = this._summary.applySync(roomResponse, membership, txn);
const newTimelineEntries = await this._syncWriter.writeSync(roomResponse, txn);
return {summaryChanged, newTimelineEntries};
let removedPendingEvents;
if (roomResponse.timeline && roomResponse.timeline.events) {
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
}
return {summaryChanged, newTimelineEntries, removedPendingEvents};
}
emitSync({summaryChanged, newTimelineEntries}) {
emitSync({summaryChanged, newTimelineEntries, removedPendingEvents}) {
if (summaryChanged) {
this.emit("change");
this._emitCollectionChange(this);
@ -31,6 +37,9 @@ export default class Room extends EventEmitter {
if (this._timeline) {
this._timeline.appendLiveEntries(newTimelineEntries);
}
if (removedPendingEvents) {
this._sendQueue.emitRemovals(removedPendingEvents);
}
}
load(summary, txn) {
@ -38,6 +47,10 @@ export default class Room extends EventEmitter {
return this._syncWriter.load(txn);
}
sendEvent(eventType, content) {
this._sendQueue.enqueueEvent(eventType, content);
}
get name() {
return this._summary.name;
}
@ -55,6 +68,7 @@ export default class Room extends EventEmitter {
storage: this._storage,
hsApi: this._hsApi,
fragmentIdComparer: this._fragmentIdComparer,
pendingEvents: this._sendQueue.pendingEvents,
closeCallback: () => this._timeline = null,
});
await this._timeline.load();

View file

@ -1,6 +1,5 @@
import SortedArray from "../../../observable/list/SortedArray.js";
import {NetworkError} from "../../error.js";
import {StorageError} from "../../storage/common.js";
import PendingEvent from "./PendingEvent.js";
function makeTxnId() {
@ -10,10 +9,11 @@ function makeTxnId() {
}
export default class SendQueue {
constructor({roomId, storage, scheduler, pendingEvents}) {
constructor({roomId, storage, sendScheduler, pendingEvents}) {
pendingEvents = pendingEvents || [];
this._roomId = roomId;
this._storage = storage;
this._scheduler = scheduler;
this._sendScheduler = sendScheduler;
this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
this._pendingEvents.setManySorted(pendingEvents.map(data => new PendingEvent(data)));
this._isSending = false;
@ -30,7 +30,7 @@ export default class SendQueue {
if (pendingEvent.remoteId) {
continue;
}
const response = await this._scheduler.request(hsApi => {
const response = await this._sendScheduler.request(hsApi => {
return hsApi.send(
pendingEvent.roomId,
pendingEvent.eventType,
@ -50,14 +50,29 @@ export default class SendQueue {
}
}
removeRemoteEchos(events, txn) {
const removed = [];
for (const event of events) {
const txnId = event.unsigned && event.unsigned.transaction_id;
if (txnId) {
const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId);
if (idx !== -1) {
const pendingEvent = this._pendingEvents.get(idx);
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
removed.push(pendingEvent);
}
}
}
return removed;
}
async receiveRemoteEcho(txnId) {
const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId);
if (idx !== 0) {
const pendingEvent = this._pendingEvents.get(idx);
this._amountSent -= 1;
this._pendingEvents.remove(idx);
await this._removeEvent(pendingEvent);
emitRemovals(pendingEvents) {
for (const pendingEvent of pendingEvents) {
const idx = this._pendingEvents.array.indexOf(pendingEvent);
if (idx !== -1) {
this._amountSent -= 1;
this._pendingEvents.remove(idx);
}
}
}
@ -95,17 +110,6 @@ export default class SendQueue {
await txn.complete();
}
async _removeEvent(pendingEvent) {
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
try {
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
}
async _createAndStoreEvent(eventType, content) {
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
let pendingEvent;

View file

@ -1,5 +1,6 @@
import Room from "./room/room.js";
import { ObservableMap } from "../observable/index.js";
import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js";
export default class Session {
// sessionInfo contains deviceId, userId and homeServer
@ -9,6 +10,7 @@ export default class Session {
this._session = null;
this._sessionInfo = sessionInfo;
this._rooms = new ObservableMap();
this._sendScheduler = new SendScheduler({hsApi, backoff: new RateLimitingBackoff()});
this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params);
}
@ -19,6 +21,7 @@ export default class Session {
this._storage.storeNames.roomState,
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
this._storage.storeNames.pendingEvents,
]);
// restore session object
this._session = await txn.session.get();
@ -26,24 +29,40 @@ export default class Session {
this._session = {};
return;
}
const pendingEventsByRoomId = await this._getPendingEventsByRoom(txn);
// load rooms
const rooms = await txn.roomSummary.getAll();
await Promise.all(rooms.map(summary => {
const room = this.createRoom(summary.roomId);
const room = this.createRoom(summary.roomId, pendingEventsByRoomId[summary.roomId]);
return room.load(summary, txn);
}));
}
async _getPendingEventsByRoom(txn) {
const pendingEvents = await txn.pendingEvents.getAll();
return pendingEvents.reduce((groups, pe) => {
const group = groups.get(pe.roomId);
if (group) {
group.push(pe);
} else {
groups.set(pe.roomId, [pe]);
}
return groups;
}, new Map());
}
get rooms() {
return this._rooms;
}
createRoom(roomId) {
createRoom(roomId, pendingEvents) {
const room = new Room({
roomId,
storage: this._storage,
emitCollectionChange: this._roomUpdateCallback,
hsApi: this._hsApi,
sendScheduler: this._sendScheduler,
pendingEvents,
});
this._rooms.add(roomId, room);
return room;

View file

@ -49,7 +49,7 @@ export default class PendingEventStore {
return this._eventStore.put(pendingEvent);
}
getAllEvents() {
getAll() {
return this._eventStore.selectAll();
}
}

View file

@ -77,6 +77,7 @@ export default class Sync extends EventEmitter {
storeNames.roomState,
storeNames.timelineEvents,
storeNames.timelineFragments,
storeNames.pendingEvents,
]);
const roomChanges = [];
try {