diff options
author | reivilibre <oliverw@matrix.org> | 2022-05-20 15:28:23 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-20 15:28:23 +0100 |
commit | 39dee30f0120290d6ef3504815655df1a6cf47a5 (patch) | |
tree | a652e3badb2980aaa8b202c6d30fc3dcc7ea0f17 /synapse/replication | |
parent | Uniformize spam-checker API, part 1: the `Code` enum. (#12703) (diff) | |
download | synapse-39dee30f0120290d6ef3504815655df1a6cf47a5.tar.xz |
Send `USER_IP` commands on a different Redis channel, in order to reduce traffic to workers that do not process these commands. (#12809)
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/tcp/commands.py | 12 | ||||
-rw-r--r-- | synapse/replication/tcp/redis.py | 6 |
2 files changed, 15 insertions, 3 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index fe34948168..32f52e54d8 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -58,6 +58,15 @@ class Command(metaclass=abc.ABCMeta): # by default, we just use the command name. return self.NAME + def redis_channel_name(self, prefix: str) -> str: + """ + Returns the Redis channel name upon which to publish this command. + + Args: + prefix: The prefix for the channel. + """ + return prefix + SC = TypeVar("SC", bound="_SimpleCommand") @@ -395,6 +404,9 @@ class UserIpCommand(Command): f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})" ) + def redis_channel_name(self, prefix: str) -> str: + return f"{prefix}/USER_IP" + class RemoteServerUpCommand(_SimpleCommand): """Sent when a worker has detected that a remote server is no longer diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 73294654ef..fd1c0ec6af 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -221,10 +221,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): # remote instances. tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc() + channel_name = cmd.redis_channel_name(self.synapse_stream_prefix) + await make_deferred_yieldable( - self.synapse_outbound_redis_connection.publish( - self.synapse_stream_prefix, encoded_string - ) + self.synapse_outbound_redis_connection.publish(channel_name, encoded_string) ) |