summary refs log tree commit diff
path: root/synapse/app/synchrotron.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r--synapse/app/synchrotron.py53
1 files changed, 43 insertions, 10 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index f592ad352e..7b45c87a96 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,7 +16,7 @@
 
 import synapse
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, PresenceState
 from synapse.config._base import ConfigError
 from synapse.config.database import DatabaseConfig
 from synapse.config.logger import LoggingConfig
@@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
@@ -135,6 +135,8 @@ class SynchrotronSlavedStore(
         RoomMemberStore.__dict__["who_forgot_in_room"]
     )
 
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
 
 class SynchrotronPresence(object):
     def __init__(self, hs):
@@ -153,6 +155,13 @@ class SynchrotronPresence(object):
         self.process_id = random_string(16)
         logger.info("Presence process_id is %r", self.process_id)
 
+        self._sending_sync = False
+        self._need_to_send_sync = False
+        self.clock.looping_call(
+            self._send_syncing_users_regularly,
+            UPDATE_SYNCING_USERS_MS,
+        )
+
     def set_state(self, user, state):
         # TODO Hows this supposed to work?
         pass
@@ -165,12 +174,10 @@ class SynchrotronPresence(object):
         if affect_presence:
             curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
             self.user_to_num_current_syncs[user_id] = curr_sync + 1
-            # TODO: Send this less frequently.
-            # TODO: Make sure this doesn't race. Currently we can lose updates
-            # if two users come online in quick sucession and the second http
-            # to the master completes before the first.
-            # TODO: Don't block the sync request on this HTTP hit.
-            yield self._send_syncing_users()
+            prev_states = yield self.current_state_for_users([user_id])
+            if prev_states[user_id].state == PresenceState.OFFLINE:
+                # TODO: Don't block the sync request on this HTTP hit.
+                yield self._send_syncing_users_now()
 
         def _end():
             if affect_presence:
@@ -185,8 +192,24 @@ class SynchrotronPresence(object):
 
         defer.returnValue(_user_syncing())
 
-    def _send_syncing_users(self):
-        return self.http_client.post_json_get_json(self.syncing_users_url, {
+    def _send_syncing_users_regularly(self):
+        # Only send an update if we aren't in the middle of sending one.
+        if not self._sending_sync:
+            preserve_fn(self._send_syncing_users_now)()
+
+    @defer.inlineCallbacks
+    def _send_syncing_users_now(self):
+        if self._sending_sync:
+            # We don't want to race with sending another update.
+            # Instead we wait for that update to finish and send another
+            # update afterwards.
+            self._need_to_send_sync = True
+            return
+
+        # Flag that we are sending an update.
+        self._sending_sync = True
+
+        yield self.http_client.post_json_get_json(self.syncing_users_url, {
             "process_id": self.process_id,
             "syncing_users": [
                 user_id for user_id, count in self.user_to_num_current_syncs.items()
@@ -194,6 +217,16 @@ class SynchrotronPresence(object):
             ],
         })
 
+        # Unset the flag as we are no longer sending an update.
+        self._sending_sync = False
+        if self._need_to_send_sync:
+            # If something happened while we were sending the update then
+            # we might need to send another update.
+            # TODO: Check if the update that was sent matches the current state
+            # as we only need to send an update if they are different.
+            self._need_to_send_sync = False
+            yield self._send_syncing_users_now()
+
     def process_replication(self, result):
         stream = result.get("presence", {"rows": []})
         for row in stream["rows"]: