From 4ce66fc8a19ccfd7c99beb216160a44e8765576e Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 20 May 2021 14:51:04 +0200 Subject: [PATCH] allow concurrent removals when iterating pending events so we can remove failed events in the next commit --- src/matrix/room/sending/SendQueue.js | 16 +----- src/observable/list/SortedArray.js | 76 +++++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 16 deletions(-) diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index c946a2f5..4b1088c3 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -47,25 +47,11 @@ export class SendQueue { this._roomEncryption = roomEncryption; } - _nextPendingEvent(current) { - if (!current) { - return this._pendingEvents.get(0); - } else { - const idx = this._pendingEvents.indexOf(current); - if (idx !== -1) { - return this._pendingEvents.get(idx + 1); - } - return; - } - } - _sendLoop(log) { this._isSending = true; this._sendLoopLogItem = log.runDetached("send queue flush", async log => { - let pendingEvent; try { - // eslint-disable-next-line no-cond-assign - while (pendingEvent = this._nextPendingEvent(pendingEvent)) { + for (const pendingEvent of this._pendingEvents) { await log.wrap("send event", async log => { log.set("queueIndex", pendingEvent.queueIndex); try { diff --git a/src/observable/list/SortedArray.js b/src/observable/list/SortedArray.js index 193661cf..24378658 100644 --- a/src/observable/list/SortedArray.js +++ b/src/observable/list/SortedArray.js @@ -58,6 +58,14 @@ export class SortedArray extends BaseObservableList { } } + _getNext(item) { + let idx = sortedIndex(this._items, item, this._comparator); + while(idx < this._items.length && this._comparator(this._items[idx], item) <= 0) { + idx += 1; + } + return this.get(idx); + } + set(item, updateParams = null) { const idx = sortedIndex(this._items, item, this._comparator); if (idx >= this._items.length || this._comparator(this._items[idx], item) !== 0) { @@ -88,6 +96,72 @@ export class SortedArray extends BaseObservableList { } [Symbol.iterator]() { - return this._items.values(); + return new Iterator(this); } } + +// iterator that works even if the current value is removed while iterating +class Iterator { + constructor(sortedArray) { + this._sortedArray = sortedArray; + this._current = null; + } + + next() { + if (this._sortedArray) { + if (this._current) { + this._current = this._sortedArray._getNext(this._current); + } else { + this._current = this._sortedArray.get(0); + } + if (this._current) { + return {value: this._current}; + } else { + // cause done below + this._sortedArray = null; + } + } + if (!this._sortedArray) { + return {done: true}; + } + } +} + +export function tests() { + return { + "setManyUnsorted": assert => { + const sa = new SortedArray((a, b) => a.localeCompare(b)); + sa.setManyUnsorted(["b", "a", "c"]); + assert.equal(sa.length, 3); + assert.equal(sa.get(0), "a"); + assert.equal(sa.get(1), "b"); + assert.equal(sa.get(2), "c"); + }, + "_getNext": assert => { + const sa = new SortedArray((a, b) => a.localeCompare(b)); + sa.setManyUnsorted(["b", "a", "f"]); + assert.equal(sa._getNext("a"), "b"); + assert.equal(sa._getNext("b"), "f"); + // also finds the next if the value is not in the collection + assert.equal(sa._getNext("c"), "f"); + assert.equal(sa._getNext("f"), undefined); + }, + "iterator with removals": assert => { + const queue = new SortedArray((a, b) => a.idx - b.idx); + queue.setManyUnsorted([{idx: 5}, {idx: 3}, {idx: 1}, {idx: 4}, {idx: 2}]); + const it = queue[Symbol.iterator](); + assert.equal(it.next().value.idx, 1); + assert.equal(it.next().value.idx, 2); + queue.remove(1); + assert.equal(it.next().value.idx, 3); + queue.remove(1); + assert.equal(it.next().value.idx, 4); + queue.remove(1); + assert.equal(it.next().value.idx, 5); + queue.remove(1); + assert.equal(it.next().done, true); + // check done persists + assert.equal(it.next().done, true); + } + } +} \ No newline at end of file