summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-23 18:55:58 +0000
committerErik Johnston <erik@matrix.org>2020-03-23 18:55:58 +0000
commitea17e939df64370c80316313bea5e50a989476d0 (patch)
tree141250623f95879b2226684a0084fb1479ea4c07 /synapse
parentRemove `conn_id` usage for UserSyncCommand. (diff)
downloadsynapse-ea17e939df64370c80316313bea5e50a989476d0.tar.xz
Add CLEAR_USER_SYNCS command that is sent on shutdown.
This should help with the case where a synchrotron gets restarted
gracefully, rather than rely on 5 minute timeout.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/generic_worker.py17
-rw-r--r--synapse/replication/tcp/commands.py26
-rw-r--r--synapse/replication/tcp/protocol.py3
-rw-r--r--synapse/replication/tcp/resource.py5
4 files changed, 48 insertions, 3 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index cc0bcd126a..f125658615 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -65,6 +65,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.commands import ClearUserSyncsCommand
 from synapse.replication.tcp.streams import (
     AccountDataStream,
     DeviceListsStream,
@@ -124,7 +125,6 @@ from synapse.types import ReadReceipt
 from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
-from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger("synapse.app.generic_worker")
@@ -246,8 +246,19 @@ class GenericWorkerPresence(object):
             self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
         )
 
-        self.process_id = random_string(16)
-        logger.info("Presence process_id is %r", self.process_id)
+        hs.get_reactor().addSystemEventTrigger(
+            "before",
+            "shutdown",
+            run_as_background_process,
+            "generic_presence.on_shutdown",
+            self._on_shutdown,
+        )
+
+    def _on_shutdown(self):
+        if self.hs.config.use_presence:
+            self.hs.get_tcp_replication().send_command(
+                ClearUserSyncsCommand(self.instance_id)
+            )
 
     def send_user_sync(self, user_id, is_syncing, last_sync_ms):
         if self.hs.config.use_presence:
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index fe3668838b..e4eec643f7 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -240,6 +240,30 @@ class UserSyncCommand(Command):
         )
 
 
+class ClearUserSyncsCommand(Command):
+    """Sent by the client to inform the server that it should drop all
+    information about syncing users sent by the client.
+
+    Mainly used when client is about to shut down.
+
+    Format::
+
+        CLEAR_USER_SYNC <instance_id>
+    """
+
+    NAME = "CLEAR_USER_SYNC"
+
+    def __init__(self, instance_id):
+        self.instance_id = instance_id
+
+    @classmethod
+    def from_line(cls, line):
+        return cls(line)
+
+    def to_line(self):
+        return self.instance_id
+
+
 class FederationAckCommand(Command):
     """Sent by the client when it has processed up to a given point in the
     federation stream. This allows the master to drop in-memory caches of the
@@ -400,6 +424,7 @@ _COMMANDS = (
     InvalidateCacheCommand,
     UserIpCommand,
     RemoteServerUpCommand,
+    ClearUserSyncsCommand,
 )  # type: Tuple[Type[Command], ...]
 
 # Map of command name to command type.
@@ -422,6 +447,7 @@ VALID_CLIENT_COMMANDS = (
     ReplicateCommand.NAME,
     PingCommand.NAME,
     UserSyncCommand.NAME,
+    ClearUserSyncsCommand.NAME,
     FederationAckCommand.NAME,
     RemovePusherCommand.NAME,
     InvalidateCacheCommand.NAME,
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 8b9d65362b..ff720beb56 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -422,6 +422,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
             cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
         )
 
+    async def on_CLEAR_USER_SYNC(self, cmd):
+        await self.streamer.on_clear_user_syncs(cmd.instance_id)
+
     async def on_REPLICATE(self, cmd):
         # Subscribe to all streams we're publishing to.
         for stream_name in self.streamer.streams_by_name:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 8d720694e9..acf8868de9 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -254,6 +254,11 @@ class ReplicationStreamer(object):
             instance_id, user_id, is_syncing, last_sync_ms
         )
 
+    async def on_clear_user_syncs(self, instance_id):
+        """A replication client wants us to drop all their UserSync data.
+        """
+        await self.presence_handler.update_external_syncs_clear(instance_id)
+
     @measure_func("repl.on_remove_pusher")
     async def on_remove_pusher(self, app_id, push_key, user_id):
         """A client has asked us to remove a pusher