summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/presence.py280
1 files changed, 187 insertions, 93 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 8831d83c56..0a061fe9b2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -217,54 +217,19 @@ class PresenceHandler(BaseHandler):
                 user_id, UserPresenceState.default(user_id)
             )
 
-            # If the users are ours then we want to set up a bunch of timers
-            # to time things out.
-            if self.hs.is_mine_id(user_id):
-                if new_state.state == PresenceState.ONLINE:
-                    # Idle timer
-                    self.wheel_timer.insert(
-                        now=now,
-                        obj=user_id,
-                        then=new_state.last_active_ts + IDLE_TIMER
-                    )
-
-                if new_state.state != PresenceState.OFFLINE:
-                    # User has stopped syncing
-                    self.wheel_timer.insert(
-                        now=now,
-                        obj=user_id,
-                        then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
-                    )
-
-                    last_federate = new_state.last_federation_update_ts
-                    if now - last_federate > FEDERATION_PING_INTERVAL:
-                        # Been a while since we've poked remote servers
-                        new_state = new_state.copy_and_replace(
-                            last_federation_update_ts=now,
-                        )
-                        to_federation_ping[user_id] = new_state
-
-            else:
-                self.wheel_timer.insert(
-                    now=now,
-                    obj=user_id,
-                    then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
-                )
+            new_state, should_notify, should_ping = handle_update(
+                prev_state, new_state,
+                is_mine=self.hs.is_mine_id(user_id),
+                wheel_timer=self.wheel_timer,
+                now=now
+            )
 
-            if new_state.state == PresenceState.ONLINE:
-                active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
-                new_state = new_state.copy_and_replace(
-                    currently_active=active,
-                )
+            self.user_to_current_state[user_id] = new_state
 
-            # Check whether the change was something worth notifying about
-            if should_notify(prev_state, new_state):
-                new_state.copy_and_replace(
-                    last_federation_update_ts=now,
-                )
+            if should_notify:
                 to_notify[user_id] = new_state
-
-            self.user_to_current_state[user_id] = new_state
+            elif should_ping:
+                to_federation_ping[user_id] = new_state
 
         # TODO: We should probably ensure there are no races hereafter
 
@@ -296,55 +261,22 @@ class PresenceHandler(BaseHandler):
         # take any action.
         users_to_check = self.wheel_timer.fetch(now)
 
-        changes = {}  # Actual changes we need to notify people about
-
-        for user_id in set(users_to_check):
-            state = self.user_to_current_state.get(user_id, None)
-            if not state:
-                continue
-
-            if state.state == PresenceState.OFFLINE:
-                # No timeouts are associated with offline states.
-                continue
+        states = [
+            self.user_to_current_state.get(
+                user_id, UserPresenceState.default(user_id)
+            )
+            for user_id in set(users_to_check)
+        ]
+
+        changes = handle_timeouts(
+            states,
+            is_mine_fn=self.hs.is_mine_id,
+            user_to_current_state=self.user_to_current_state,
+            user_to_num_current_syncs=self.user_to_num_current_syncs,
+            now=now,
+        )
 
-            if self.hs.is_mine_id(user_id):
-                if state.state == PresenceState.ONLINE:
-                    if now - state.last_active_ts > IDLE_TIMER:
-                        # Currently online, but last activity ages ago so auto
-                        # idle
-                        changes[user_id] = state.copy_and_replace(
-                            state=PresenceState.UNAVAILABLE,
-                        )
-                    elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
-                        # So that we send down a notification that we've
-                        # stopped updating.
-                        changes[user_id] = state
-
-                if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
-                    # Need to send ping to other servers to ensure they don't
-                    # timeout and set us to offline
-                    changes[user_id] = state
-
-                # If there are have been no sync for a while (and none ongoing),
-                # set presence to offline
-                if not self.user_to_num_current_syncs.get(user_id, 0):
-                    if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
-                        changes[user_id] = state.copy_and_replace(
-                            state=PresenceState.OFFLINE,
-                            status_msg=None,
-                        )
-            else:
-                # We expect to be poked occaisonally by the other side.
-                # This is to protect against forgetful/buggy servers, so that
-                # no one gets stuck online forever.
-                if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
-                    # The other side seems to have disappeared.
-                    changes[user_id] = state.copy_and_replace(
-                        state=PresenceState.OFFLINE,
-                        status_msg=None,
-                    )
-
-        preserve_fn(self._update_states)(changes.values())
+        preserve_fn(self._update_states)(changes)
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
@@ -925,3 +857,165 @@ class PresenceEventSource(object):
 
     def get_pagination_rows(self, user, pagination_config, key):
         return self.get_new_events(user, from_key=None, include_offline=False)
+
+
+def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+    """Checks the presence of users that have timed out and updates as
+    appropriate.
+
+    Args:
+        user_states(list): List of UserPresenceState's to check.
+        is_mine_fn (fn): Function that returns if a user_id is ours
+        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
+            active syncs.
+        now (int): Current time in ms.
+
+    Returns:
+        List of UserPresenceState updates
+    """
+    changes = {}  # Actual changes we need to notify people about
+
+    for state in user_states:
+        is_mine = is_mine_fn(state.user_id)
+
+        new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+        if new_state:
+            changes[state.user_id] = new_state
+
+    return changes.values()
+
+
+def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+    """Checks the presence of the user to see if any of the timers have elapsed
+
+    Args:
+        state (UserPresenceState)
+        is_mine (bool): Whether the user is ours
+        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
+            active syncs.
+        now (int): Current time in ms.
+
+    Returns:
+        A UserPresenceState update or None if no update.
+    """
+    if state.state == PresenceState.OFFLINE:
+        # No timeouts are associated with offline states.
+        return None
+
+    changed = False
+    user_id = state.user_id
+
+    if is_mine:
+        if state.state == PresenceState.ONLINE:
+            if now - state.last_active_ts > IDLE_TIMER:
+                # Currently online, but last activity ages ago so auto
+                # idle
+                state = state.copy_and_replace(
+                    state=PresenceState.UNAVAILABLE,
+                )
+                changed = True
+            elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+                # So that we send down a notification that we've
+                # stopped updating.
+                changed = True
+
+        if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
+            # Need to send ping to other servers to ensure they don't
+            # timeout and set us to offline
+            changed = True
+
+        # If there are have been no sync for a while (and none ongoing),
+        # set presence to offline
+        if not user_to_num_current_syncs.get(user_id, 0):
+            if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+                state = state.copy_and_replace(
+                    state=PresenceState.OFFLINE,
+                    status_msg=None,
+                )
+                changed = True
+    else:
+        # We expect to be poked occaisonally by the other side.
+        # This is to protect against forgetful/buggy servers, so that
+        # no one gets stuck online forever.
+        if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
+            # The other side seems to have disappeared.
+            state = state.copy_and_replace(
+                state=PresenceState.OFFLINE,
+                status_msg=None,
+            )
+            changed = True
+
+    return state if changed else None
+
+
+def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
+    """Given a presence update:
+        1. Add any appropriate timers.
+        2. Check if we should notify anyone.
+
+    Args:
+        prev_state (UserPresenceState)
+        new_state (UserPresenceState)
+        is_mine (bool): Whether the user is ours
+        wheel_timer (WheelTimer)
+        now (int): Time now in ms
+
+    Returns:
+        3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
+            - new_state: is the state to actually persist
+            - persist_and_notify (bool): whether to persist and notify people
+            - federation_ping (bool): whether we should send a ping over federation
+    """
+    user_id = new_state.user_id
+
+    persist_and_notify = False
+    federation_ping = False
+
+    # If the users are ours then we want to set up a bunch of timers
+    # to time things out.
+    if is_mine:
+        if new_state.state == PresenceState.ONLINE:
+            # Idle timer
+            wheel_timer.insert(
+                now=now,
+                obj=user_id,
+                then=new_state.last_active_ts + IDLE_TIMER
+            )
+
+        if new_state.state != PresenceState.OFFLINE:
+            # User has stopped syncing
+            wheel_timer.insert(
+                now=now,
+                obj=user_id,
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+            )
+
+            last_federate = new_state.last_federation_update_ts
+            if now - last_federate > FEDERATION_PING_INTERVAL:
+                # Been a while since we've poked remote servers
+                new_state = new_state.copy_and_replace(
+                    last_federation_update_ts=now,
+                )
+                federation_ping = True
+
+    else:
+        wheel_timer.insert(
+            now=now,
+            obj=user_id,
+            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
+        )
+
+    if new_state.state == PresenceState.ONLINE:
+        active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
+        new_state = new_state.copy_and_replace(
+            currently_active=active,
+        )
+
+    # Check whether the change was something worth notifying about
+    if should_notify(prev_state, new_state):
+        new_state = new_state.copy_and_replace(
+            last_federation_update_ts=now,
+        )
+        persist_and_notify = True
+
+    return new_state, persist_and_notify, federation_ping