forked from mystiq/hydrogen-web
Merge pull request #5 from bwindels/bwindels/sending
Send text messages
This commit is contained in:
commit
378eea8ceb
49 changed files with 1011 additions and 81 deletions
|
@ -3,3 +3,6 @@ goal:
|
|||
write client that works on lumia 950 phone, so I can use matrix on my phone.
|
||||
|
||||
try approach offline to indexeddb. go low-memory, and test the performance of storing every event individually in indexeddb.
|
||||
|
||||
try to use little bandwidth, mainly by being an offline application and storing all requested data in indexeddb.
|
||||
be as functional as possible while offline
|
||||
|
|
|
@ -1,3 +1,11 @@
|
|||
# Remaining stuffs
|
||||
- don't swallow send errors, they should probably appear in the room error?
|
||||
- not sure it makes sense to show them where the composer is,
|
||||
because they might get sent a long time after you enter them in brawl,
|
||||
so you don't neccessarily have the context of the composer anymore
|
||||
- local echo
|
||||
|
||||
|
||||
takes care of rate limiting,
|
||||
and sending events from different rooms in parallel,
|
||||
NO: txnIds are created inside room. ~~making txnIds? ... it's rooms though that will receive the event in their sync response~~
|
||||
|
@ -76,8 +84,36 @@ steps of sending
|
|||
// sender is the thing that is shared across rooms to handle rate limiting.
|
||||
const sendQueue = new SendQueue({roomId, hsApi, sender, storage});
|
||||
await sendQueue.load(); //loads the queue?
|
||||
//might need to load members for e2e rooms
|
||||
//might need to load members for e2e rooms ...
|
||||
//events should be encrypted before storing them though ...
|
||||
|
||||
|
||||
// terminology ...?
|
||||
// task: to let us wait for it to be our turn
|
||||
// given rate limiting
|
||||
class Sender {
|
||||
acquireSlot() {
|
||||
return new SendSlot();
|
||||
}
|
||||
}
|
||||
// terminology ...?
|
||||
// task: after waiting for it to be our turn given rate-limiting,
|
||||
// send the actual thing we want to send.
|
||||
// this should be used for all rate-limited apis... ?
|
||||
class SendSlot {
|
||||
sendContent(content) {
|
||||
|
||||
}
|
||||
|
||||
sendRedaction() {
|
||||
|
||||
}
|
||||
|
||||
uploadMedia() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
class SendQueue {
|
||||
// when trying to send
|
||||
enqueueEvent(pendingEvent) {
|
||||
|
@ -93,11 +129,13 @@ steps of sending
|
|||
while (let pendingEvent = await findNextPendingEvent()) {
|
||||
pendingEvent.status = QUEUED;
|
||||
try {
|
||||
await this.sender.sendEvent(() => {
|
||||
// callback gets called
|
||||
pendingEvent.status = SENDING;
|
||||
return pendingEvent;
|
||||
});
|
||||
const mediaSlot = await this.sender.acquireSlot();
|
||||
const mxcUrl = await mediaSlot.uploadMedia(pendingEvent.blob);
|
||||
pendingEvent.content.url = mxcUrl;
|
||||
const contentSlot = await this.sender.acquireSlot();
|
||||
contentSlot.sendContent(pendingEvent.content);
|
||||
pendingEvent.status = SENDING;
|
||||
await slot.sendContent(...);
|
||||
} catch (err) {
|
||||
//offline
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
// #ifdef PLATFORM_GNOME
|
||||
// export {default} from "./ui/gnome/GnomePlatform.js";
|
||||
// #else
|
||||
//#ifdef PLATFORM_GNOME
|
||||
//##export {default} from "./ui/gnome/GnomePlatform.js";
|
||||
//#else
|
||||
export {default} from "./ui/web/WebPlatform.js";
|
||||
// #endif
|
||||
//#endif
|
||||
|
|
|
@ -45,7 +45,7 @@ export default class SessionViewModel extends EventEmitter {
|
|||
}
|
||||
this._currentRoomViewModel = new RoomViewModel({
|
||||
room,
|
||||
ownUserId: this._session.userId,
|
||||
ownUserId: this._session.user.id,
|
||||
closeCallback: () => this._closeCurrentRoom(),
|
||||
});
|
||||
this._currentRoomViewModel.load();
|
||||
|
|
|
@ -63,4 +63,10 @@ export default class RoomViewModel extends EventEmitter {
|
|||
get avatarInitials() {
|
||||
return avatarInitials(this._room.name);
|
||||
}
|
||||
|
||||
sendMessage(message) {
|
||||
if (message) {
|
||||
this._room.sendEvent("m.room.message", {msgtype: "m.text", body: message});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,8 +4,8 @@ export default class MessageTile extends SimpleTile {
|
|||
|
||||
constructor(options) {
|
||||
super(options);
|
||||
this._isOwn = this._entry.event.sender === options.ownUserId;
|
||||
this._date = new Date(this._entry.event.origin_server_ts);
|
||||
this._isOwn = this._entry.sender === options.ownUserId;
|
||||
this._date = new Date(this._entry.timestamp);
|
||||
this._isContinuation = false;
|
||||
}
|
||||
|
||||
|
@ -14,7 +14,7 @@ export default class MessageTile extends SimpleTile {
|
|||
}
|
||||
|
||||
get sender() {
|
||||
return this._entry.event.sender;
|
||||
return this._entry.sender;
|
||||
}
|
||||
|
||||
get date() {
|
||||
|
@ -34,8 +34,7 @@ export default class MessageTile extends SimpleTile {
|
|||
}
|
||||
|
||||
_getContent() {
|
||||
const event = this._entry.event;
|
||||
return event && event.content;
|
||||
return this._entry.content;
|
||||
}
|
||||
|
||||
updatePreviousSibling(prev) {
|
||||
|
|
|
@ -7,21 +7,20 @@ export default class RoomNameTile extends SimpleTile {
|
|||
}
|
||||
|
||||
get announcement() {
|
||||
const event = this._entry.event;
|
||||
const content = event.content;
|
||||
const {sender, content, stateKey} = this._entry;
|
||||
switch (content.membership) {
|
||||
case "invite": return `${event.state_key} was invited to the room by ${event.sender}`;
|
||||
case "join": return `${event.state_key} joined the room`;
|
||||
case "invite": return `${stateKey} was invited to the room by ${sender}`;
|
||||
case "join": return `${stateKey} joined the room`;
|
||||
case "leave": {
|
||||
if (event.state_key === event.sender) {
|
||||
return `${event.state_key} left the room`;
|
||||
if (stateKey === sender) {
|
||||
return `${stateKey} left the room`;
|
||||
} else {
|
||||
const reason = content.reason;
|
||||
return `${event.state_key} was kicked from the room by ${event.sender}${reason ? `: ${reason}` : ""}`;
|
||||
return `${stateKey} was kicked from the room by ${sender}${reason ? `: ${reason}` : ""}`;
|
||||
}
|
||||
}
|
||||
case "ban": return `${event.state_key} was banned from the room by ${event.sender}`;
|
||||
default: return `${event.sender} membership changed to ${content.membership}`;
|
||||
case "ban": return `${stateKey} was banned from the room by ${sender}`;
|
||||
default: return `${sender} membership changed to ${content.membership}`;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,8 +7,7 @@ export default class RoomNameTile extends SimpleTile {
|
|||
}
|
||||
|
||||
get announcement() {
|
||||
const event = this._entry.event;
|
||||
const content = event.content;
|
||||
return `${event.sender} named the room "${content.name}"`
|
||||
const content = this._entry.content;
|
||||
return `${this._entry.sender} named the room "${content.name}"`
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,4 +69,8 @@ export default class SimpleTile {
|
|||
get internalId() {
|
||||
return this._entry.asEventKey().toString();
|
||||
}
|
||||
|
||||
get isPending() {
|
||||
return this._entry.isPending;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,9 +4,8 @@ export default class TextTile extends MessageTile {
|
|||
get text() {
|
||||
const content = this._getContent();
|
||||
const body = content && content.body;
|
||||
const sender = this._entry.event.sender;
|
||||
if (this._entry.type === "m.emote") {
|
||||
return `* ${sender} ${body}`;
|
||||
if (content.msgtype === "m.emote") {
|
||||
return `* ${this._entry.sender} ${body}`;
|
||||
} else {
|
||||
return body;
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import GapTile from "./tiles/GapTile.js";
|
||||
import TextTile from "./tiles/TextTile.js";
|
||||
import ImageTile from "./tiles/ImageTile.js";
|
||||
import LocationTile from "./tiles/LocationTile.js";
|
||||
import RoomNameTile from "./tiles/RoomNameTile.js";
|
||||
import RoomMemberTile from "./tiles/RoomMemberTile.js";
|
||||
|
@ -10,11 +9,10 @@ export default function ({timeline, ownUserId}) {
|
|||
const options = {entry, emitUpdate, ownUserId};
|
||||
if (entry.isGap) {
|
||||
return new GapTile(options, timeline);
|
||||
} else if (entry.event) {
|
||||
const event = entry.event;
|
||||
switch (event.type) {
|
||||
} else if (entry.eventType) {
|
||||
switch (entry.eventType) {
|
||||
case "m.room.message": {
|
||||
const content = event.content;
|
||||
const content = entry.content;
|
||||
const msgtype = content && content.msgtype;
|
||||
switch (msgtype) {
|
||||
case "m.text":
|
||||
|
|
|
@ -5,7 +5,7 @@ import Sync from "./matrix/sync.js";
|
|||
import SessionView from "./ui/web/session/SessionView.js";
|
||||
import SessionViewModel from "./domain/session/SessionViewModel.js";
|
||||
|
||||
const HOST = "192.168.2.108";
|
||||
const HOST = "127.0.0.1";
|
||||
const HOMESERVER = `http://${HOST}:8008`;
|
||||
const USERNAME = "bruno1";
|
||||
const USER_ID = `@${USERNAME}:localhost`;
|
||||
|
@ -76,6 +76,8 @@ export default async function main(container) {
|
|||
if (needsInitialSync) {
|
||||
showSession(container, session, sync);
|
||||
}
|
||||
// this will start sending unsent messages
|
||||
session.notifyNetworkAvailable();
|
||||
} catch(err) {
|
||||
console.error(`${err.message}:\n${err.stack}`);
|
||||
}
|
||||
|
|
121
src/matrix/SendScheduler.js
Normal file
121
src/matrix/SendScheduler.js
Normal file
|
@ -0,0 +1,121 @@
|
|||
import Platform from "../Platform.js";
|
||||
import {HomeServerError, NetworkError} from "./error.js";
|
||||
|
||||
export class RateLimitingBackoff {
|
||||
constructor() {
|
||||
this._remainingRateLimitedRequest = 0;
|
||||
}
|
||||
|
||||
async waitAfterLimitExceeded(retryAfterMs) {
|
||||
// this._remainingRateLimitedRequest = 5;
|
||||
// if (typeof retryAfterMs !== "number") {
|
||||
// } else {
|
||||
// }
|
||||
if (!retryAfterMs) {
|
||||
retryAfterMs = 5000;
|
||||
}
|
||||
await Platform.delay(retryAfterMs);
|
||||
}
|
||||
|
||||
// do we have to know about succeeding requests?
|
||||
// we can just
|
||||
|
||||
async waitForNextSend() {
|
||||
// this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
this represents a slot to do one rate limited api call.
|
||||
because rate-limiting is handled here, it should only
|
||||
try to do one call, so the SendScheduler can safely
|
||||
retry if the call ends up being rate limited.
|
||||
This is also why we have this abstraction it hsApi is not
|
||||
passed straight to SendQueue when it is its turn to send.
|
||||
e.g. we wouldn't want to repeat the callback in SendQueue that could
|
||||
have other side-effects before the call to hsApi that we wouldn't want
|
||||
repeated (setting up progress handlers for file uploads,
|
||||
... a UI update to say it started sending?
|
||||
... updating storage would probably only happen once the call succeeded
|
||||
... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried)
|
||||
) maybe it is a bit overengineering, but lets stick with it for now.
|
||||
At least the above is a clear definition why we have this class
|
||||
*/
|
||||
//class SendSlot -- obsolete
|
||||
|
||||
export class SendScheduler {
|
||||
constructor({hsApi, backoff}) {
|
||||
this._hsApi = hsApi;
|
||||
this._sendRequests = [];
|
||||
this._sendScheduled = false;
|
||||
this._offline = false;
|
||||
this._waitTime = 0;
|
||||
this._backoff = backoff;
|
||||
/*
|
||||
we should have some sort of flag here that we enable
|
||||
after all the rooms have been notified that they can resume
|
||||
sending, so that from session, we can say scheduler.enable();
|
||||
this way, when we have better scheduling, it won't be first come,
|
||||
first serve, when there are a lot of events in different rooms to send,
|
||||
but we can apply some priorization of who should go first
|
||||
*/
|
||||
// this._enabled;
|
||||
}
|
||||
|
||||
// this should really be per roomId to avoid head-of-line blocking
|
||||
//
|
||||
// takes a callback instead of returning a promise with the slot
|
||||
// to make sure the scheduler doesn't get blocked by a slot that is not consumed
|
||||
request(sendCallback) {
|
||||
let request;
|
||||
const promise = new Promise((resolve, reject) => request = {resolve, reject, sendCallback});
|
||||
this._sendRequests.push(request);
|
||||
if (!this._sendScheduled && !this._offline) {
|
||||
this._sendLoop();
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
async _sendLoop() {
|
||||
while (this._sendRequests.length) {
|
||||
const request = this._sendRequests.shift();
|
||||
let result;
|
||||
try {
|
||||
// this can throw!
|
||||
result = await this._doSend(request.sendCallback);
|
||||
} catch (err) {
|
||||
if (err instanceof NetworkError) {
|
||||
// we're offline, everybody will have
|
||||
// to re-request slots when we come back online
|
||||
this._offline = true;
|
||||
for (const r of this._sendRequests) {
|
||||
r.reject(err);
|
||||
}
|
||||
this._sendRequests = [];
|
||||
}
|
||||
console.error("error for request", request);
|
||||
request.reject(err);
|
||||
break;
|
||||
}
|
||||
request.resolve(result);
|
||||
}
|
||||
// do next here instead of in _doSend
|
||||
}
|
||||
|
||||
async _doSend(sendCallback) {
|
||||
this._sendScheduled = false;
|
||||
await this._backoff.waitForNextSend();
|
||||
// loop is left by return or throw
|
||||
while (true) { // eslint-disable-line no-constant-condition
|
||||
try {
|
||||
return await sendCallback(this._hsApi);
|
||||
} catch (err) {
|
||||
if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") {
|
||||
await this._backoff.waitAfterLimitExceeded(err.retry_after_ms);
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
9
src/matrix/User.js
Normal file
9
src/matrix/User.js
Normal file
|
@ -0,0 +1,9 @@
|
|||
export default class User {
|
||||
constructor(userId) {
|
||||
this._userId = userId;
|
||||
}
|
||||
|
||||
get id() {
|
||||
return this._userId;
|
||||
}
|
||||
}
|
|
@ -2,6 +2,13 @@ export class HomeServerError extends Error {
|
|||
constructor(method, url, body) {
|
||||
super(`${body.error} on ${method} ${url}`);
|
||||
this.errcode = body.errcode;
|
||||
this.retry_after_ms = body.retry_after_ms;
|
||||
}
|
||||
|
||||
get isFatal() {
|
||||
switch (this.errcode) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ class RequestWrapper {
|
|||
}
|
||||
}
|
||||
|
||||
// todo: everywhere here, encode params in the url that could have slashes ... mainly event ids?
|
||||
export default class HomeServerApi {
|
||||
constructor(homeserver, accessToken) {
|
||||
// store these both in a closure somehow so it's harder to get at in case of XSS?
|
||||
|
@ -98,6 +99,10 @@ export default class HomeServerApi {
|
|||
return this._request("POST", csPath, queryParams, body);
|
||||
}
|
||||
|
||||
_put(csPath, queryParams, body) {
|
||||
return this._request("PUT", csPath, queryParams, body);
|
||||
}
|
||||
|
||||
_get(csPath, queryParams, body) {
|
||||
return this._request("GET", csPath, queryParams, body);
|
||||
}
|
||||
|
@ -111,6 +116,10 @@ export default class HomeServerApi {
|
|||
return this._get(`/rooms/${roomId}/messages`, params);
|
||||
}
|
||||
|
||||
send(roomId, eventType, txnId, content) {
|
||||
return this._put(`/rooms/${roomId}/send/${eventType}/${txnId}`, {}, content);
|
||||
}
|
||||
|
||||
passwordLogin(username, password) {
|
||||
return this._post("/login", undefined, {
|
||||
"type": "m.login.password",
|
||||
|
|
|
@ -3,9 +3,10 @@ import RoomSummary from "./summary.js";
|
|||
import SyncWriter from "./timeline/persistence/SyncWriter.js";
|
||||
import Timeline from "./timeline/Timeline.js";
|
||||
import FragmentIdComparer from "./timeline/FragmentIdComparer.js";
|
||||
import SendQueue from "./sending/SendQueue.js";
|
||||
|
||||
export default class Room extends EventEmitter {
|
||||
constructor({roomId, storage, hsApi, emitCollectionChange}) {
|
||||
constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents, user}) {
|
||||
super();
|
||||
this._roomId = roomId;
|
||||
this._storage = storage;
|
||||
|
@ -14,16 +15,22 @@ export default class Room extends EventEmitter {
|
|||
this._fragmentIdComparer = new FragmentIdComparer([]);
|
||||
this._syncWriter = new SyncWriter({roomId, storage, fragmentIdComparer: this._fragmentIdComparer});
|
||||
this._emitCollectionChange = emitCollectionChange;
|
||||
this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents});
|
||||
this._timeline = null;
|
||||
this._user = user;
|
||||
}
|
||||
|
||||
async persistSync(roomResponse, membership, txn) {
|
||||
const summaryChanged = this._summary.applySync(roomResponse, membership, txn);
|
||||
const newTimelineEntries = await this._syncWriter.writeSync(roomResponse, txn);
|
||||
return {summaryChanged, newTimelineEntries};
|
||||
let removedPendingEvents;
|
||||
if (roomResponse.timeline && roomResponse.timeline.events) {
|
||||
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
|
||||
}
|
||||
return {summaryChanged, newTimelineEntries, removedPendingEvents};
|
||||
}
|
||||
|
||||
emitSync({summaryChanged, newTimelineEntries}) {
|
||||
emitSync({summaryChanged, newTimelineEntries, removedPendingEvents}) {
|
||||
if (summaryChanged) {
|
||||
this.emit("change");
|
||||
this._emitCollectionChange(this);
|
||||
|
@ -31,13 +38,24 @@ export default class Room extends EventEmitter {
|
|||
if (this._timeline) {
|
||||
this._timeline.appendLiveEntries(newTimelineEntries);
|
||||
}
|
||||
if (removedPendingEvents) {
|
||||
this._sendQueue.emitRemovals(removedPendingEvents);
|
||||
}
|
||||
}
|
||||
|
||||
resumeSending() {
|
||||
this._sendQueue.resumeSending();
|
||||
}
|
||||
|
||||
load(summary, txn) {
|
||||
this._summary.load(summary);
|
||||
return this._syncWriter.load(txn);
|
||||
}
|
||||
|
||||
sendEvent(eventType, content) {
|
||||
this._sendQueue.enqueueEvent(eventType, content);
|
||||
}
|
||||
|
||||
get name() {
|
||||
return this._summary.name;
|
||||
}
|
||||
|
@ -55,7 +73,9 @@ export default class Room extends EventEmitter {
|
|||
storage: this._storage,
|
||||
hsApi: this._hsApi,
|
||||
fragmentIdComparer: this._fragmentIdComparer,
|
||||
pendingEvents: this._sendQueue.pendingEvents,
|
||||
closeCallback: () => this._timeline = null,
|
||||
user: this._user,
|
||||
});
|
||||
await this._timeline.load();
|
||||
return this._timeline;
|
||||
|
|
14
src/matrix/room/sending/PendingEvent.js
Normal file
14
src/matrix/room/sending/PendingEvent.js
Normal file
|
@ -0,0 +1,14 @@
|
|||
export default class PendingEvent {
|
||||
constructor(data) {
|
||||
this._data = data;
|
||||
}
|
||||
|
||||
get roomId() { return this._data.roomId; }
|
||||
get queueIndex() { return this._data.queueIndex; }
|
||||
get eventType() { return this._data.eventType; }
|
||||
get txnId() { return this._data.txnId; }
|
||||
get remoteId() { return this._data.remoteId; }
|
||||
set remoteId(value) { this._data.remoteId = value; }
|
||||
get content() { return this._data.content; }
|
||||
get data() { return this._data; }
|
||||
}
|
151
src/matrix/room/sending/SendQueue.js
Normal file
151
src/matrix/room/sending/SendQueue.js
Normal file
|
@ -0,0 +1,151 @@
|
|||
import SortedArray from "../../../observable/list/SortedArray.js";
|
||||
import {NetworkError} from "../../error.js";
|
||||
import PendingEvent from "./PendingEvent.js";
|
||||
|
||||
function makeTxnId() {
|
||||
const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
|
||||
const str = n.toString(16);
|
||||
return "t" + "0".repeat(14 - str.length) + str;
|
||||
}
|
||||
|
||||
export default class SendQueue {
|
||||
constructor({roomId, storage, sendScheduler, pendingEvents}) {
|
||||
pendingEvents = pendingEvents || [];
|
||||
this._roomId = roomId;
|
||||
this._storage = storage;
|
||||
this._sendScheduler = sendScheduler;
|
||||
this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
|
||||
if (pendingEvents.length) {
|
||||
console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents);
|
||||
}
|
||||
this._pendingEvents.setManyUnsorted(pendingEvents.map(data => new PendingEvent(data)));
|
||||
this._isSending = false;
|
||||
this._offline = false;
|
||||
this._amountSent = 0;
|
||||
}
|
||||
|
||||
async _sendLoop() {
|
||||
this._isSending = true;
|
||||
try {
|
||||
console.log("start sending", this._amountSent, "<", this._pendingEvents.length);
|
||||
while (this._amountSent < this._pendingEvents.length) {
|
||||
const pendingEvent = this._pendingEvents.get(this._amountSent);
|
||||
console.log("trying to send", pendingEvent.content.body);
|
||||
this._amountSent += 1;
|
||||
if (pendingEvent.remoteId) {
|
||||
continue;
|
||||
}
|
||||
console.log("really sending now");
|
||||
const response = await this._sendScheduler.request(hsApi => {
|
||||
console.log("got sendScheduler slot");
|
||||
return hsApi.send(
|
||||
pendingEvent.roomId,
|
||||
pendingEvent.eventType,
|
||||
pendingEvent.txnId,
|
||||
pendingEvent.content
|
||||
);
|
||||
});
|
||||
pendingEvent.remoteId = response.event_id;
|
||||
//
|
||||
console.log("writing remoteId now");
|
||||
await this._tryUpdateEvent(pendingEvent);
|
||||
console.log("keep sending?", this._amountSent, "<", this._pendingEvents.length);
|
||||
}
|
||||
} catch(err) {
|
||||
if (err instanceof NetworkError) {
|
||||
this._offline = true;
|
||||
}
|
||||
} finally {
|
||||
this._isSending = false;
|
||||
}
|
||||
}
|
||||
|
||||
removeRemoteEchos(events, txn) {
|
||||
const removed = [];
|
||||
for (const event of events) {
|
||||
const txnId = event.unsigned && event.unsigned.transaction_id;
|
||||
if (txnId) {
|
||||
const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId);
|
||||
if (idx !== -1) {
|
||||
const pendingEvent = this._pendingEvents.get(idx);
|
||||
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
|
||||
removed.push(pendingEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
emitRemovals(pendingEvents) {
|
||||
for (const pendingEvent of pendingEvents) {
|
||||
const idx = this._pendingEvents.array.indexOf(pendingEvent);
|
||||
if (idx !== -1) {
|
||||
this._amountSent -= 1;
|
||||
this._pendingEvents.remove(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resumeSending() {
|
||||
this._offline = false;
|
||||
if (!this._isSending) {
|
||||
this._sendLoop();
|
||||
}
|
||||
}
|
||||
|
||||
async enqueueEvent(eventType, content) {
|
||||
const pendingEvent = await this._createAndStoreEvent(eventType, content);
|
||||
this._pendingEvents.set(pendingEvent);
|
||||
if (!this._isSending && !this._offline) {
|
||||
this._sendLoop();
|
||||
}
|
||||
}
|
||||
|
||||
get pendingEvents() {
|
||||
return this._pendingEvents;
|
||||
}
|
||||
|
||||
async _tryUpdateEvent(pendingEvent) {
|
||||
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
|
||||
console.log("_tryUpdateEvent: got txn");
|
||||
try {
|
||||
// pendingEvent might have been removed already here
|
||||
// by a racing remote echo, so check first so we don't recreate it
|
||||
console.log("_tryUpdateEvent: before exists");
|
||||
if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) {
|
||||
console.log("_tryUpdateEvent: inside if exists");
|
||||
txn.pendingEvents.update(pendingEvent.data);
|
||||
}
|
||||
console.log("_tryUpdateEvent: after exists");
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
console.log("_tryUpdateEvent: error", err);
|
||||
throw err;
|
||||
}
|
||||
console.log("_tryUpdateEvent: try complete");
|
||||
await txn.complete();
|
||||
}
|
||||
|
||||
async _createAndStoreEvent(eventType, content) {
|
||||
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
|
||||
let pendingEvent;
|
||||
try {
|
||||
const pendingEventsStore = txn.pendingEvents;
|
||||
const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0;
|
||||
const queueIndex = maxQueueIndex + 1;
|
||||
pendingEvent = new PendingEvent({
|
||||
roomId: this._roomId,
|
||||
queueIndex,
|
||||
eventType,
|
||||
content,
|
||||
txnId: makeTxnId()
|
||||
});
|
||||
pendingEventsStore.add(pendingEvent.data);
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
return pendingEvent;
|
||||
}
|
||||
}
|
|
@ -1,32 +1,39 @@
|
|||
import { SortedArray } from "../../../observable/index.js";
|
||||
import { SortedArray, MappedList, ConcatList } from "../../../observable/index.js";
|
||||
import Direction from "./Direction.js";
|
||||
import GapWriter from "./persistence/GapWriter.js";
|
||||
import TimelineReader from "./persistence/TimelineReader.js";
|
||||
import PendingEventEntry from "./entries/PendingEventEntry.js";
|
||||
|
||||
export default class Timeline {
|
||||
constructor({roomId, storage, closeCallback, fragmentIdComparer, hsApi}) {
|
||||
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user, hsApi}) {
|
||||
this._roomId = roomId;
|
||||
this._storage = storage;
|
||||
this._closeCallback = closeCallback;
|
||||
this._fragmentIdComparer = fragmentIdComparer;
|
||||
this._hsApi = hsApi;
|
||||
this._entriesList = new SortedArray((a, b) => a.compare(b));
|
||||
this._remoteEntries = new SortedArray((a, b) => a.compare(b));
|
||||
this._timelineReader = new TimelineReader({
|
||||
roomId: this._roomId,
|
||||
storage: this._storage,
|
||||
fragmentIdComparer: this._fragmentIdComparer
|
||||
});
|
||||
const localEntries = new MappedList(pendingEvents, pe => {
|
||||
return new PendingEventEntry({pendingEvent: pe, user});
|
||||
}, (pee, params) => {
|
||||
pee.notifyUpdate(params);
|
||||
});
|
||||
this._allEntries = new ConcatList(this._remoteEntries, localEntries);
|
||||
}
|
||||
|
||||
/** @package */
|
||||
async load() {
|
||||
const entries = await this._timelineReader.readFromEnd(50);
|
||||
this._entriesList.setManySorted(entries);
|
||||
this._remoteEntries.setManySorted(entries);
|
||||
}
|
||||
|
||||
/** @package */
|
||||
appendLiveEntries(newEntries) {
|
||||
this._entriesList.setManySorted(newEntries);
|
||||
this._remoteEntries.setManySorted(newEntries);
|
||||
}
|
||||
|
||||
/** @public */
|
||||
|
@ -42,12 +49,12 @@ export default class Timeline {
|
|||
fragmentIdComparer: this._fragmentIdComparer
|
||||
});
|
||||
const newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response);
|
||||
this._entriesList.setManySorted(newEntries);
|
||||
this._remoteEntries.setManySorted(newEntries);
|
||||
}
|
||||
|
||||
// tries to prepend `amount` entries to the `entries` list.
|
||||
async loadAtTop(amount) {
|
||||
const firstEventEntry = this._entriesList.array.find(e => !!e.event);
|
||||
const firstEventEntry = this._remoteEntries.array.find(e => !!e.eventType);
|
||||
if (!firstEventEntry) {
|
||||
return;
|
||||
}
|
||||
|
@ -56,12 +63,12 @@ export default class Timeline {
|
|||
Direction.Backward,
|
||||
amount
|
||||
);
|
||||
this._entriesList.setManySorted(entries);
|
||||
this._remoteEntries.setManySorted(entries);
|
||||
}
|
||||
|
||||
/** @public */
|
||||
get entries() {
|
||||
return this._entriesList;
|
||||
return this._allEntries;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
//entries can be sorted, first by fragment, then by entry index.
|
||||
import EventKey from "../EventKey.js";
|
||||
export const PENDING_FRAGMENT_ID = Number.MAX_SAFE_INTEGER;
|
||||
|
||||
export default class BaseEntry {
|
||||
constructor(fragmentIdComparer) {
|
||||
|
@ -17,6 +18,10 @@ export default class BaseEntry {
|
|||
compare(otherEntry) {
|
||||
if (this.fragmentId === otherEntry.fragmentId) {
|
||||
return this.entryIndex - otherEntry.entryIndex;
|
||||
} else if (this.fragmentId === PENDING_FRAGMENT_ID) {
|
||||
return 1;
|
||||
} else if (otherEntry.fragmentId === PENDING_FRAGMENT_ID) {
|
||||
return -1;
|
||||
} else {
|
||||
// This might throw if the relation of two fragments is unknown.
|
||||
return this._fragmentIdComparer.compare(this.fragmentId, otherEntry.fragmentId);
|
||||
|
|
|
@ -18,12 +18,20 @@ export default class EventEntry extends BaseEntry {
|
|||
return this._eventEntry.event.content;
|
||||
}
|
||||
|
||||
get event() {
|
||||
return this._eventEntry.event;
|
||||
get eventType() {
|
||||
return this._eventEntry.event.type;
|
||||
}
|
||||
|
||||
get type() {
|
||||
return this._eventEntry.event.type;
|
||||
get stateKey() {
|
||||
return this._eventEntry.event.state_key;
|
||||
}
|
||||
|
||||
get sender() {
|
||||
return this._eventEntry.event.sender;
|
||||
}
|
||||
|
||||
get timestamp() {
|
||||
return this._eventEntry.event.origin_server_ts;
|
||||
}
|
||||
|
||||
get id() {
|
||||
|
|
53
src/matrix/room/timeline/entries/PendingEventEntry.js
Normal file
53
src/matrix/room/timeline/entries/PendingEventEntry.js
Normal file
|
@ -0,0 +1,53 @@
|
|||
import BaseEntry, {PENDING_FRAGMENT_ID} from "./BaseEntry.js";
|
||||
|
||||
export default class PendingEventEntry extends BaseEntry {
|
||||
constructor({pendingEvent, user}) {
|
||||
super(null);
|
||||
this._pendingEvent = pendingEvent;
|
||||
this._user = user;
|
||||
}
|
||||
|
||||
get fragmentId() {
|
||||
return PENDING_FRAGMENT_ID;
|
||||
}
|
||||
|
||||
get entryIndex() {
|
||||
return this._pendingEvent.queueIndex;
|
||||
}
|
||||
|
||||
get content() {
|
||||
return this._pendingEvent.content;
|
||||
}
|
||||
|
||||
get event() {
|
||||
return null;
|
||||
}
|
||||
|
||||
get eventType() {
|
||||
return this._pendingEvent.eventType;
|
||||
}
|
||||
|
||||
get stateKey() {
|
||||
return null;
|
||||
}
|
||||
|
||||
get sender() {
|
||||
return this._user.id;
|
||||
}
|
||||
|
||||
get timestamp() {
|
||||
return null;
|
||||
}
|
||||
|
||||
get isPending() {
|
||||
return true;
|
||||
}
|
||||
|
||||
get id() {
|
||||
return this._pendingEvent.txnId;
|
||||
}
|
||||
|
||||
notifyUpdate() {
|
||||
|
||||
}
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
import Room from "./room/room.js";
|
||||
import { ObservableMap } from "../observable/index.js";
|
||||
import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js";
|
||||
import User from "./User.js";
|
||||
|
||||
export default class Session {
|
||||
// sessionInfo contains deviceId, userId and homeServer
|
||||
|
@ -9,7 +11,9 @@ export default class Session {
|
|||
this._session = null;
|
||||
this._sessionInfo = sessionInfo;
|
||||
this._rooms = new ObservableMap();
|
||||
this._sendScheduler = new SendScheduler({hsApi, backoff: new RateLimitingBackoff()});
|
||||
this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params);
|
||||
this._user = new User(sessionInfo.userId);
|
||||
}
|
||||
|
||||
async load() {
|
||||
|
@ -19,6 +23,7 @@ export default class Session {
|
|||
this._storage.storeNames.roomState,
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.timelineFragments,
|
||||
this._storage.storeNames.pendingEvents,
|
||||
]);
|
||||
// restore session object
|
||||
this._session = await txn.session.get();
|
||||
|
@ -26,24 +31,47 @@ export default class Session {
|
|||
this._session = {};
|
||||
return;
|
||||
}
|
||||
const pendingEventsByRoomId = await this._getPendingEventsByRoom(txn);
|
||||
// load rooms
|
||||
const rooms = await txn.roomSummary.getAll();
|
||||
await Promise.all(rooms.map(summary => {
|
||||
const room = this.createRoom(summary.roomId);
|
||||
const room = this.createRoom(summary.roomId, pendingEventsByRoomId.get(summary.roomId));
|
||||
return room.load(summary, txn);
|
||||
}));
|
||||
}
|
||||
|
||||
notifyNetworkAvailable() {
|
||||
for (const [, room] of this._rooms) {
|
||||
room.resumeSending();
|
||||
}
|
||||
}
|
||||
|
||||
async _getPendingEventsByRoom(txn) {
|
||||
const pendingEvents = await txn.pendingEvents.getAll();
|
||||
return pendingEvents.reduce((groups, pe) => {
|
||||
const group = groups.get(pe.roomId);
|
||||
if (group) {
|
||||
group.push(pe);
|
||||
} else {
|
||||
groups.set(pe.roomId, [pe]);
|
||||
}
|
||||
return groups;
|
||||
}, new Map());
|
||||
}
|
||||
|
||||
get rooms() {
|
||||
return this._rooms;
|
||||
}
|
||||
|
||||
createRoom(roomId) {
|
||||
createRoom(roomId, pendingEvents) {
|
||||
const room = new Room({
|
||||
roomId,
|
||||
storage: this._storage,
|
||||
emitCollectionChange: this._roomUpdateCallback,
|
||||
hsApi: this._hsApi,
|
||||
sendScheduler: this._sendScheduler,
|
||||
pendingEvents,
|
||||
user: this._user,
|
||||
});
|
||||
this._rooms.add(roomId, room);
|
||||
return room;
|
||||
|
@ -60,7 +88,7 @@ export default class Session {
|
|||
return this._session.syncToken;
|
||||
}
|
||||
|
||||
get userId() {
|
||||
return this._sessionInfo.userId;
|
||||
get user() {
|
||||
return this._user;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,11 @@
|
|||
export const STORE_NAMES = Object.freeze(["session", "roomState", "roomSummary", "timelineEvents", "timelineFragments"]);
|
||||
export const STORE_NAMES = Object.freeze([
|
||||
"session",
|
||||
"roomState",
|
||||
"roomSummary",
|
||||
"timelineEvents",
|
||||
"timelineFragments",
|
||||
"pendingEvents",
|
||||
]);
|
||||
|
||||
export const STORE_MAP = Object.freeze(STORE_NAMES.reduce((nameMap, name) => {
|
||||
nameMap[name] = name;
|
||||
|
@ -16,5 +23,8 @@ export class StorageError extends Error {
|
|||
fullMessage += cause.message;
|
||||
}
|
||||
super(fullMessage);
|
||||
if (cause) {
|
||||
this.errcode = cause.name;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ function createStores(db) {
|
|||
timelineEvents.createIndex("byEventId", "eventIdKey", {unique: true});
|
||||
//key = room_id | event.type | event.state_key,
|
||||
db.createObjectStore("roomState", {keyPath: "key"});
|
||||
db.createObjectStore("pendingEvents", {keyPath: "key"});
|
||||
|
||||
// const roomMembers = db.createObjectStore("roomMembers", {keyPath: [
|
||||
// "event.room_id",
|
||||
|
|
|
@ -21,6 +21,10 @@ export default class QueryTarget {
|
|||
return reqAsPromise(this._target.get(key));
|
||||
}
|
||||
|
||||
getKey(key) {
|
||||
return reqAsPromise(this._target.getKey(key));
|
||||
}
|
||||
|
||||
reduce(range, reducer, initialValue) {
|
||||
return this._reduce(range, reducer, initialValue, "next");
|
||||
}
|
||||
|
@ -71,6 +75,16 @@ export default class QueryTarget {
|
|||
return this._find(range, predicate, "prev");
|
||||
}
|
||||
|
||||
async findMaxKey(range) {
|
||||
const cursor = this._target.openKeyCursor(range, "prev");
|
||||
let maxKey;
|
||||
await iterateCursor(cursor, (_, key) => {
|
||||
maxKey = key;
|
||||
return {done: true};
|
||||
});
|
||||
return maxKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a given set of keys exist.
|
||||
* Calls `callback(key, found)` for each key in `keys`, in key sorting order (or reversed if backwards=true).
|
||||
|
|
|
@ -46,6 +46,22 @@ class QueryTargetWrapper {
|
|||
throw new StorageError("get failed", err);
|
||||
}
|
||||
}
|
||||
|
||||
getKey(...params) {
|
||||
try {
|
||||
return this._qt.getKey(...params);
|
||||
} catch(err) {
|
||||
throw new StorageError("getKey failed", err);
|
||||
}
|
||||
}
|
||||
|
||||
delete(...params) {
|
||||
try {
|
||||
return this._qt.delete(...params);
|
||||
} catch(err) {
|
||||
throw new StorageError("delete failed", err);
|
||||
}
|
||||
}
|
||||
|
||||
index(...params) {
|
||||
try {
|
||||
|
@ -76,4 +92,8 @@ export default class Store extends QueryTarget {
|
|||
add(value) {
|
||||
return reqAsPromise(this._idbStore.add(value));
|
||||
}
|
||||
|
||||
delete(keyOrKeyRange) {
|
||||
return reqAsPromise(this._idbStore.delete(keyOrKeyRange));
|
||||
}
|
||||
}
|
||||
|
|
55
src/matrix/storage/idb/stores/PendingEventStore.js
Normal file
55
src/matrix/storage/idb/stores/PendingEventStore.js
Normal file
|
@ -0,0 +1,55 @@
|
|||
import { encodeUint32, decodeUint32 } from "../utils.js";
|
||||
import Platform from "../../../../Platform.js";
|
||||
|
||||
function encodeKey(roomId, queueIndex) {
|
||||
return `${roomId}|${encodeUint32(queueIndex)}`;
|
||||
}
|
||||
|
||||
function decodeKey(key) {
|
||||
const [roomId, encodedQueueIndex] = key.split("|");
|
||||
const queueIndex = decodeUint32(encodedQueueIndex);
|
||||
return {roomId, queueIndex};
|
||||
}
|
||||
|
||||
export default class PendingEventStore {
|
||||
constructor(eventStore) {
|
||||
this._eventStore = eventStore;
|
||||
}
|
||||
|
||||
async getMaxQueueIndex(roomId) {
|
||||
const range = IDBKeyRange.bound(
|
||||
encodeKey(roomId, Platform.minStorageKey),
|
||||
encodeKey(roomId, Platform.maxStorageKey),
|
||||
false,
|
||||
false,
|
||||
);
|
||||
const maxKey = await this._eventStore.findMaxKey(range);
|
||||
if (maxKey) {
|
||||
return decodeKey(maxKey).queueIndex;
|
||||
}
|
||||
}
|
||||
|
||||
remove(roomId, queueIndex) {
|
||||
const keyRange = IDBKeyRange.only(encodeKey(roomId, queueIndex));
|
||||
this._eventStore.delete(keyRange);
|
||||
}
|
||||
|
||||
async exists(roomId, queueIndex) {
|
||||
const keyRange = IDBKeyRange.only(encodeKey(roomId, queueIndex));
|
||||
const key = await this._eventStore.getKey(keyRange);
|
||||
return !!key;
|
||||
}
|
||||
|
||||
add(pendingEvent) {
|
||||
pendingEvent.key = encodeKey(pendingEvent.roomId, pendingEvent.queueIndex);
|
||||
return this._eventStore.add(pendingEvent);
|
||||
}
|
||||
|
||||
update(pendingEvent) {
|
||||
return this._eventStore.put(pendingEvent);
|
||||
}
|
||||
|
||||
getAll() {
|
||||
return this._eventStore.selectAll();
|
||||
}
|
||||
}
|
|
@ -1,13 +1,8 @@
|
|||
import EventKey from "../../../room/timeline/EventKey.js";
|
||||
import { StorageError } from "../../common.js";
|
||||
import { encodeUint32 } from "../utils.js";
|
||||
import Platform from "../../../../Platform.js";
|
||||
|
||||
// storage keys are defined to be unsigned 32bit numbers in WebPlatform.js, which is assumed by idb
|
||||
function encodeUint32(n) {
|
||||
const hex = n.toString(16);
|
||||
return "0".repeat(8 - hex.length) + hex;
|
||||
}
|
||||
|
||||
function encodeKey(roomId, fragmentId, eventIndex) {
|
||||
return `${roomId}|${encodeUint32(fragmentId)}|${encodeUint32(eventIndex)}`;
|
||||
}
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
import { StorageError } from "../../common.js";
|
||||
import Platform from "../../../../Platform.js";
|
||||
import { encodeUint32 } from "../utils.js";
|
||||
|
||||
function encodeKey(roomId, fragmentId) {
|
||||
let fragmentIdHex = fragmentId.toString(16);
|
||||
fragmentIdHex = "0".repeat(8 - fragmentIdHex.length) + fragmentIdHex;
|
||||
return `${roomId}|${fragmentIdHex}`;
|
||||
return `${roomId}|${encodeUint32(fragmentId)}`;
|
||||
}
|
||||
|
||||
export default class RoomFragmentStore {
|
||||
|
|
|
@ -6,6 +6,7 @@ import RoomSummaryStore from "./stores/RoomSummaryStore.js";
|
|||
import TimelineEventStore from "./stores/TimelineEventStore.js";
|
||||
import RoomStateStore from "./stores/RoomStateStore.js";
|
||||
import TimelineFragmentStore from "./stores/TimelineFragmentStore.js";
|
||||
import PendingEventStore from "./stores/PendingEventStore.js";
|
||||
|
||||
export default class Transaction {
|
||||
constructor(txn, allowedStoreNames) {
|
||||
|
@ -55,6 +56,10 @@ export default class Transaction {
|
|||
return this._store("roomState", idbStore => new RoomStateStore(idbStore));
|
||||
}
|
||||
|
||||
get pendingEvents() {
|
||||
return this._store("pendingEvents", idbStore => new PendingEventStore(idbStore));
|
||||
}
|
||||
|
||||
complete() {
|
||||
return txnAsPromise(this._txn);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,16 @@
|
|||
import { StorageError } from "../common.js";
|
||||
|
||||
|
||||
// storage keys are defined to be unsigned 32bit numbers in WebPlatform.js, which is assumed by idb
|
||||
export function encodeUint32(n) {
|
||||
const hex = n.toString(16);
|
||||
return "0".repeat(8 - hex.length) + hex;
|
||||
}
|
||||
|
||||
export function decodeUint32(str) {
|
||||
return parseInt(str, 16);
|
||||
}
|
||||
|
||||
export function openDatabase(name, createObjectStore, version) {
|
||||
const req = window.indexedDB.open(name, version);
|
||||
req.onupgradeneeded = (ev) => {
|
||||
|
|
|
@ -77,6 +77,7 @@ export default class Sync extends EventEmitter {
|
|||
storeNames.roomState,
|
||||
storeNames.timelineEvents,
|
||||
storeNames.timelineFragments,
|
||||
storeNames.pendingEvents,
|
||||
]);
|
||||
const roomChanges = [];
|
||||
try {
|
||||
|
|
|
@ -18,7 +18,7 @@ export default class BaseObservableCollection {
|
|||
}
|
||||
return () => {
|
||||
if (handler) {
|
||||
this._handlers.delete(this._handler);
|
||||
this._handlers.delete(handler);
|
||||
if (this._handlers.size === 0) {
|
||||
this.onUnsubscribeLast();
|
||||
}
|
||||
|
@ -30,3 +30,25 @@ export default class BaseObservableCollection {
|
|||
|
||||
// Add iterator over handlers here
|
||||
}
|
||||
|
||||
export function tests() {
|
||||
class Collection extends BaseObservableCollection {
|
||||
constructor() {
|
||||
super();
|
||||
this.firstSubscribeCalls = 0;
|
||||
this.firstUnsubscribeCalls = 0;
|
||||
}
|
||||
onSubscribeFirst() { this.firstSubscribeCalls += 1; }
|
||||
onUnsubscribeLast() { this.firstUnsubscribeCalls += 1; }
|
||||
}
|
||||
|
||||
return {
|
||||
test_unsubscribe(assert) {
|
||||
const c = new Collection();
|
||||
const unsubscribe = c.subscribe({});
|
||||
unsubscribe();
|
||||
assert.equal(c.firstSubscribeCalls, 1);
|
||||
assert.equal(c.firstUnsubscribeCalls, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@ import BaseObservableMap from "./map/BaseObservableMap.js";
|
|||
// re-export "root" (of chain) collections
|
||||
export { default as ObservableArray } from "./list/ObservableArray.js";
|
||||
export { default as SortedArray } from "./list/SortedArray.js";
|
||||
export { default as MappedList } from "./list/MappedList.js";
|
||||
export { default as ConcatList } from "./list/ConcatList.js";
|
||||
export { default as ObservableMap } from "./map/ObservableMap.js";
|
||||
|
||||
// avoid circular dependency between these classes
|
||||
|
|
|
@ -3,26 +3,26 @@ import BaseObservableCollection from "../BaseObservableCollection.js";
|
|||
export default class BaseObservableList extends BaseObservableCollection {
|
||||
emitReset() {
|
||||
for(let h of this._handlers) {
|
||||
h.onReset();
|
||||
h.onReset(this);
|
||||
}
|
||||
}
|
||||
// we need batch events, mostly on index based collection though?
|
||||
// maybe we should get started without?
|
||||
emitAdd(index, value) {
|
||||
for(let h of this._handlers) {
|
||||
h.onAdd(index, value);
|
||||
h.onAdd(index, value, this);
|
||||
}
|
||||
}
|
||||
|
||||
emitUpdate(index, value, params) {
|
||||
for(let h of this._handlers) {
|
||||
h.onUpdate(index, value, params);
|
||||
h.onUpdate(index, value, params, this);
|
||||
}
|
||||
}
|
||||
|
||||
emitRemove(index, value) {
|
||||
for(let h of this._handlers) {
|
||||
h.onRemove(index, value);
|
||||
h.onRemove(index, value, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,7 +30,7 @@ export default class BaseObservableList extends BaseObservableCollection {
|
|||
// been removed from its fromIdx
|
||||
emitMove(fromIdx, toIdx, value) {
|
||||
for(let h of this._handlers) {
|
||||
h.onMove(fromIdx, toIdx, value);
|
||||
h.onMove(fromIdx, toIdx, value, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
144
src/observable/list/ConcatList.js
Normal file
144
src/observable/list/ConcatList.js
Normal file
|
@ -0,0 +1,144 @@
|
|||
import BaseObservableList from "./BaseObservableList.js";
|
||||
|
||||
export default class ConcatList extends BaseObservableList {
|
||||
constructor(...sourceLists) {
|
||||
super();
|
||||
this._sourceLists = sourceLists;
|
||||
this._sourceUnsubscribes = null;
|
||||
}
|
||||
|
||||
_offsetForSource(sourceList) {
|
||||
const listIdx = this._sourceLists.indexOf(sourceList);
|
||||
let offset = 0;
|
||||
for (let i = 0; i < listIdx; ++i) {
|
||||
offset += this._sourceLists[i].length;
|
||||
}
|
||||
return offset;
|
||||
}
|
||||
|
||||
onSubscribeFirst() {
|
||||
this._sourceUnsubscribes = [];
|
||||
for (const sourceList of this._sourceLists) {
|
||||
this._sourceUnsubscribes.push(sourceList.subscribe(this));
|
||||
}
|
||||
}
|
||||
|
||||
onUnsubscribeLast() {
|
||||
for (const sourceUnsubscribe of this._sourceUnsubscribes) {
|
||||
sourceUnsubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
onReset() {
|
||||
// TODO: not ideal if other source lists are large
|
||||
// but working impl for now
|
||||
// reset, and
|
||||
this.emitReset();
|
||||
let idx = 0;
|
||||
for(const item of this) {
|
||||
this.emitAdd(idx, item);
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
onAdd(index, value, sourceList) {
|
||||
this.emitAdd(this._offsetForSource(sourceList) + index, value);
|
||||
}
|
||||
|
||||
onUpdate(index, value, params, sourceList) {
|
||||
this.emitUpdate(this._offsetForSource(sourceList) + index, value, params);
|
||||
}
|
||||
|
||||
onRemove(index, value, sourceList) {
|
||||
this.emitRemove(this._offsetForSource(sourceList) + index, value);
|
||||
}
|
||||
|
||||
onMove(fromIdx, toIdx, value, sourceList) {
|
||||
const offset = this._offsetForSource(sourceList);
|
||||
this.emitMove(offset + fromIdx, offset + toIdx, value);
|
||||
}
|
||||
|
||||
get length() {
|
||||
let len = 0;
|
||||
for (let i = 0; i < this._sourceLists.length; ++i) {
|
||||
len += this._sourceLists[i].length;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
[Symbol.iterator]() {
|
||||
let sourceListIdx = 0;
|
||||
let it = this._sourceLists[0][Symbol.iterator]();
|
||||
return {
|
||||
next: () => {
|
||||
let result = it.next();
|
||||
while (result.done) {
|
||||
sourceListIdx += 1;
|
||||
if (sourceListIdx >= this._sourceLists.length) {
|
||||
return result; //done
|
||||
}
|
||||
it = this._sourceLists[sourceListIdx][Symbol.iterator]();
|
||||
result = it.next();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
import ObservableArray from "./ObservableArray.js";
|
||||
export async function tests() {
|
||||
return {
|
||||
test_length(assert) {
|
||||
const all = new ConcatList(
|
||||
new ObservableArray([1, 2, 3]),
|
||||
new ObservableArray([11, 12, 13])
|
||||
);
|
||||
assert.equal(all.length, 6);
|
||||
},
|
||||
test_iterator(assert) {
|
||||
const all = new ConcatList(
|
||||
new ObservableArray([1, 2, 3]),
|
||||
new ObservableArray([11, 12, 13])
|
||||
);
|
||||
const it = all[Symbol.iterator]();
|
||||
assert.equal(it.next().value, 1);
|
||||
assert.equal(it.next().value, 2);
|
||||
assert.equal(it.next().value, 3);
|
||||
assert.equal(it.next().value, 11);
|
||||
assert.equal(it.next().value, 12);
|
||||
assert.equal(it.next().value, 13);
|
||||
assert(it.next().done);
|
||||
},
|
||||
test_add(assert) {
|
||||
const list1 = new ObservableArray([1, 2, 3]);
|
||||
const list2 = new ObservableArray([11, 12, 13]);
|
||||
const all = new ConcatList(list1, list2);
|
||||
let fired = false;
|
||||
all.subscribe({
|
||||
onAdd(index, value) {
|
||||
fired = true;
|
||||
assert.equal(index, 4);
|
||||
assert.equal(value, 11.5);
|
||||
}
|
||||
});
|
||||
list2.insert(1, 11.5);
|
||||
assert(fired);
|
||||
},
|
||||
test_update(assert) {
|
||||
const list1 = new ObservableArray([1, 2, 3]);
|
||||
const list2 = new ObservableArray([11, 12, 13]);
|
||||
const all = new ConcatList(list1, list2);
|
||||
let fired = false;
|
||||
all.subscribe({
|
||||
onUpdate(index, value) {
|
||||
fired = true;
|
||||
assert.equal(index, 4);
|
||||
assert.equal(value, 10);
|
||||
}
|
||||
});
|
||||
list2.emitUpdate(1, 10);
|
||||
assert(fired);
|
||||
},
|
||||
};
|
||||
}
|
115
src/observable/list/MappedList.js
Normal file
115
src/observable/list/MappedList.js
Normal file
|
@ -0,0 +1,115 @@
|
|||
import BaseObservableList from "./BaseObservableList.js";
|
||||
|
||||
export default class MappedList extends BaseObservableList {
|
||||
constructor(sourceList, mapper, updater) {
|
||||
super();
|
||||
this._sourceList = sourceList;
|
||||
this._mapper = mapper;
|
||||
this._updater = updater;
|
||||
this._sourceUnsubscribe = null;
|
||||
this._mappedValues = null;
|
||||
}
|
||||
|
||||
onSubscribeFirst() {
|
||||
this._sourceUnsubscribe = this._sourceList.subscribe(this);
|
||||
this._mappedValues = [];
|
||||
for (const item of this._sourceList) {
|
||||
this._mappedValues.push(this._mapper(item));
|
||||
}
|
||||
}
|
||||
|
||||
onReset() {
|
||||
this._mappedValues = [];
|
||||
this.emitReset();
|
||||
}
|
||||
|
||||
onAdd(index, value) {
|
||||
const mappedValue = this._mapper(value);
|
||||
this._mappedValues.splice(index, 0, mappedValue);
|
||||
this.emitAdd(index, mappedValue);
|
||||
}
|
||||
|
||||
onUpdate(index, value, params) {
|
||||
const mappedValue = this._mappedValues[index];
|
||||
if (this._updater) {
|
||||
this._updater(mappedValue, params, value);
|
||||
}
|
||||
this.emitUpdate(index, mappedValue, params);
|
||||
}
|
||||
|
||||
onRemove(index) {
|
||||
const mappedValue = this._mappedValues[index];
|
||||
this._mappedValues.splice(index, 1);
|
||||
this.emitRemove(index, mappedValue);
|
||||
}
|
||||
|
||||
onMove(fromIdx, toIdx) {
|
||||
const mappedValue = this._mappedValues[fromIdx];
|
||||
this._mappedValues.splice(fromIdx, 1);
|
||||
this._mappedValues.splice(toIdx, 0, mappedValue);
|
||||
this.emitMove(fromIdx, toIdx, mappedValue);
|
||||
}
|
||||
|
||||
onUnsubscribeLast() {
|
||||
this._sourceUnsubscribe();
|
||||
}
|
||||
|
||||
get length() {
|
||||
return this._mappedValues.length;
|
||||
}
|
||||
|
||||
[Symbol.iterator]() {
|
||||
return this._mappedValues.values();
|
||||
}
|
||||
}
|
||||
|
||||
export async function tests() {
|
||||
class MockList extends BaseObservableList {
|
||||
get length() {
|
||||
return 0;
|
||||
}
|
||||
[Symbol.iterator]() {
|
||||
return [].values();
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
test_add(assert) {
|
||||
const source = new MockList();
|
||||
const mapped = new MappedList(source, n => {return {n: n*n};});
|
||||
let fired = false;
|
||||
const unsubscribe = mapped.subscribe({
|
||||
onAdd(idx, value) {
|
||||
fired = true;
|
||||
assert.equal(idx, 0);
|
||||
assert.equal(value.n, 36);
|
||||
}
|
||||
});
|
||||
source.emitAdd(0, 6);
|
||||
assert(fired);
|
||||
unsubscribe();
|
||||
},
|
||||
test_update(assert) {
|
||||
const source = new MockList();
|
||||
const mapped = new MappedList(
|
||||
source,
|
||||
n => {return {n: n*n};},
|
||||
(o, p, n) => o.m = n*n
|
||||
);
|
||||
let fired = false;
|
||||
const unsubscribe = mapped.subscribe({
|
||||
onAdd() {},
|
||||
onUpdate(idx, value) {
|
||||
fired = true;
|
||||
assert.equal(idx, 0);
|
||||
assert.equal(value.n, 36);
|
||||
assert.equal(value.m, 49);
|
||||
}
|
||||
});
|
||||
source.emitAdd(0, 6);
|
||||
source.emitUpdate(0, 7);
|
||||
assert(fired);
|
||||
unsubscribe();
|
||||
}
|
||||
};
|
||||
}
|
|
@ -1,9 +1,9 @@
|
|||
import BaseObservableList from "./BaseObservableList.js";
|
||||
|
||||
export default class ObservableArray extends BaseObservableList {
|
||||
constructor() {
|
||||
constructor(initialValues = []) {
|
||||
super();
|
||||
this._items = [];
|
||||
this._items = initialValues;
|
||||
}
|
||||
|
||||
append(item) {
|
||||
|
|
|
@ -8,6 +8,10 @@ export default class SortedArray extends BaseObservableList {
|
|||
this._items = [];
|
||||
}
|
||||
|
||||
setManyUnsorted(items) {
|
||||
this.setManySorted(items);
|
||||
}
|
||||
|
||||
setManySorted(items) {
|
||||
// TODO: we can make this way faster by only looking up the first and last key,
|
||||
// and merging whatever is inbetween with items
|
||||
|
@ -32,8 +36,14 @@ export default class SortedArray extends BaseObservableList {
|
|||
}
|
||||
}
|
||||
|
||||
remove(item) {
|
||||
throw new Error("unimplemented");
|
||||
get(idx) {
|
||||
return this._items[idx];
|
||||
}
|
||||
|
||||
remove(idx) {
|
||||
const item = this._items[idx];
|
||||
this._items.splice(idx, 1);
|
||||
this.emitRemove(idx, item);
|
||||
}
|
||||
|
||||
get array() {
|
||||
|
|
|
@ -13,4 +13,8 @@ export default {
|
|||
// for indexeddb, we use unsigned 32 bit integers as keys
|
||||
return 0xFFFFFFFF;
|
||||
},
|
||||
|
||||
delay(ms) {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,11 @@ body {
|
|||
min-height: 0;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
height: 100%;
|
||||
}
|
||||
|
||||
.TimelinePanel ul {
|
||||
flex: 1 0 0;
|
||||
}
|
||||
|
||||
.RoomHeader {
|
||||
|
|
|
@ -69,3 +69,11 @@
|
|||
.RoomView_error {
|
||||
color: red;
|
||||
}
|
||||
|
||||
.MessageComposer > input {
|
||||
display: block;
|
||||
width: 100%;
|
||||
box-sizing: border-box;
|
||||
padding: 0.8em;
|
||||
border: none;
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
|
||||
.TimelinePanel ul {
|
||||
flex: 1;
|
||||
overflow-y: auto;
|
||||
list-style: none;
|
||||
padding: 0;
|
||||
|
@ -54,6 +53,10 @@
|
|||
background-color: darkgreen;
|
||||
}
|
||||
|
||||
.TextMessageView.pending .message-container {
|
||||
background-color: #333;
|
||||
}
|
||||
|
||||
.message-container p {
|
||||
margin: 5px 0;
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ export function text(str) {
|
|||
export const TAG_NAMES = [
|
||||
"ol", "ul", "li", "div", "h1", "h2", "h3", "h4", "h5", "h6",
|
||||
"p", "strong", "em", "span", "img", "section", "main", "article", "aside",
|
||||
"pre", "button", "time"];
|
||||
"pre", "button", "time", "input", "textarea"];
|
||||
|
||||
export const tag = {};
|
||||
|
||||
|
|
23
src/ui/web/session/room/MessageComposer.js
Normal file
23
src/ui/web/session/room/MessageComposer.js
Normal file
|
@ -0,0 +1,23 @@
|
|||
import TemplateView from "../../general/TemplateView.js";
|
||||
|
||||
export default class MessageComposer extends TemplateView {
|
||||
constructor(viewModel) {
|
||||
super(viewModel);
|
||||
this._input = null;
|
||||
}
|
||||
|
||||
render(t) {
|
||||
this._input = t.input({
|
||||
placeholder: "Send a message ...",
|
||||
onKeydown: e => this._onKeyDown(e)
|
||||
});
|
||||
return t.div({className: "MessageComposer"}, [this._input]);
|
||||
}
|
||||
|
||||
_onKeyDown(event) {
|
||||
if (event.key === "Enter") {
|
||||
this.viewModel.sendMessage(this._input.value);
|
||||
this._input.value = "";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,5 +1,6 @@
|
|||
import TemplateView from "../../general/TemplateView.js";
|
||||
import TimelineList from "./TimelineList.js";
|
||||
import MessageComposer from "./MessageComposer.js";
|
||||
|
||||
export default class RoomView extends TemplateView {
|
||||
constructor(viewModel) {
|
||||
|
@ -18,17 +19,20 @@ export default class RoomView extends TemplateView {
|
|||
]),
|
||||
]),
|
||||
t.div({className: "RoomView_error"}, vm => vm.error),
|
||||
this._timelineList.mount()
|
||||
this._timelineList.mount(),
|
||||
this._composer.mount(),
|
||||
])
|
||||
]);
|
||||
}
|
||||
|
||||
mount() {
|
||||
this._composer = new MessageComposer(this.viewModel);
|
||||
this._timelineList = new TimelineList();
|
||||
return super.mount();
|
||||
}
|
||||
|
||||
unmount() {
|
||||
this._composer.unmount();
|
||||
this._timelineList.unmount();
|
||||
super.unmount();
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ export default class TextMessageView extends TemplateView {
|
|||
render(t, vm) {
|
||||
// no bindings ... should this be a template view?
|
||||
return t.li(
|
||||
{className: {"TextMessageView": true, own: vm.isOwn}},
|
||||
{className: {"TextMessageView": true, own: vm.isOwn, pending: vm.isPending}},
|
||||
t.div({className: "message-container"}, [
|
||||
t.div({className: "sender"}, vm => vm.isContinuation ? "" : vm.sender),
|
||||
t.p([vm.text, t.time(vm.date + " " + vm.time)]),
|
||||
|
|
Loading…
Reference in a new issue