diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 8831d83c56..0a061fe9b2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -217,54 +217,19 @@ class PresenceHandler(BaseHandler):
user_id, UserPresenceState.default(user_id)
)
- # If the users are ours then we want to set up a bunch of timers
- # to time things out.
- if self.hs.is_mine_id(user_id):
- if new_state.state == PresenceState.ONLINE:
- # Idle timer
- self.wheel_timer.insert(
- now=now,
- obj=user_id,
- then=new_state.last_active_ts + IDLE_TIMER
- )
-
- if new_state.state != PresenceState.OFFLINE:
- # User has stopped syncing
- self.wheel_timer.insert(
- now=now,
- obj=user_id,
- then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
- )
-
- last_federate = new_state.last_federation_update_ts
- if now - last_federate > FEDERATION_PING_INTERVAL:
- # Been a while since we've poked remote servers
- new_state = new_state.copy_and_replace(
- last_federation_update_ts=now,
- )
- to_federation_ping[user_id] = new_state
-
- else:
- self.wheel_timer.insert(
- now=now,
- obj=user_id,
- then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
- )
+ new_state, should_notify, should_ping = handle_update(
+ prev_state, new_state,
+ is_mine=self.hs.is_mine_id(user_id),
+ wheel_timer=self.wheel_timer,
+ now=now
+ )
- if new_state.state == PresenceState.ONLINE:
- active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
- new_state = new_state.copy_and_replace(
- currently_active=active,
- )
+ self.user_to_current_state[user_id] = new_state
- # Check whether the change was something worth notifying about
- if should_notify(prev_state, new_state):
- new_state.copy_and_replace(
- last_federation_update_ts=now,
- )
+ if should_notify:
to_notify[user_id] = new_state
-
- self.user_to_current_state[user_id] = new_state
+ elif should_ping:
+ to_federation_ping[user_id] = new_state
# TODO: We should probably ensure there are no races hereafter
@@ -296,55 +261,22 @@ class PresenceHandler(BaseHandler):
# take any action.
users_to_check = self.wheel_timer.fetch(now)
- changes = {} # Actual changes we need to notify people about
-
- for user_id in set(users_to_check):
- state = self.user_to_current_state.get(user_id, None)
- if not state:
- continue
-
- if state.state == PresenceState.OFFLINE:
- # No timeouts are associated with offline states.
- continue
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in set(users_to_check)
+ ]
+
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.hs.is_mine_id,
+ user_to_current_state=self.user_to_current_state,
+ user_to_num_current_syncs=self.user_to_num_current_syncs,
+ now=now,
+ )
- if self.hs.is_mine_id(user_id):
- if state.state == PresenceState.ONLINE:
- if now - state.last_active_ts > IDLE_TIMER:
- # Currently online, but last activity ages ago so auto
- # idle
- changes[user_id] = state.copy_and_replace(
- state=PresenceState.UNAVAILABLE,
- )
- elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
- # So that we send down a notification that we've
- # stopped updating.
- changes[user_id] = state
-
- if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
- # Need to send ping to other servers to ensure they don't
- # timeout and set us to offline
- changes[user_id] = state
-
- # If there are have been no sync for a while (and none ongoing),
- # set presence to offline
- if not self.user_to_num_current_syncs.get(user_id, 0):
- if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
- changes[user_id] = state.copy_and_replace(
- state=PresenceState.OFFLINE,
- status_msg=None,
- )
- else:
- # We expect to be poked occaisonally by the other side.
- # This is to protect against forgetful/buggy servers, so that
- # no one gets stuck online forever.
- if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
- # The other side seems to have disappeared.
- changes[user_id] = state.copy_and_replace(
- state=PresenceState.OFFLINE,
- status_msg=None,
- )
-
- preserve_fn(self._update_states)(changes.values())
+ preserve_fn(self._update_states)(changes)
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
@@ -925,3 +857,165 @@ class PresenceEventSource(object):
def get_pagination_rows(self, user, pagination_config, key):
return self.get_new_events(user, from_key=None, include_offline=False)
+
+
+def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+ """Checks the presence of users that have timed out and updates as
+ appropriate.
+
+ Args:
+ user_states(list): List of UserPresenceState's to check.
+ is_mine_fn (fn): Function that returns if a user_id is ours
+ user_to_num_current_syncs (dict): Mapping of user_id to number of currently
+ active syncs.
+ now (int): Current time in ms.
+
+ Returns:
+ List of UserPresenceState updates
+ """
+ changes = {} # Actual changes we need to notify people about
+
+ for state in user_states:
+ is_mine = is_mine_fn(state.user_id)
+
+ new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+ if new_state:
+ changes[state.user_id] = new_state
+
+ return changes.values()
+
+
+def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+ """Checks the presence of the user to see if any of the timers have elapsed
+
+ Args:
+ state (UserPresenceState)
+ is_mine (bool): Whether the user is ours
+ user_to_num_current_syncs (dict): Mapping of user_id to number of currently
+ active syncs.
+ now (int): Current time in ms.
+
+ Returns:
+ A UserPresenceState update or None if no update.
+ """
+ if state.state == PresenceState.OFFLINE:
+ # No timeouts are associated with offline states.
+ return None
+
+ changed = False
+ user_id = state.user_id
+
+ if is_mine:
+ if state.state == PresenceState.ONLINE:
+ if now - state.last_active_ts > IDLE_TIMER:
+ # Currently online, but last activity ages ago so auto
+ # idle
+ state = state.copy_and_replace(
+ state=PresenceState.UNAVAILABLE,
+ )
+ changed = True
+ elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+ # So that we send down a notification that we've
+ # stopped updating.
+ changed = True
+
+ if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
+ # Need to send ping to other servers to ensure they don't
+ # timeout and set us to offline
+ changed = True
+
+ # If there are have been no sync for a while (and none ongoing),
+ # set presence to offline
+ if not user_to_num_current_syncs.get(user_id, 0):
+ if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+ state = state.copy_and_replace(
+ state=PresenceState.OFFLINE,
+ status_msg=None,
+ )
+ changed = True
+ else:
+ # We expect to be poked occaisonally by the other side.
+ # This is to protect against forgetful/buggy servers, so that
+ # no one gets stuck online forever.
+ if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
+ # The other side seems to have disappeared.
+ state = state.copy_and_replace(
+ state=PresenceState.OFFLINE,
+ status_msg=None,
+ )
+ changed = True
+
+ return state if changed else None
+
+
+def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
+ """Given a presence update:
+ 1. Add any appropriate timers.
+ 2. Check if we should notify anyone.
+
+ Args:
+ prev_state (UserPresenceState)
+ new_state (UserPresenceState)
+ is_mine (bool): Whether the user is ours
+ wheel_timer (WheelTimer)
+ now (int): Time now in ms
+
+ Returns:
+ 3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
+ - new_state: is the state to actually persist
+ - persist_and_notify (bool): whether to persist and notify people
+ - federation_ping (bool): whether we should send a ping over federation
+ """
+ user_id = new_state.user_id
+
+ persist_and_notify = False
+ federation_ping = False
+
+ # If the users are ours then we want to set up a bunch of timers
+ # to time things out.
+ if is_mine:
+ if new_state.state == PresenceState.ONLINE:
+ # Idle timer
+ wheel_timer.insert(
+ now=now,
+ obj=user_id,
+ then=new_state.last_active_ts + IDLE_TIMER
+ )
+
+ if new_state.state != PresenceState.OFFLINE:
+ # User has stopped syncing
+ wheel_timer.insert(
+ now=now,
+ obj=user_id,
+ then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+ )
+
+ last_federate = new_state.last_federation_update_ts
+ if now - last_federate > FEDERATION_PING_INTERVAL:
+ # Been a while since we've poked remote servers
+ new_state = new_state.copy_and_replace(
+ last_federation_update_ts=now,
+ )
+ federation_ping = True
+
+ else:
+ wheel_timer.insert(
+ now=now,
+ obj=user_id,
+ then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
+ )
+
+ if new_state.state == PresenceState.ONLINE:
+ active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
+ new_state = new_state.copy_and_replace(
+ currently_active=active,
+ )
+
+ # Check whether the change was something worth notifying about
+ if should_notify(prev_state, new_state):
+ new_state = new_state.copy_and_replace(
+ last_federation_update_ts=now,
+ )
+ persist_and_notify = True
+
+ return new_state, persist_and_notify, federation_ping
|