diff options
author | Sean Quah <seanq@matrix.org> | 2022-05-24 12:09:40 +0100 |
---|---|---|
committer | Sean Quah <seanq@matrix.org> | 2022-05-24 12:09:40 +0100 |
commit | 4ff94779963c52a3b73ebb85517fa6fb22fd8052 (patch) | |
tree | 76f6e60351dd9cbbea13e68b4c8d9baeb282f60b /synapse/replication | |
parent | Fixup changelog (diff) | |
parent | 1.60.0rc1 (diff) | |
download | synapse-4ff94779963c52a3b73ebb85517fa6fb22fd8052.tar.xz |
Merge remote-tracking branch 'origin/release-v1.60' into matrix-org-hotfixes
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/http/_base.py | 21 | ||||
-rw-r--r-- | synapse/replication/tcp/client.py | 18 | ||||
-rw-r--r-- | synapse/replication/tcp/commands.py | 12 | ||||
-rw-r--r-- | synapse/replication/tcp/handler.py | 34 | ||||
-rw-r--r-- | synapse/replication/tcp/redis.py | 39 |
5 files changed, 101 insertions, 23 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 2bd244ed79..a4ae4040c3 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -26,7 +26,8 @@ from twisted.web.server import Request from synapse.api.errors import HttpResponseException, SynapseError from synapse.http import RequestTimedOutError -from synapse.http.server import HttpServer +from synapse.http.server import HttpServer, is_method_cancellable +from synapse.http.site import SynapseRequest from synapse.logging import opentracing from synapse.logging.opentracing import trace from synapse.types import JsonDict @@ -310,6 +311,12 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): url_args = list(self.PATH_ARGS) method = self.METHOD + if self.CACHE and is_method_cancellable(self._handle_request): + raise Exception( + f"{self.__class__.__name__} has been marked as cancellable, but CACHE " + "is set. The cancellable flag would have no effect." + ) + if self.CACHE: url_args.append("txn_id") @@ -324,7 +331,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): ) async def _check_auth_and_handle( - self, request: Request, **kwargs: Any + self, request: SynapseRequest, **kwargs: Any ) -> Tuple[int, JsonDict]: """Called on new incoming requests when caching is enabled. Checks if there is a cached response for the request and returns that, @@ -340,8 +347,18 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): if self.CACHE: txn_id = kwargs.pop("txn_id") + # We ignore the `@cancellable` flag, since cancellation wouldn't interupt + # `_handle_request` and `ResponseCache` does not handle cancellation + # correctly yet. In particular, there may be issues to do with logging + # context lifetimes. + return await self.response_cache.wrap( txn_id, self._handle_request, request, **kwargs ) + # The `@cancellable` decorator may be applied to `_handle_request`. But we + # told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`, + # so we have to set up the cancellable flag ourselves. + request.is_render_cancellable = is_method_cancellable(self._handle_request) + return await self._handle_request(request, **kwargs) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 350762f494..a52e25c1af 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -43,7 +43,7 @@ from synapse.replication.tcp.streams.events import ( EventsStreamEventRow, EventsStreamRow, ) -from synapse.types import PersistedEventPosition, ReadReceipt, UserID +from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure @@ -153,19 +153,19 @@ class ReplicationDataHandler: if stream_name == TypingStream.NAME: self._typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] + StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows] ) elif stream_name == PushRulesStream.NAME: self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows] + StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows] ) elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows] + StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows] ) elif stream_name == ReceiptsStream.NAME: self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows] + StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows] ) await self._pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} @@ -173,14 +173,18 @@ class ReplicationDataHandler: elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: - self.notifier.on_new_event("to_device_key", token, users=entities) + self.notifier.on_new_event( + StreamKeyType.TO_DEVICE, token, users=entities + ) elif stream_name == DeviceListsStream.NAME: all_room_ids: Set[str] = set() for row in rows: if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) - self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids + ) elif stream_name == GroupServerStream.NAME: self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows] diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index fe34948168..32f52e54d8 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -58,6 +58,15 @@ class Command(metaclass=abc.ABCMeta): # by default, we just use the command name. return self.NAME + def redis_channel_name(self, prefix: str) -> str: + """ + Returns the Redis channel name upon which to publish this command. + + Args: + prefix: The prefix for the channel. + """ + return prefix + SC = TypeVar("SC", bound="_SimpleCommand") @@ -395,6 +404,9 @@ class UserIpCommand(Command): f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})" ) + def redis_channel_name(self, prefix: str) -> str: + return f"{prefix}/USER_IP" + class RemoteServerUpCommand(_SimpleCommand): """Sent when a worker has detected that a remote server is no longer diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 9aba1cd451..e1cbfa50eb 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -1,5 +1,5 @@ # Copyright 2017 Vector Creations Ltd -# Copyright 2020 The Matrix.org Foundation C.I.C. +# Copyright 2020, 2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -101,6 +101,9 @@ class ReplicationCommandHandler: self._instance_id = hs.get_instance_id() self._instance_name = hs.get_instance_name() + # Additional Redis channel suffixes to subscribe to. + self._channels_to_subscribe_to: List[str] = [] + self._is_presence_writer = ( hs.get_instance_name() in hs.config.worker.writers.presence ) @@ -243,6 +246,31 @@ class ReplicationCommandHandler: # If we're NOT using Redis, this must be handled by the master self._should_insert_client_ips = hs.get_instance_name() == "master" + if self._is_master or self._should_insert_client_ips: + self.subscribe_to_channel("USER_IP") + + def subscribe_to_channel(self, channel_name: str) -> None: + """ + Indicates that we wish to subscribe to a Redis channel by name. + + (The name will later be prefixed with the server name; i.e. subscribing + to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.) + + Raises: + - If replication has already started, then it's too late to subscribe + to new channels. + """ + + if self._factory is not None: + # We don't allow subscribing after the fact to avoid the chance + # of missing an important message because we didn't subscribe in time. + raise RuntimeError( + "Cannot subscribe to more channels after replication started." + ) + + if channel_name not in self._channels_to_subscribe_to: + self._channels_to_subscribe_to.append(channel_name) + def _add_command_to_stream_queue( self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: @@ -321,7 +349,9 @@ class ReplicationCommandHandler: # Now create the factory/connection for the subscription stream. self._factory = RedisDirectTcpReplicationClientFactory( - hs, outbound_redis_connection + hs, + outbound_redis_connection, + channel_names=self._channels_to_subscribe_to, ) hs.get_reactor().connectTCP( hs.config.redis.redis_host, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 989c5be032..fd1c0ec6af 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -14,7 +14,7 @@ import logging from inspect import isawaitable -from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast +from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast import attr import txredisapi @@ -85,14 +85,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): Attributes: synapse_handler: The command handler to handle incoming commands. - synapse_stream_name: The *redis* stream name to subscribe to and publish + synapse_stream_prefix: The *redis* stream name to subscribe to and publish from (not anything to do with Synapse replication streams). synapse_outbound_redis_connection: The connection to redis to use to send commands. """ synapse_handler: "ReplicationCommandHandler" - synapse_stream_name: str + synapse_stream_prefix: str + synapse_channel_names: List[str] synapse_outbound_redis_connection: txredisapi.ConnectionHandler def __init__(self, *args: Any, **kwargs: Any): @@ -117,8 +118,13 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): # it's important to make sure that we only send the REPLICATE command once we # have successfully subscribed to the stream - otherwise we might miss the # POSITION response sent back by the other end. - logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name) - await make_deferred_yieldable(self.subscribe(self.synapse_stream_name)) + fully_qualified_stream_names = [ + f"{self.synapse_stream_prefix}/{stream_suffix}" + for stream_suffix in self.synapse_channel_names + ] + [self.synapse_stream_prefix] + logger.info("Sending redis SUBSCRIBE for %r", fully_qualified_stream_names) + await make_deferred_yieldable(self.subscribe(fully_qualified_stream_names)) + logger.info( "Successfully subscribed to redis stream, sending REPLICATE command" ) @@ -215,10 +221,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): # remote instances. tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc() + channel_name = cmd.redis_channel_name(self.synapse_stream_prefix) + await make_deferred_yieldable( - self.synapse_outbound_redis_connection.publish( - self.synapse_stream_name, encoded_string - ) + self.synapse_outbound_redis_connection.publish(channel_name, encoded_string) ) @@ -300,20 +306,27 @@ def format_address(address: IAddress) -> str: class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): """This is a reconnecting factory that connects to redis and immediately - subscribes to a stream. + subscribes to some streams. Args: hs outbound_redis_connection: A connection to redis that will be used to send outbound commands (this is separate to the redis connection used to subscribe). + channel_names: A list of channel names to append to the base channel name + to additionally subscribe to. + e.g. if ['ABC', 'DEF'] is specified then we'll listen to: + example.com; example.com/ABC; and example.com/DEF. """ maxDelay = 5 protocol = RedisSubscriber def __init__( - self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler + self, + hs: "HomeServer", + outbound_redis_connection: txredisapi.ConnectionHandler, + channel_names: List[str], ): super().__init__( @@ -326,7 +339,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): ) self.synapse_handler = hs.get_replication_command_handler() - self.synapse_stream_name = hs.hostname + self.synapse_stream_prefix = hs.hostname + self.synapse_channel_names = channel_names self.synapse_outbound_redis_connection = outbound_redis_connection @@ -340,7 +354,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): # protocol. p.synapse_handler = self.synapse_handler p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection - p.synapse_stream_name = self.synapse_stream_name + p.synapse_stream_prefix = self.synapse_stream_prefix + p.synapse_channel_names = self.synapse_channel_names return p |