summary refs log tree commit diff
path: root/synapse/handlers/events.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/events.py99
1 files changed, 77 insertions, 22 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index aabec37fc0..b336b292d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -47,26 +47,81 @@ class EventStreamHandler(BaseHandler):
     def get_stream(self, auth_user_id, pagin_config, timeout=0):
         auth_user = self.hs.parse_userid(auth_user_id)
 
-        if pagin_config.from_token is None:
-            pagin_config.from_token = None
-
-        rm_handler = self.hs.get_handlers().room_member_handler
-        room_ids = yield rm_handler.get_rooms_for_user(auth_user)
-
-        events, tokens = yield self.notifier.get_events_for(
-            auth_user, room_ids, pagin_config, timeout
-        )
-
-        chunks = [
-            e.get_dict() if isinstance(e, SynapseEvent) else e
-            for e in events
-        ]
-
-        chunk = {
-            "chunk": chunks,
-            "start": tokens[0].to_string(),
-            "end": tokens[1].to_string(),
-        }
-
-        defer.returnValue(chunk)
+        try:
+            if auth_user not in self._streams_per_user:
+                self._streams_per_user[auth_user] = 0
+                if auth_user in self._stop_timer_per_user:
+                    self.clock.cancel_call_later(
+                        self._stop_timer_per_user.pop(auth_user))
+                else:
+                    self.distributor.fire(
+                        "started_user_eventstream", auth_user
+                    )
+            self._streams_per_user[auth_user] += 1
+
+
+            if pagin_config.from_token is None:
+                pagin_config.from_token = None
+
+            rm_handler = self.hs.get_handlers().room_member_handler
+            room_ids = yield rm_handler.get_rooms_for_user(auth_user)
+
+            events, tokens = yield self.notifier.get_events_for(
+                auth_user, room_ids, pagin_config, timeout
+            )
+
+            chunks = [
+                e.get_dict() if isinstance(e, SynapseEvent) else e
+                for e in events
+            ]
+
+            chunk = {
+                "chunk": chunks,
+                "start": tokens[0].to_string(),
+                "end": tokens[1].to_string(),
+            }
+
+            defer.returnValue(chunk)
+
+        finally:
+            self._streams_per_user[auth_user] -= 1
+            if not self._streams_per_user[auth_user]:
+                del self._streams_per_user[auth_user]
+
+                # 10 seconds of grace to allow the client to reconnect again
+                #   before we think they're gone
+                def _later():
+                    self.distributor.fire(
+                        "stopped_user_eventstream", auth_user
+                    )
+                    del self._stop_timer_per_user[auth_user]
+
+                self._stop_timer_per_user[auth_user] = (
+                    self.clock.call_later(5, _later)
+                )
+
+
+class EventHandler(BaseHandler):
 
+    @defer.inlineCallbacks
+    def get_event(self, user, event_id):
+        """Retrieve a single specified event.
+
+        Args:
+            user (synapse.types.UserID): The user requesting the event
+            event_id (str): The event ID to obtain.
+        Returns:
+            dict: An event, or None if there is no event matching this ID.
+        Raises:
+            SynapseError if there was a problem retrieving this event, or
+            AuthError if the user does not have the rights to inspect this
+            event.
+        """
+        event = yield self.store.get_event(event_id)
+
+        if not event:
+            defer.returnValue(None)
+            return
+
+        yield self.auth.check(event, raises=True)
+        defer.returnValue(event)