diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-23 18:55:58 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-03-23 18:55:58 +0000 |
commit | ea17e939df64370c80316313bea5e50a989476d0 (patch) | |
tree | 141250623f95879b2226684a0084fb1479ea4c07 /synapse | |
parent | Remove `conn_id` usage for UserSyncCommand. (diff) | |
download | synapse-ea17e939df64370c80316313bea5e50a989476d0.tar.xz |
Add CLEAR_USER_SYNCS command that is sent on shutdown.
This should help with the case where a synchrotron gets restarted gracefully, rather than rely on 5 minute timeout.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/generic_worker.py | 17 | ||||
-rw-r--r-- | synapse/replication/tcp/commands.py | 26 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 3 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 5 |
4 files changed, 48 insertions, 3 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index cc0bcd126a..f125658615 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -65,6 +65,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.replication.tcp.streams import ( AccountDataStream, DeviceListsStream, @@ -124,7 +125,6 @@ from synapse.types import ReadReceipt from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole -from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.generic_worker") @@ -246,8 +246,19 @@ class GenericWorkerPresence(object): self.send_stop_syncing, UPDATE_SYNCING_USERS_MS ) - self.process_id = random_string(16) - logger.info("Presence process_id is %r", self.process_id) + hs.get_reactor().addSystemEventTrigger( + "before", + "shutdown", + run_as_background_process, + "generic_presence.on_shutdown", + self._on_shutdown, + ) + + def _on_shutdown(self): + if self.hs.config.use_presence: + self.hs.get_tcp_replication().send_command( + ClearUserSyncsCommand(self.instance_id) + ) def send_user_sync(self, user_id, is_syncing, last_sync_ms): if self.hs.config.use_presence: diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index fe3668838b..e4eec643f7 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -240,6 +240,30 @@ class UserSyncCommand(Command): ) +class ClearUserSyncsCommand(Command): + """Sent by the client to inform the server that it should drop all + information about syncing users sent by the client. + + Mainly used when client is about to shut down. + + Format:: + + CLEAR_USER_SYNC <instance_id> + """ + + NAME = "CLEAR_USER_SYNC" + + def __init__(self, instance_id): + self.instance_id = instance_id + + @classmethod + def from_line(cls, line): + return cls(line) + + def to_line(self): + return self.instance_id + + class FederationAckCommand(Command): """Sent by the client when it has processed up to a given point in the federation stream. This allows the master to drop in-memory caches of the @@ -400,6 +424,7 @@ _COMMANDS = ( InvalidateCacheCommand, UserIpCommand, RemoteServerUpCommand, + ClearUserSyncsCommand, ) # type: Tuple[Type[Command], ...] # Map of command name to command type. @@ -422,6 +447,7 @@ VALID_CLIENT_COMMANDS = ( ReplicateCommand.NAME, PingCommand.NAME, UserSyncCommand.NAME, + ClearUserSyncsCommand.NAME, FederationAckCommand.NAME, RemovePusherCommand.NAME, InvalidateCacheCommand.NAME, diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 8b9d65362b..ff720beb56 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -422,6 +422,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms ) + async def on_CLEAR_USER_SYNC(self, cmd): + await self.streamer.on_clear_user_syncs(cmd.instance_id) + async def on_REPLICATE(self, cmd): # Subscribe to all streams we're publishing to. for stream_name in self.streamer.streams_by_name: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 8d720694e9..acf8868de9 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -254,6 +254,11 @@ class ReplicationStreamer(object): instance_id, user_id, is_syncing, last_sync_ms ) + async def on_clear_user_syncs(self, instance_id): + """A replication client wants us to drop all their UserSync data. + """ + await self.presence_handler.update_external_syncs_clear(instance_id) + @measure_func("repl.on_remove_pusher") async def on_remove_pusher(self, app_id, push_key, user_id): """A client has asked us to remove a pusher |