summary refs log tree commit diff
path: root/synapse/replication/tcp/redis.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/redis.py')
-rw-r--r--synapse/replication/tcp/redis.py35
1 files changed, 25 insertions, 10 deletions
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 989c5be032..73294654ef 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -14,7 +14,7 @@
 
 import logging
 from inspect import isawaitable
-from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast
 
 import attr
 import txredisapi
@@ -85,14 +85,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
 
     Attributes:
         synapse_handler: The command handler to handle incoming commands.
-        synapse_stream_name: The *redis* stream name to subscribe to and publish
+        synapse_stream_prefix: The *redis* stream name to subscribe to and publish
             from (not anything to do with Synapse replication streams).
         synapse_outbound_redis_connection: The connection to redis to use to send
             commands.
     """
 
     synapse_handler: "ReplicationCommandHandler"
-    synapse_stream_name: str
+    synapse_stream_prefix: str
+    synapse_channel_names: List[str]
     synapse_outbound_redis_connection: txredisapi.ConnectionHandler
 
     def __init__(self, *args: Any, **kwargs: Any):
@@ -117,8 +118,13 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
         # it's important to make sure that we only send the REPLICATE command once we
         # have successfully subscribed to the stream - otherwise we might miss the
         # POSITION response sent back by the other end.
-        logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name)
-        await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
+        fully_qualified_stream_names = [
+            f"{self.synapse_stream_prefix}/{stream_suffix}"
+            for stream_suffix in self.synapse_channel_names
+        ] + [self.synapse_stream_prefix]
+        logger.info("Sending redis SUBSCRIBE for %r", fully_qualified_stream_names)
+        await make_deferred_yieldable(self.subscribe(fully_qualified_stream_names))
+
         logger.info(
             "Successfully subscribed to redis stream, sending REPLICATE command"
         )
@@ -217,7 +223,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
 
         await make_deferred_yieldable(
             self.synapse_outbound_redis_connection.publish(
-                self.synapse_stream_name, encoded_string
+                self.synapse_stream_prefix, encoded_string
             )
         )
 
@@ -300,20 +306,27 @@ def format_address(address: IAddress) -> str:
 
 class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
     """This is a reconnecting factory that connects to redis and immediately
-    subscribes to a stream.
+    subscribes to some streams.
 
     Args:
         hs
         outbound_redis_connection: A connection to redis that will be used to
             send outbound commands (this is separate to the redis connection
             used to subscribe).
+        channel_names: A list of channel names to append to the base channel name
+            to additionally subscribe to.
+            e.g. if ['ABC', 'DEF'] is specified then we'll listen to:
+            example.com; example.com/ABC; and example.com/DEF.
     """
 
     maxDelay = 5
     protocol = RedisSubscriber
 
     def __init__(
-        self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler
+        self,
+        hs: "HomeServer",
+        outbound_redis_connection: txredisapi.ConnectionHandler,
+        channel_names: List[str],
     ):
 
         super().__init__(
@@ -326,7 +339,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
         )
 
         self.synapse_handler = hs.get_replication_command_handler()
-        self.synapse_stream_name = hs.hostname
+        self.synapse_stream_prefix = hs.hostname
+        self.synapse_channel_names = channel_names
 
         self.synapse_outbound_redis_connection = outbound_redis_connection
 
@@ -340,7 +354,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
         # protocol.
         p.synapse_handler = self.synapse_handler
         p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection
-        p.synapse_stream_name = self.synapse_stream_name
+        p.synapse_stream_prefix = self.synapse_stream_prefix
+        p.synapse_channel_names = self.synapse_channel_names
 
         return p