From 73b108e5c52fcb3e93b247245ac022ee2bb6a1f7 Mon Sep 17 00:00:00 2001 From: Dani Palou Date: Thu, 30 Jun 2022 16:39:09 +0200 Subject: [PATCH] MOBILE-3817 core: Implement zipIncludingComplete and add tests --- .eslintrc.js | 1 + src/core/utils/observables.ts | 101 +++++++++++++- src/core/utils/tests/observables.test.ts | 160 +++++++++++++++++++++++ 3 files changed, 256 insertions(+), 6 deletions(-) create mode 100644 src/core/utils/tests/observables.test.ts diff --git a/.eslintrc.js b/.eslintrc.js index 6319501e3..865090c04 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -276,6 +276,7 @@ testsConfig['rules']['padded-blocks'] = [ }, ]; testsConfig['rules']['jest/expect-expect'] = 'off'; +testsConfig['rules']['jest/no-done-callback'] = 'off'; testsConfig['plugins'].push('jest'); testsConfig['extends'].push('plugin:jest/recommended'); diff --git a/src/core/utils/observables.ts b/src/core/utils/observables.ts index 718fc884c..9f30b1755 100644 --- a/src/core/utils/observables.ts +++ b/src/core/utils/observables.ts @@ -14,7 +14,7 @@ import { CoreError } from '@classes/errors/error'; import { CoreSubscriptions } from '@singletons/subscriptions'; -import { BehaviorSubject, Observable, of } from 'rxjs'; +import { BehaviorSubject, Observable, of, Subscription } from 'rxjs'; import { catchError } from 'rxjs/operators'; /** @@ -28,11 +28,7 @@ export function asyncObservable(createObservable: () => Promise return new Observable(subscriber => { promise - .then(observable => observable.subscribe( - value => subscriber.next(value), - error => subscriber.error(error), - () => subscriber.complete(), - )) + .then(observable => observable.subscribe(subscriber)) .catch(error => subscriber.error(error)); }); } @@ -72,3 +68,96 @@ export function ignoreErrors( ): Observable { return observable.pipe(catchError(() => of(fallback))); } + +/** + * Get return types of a list of observables. + */ +type GetObservablesReturnTypes = { [key in keyof T]: T[key] extends Observable ? R : never }; + +/** + * Data for an observable when zipping. + */ +type ZipObservableData = { // eslint-disable-line @typescript-eslint/no-explicit-any + values: T[]; + completed: boolean; + subscription?: Subscription; +}; + +/** + * Similar to rxjs zip operator, but once an observable completes we'll continue to emit the last value as long + * as the other observables continue to emit values. + * + * @param observables Observables to zip. + * @return Observable that emits the zipped values. + */ +export function zipIncudingComplete[]>( // eslint-disable-line @typescript-eslint/no-explicit-any + ...observables: T +): Observable> { + return new Observable(subscriber => { + const observablesData: ZipObservableData[] = []; + let nextIndex = 0; + let numCompleted = 0; + let hasErrored = false; + + // Treat an emitted event. + const treatEmitted = (completed = false) => { + if (hasErrored) { + return; + } + + if (numCompleted >= observables.length) { + subscriber.complete(); + + return; + } + + // Check if any observable still doesn't have data for the index. + const notReady = observablesData.some(data => !data.completed && data.values[nextIndex] === undefined); + if (notReady) { + return; + } + + // For each observable, get the value for the next index, or last value if not present (completed). + const valueToEmit = observablesData.map(observableData => + observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]); + + nextIndex++; + subscriber.next(> valueToEmit); + + if (completed) { + // An observable was completed, there might be other values to emit. + treatEmitted(true); + } + }; + + observables.forEach((observable, obsIndex) => { + const observableData: ZipObservableData = { + values: [], + completed: false, + }; + + observableData.subscription = observable.subscribe({ + next: (value) => { + observableData.values.push(value); + treatEmitted(); + }, + error: (error) => { + hasErrored = true; + subscriber.error(error); + }, + complete: () => { + observableData.completed = true; + numCompleted++; + treatEmitted(true); + }, + }); + + observablesData[obsIndex] = observableData; + }); + + // When unsubscribing, unsubscribe from all observables. + return () => { + observablesData.forEach(observableData => observableData.subscription?.unsubscribe()); + }; + }); +} diff --git a/src/core/utils/tests/observables.test.ts b/src/core/utils/tests/observables.test.ts new file mode 100644 index 000000000..47a7a1e1c --- /dev/null +++ b/src/core/utils/tests/observables.test.ts @@ -0,0 +1,160 @@ +// (C) Copyright 2015 Moodle Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { BehaviorSubject, Observable, Subject } from 'rxjs'; +import { TestScheduler } from 'rxjs/testing'; +import { asyncObservable, firstValueFrom, ignoreErrors, zipIncudingComplete } from '../observables'; + +describe('Observables utility functions', () => { + + it('asyncObservable emits values', (done) => { + const subject = new Subject(); + const promise = new Promise>((resolve) => { + resolve(subject); + }); + + asyncObservable(() => promise).subscribe({ + next: (value) => { + expect(value).toEqual('foo'); + done(); + }, + }); + + // Wait for the promise callback to be called before emitting the value. + setTimeout(() => subject.next('foo')); + }); + + it('asyncObservable emits errors', (done) => { + const subject = new Subject(); + const promise = new Promise>((resolve) => { + resolve(subject); + }); + + asyncObservable(() => promise).subscribe({ + error: (value) => { + expect(value).toEqual('foo'); + done(); + }, + }); + + // Wait for the promise callback to be called before emitting the value. + setTimeout(() => subject.error('foo')); + }); + + it('asyncObservable emits complete', (done) => { + const subject = new Subject(); + const promise = new Promise>((resolve) => { + resolve(subject); + }); + + asyncObservable(() => promise).subscribe({ + complete: () => done(), + }); + + // Wait for the promise callback to be called before emitting the value. + setTimeout(() => subject.complete()); + }); + + it('asyncObservable emits error if promise is rejected', (done) => { + const promise = new Promise>((resolve, reject) => { + reject('Custom error'); + }); + + asyncObservable(() => promise).subscribe({ + error: (error) => { + expect(error).toEqual('Custom error'); + done(); + }, + }); + }); + + it('returns first value emitted by an observable', async () => { + const subject = new Subject(); + setTimeout(() => subject.next('foo'), 10); + + await expect(firstValueFrom(subject)).resolves.toEqual('foo'); + + // Check that running it again doesn't get last value, it gets the new one. + setTimeout(() => subject.next('bar'), 10); + await expect(firstValueFrom(subject)).resolves.toEqual('bar'); + + // Check we cannot get first value if a subject is already completed. + subject.complete(); + await expect(firstValueFrom(subject)).rejects.toThrow(); + + // Check that we get last value when using BehaviourSubject. + const behaviorSubject = new BehaviorSubject('baz'); + await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz'); + + // Check we get last value even if behaviour subject is completed. + behaviorSubject.complete(); + await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz'); + + // Check that Promise is rejected if the observable emits an error. + const errorSubject = new Subject(); + setTimeout(() => errorSubject.error('foo error'), 10); + + await expect(firstValueFrom(errorSubject)).rejects.toMatch('foo error'); + }); + + it('ignores observable errors', (done) => { + const subject = new Subject(); + + ignoreErrors(subject, 'default value').subscribe({ + next: (value) => { + expect(value).toEqual('default value'); + done(); + }, + }); + + subject.error('foo'); + }); + + it('zips observables including complete events', () => { + const scheduler = new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected); + }); + + scheduler.run(({ expectObservable, cold }) => { + expectObservable(zipIncudingComplete( + cold('-a-b---|', { + a: 'A1', + b: 'A2', + }), + cold('-a----b-c--|', { + a: 'B1', + b: 'B2', + c: 'B3', + }), + cold('-a-b-c--de-----|', { + a: 'C1', + b: 'C2', + c: 'C3', + d: 'C4', + e: 'C5', + }), + )).toBe( + '-a----b-c--(de)|', + { + a: ['A1','B1','C1'], + b: ['A2','B2','C2'], + c: ['A2','B3','C3'], + d: ['A2','B3','C4'], + e: ['A2','B3','C5'], + }, + ); + }); + }); + +});