diff options
author | Sean Quah <seanq@matrix.org> | 2022-05-24 12:09:40 +0100 |
---|---|---|
committer | Sean Quah <seanq@matrix.org> | 2022-05-24 12:09:40 +0100 |
commit | 4ff94779963c52a3b73ebb85517fa6fb22fd8052 (patch) | |
tree | 76f6e60351dd9cbbea13e68b4c8d9baeb282f60b /synapse/replication/tcp/handler.py | |
parent | Fixup changelog (diff) | |
parent | 1.60.0rc1 (diff) | |
download | synapse-4ff94779963c52a3b73ebb85517fa6fb22fd8052.tar.xz |
Merge remote-tracking branch 'origin/release-v1.60' into matrix-org-hotfixes
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r-- | synapse/replication/tcp/handler.py | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 9aba1cd451..e1cbfa50eb 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -1,5 +1,5 @@ # Copyright 2017 Vector Creations Ltd -# Copyright 2020 The Matrix.org Foundation C.I.C. +# Copyright 2020, 2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -101,6 +101,9 @@ class ReplicationCommandHandler: self._instance_id = hs.get_instance_id() self._instance_name = hs.get_instance_name() + # Additional Redis channel suffixes to subscribe to. + self._channels_to_subscribe_to: List[str] = [] + self._is_presence_writer = ( hs.get_instance_name() in hs.config.worker.writers.presence ) @@ -243,6 +246,31 @@ class ReplicationCommandHandler: # If we're NOT using Redis, this must be handled by the master self._should_insert_client_ips = hs.get_instance_name() == "master" + if self._is_master or self._should_insert_client_ips: + self.subscribe_to_channel("USER_IP") + + def subscribe_to_channel(self, channel_name: str) -> None: + """ + Indicates that we wish to subscribe to a Redis channel by name. + + (The name will later be prefixed with the server name; i.e. subscribing + to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.) + + Raises: + - If replication has already started, then it's too late to subscribe + to new channels. + """ + + if self._factory is not None: + # We don't allow subscribing after the fact to avoid the chance + # of missing an important message because we didn't subscribe in time. + raise RuntimeError( + "Cannot subscribe to more channels after replication started." + ) + + if channel_name not in self._channels_to_subscribe_to: + self._channels_to_subscribe_to.append(channel_name) + def _add_command_to_stream_queue( self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: @@ -321,7 +349,9 @@ class ReplicationCommandHandler: # Now create the factory/connection for the subscription stream. self._factory = RedisDirectTcpReplicationClientFactory( - hs, outbound_redis_connection + hs, + outbound_redis_connection, + channel_names=self._channels_to_subscribe_to, ) hs.get_reactor().connectTCP( hs.config.redis.redis_host, |