summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-23 18:55:58 +0000
committerErik Johnston <erik@matrix.org>2020-03-23 18:55:58 +0000
commitea17e939df64370c80316313bea5e50a989476d0 (patch)
tree141250623f95879b2226684a0084fb1479ea4c07 /synapse/replication
parentRemove `conn_id` usage for UserSyncCommand. (diff)
downloadsynapse-ea17e939df64370c80316313bea5e50a989476d0.tar.xz
Add CLEAR_USER_SYNCS command that is sent on shutdown.
This should help with the case where a synchrotron gets restarted
gracefully, rather than rely on 5 minute timeout.
Diffstat (limited to '')
-rw-r--r--synapse/replication/tcp/commands.py26
-rw-r--r--synapse/replication/tcp/protocol.py3
-rw-r--r--synapse/replication/tcp/resource.py5
3 files changed, 34 insertions, 0 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index fe3668838b..e4eec643f7 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -240,6 +240,30 @@ class UserSyncCommand(Command):
         )
 
 
+class ClearUserSyncsCommand(Command):
+    """Sent by the client to inform the server that it should drop all
+    information about syncing users sent by the client.
+
+    Mainly used when client is about to shut down.
+
+    Format::
+
+        CLEAR_USER_SYNC <instance_id>
+    """
+
+    NAME = "CLEAR_USER_SYNC"
+
+    def __init__(self, instance_id):
+        self.instance_id = instance_id
+
+    @classmethod
+    def from_line(cls, line):
+        return cls(line)
+
+    def to_line(self):
+        return self.instance_id
+
+
 class FederationAckCommand(Command):
     """Sent by the client when it has processed up to a given point in the
     federation stream. This allows the master to drop in-memory caches of the
@@ -400,6 +424,7 @@ _COMMANDS = (
     InvalidateCacheCommand,
     UserIpCommand,
     RemoteServerUpCommand,
+    ClearUserSyncsCommand,
 )  # type: Tuple[Type[Command], ...]
 
 # Map of command name to command type.
@@ -422,6 +447,7 @@ VALID_CLIENT_COMMANDS = (
     ReplicateCommand.NAME,
     PingCommand.NAME,
     UserSyncCommand.NAME,
+    ClearUserSyncsCommand.NAME,
     FederationAckCommand.NAME,
     RemovePusherCommand.NAME,
     InvalidateCacheCommand.NAME,
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 8b9d65362b..ff720beb56 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -422,6 +422,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
             cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
         )
 
+    async def on_CLEAR_USER_SYNC(self, cmd):
+        await self.streamer.on_clear_user_syncs(cmd.instance_id)
+
     async def on_REPLICATE(self, cmd):
         # Subscribe to all streams we're publishing to.
         for stream_name in self.streamer.streams_by_name:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 8d720694e9..acf8868de9 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -254,6 +254,11 @@ class ReplicationStreamer(object):
             instance_id, user_id, is_syncing, last_sync_ms
         )
 
+    async def on_clear_user_syncs(self, instance_id):
+        """A replication client wants us to drop all their UserSync data.
+        """
+        await self.presence_handler.update_external_syncs_clear(instance_id)
+
     @measure_func("repl.on_remove_pusher")
     async def on_remove_pusher(self, app_id, push_key, user_id):
         """A client has asked us to remove a pusher