Stop the master relaying USER_SYNC for other workers (#7318)
Long story short: if we're handling presence on the current worker, we shouldn't be sending USER_SYNC commands over replication.
In an attempt to figure out what is going on here, I ended up refactoring some bits of the presencehandler code, so the first 4 commits here are non-functional refactors to move this code slightly closer to sanity. (There's still plenty to do here :/). Suggest reviewing individual commits.
Fixes (I hope) #7257.
2 files changed, 10 insertions, 12 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index f26aee83cb..c7880d4b63 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -210,7 +210,10 @@ class ReplicateCommand(Command):
class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or
- stopped syncing. Used to calculate presence on the master.
+ stopped syncing on this process.
+
+ This is used by the process handling presence (typically the master) to
+ calculate who is online and who is not.
Includes a timestamp of when the last user sync was.
@@ -218,7 +221,7 @@ class UserSyncCommand(Command):
USER_SYNC <instance_id> <user_id> <state> <last_sync_ms>
- Where <state> is either "start" or "stop"
+ Where <state> is either "start" or "end"
"""
NAME = "USER_SYNC"
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 5b5ee2c13e..0db5a3a24d 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -337,13 +337,6 @@ class ReplicationCommandHandler:
if self._is_master:
self._notifier.notify_remote_server_up(cmd.data)
- def get_currently_syncing_users(self):
- """Get the list of currently syncing users (if any). This is called
- when a connection has been established and we need to send the
- currently syncing users.
- """
- return self._presence_handler.get_currently_syncing_users()
-
def new_connection(self, connection: AbstractConnection):
"""Called when we have a new connection.
"""
@@ -361,9 +354,11 @@ class ReplicationCommandHandler:
if self._factory:
self._factory.resetDelay()
- # Tell the server if we have any users currently syncing (should only
- # happen on synchrotrons)
- currently_syncing = self.get_currently_syncing_users()
+ # Tell the other end if we have any users currently syncing.
+ currently_syncing = (
+ self._presence_handler.get_currently_syncing_users_for_replication()
+ )
+
now = self._clock.time_msec()
for user_id in currently_syncing:
connection.send_command(
|