MOBILE-3817 utils: Move new utils functions to existing rxjs file
parent
88297ed400
commit
63f3c440e3
|
@ -21,7 +21,7 @@ import { CoreSite, CoreSiteWSPreSets } from '@classes/site';
|
||||||
import { CoreStatusWithWarningsWSResponse, CoreWSExternalWarning } from '@services/ws';
|
import { CoreStatusWithWarningsWSResponse, CoreWSExternalWarning } from '@services/ws';
|
||||||
import { makeSingleton } from '@singletons';
|
import { makeSingleton } from '@singletons';
|
||||||
import { CoreError } from '@classes/errors/error';
|
import { CoreError } from '@classes/errors/error';
|
||||||
import { asyncObservable, firstValueFrom } from '@/core/utils/observables';
|
import { asyncObservable, firstValueFrom } from '@/core/utils/rxjs';
|
||||||
import { Observable } from 'rxjs';
|
import { Observable } from 'rxjs';
|
||||||
import { map } from 'rxjs/operators';
|
import { map } from 'rxjs/operators';
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ import {
|
||||||
} from '@services/database/sites';
|
} from '@services/database/sites';
|
||||||
import { Observable, ObservableInput, ObservedValueOf, OperatorFunction, Subject } from 'rxjs';
|
import { Observable, ObservableInput, ObservedValueOf, OperatorFunction, Subject } from 'rxjs';
|
||||||
import { finalize, map, mergeMap } from 'rxjs/operators';
|
import { finalize, map, mergeMap } from 'rxjs/operators';
|
||||||
import { firstValueFrom } from '../utils/observables';
|
import { firstValueFrom } from '../utils/rxjs';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* QR Code type enumeration.
|
* QR Code type enumeration.
|
||||||
|
|
|
@ -27,7 +27,7 @@ import { CoreWSExternalFile } from '@services/ws';
|
||||||
import { AddonCourseCompletion } from '@addons/coursecompletion/services/coursecompletion';
|
import { AddonCourseCompletion } from '@addons/coursecompletion/services/coursecompletion';
|
||||||
import moment from 'moment-timezone';
|
import moment from 'moment-timezone';
|
||||||
import { Observable, of } from 'rxjs';
|
import { Observable, of } from 'rxjs';
|
||||||
import { firstValueFrom, zipIncudingComplete } from '@/core/utils/observables';
|
import { firstValueFrom, zipIncludingComplete } from '@/core/utils/rxjs';
|
||||||
import { catchError, map } from 'rxjs/operators';
|
import { catchError, map } from 'rxjs/operators';
|
||||||
import { chainRequests } from '@classes/site';
|
import { chainRequests } from '@classes/site';
|
||||||
|
|
||||||
|
@ -259,7 +259,7 @@ export class CoreCoursesHelperProvider {
|
||||||
};
|
};
|
||||||
courses = this.filterAndSortCoursesWithOptions(courses, options);
|
courses = this.filterAndSortCoursesWithOptions(courses, options);
|
||||||
|
|
||||||
return zipIncudingComplete(
|
return zipIncludingComplete(
|
||||||
this.loadCoursesExtraInfoObservable(courses, options.loadCategoryNames, newOptions),
|
this.loadCoursesExtraInfoObservable(courses, options.loadCategoryNames, newOptions),
|
||||||
CoreCourses.getCoursesAdminAndNavOptionsObservable(courseIds, newOptions).pipe(map(courseOptions => {
|
CoreCourses.getCoursesAdminAndNavOptionsObservable(courseIds, newOptions).pipe(map(courseOptions => {
|
||||||
courses.forEach((course: CoreEnrolledCourseDataWithOptions) => {
|
courses.forEach((course: CoreEnrolledCourseDataWithOptions) => {
|
||||||
|
|
|
@ -20,7 +20,7 @@ import { CoreStatusWithWarningsWSResponse, CoreWarningsWSResponse, CoreWSExterna
|
||||||
import { CoreEvents } from '@singletons/events';
|
import { CoreEvents } from '@singletons/events';
|
||||||
import { CoreWSError } from '@classes/errors/wserror';
|
import { CoreWSError } from '@classes/errors/wserror';
|
||||||
import { CoreCourseAnyCourseDataWithExtraInfoAndOptions, CoreCourseWithImageAndColor } from './courses-helper';
|
import { CoreCourseAnyCourseDataWithExtraInfoAndOptions, CoreCourseWithImageAndColor } from './courses-helper';
|
||||||
import { asyncObservable, firstValueFrom, ignoreErrors, zipIncudingComplete } from '@/core/utils/observables';
|
import { asyncObservable, firstValueFrom, ignoreErrors, zipIncludingComplete } from '@/core/utils/rxjs';
|
||||||
import { of, Observable } from 'rxjs';
|
import { of, Observable } from 'rxjs';
|
||||||
import { map } from 'rxjs/operators';
|
import { map } from 'rxjs/operators';
|
||||||
|
|
||||||
|
@ -697,7 +697,7 @@ export class CoreCoursesProvider {
|
||||||
courseIds = await this.getCourseIdsForAdminAndNavOptions(courseIds, siteId);
|
courseIds = await this.getCourseIdsForAdminAndNavOptions(courseIds, siteId);
|
||||||
|
|
||||||
// Get user navigation and administration options.
|
// Get user navigation and administration options.
|
||||||
return zipIncudingComplete(
|
return zipIncludingComplete(
|
||||||
ignoreErrors(this.getUserNavigationOptionsObservable(courseIds, options), {}),
|
ignoreErrors(this.getUserNavigationOptionsObservable(courseIds, options), {}),
|
||||||
ignoreErrors(this.getUserAdministrationOptionsObservable(courseIds, options), {}),
|
ignoreErrors(this.getUserAdministrationOptionsObservable(courseIds, options), {}),
|
||||||
).pipe(
|
).pipe(
|
||||||
|
|
|
@ -21,7 +21,7 @@ import { makeSingleton } from '@singletons';
|
||||||
import { CoreError } from '@classes/errors/error';
|
import { CoreError } from '@classes/errors/error';
|
||||||
import { Observable } from 'rxjs';
|
import { Observable } from 'rxjs';
|
||||||
import { map } from 'rxjs/operators';
|
import { map } from 'rxjs/operators';
|
||||||
import { asyncObservable, firstValueFrom } from '@/core/utils/observables';
|
import { asyncObservable, firstValueFrom } from '@/core/utils/rxjs';
|
||||||
|
|
||||||
const ROOT_CACHE_KEY = 'CoreCoursesDashboard:';
|
const ROOT_CACHE_KEY = 'CoreCoursesDashboard:';
|
||||||
|
|
||||||
|
|
|
@ -1,163 +0,0 @@
|
||||||
// (C) Copyright 2015 Moodle Pty Ltd.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
import { CoreError } from '@classes/errors/error';
|
|
||||||
import { CoreSubscriptions } from '@singletons/subscriptions';
|
|
||||||
import { BehaviorSubject, Observable, of, Subscription } from 'rxjs';
|
|
||||||
import { catchError } from 'rxjs/operators';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert to an Observable a Promise that resolves to an Observable.
|
|
||||||
*
|
|
||||||
* @param createObservable A function returning a promise that resolves to an Observable.
|
|
||||||
* @returns Observable.
|
|
||||||
*/
|
|
||||||
export function asyncObservable<T>(createObservable: () => Promise<Observable<T>>): Observable<T> {
|
|
||||||
const promise = createObservable();
|
|
||||||
|
|
||||||
return new Observable(subscriber => {
|
|
||||||
promise
|
|
||||||
.then(observable => observable.subscribe(subscriber))
|
|
||||||
.catch(error => subscriber.error(error));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a Promise that resolved with the first value returned from an observable.
|
|
||||||
* This function can be removed when the app starts using rxjs v7.
|
|
||||||
*
|
|
||||||
* @param observable Observable.
|
|
||||||
* @returns Promise resolved with the first value returned.
|
|
||||||
*/
|
|
||||||
export function firstValueFrom<T>(observable: Observable<T>): Promise<T> {
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
CoreSubscriptions.once(observable, resolve, reject, () => {
|
|
||||||
// Subscription is completed, check if we can get its value.
|
|
||||||
if (observable instanceof BehaviorSubject) {
|
|
||||||
resolve(observable.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
reject(new CoreError('Couldn\'t get first value from observable because it\'s already completed'));
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Ignore errors from an observable, returning a certain value instead.
|
|
||||||
*
|
|
||||||
* @param observable Observable to ignore errors.
|
|
||||||
* @param fallback Value to return if the observer errors.
|
|
||||||
* @return Observable with ignored errors, returning the fallback result if provided.
|
|
||||||
*/
|
|
||||||
export function ignoreErrors<Result>(observable: Observable<Result>): Observable<Result | undefined>;
|
|
||||||
export function ignoreErrors<Result, Fallback>(observable: Observable<Result>, fallback: Fallback): Observable<Result | Fallback>;
|
|
||||||
export function ignoreErrors<Result, Fallback>(
|
|
||||||
observable: Observable<Result>,
|
|
||||||
fallback?: Fallback,
|
|
||||||
): Observable<Result | Fallback | undefined> {
|
|
||||||
return observable.pipe(catchError(() => of(fallback)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get return types of a list of observables.
|
|
||||||
*/
|
|
||||||
type GetObservablesReturnTypes<T> = { [key in keyof T]: T[key] extends Observable<infer R> ? R : never };
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Data for an observable when zipping.
|
|
||||||
*/
|
|
||||||
type ZipObservableData<T = any> = { // eslint-disable-line @typescript-eslint/no-explicit-any
|
|
||||||
values: T[];
|
|
||||||
completed: boolean;
|
|
||||||
subscription?: Subscription;
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Similar to rxjs zip operator, but once an observable completes we'll continue to emit the last value as long
|
|
||||||
* as the other observables continue to emit values.
|
|
||||||
*
|
|
||||||
* @param observables Observables to zip.
|
|
||||||
* @return Observable that emits the zipped values.
|
|
||||||
*/
|
|
||||||
export function zipIncudingComplete<T extends Observable<any>[]>( // eslint-disable-line @typescript-eslint/no-explicit-any
|
|
||||||
...observables: T
|
|
||||||
): Observable<GetObservablesReturnTypes<T>> {
|
|
||||||
return new Observable(subscriber => {
|
|
||||||
const observablesData: ZipObservableData[] = [];
|
|
||||||
let nextIndex = 0;
|
|
||||||
let numCompleted = 0;
|
|
||||||
let hasErrored = false;
|
|
||||||
|
|
||||||
// Treat an emitted event.
|
|
||||||
const treatEmitted = (completed = false) => {
|
|
||||||
if (hasErrored) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numCompleted >= observables.length) {
|
|
||||||
subscriber.complete();
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if any observable still doesn't have data for the index.
|
|
||||||
const notReady = observablesData.some(data => !data.completed && data.values[nextIndex] === undefined);
|
|
||||||
if (notReady) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// For each observable, get the value for the next index, or last value if not present (completed).
|
|
||||||
const valueToEmit = observablesData.map(observableData =>
|
|
||||||
observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]);
|
|
||||||
|
|
||||||
nextIndex++;
|
|
||||||
subscriber.next(<GetObservablesReturnTypes<T>> valueToEmit);
|
|
||||||
|
|
||||||
if (completed) {
|
|
||||||
// An observable was completed, there might be other values to emit.
|
|
||||||
treatEmitted(true);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
observables.forEach((observable, obsIndex) => {
|
|
||||||
const observableData: ZipObservableData = {
|
|
||||||
values: [],
|
|
||||||
completed: false,
|
|
||||||
};
|
|
||||||
|
|
||||||
observableData.subscription = observable.subscribe({
|
|
||||||
next: (value) => {
|
|
||||||
observableData.values.push(value);
|
|
||||||
treatEmitted();
|
|
||||||
},
|
|
||||||
error: (error) => {
|
|
||||||
hasErrored = true;
|
|
||||||
subscriber.error(error);
|
|
||||||
},
|
|
||||||
complete: () => {
|
|
||||||
observableData.completed = true;
|
|
||||||
numCompleted++;
|
|
||||||
treatEmitted(true);
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
observablesData[obsIndex] = observableData;
|
|
||||||
});
|
|
||||||
|
|
||||||
// When unsubscribing, unsubscribe from all observables.
|
|
||||||
return () => {
|
|
||||||
observablesData.forEach(observableData => observableData.subscription?.unsubscribe());
|
|
||||||
};
|
|
||||||
});
|
|
||||||
}
|
|
|
@ -13,8 +13,10 @@
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
import { FormControl } from '@angular/forms';
|
import { FormControl } from '@angular/forms';
|
||||||
import { Observable, OperatorFunction } from 'rxjs';
|
import { CoreError } from '@classes/errors/error';
|
||||||
import { filter } from 'rxjs/operators';
|
import { CoreSubscriptions } from '@singletons/subscriptions';
|
||||||
|
import { BehaviorSubject, Observable, of, OperatorFunction, Subscription } from 'rxjs';
|
||||||
|
import { catchError, filter } from 'rxjs/operators';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an observable that emits the current form control value.
|
* Create an observable that emits the current form control value.
|
||||||
|
@ -60,3 +62,149 @@ export function startWithOnSubscribed<T>(onSubscribed: () => T): OperatorFunctio
|
||||||
return source.subscribe(value => subscriber.next(value));
|
return source.subscribe(value => subscriber.next(value));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert to an Observable a Promise that resolves to an Observable.
|
||||||
|
*
|
||||||
|
* @param createObservable A function returning a promise that resolves to an Observable.
|
||||||
|
* @returns Observable.
|
||||||
|
*/
|
||||||
|
export function asyncObservable<T>(createObservable: () => Promise<Observable<T>>): Observable<T> {
|
||||||
|
const promise = createObservable();
|
||||||
|
|
||||||
|
return new Observable(subscriber => {
|
||||||
|
promise
|
||||||
|
.then(observable => observable.subscribe(subscriber)) // rxjs will automatically handle unsubscribes.
|
||||||
|
.catch(error => subscriber.error(error));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a Promise resolved with the first value returned from an observable. The difference with toPromise is that
|
||||||
|
* this function returns the value as soon as it's emitted, it doesn't wait until the observable completes.
|
||||||
|
* This function can be removed when the app starts using rxjs v7.
|
||||||
|
*
|
||||||
|
* @param observable Observable.
|
||||||
|
* @returns Promise resolved with the first value returned.
|
||||||
|
*/
|
||||||
|
export function firstValueFrom<T>(observable: Observable<T>): Promise<T> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
CoreSubscriptions.once(observable, resolve, reject, () => {
|
||||||
|
// Subscription is completed, check if we can get its value.
|
||||||
|
if (observable instanceof BehaviorSubject) {
|
||||||
|
resolve(observable.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
reject(new CoreError('Couldn\'t get first value from observable because it\'s already completed'));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ignore errors from an observable, returning a certain value instead.
|
||||||
|
*
|
||||||
|
* @param observable Observable to ignore errors.
|
||||||
|
* @param fallback Value to return if the observer errors.
|
||||||
|
* @return Observable with ignored errors, returning the fallback result if provided.
|
||||||
|
*/
|
||||||
|
export function ignoreErrors<Result>(observable: Observable<Result>): Observable<Result | undefined>;
|
||||||
|
export function ignoreErrors<Result, Fallback>(observable: Observable<Result>, fallback: Fallback): Observable<Result | Fallback>;
|
||||||
|
export function ignoreErrors<Result, Fallback>(
|
||||||
|
observable: Observable<Result>,
|
||||||
|
fallback?: Fallback,
|
||||||
|
): Observable<Result | Fallback | undefined> {
|
||||||
|
return observable.pipe(catchError(() => of(fallback)));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get return types of a list of observables.
|
||||||
|
*/
|
||||||
|
type GetObservablesReturnTypes<T> = { [key in keyof T]: T[key] extends Observable<infer R> ? R : never };
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Data for an observable when zipping.
|
||||||
|
*/
|
||||||
|
type ZipObservableData<T = unknown> = {
|
||||||
|
values: T[];
|
||||||
|
completed: boolean;
|
||||||
|
subscription?: Subscription;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Same as the built-in zip operator, but once an observable completes it'll continue to emit the last value as long
|
||||||
|
* as the other observables continue to emit values.
|
||||||
|
*
|
||||||
|
* @param observables Observables to zip.
|
||||||
|
* @return Observable that emits the zipped values.
|
||||||
|
*/
|
||||||
|
export function zipIncludingComplete<T extends Observable<unknown>[]>(
|
||||||
|
...observables: T
|
||||||
|
): Observable<GetObservablesReturnTypes<T>> {
|
||||||
|
return new Observable(subscriber => {
|
||||||
|
const observablesData: ZipObservableData[] = [];
|
||||||
|
let nextIndex = 0;
|
||||||
|
let numCompleted = 0;
|
||||||
|
let hasErrored = false;
|
||||||
|
|
||||||
|
// Treat an emitted event.
|
||||||
|
const treatEmitted = (completed = false) => {
|
||||||
|
if (hasErrored) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numCompleted >= observables.length) {
|
||||||
|
subscriber.complete();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if any observable still doesn't have data for the index.
|
||||||
|
const notReady = observablesData.some(data => !data.completed && data.values[nextIndex] === undefined);
|
||||||
|
if (notReady) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// For each observable, get the value for the next index, or last value if not present (completed).
|
||||||
|
const valueToEmit = observablesData.map(observableData =>
|
||||||
|
observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]);
|
||||||
|
|
||||||
|
nextIndex++;
|
||||||
|
subscriber.next(<GetObservablesReturnTypes<T>> valueToEmit);
|
||||||
|
|
||||||
|
if (completed) {
|
||||||
|
// An observable was completed, there might be other values to emit.
|
||||||
|
treatEmitted(true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
observables.forEach((observable, obsIndex) => {
|
||||||
|
const observableData: ZipObservableData = {
|
||||||
|
values: [],
|
||||||
|
completed: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
observableData.subscription = observable.subscribe({
|
||||||
|
next: (value) => {
|
||||||
|
observableData.values.push(value);
|
||||||
|
treatEmitted();
|
||||||
|
},
|
||||||
|
error: (error) => {
|
||||||
|
hasErrored = true;
|
||||||
|
subscriber.error(error);
|
||||||
|
},
|
||||||
|
complete: () => {
|
||||||
|
observableData.completed = true;
|
||||||
|
numCompleted++;
|
||||||
|
treatEmitted(true);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
observablesData[obsIndex] = observableData;
|
||||||
|
});
|
||||||
|
|
||||||
|
// When unsubscribing, unsubscribe from all observables.
|
||||||
|
return () => {
|
||||||
|
observablesData.forEach(observableData => observableData.subscription?.unsubscribe());
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
|
@ -1,160 +0,0 @@
|
||||||
// (C) Copyright 2015 Moodle Pty Ltd.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
import { BehaviorSubject, Observable, Subject } from 'rxjs';
|
|
||||||
import { TestScheduler } from 'rxjs/testing';
|
|
||||||
import { asyncObservable, firstValueFrom, ignoreErrors, zipIncudingComplete } from '../observables';
|
|
||||||
|
|
||||||
describe('Observables utility functions', () => {
|
|
||||||
|
|
||||||
it('asyncObservable emits values', (done) => {
|
|
||||||
const subject = new Subject();
|
|
||||||
const promise = new Promise<Observable<unknown>>((resolve) => {
|
|
||||||
resolve(subject);
|
|
||||||
});
|
|
||||||
|
|
||||||
asyncObservable(() => promise).subscribe({
|
|
||||||
next: (value) => {
|
|
||||||
expect(value).toEqual('foo');
|
|
||||||
done();
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
// Wait for the promise callback to be called before emitting the value.
|
|
||||||
setTimeout(() => subject.next('foo'));
|
|
||||||
});
|
|
||||||
|
|
||||||
it('asyncObservable emits errors', (done) => {
|
|
||||||
const subject = new Subject();
|
|
||||||
const promise = new Promise<Observable<unknown>>((resolve) => {
|
|
||||||
resolve(subject);
|
|
||||||
});
|
|
||||||
|
|
||||||
asyncObservable(() => promise).subscribe({
|
|
||||||
error: (value) => {
|
|
||||||
expect(value).toEqual('foo');
|
|
||||||
done();
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
// Wait for the promise callback to be called before emitting the value.
|
|
||||||
setTimeout(() => subject.error('foo'));
|
|
||||||
});
|
|
||||||
|
|
||||||
it('asyncObservable emits complete', (done) => {
|
|
||||||
const subject = new Subject();
|
|
||||||
const promise = new Promise<Observable<unknown>>((resolve) => {
|
|
||||||
resolve(subject);
|
|
||||||
});
|
|
||||||
|
|
||||||
asyncObservable(() => promise).subscribe({
|
|
||||||
complete: () => done(),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Wait for the promise callback to be called before emitting the value.
|
|
||||||
setTimeout(() => subject.complete());
|
|
||||||
});
|
|
||||||
|
|
||||||
it('asyncObservable emits error if promise is rejected', (done) => {
|
|
||||||
const promise = new Promise<Observable<unknown>>((resolve, reject) => {
|
|
||||||
reject('Custom error');
|
|
||||||
});
|
|
||||||
|
|
||||||
asyncObservable(() => promise).subscribe({
|
|
||||||
error: (error) => {
|
|
||||||
expect(error).toEqual('Custom error');
|
|
||||||
done();
|
|
||||||
},
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('returns first value emitted by an observable', async () => {
|
|
||||||
const subject = new Subject();
|
|
||||||
setTimeout(() => subject.next('foo'), 10);
|
|
||||||
|
|
||||||
await expect(firstValueFrom(subject)).resolves.toEqual('foo');
|
|
||||||
|
|
||||||
// Check that running it again doesn't get last value, it gets the new one.
|
|
||||||
setTimeout(() => subject.next('bar'), 10);
|
|
||||||
await expect(firstValueFrom(subject)).resolves.toEqual('bar');
|
|
||||||
|
|
||||||
// Check we cannot get first value if a subject is already completed.
|
|
||||||
subject.complete();
|
|
||||||
await expect(firstValueFrom(subject)).rejects.toThrow();
|
|
||||||
|
|
||||||
// Check that we get last value when using BehaviourSubject.
|
|
||||||
const behaviorSubject = new BehaviorSubject('baz');
|
|
||||||
await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz');
|
|
||||||
|
|
||||||
// Check we get last value even if behaviour subject is completed.
|
|
||||||
behaviorSubject.complete();
|
|
||||||
await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz');
|
|
||||||
|
|
||||||
// Check that Promise is rejected if the observable emits an error.
|
|
||||||
const errorSubject = new Subject();
|
|
||||||
setTimeout(() => errorSubject.error('foo error'), 10);
|
|
||||||
|
|
||||||
await expect(firstValueFrom(errorSubject)).rejects.toMatch('foo error');
|
|
||||||
});
|
|
||||||
|
|
||||||
it('ignores observable errors', (done) => {
|
|
||||||
const subject = new Subject();
|
|
||||||
|
|
||||||
ignoreErrors(subject, 'default value').subscribe({
|
|
||||||
next: (value) => {
|
|
||||||
expect(value).toEqual('default value');
|
|
||||||
done();
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
subject.error('foo');
|
|
||||||
});
|
|
||||||
|
|
||||||
it('zips observables including complete events', () => {
|
|
||||||
const scheduler = new TestScheduler((actual, expected) => {
|
|
||||||
expect(actual).toEqual(expected);
|
|
||||||
});
|
|
||||||
|
|
||||||
scheduler.run(({ expectObservable, cold }) => {
|
|
||||||
expectObservable(zipIncudingComplete(
|
|
||||||
cold('-a-b---|', {
|
|
||||||
a: 'A1',
|
|
||||||
b: 'A2',
|
|
||||||
}),
|
|
||||||
cold('-a----b-c--|', {
|
|
||||||
a: 'B1',
|
|
||||||
b: 'B2',
|
|
||||||
c: 'B3',
|
|
||||||
}),
|
|
||||||
cold('-a-b-c--de-----|', {
|
|
||||||
a: 'C1',
|
|
||||||
b: 'C2',
|
|
||||||
c: 'C3',
|
|
||||||
d: 'C4',
|
|
||||||
e: 'C5',
|
|
||||||
}),
|
|
||||||
)).toBe(
|
|
||||||
'-a----b-c--(de)|',
|
|
||||||
{
|
|
||||||
a: ['A1','B1','C1'],
|
|
||||||
b: ['A2','B2','C2'],
|
|
||||||
c: ['A2','B3','C3'],
|
|
||||||
d: ['A2','B3','C4'],
|
|
||||||
e: ['A2','B3','C5'],
|
|
||||||
},
|
|
||||||
);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
});
|
|
|
@ -12,14 +12,23 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
import { formControlValue, resolved, startWithOnSubscribed } from '@/core/utils/rxjs';
|
import {
|
||||||
|
asyncObservable,
|
||||||
|
firstValueFrom,
|
||||||
|
formControlValue,
|
||||||
|
ignoreErrors,
|
||||||
|
resolved,
|
||||||
|
startWithOnSubscribed,
|
||||||
|
zipIncludingComplete,
|
||||||
|
} from '@/core/utils/rxjs';
|
||||||
import { mock } from '@/testing/utils';
|
import { mock } from '@/testing/utils';
|
||||||
import { FormControl } from '@angular/forms';
|
import { FormControl } from '@angular/forms';
|
||||||
import { of, Subject } from 'rxjs';
|
import { BehaviorSubject, Observable, of, Subject } from 'rxjs';
|
||||||
|
import { TestScheduler } from 'rxjs/testing';
|
||||||
|
|
||||||
describe('RXJS Utils', () => {
|
describe('RXJS Utils', () => {
|
||||||
|
|
||||||
it('Emits filtered form values', () => {
|
it('formControlValue emits filtered form values', () => {
|
||||||
// Arrange.
|
// Arrange.
|
||||||
let value = 'one';
|
let value = 'one';
|
||||||
const emited: string[] = [];
|
const emited: string[] = [];
|
||||||
|
@ -48,7 +57,7 @@ describe('RXJS Utils', () => {
|
||||||
expect(emited).toEqual(['two', 'three']);
|
expect(emited).toEqual(['two', 'three']);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('Emits resolved values', async () => {
|
it('resolved emits resolved values', async () => {
|
||||||
// Arrange.
|
// Arrange.
|
||||||
const emited: string[] = [];
|
const emited: string[] = [];
|
||||||
const promises = [
|
const promises = [
|
||||||
|
@ -67,7 +76,7 @@ describe('RXJS Utils', () => {
|
||||||
expect(emited).toEqual(['one', 'two', 'three']);
|
expect(emited).toEqual(['one', 'two', 'three']);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('Adds starting values on subscription', () => {
|
it('startWithOnSubscribed adds starting values on subscription', () => {
|
||||||
// Arrange.
|
// Arrange.
|
||||||
let store = 'one';
|
let store = 'one';
|
||||||
const emited: string[] = [];
|
const emited: string[] = [];
|
||||||
|
@ -86,4 +95,143 @@ describe('RXJS Utils', () => {
|
||||||
expect(emited).toEqual(['two', 'final', 'three', 'final']);
|
expect(emited).toEqual(['two', 'final', 'three', 'final']);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('asyncObservable emits values', (done) => {
|
||||||
|
const subject = new Subject();
|
||||||
|
const promise = new Promise<Observable<unknown>>((resolve) => {
|
||||||
|
resolve(subject);
|
||||||
|
});
|
||||||
|
|
||||||
|
asyncObservable(() => promise).subscribe({
|
||||||
|
next: (value) => {
|
||||||
|
expect(value).toEqual('foo');
|
||||||
|
done();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the promise callback to be called before emitting the value.
|
||||||
|
setTimeout(() => subject.next('foo'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('asyncObservable emits errors', (done) => {
|
||||||
|
const subject = new Subject();
|
||||||
|
const promise = new Promise<Observable<unknown>>((resolve) => {
|
||||||
|
resolve(subject);
|
||||||
|
});
|
||||||
|
|
||||||
|
asyncObservable(() => promise).subscribe({
|
||||||
|
error: (value) => {
|
||||||
|
expect(value).toEqual('foo');
|
||||||
|
done();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the promise callback to be called before emitting the value.
|
||||||
|
setTimeout(() => subject.error('foo'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('asyncObservable emits complete', (done) => {
|
||||||
|
const subject = new Subject();
|
||||||
|
const promise = new Promise<Observable<unknown>>((resolve) => {
|
||||||
|
resolve(subject);
|
||||||
|
});
|
||||||
|
|
||||||
|
asyncObservable(() => promise).subscribe({
|
||||||
|
complete: () => done(),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the promise callback to be called before emitting the value.
|
||||||
|
setTimeout(() => subject.complete());
|
||||||
|
});
|
||||||
|
|
||||||
|
it('asyncObservable emits error if promise is rejected', (done) => {
|
||||||
|
const promise = new Promise<Observable<unknown>>((resolve, reject) => {
|
||||||
|
reject('Custom error');
|
||||||
|
});
|
||||||
|
|
||||||
|
asyncObservable(() => promise).subscribe({
|
||||||
|
error: (error) => {
|
||||||
|
expect(error).toEqual('Custom error');
|
||||||
|
done();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('firstValueFrom returns first value emitted by an observable', async () => {
|
||||||
|
const subject = new Subject();
|
||||||
|
setTimeout(() => subject.next('foo'), 10);
|
||||||
|
|
||||||
|
await expect(firstValueFrom(subject)).resolves.toEqual('foo');
|
||||||
|
|
||||||
|
// Check that running it again doesn't get last value, it gets the new one.
|
||||||
|
setTimeout(() => subject.next('bar'), 10);
|
||||||
|
await expect(firstValueFrom(subject)).resolves.toEqual('bar');
|
||||||
|
|
||||||
|
// Check we cannot get first value if a subject is already completed.
|
||||||
|
subject.complete();
|
||||||
|
await expect(firstValueFrom(subject)).rejects.toThrow();
|
||||||
|
|
||||||
|
// Check that we get last value when using BehaviourSubject.
|
||||||
|
const behaviorSubject = new BehaviorSubject('baz');
|
||||||
|
await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz');
|
||||||
|
|
||||||
|
// Check we get last value even if behaviour subject is completed.
|
||||||
|
behaviorSubject.complete();
|
||||||
|
await expect(firstValueFrom(behaviorSubject)).resolves.toEqual('baz');
|
||||||
|
|
||||||
|
// Check that Promise is rejected if the observable emits an error.
|
||||||
|
const errorSubject = new Subject();
|
||||||
|
setTimeout(() => errorSubject.error('foo error'), 10);
|
||||||
|
|
||||||
|
await expect(firstValueFrom(errorSubject)).rejects.toMatch('foo error');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('ignoreErrors ignores observable errors', (done) => {
|
||||||
|
const subject = new Subject();
|
||||||
|
|
||||||
|
ignoreErrors(subject, 'default value').subscribe({
|
||||||
|
next: (value) => {
|
||||||
|
expect(value).toEqual('default value');
|
||||||
|
done();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
subject.error('foo');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('zipIncludingComplete zips observables including complete events', () => {
|
||||||
|
const scheduler = new TestScheduler((actual, expected) => {
|
||||||
|
expect(actual).toEqual(expected);
|
||||||
|
});
|
||||||
|
|
||||||
|
scheduler.run(({ expectObservable, cold }) => {
|
||||||
|
expectObservable(zipIncludingComplete(
|
||||||
|
cold('-a-b---|', {
|
||||||
|
a: 'A1',
|
||||||
|
b: 'A2',
|
||||||
|
}),
|
||||||
|
cold('-a----b-c--|', {
|
||||||
|
a: 'B1',
|
||||||
|
b: 'B2',
|
||||||
|
c: 'B3',
|
||||||
|
}),
|
||||||
|
cold('-a-b-c--de-----|', {
|
||||||
|
a: 'C1',
|
||||||
|
b: 'C2',
|
||||||
|
c: 'C3',
|
||||||
|
d: 'C4',
|
||||||
|
e: 'C5',
|
||||||
|
}),
|
||||||
|
)).toBe(
|
||||||
|
'-a----b-c--(de)|',
|
||||||
|
{
|
||||||
|
a: ['A1','B1','C1'],
|
||||||
|
b: ['A2','B2','C2'],
|
||||||
|
c: ['A2','B3','C3'],
|
||||||
|
d: ['A2','B3','C4'],
|
||||||
|
e: ['A2','B3','C5'],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue