From 3e4af36bc8515504721b3c1b1d64d4f45359bf88 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 10 Mar 2022 08:01:56 -0500 Subject: Rename get_tcp_replication to get_replication_command_handler. (#12192) Since the object it returns is a ReplicationCommandHandler. This is clean-up from adding support to Redis where the command handler was added as an additional layer of abstraction from the TCP protocol. --- synapse/app/generic_worker.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/federation/transport/server/_base.py | 2 +- synapse/handlers/presence.py | 4 ++-- synapse/replication/slave/storage/client_ips.py | 2 +- synapse/replication/tcp/client.py | 4 +++- synapse/replication/tcp/handler.py | 4 +--- synapse/replication/tcp/redis.py | 2 +- synapse/replication/tcp/resource.py | 4 ++-- synapse/server.py | 2 +- 10 files changed, 14 insertions(+), 14 deletions(-) (limited to 'synapse') diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 1536a42723..a10a63b06c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -417,7 +417,7 @@ class GenericWorkerServer(HomeServer): else: logger.warning("Unsupported listener type: %s", listener.type) - self.get_tcp_replication().start_replication(self) + self.get_replication_command_handler().start_replication(self) def start(config_options: List[str]) -> None: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a6789a840e..e4dc04c0b4 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -273,7 +273,7 @@ class SynapseHomeServer(HomeServer): # If redis is enabled we connect via the replication command handler # in the same way as the workers (since we're effectively a client # rather than a server). - self.get_tcp_replication().start_replication(self) + self.get_replication_command_handler().start_replication(self) for listener in self.config.server.listeners: if listener.type == "http": diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index 87e99c7ddf..2529dee613 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -63,7 +63,7 @@ class Authenticator: self.replication_client = None if hs.config.worker.worker_app: - self.replication_client = hs.get_tcp_replication() + self.replication_client = hs.get_replication_command_handler() # A method just so we can pass 'self' as the authenticator to the Servlets async def authenticate_request( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c155098bee..9927a30e6e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -424,13 +424,13 @@ class WorkerPresenceHandler(BasePresenceHandler): async def _on_shutdown(self) -> None: if self._presence_enabled: - self.hs.get_tcp_replication().send_command( + self.hs.get_replication_command_handler().send_command( ClearUserSyncsCommand(self.instance_id) ) def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None: if self._presence_enabled: - self.hs.get_tcp_replication().send_user_sync( + self.hs.get_replication_command_handler().send_user_sync( self.instance_id, user_id, is_syncing, last_sync_ms ) diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index b5b84c09ae..14706a0817 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -54,6 +54,6 @@ class SlavedClientIpStore(BaseSlavedStore): self.client_ip_last_seen.set(key, now) - self.hs.get_tcp_replication().send_user_ip( + self.hs.get_replication_command_handler().send_user_ip( user_id, access_token, ip, user_agent, device_id, now ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index b8fc1d4db9..deeaaec4e6 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -462,6 +462,8 @@ class FederationSenderHandler: # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack(current_position) + self._hs.get_replication_command_handler().send_federation_ack( + current_position + ) except Exception: logger.exception("Error updating federation stream position") diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 0d2013a3cf..d51f045f22 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -295,9 +295,7 @@ class ReplicationCommandHandler: raise Exception("Unrecognised command %s in stream queue", cmd.NAME) def start_replication(self, hs: "HomeServer") -> None: - """Helper method to start a replication connection to the remote server - using TCP. - """ + """Helper method to start replication.""" if hs.config.redis.redis_enabled: from synapse.replication.tcp.redis import ( RedisDirectTcpReplicationClientFactory, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index b84e572da1..989c5be032 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -325,7 +325,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): password=hs.config.redis.redis_password, ) - self.synapse_handler = hs.get_tcp_replication() + self.synapse_handler = hs.get_replication_command_handler() self.synapse_stream_name = hs.hostname self.synapse_outbound_redis_connection = outbound_redis_connection diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 494e42a2be..ab829040cd 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory): """Factory for new replication connections.""" def __init__(self, hs: "HomeServer"): - self.command_handler = hs.get_tcp_replication() + self.command_handler = hs.get_replication_command_handler() self.clock = hs.get_clock() self.server_name = hs.config.server.server_name @@ -85,7 +85,7 @@ class ReplicationStreamer: self.is_looping = False self.pending_updates = False - self.command_handler = hs.get_tcp_replication() + self.command_handler = hs.get_replication_command_handler() # Set of streams to replicate. self.streams = self.command_handler.get_streams_to_replicate() diff --git a/synapse/server.py b/synapse/server.py index 46a64418ea..1270abb5a3 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -639,7 +639,7 @@ class HomeServer(metaclass=abc.ABCMeta): return ReadMarkerHandler(self) @cache_in_self - def get_tcp_replication(self) -> ReplicationCommandHandler: + def get_replication_command_handler(self) -> ReplicationCommandHandler: return ReplicationCommandHandler(self) @cache_in_self -- cgit 1.4.1