summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py34
1 files changed, 32 insertions, 2 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 9aba1cd451..e1cbfa50eb 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -1,5 +1,5 @@
 # Copyright 2017 Vector Creations Ltd
-# Copyright 2020 The Matrix.org Foundation C.I.C.
+# Copyright 2020, 2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -101,6 +101,9 @@ class ReplicationCommandHandler:
         self._instance_id = hs.get_instance_id()
         self._instance_name = hs.get_instance_name()
 
+        # Additional Redis channel suffixes to subscribe to.
+        self._channels_to_subscribe_to: List[str] = []
+
         self._is_presence_writer = (
             hs.get_instance_name() in hs.config.worker.writers.presence
         )
@@ -243,6 +246,31 @@ class ReplicationCommandHandler:
             # If we're NOT using Redis, this must be handled by the master
             self._should_insert_client_ips = hs.get_instance_name() == "master"
 
+        if self._is_master or self._should_insert_client_ips:
+            self.subscribe_to_channel("USER_IP")
+
+    def subscribe_to_channel(self, channel_name: str) -> None:
+        """
+        Indicates that we wish to subscribe to a Redis channel by name.
+
+        (The name will later be prefixed with the server name; i.e. subscribing
+        to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.)
+
+        Raises:
+          - If replication has already started, then it's too late to subscribe
+            to new channels.
+        """
+
+        if self._factory is not None:
+            # We don't allow subscribing after the fact to avoid the chance
+            # of missing an important message because we didn't subscribe in time.
+            raise RuntimeError(
+                "Cannot subscribe to more channels after replication started."
+            )
+
+        if channel_name not in self._channels_to_subscribe_to:
+            self._channels_to_subscribe_to.append(channel_name)
+
     def _add_command_to_stream_queue(
         self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
     ) -> None:
@@ -321,7 +349,9 @@ class ReplicationCommandHandler:
 
             # Now create the factory/connection for the subscription stream.
             self._factory = RedisDirectTcpReplicationClientFactory(
-                hs, outbound_redis_connection
+                hs,
+                outbound_redis_connection,
+                channel_names=self._channels_to_subscribe_to,
             )
             hs.get_reactor().connectTCP(
                 hs.config.redis.redis_host,