This commit is contained in:
Bruno Windels 2019-07-26 22:03:57 +02:00
parent f3d1128f28
commit ccb722d766
11 changed files with 324 additions and 194 deletions

View file

@ -1,5 +1,5 @@
// #ifdef PLATFORM_GNOME
// export {default} from "./ui/gnome/GnomePlatform.js";
// #else
//#ifdef PLATFORM_GNOME
//##export {default} from "./ui/gnome/GnomePlatform.js";
//#else
export {default} from "./ui/web/WebPlatform.js";
// #endif
//#endif

121
src/matrix/SendScheduler.js Normal file
View file

@ -0,0 +1,121 @@
import Platform from "../../../Platform.js";
import {HomeServerError, NetworkError} from "./error.js";
export class RateLimitingBackoff {
constructor() {
this._remainingRateLimitedRequest = 0;
}
async waitAfterLimitExceeded(retryAfterMs) {
// this._remainingRateLimitedRequest = 5;
// if (typeof retryAfterMs !== "number") {
// } else {
// }
if (!retryAfterMs) {
retryAfterMs = 5000;
}
await Platform.delay(retryAfterMs);
}
// do we have to know about succeeding requests?
// we can just
async waitForNextSend() {
// this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1);
Platform.delay(1000);
}
}
/*
this represents a slot to do one rate limited api call.
because rate-limiting is handled here, it should only
try to do one call, so the SendScheduler can safely
retry if the call ends up being rate limited.
This is also why we have this abstraction it hsApi is not
passed straight to SendQueue when it is its turn to send.
e.g. we wouldn't want to repeat the callback in SendQueue that could
have other side-effects before the call to hsApi that we wouldn't want
repeated (setting up progress handlers for file uploads,
... a UI update to say it started sending?
... updating storage would probably only happen once the call succeeded
... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried)
) maybe it is a bit overengineering, but lets stick with it for now.
At least the above is a clear definition why we have this class
*/
//class SendSlot -- obsolete
export class SendScheduler {
constructor({hsApi, backoff}) {
this._hsApi = hsApi;
this._sendRequests = [];
this._sendScheduled = false;
this._offline = false;
this._waitTime = 0;
this._backoff = backoff;
/*
we should have some sort of flag here that we enable
after all the rooms have been notified that they can resume
sending, so that from session, we can say scheduler.enable();
this way, when we have better scheduling, it won't be first come,
first serve, when there are a lot of events in different rooms to send,
but we can apply some priorization of who should go first
*/
// this._enabled;
}
// this should really be per roomId to avoid head-of-line blocking
//
// takes a callback instead of returning a promise with the slot
// to make sure the scheduler doesn't get blocked by a slot that is not consumed
request(sendCallback) {
let request;
const promise = new Promise((resolve, reject) => request = {resolve, reject, sendCallback});
this._sendRequests.push(request);
if (!this._sendScheduled && !this._offline) {
this._sendLoop();
}
return promise;
}
async _sendLoop() {
while (this._sendRequests.length) {
const request = this._sendRequests.unshift();
let result;
try {
// this can throw!
result = await this._doSend(request.sendCallback);
} catch (err) {
if (err instanceof NetworkError) {
// we're offline, everybody will have
// to re-request slots when we come back online
this._offline = true;
for (const r of this._sendRequests) {
r.reject(err);
}
this._sendRequests = [];
}
request.reject(err);
break;
}
request.resolve(result);
}
// do next here instead of in _doSend
}
async _doSend(sendCallback) {
this._sendScheduled = false;
await this._backoff.waitForNextSend();
// loop is left by return or throw
while (true) { // eslint-disable-line no-constant-condition
try {
return await sendCallback(this._hsApi);
} catch (err) {
if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") {
await this._backoff.waitAfterLimitExceeded(err.retry_after_ms);
} else {
throw err;
}
}
}
}
}

View file

@ -32,6 +32,7 @@ class RequestWrapper {
}
}
// todo: everywhere here, encode params in the url that could have slashes ... mainly event ids?
export default class HomeServerApi {
constructor(homeserver, accessToken) {
// store these both in a closure somehow so it's harder to get at in case of XSS?
@ -98,6 +99,10 @@ export default class HomeServerApi {
return this._request("POST", csPath, queryParams, body);
}
_put(csPath, queryParams, body) {
return this._request("PUT", csPath, queryParams, body);
}
_get(csPath, queryParams, body) {
return this._request("GET", csPath, queryParams, body);
}
@ -111,6 +116,10 @@ export default class HomeServerApi {
return this._get(`/rooms/${roomId}/messages`, params);
}
send(roomId, eventType, txnId, content) {
return this._put(`/rooms/${roomId}/send/${eventType}/${txnId}`, {}, content);
}
passwordLogin(username, password) {
return this._post("/login", undefined, {
"type": "m.login.password",

View file

@ -1,11 +1,14 @@
export default class PendingEvent {
constructor(roomId, queueIndex, eventType, content, txnId) {
this._roomId = roomId;
this._eventType = eventType;
this._content = content;
this._txnId = txnId;
this._queueIndex = queueIndex;
constructor(data) {
this._data = data;
}
get roomId() { return this._data.roomId; }
get queueIndex() { return this._data.queueIndex; }
get eventType() { return this._data.eventType; }
get txnId() { return this._data.txnId; }
get remoteId() { return this._data.remoteId; }
set remoteId(value) { this._data.remoteId = value; }
get content() { return this._data.content; }
get data() { return this._data; }
}

View file

@ -1,153 +1,7 @@
import Platform from "../../../Platform.js";
class RateLimitingBackoff {
constructor() {
this._remainingRateLimitedRequest = 0;
}
async waitAfterLimitExceeded(retryAfterMs) {
// this._remainingRateLimitedRequest = 5;
// if (typeof retryAfterMs !== "number") {
// } else {
// }
if (!retryAfterMs) {
retryAfterMs = 5000;
}
await Platform.delay(retryAfterMs);
}
// do we have to know about succeeding requests?
// we can just
async waitForNextSend() {
// this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1);
Platform.delay(1000);
}
}
class SendScheduler {
constructor({hsApi, backoff}) {
this._hsApi = hsApi;
this._slotRequests = [];
this._sendScheduled = false;
this._offline = false;
this._waitTime = 0;
this._backoff = backoff;
}
// this should really be per roomId to avoid head-of-line blocking
//
// takes a callback instead of returning a promise with the slot
// to make sure the scheduler doesn't get blocked by a slot that is not consumed
runSlot(slotCallback) {
let request;
const promise = new Promise((resolve, reject) => request = {resolve, reject, slotCallback});
this._slotRequests.push(request);
if (!this._sendScheduled && !this._offline) {
this._sendLoop();
}
return promise;
}
async _sendLoop() {
while (this._slotRequests.length) {
const request = this._slotRequests.unshift();
this._currentSlot = new SendSlot(this);
// this can throw!
let result;
try {
result = await request.slotCallback(this._currentSlot);
} catch (err) {
if (err instanceof NetworkError) {
// we're offline, everybody will have
// to re-request slots when we come back online
this._offline = true;
for (const r of this._slotRequests) {
r.reject(err);
}
this._slotRequests = [];
}
request.reject(err);
break;
}
request.resolve(result);
}
// do next here instead of in _doSend
}
async _doSend(slot, sendCallback) {
this._sendScheduled = false;
if (slot !== this._currentSlot) {
throw new Error("Slot is not active");
}
await this._backoff.waitForNextSend();
// loop is left by return or throw
while (true) { // eslint-disable-line no-constant-condition
try {
return await sendCallback(this._hsApi);
} catch (err) {
if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") {
await this._backoff.waitAfterLimitExceeded(err.retry_after_ms);
} else {
throw err;
}
}
}
}
}
/*
this represents a slot to do one rate limited api call.
because rate-limiting is handled here, it should only
try to do one call, so the SendScheduler can safely
retry if the call ends up being rate limited.
This is also why we have this abstraction it hsApi is not
passed straight to SendQueue when it is its turn to send.
e.g. we wouldn't want to repeat the callback in SendQueue that could
have other side-effects before the call to hsApi that we wouldn't want
repeated (setting up progress handlers for file uploads,
... a UI update to say it started sending?
... updating storage would probably only happen once the call succeeded
... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried)
) maybe it is a bit overengineering, but lets stick with it for now.
At least the above is a clear definition why we have this class
*/
class SendSlot {
constructor(scheduler) {
this._scheduler = scheduler;
}
sendContentEvent(pendingEvent) {
return this._scheduler._doSend(this, async hsApi => {
const request = hsApi.send(
pendingEvent.roomId,
pendingEvent.eventType,
pendingEvent.txnId,
pendingEvent.content
);
const response = await request.response();
return response.event_id;
});
}
sendRedaction(pendingEvent) {
return this._scheduler._doSend(this, async hsApi => {
const request = hsApi.redact(
pendingEvent.roomId,
pendingEvent.redacts,
pendingEvent.txnId,
pendingEvent.reason
);
const response = await request.response();
return response.event_id;
});
}
// progressCallback should report the amount of bytes sent
uploadMedia(fileName, contentType, blob, progressCallback) {
}
}
import SortedArray from "../../../observable/list/SortedArray.js";
import {NetworkError} from "../../error.js";
import {StorageError} from "../../storage/common.js";
import PendingEvent from "./PendingEvent.js";
function makeTxnId() {
const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
@ -160,46 +14,118 @@ export default class SendQueue {
this._roomId = roomId;
this._storage = storage;
this._scheduler = scheduler;
this._pendingEvents = pendingEvents.map(d => PendingEvent.fromData(d));
this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
this._pendingEvents.setManySorted(pendingEvents.map(data => new PendingEvent(data)));
this._isSending = false;
this._offline = false;
this._amountSent = 0;
}
async _sendLoop() {
let pendingEvent = null;
// eslint-disable-next-line no-cond-assign
while (pendingEvent = await this._nextPendingEvent()) {
// const mxcUrl = await this._scheduler.runSlot(slot => {
// return slot.uploadMedia(fileName, contentType, blob, bytesSent => {
// pendingEvent.updateAttachmentUploadProgress(bytesSent);
// });
// });
// pendingEvent.setAttachmentUrl(mxcUrl);
//update storage for pendingEvent after updating url,
//remove blob only later to keep preview?
await this._scheduler.runSlot(slot => {
if (pendingEvent.isRedaction) {
return slot.sendRedaction(pendingEvent);
} else if (pendingEvent.isContentEvent) {
return slot.sendContentEvent(pendingEvent);
this._isSending = true;
try {
while (this._amountSent < this._pendingEvents.length) {
const pendingEvent = this._pendingEvents.get(this._amountSent);
this._amountSent += 1;
if (pendingEvent.remoteId) {
continue;
}
});
const response = await this._scheduler.request(hsApi => {
return hsApi.send(
pendingEvent.roomId,
pendingEvent.eventType,
pendingEvent.txnId,
pendingEvent.content
);
});
pendingEvent.remoteId = response.event_id;
await this._tryUpdateEvent(pendingEvent);
}
} catch(err) {
if (err instanceof NetworkError) {
this._offline = true;
}
} finally {
this._isSending = false;
}
}
async receiveRemoteEcho(txnId) {
const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId);
if (idx !== 0) {
const pendingEvent = this._pendingEvents.get(idx);
this._amountSent -= 1;
this._pendingEvents.remove(idx);
await this._removeEvent(pendingEvent);
}
}
resumeSending() {
this._offline = false;
if (!this._isSending) {
this._sendLoop();
}
}
async enqueueEvent(eventType, content) {
// temporary
const pendingEvent = await this._createAndStoreEvent(eventType, content);
this._pendingEvents.set(pendingEvent);
if (!this._isSending && !this._offline) {
this._sendLoop();
}
}
get pendingEvents() {
return this._pendingEvents;
}
async _tryUpdateEvent(pendingEvent) {
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
const pendingEventsStore = txn.pendingEvents;
const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0;
const queueIndex = maxQueueIndex + 1;
const pendingEvent = new PendingEvent(this._roomId, queueIndex, eventType, content, makeTxnId());
pendingEventsStore.add(pendingEvent.data);
try {
// pendingEvent might have been removed already here
// by a racing remote echo, so check first so we don't recreate it
if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) {
txn.pendingEvents.update(pendingEvent.data);
}
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
// create txnId
// create queueOrder
// store event
// if online and not running send loop
// start sending loop
}
async _removeEvent(pendingEvent) {
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
try {
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
}
async _createAndStoreEvent(eventType, content) {
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
let pendingEvent;
try {
const pendingEventsStore = txn.pendingEvents;
const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0;
const queueIndex = maxQueueIndex + 1;
pendingEvent = new PendingEvent({
roomId: this._roomId,
queueIndex,
eventType,
content,
txnId: makeTxnId()
});
pendingEventsStore.add(pendingEvent.data);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
return pendingEvent;
}
}

View file

@ -1,5 +1,6 @@
//entries can be sorted, first by fragment, then by entry index.
import EventKey from "../EventKey.js";
import { PENDING_FRAGMENT_ID } from "./PendingEventEntry.js";
export default class BaseEntry {
constructor(fragmentIdComparer) {
@ -17,6 +18,10 @@ export default class BaseEntry {
compare(otherEntry) {
if (this.fragmentId === otherEntry.fragmentId) {
return this.entryIndex - otherEntry.entryIndex;
} else if (this.fragmentId === PENDING_FRAGMENT_ID) {
return 1;
} else if (otherEntry.fragmentId === PENDING_FRAGMENT_ID) {
return -1;
} else {
// This might throw if the relation of two fragments is unknown.
return this._fragmentIdComparer.compare(this.fragmentId, otherEntry.fragmentId);

View file

@ -0,0 +1,34 @@
import BaseEntry from "./BaseEntry.js";
export const PENDING_FRAGMENT_ID = Number.MAX_SAFE_INTEGER;
export default class PendingEventEntry extends BaseEntry {
constructor(pendingEvent) {
super(null);
this._pendingEvent = pendingEvent;
}
get fragmentId() {
return PENDING_FRAGMENT_ID;
}
get entryIndex() {
return this._pendingEvent.queueIndex;
}
get content() {
return this._pendingEvent.content;
}
get event() {
return null;
}
get type() {
return this._pendingEvent.eventType;
}
get id() {
return this._pendingEvent.txnId;
}
}

View file

@ -23,5 +23,8 @@ export class StorageError extends Error {
fullMessage += cause.message;
}
super(fullMessage);
if (cause) {
this.errcode = cause.name;
}
}
}

View file

@ -47,6 +47,14 @@ class QueryTargetWrapper {
}
}
delete(...params) {
try {
return this._qt.delete(...params);
} catch(err) {
throw new StorageError("delete failed", err);
}
}
index(...params) {
try {
return this._qt.index(...params);
@ -76,4 +84,8 @@ export default class Store extends QueryTarget {
add(value) {
return reqAsPromise(this._idbStore.add(value));
}
delete(keyOrKeyRange) {
return reqAsPromise(this._idbStore.delete(keyOrKeyRange));
}
}

View file

@ -29,6 +29,17 @@ export default class PendingEventStore {
}
}
remove(roomId, queueIndex) {
const keyRange = IDBKeyRange.only(encodeKey(roomId, queueIndex));
this._eventStore.delete(keyRange);
}
async exists(roomId, queueIndex) {
const keyRange = IDBKeyRange.only(encodeKey(roomId, queueIndex));
const key = await this._eventStore.getKey(keyRange);
return !!key;
}
add(pendingEvent) {
pendingEvent.key = encodeKey(pendingEvent.roomId, pendingEvent.queueIndex);
return this._eventStore.add(pendingEvent);

View file

@ -32,8 +32,14 @@ export default class SortedArray extends BaseObservableList {
}
}
remove(item) {
throw new Error("unimplemented");
get(idx) {
return this._items[idx];
}
remove(idx) {
const item = this._items[idx];
this._items.splice(idx, 1);
this.emitRemove(idx, item);
}
get array() {