diff --git a/webclient/components/matrix/event-stream-service.js b/webclient/components/matrix/event-stream-service.js
index 1cb9960b9a..97018df881 100644
--- a/webclient/components/matrix/event-stream-service.js
+++ b/webclient/components/matrix/event-stream-service.js
@@ -19,19 +19,21 @@ limitations under the License.
/*
This service manages where in the event stream the web client currently is and
provides methods to resume/pause/stop the event stream. This service is not
-responsible for parsing event data. For that, see the eventDataHandler.
+responsible for parsing event data. For that, see the eventHandlerService.
*/
angular.module('eventStreamService', [])
-.factory('eventStreamService', ['matrixService', function(matrixService) {
+.factory('eventStreamService', ['$q', '$timeout', 'matrixService', 'eventHandlerService', function($q, $timeout, matrixService, eventHandlerService) {
var END = "END";
var START = "START";
var TIMEOUT_MS = 5000;
+ var ERR_TIMEOUT_MS = 5000;
var settings = {
from: "END",
to: undefined,
limit: undefined,
- shouldPoll: true
+ shouldPoll: true,
+ isActive: false
};
// interrupts the stream. Only valid if there is a stream conneciton
@@ -39,19 +41,69 @@ angular.module('eventStreamService', [])
var interrupt = function(shouldPoll) {
console.log("p[EventStream] interrupt("+shouldPoll+") "+
JSON.stringify(settings));
+ settings.shouldPoll = shouldPoll;
+ settings.isActive = false;
};
var saveStreamSettings = function() {
localStorage.setItem("streamSettings", JSON.stringify(settings));
};
+ var startEventStream = function() {
+ settings.shouldPoll = true;
+ settings.isActive = true;
+ var deferred = $q.defer();
+ // run the stream from the latest token
+ matrixService.getEventStream(settings.from, TIMEOUT_MS).then(
+ function(response) {
+ if (!settings.isActive) {
+ console.log("[EventStream] Got response but now inactive. Dropping data.");
+ return;
+ }
+
+ settings.from = response.data.end;
+
+ console.log("[EventStream] Got response from "+settings.from+" to "+response.data.end);
+ eventHandlerService.handleEvents(response.data.chunk, true);
+
+ deferred.resolve(response);
+
+ if (settings.shouldPoll) {
+ $timeout(startEventStream, 0);
+ }
+ else {
+ console.log("[EventStream] Stopping poll.");
+ }
+ },
+ function(error) {
+ if (error.status == 403) {
+ settings.shouldPoll = false;
+ }
+
+ deferred.reject(error);
+
+ if (settings.shouldPoll) {
+ $timeout(startEventStream, ERR_TIMEOUT_MS);
+ }
+ else {
+ console.log("[EventStream] Stopping polling.");
+ }
+ }
+ );
+ return deferred.promise;
+ };
+
return {
// resume the stream from whereever it last got up to. Typically used
// when the page is opened.
resume: function() {
+ if (settings.isActive) {
+ console.log("[EventStream] Already active, ignoring resume()");
+ return;
+ }
+
console.log("[EventStream] resume "+JSON.stringify(settings));
- // run the stream from the latest token
- return matrixService.getEventStream(settings.from, TIMEOUT_MS);
+ return startEventStream();
},
// pause the stream. Resuming it will continue from the current position
|