diff --git a/changelog.d/12192.misc b/changelog.d/12192.misc
new file mode 100644
index 0000000000..bdfe8dad98
--- /dev/null
+++ b/changelog.d/12192.misc
@@ -0,0 +1 @@
+Rename `HomeServer.get_tcp_replication` to `get_replication_command_handler`.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1536a42723..a10a63b06c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -417,7 +417,7 @@ class GenericWorkerServer(HomeServer):
else:
logger.warning("Unsupported listener type: %s", listener.type)
- self.get_tcp_replication().start_replication(self)
+ self.get_replication_command_handler().start_replication(self)
def start(config_options: List[str]) -> None:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index a6789a840e..e4dc04c0b4 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -273,7 +273,7 @@ class SynapseHomeServer(HomeServer):
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
- self.get_tcp_replication().start_replication(self)
+ self.get_replication_command_handler().start_replication(self)
for listener in self.config.server.listeners:
if listener.type == "http":
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 87e99c7ddf..2529dee613 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -63,7 +63,7 @@ class Authenticator:
self.replication_client = None
if hs.config.worker.worker_app:
- self.replication_client = hs.get_tcp_replication()
+ self.replication_client = hs.get_replication_command_handler()
# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index c155098bee..9927a30e6e 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -424,13 +424,13 @@ class WorkerPresenceHandler(BasePresenceHandler):
async def _on_shutdown(self) -> None:
if self._presence_enabled:
- self.hs.get_tcp_replication().send_command(
+ self.hs.get_replication_command_handler().send_command(
ClearUserSyncsCommand(self.instance_id)
)
def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
if self._presence_enabled:
- self.hs.get_tcp_replication().send_user_sync(
+ self.hs.get_replication_command_handler().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index b5b84c09ae..14706a0817 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -54,6 +54,6 @@ class SlavedClientIpStore(BaseSlavedStore):
self.client_ip_last_seen.set(key, now)
- self.hs.get_tcp_replication().send_user_ip(
+ self.hs.get_replication_command_handler().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b8fc1d4db9..deeaaec4e6 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -462,6 +462,8 @@ class FederationSenderHandler:
# We ACK this token over replication so that the master can drop
# its in memory queues
- self._hs.get_tcp_replication().send_federation_ack(current_position)
+ self._hs.get_replication_command_handler().send_federation_ack(
+ current_position
+ )
except Exception:
logger.exception("Error updating federation stream position")
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0d2013a3cf..d51f045f22 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -295,9 +295,7 @@ class ReplicationCommandHandler:
raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
def start_replication(self, hs: "HomeServer") -> None:
- """Helper method to start a replication connection to the remote server
- using TCP.
- """
+ """Helper method to start replication."""
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index b84e572da1..989c5be032 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -325,7 +325,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
password=hs.config.redis.redis_password,
)
- self.synapse_handler = hs.get_tcp_replication()
+ self.synapse_handler = hs.get_replication_command_handler()
self.synapse_stream_name = hs.hostname
self.synapse_outbound_redis_connection = outbound_redis_connection
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 494e42a2be..ab829040cd 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory):
"""Factory for new replication connections."""
def __init__(self, hs: "HomeServer"):
- self.command_handler = hs.get_tcp_replication()
+ self.command_handler = hs.get_replication_command_handler()
self.clock = hs.get_clock()
self.server_name = hs.config.server.server_name
@@ -85,7 +85,7 @@ class ReplicationStreamer:
self.is_looping = False
self.pending_updates = False
- self.command_handler = hs.get_tcp_replication()
+ self.command_handler = hs.get_replication_command_handler()
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()
diff --git a/synapse/server.py b/synapse/server.py
index 46a64418ea..1270abb5a3 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -639,7 +639,7 @@ class HomeServer(metaclass=abc.ABCMeta):
return ReadMarkerHandler(self)
@cache_in_self
- def get_tcp_replication(self) -> ReplicationCommandHandler:
+ def get_replication_command_handler(self) -> ReplicationCommandHandler:
return ReplicationCommandHandler(self)
@cache_in_self
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index a7a05a564f..9c5df266bd 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -251,7 +251,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
self.connect_any_redis_attempts,
)
- self.hs.get_tcp_replication().start_replication(self.hs)
+ self.hs.get_replication_command_handler().start_replication(self.hs)
# When we see a connection attempt to the master replication listener we
# automatically set up the connection. This is so that tests don't
@@ -375,7 +375,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
)
if worker_hs.config.redis.redis_enabled:
- worker_hs.get_tcp_replication().start_replication(worker_hs)
+ worker_hs.get_replication_command_handler().start_replication(worker_hs)
return worker_hs
diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py
index f9d5da723c..641a94133b 100644
--- a/tests/replication/tcp/streams/test_events.py
+++ b/tests/replication/tcp/streams/test_events.py
@@ -420,7 +420,7 @@ class EventsStreamTestCase(BaseStreamTestCase):
# Manually send an old RDATA command, which should get dropped. This
# re-uses the row from above, but with an earlier stream token.
- self.hs.get_tcp_replication().send_command(
+ self.hs.get_replication_command_handler().send_command(
RdataCommand("events", "master", 1, row)
)
diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py
index 3ff5afc6e5..9a229dd23f 100644
--- a/tests/replication/tcp/streams/test_typing.py
+++ b/tests/replication/tcp/streams/test_typing.py
@@ -118,7 +118,7 @@ class TypingStreamTestCase(BaseStreamTestCase):
# Reset the typing handler
self.hs.get_replication_streams()["typing"].last_token = 0
- self.hs.get_tcp_replication()._streams["typing"].last_token = 0
+ self.hs.get_replication_command_handler()._streams["typing"].last_token = 0
typing._latest_room_serial = 0
typing._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", typing._latest_room_serial
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
index 1b6a4bf4b0..26b8bd512a 100644
--- a/tests/replication/test_federation_ack.py
+++ b/tests/replication/test_federation_ack.py
@@ -48,7 +48,7 @@ class FederationAckTestCase(HomeserverTestCase):
transport, rather than assuming that the implementation has a
ReplicationCommandHandler.
"""
- rch = self.hs.get_tcp_replication()
+ rch = self.hs.get_replication_command_handler()
# wire up the ReplicationCommandHandler to a mock connection, which needs
# to implement IReplicationConnection. (Note that Mock doesn't understand
|