diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/events.py | 87 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 11 |
2 files changed, 60 insertions, 38 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 891502c04f..92afa35d57 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -47,6 +47,56 @@ class EventStreamHandler(BaseHandler): self.notifier = hs.get_notifier() @defer.inlineCallbacks + def started_stream(self, user): + """Tells the presence handler that we have started an eventstream for + the user: + + Args: + user (User): The user who started a stream. + Returns: + A deferred that completes once their presence has been updated. + """ + if user not in self._streams_per_user: + self._streams_per_user[user] = 0 + if user in self._stop_timer_per_user: + try: + self.clock.cancel_call_later( + self._stop_timer_per_user.pop(user) + ) + except: + logger.exception("Failed to cancel event timer") + else: + yield self.distributor.fire("started_user_eventstream", user) + + self._streams_per_user[user] += 1 + + def stopped_stream(self, user): + """If there are no streams for a user this starts a timer that will + notify the presence handler that we haven't got an event stream for + the user unless the user starts a new stream in 30 seconds. + + Args: + user (User): The user who stopped a stream. + """ + self._streams_per_user[user] -= 1 + if not self._streams_per_user[user]: + del self._streams_per_user[user] + + # 30 seconds of grace to allow the client to reconnect again + # before we think they're gone + def _later(): + logger.debug("_later stopped_user_eventstream %s", user) + + self._stop_timer_per_user.pop(user, None) + + return self.distributor.fire("stopped_user_eventstream", user) + + logger.debug("Scheduling _later: for %s", user) + self._stop_timer_per_user[user] = ( + self.clock.call_later(30, _later) + ) + + @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, as_client_event=True, affect_presence=True, @@ -59,20 +109,7 @@ class EventStreamHandler(BaseHandler): try: if affect_presence: - if auth_user not in self._streams_per_user: - self._streams_per_user[auth_user] = 0 - if auth_user in self._stop_timer_per_user: - try: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(auth_user) - ) - except: - logger.exception("Failed to cancel event timer") - else: - yield self.distributor.fire( - "started_user_eventstream", auth_user - ) - self._streams_per_user[auth_user] += 1 + yield self.started_stream(auth_user) rm_handler = self.hs.get_handlers().room_member_handler @@ -114,27 +151,7 @@ class EventStreamHandler(BaseHandler): finally: if affect_presence: - self._streams_per_user[auth_user] -= 1 - if not self._streams_per_user[auth_user]: - del self._streams_per_user[auth_user] - - # 10 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - logger.debug( - "_later stopped_user_eventstream %s", auth_user - ) - - self._stop_timer_per_user.pop(auth_user, None) - - return self.distributor.fire( - "stopped_user_eventstream", auth_user - ) - - logger.debug("Scheduling _later: for %s", auth_user) - self._stop_timer_per_user[auth_user] = ( - self.clock.call_later(30, _later) - ) + self.stopped_stream(auth_user) class EventHandler(BaseHandler): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e91e81831e..ce60642127 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -378,7 +378,7 @@ class PresenceHandler(BaseHandler): # TODO(paul): perform a presence push as part of start/stop poll so # we don't have to do this all the time - self.changed_presencelike_data(target_user, state) + yield self.changed_presencelike_data(target_user, state) def bump_presence_active_time(self, user, now=None): if now is None: @@ -422,12 +422,12 @@ class PresenceHandler(BaseHandler): @log_function def started_user_eventstream(self, user): # TODO(paul): Use "last online" state - self.set_state(user, user, {"presence": PresenceState.ONLINE}) + return self.set_state(user, user, {"presence": PresenceState.ONLINE}) @log_function def stopped_user_eventstream(self, user): # TODO(paul): Save current state as "last online" state - self.set_state(user, user, {"presence": PresenceState.OFFLINE}) + return self.set_state(user, user, {"presence": PresenceState.OFFLINE}) @defer.inlineCallbacks def user_joined_room(self, user, room_id): @@ -1263,6 +1263,11 @@ class UserPresenceCache(object): self.state = {"presence": PresenceState.OFFLINE} self.serial = None + def __repr__(self): + return "UserPresenceCache(state=%r, serial=%r)" % ( + self.state, self.serial + ) + def update(self, state, serial): assert("mtime_age" not in state) |