summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2022-05-20 15:28:23 +0100
committerGitHub <noreply@github.com>2022-05-20 15:28:23 +0100
commit39dee30f0120290d6ef3504815655df1a6cf47a5 (patch)
treea652e3badb2980aaa8b202c6d30fc3dcc7ea0f17 /synapse/replication/tcp
parentUniformize spam-checker API, part 1: the `Code` enum. (#12703) (diff)
downloadsynapse-39dee30f0120290d6ef3504815655df1a6cf47a5.tar.xz
Send `USER_IP` commands on a different Redis channel, in order to reduce traffic to workers that do not process these commands. (#12809)
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/commands.py12
-rw-r--r--synapse/replication/tcp/redis.py6
2 files changed, 15 insertions, 3 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index fe34948168..32f52e54d8 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -58,6 +58,15 @@ class Command(metaclass=abc.ABCMeta):
         # by default, we just use the command name.
         return self.NAME
 
+    def redis_channel_name(self, prefix: str) -> str:
+        """
+        Returns the Redis channel name upon which to publish this command.
+
+        Args:
+            prefix: The prefix for the channel.
+        """
+        return prefix
+
 
 SC = TypeVar("SC", bound="_SimpleCommand")
 
@@ -395,6 +404,9 @@ class UserIpCommand(Command):
             f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
         )
 
+    def redis_channel_name(self, prefix: str) -> str:
+        return f"{prefix}/USER_IP"
+
 
 class RemoteServerUpCommand(_SimpleCommand):
     """Sent when a worker has detected that a remote server is no longer
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 73294654ef..fd1c0ec6af 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -221,10 +221,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
         # remote instances.
         tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
 
+        channel_name = cmd.redis_channel_name(self.synapse_stream_prefix)
+
         await make_deferred_yieldable(
-            self.synapse_outbound_redis_connection.publish(
-                self.synapse_stream_prefix, encoded_string
-            )
+            self.synapse_outbound_redis_connection.publish(channel_name, encoded_string)
         )