From f41a4e7b572e5f7c224b018c1aa1b4e11651ee1e Mon Sep 17 00:00:00 2001
From: Dani Palou <dani@moodle.com>
Date: Tue, 21 Jun 2022 10:58:10 +0200
Subject: [PATCH] MOBILE-3817 core: Create observable methods for WS requests

---
 src/core/classes/site.ts | 194 ++++++++++++++++++++++++---------------
 1 file changed, 119 insertions(+), 75 deletions(-)

diff --git a/src/core/classes/site.ts b/src/core/classes/site.ts
index b54787346..7dd12b4e5 100644
--- a/src/core/classes/site.ts
+++ b/src/core/classes/site.ts
@@ -57,6 +57,8 @@ import {
     WSGroups,
     WS_CACHE_TABLES_PREFIX,
 } from '@services/database/sites';
+import { Observable, Subject } from 'rxjs';
+import { finalize, map } from 'rxjs/operators';
 
 /**
  * QR Code type enumeration.
@@ -122,7 +124,7 @@ export class CoreSite {
     protected lastAutoLogin = 0;
     protected offlineDisabled = false;
     // eslint-disable-next-line @typescript-eslint/no-explicit-any
-    protected ongoingRequests: { [cacheId: string]: Promise<any> } = {};
+    protected ongoingRequests: { [cacheId: string]: Observable<any> } = {};
     protected requestQueue: RequestQueueItem[] = [];
     protected requestQueueTimeout: number | null = null;
     protected tokenPluginFileWorks?: boolean;
@@ -492,18 +494,25 @@ export class CoreSite {
      */
     // eslint-disable-next-line @typescript-eslint/no-explicit-any
     read<T = unknown>(method: string, data: any, preSets?: CoreSiteWSPreSets): Promise<T> {
-        preSets = preSets || {};
-        if (preSets.getFromCache === undefined) {
-            preSets.getFromCache = true;
-        }
-        if (preSets.saveToCache === undefined) {
-            preSets.saveToCache = true;
-        }
-        if (preSets.reusePending === undefined) {
-            preSets.reusePending = true;
-        }
+        return this.readObservable<T>(method, data, preSets).toPromise();
+    }
 
-        return this.request(method, data, preSets);
+    /**
+     * Read some data from the Moodle site using WS. Requests are cached by default.
+     *
+     * @param method WS method to use.
+     * @param data Data to send to the WS.
+     * @param preSets Extra options.
+     * @return Observable.
+     */
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    readObservable<T = unknown>(method: string, data: any, preSets?: CoreSiteWSPreSets): Observable<T> {
+        preSets = preSets || {};
+        preSets.getFromCache = preSets.getFromCache ?? true;
+        preSets.saveToCache = preSets.saveToCache ?? true;
+        preSets.reusePending = preSets.reusePending ?? true;
+
+        return this.requestObservable<T>(method, data, preSets);
     }
 
     /**
@@ -516,18 +525,25 @@ export class CoreSite {
      */
     // eslint-disable-next-line @typescript-eslint/no-explicit-any
     write<T = unknown>(method: string, data: any, preSets?: CoreSiteWSPreSets): Promise<T> {
-        preSets = preSets || {};
-        if (preSets.getFromCache === undefined) {
-            preSets.getFromCache = false;
-        }
-        if (preSets.saveToCache === undefined) {
-            preSets.saveToCache = false;
-        }
-        if (preSets.emergencyCache === undefined) {
-            preSets.emergencyCache = false;
-        }
+        return this.writeObservable<T>(method, data, preSets).toPromise();
+    }
 
-        return this.request(method, data, preSets);
+    /**
+     * Sends some data to the Moodle site using WS. Requests are NOT cached by default.
+     *
+     * @param method WS method to use.
+     * @param data Data to send to the WS.
+     * @param preSets Extra options.
+     * @return Observable.
+     */
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    writeObservable<T = unknown>(method: string, data: any, preSets?: CoreSiteWSPreSets): Observable<T> {
+        preSets = preSets || {};
+        preSets.getFromCache = preSets.getFromCache ?? false;
+        preSets.saveToCache = preSets.saveToCache ?? false;
+        preSets.emergencyCache = preSets.emergencyCache ?? false;
+
+        return this.requestObservable<T>(method, data, preSets);
     }
 
     /**
@@ -537,6 +553,19 @@ export class CoreSite {
      * @param data Arguments to pass to the method.
      * @param preSets Extra options.
      * @return Promise resolved with the response, rejected with CoreWSError if it fails.
+     */
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    async request<T = unknown>(method: string, data: any, preSets: CoreSiteWSPreSets): Promise<T> {
+        return this.requestObservable<T>(method, data, preSets).toPromise();
+    }
+
+    /**
+     * WS request to the site.
+     *
+     * @param method The WebService method to be called.
+     * @param data Arguments to pass to the method.
+     * @param preSets Extra options.
+     * @return Observable
      * @description
      *
      * Sends a webservice request to the site. This method will automatically add the
@@ -546,7 +575,7 @@ export class CoreSite {
      * data hasn't expired.
      */
     // eslint-disable-next-line @typescript-eslint/no-explicit-any
-    async request<T = unknown>(method: string, data: any, preSets: CoreSiteWSPreSets): Promise<T> {
+    requestObservable<T = unknown>(method: string, data: any, preSets: CoreSiteWSPreSets): Observable<T> {
         if (this.isLoggedOut() && !ALLOWED_LOGGEDOUT_WS.includes(method)) {
             // Site is logged out, it cannot call WebServices.
             CoreEvents.trigger(CoreEvents.SESSION_EXPIRED, {}, this.id);
@@ -608,28 +637,24 @@ export class CoreSite {
 
         // Check for an ongoing identical request if we're not ignoring cache.
         if (preSets.getFromCache && this.ongoingRequests[cacheId] !== undefined) {
-            const response = await this.ongoingRequests[cacheId];
-
-            // Clone the data, this may prevent errors if in the callback the object is modified.
-            return CoreUtils.clone(response);
+            return this.ongoingRequests[cacheId];
         }
 
-        const promise = this.performRequest<T>(method, data, preSets, wsPreSets);
+        const observable = this.performRequest<T>(method, data, preSets, wsPreSets).pipe(
+            // Return a clone of the original object, this may prevent errors if in the callback the object is modified.
+            map((data) => CoreUtils.clone(data)),
+        );
 
-        this.ongoingRequests[cacheId] = promise;
+        this.ongoingRequests[cacheId] = observable;
 
-        // Clear ongoing request after setting the promise (just in case it's already resolved).
-        try {
-            const response = await promise;
-
-            // We pass back a clone of the original object, this may prevent errors if in the callback the object is modified.
-            return CoreUtils.clone(response);
-        } finally {
-            // Make sure we don't clear the promise of a newer request that ignores the cache.
-            if (this.ongoingRequests[cacheId] === promise) {
-                delete this.ongoingRequests[cacheId];
-            }
-        }
+        return observable.pipe(
+            finalize(() => {
+                // Clear the ongoing request unless it has changed (e.g. a new request that ignores cache).
+                if (this.ongoingRequests[cacheId] === observable) {
+                    delete this.ongoingRequests[cacheId];
+                }
+            }),
+        );
     }
 
     /**
@@ -639,29 +664,42 @@ export class CoreSite {
      * @param data Arguments to pass to the method.
      * @param preSets Extra options related to the site.
      * @param wsPreSets Extra options related to the WS call.
-     * @return Promise resolved with the response.
+     * @return Observable.
      */
-    protected async performRequest<T = unknown>(
+    protected performRequest<T = unknown>(
         method: string,
         data: unknown,
         preSets: CoreSiteWSPreSets,
         wsPreSets: CoreWSPreSets,
-    ): Promise<T> {
-        let response: T | {exception?: string; errorcode?: string};
+    ): Observable<T> {
+        const subject = new Subject<T>();
 
-        try {
-            response = await this.getFromCache<T>(method, data, preSets, false);
-        } catch {
-            // Not found or expired, call WS.
-            response = await this.getFromWSOrEmergencyCache<T>(method, data, preSets, wsPreSets);
-        }
+        const run = async () => {
+            try {
+                let response: T | {exception?: string; errorcode?: string};
 
-        if (('exception' in response && response.exception !== undefined) ||
-                ('errorcode' in response && response.errorcode !== undefined)) {
-            throw new CoreWSError(response);
-        }
+                try {
+                    response = await this.getFromCache<T>(method, data, preSets, false);
+                } catch {
+                    // Not found or expired, call WS.
+                    response = await this.getFromWSOrEmergencyCache<T>(method, data, preSets, wsPreSets);
+                }
 
-        return <T> response;
+                if (('exception' in response && response.exception !== undefined) ||
+                        ('errorcode' in response && response.errorcode !== undefined)) {
+                    throw new CoreWSError(response);
+                }
+
+                subject.next(<T> response);
+                subject.complete();
+            } catch (error) {
+                subject.error(error);
+            }
+        };
+
+        run();
+
+        return subject;
     }
 
     /**
@@ -1536,12 +1574,24 @@ export class CoreSite {
 
         // Check for an ongoing identical request if we're not ignoring cache.
         if (cachePreSets.getFromCache && this.ongoingRequests[cacheId] !== undefined) {
-            const response = await this.ongoingRequests[cacheId];
-
-            return response;
+            return await this.ongoingRequests[cacheId].toPromise();
         }
 
-        const promise = this.getFromCache<CoreSitePublicConfigResponse>(method, {}, cachePreSets, false).catch(async () => {
+        const subject = new Subject<CoreSitePublicConfigResponse>();
+        const observable = subject.pipe(
+            // Return a clone of the original object, this may prevent errors if in the callback the object is modified.
+            map((data) => CoreUtils.clone(data)),
+            finalize(() => {
+                // Clear the ongoing request unless it has changed (e.g. a new request that ignores cache).
+                if (this.ongoingRequests[cacheId] === observable) {
+                    delete this.ongoingRequests[cacheId];
+                }
+            }),
+        );
+
+        this.ongoingRequests[cacheId] = observable;
+
+        this.getFromCache<CoreSitePublicConfigResponse>(method, {}, cachePreSets, false).catch(async () => {
             if (cachePreSets.forceOffline) {
                 // Don't call the WS, just fail.
                 throw new CoreError(
@@ -1568,22 +1618,16 @@ export class CoreSite {
                     throw error;
                 }
             }
+        }).then((response) => {
+            subject.next(response);
+            subject.complete();
+
+            return;
+        }).catch((error) => {
+            subject.error(error);
         });
 
-        this.ongoingRequests[cacheId] = promise;
-
-        // Clear ongoing request after setting the promise (just in case it's already resolved).
-        try {
-            const response = await promise;
-
-            // We pass back a clone of the original object, this may prevent errors if in the callback the object is modified.
-            return response;
-        } finally {
-            // Make sure we don't clear the promise of a newer request that ignores the cache.
-            if (this.ongoingRequests[cacheId] === promise) {
-                delete this.ongoingRequests[cacheId];
-            }
-        }
+        return observable.toPromise();
     }
 
     /**