forked from mystiq/hydrogen-web
move observable values each in their own file
This commit is contained in:
parent
1bccbbfa08
commit
07bc0a2376
23 changed files with 429 additions and 272 deletions
|
@ -14,7 +14,8 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {BaseObservableValue, ObservableValue} from "../../observable/ObservableValue";
|
||||
import {ObservableValue} from "../../observable/value/ObservableValue";
|
||||
import {BaseObservableValue} from "../../observable/value/BaseObservableValue";
|
||||
|
||||
export class Navigation {
|
||||
constructor(allowsChild) {
|
||||
|
|
|
@ -186,7 +186,7 @@ export class RoomGridViewModel extends ViewModel {
|
|||
}
|
||||
|
||||
import {createNavigation} from "../navigation/index.js";
|
||||
import {ObservableValue} from "../../observable/ObservableValue";
|
||||
import {ObservableValue} from "../../observable/value/ObservableValue";
|
||||
|
||||
export function tests() {
|
||||
class RoomVMMock {
|
||||
|
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {ObservableValue} from "../../observable/ObservableValue";
|
||||
import {ObservableValue} from "../../observable/value/ObservableValue";
|
||||
import {RoomStatus} from "../../matrix/room/common";
|
||||
|
||||
/**
|
||||
|
|
|
@ -189,7 +189,7 @@ import {HomeServer as MockHomeServer} from "../../../../mocks/HomeServer.js";
|
|||
// other imports
|
||||
import {BaseMessageTile} from "./tiles/BaseMessageTile.js";
|
||||
import {MappedList} from "../../../../observable/list/MappedList";
|
||||
import {ObservableValue} from "../../../../observable/ObservableValue";
|
||||
import {ObservableValue} from "../../../../observable/value/ObservableValue";
|
||||
import {PowerLevels} from "../../../../matrix/room/PowerLevels.js";
|
||||
|
||||
export function tests() {
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
import {ViewModel} from "../../ViewModel";
|
||||
import {KeyType} from "../../../matrix/ssss/index";
|
||||
import {createEnum} from "../../../utils/enum";
|
||||
import {FlatMapObservableValue} from "../../../observable/value/FlatMapObservableValue";
|
||||
|
||||
export const Status = createEnum("Enabled", "SetupKey", "SetupPhrase", "Pending", "NewVersionAvailable");
|
||||
export const BackupWriteStatus = createEnum("Writing", "Stopped", "Done", "Pending");
|
||||
|
@ -29,8 +30,8 @@ export class KeyBackupViewModel extends ViewModel {
|
|||
this._isBusy = false;
|
||||
this._dehydratedDeviceId = undefined;
|
||||
this._status = undefined;
|
||||
this._backupOperation = this._session.keyBackup.flatMap(keyBackup => keyBackup.operationInProgress);
|
||||
this._progress = this._backupOperation.flatMap(op => op.progress);
|
||||
this._backupOperation = new FlatMapObservableValue(this._session.keyBackup, keyBackup => keyBackup.operationInProgress);
|
||||
this._progress = new FlatMapObservableValue(this._backupOperation, op => op.progress);
|
||||
this.track(this._backupOperation.subscribe(() => {
|
||||
// see if needsNewKey might be set
|
||||
this._reevaluateStatus();
|
||||
|
|
|
@ -46,8 +46,6 @@ export {
|
|||
ConcatList,
|
||||
ObservableMap
|
||||
} from "./observable/index";
|
||||
export {
|
||||
BaseObservableValue,
|
||||
ObservableValue,
|
||||
RetainedObservableValue
|
||||
} from "./observable/ObservableValue";
|
||||
export {BaseObservableValue} from "./observable/value/BaseObservableValue";
|
||||
export {ObservableValue} from "./observable/value/ObservableValue";
|
||||
export {RetainedObservableValue} from "./observable/value/RetainedObservableValue";
|
||||
|
|
|
@ -18,7 +18,7 @@ limitations under the License.
|
|||
import {createEnum} from "../utils/enum";
|
||||
import {lookupHomeserver} from "./well-known.js";
|
||||
import {AbortableOperation} from "../utils/AbortableOperation";
|
||||
import {ObservableValue} from "../observable/ObservableValue";
|
||||
import {ObservableValue} from "../observable/value/ObservableValue";
|
||||
import {HomeServerApi} from "./net/HomeServerApi";
|
||||
import {Reconnector, ConnectionStatus} from "./net/Reconnector";
|
||||
import {ExponentialRetryDelay} from "./net/ExponentialRetryDelay";
|
||||
|
|
|
@ -45,7 +45,8 @@ import {
|
|||
keyFromDehydratedDeviceKey as createSSSSKeyFromDehydratedDeviceKey
|
||||
} from "./ssss/index";
|
||||
import {SecretStorage} from "./ssss/SecretStorage";
|
||||
import {ObservableValue, RetainedObservableValue} from "../observable/ObservableValue";
|
||||
import {ObservableValue} from "../observable/value/ObservableValue";
|
||||
import {RetainedObservableValue} from "../observable/value/RetainedObservableValue";
|
||||
|
||||
const PICKLE_KEY = "DEFAULT_KEY";
|
||||
const PUSHER_KEY = "pusher";
|
||||
|
@ -997,9 +998,18 @@ export function tests() {
|
|||
|
||||
return {
|
||||
"session data is not modified until after sync": async (assert) => {
|
||||
const session = new Session({storage: createStorageMock({
|
||||
const storage = createStorageMock({
|
||||
sync: {token: "a", filterId: 5}
|
||||
}), sessionInfo: {userId: ""}});
|
||||
});
|
||||
const session = new Session({
|
||||
storage,
|
||||
sessionInfo: {userId: ""},
|
||||
platform: {
|
||||
clock: {
|
||||
createTimeout: () => undefined
|
||||
}
|
||||
}
|
||||
});
|
||||
await session.load();
|
||||
let syncSet = false;
|
||||
const syncTxn = {
|
||||
|
|
|
@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {ObservableValue} from "../observable/ObservableValue";
|
||||
import {ObservableValue} from "../observable/value/ObservableValue";
|
||||
import {createEnum} from "../utils/enum";
|
||||
|
||||
const INCREMENTAL_TIMEOUT = 30000;
|
||||
|
|
|
@ -19,7 +19,7 @@ import {StoredRoomKey, keyFromBackup} from "../decryption/RoomKey";
|
|||
import {MEGOLM_ALGORITHM} from "../../common";
|
||||
import * as Curve25519 from "./Curve25519";
|
||||
import {AbortableOperation} from "../../../../utils/AbortableOperation";
|
||||
import {ObservableValue} from "../../../../observable/ObservableValue";
|
||||
import {ObservableValue} from "../../../../observable/value/ObservableValue";
|
||||
|
||||
import {SetAbortableFn} from "../../../../utils/AbortableOperation";
|
||||
import type {BackupInfo, SessionData, SessionKeyInfo, SessionInfo, KeyBackupPayload} from "./types";
|
||||
|
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {ObservableValue} from "../../observable/ObservableValue";
|
||||
import {ObservableValue} from "../../observable/value/ObservableValue";
|
||||
import type {ExponentialRetryDelay} from "./ExponentialRetryDelay";
|
||||
import type {TimeMeasure} from "../../platform/web/dom/Clock.js";
|
||||
import type {OnlineStatus} from "../../platform/web/dom/OnlineStatus.js";
|
||||
|
|
|
@ -29,7 +29,7 @@ import {ObservedEventMap} from "./ObservedEventMap.js";
|
|||
import {DecryptionSource} from "../e2ee/common.js";
|
||||
import {ensureLogItem} from "../../logging/utils";
|
||||
import {PowerLevels} from "./PowerLevels.js";
|
||||
import {RetainedObservableValue} from "../../observable/ObservableValue";
|
||||
import {RetainedObservableValue} from "../../observable/value/RetainedObservableValue";
|
||||
import {TimelineReader} from "./timeline/persistence/TimelineReader";
|
||||
|
||||
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
|
||||
|
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {BaseObservableValue} from "../../observable/ObservableValue";
|
||||
import {BaseObservableValue} from "../../observable/value/BaseObservableValue";
|
||||
|
||||
export class ObservedEventMap {
|
||||
constructor(notifyEmpty) {
|
||||
|
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {ObservableValue} from "../observable/ObservableValue";
|
||||
import {ObservableValue} from "../observable/value/ObservableValue";
|
||||
|
||||
class Timeout {
|
||||
constructor(elapsed, ms) {
|
||||
|
|
|
@ -1,248 +0,0 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {AbortError} from "../utils/error";
|
||||
import {BaseObservable} from "./BaseObservable";
|
||||
import type {SubscriptionHandle} from "./BaseObservable";
|
||||
|
||||
// like an EventEmitter, but doesn't have an event type
|
||||
export abstract class BaseObservableValue<T> extends BaseObservable<(value: T) => void> {
|
||||
emit(argument: T) {
|
||||
for (const h of this._handlers) {
|
||||
h(argument);
|
||||
}
|
||||
}
|
||||
|
||||
abstract get(): T;
|
||||
|
||||
waitFor(predicate: (value: T) => boolean): IWaitHandle<T> {
|
||||
if (predicate(this.get())) {
|
||||
return new ResolvedWaitForHandle(Promise.resolve(this.get()));
|
||||
} else {
|
||||
return new WaitForHandle(this, predicate);
|
||||
}
|
||||
}
|
||||
|
||||
flatMap<C>(mapper: (value: T) => (BaseObservableValue<C> | undefined)): BaseObservableValue<C | undefined> {
|
||||
return new FlatMapObservableValue<T, C>(this, mapper);
|
||||
}
|
||||
}
|
||||
|
||||
interface IWaitHandle<T> {
|
||||
promise: Promise<T>;
|
||||
dispose(): void;
|
||||
}
|
||||
|
||||
class WaitForHandle<T> implements IWaitHandle<T> {
|
||||
private _promise: Promise<T>
|
||||
private _reject: ((reason?: any) => void) | null;
|
||||
private _subscription: (() => void) | null;
|
||||
|
||||
constructor(observable: BaseObservableValue<T>, predicate: (value: T) => boolean) {
|
||||
this._promise = new Promise((resolve, reject) => {
|
||||
this._reject = reject;
|
||||
this._subscription = observable.subscribe(v => {
|
||||
if (predicate(v)) {
|
||||
this._reject = null;
|
||||
resolve(v);
|
||||
this.dispose();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
get promise(): Promise<T> {
|
||||
return this._promise;
|
||||
}
|
||||
|
||||
dispose() {
|
||||
if (this._subscription) {
|
||||
this._subscription();
|
||||
this._subscription = null;
|
||||
}
|
||||
if (this._reject) {
|
||||
this._reject(new AbortError());
|
||||
this._reject = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ResolvedWaitForHandle<T> implements IWaitHandle<T> {
|
||||
constructor(public promise: Promise<T>) {}
|
||||
dispose() {}
|
||||
}
|
||||
|
||||
export class ObservableValue<T> extends BaseObservableValue<T> {
|
||||
private _value: T;
|
||||
|
||||
constructor(initialValue: T) {
|
||||
super();
|
||||
this._value = initialValue;
|
||||
}
|
||||
|
||||
get(): T {
|
||||
return this._value;
|
||||
}
|
||||
|
||||
set(value: T): void {
|
||||
if (value !== this._value) {
|
||||
this._value = value;
|
||||
this.emit(this._value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class RetainedObservableValue<T> extends ObservableValue<T> {
|
||||
private _freeCallback: () => void;
|
||||
|
||||
constructor(initialValue: T, freeCallback: () => void) {
|
||||
super(initialValue);
|
||||
this._freeCallback = freeCallback;
|
||||
}
|
||||
|
||||
onUnsubscribeLast() {
|
||||
super.onUnsubscribeLast();
|
||||
this._freeCallback();
|
||||
}
|
||||
}
|
||||
|
||||
export class FlatMapObservableValue<P, C> extends BaseObservableValue<C | undefined> {
|
||||
private sourceSubscription?: SubscriptionHandle;
|
||||
private targetSubscription?: SubscriptionHandle;
|
||||
|
||||
constructor(
|
||||
private readonly source: BaseObservableValue<P>,
|
||||
private readonly mapper: (value: P) => (BaseObservableValue<C> | undefined)
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
onUnsubscribeLast() {
|
||||
super.onUnsubscribeLast();
|
||||
this.sourceSubscription = this.sourceSubscription!();
|
||||
if (this.targetSubscription) {
|
||||
this.targetSubscription = this.targetSubscription();
|
||||
}
|
||||
}
|
||||
|
||||
onSubscribeFirst() {
|
||||
super.onSubscribeFirst();
|
||||
this.sourceSubscription = this.source.subscribe(() => {
|
||||
this.updateTargetSubscription();
|
||||
this.emit(this.get());
|
||||
});
|
||||
this.updateTargetSubscription();
|
||||
}
|
||||
|
||||
private updateTargetSubscription() {
|
||||
const sourceValue = this.source.get();
|
||||
if (sourceValue) {
|
||||
const target = this.mapper(sourceValue);
|
||||
if (target) {
|
||||
if (!this.targetSubscription) {
|
||||
this.targetSubscription = target.subscribe(() => this.emit(this.get()));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
// if no sourceValue or target
|
||||
if (this.targetSubscription) {
|
||||
this.targetSubscription = this.targetSubscription();
|
||||
}
|
||||
}
|
||||
|
||||
get(): C | undefined {
|
||||
const sourceValue = this.source.get();
|
||||
if (!sourceValue) {
|
||||
return undefined;
|
||||
}
|
||||
const mapped = this.mapper(sourceValue);
|
||||
return mapped?.get();
|
||||
}
|
||||
}
|
||||
|
||||
export function tests() {
|
||||
return {
|
||||
"set emits an update": assert => {
|
||||
const a = new ObservableValue<number>(0);
|
||||
let fired = false;
|
||||
const subscription = a.subscribe(v => {
|
||||
fired = true;
|
||||
assert.strictEqual(v, 5);
|
||||
});
|
||||
a.set(5);
|
||||
assert(fired);
|
||||
subscription();
|
||||
},
|
||||
"set doesn't emit if value hasn't changed": assert => {
|
||||
const a = new ObservableValue(5);
|
||||
let fired = false;
|
||||
const subscription = a.subscribe(() => {
|
||||
fired = true;
|
||||
});
|
||||
a.set(5);
|
||||
a.set(5);
|
||||
assert(!fired);
|
||||
subscription();
|
||||
},
|
||||
"waitFor promise resolves on matching update": async assert => {
|
||||
const a = new ObservableValue(5);
|
||||
const handle = a.waitFor(v => v === 6);
|
||||
Promise.resolve().then(() => {
|
||||
a.set(6);
|
||||
});
|
||||
await handle.promise;
|
||||
assert.strictEqual(a.get(), 6);
|
||||
},
|
||||
"waitFor promise rejects when disposed": async assert => {
|
||||
const a = new ObservableValue<number>(0);
|
||||
const handle = a.waitFor(() => false);
|
||||
Promise.resolve().then(() => {
|
||||
handle.dispose();
|
||||
});
|
||||
await assert.rejects(handle.promise, AbortError);
|
||||
},
|
||||
"flatMap.get": assert => {
|
||||
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
|
||||
const countProxy = a.flatMap(a => a!.count);
|
||||
assert.strictEqual(countProxy.get(), undefined);
|
||||
const count = new ObservableValue<number>(0);
|
||||
a.set({count});
|
||||
assert.strictEqual(countProxy.get(), 0);
|
||||
},
|
||||
"flatMap update from source": assert => {
|
||||
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
|
||||
const updates: (number | undefined)[] = [];
|
||||
a.flatMap(a => a!.count).subscribe(count => {
|
||||
updates.push(count);
|
||||
});
|
||||
const count = new ObservableValue<number>(0);
|
||||
a.set({count});
|
||||
assert.deepEqual(updates, [0]);
|
||||
},
|
||||
"flatMap update from target": assert => {
|
||||
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
|
||||
const updates: (number | undefined)[] = [];
|
||||
a.flatMap(a => a!.count).subscribe(count => {
|
||||
updates.push(count);
|
||||
});
|
||||
const count = new ObservableValue<number>(0);
|
||||
a.set({count});
|
||||
count.set(5);
|
||||
assert.deepEqual(updates, [0, 5]);
|
||||
}
|
||||
}
|
||||
}
|
83
src/observable/value/BaseObservableValue.ts
Normal file
83
src/observable/value/BaseObservableValue.ts
Normal file
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {AbortError} from "../../utils/error";
|
||||
import {BaseObservable} from "../BaseObservable";
|
||||
import type {SubscriptionHandle} from "../BaseObservable";
|
||||
import {FlatMapObservableValue} from "./FlatMapObservableValue";
|
||||
|
||||
// like an EventEmitter, but doesn't have an event type
|
||||
export abstract class BaseObservableValue<T> extends BaseObservable<(value: T) => void> {
|
||||
emit(argument: T) {
|
||||
for (const h of this._handlers) {
|
||||
h(argument);
|
||||
}
|
||||
}
|
||||
|
||||
abstract get(): T;
|
||||
|
||||
waitFor(predicate: (value: T) => boolean): IWaitHandle<T> {
|
||||
if (predicate(this.get())) {
|
||||
return new ResolvedWaitForHandle(Promise.resolve(this.get()));
|
||||
} else {
|
||||
return new WaitForHandle(this, predicate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface IWaitHandle<T> {
|
||||
promise: Promise<T>;
|
||||
dispose(): void;
|
||||
}
|
||||
|
||||
class WaitForHandle<T> implements IWaitHandle<T> {
|
||||
private _promise: Promise<T>
|
||||
private _reject: ((reason?: any) => void) | null;
|
||||
private _subscription: (() => void) | null;
|
||||
|
||||
constructor(observable: BaseObservableValue<T>, predicate: (value: T) => boolean) {
|
||||
this._promise = new Promise((resolve, reject) => {
|
||||
this._reject = reject;
|
||||
this._subscription = observable.subscribe(v => {
|
||||
if (predicate(v)) {
|
||||
this._reject = null;
|
||||
resolve(v);
|
||||
this.dispose();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
get promise(): Promise<T> {
|
||||
return this._promise;
|
||||
}
|
||||
|
||||
dispose() {
|
||||
if (this._subscription) {
|
||||
this._subscription();
|
||||
this._subscription = null;
|
||||
}
|
||||
if (this._reject) {
|
||||
this._reject(new AbortError());
|
||||
this._reject = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ResolvedWaitForHandle<T> implements IWaitHandle<T> {
|
||||
constructor(public promise: Promise<T>) {}
|
||||
dispose() {}
|
||||
}
|
109
src/observable/value/FlatMapObservableValue.ts
Normal file
109
src/observable/value/FlatMapObservableValue.ts
Normal file
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {BaseObservableValue} from "./BaseObservableValue";
|
||||
import {SubscriptionHandle} from "../BaseObservable";
|
||||
|
||||
export class FlatMapObservableValue<P, C> extends BaseObservableValue<C | undefined> {
|
||||
private sourceSubscription?: SubscriptionHandle;
|
||||
private targetSubscription?: SubscriptionHandle;
|
||||
|
||||
constructor(
|
||||
private readonly source: BaseObservableValue<P>,
|
||||
private readonly mapper: (value: P) => (BaseObservableValue<C> | undefined)
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
onUnsubscribeLast() {
|
||||
super.onUnsubscribeLast();
|
||||
this.sourceSubscription = this.sourceSubscription!();
|
||||
if (this.targetSubscription) {
|
||||
this.targetSubscription = this.targetSubscription();
|
||||
}
|
||||
}
|
||||
|
||||
onSubscribeFirst() {
|
||||
super.onSubscribeFirst();
|
||||
this.sourceSubscription = this.source.subscribe(() => {
|
||||
this.updateTargetSubscription();
|
||||
this.emit(this.get());
|
||||
});
|
||||
this.updateTargetSubscription();
|
||||
}
|
||||
|
||||
private updateTargetSubscription() {
|
||||
const sourceValue = this.source.get();
|
||||
if (sourceValue) {
|
||||
const target = this.mapper(sourceValue);
|
||||
if (target) {
|
||||
if (!this.targetSubscription) {
|
||||
this.targetSubscription = target.subscribe(() => this.emit(this.get()));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
// if no sourceValue or target
|
||||
if (this.targetSubscription) {
|
||||
this.targetSubscription = this.targetSubscription();
|
||||
}
|
||||
}
|
||||
|
||||
get(): C | undefined {
|
||||
const sourceValue = this.source.get();
|
||||
if (!sourceValue) {
|
||||
return undefined;
|
||||
}
|
||||
const mapped = this.mapper(sourceValue);
|
||||
return mapped?.get();
|
||||
}
|
||||
}
|
||||
|
||||
import {ObservableValue} from "./ObservableValue";
|
||||
|
||||
export function tests() {
|
||||
return {
|
||||
"flatMap.get": assert => {
|
||||
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
|
||||
const countProxy = new FlatMapObservableValue(a, a => a!.count);
|
||||
assert.strictEqual(countProxy.get(), undefined);
|
||||
const count = new ObservableValue<number>(0);
|
||||
a.set({count});
|
||||
assert.strictEqual(countProxy.get(), 0);
|
||||
},
|
||||
"flatMap update from source": assert => {
|
||||
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
|
||||
const updates: (number | undefined)[] = [];
|
||||
new FlatMapObservableValue(a, a => a!.count).subscribe(count => {
|
||||
updates.push(count);
|
||||
});
|
||||
const count = new ObservableValue<number>(0);
|
||||
a.set({count});
|
||||
assert.deepEqual(updates, [0]);
|
||||
},
|
||||
"flatMap update from target": assert => {
|
||||
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
|
||||
const updates: (number | undefined)[] = [];
|
||||
new FlatMapObservableValue(a, a => a!.count).subscribe(count => {
|
||||
updates.push(count);
|
||||
});
|
||||
const count = new ObservableValue<number>(0);
|
||||
a.set({count});
|
||||
count.set(5);
|
||||
assert.deepEqual(updates, [0, 5]);
|
||||
}
|
||||
}
|
||||
}
|
82
src/observable/value/ObservableValue.ts
Normal file
82
src/observable/value/ObservableValue.ts
Normal file
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {AbortError} from "../../utils/error";
|
||||
import {BaseObservableValue} from "./BaseObservableValue";
|
||||
|
||||
export class ObservableValue<T> extends BaseObservableValue<T> {
|
||||
private _value: T;
|
||||
|
||||
constructor(initialValue: T) {
|
||||
super();
|
||||
this._value = initialValue;
|
||||
}
|
||||
|
||||
get(): T {
|
||||
return this._value;
|
||||
}
|
||||
|
||||
set(value: T): void {
|
||||
if (value !== this._value) {
|
||||
this._value = value;
|
||||
this.emit(this._value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function tests() {
|
||||
return {
|
||||
"set emits an update": assert => {
|
||||
const a = new ObservableValue<number>(0);
|
||||
let fired = false;
|
||||
const subscription = a.subscribe(v => {
|
||||
fired = true;
|
||||
assert.strictEqual(v, 5);
|
||||
});
|
||||
a.set(5);
|
||||
assert(fired);
|
||||
subscription();
|
||||
},
|
||||
"set doesn't emit if value hasn't changed": assert => {
|
||||
const a = new ObservableValue(5);
|
||||
let fired = false;
|
||||
const subscription = a.subscribe(() => {
|
||||
fired = true;
|
||||
});
|
||||
a.set(5);
|
||||
a.set(5);
|
||||
assert(!fired);
|
||||
subscription();
|
||||
},
|
||||
"waitFor promise resolves on matching update": async assert => {
|
||||
const a = new ObservableValue(5);
|
||||
const handle = a.waitFor(v => v === 6);
|
||||
Promise.resolve().then(() => {
|
||||
a.set(6);
|
||||
});
|
||||
await handle.promise;
|
||||
assert.strictEqual(a.get(), 6);
|
||||
},
|
||||
"waitFor promise rejects when disposed": async assert => {
|
||||
const a = new ObservableValue<number>(0);
|
||||
const handle = a.waitFor(() => false);
|
||||
Promise.resolve().then(() => {
|
||||
handle.dispose();
|
||||
});
|
||||
await assert.rejects(handle.promise, AbortError);
|
||||
}
|
||||
}
|
||||
}
|
89
src/observable/value/PickMapObservable.ts
Normal file
89
src/observable/value/PickMapObservable.ts
Normal file
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {BaseObservableValue} from "./BaseObservableValue";
|
||||
import {BaseObservableMap, IMapObserver} from "../map/BaseObservableMap";
|
||||
import {SubscriptionHandle} from "../BaseObservable";
|
||||
|
||||
function pickLowestKey<K>(currentKey: K, newKey: K): boolean {
|
||||
return newKey < currentKey;
|
||||
}
|
||||
|
||||
export class PickMapObservable<K, V> implements IMapObserver<K, V> extends BaseObservableValue<V | undefined> {
|
||||
|
||||
private key?: K;
|
||||
private mapSubscription?: SubscriptionHandle;
|
||||
|
||||
constructor(
|
||||
private readonly map: BaseObservableMap<K, V>,
|
||||
private readonly pickKey: (currentKey: K, newKey: K) => boolean = pickLowestKey
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
private trySetKey(newKey: K): boolean {
|
||||
if (this.key === undefined || this.pickKey(this.key, newKey)) {
|
||||
this.key = newKey;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
onReset(): void {
|
||||
this.key = undefined;
|
||||
this.emit(this.get());
|
||||
}
|
||||
|
||||
onAdd(key: K, value:V): void {
|
||||
if (this.trySetKey(key)) {
|
||||
this.emit(this.get());
|
||||
}
|
||||
}
|
||||
|
||||
onUpdate(key: K, value: V, params: any): void {}
|
||||
|
||||
onRemove(key: K, value: V): void {
|
||||
if (key === this.key) {
|
||||
this.key = undefined;
|
||||
let changed = false;
|
||||
for (const [key] of this.map) {
|
||||
changed = this.trySetKey(key) || changed;
|
||||
}
|
||||
if (changed) {
|
||||
this.emit(this.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
onSubscribeFirst(): void {
|
||||
this.mapSubscription = this.map.subscribe(this);
|
||||
for (const [key] of this.map) {
|
||||
this.trySetKey(key);
|
||||
}
|
||||
}
|
||||
|
||||
onUnsubscribeLast(): void {
|
||||
this.mapSubscription();
|
||||
this.key = undefined;
|
||||
}
|
||||
|
||||
get(): V | undefined {
|
||||
if (this.key !== undefined) {
|
||||
return this.map.get(this.key);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
}
|
31
src/observable/value/RetainedObservableValue.ts
Normal file
31
src/observable/value/RetainedObservableValue.ts
Normal file
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {ObservableValue} from "./ObservableValue";
|
||||
|
||||
export class RetainedObservableValue<T> extends ObservableValue<T> {
|
||||
private _freeCallback: () => void;
|
||||
|
||||
constructor(initialValue: T, freeCallback: () => void) {
|
||||
super(initialValue);
|
||||
this._freeCallback = freeCallback;
|
||||
}
|
||||
|
||||
onUnsubscribeLast() {
|
||||
super.onUnsubscribeLast();
|
||||
this._freeCallback();
|
||||
}
|
||||
}
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {BaseObservableValue} from "../../../observable/ObservableValue";
|
||||
import {BaseObservableValue} from "../../../observable/value/BaseObservableValue";
|
||||
|
||||
export class History extends BaseObservableValue {
|
||||
handleEvent(event) {
|
||||
|
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {BaseObservableValue} from "../../../observable/ObservableValue";
|
||||
import {BaseObservableValue} from "../../../observable/value/BaseObservableValue";
|
||||
|
||||
export class OnlineStatus extends BaseObservableValue {
|
||||
constructor() {
|
||||
|
|
|
@ -14,7 +14,8 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {BaseObservableValue, ObservableValue} from "../observable/ObservableValue";
|
||||
import {BaseObservableValue} from "../observable/value/BaseObservableValue";
|
||||
import {ObservableValue} from "../observable/value/ObservableValue";
|
||||
|
||||
export interface IAbortable {
|
||||
abort();
|
||||
|
|
Loading…
Reference in a new issue