MOBILE-3817 core: Implement zipIncludingComplete and add tests
parent
3e462979f7
commit
73b108e5c5
|
@ -276,6 +276,7 @@ testsConfig['rules']['padded-blocks'] = [
|
|||
},
|
||||
];
|
||||
testsConfig['rules']['jest/expect-expect'] = 'off';
|
||||
testsConfig['rules']['jest/no-done-callback'] = 'off';
|
||||
testsConfig['plugins'].push('jest');
|
||||
testsConfig['extends'].push('plugin:jest/recommended');
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
|
||||
import { CoreError } from '@classes/errors/error';
|
||||
import { CoreSubscriptions } from '@singletons/subscriptions';
|
||||
import { BehaviorSubject, Observable, of } from 'rxjs';
|
||||
import { BehaviorSubject, Observable, of, Subscription } from 'rxjs';
|
||||
import { catchError } from 'rxjs/operators';
|
||||
|
||||
/**
|
||||
|
@ -28,11 +28,7 @@ export function asyncObservable<T>(createObservable: () => Promise<Observable<T>
|
|||
|
||||
return new Observable(subscriber => {
|
||||
promise
|
||||
.then(observable => observable.subscribe(
|
||||
value => subscriber.next(value),
|
||||
error => subscriber.error(error),
|
||||
() => subscriber.complete(),
|
||||
))
|
||||
.then(observable => observable.subscribe(subscriber))
|
||||
.catch(error => subscriber.error(error));
|
||||
});
|
||||
}
|
||||
|
@ -72,3 +68,96 @@ export function ignoreErrors<Result, Fallback>(
|
|||
): Observable<Result | Fallback | undefined> {
|
||||
return observable.pipe(catchError(() => of(fallback)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get return types of a list of observables.
|
||||
*/
|
||||
type GetObservablesReturnTypes<T> = { [key in keyof T]: T[key] extends Observable<infer R> ? R : never };
|
||||
|
||||
/**
|
||||
* Data for an observable when zipping.
|
||||
*/
|
||||
type ZipObservableData<T = any> = { // eslint-disable-line @typescript-eslint/no-explicit-any
|
||||
values: T[];
|
||||
completed: boolean;
|
||||
subscription?: Subscription;
|
||||
};
|
||||
|
||||
/**
|
||||
* Similar to rxjs zip operator, but once an observable completes we'll continue to emit the last value as long
|
||||
* as the other observables continue to emit values.
|
||||
*
|
||||
* @param observables Observables to zip.
|
||||
* @return Observable that emits the zipped values.
|
||||
*/
|
||||
export function zipIncudingComplete<T extends Observable<any>[]>( // eslint-disable-line @typescript-eslint/no-explicit-any
|
||||
...observables: T
|
||||
): Observable<GetObservablesReturnTypes<T>> {
|
||||
return new Observable(subscriber => {
|
||||
const observablesData: ZipObservableData[] = [];
|
||||
let nextIndex = 0;
|
||||
let numCompleted = 0;
|
||||
let hasErrored = false;
|
||||
|
||||
// Treat an emitted event.
|
||||
const treatEmitted = (completed = false) => {
|
||||
if (hasErrored) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (numCompleted >= observables.length) {
|
||||
subscriber.complete();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if any observable still doesn't have data for the index.
|
||||
const notReady = observablesData.some(data => !data.completed && data.values[nextIndex] === undefined);
|
||||
if (notReady) {
|
||||
return;
|
||||
}
|
||||
|
||||
// For each observable, get the value for the next index, or last value if not present (completed).
|
||||
const valueToEmit = observablesData.map(observableData =>
|
||||
observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]);
|
||||
|
||||
nextIndex++;
|
||||
subscriber.next(<GetObservablesReturnTypes<T>> valueToEmit);
|
||||
|
||||
if (completed) {
|
||||
// An observable was completed, there might be other values to emit.
|
||||
treatEmitted(true);
|
||||
}
|
||||
};
|
||||
|
||||
observables.forEach((observable, obsIndex) => {
|
||||
const observableData: ZipObservableData = {
|
||||
values: [],
|
||||
completed: false,
|
||||
};
|
||||
|
||||
observableData.subscription = observable.subscribe({
|
||||
next: (value) => {
|
||||
observableData.values.push(value);
|
||||
treatEmitted();
|
||||
},
|
||||
error: (error) => {
|
||||
hasErrored = true;
|
||||
subscriber.error(error);
|
||||
},
|
||||
complete: () => {
|
||||
observableData.completed = true;
|
||||
numCompleted++;
|
||||
treatEmitted(true);
|
||||
},
|
||||
});
|
||||
|
||||
observablesData[obsIndex] = observableData;
|
||||
});
|
||||
|
||||
// When unsubscribing, unsubscribe from all observables.
|
||||
return () => {
|
||||
observablesData.forEach(observableData => observableData.subscription?.unsubscribe());
|
||||
};
|
||||
});
|
||||
}
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
// (C) Copyright 2015 Moodle Pty Ltd.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import { BehaviorSubject, Observable, Subject } from 'rxjs';
|
||||
import { TestScheduler } from 'rxjs/testing';
|
||||
import { asyncObservable, firstValueFrom, ignoreErrors, zipIncudingComplete } from '../observables';
|
||||
|
||||
describe('Observables utility functions', () => {
|
||||
|
||||
it('asyncObservable emits values', (done) => {
|
||||
const subject = new Subject();
|
||||
const promise = new Promise<Observable<unknown>>((resolve) => {
|
||||
resolve(subject);
|
||||
});
|
||||
|
||||
asyncObservable(() => promise).subscribe({
|
||||
next: (value) => {
|
||||
expect(value).toEqual('foo');
|
||||
done();
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for the promise callback to be called before emitting the value.
|
||||
setTimeout(() => subject.next('foo'));
|
||||
});
|
||||
|
||||
it('asyncObservable emits errors', (done) => {
|
||||
const subject = new Subject();
|
||||
const promise = new Promise<Observable<unknown>>((resolve) => {
|
||||
resolve(subject);
|
||||
});
|
||||
|
||||
asyncObservable(() => promise).subscribe({
|
||||
error: (value) => {
|
||||
expect(value).toEqual('foo');
|
||||
done();
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for the promise callback to be called before emitting the value.
|
||||
setTimeout(() => subject.error('foo'));
|
||||
});
|
||||
|
||||
it('asyncObservable emits complete', (done) => {
|
||||
const subject = new Subject();
|
||||
const promise = new Promise<Observable<unknown>>((resolve) => {
|
||||
resolve(subject);
|
||||
});
|
||||
|
||||
asyncObservable(() => promise).subscribe({
|
||||
complete: () => done(),
|
||||
});
|
||||
|
||||
// Wait for the promise callback to be called before emitting the value.
|
||||
setTimeout(() => subject.complete());
|
||||
});
|
||||
|
||||
it('asyncObservable emits error if promise is rejected', (done) => {
|
||||
const promise = new Promise<Observable<unknown>>((resolve, reject) => {
|
||||
reject('Custom error');
|
||||
});
|
||||
|
||||
asyncObservable(() => promise).subscribe({
|
||||
error: (error) => {
|
||||
expect(error).toEqual('Custom error');
|
||||
done();
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('returns first value emitted by an observable', async () => {
|
||||
const subject = new Subject();
|
||||
setTimeout(() => subject.next('foo'), 10);
|
||||
|
||||
await expect(firstValueFrom(subject)).resolves.toEqual('foo');
|
||||
|
||||
// Check that running it again doesn't get last value, it gets the new one.
|
||||
setTimeout(() => subject.next('bar'), 10);
|
||||
await expect(firstValueFrom(subject)).resolves.toEqual('bar');
|
||||
|
||||
// Check we cannot get first value if a subject is already completed.
|
||||
subject.complete();
|
||||
await expect(firstValueFrom(subject)).rejects.toThrow();
|
||||
|
||||
// Check that we get last value when using BehaviourSubject.
|
||||
const behaviorSubject = new BehaviorSubject('baz');
|
||||
await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz');
|
||||
|
||||
// Check we get last value even if behaviour subject is completed.
|
||||
behaviorSubject.complete();
|
||||
await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz');
|
||||
|
||||
// Check that Promise is rejected if the observable emits an error.
|
||||
const errorSubject = new Subject();
|
||||
setTimeout(() => errorSubject.error('foo error'), 10);
|
||||
|
||||
await expect(firstValueFrom(errorSubject)).rejects.toMatch('foo error');
|
||||
});
|
||||
|
||||
it('ignores observable errors', (done) => {
|
||||
const subject = new Subject();
|
||||
|
||||
ignoreErrors(subject, 'default value').subscribe({
|
||||
next: (value) => {
|
||||
expect(value).toEqual('default value');
|
||||
done();
|
||||
},
|
||||
});
|
||||
|
||||
subject.error('foo');
|
||||
});
|
||||
|
||||
it('zips observables including complete events', () => {
|
||||
const scheduler = new TestScheduler((actual, expected) => {
|
||||
expect(actual).toEqual(expected);
|
||||
});
|
||||
|
||||
scheduler.run(({ expectObservable, cold }) => {
|
||||
expectObservable(zipIncudingComplete(
|
||||
cold('-a-b---|', {
|
||||
a: 'A1',
|
||||
b: 'A2',
|
||||
}),
|
||||
cold('-a----b-c--|', {
|
||||
a: 'B1',
|
||||
b: 'B2',
|
||||
c: 'B3',
|
||||
}),
|
||||
cold('-a-b-c--de-----|', {
|
||||
a: 'C1',
|
||||
b: 'C2',
|
||||
c: 'C3',
|
||||
d: 'C4',
|
||||
e: 'C5',
|
||||
}),
|
||||
)).toBe(
|
||||
'-a----b-c--(de)|',
|
||||
{
|
||||
a: ['A1','B1','C1'],
|
||||
b: ['A2','B2','C2'],
|
||||
c: ['A2','B3','C3'],
|
||||
d: ['A2','B3','C4'],
|
||||
e: ['A2','B3','C5'],
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
});
|
Loading…
Reference in New Issue