diff --git a/changelog.d/12672.feature b/changelog.d/12672.feature
new file mode 100644
index 0000000000..b989e0d208
--- /dev/null
+++ b/changelog.d/12672.feature
@@ -0,0 +1 @@
+Send `USER_IP` commands on a different Redis channel, in order to reduce traffic to workers that do not process these commands.
\ No newline at end of file
diff --git a/changelog.d/12672.misc b/changelog.d/12672.misc
deleted file mode 100644
index 265e0a801f..0000000000
--- a/changelog.d/12672.misc
+++ /dev/null
@@ -1 +0,0 @@
-Lay some foundation work to allow workers to only subscribe to some kinds of messages, reducing replication traffic.
\ No newline at end of file
diff --git a/changelog.d/12809.feature b/changelog.d/12809.feature
new file mode 100644
index 0000000000..b989e0d208
--- /dev/null
+++ b/changelog.d/12809.feature
@@ -0,0 +1 @@
+Send `USER_IP` commands on a different Redis channel, in order to reduce traffic to workers that do not process these commands.
\ No newline at end of file
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index fe34948168..32f52e54d8 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -58,6 +58,15 @@ class Command(metaclass=abc.ABCMeta):
# by default, we just use the command name.
return self.NAME
+ def redis_channel_name(self, prefix: str) -> str:
+ """
+ Returns the Redis channel name upon which to publish this command.
+
+ Args:
+ prefix: The prefix for the channel.
+ """
+ return prefix
+
SC = TypeVar("SC", bound="_SimpleCommand")
@@ -395,6 +404,9 @@ class UserIpCommand(Command):
f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
)
+ def redis_channel_name(self, prefix: str) -> str:
+ return f"{prefix}/USER_IP"
+
class RemoteServerUpCommand(_SimpleCommand):
"""Sent when a worker has detected that a remote server is no longer
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 73294654ef..fd1c0ec6af 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -221,10 +221,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
# remote instances.
tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
+ channel_name = cmd.redis_channel_name(self.synapse_stream_prefix)
+
await make_deferred_yieldable(
- self.synapse_outbound_redis_connection.publish(
- self.synapse_stream_prefix, encoded_string
- )
+ self.synapse_outbound_redis_connection.publish(channel_name, encoded_string)
)
|