allow concurrent removals when iterating pending events
so we can remove failed events in the next commit
This commit is contained in:
parent
618a32e6c0
commit
4ce66fc8a1
2 changed files with 76 additions and 16 deletions
|
@ -47,25 +47,11 @@ export class SendQueue {
|
|||
this._roomEncryption = roomEncryption;
|
||||
}
|
||||
|
||||
_nextPendingEvent(current) {
|
||||
if (!current) {
|
||||
return this._pendingEvents.get(0);
|
||||
} else {
|
||||
const idx = this._pendingEvents.indexOf(current);
|
||||
if (idx !== -1) {
|
||||
return this._pendingEvents.get(idx + 1);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
_sendLoop(log) {
|
||||
this._isSending = true;
|
||||
this._sendLoopLogItem = log.runDetached("send queue flush", async log => {
|
||||
let pendingEvent;
|
||||
try {
|
||||
// eslint-disable-next-line no-cond-assign
|
||||
while (pendingEvent = this._nextPendingEvent(pendingEvent)) {
|
||||
for (const pendingEvent of this._pendingEvents) {
|
||||
await log.wrap("send event", async log => {
|
||||
log.set("queueIndex", pendingEvent.queueIndex);
|
||||
try {
|
||||
|
|
|
@ -58,6 +58,14 @@ export class SortedArray extends BaseObservableList {
|
|||
}
|
||||
}
|
||||
|
||||
_getNext(item) {
|
||||
let idx = sortedIndex(this._items, item, this._comparator);
|
||||
while(idx < this._items.length && this._comparator(this._items[idx], item) <= 0) {
|
||||
idx += 1;
|
||||
}
|
||||
return this.get(idx);
|
||||
}
|
||||
|
||||
set(item, updateParams = null) {
|
||||
const idx = sortedIndex(this._items, item, this._comparator);
|
||||
if (idx >= this._items.length || this._comparator(this._items[idx], item) !== 0) {
|
||||
|
@ -88,6 +96,72 @@ export class SortedArray extends BaseObservableList {
|
|||
}
|
||||
|
||||
[Symbol.iterator]() {
|
||||
return this._items.values();
|
||||
return new Iterator(this);
|
||||
}
|
||||
}
|
||||
|
||||
// iterator that works even if the current value is removed while iterating
|
||||
class Iterator {
|
||||
constructor(sortedArray) {
|
||||
this._sortedArray = sortedArray;
|
||||
this._current = null;
|
||||
}
|
||||
|
||||
next() {
|
||||
if (this._sortedArray) {
|
||||
if (this._current) {
|
||||
this._current = this._sortedArray._getNext(this._current);
|
||||
} else {
|
||||
this._current = this._sortedArray.get(0);
|
||||
}
|
||||
if (this._current) {
|
||||
return {value: this._current};
|
||||
} else {
|
||||
// cause done below
|
||||
this._sortedArray = null;
|
||||
}
|
||||
}
|
||||
if (!this._sortedArray) {
|
||||
return {done: true};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function tests() {
|
||||
return {
|
||||
"setManyUnsorted": assert => {
|
||||
const sa = new SortedArray((a, b) => a.localeCompare(b));
|
||||
sa.setManyUnsorted(["b", "a", "c"]);
|
||||
assert.equal(sa.length, 3);
|
||||
assert.equal(sa.get(0), "a");
|
||||
assert.equal(sa.get(1), "b");
|
||||
assert.equal(sa.get(2), "c");
|
||||
},
|
||||
"_getNext": assert => {
|
||||
const sa = new SortedArray((a, b) => a.localeCompare(b));
|
||||
sa.setManyUnsorted(["b", "a", "f"]);
|
||||
assert.equal(sa._getNext("a"), "b");
|
||||
assert.equal(sa._getNext("b"), "f");
|
||||
// also finds the next if the value is not in the collection
|
||||
assert.equal(sa._getNext("c"), "f");
|
||||
assert.equal(sa._getNext("f"), undefined);
|
||||
},
|
||||
"iterator with removals": assert => {
|
||||
const queue = new SortedArray((a, b) => a.idx - b.idx);
|
||||
queue.setManyUnsorted([{idx: 5}, {idx: 3}, {idx: 1}, {idx: 4}, {idx: 2}]);
|
||||
const it = queue[Symbol.iterator]();
|
||||
assert.equal(it.next().value.idx, 1);
|
||||
assert.equal(it.next().value.idx, 2);
|
||||
queue.remove(1);
|
||||
assert.equal(it.next().value.idx, 3);
|
||||
queue.remove(1);
|
||||
assert.equal(it.next().value.idx, 4);
|
||||
queue.remove(1);
|
||||
assert.equal(it.next().value.idx, 5);
|
||||
queue.remove(1);
|
||||
assert.equal(it.next().done, true);
|
||||
// check done persists
|
||||
assert.equal(it.next().done, true);
|
||||
}
|
||||
}
|
||||
}
|
Reference in a new issue