diff options
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r-- | synapse/replication/tcp/handler.py | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 9aba1cd451..e1cbfa50eb 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -1,5 +1,5 @@ # Copyright 2017 Vector Creations Ltd -# Copyright 2020 The Matrix.org Foundation C.I.C. +# Copyright 2020, 2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -101,6 +101,9 @@ class ReplicationCommandHandler: self._instance_id = hs.get_instance_id() self._instance_name = hs.get_instance_name() + # Additional Redis channel suffixes to subscribe to. + self._channels_to_subscribe_to: List[str] = [] + self._is_presence_writer = ( hs.get_instance_name() in hs.config.worker.writers.presence ) @@ -243,6 +246,31 @@ class ReplicationCommandHandler: # If we're NOT using Redis, this must be handled by the master self._should_insert_client_ips = hs.get_instance_name() == "master" + if self._is_master or self._should_insert_client_ips: + self.subscribe_to_channel("USER_IP") + + def subscribe_to_channel(self, channel_name: str) -> None: + """ + Indicates that we wish to subscribe to a Redis channel by name. + + (The name will later be prefixed with the server name; i.e. subscribing + to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.) + + Raises: + - If replication has already started, then it's too late to subscribe + to new channels. + """ + + if self._factory is not None: + # We don't allow subscribing after the fact to avoid the chance + # of missing an important message because we didn't subscribe in time. + raise RuntimeError( + "Cannot subscribe to more channels after replication started." + ) + + if channel_name not in self._channels_to_subscribe_to: + self._channels_to_subscribe_to.append(channel_name) + def _add_command_to_stream_queue( self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: @@ -321,7 +349,9 @@ class ReplicationCommandHandler: # Now create the factory/connection for the subscription stream. self._factory = RedisDirectTcpReplicationClientFactory( - hs, outbound_redis_connection + hs, + outbound_redis_connection, + channel_names=self._channels_to_subscribe_to, ) hs.get_reactor().connectTCP( hs.config.redis.redis_host, |