summary refs log tree commit diff
path: root/webclient/components
diff options
context:
space:
mode:
Diffstat (limited to 'webclient/components')
-rw-r--r--webclient/components/matrix/event-stream-service.js62
1 files changed, 57 insertions, 5 deletions
diff --git a/webclient/components/matrix/event-stream-service.js b/webclient/components/matrix/event-stream-service.js
index 1cb9960b9a..97018df881 100644
--- a/webclient/components/matrix/event-stream-service.js
+++ b/webclient/components/matrix/event-stream-service.js
@@ -19,19 +19,21 @@ limitations under the License.
 /*
 This service manages where in the event stream the web client currently is and 
 provides methods to resume/pause/stop the event stream. This service is not
-responsible for parsing event data. For that, see the eventDataHandler.
+responsible for parsing event data. For that, see the eventHandlerService.
 */
 angular.module('eventStreamService', [])
-.factory('eventStreamService', ['matrixService', function(matrixService) {
+.factory('eventStreamService', ['$q', '$timeout', 'matrixService', 'eventHandlerService', function($q, $timeout, matrixService, eventHandlerService) {
     var END = "END";
     var START = "START";
     var TIMEOUT_MS = 5000;
+    var ERR_TIMEOUT_MS = 5000;
     
     var settings = {
         from: "END",
         to: undefined,
         limit: undefined,
-        shouldPoll: true
+        shouldPoll: true,
+        isActive: false
     };
     
     // interrupts the stream. Only valid if there is a stream conneciton 
@@ -39,19 +41,69 @@ angular.module('eventStreamService', [])
     var interrupt = function(shouldPoll) {
         console.log("p[EventStream] interrupt("+shouldPoll+") "+
                     JSON.stringify(settings));
+        settings.shouldPoll = shouldPoll;
+        settings.isActive = false;
     };
     
     var saveStreamSettings = function() {
         localStorage.setItem("streamSettings", JSON.stringify(settings));
     };
     
+    var startEventStream = function() {
+        settings.shouldPoll = true;
+        settings.isActive = true;
+        var deferred = $q.defer();
+        // run the stream from the latest token
+        matrixService.getEventStream(settings.from, TIMEOUT_MS).then(
+            function(response) {
+                if (!settings.isActive) {
+                    console.log("[EventStream] Got response but now inactive. Dropping data.");
+                    return;
+                }
+                
+                settings.from = response.data.end;
+                
+                console.log("[EventStream] Got response from "+settings.from+" to "+response.data.end);
+                eventHandlerService.handleEvents(response.data.chunk, true);
+                
+                deferred.resolve(response);
+                
+                if (settings.shouldPoll) {
+                    $timeout(startEventStream, 0);
+                }
+                else {
+                    console.log("[EventStream] Stopping poll.");
+                }
+            },
+            function(error) {
+                if (error.status == 403) {
+                    settings.shouldPoll = false;
+                }
+                
+                deferred.reject(error);
+                
+                if (settings.shouldPoll) {
+                    $timeout(startEventStream, ERR_TIMEOUT_MS);
+                }
+                else {
+                    console.log("[EventStream] Stopping polling.");
+                }
+            }
+        );
+        return deferred.promise;
+    };
+    
     return {
         // resume the stream from whereever it last got up to. Typically used
         // when the page is opened.
         resume: function() {
+            if (settings.isActive) {
+                console.log("[EventStream] Already active, ignoring resume()");
+                return;
+            }
+        
             console.log("[EventStream] resume "+JSON.stringify(settings));
-            // run the stream from the latest token
-            return matrixService.getEventStream(settings.from, TIMEOUT_MS);
+            return startEventStream();
         },
         
         // pause the stream. Resuming it will continue from the current position