MOBILE-3817 rxjs: Fix zipIncludingComplete completion

When the last observable completed it didn't emit pending values
main
Dani Palou 2022-09-08 15:07:03 +02:00
parent 52a4322f0d
commit 2395edbd05
2 changed files with 57 additions and 23 deletions

View File

@ -126,7 +126,6 @@ type GetObservablesReturnTypes<T> = { [key in keyof T]: T[key] extends Observabl
*/
type ZipObservableData<T = unknown> = {
values: T[];
hasValueForIndex: boolean[];
completed: boolean;
subscription?: Subscription;
};
@ -142,50 +141,62 @@ export function zipIncludingComplete<T extends Observable<unknown>[]>(
...observables: T
): Observable<GetObservablesReturnTypes<T>> {
return new Observable(subscriber => {
const observablesData: ZipObservableData[] = [];
let nextIndex = 0;
let numCompleted = 0;
let hasErrored = false;
let hasCompleted = false;
// Before subscribing, initialize the data for all observables.
const observablesData = observables.map(() => <ZipObservableData> {
values: [],
completed: false,
});
// Treat an emitted event.
const treatEmitted = (completed = false) => {
if (hasErrored) {
if (hasErrored || hasCompleted) {
return;
}
if (numCompleted >= observables.length) {
subscriber.complete();
if (completed) {
// Check if all observables have completed.
const numCompleted = observablesData.reduce((total, data) => total + (data.completed ? 1 : 0), 0);
if (numCompleted === observablesData.length) {
hasCompleted = true;
return;
// Emit all pending values.
const maxValues = observablesData.reduce((maxValues, data) => Math.max(maxValues, data.values.length), 0);
while (nextIndex < maxValues) {
emitNextValue();
nextIndex++;
}
subscriber.complete();
return;
}
}
// Check if any observable still doesn't have data for the index.
const notReady = observablesData.some(data => !data.completed && !data.hasValueForIndex[nextIndex]);
const notReady = observablesData.some(data => !data.completed && !(nextIndex in data.values));
if (notReady) {
return;
}
// For each observable, get the value for the next index, or last value if not present (completed).
const valueToEmit = observablesData.map(observableData =>
observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]);
emitNextValue();
nextIndex++;
subscriber.next(<GetObservablesReturnTypes<T>> valueToEmit);
if (completed) {
// An observable was completed, there might be other values to emit.
treatEmitted(true);
}
};
const emitNextValue = () => {
// For each observable, get the value for the next index, or last value if not present (completed).
const valueToEmit = observablesData.map(observableData =>
observableData.values[nextIndex] ?? observableData.values[observableData.values.length - 1]);
// Before subscribing, initialize the data for all observables.
observables.forEach((observable, obsIndex) => {
observablesData[obsIndex] = {
values: [],
hasValueForIndex: [],
completed: false,
};
});
subscriber.next(<GetObservablesReturnTypes<T>> valueToEmit);
};
observables.forEach((observable, obsIndex) => {
const observableData = observablesData[obsIndex];
@ -193,7 +204,6 @@ export function zipIncludingComplete<T extends Observable<unknown>[]>(
observableData.subscription = observable.subscribe({
next: (value) => {
observableData.values.push(value);
observableData.hasValueForIndex.push(true);
treatEmitted();
},
error: (error) => {
@ -202,7 +212,6 @@ export function zipIncludingComplete<T extends Observable<unknown>[]>(
},
complete: () => {
observableData.completed = true;
numCompleted++;
treatEmitted(true);
},
});

View File

@ -234,4 +234,29 @@ describe('RXJS Utils', () => {
});
});
it('zipIncludingComplete emits all pending values when last observable completes', () => {
const scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
scheduler.run(({ expectObservable, cold }) => {
expectObservable(zipIncludingComplete(
cold('-a-b-|', {
a: 'A1',
b: 'A2',
c: 'A3',
}),
cold('-a-----|', {
a: 'B1',
}),
)).toBe(
'-a-----(b|)',
{
a: ['A1','B1'],
b: ['A2','B1'],
},
);
});
});
});