diff options
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r-- | synapse/handlers/events.py | 99 |
1 files changed, 77 insertions, 22 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index aabec37fc0..b336b292d3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -47,26 +47,81 @@ class EventStreamHandler(BaseHandler): def get_stream(self, auth_user_id, pagin_config, timeout=0): auth_user = self.hs.parse_userid(auth_user_id) - if pagin_config.from_token is None: - pagin_config.from_token = None - - rm_handler = self.hs.get_handlers().room_member_handler - room_ids = yield rm_handler.get_rooms_for_user(auth_user) - - events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout - ) - - chunks = [ - e.get_dict() if isinstance(e, SynapseEvent) else e - for e in events - ] - - chunk = { - "chunk": chunks, - "start": tokens[0].to_string(), - "end": tokens[1].to_string(), - } - - defer.returnValue(chunk) + try: + if auth_user not in self._streams_per_user: + self._streams_per_user[auth_user] = 0 + if auth_user in self._stop_timer_per_user: + self.clock.cancel_call_later( + self._stop_timer_per_user.pop(auth_user)) + else: + self.distributor.fire( + "started_user_eventstream", auth_user + ) + self._streams_per_user[auth_user] += 1 + + + if pagin_config.from_token is None: + pagin_config.from_token = None + + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(auth_user) + + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) + + chunks = [ + e.get_dict() if isinstance(e, SynapseEvent) else e + for e in events + ] + + chunk = { + "chunk": chunks, + "start": tokens[0].to_string(), + "end": tokens[1].to_string(), + } + + defer.returnValue(chunk) + + finally: + self._streams_per_user[auth_user] -= 1 + if not self._streams_per_user[auth_user]: + del self._streams_per_user[auth_user] + + # 10 seconds of grace to allow the client to reconnect again + # before we think they're gone + def _later(): + self.distributor.fire( + "stopped_user_eventstream", auth_user + ) + del self._stop_timer_per_user[auth_user] + + self._stop_timer_per_user[auth_user] = ( + self.clock.call_later(5, _later) + ) + + +class EventHandler(BaseHandler): + @defer.inlineCallbacks + def get_event(self, user, event_id): + """Retrieve a single specified event. + + Args: + user (synapse.types.UserID): The user requesting the event + event_id (str): The event ID to obtain. + Returns: + dict: An event, or None if there is no event matching this ID. + Raises: + SynapseError if there was a problem retrieving this event, or + AuthError if the user does not have the rights to inspect this + event. + """ + event = yield self.store.get_event(event_id) + + if not event: + defer.returnValue(None) + return + + yield self.auth.check(event, raises=True) + defer.returnValue(event) |