diff options
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/handler.py | 34 | ||||
-rw-r--r-- | synapse/replication/tcp/redis.py | 35 |
2 files changed, 57 insertions, 12 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 9aba1cd451..e1cbfa50eb 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -1,5 +1,5 @@ # Copyright 2017 Vector Creations Ltd -# Copyright 2020 The Matrix.org Foundation C.I.C. +# Copyright 2020, 2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -101,6 +101,9 @@ class ReplicationCommandHandler: self._instance_id = hs.get_instance_id() self._instance_name = hs.get_instance_name() + # Additional Redis channel suffixes to subscribe to. + self._channels_to_subscribe_to: List[str] = [] + self._is_presence_writer = ( hs.get_instance_name() in hs.config.worker.writers.presence ) @@ -243,6 +246,31 @@ class ReplicationCommandHandler: # If we're NOT using Redis, this must be handled by the master self._should_insert_client_ips = hs.get_instance_name() == "master" + if self._is_master or self._should_insert_client_ips: + self.subscribe_to_channel("USER_IP") + + def subscribe_to_channel(self, channel_name: str) -> None: + """ + Indicates that we wish to subscribe to a Redis channel by name. + + (The name will later be prefixed with the server name; i.e. subscribing + to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.) + + Raises: + - If replication has already started, then it's too late to subscribe + to new channels. + """ + + if self._factory is not None: + # We don't allow subscribing after the fact to avoid the chance + # of missing an important message because we didn't subscribe in time. + raise RuntimeError( + "Cannot subscribe to more channels after replication started." + ) + + if channel_name not in self._channels_to_subscribe_to: + self._channels_to_subscribe_to.append(channel_name) + def _add_command_to_stream_queue( self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: @@ -321,7 +349,9 @@ class ReplicationCommandHandler: # Now create the factory/connection for the subscription stream. self._factory = RedisDirectTcpReplicationClientFactory( - hs, outbound_redis_connection + hs, + outbound_redis_connection, + channel_names=self._channels_to_subscribe_to, ) hs.get_reactor().connectTCP( hs.config.redis.redis_host, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 989c5be032..73294654ef 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -14,7 +14,7 @@ import logging from inspect import isawaitable -from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast +from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast import attr import txredisapi @@ -85,14 +85,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): Attributes: synapse_handler: The command handler to handle incoming commands. - synapse_stream_name: The *redis* stream name to subscribe to and publish + synapse_stream_prefix: The *redis* stream name to subscribe to and publish from (not anything to do with Synapse replication streams). synapse_outbound_redis_connection: The connection to redis to use to send commands. """ synapse_handler: "ReplicationCommandHandler" - synapse_stream_name: str + synapse_stream_prefix: str + synapse_channel_names: List[str] synapse_outbound_redis_connection: txredisapi.ConnectionHandler def __init__(self, *args: Any, **kwargs: Any): @@ -117,8 +118,13 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): # it's important to make sure that we only send the REPLICATE command once we # have successfully subscribed to the stream - otherwise we might miss the # POSITION response sent back by the other end. - logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name) - await make_deferred_yieldable(self.subscribe(self.synapse_stream_name)) + fully_qualified_stream_names = [ + f"{self.synapse_stream_prefix}/{stream_suffix}" + for stream_suffix in self.synapse_channel_names + ] + [self.synapse_stream_prefix] + logger.info("Sending redis SUBSCRIBE for %r", fully_qualified_stream_names) + await make_deferred_yieldable(self.subscribe(fully_qualified_stream_names)) + logger.info( "Successfully subscribed to redis stream, sending REPLICATE command" ) @@ -217,7 +223,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): await make_deferred_yieldable( self.synapse_outbound_redis_connection.publish( - self.synapse_stream_name, encoded_string + self.synapse_stream_prefix, encoded_string ) ) @@ -300,20 +306,27 @@ def format_address(address: IAddress) -> str: class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): """This is a reconnecting factory that connects to redis and immediately - subscribes to a stream. + subscribes to some streams. Args: hs outbound_redis_connection: A connection to redis that will be used to send outbound commands (this is separate to the redis connection used to subscribe). + channel_names: A list of channel names to append to the base channel name + to additionally subscribe to. + e.g. if ['ABC', 'DEF'] is specified then we'll listen to: + example.com; example.com/ABC; and example.com/DEF. """ maxDelay = 5 protocol = RedisSubscriber def __init__( - self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler + self, + hs: "HomeServer", + outbound_redis_connection: txredisapi.ConnectionHandler, + channel_names: List[str], ): super().__init__( @@ -326,7 +339,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): ) self.synapse_handler = hs.get_replication_command_handler() - self.synapse_stream_name = hs.hostname + self.synapse_stream_prefix = hs.hostname + self.synapse_channel_names = channel_names self.synapse_outbound_redis_connection = outbound_redis_connection @@ -340,7 +354,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): # protocol. p.synapse_handler = self.synapse_handler p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection - p.synapse_stream_name = self.synapse_stream_name + p.synapse_stream_prefix = self.synapse_stream_prefix + p.synapse_channel_names = self.synapse_channel_names return p |