diff --git a/src/addons/coursecompletion/services/coursecompletion.ts b/src/addons/coursecompletion/services/coursecompletion.ts index 162747136..76feeecd4 100644 --- a/src/addons/coursecompletion/services/coursecompletion.ts +++ b/src/addons/coursecompletion/services/coursecompletion.ts @@ -21,7 +21,7 @@ import { CoreSite, CoreSiteWSPreSets } from '@classes/site'; import { CoreStatusWithWarningsWSResponse, CoreWSExternalWarning } from '@services/ws'; import { makeSingleton } from '@singletons'; import { CoreError } from '@classes/errors/error'; -import { asyncObservable, firstValueFrom } from '@/core/utils/observables'; +import { asyncObservable, firstValueFrom } from '@/core/utils/rxjs'; import { Observable } from 'rxjs'; import { map } from 'rxjs/operators'; diff --git a/src/core/classes/site.ts b/src/core/classes/site.ts index 9e233beb9..ec047eb8a 100644 --- a/src/core/classes/site.ts +++ b/src/core/classes/site.ts @@ -59,7 +59,7 @@ import { } from '@services/database/sites'; import { Observable, ObservableInput, ObservedValueOf, OperatorFunction, Subject } from 'rxjs'; import { finalize, map, mergeMap } from 'rxjs/operators'; -import { firstValueFrom } from '../utils/observables'; +import { firstValueFrom } from '../utils/rxjs'; /** * QR Code type enumeration. diff --git a/src/core/features/courses/services/courses-helper.ts b/src/core/features/courses/services/courses-helper.ts index 546a38f3d..a4231cec5 100644 --- a/src/core/features/courses/services/courses-helper.ts +++ b/src/core/features/courses/services/courses-helper.ts @@ -27,7 +27,7 @@ import { CoreWSExternalFile } from '@services/ws'; import { AddonCourseCompletion } from '@addons/coursecompletion/services/coursecompletion'; import moment from 'moment-timezone'; import { Observable, of } from 'rxjs'; -import { firstValueFrom, zipIncudingComplete } from '@/core/utils/observables'; +import { firstValueFrom, zipIncludingComplete } from '@/core/utils/rxjs'; import { catchError, map } from 'rxjs/operators'; import { chainRequests } from '@classes/site'; @@ -259,7 +259,7 @@ export class CoreCoursesHelperProvider { }; courses = this.filterAndSortCoursesWithOptions(courses, options); - return zipIncudingComplete( + return zipIncludingComplete( this.loadCoursesExtraInfoObservable(courses, options.loadCategoryNames, newOptions), CoreCourses.getCoursesAdminAndNavOptionsObservable(courseIds, newOptions).pipe(map(courseOptions => { courses.forEach((course: CoreEnrolledCourseDataWithOptions) => { diff --git a/src/core/features/courses/services/courses.ts b/src/core/features/courses/services/courses.ts index fea06c2bc..6c4ea9e43 100644 --- a/src/core/features/courses/services/courses.ts +++ b/src/core/features/courses/services/courses.ts @@ -20,7 +20,7 @@ import { CoreStatusWithWarningsWSResponse, CoreWarningsWSResponse, CoreWSExterna import { CoreEvents } from '@singletons/events'; import { CoreWSError } from '@classes/errors/wserror'; import { CoreCourseAnyCourseDataWithExtraInfoAndOptions, CoreCourseWithImageAndColor } from './courses-helper'; -import { asyncObservable, firstValueFrom, ignoreErrors, zipIncudingComplete } from '@/core/utils/observables'; +import { asyncObservable, firstValueFrom, ignoreErrors, zipIncludingComplete } from '@/core/utils/rxjs'; import { of, Observable } from 'rxjs'; import { map } from 'rxjs/operators'; @@ -697,7 +697,7 @@ export class CoreCoursesProvider { courseIds = await this.getCourseIdsForAdminAndNavOptions(courseIds, siteId); // Get user navigation and administration options. - return zipIncudingComplete( + return zipIncludingComplete( ignoreErrors(this.getUserNavigationOptionsObservable(courseIds, options), {}), ignoreErrors(this.getUserAdministrationOptionsObservable(courseIds, options), {}), ).pipe( diff --git a/src/core/features/courses/services/dashboard.ts b/src/core/features/courses/services/dashboard.ts index 982c1a5cd..eb812b835 100644 --- a/src/core/features/courses/services/dashboard.ts +++ b/src/core/features/courses/services/dashboard.ts @@ -21,7 +21,7 @@ import { makeSingleton } from '@singletons'; import { CoreError } from '@classes/errors/error'; import { Observable } from 'rxjs'; import { map } from 'rxjs/operators'; -import { asyncObservable, firstValueFrom } from '@/core/utils/observables'; +import { asyncObservable, firstValueFrom } from '@/core/utils/rxjs'; const ROOT_CACHE_KEY = 'CoreCoursesDashboard:'; diff --git a/src/core/utils/observables.ts b/src/core/utils/observables.ts deleted file mode 100644 index 9f30b1755..000000000 --- a/src/core/utils/observables.ts +++ /dev/null @@ -1,163 +0,0 @@ -// (C) Copyright 2015 Moodle Pty Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import { CoreError } from '@classes/errors/error'; -import { CoreSubscriptions } from '@singletons/subscriptions'; -import { BehaviorSubject, Observable, of, Subscription } from 'rxjs'; -import { catchError } from 'rxjs/operators'; - -/** - * Convert to an Observable a Promise that resolves to an Observable. - * - * @param createObservable A function returning a promise that resolves to an Observable. - * @returns Observable. - */ -export function asyncObservable(createObservable: () => Promise>): Observable { - const promise = createObservable(); - - return new Observable(subscriber => { - promise - .then(observable => observable.subscribe(subscriber)) - .catch(error => subscriber.error(error)); - }); -} - -/** - * Create a Promise that resolved with the first value returned from an observable. - * This function can be removed when the app starts using rxjs v7. - * - * @param observable Observable. - * @returns Promise resolved with the first value returned. - */ -export function firstValueFrom(observable: Observable): Promise { - return new Promise((resolve, reject) => { - CoreSubscriptions.once(observable, resolve, reject, () => { - // Subscription is completed, check if we can get its value. - if (observable instanceof BehaviorSubject) { - resolve(observable.getValue()); - } - - reject(new CoreError('Couldn\'t get first value from observable because it\'s already completed')); - }); - }); -} - -/** - * Ignore errors from an observable, returning a certain value instead. - * - * @param observable Observable to ignore errors. - * @param fallback Value to return if the observer errors. - * @return Observable with ignored errors, returning the fallback result if provided. - */ -export function ignoreErrors(observable: Observable): Observable; -export function ignoreErrors(observable: Observable, fallback: Fallback): Observable; -export function ignoreErrors( - observable: Observable, - fallback?: Fallback, -): Observable { - return observable.pipe(catchError(() => of(fallback))); -} - -/** - * Get return types of a list of observables. - */ -type GetObservablesReturnTypes = { [key in keyof T]: T[key] extends Observable ? R : never }; - -/** - * Data for an observable when zipping. - */ -type ZipObservableData = { // eslint-disable-line @typescript-eslint/no-explicit-any - values: T[]; - completed: boolean; - subscription?: Subscription; -}; - -/** - * Similar to rxjs zip operator, but once an observable completes we'll continue to emit the last value as long - * as the other observables continue to emit values. - * - * @param observables Observables to zip. - * @return Observable that emits the zipped values. - */ -export function zipIncudingComplete[]>( // eslint-disable-line @typescript-eslint/no-explicit-any - ...observables: T -): Observable> { - return new Observable(subscriber => { - const observablesData: ZipObservableData[] = []; - let nextIndex = 0; - let numCompleted = 0; - let hasErrored = false; - - // Treat an emitted event. - const treatEmitted = (completed = false) => { - if (hasErrored) { - return; - } - - if (numCompleted >= observables.length) { - subscriber.complete(); - - return; - } - - // Check if any observable still doesn't have data for the index. - const notReady = observablesData.some(data => !data.completed && data.values[nextIndex] === undefined); - if (notReady) { - return; - } - - // For each observable, get the value for the next index, or last value if not present (completed). - const valueToEmit = observablesData.map(observableData => - observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]); - - nextIndex++; - subscriber.next(> valueToEmit); - - if (completed) { - // An observable was completed, there might be other values to emit. - treatEmitted(true); - } - }; - - observables.forEach((observable, obsIndex) => { - const observableData: ZipObservableData = { - values: [], - completed: false, - }; - - observableData.subscription = observable.subscribe({ - next: (value) => { - observableData.values.push(value); - treatEmitted(); - }, - error: (error) => { - hasErrored = true; - subscriber.error(error); - }, - complete: () => { - observableData.completed = true; - numCompleted++; - treatEmitted(true); - }, - }); - - observablesData[obsIndex] = observableData; - }); - - // When unsubscribing, unsubscribe from all observables. - return () => { - observablesData.forEach(observableData => observableData.subscription?.unsubscribe()); - }; - }); -} diff --git a/src/core/utils/rxjs.ts b/src/core/utils/rxjs.ts index 509f28d82..d284b971a 100644 --- a/src/core/utils/rxjs.ts +++ b/src/core/utils/rxjs.ts @@ -13,8 +13,10 @@ // limitations under the License. import { FormControl } from '@angular/forms'; -import { Observable, OperatorFunction } from 'rxjs'; -import { filter } from 'rxjs/operators'; +import { CoreError } from '@classes/errors/error'; +import { CoreSubscriptions } from '@singletons/subscriptions'; +import { BehaviorSubject, Observable, of, OperatorFunction, Subscription } from 'rxjs'; +import { catchError, filter } from 'rxjs/operators'; /** * Create an observable that emits the current form control value. @@ -60,3 +62,149 @@ export function startWithOnSubscribed(onSubscribed: () => T): OperatorFunctio return source.subscribe(value => subscriber.next(value)); }); } + +/** + * Convert to an Observable a Promise that resolves to an Observable. + * + * @param createObservable A function returning a promise that resolves to an Observable. + * @returns Observable. + */ +export function asyncObservable(createObservable: () => Promise>): Observable { + const promise = createObservable(); + + return new Observable(subscriber => { + promise + .then(observable => observable.subscribe(subscriber)) // rxjs will automatically handle unsubscribes. + .catch(error => subscriber.error(error)); + }); +} + +/** + * Create a Promise resolved with the first value returned from an observable. The difference with toPromise is that + * this function returns the value as soon as it's emitted, it doesn't wait until the observable completes. + * This function can be removed when the app starts using rxjs v7. + * + * @param observable Observable. + * @returns Promise resolved with the first value returned. + */ +export function firstValueFrom(observable: Observable): Promise { + return new Promise((resolve, reject) => { + CoreSubscriptions.once(observable, resolve, reject, () => { + // Subscription is completed, check if we can get its value. + if (observable instanceof BehaviorSubject) { + resolve(observable.getValue()); + } + + reject(new CoreError('Couldn\'t get first value from observable because it\'s already completed')); + }); + }); +} + +/** + * Ignore errors from an observable, returning a certain value instead. + * + * @param observable Observable to ignore errors. + * @param fallback Value to return if the observer errors. + * @return Observable with ignored errors, returning the fallback result if provided. + */ +export function ignoreErrors(observable: Observable): Observable; +export function ignoreErrors(observable: Observable, fallback: Fallback): Observable; +export function ignoreErrors( + observable: Observable, + fallback?: Fallback, +): Observable { + return observable.pipe(catchError(() => of(fallback))); +} + +/** + * Get return types of a list of observables. + */ +type GetObservablesReturnTypes = { [key in keyof T]: T[key] extends Observable ? R : never }; + +/** + * Data for an observable when zipping. + */ +type ZipObservableData = { + values: T[]; + completed: boolean; + subscription?: Subscription; +}; + +/** + * Same as the built-in zip operator, but once an observable completes it'll continue to emit the last value as long + * as the other observables continue to emit values. + * + * @param observables Observables to zip. + * @return Observable that emits the zipped values. + */ +export function zipIncludingComplete[]>( + ...observables: T +): Observable> { + return new Observable(subscriber => { + const observablesData: ZipObservableData[] = []; + let nextIndex = 0; + let numCompleted = 0; + let hasErrored = false; + + // Treat an emitted event. + const treatEmitted = (completed = false) => { + if (hasErrored) { + return; + } + + if (numCompleted >= observables.length) { + subscriber.complete(); + + return; + } + + // Check if any observable still doesn't have data for the index. + const notReady = observablesData.some(data => !data.completed && data.values[nextIndex] === undefined); + if (notReady) { + return; + } + + // For each observable, get the value for the next index, or last value if not present (completed). + const valueToEmit = observablesData.map(observableData => + observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]); + + nextIndex++; + subscriber.next(> valueToEmit); + + if (completed) { + // An observable was completed, there might be other values to emit. + treatEmitted(true); + } + }; + + observables.forEach((observable, obsIndex) => { + const observableData: ZipObservableData = { + values: [], + completed: false, + }; + + observableData.subscription = observable.subscribe({ + next: (value) => { + observableData.values.push(value); + treatEmitted(); + }, + error: (error) => { + hasErrored = true; + subscriber.error(error); + }, + complete: () => { + observableData.completed = true; + numCompleted++; + treatEmitted(true); + }, + }); + + observablesData[obsIndex] = observableData; + }); + + // When unsubscribing, unsubscribe from all observables. + return () => { + observablesData.forEach(observableData => observableData.subscription?.unsubscribe()); + }; + }); +} diff --git a/src/core/utils/tests/observables.test.ts b/src/core/utils/tests/observables.test.ts deleted file mode 100644 index 47a7a1e1c..000000000 --- a/src/core/utils/tests/observables.test.ts +++ /dev/null @@ -1,160 +0,0 @@ -// (C) Copyright 2015 Moodle Pty Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import { BehaviorSubject, Observable, Subject } from 'rxjs'; -import { TestScheduler } from 'rxjs/testing'; -import { asyncObservable, firstValueFrom, ignoreErrors, zipIncudingComplete } from '../observables'; - -describe('Observables utility functions', () => { - - it('asyncObservable emits values', (done) => { - const subject = new Subject(); - const promise = new Promise>((resolve) => { - resolve(subject); - }); - - asyncObservable(() => promise).subscribe({ - next: (value) => { - expect(value).toEqual('foo'); - done(); - }, - }); - - // Wait for the promise callback to be called before emitting the value. - setTimeout(() => subject.next('foo')); - }); - - it('asyncObservable emits errors', (done) => { - const subject = new Subject(); - const promise = new Promise>((resolve) => { - resolve(subject); - }); - - asyncObservable(() => promise).subscribe({ - error: (value) => { - expect(value).toEqual('foo'); - done(); - }, - }); - - // Wait for the promise callback to be called before emitting the value. - setTimeout(() => subject.error('foo')); - }); - - it('asyncObservable emits complete', (done) => { - const subject = new Subject(); - const promise = new Promise>((resolve) => { - resolve(subject); - }); - - asyncObservable(() => promise).subscribe({ - complete: () => done(), - }); - - // Wait for the promise callback to be called before emitting the value. - setTimeout(() => subject.complete()); - }); - - it('asyncObservable emits error if promise is rejected', (done) => { - const promise = new Promise>((resolve, reject) => { - reject('Custom error'); - }); - - asyncObservable(() => promise).subscribe({ - error: (error) => { - expect(error).toEqual('Custom error'); - done(); - }, - }); - }); - - it('returns first value emitted by an observable', async () => { - const subject = new Subject(); - setTimeout(() => subject.next('foo'), 10); - - await expect(firstValueFrom(subject)).resolves.toEqual('foo'); - - // Check that running it again doesn't get last value, it gets the new one. - setTimeout(() => subject.next('bar'), 10); - await expect(firstValueFrom(subject)).resolves.toEqual('bar'); - - // Check we cannot get first value if a subject is already completed. - subject.complete(); - await expect(firstValueFrom(subject)).rejects.toThrow(); - - // Check that we get last value when using BehaviourSubject. - const behaviorSubject = new BehaviorSubject('baz'); - await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz'); - - // Check we get last value even if behaviour subject is completed. - behaviorSubject.complete(); - await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz'); - - // Check that Promise is rejected if the observable emits an error. - const errorSubject = new Subject(); - setTimeout(() => errorSubject.error('foo error'), 10); - - await expect(firstValueFrom(errorSubject)).rejects.toMatch('foo error'); - }); - - it('ignores observable errors', (done) => { - const subject = new Subject(); - - ignoreErrors(subject, 'default value').subscribe({ - next: (value) => { - expect(value).toEqual('default value'); - done(); - }, - }); - - subject.error('foo'); - }); - - it('zips observables including complete events', () => { - const scheduler = new TestScheduler((actual, expected) => { - expect(actual).toEqual(expected); - }); - - scheduler.run(({ expectObservable, cold }) => { - expectObservable(zipIncudingComplete( - cold('-a-b---|', { - a: 'A1', - b: 'A2', - }), - cold('-a----b-c--|', { - a: 'B1', - b: 'B2', - c: 'B3', - }), - cold('-a-b-c--de-----|', { - a: 'C1', - b: 'C2', - c: 'C3', - d: 'C4', - e: 'C5', - }), - )).toBe( - '-a----b-c--(de)|', - { - a: ['A1','B1','C1'], - b: ['A2','B2','C2'], - c: ['A2','B3','C3'], - d: ['A2','B3','C4'], - e: ['A2','B3','C5'], - }, - ); - }); - }); - -}); diff --git a/src/core/utils/tests/rxjs.test.ts b/src/core/utils/tests/rxjs.test.ts index 738fd389b..934b9cfb0 100644 --- a/src/core/utils/tests/rxjs.test.ts +++ b/src/core/utils/tests/rxjs.test.ts @@ -12,14 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { formControlValue, resolved, startWithOnSubscribed } from '@/core/utils/rxjs'; +import { + asyncObservable, + firstValueFrom, + formControlValue, + ignoreErrors, + resolved, + startWithOnSubscribed, + zipIncludingComplete, +} from '@/core/utils/rxjs'; import { mock } from '@/testing/utils'; import { FormControl } from '@angular/forms'; -import { of, Subject } from 'rxjs'; +import { BehaviorSubject, Observable, of, Subject } from 'rxjs'; +import { TestScheduler } from 'rxjs/testing'; describe('RXJS Utils', () => { - it('Emits filtered form values', () => { + it('formControlValue emits filtered form values', () => { // Arrange. let value = 'one'; const emited: string[] = []; @@ -48,7 +57,7 @@ describe('RXJS Utils', () => { expect(emited).toEqual(['two', 'three']); }); - it('Emits resolved values', async () => { + it('resolved emits resolved values', async () => { // Arrange. const emited: string[] = []; const promises = [ @@ -67,7 +76,7 @@ describe('RXJS Utils', () => { expect(emited).toEqual(['one', 'two', 'three']); }); - it('Adds starting values on subscription', () => { + it('startWithOnSubscribed adds starting values on subscription', () => { // Arrange. let store = 'one'; const emited: string[] = []; @@ -86,4 +95,143 @@ describe('RXJS Utils', () => { expect(emited).toEqual(['two', 'final', 'three', 'final']); }); + it('asyncObservable emits values', (done) => { + const subject = new Subject(); + const promise = new Promise>((resolve) => { + resolve(subject); + }); + + asyncObservable(() => promise).subscribe({ + next: (value) => { + expect(value).toEqual('foo'); + done(); + }, + }); + + // Wait for the promise callback to be called before emitting the value. + setTimeout(() => subject.next('foo')); + }); + + it('asyncObservable emits errors', (done) => { + const subject = new Subject(); + const promise = new Promise>((resolve) => { + resolve(subject); + }); + + asyncObservable(() => promise).subscribe({ + error: (value) => { + expect(value).toEqual('foo'); + done(); + }, + }); + + // Wait for the promise callback to be called before emitting the value. + setTimeout(() => subject.error('foo')); + }); + + it('asyncObservable emits complete', (done) => { + const subject = new Subject(); + const promise = new Promise>((resolve) => { + resolve(subject); + }); + + asyncObservable(() => promise).subscribe({ + complete: () => done(), + }); + + // Wait for the promise callback to be called before emitting the value. + setTimeout(() => subject.complete()); + }); + + it('asyncObservable emits error if promise is rejected', (done) => { + const promise = new Promise>((resolve, reject) => { + reject('Custom error'); + }); + + asyncObservable(() => promise).subscribe({ + error: (error) => { + expect(error).toEqual('Custom error'); + done(); + }, + }); + }); + + it('firstValueFrom returns first value emitted by an observable', async () => { + const subject = new Subject(); + setTimeout(() => subject.next('foo'), 10); + + await expect(firstValueFrom(subject)).resolves.toEqual('foo'); + + // Check that running it again doesn't get last value, it gets the new one. + setTimeout(() => subject.next('bar'), 10); + await expect(firstValueFrom(subject)).resolves.toEqual('bar'); + + // Check we cannot get first value if a subject is already completed. + subject.complete(); + await expect(firstValueFrom(subject)).rejects.toThrow(); + + // Check that we get last value when using BehaviourSubject. + const behaviorSubject = new BehaviorSubject('baz'); + await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz'); + + // Check we get last value even if behaviour subject is completed. + behaviorSubject.complete(); + await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz'); + + // Check that Promise is rejected if the observable emits an error. + const errorSubject = new Subject(); + setTimeout(() => errorSubject.error('foo error'), 10); + + await expect(firstValueFrom(errorSubject)).rejects.toMatch('foo error'); + }); + + it('ignoreErrors ignores observable errors', (done) => { + const subject = new Subject(); + + ignoreErrors(subject, 'default value').subscribe({ + next: (value) => { + expect(value).toEqual('default value'); + done(); + }, + }); + + subject.error('foo'); + }); + + it('zipIncludingComplete zips observables including complete events', () => { + const scheduler = new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected); + }); + + scheduler.run(({ expectObservable, cold }) => { + expectObservable(zipIncludingComplete( + cold('-a-b---|', { + a: 'A1', + b: 'A2', + }), + cold('-a----b-c--|', { + a: 'B1', + b: 'B2', + c: 'B3', + }), + cold('-a-b-c--de-----|', { + a: 'C1', + b: 'C2', + c: 'C3', + d: 'C4', + e: 'C5', + }), + )).toBe( + '-a----b-c--(de)|', + { + a: ['A1','B1','C1'], + b: ['A2','B2','C2'], + c: ['A2','B3','C3'], + d: ['A2','B3','C4'], + e: ['A2','B3','C5'], + }, + ); + }); + }); + });