summary refs log tree commit diff
path: root/synapse/handlers/events.py
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2015-10-12 14:48:09 +0100
committerMark Haines <mjark@negativecurvature.net>2015-10-12 14:48:09 +0100
commit54cb509d647495a4568806106aadcefc69d98322 (patch)
tree87377c3e40581403c2b0010bd106cba20427cb0b /synapse/handlers/events.py
parentMerge pull request #297 from matrix-org/markjh/presence_races (diff)
parentSplit the sections of EventStreamHandler.get_stream that handle presence (diff)
downloadsynapse-54cb509d647495a4568806106aadcefc69d98322.tar.xz
Merge pull request #296 from matrix-org/markjh/eventstream_presence
Split the sections of EventStreamHandler.get_stream that handle presence
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r--synapse/handlers/events.py87
1 files changed, 52 insertions, 35 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 891502c04f..92afa35d57 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -47,6 +47,56 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
+    def started_stream(self, user):
+        """Tells the presence handler that we have started an eventstream for
+        the user:
+
+        Args:
+            user (User): The user who started a stream.
+        Returns:
+            A deferred that completes once their presence has been updated.
+        """
+        if user not in self._streams_per_user:
+            self._streams_per_user[user] = 0
+            if user in self._stop_timer_per_user:
+                try:
+                    self.clock.cancel_call_later(
+                        self._stop_timer_per_user.pop(user)
+                    )
+                except:
+                    logger.exception("Failed to cancel event timer")
+            else:
+                yield self.distributor.fire("started_user_eventstream", user)
+
+        self._streams_per_user[user] += 1
+
+    def stopped_stream(self, user):
+        """If there are no streams for a user this starts a timer that will
+        notify the presence handler that we haven't got an event stream for
+        the user unless the user starts a new stream in 30 seconds.
+
+        Args:
+            user (User): The user who stopped a stream.
+        """
+        self._streams_per_user[user] -= 1
+        if not self._streams_per_user[user]:
+            del self._streams_per_user[user]
+
+            # 30 seconds of grace to allow the client to reconnect again
+            #   before we think they're gone
+            def _later():
+                logger.debug("_later stopped_user_eventstream %s", user)
+
+                self._stop_timer_per_user.pop(user, None)
+
+                return self.distributor.fire("stopped_user_eventstream", user)
+
+            logger.debug("Scheduling _later: for %s", user)
+            self._stop_timer_per_user[user] = (
+                self.clock.call_later(30, _later)
+            )
+
+    @defer.inlineCallbacks
     @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0,
                    as_client_event=True, affect_presence=True,
@@ -59,20 +109,7 @@ class EventStreamHandler(BaseHandler):
 
         try:
             if affect_presence:
-                if auth_user not in self._streams_per_user:
-                    self._streams_per_user[auth_user] = 0
-                    if auth_user in self._stop_timer_per_user:
-                        try:
-                            self.clock.cancel_call_later(
-                                self._stop_timer_per_user.pop(auth_user)
-                            )
-                        except:
-                            logger.exception("Failed to cancel event timer")
-                    else:
-                        yield self.distributor.fire(
-                            "started_user_eventstream", auth_user
-                        )
-                self._streams_per_user[auth_user] += 1
+                yield self.started_stream(auth_user)
 
             rm_handler = self.hs.get_handlers().room_member_handler
 
@@ -114,27 +151,7 @@ class EventStreamHandler(BaseHandler):
 
         finally:
             if affect_presence:
-                self._streams_per_user[auth_user] -= 1
-                if not self._streams_per_user[auth_user]:
-                    del self._streams_per_user[auth_user]
-
-                    # 10 seconds of grace to allow the client to reconnect again
-                    #   before we think they're gone
-                    def _later():
-                        logger.debug(
-                            "_later stopped_user_eventstream %s", auth_user
-                        )
-
-                        self._stop_timer_per_user.pop(auth_user, None)
-
-                        return self.distributor.fire(
-                            "stopped_user_eventstream", auth_user
-                        )
-
-                    logger.debug("Scheduling _later: for %s", auth_user)
-                    self._stop_timer_per_user[auth_user] = (
-                        self.clock.call_later(30, _later)
-                    )
+                self.stopped_stream(auth_user)
 
 
 class EventHandler(BaseHandler):