From 2395edbd052f2c0005f775b9a638c3a6bbce77ae Mon Sep 17 00:00:00 2001 From: Dani Palou Date: Thu, 8 Sep 2022 15:07:03 +0200 Subject: [PATCH] MOBILE-3817 rxjs: Fix zipIncludingComplete completion When the last observable completed it didn't emit pending values --- src/core/utils/rxjs.ts | 55 ++++++++++++++++++------------- src/core/utils/tests/rxjs.test.ts | 25 ++++++++++++++ 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/src/core/utils/rxjs.ts b/src/core/utils/rxjs.ts index 463b9b332..b583b0fe5 100644 --- a/src/core/utils/rxjs.ts +++ b/src/core/utils/rxjs.ts @@ -126,7 +126,6 @@ type GetObservablesReturnTypes = { [key in keyof T]: T[key] extends Observabl */ type ZipObservableData = { values: T[]; - hasValueForIndex: boolean[]; completed: boolean; subscription?: Subscription; }; @@ -142,50 +141,62 @@ export function zipIncludingComplete[]>( ...observables: T ): Observable> { return new Observable(subscriber => { - const observablesData: ZipObservableData[] = []; let nextIndex = 0; - let numCompleted = 0; let hasErrored = false; + let hasCompleted = false; + + // Before subscribing, initialize the data for all observables. + const observablesData = observables.map(() => { + values: [], + completed: false, + }); // Treat an emitted event. const treatEmitted = (completed = false) => { - if (hasErrored) { + if (hasErrored || hasCompleted) { return; } - if (numCompleted >= observables.length) { - subscriber.complete(); + if (completed) { + // Check if all observables have completed. + const numCompleted = observablesData.reduce((total, data) => total + (data.completed ? 1 : 0), 0); + if (numCompleted === observablesData.length) { + hasCompleted = true; - return; + // Emit all pending values. + const maxValues = observablesData.reduce((maxValues, data) => Math.max(maxValues, data.values.length), 0); + while (nextIndex < maxValues) { + emitNextValue(); + nextIndex++; + } + + subscriber.complete(); + + return; + } } // Check if any observable still doesn't have data for the index. - const notReady = observablesData.some(data => !data.completed && !data.hasValueForIndex[nextIndex]); + const notReady = observablesData.some(data => !data.completed && !(nextIndex in data.values)); if (notReady) { return; } - // For each observable, get the value for the next index, or last value if not present (completed). - const valueToEmit = observablesData.map(observableData => - observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]); - + emitNextValue(); nextIndex++; - subscriber.next(> valueToEmit); if (completed) { // An observable was completed, there might be other values to emit. treatEmitted(true); } }; + const emitNextValue = () => { + // For each observable, get the value for the next index, or last value if not present (completed). + const valueToEmit = observablesData.map(observableData => + observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]); - // Before subscribing, initialize the data for all observables. - observables.forEach((observable, obsIndex) => { - observablesData[obsIndex] = { - values: [], - hasValueForIndex: [], - completed: false, - }; - }); + subscriber.next(> valueToEmit); + }; observables.forEach((observable, obsIndex) => { const observableData = observablesData[obsIndex]; @@ -193,7 +204,6 @@ export function zipIncludingComplete[]>( observableData.subscription = observable.subscribe({ next: (value) => { observableData.values.push(value); - observableData.hasValueForIndex.push(true); treatEmitted(); }, error: (error) => { @@ -202,7 +212,6 @@ export function zipIncludingComplete[]>( }, complete: () => { observableData.completed = true; - numCompleted++; treatEmitted(true); }, }); diff --git a/src/core/utils/tests/rxjs.test.ts b/src/core/utils/tests/rxjs.test.ts index 934b9cfb0..e7221b610 100644 --- a/src/core/utils/tests/rxjs.test.ts +++ b/src/core/utils/tests/rxjs.test.ts @@ -234,4 +234,29 @@ describe('RXJS Utils', () => { }); }); + it('zipIncludingComplete emits all pending values when last observable completes', () => { + const scheduler = new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected); + }); + + scheduler.run(({ expectObservable, cold }) => { + expectObservable(zipIncludingComplete( + cold('-a-b-|', { + a: 'A1', + b: 'A2', + c: 'A3', + }), + cold('-a-----|', { + a: 'B1', + }), + )).toBe( + '-a-----(b|)', + { + a: ['A1','B1'], + b: ['A2','B1'], + }, + ); + }); + }); + });