summary refs log tree commit diff
path: root/synapse/handlers/events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-01-30 14:17:47 +0000
committerErik Johnston <erik@matrix.org>2015-01-30 14:17:47 +0000
commit84b78c3b5fe3f5e82c3fdf1a2a1c7eed62e1b887 (patch)
tree0475ae5759bd17bc76780a7f376e8a7bd900a430 /synapse/handlers/events.py
parentSplit up replication_layer module into client, server and transaction queue (diff)
parentMerge branch 'new_state_resolution' of github.com:matrix-org/synapse into rej... (diff)
downloadsynapse-84b78c3b5fe3f5e82c3fdf1a2a1c7eed62e1b887.tar.xz
Merge branch 'rejections_storage' of github.com:matrix-org/synapse into replication_split
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r--synapse/handlers/events.py71
1 files changed, 38 insertions, 33 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 01e67b0818..025e7e7e62 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.types import UserID
+from synapse.events.utils import serialize_event
 
 from ._base import BaseHandler
 
@@ -48,24 +49,25 @@ class EventStreamHandler(BaseHandler):
     @defer.inlineCallbacks
     @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0,
-                   as_client_event=True):
+                   as_client_event=True, affect_presence=True):
         auth_user = UserID.from_string(auth_user_id)
 
         try:
-            if auth_user not in self._streams_per_user:
-                self._streams_per_user[auth_user] = 0
-                if auth_user in self._stop_timer_per_user:
-                    try:
-                        self.clock.cancel_call_later(
-                            self._stop_timer_per_user.pop(auth_user)
+            if affect_presence:
+                if auth_user not in self._streams_per_user:
+                    self._streams_per_user[auth_user] = 0
+                    if auth_user in self._stop_timer_per_user:
+                        try:
+                            self.clock.cancel_call_later(
+                                self._stop_timer_per_user.pop(auth_user)
+                            )
+                        except:
+                            logger.exception("Failed to cancel event timer")
+                    else:
+                        yield self.distributor.fire(
+                            "started_user_eventstream", auth_user
                         )
-                    except:
-                        logger.exception("Failed to cancel event timer")
-                else:
-                    yield self.distributor.fire(
-                        "started_user_eventstream", auth_user
-                    )
-            self._streams_per_user[auth_user] += 1
+                self._streams_per_user[auth_user] += 1
 
             if pagin_config.from_token is None:
                 pagin_config.from_token = None
@@ -78,8 +80,10 @@ class EventStreamHandler(BaseHandler):
                     auth_user, room_ids, pagin_config, timeout
                 )
 
+            time_now = self.clock.time_msec()
+
             chunks = [
-                self.hs.serialize_event(e, as_client_event) for e in events
+                serialize_event(e, time_now, as_client_event) for e in events
             ]
 
             chunk = {
@@ -91,27 +95,28 @@ class EventStreamHandler(BaseHandler):
             defer.returnValue(chunk)
 
         finally:
-            self._streams_per_user[auth_user] -= 1
-            if not self._streams_per_user[auth_user]:
-                del self._streams_per_user[auth_user]
-
-                # 10 seconds of grace to allow the client to reconnect again
-                #   before we think they're gone
-                def _later():
-                    logger.debug(
-                        "_later stopped_user_eventstream %s", auth_user
-                    )
+            if affect_presence:
+                self._streams_per_user[auth_user] -= 1
+                if not self._streams_per_user[auth_user]:
+                    del self._streams_per_user[auth_user]
+
+                    # 10 seconds of grace to allow the client to reconnect again
+                    #   before we think they're gone
+                    def _later():
+                        logger.debug(
+                            "_later stopped_user_eventstream %s", auth_user
+                        )
 
-                    self._stop_timer_per_user.pop(auth_user, None)
+                        self._stop_timer_per_user.pop(auth_user, None)
 
-                    yield self.distributor.fire(
-                        "stopped_user_eventstream", auth_user
-                    )
+                        return self.distributor.fire(
+                            "stopped_user_eventstream", auth_user
+                        )
 
-                logger.debug("Scheduling _later: for %s", auth_user)
-                self._stop_timer_per_user[auth_user] = (
-                    self.clock.call_later(30, _later)
-                )
+                    logger.debug("Scheduling _later: for %s", auth_user)
+                    self._stop_timer_per_user[auth_user] = (
+                        self.clock.call_later(30, _later)
+                    )
 
 
 class EventHandler(BaseHandler):