diff options
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r-- | synapse/handlers/presence.py | 280 |
1 files changed, 187 insertions, 93 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 8831d83c56..0a061fe9b2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -217,54 +217,19 @@ class PresenceHandler(BaseHandler): user_id, UserPresenceState.default(user_id) ) - # If the users are ours then we want to set up a bunch of timers - # to time things out. - if self.hs.is_mine_id(user_id): - if new_state.state == PresenceState.ONLINE: - # Idle timer - self.wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_active_ts + IDLE_TIMER - ) - - if new_state.state != PresenceState.OFFLINE: - # User has stopped syncing - self.wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT - ) - - last_federate = new_state.last_federation_update_ts - if now - last_federate > FEDERATION_PING_INTERVAL: - # Been a while since we've poked remote servers - new_state = new_state.copy_and_replace( - last_federation_update_ts=now, - ) - to_federation_ping[user_id] = new_state - - else: - self.wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT - ) + new_state, should_notify, should_ping = handle_update( + prev_state, new_state, + is_mine=self.hs.is_mine_id(user_id), + wheel_timer=self.wheel_timer, + now=now + ) - if new_state.state == PresenceState.ONLINE: - active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY - new_state = new_state.copy_and_replace( - currently_active=active, - ) + self.user_to_current_state[user_id] = new_state - # Check whether the change was something worth notifying about - if should_notify(prev_state, new_state): - new_state.copy_and_replace( - last_federation_update_ts=now, - ) + if should_notify: to_notify[user_id] = new_state - - self.user_to_current_state[user_id] = new_state + elif should_ping: + to_federation_ping[user_id] = new_state # TODO: We should probably ensure there are no races hereafter @@ -296,55 +261,22 @@ class PresenceHandler(BaseHandler): # take any action. users_to_check = self.wheel_timer.fetch(now) - changes = {} # Actual changes we need to notify people about - - for user_id in set(users_to_check): - state = self.user_to_current_state.get(user_id, None) - if not state: - continue - - if state.state == PresenceState.OFFLINE: - # No timeouts are associated with offline states. - continue + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in set(users_to_check) + ] + + changes = handle_timeouts( + states, + is_mine_fn=self.hs.is_mine_id, + user_to_current_state=self.user_to_current_state, + user_to_num_current_syncs=self.user_to_num_current_syncs, + now=now, + ) - if self.hs.is_mine_id(user_id): - if state.state == PresenceState.ONLINE: - if now - state.last_active_ts > IDLE_TIMER: - # Currently online, but last activity ages ago so auto - # idle - changes[user_id] = state.copy_and_replace( - state=PresenceState.UNAVAILABLE, - ) - elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: - # So that we send down a notification that we've - # stopped updating. - changes[user_id] = state - - if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: - # Need to send ping to other servers to ensure they don't - # timeout and set us to offline - changes[user_id] = state - - # If there are have been no sync for a while (and none ongoing), - # set presence to offline - if not self.user_to_num_current_syncs.get(user_id, 0): - if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: - changes[user_id] = state.copy_and_replace( - state=PresenceState.OFFLINE, - status_msg=None, - ) - else: - # We expect to be poked occaisonally by the other side. - # This is to protect against forgetful/buggy servers, so that - # no one gets stuck online forever. - if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: - # The other side seems to have disappeared. - changes[user_id] = state.copy_and_replace( - state=PresenceState.OFFLINE, - status_msg=None, - ) - - preserve_fn(self._update_states)(changes.values()) + preserve_fn(self._update_states)(changes) @defer.inlineCallbacks def bump_presence_active_time(self, user): @@ -925,3 +857,165 @@ class PresenceEventSource(object): def get_pagination_rows(self, user, pagination_config, key): return self.get_new_events(user, from_key=None, include_offline=False) + + +def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): + """Checks the presence of users that have timed out and updates as + appropriate. + + Args: + user_states(list): List of UserPresenceState's to check. + is_mine_fn (fn): Function that returns if a user_id is ours + user_to_num_current_syncs (dict): Mapping of user_id to number of currently + active syncs. + now (int): Current time in ms. + + Returns: + List of UserPresenceState updates + """ + changes = {} # Actual changes we need to notify people about + + for state in user_states: + is_mine = is_mine_fn(state.user_id) + + new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) + if new_state: + changes[state.user_id] = new_state + + return changes.values() + + +def handle_timeout(state, is_mine, user_to_num_current_syncs, now): + """Checks the presence of the user to see if any of the timers have elapsed + + Args: + state (UserPresenceState) + is_mine (bool): Whether the user is ours + user_to_num_current_syncs (dict): Mapping of user_id to number of currently + active syncs. + now (int): Current time in ms. + + Returns: + A UserPresenceState update or None if no update. + """ + if state.state == PresenceState.OFFLINE: + # No timeouts are associated with offline states. + return None + + changed = False + user_id = state.user_id + + if is_mine: + if state.state == PresenceState.ONLINE: + if now - state.last_active_ts > IDLE_TIMER: + # Currently online, but last activity ages ago so auto + # idle + state = state.copy_and_replace( + state=PresenceState.UNAVAILABLE, + ) + changed = True + elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: + # So that we send down a notification that we've + # stopped updating. + changed = True + + if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: + # Need to send ping to other servers to ensure they don't + # timeout and set us to offline + changed = True + + # If there are have been no sync for a while (and none ongoing), + # set presence to offline + if not user_to_num_current_syncs.get(user_id, 0): + if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: + state = state.copy_and_replace( + state=PresenceState.OFFLINE, + status_msg=None, + ) + changed = True + else: + # We expect to be poked occaisonally by the other side. + # This is to protect against forgetful/buggy servers, so that + # no one gets stuck online forever. + if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: + # The other side seems to have disappeared. + state = state.copy_and_replace( + state=PresenceState.OFFLINE, + status_msg=None, + ) + changed = True + + return state if changed else None + + +def handle_update(prev_state, new_state, is_mine, wheel_timer, now): + """Given a presence update: + 1. Add any appropriate timers. + 2. Check if we should notify anyone. + + Args: + prev_state (UserPresenceState) + new_state (UserPresenceState) + is_mine (bool): Whether the user is ours + wheel_timer (WheelTimer) + now (int): Time now in ms + + Returns: + 3-tuple: `(new_state, persist_and_notify, federation_ping)` where: + - new_state: is the state to actually persist + - persist_and_notify (bool): whether to persist and notify people + - federation_ping (bool): whether we should send a ping over federation + """ + user_id = new_state.user_id + + persist_and_notify = False + federation_ping = False + + # If the users are ours then we want to set up a bunch of timers + # to time things out. + if is_mine: + if new_state.state == PresenceState.ONLINE: + # Idle timer + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_active_ts + IDLE_TIMER + ) + + if new_state.state != PresenceState.OFFLINE: + # User has stopped syncing + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + ) + + last_federate = new_state.last_federation_update_ts + if now - last_federate > FEDERATION_PING_INTERVAL: + # Been a while since we've poked remote servers + new_state = new_state.copy_and_replace( + last_federation_update_ts=now, + ) + federation_ping = True + + else: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT + ) + + if new_state.state == PresenceState.ONLINE: + active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY + new_state = new_state.copy_and_replace( + currently_active=active, + ) + + # Check whether the change was something worth notifying about + if should_notify(prev_state, new_state): + new_state = new_state.copy_and_replace( + last_federation_update_ts=now, + ) + persist_and_notify = True + + return new_state, persist_and_notify, federation_ping |