summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2016-06-01 15:57:46 +0100
committerMark Haines <mjark@negativecurvature.net>2016-06-01 15:57:46 +0100
commitb06b10c8e3b3ad890886dea7b948eae03505d71a (patch)
treeadb1a9f2b016faa5d30ba61510e342abe01b7067
parentMerge pull request #808 from matrix-org/dbkr/room_list_spider (diff)
downloadsynapse-b06b10c8e3b3ad890886dea7b948eae03505d71a.tar.xz
Add infrastructure to the presence handler to track sync requests in external processes
-rw-r--r--synapse/handlers/presence.py76
-rw-r--r--tests/handlers/test_presence.py16
2 files changed, 72 insertions, 20 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37f57301fb..047da7ff25 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -158,10 +158,21 @@ class PresenceHandler(object):
         self.serial_to_user = {}
         self._next_serial = 1
 
-        # Keeps track of the number of *ongoing* syncs. While this is non zero
-        # a user will never go offline.
+        # Keeps track of the number of *ongoing* syncs on this process. While
+        # this is non zero a user will never go offline.
         self.user_to_num_current_syncs = {}
 
+        # Keeps track of the number of *ongoing* syncs on other processes.
+        # While any sync is ongoing on another process the user will never
+        # go offline.
+        # Each process has a unique identifier and an update frequency. If
+        # no update is received from that process within the update period then
+        # we assume that all the sync requests on that process have stopped.
+        # Stored as a dict from process_id to set of user_id, and a dict of
+        # process_id to millisecond timestamp last updated.
+        self.external_process_to_current_syncs = {}
+        self.external_process_last_updated_ms = []
+
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
@@ -286,7 +297,7 @@ class PresenceHandler(object):
             changes = handle_timeouts(
                 states,
                 is_mine_fn=self.is_mine_id,
-                user_to_num_current_syncs=self.user_to_num_current_syncs,
+                syncing_users=self.get_syncing_users(),
                 now=now,
             )
 
@@ -363,6 +374,51 @@ class PresenceHandler(object):
 
         defer.returnValue(_user_syncing())
 
+    def get_currently_syncing_users(self):
+        syncing_user_ids = {
+            user_id for user_id, count in self.user_to_num_current_syncs.items()
+            if count
+        }
+        syncing_user_ids.update(self.external_process_to_current_syncs.values())
+        return syncing_user_ids
+
+    @defer.inlineCallbacks
+    def update_external_syncs(self, process_id, syncing_user_ids):
+        time_now_ms = self.clock.time_msec()
+        prev_syncing_user_ids = (
+            self.external_process_to_current_syncs.get(process_id, set())
+        )
+        prev_states = yield self.current_state_for_users(
+            syncing_user_ids + prev_syncing_user_ids
+        )
+        updates = []
+
+        for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+            prev_state = prev_states[new_user_id]
+            if prev_state.state == PresenceState.OFFLINE:
+                updates.append(prev_state.copy_and_replace(
+                    state=PresenceState.ONLINE,
+                    last_active_ts=time_now_ms,
+                    last_user_sync_ts=time_now_ms,
+                ))
+            else:
+                updates.append(prev_state.copy_and_replace(
+                    last_user_sync_ts=time_now_ms,
+                ))
+
+        for old_user_id in prev_syncing_user_ids:
+            prev_state = prev_states[old_user_id]
+            updates.append(prev_state.copy_and_replace(
+                last_user_sync_ts=time_now_ms,
+            ))
+
+        yield self._update_states(updates)
+
+        self.external_process_last_updated_ms = time_now_ms
+        self.external_process_to_current_syncs[process_id] = syncing_user_ids
+        if not syncing_user_ids:
+            del self.external_process_to_current_syncs[process_id]
+
     @defer.inlineCallbacks
     def current_state_for_user(self, user_id):
         """Get the current presence state for a user.
@@ -935,15 +991,14 @@ class PresenceEventSource(object):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
 
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
     appropriate.
 
     Args:
         user_states(list): List of UserPresenceState's to check.
         is_mine_fn (fn): Function that returns if a user_id is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -954,21 +1009,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
     for state in user_states:
         is_mine = is_mine_fn(state.user_id)
 
-        new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+        new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
         if new_state:
             changes[state.user_id] = new_state
 
     return changes.values()
 
 
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
         state (UserPresenceState)
         is_mine (bool): Whether the user is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -1002,7 +1056,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
 
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
-        if not user_to_num_current_syncs.get(user_id, 0):
+        if user_id not in syncing_user_ids:
             if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 87c795fcfa..b531ba8540 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         )
 
         new_state = handle_timeout(
-            state, is_mine=True, user_to_num_current_syncs={}, now=now
+            state, is_mine=True, syncing_user_ids=set(), now=now
         )
 
         self.assertIsNotNone(new_state)
@@ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         )
 
         new_state = handle_timeout(
-            state, is_mine=True, user_to_num_current_syncs={}, now=now
+            state, is_mine=True, syncing_user_ids=set(), now=now
         )
 
         self.assertIsNotNone(new_state)
@@ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         )
 
         new_state = handle_timeout(
-            state, is_mine=True, user_to_num_current_syncs={
-                user_id: 1,
-            }, now=now
+            state, is_mine=True, syncing_user_ids=set([user_id]), now=now
         )
 
         self.assertIsNotNone(new_state)
@@ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         )
 
         new_state = handle_timeout(
-            state, is_mine=True, user_to_num_current_syncs={}, now=now
+            state, is_mine=True, syncing_user_ids=set(), now=now
         )
 
         self.assertIsNotNone(new_state)
@@ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         )
 
         new_state = handle_timeout(
-            state, is_mine=True, user_to_num_current_syncs={}, now=now
+            state, is_mine=True, syncing_user_ids=set(), now=now
         )
 
         self.assertIsNone(new_state)
@@ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         )
 
         new_state = handle_timeout(
-            state, is_mine=False, user_to_num_current_syncs={}, now=now
+            state, is_mine=False, syncing_user_ids=set(), now=now
         )
 
         self.assertIsNotNone(new_state)
@@ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         )
 
         new_state = handle_timeout(
-            state, is_mine=True, user_to_num_current_syncs={}, now=now
+            state, is_mine=True, syncing_user_ids=set(), now=now
         )
 
         self.assertIsNotNone(new_state)