summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorSean Quah <seanq@matrix.org>2022-05-24 12:09:40 +0100
committerSean Quah <seanq@matrix.org>2022-05-24 12:09:40 +0100
commit4ff94779963c52a3b73ebb85517fa6fb22fd8052 (patch)
tree76f6e60351dd9cbbea13e68b4c8d9baeb282f60b /synapse/replication
parentFixup changelog (diff)
parent1.60.0rc1 (diff)
downloadsynapse-4ff94779963c52a3b73ebb85517fa6fb22fd8052.tar.xz
Merge remote-tracking branch 'origin/release-v1.60' into matrix-org-hotfixes
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/_base.py21
-rw-r--r--synapse/replication/tcp/client.py18
-rw-r--r--synapse/replication/tcp/commands.py12
-rw-r--r--synapse/replication/tcp/handler.py34
-rw-r--r--synapse/replication/tcp/redis.py39
5 files changed, 101 insertions, 23 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2bd244ed79..a4ae4040c3 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -26,7 +26,8 @@ from twisted.web.server import Request
 
 from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.http import RequestTimedOutError
-from synapse.http.server import HttpServer
+from synapse.http.server import HttpServer, is_method_cancellable
+from synapse.http.site import SynapseRequest
 from synapse.logging import opentracing
 from synapse.logging.opentracing import trace
 from synapse.types import JsonDict
@@ -310,6 +311,12 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         url_args = list(self.PATH_ARGS)
         method = self.METHOD
 
+        if self.CACHE and is_method_cancellable(self._handle_request):
+            raise Exception(
+                f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
+                "is set. The cancellable flag would have no effect."
+            )
+
         if self.CACHE:
             url_args.append("txn_id")
 
@@ -324,7 +331,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         )
 
     async def _check_auth_and_handle(
-        self, request: Request, **kwargs: Any
+        self, request: SynapseRequest, **kwargs: Any
     ) -> Tuple[int, JsonDict]:
         """Called on new incoming requests when caching is enabled. Checks
         if there is a cached response for the request and returns that,
@@ -340,8 +347,18 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         if self.CACHE:
             txn_id = kwargs.pop("txn_id")
 
+            # We ignore the `@cancellable` flag, since cancellation wouldn't interupt
+            # `_handle_request` and `ResponseCache` does not handle cancellation
+            # correctly yet. In particular, there may be issues to do with logging
+            # context lifetimes.
+
             return await self.response_cache.wrap(
                 txn_id, self._handle_request, request, **kwargs
             )
 
+        # The `@cancellable` decorator may be applied to `_handle_request`. But we
+        # told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
+        # so we have to set up the cancellable flag ourselves.
+        request.is_render_cancellable = is_method_cancellable(self._handle_request)
+
         return await self._handle_request(request, **kwargs)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 350762f494..a52e25c1af 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -43,7 +43,7 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
     EventsStreamRow,
 )
-from synapse.types import PersistedEventPosition, ReadReceipt, UserID
+from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
 from synapse.util.async_helpers import Linearizer, timeout_deferred
 from synapse.util.metrics import Measure
 
@@ -153,19 +153,19 @@ class ReplicationDataHandler:
         if stream_name == TypingStream.NAME:
             self._typing_handler.process_replication_rows(token, rows)
             self.notifier.on_new_event(
-                "typing_key", token, rooms=[row.room_id for row in rows]
+                StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows]
             )
         elif stream_name == PushRulesStream.NAME:
             self.notifier.on_new_event(
-                "push_rules_key", token, users=[row.user_id for row in rows]
+                StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows]
             )
         elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
             self.notifier.on_new_event(
-                "account_data_key", token, users=[row.user_id for row in rows]
+                StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
             )
         elif stream_name == ReceiptsStream.NAME:
             self.notifier.on_new_event(
-                "receipt_key", token, rooms=[row.room_id for row in rows]
+                StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows]
             )
             await self._pusher_pool.on_new_receipts(
                 token, token, {row.room_id for row in rows}
@@ -173,14 +173,18 @@ class ReplicationDataHandler:
         elif stream_name == ToDeviceStream.NAME:
             entities = [row.entity for row in rows if row.entity.startswith("@")]
             if entities:
-                self.notifier.on_new_event("to_device_key", token, users=entities)
+                self.notifier.on_new_event(
+                    StreamKeyType.TO_DEVICE, token, users=entities
+                )
         elif stream_name == DeviceListsStream.NAME:
             all_room_ids: Set[str] = set()
             for row in rows:
                 if row.entity.startswith("@"):
                     room_ids = await self.store.get_rooms_for_user(row.entity)
                     all_room_ids.update(room_ids)
-            self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
+            self.notifier.on_new_event(
+                StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids
+            )
         elif stream_name == GroupServerStream.NAME:
             self.notifier.on_new_event(
                 "groups_key", token, users=[row.user_id for row in rows]
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index fe34948168..32f52e54d8 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -58,6 +58,15 @@ class Command(metaclass=abc.ABCMeta):
         # by default, we just use the command name.
         return self.NAME
 
+    def redis_channel_name(self, prefix: str) -> str:
+        """
+        Returns the Redis channel name upon which to publish this command.
+
+        Args:
+            prefix: The prefix for the channel.
+        """
+        return prefix
+
 
 SC = TypeVar("SC", bound="_SimpleCommand")
 
@@ -395,6 +404,9 @@ class UserIpCommand(Command):
             f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
         )
 
+    def redis_channel_name(self, prefix: str) -> str:
+        return f"{prefix}/USER_IP"
+
 
 class RemoteServerUpCommand(_SimpleCommand):
     """Sent when a worker has detected that a remote server is no longer
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 9aba1cd451..e1cbfa50eb 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -1,5 +1,5 @@
 # Copyright 2017 Vector Creations Ltd
-# Copyright 2020 The Matrix.org Foundation C.I.C.
+# Copyright 2020, 2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -101,6 +101,9 @@ class ReplicationCommandHandler:
         self._instance_id = hs.get_instance_id()
         self._instance_name = hs.get_instance_name()
 
+        # Additional Redis channel suffixes to subscribe to.
+        self._channels_to_subscribe_to: List[str] = []
+
         self._is_presence_writer = (
             hs.get_instance_name() in hs.config.worker.writers.presence
         )
@@ -243,6 +246,31 @@ class ReplicationCommandHandler:
             # If we're NOT using Redis, this must be handled by the master
             self._should_insert_client_ips = hs.get_instance_name() == "master"
 
+        if self._is_master or self._should_insert_client_ips:
+            self.subscribe_to_channel("USER_IP")
+
+    def subscribe_to_channel(self, channel_name: str) -> None:
+        """
+        Indicates that we wish to subscribe to a Redis channel by name.
+
+        (The name will later be prefixed with the server name; i.e. subscribing
+        to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.)
+
+        Raises:
+          - If replication has already started, then it's too late to subscribe
+            to new channels.
+        """
+
+        if self._factory is not None:
+            # We don't allow subscribing after the fact to avoid the chance
+            # of missing an important message because we didn't subscribe in time.
+            raise RuntimeError(
+                "Cannot subscribe to more channels after replication started."
+            )
+
+        if channel_name not in self._channels_to_subscribe_to:
+            self._channels_to_subscribe_to.append(channel_name)
+
     def _add_command_to_stream_queue(
         self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
     ) -> None:
@@ -321,7 +349,9 @@ class ReplicationCommandHandler:
 
             # Now create the factory/connection for the subscription stream.
             self._factory = RedisDirectTcpReplicationClientFactory(
-                hs, outbound_redis_connection
+                hs,
+                outbound_redis_connection,
+                channel_names=self._channels_to_subscribe_to,
             )
             hs.get_reactor().connectTCP(
                 hs.config.redis.redis_host,
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 989c5be032..fd1c0ec6af 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -14,7 +14,7 @@
 
 import logging
 from inspect import isawaitable
-from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast
 
 import attr
 import txredisapi
@@ -85,14 +85,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
 
     Attributes:
         synapse_handler: The command handler to handle incoming commands.
-        synapse_stream_name: The *redis* stream name to subscribe to and publish
+        synapse_stream_prefix: The *redis* stream name to subscribe to and publish
             from (not anything to do with Synapse replication streams).
         synapse_outbound_redis_connection: The connection to redis to use to send
             commands.
     """
 
     synapse_handler: "ReplicationCommandHandler"
-    synapse_stream_name: str
+    synapse_stream_prefix: str
+    synapse_channel_names: List[str]
     synapse_outbound_redis_connection: txredisapi.ConnectionHandler
 
     def __init__(self, *args: Any, **kwargs: Any):
@@ -117,8 +118,13 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
         # it's important to make sure that we only send the REPLICATE command once we
         # have successfully subscribed to the stream - otherwise we might miss the
         # POSITION response sent back by the other end.
-        logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name)
-        await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
+        fully_qualified_stream_names = [
+            f"{self.synapse_stream_prefix}/{stream_suffix}"
+            for stream_suffix in self.synapse_channel_names
+        ] + [self.synapse_stream_prefix]
+        logger.info("Sending redis SUBSCRIBE for %r", fully_qualified_stream_names)
+        await make_deferred_yieldable(self.subscribe(fully_qualified_stream_names))
+
         logger.info(
             "Successfully subscribed to redis stream, sending REPLICATE command"
         )
@@ -215,10 +221,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
         # remote instances.
         tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
 
+        channel_name = cmd.redis_channel_name(self.synapse_stream_prefix)
+
         await make_deferred_yieldable(
-            self.synapse_outbound_redis_connection.publish(
-                self.synapse_stream_name, encoded_string
-            )
+            self.synapse_outbound_redis_connection.publish(channel_name, encoded_string)
         )
 
 
@@ -300,20 +306,27 @@ def format_address(address: IAddress) -> str:
 
 class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
     """This is a reconnecting factory that connects to redis and immediately
-    subscribes to a stream.
+    subscribes to some streams.
 
     Args:
         hs
         outbound_redis_connection: A connection to redis that will be used to
             send outbound commands (this is separate to the redis connection
             used to subscribe).
+        channel_names: A list of channel names to append to the base channel name
+            to additionally subscribe to.
+            e.g. if ['ABC', 'DEF'] is specified then we'll listen to:
+            example.com; example.com/ABC; and example.com/DEF.
     """
 
     maxDelay = 5
     protocol = RedisSubscriber
 
     def __init__(
-        self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler
+        self,
+        hs: "HomeServer",
+        outbound_redis_connection: txredisapi.ConnectionHandler,
+        channel_names: List[str],
     ):
 
         super().__init__(
@@ -326,7 +339,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
         )
 
         self.synapse_handler = hs.get_replication_command_handler()
-        self.synapse_stream_name = hs.hostname
+        self.synapse_stream_prefix = hs.hostname
+        self.synapse_channel_names = channel_names
 
         self.synapse_outbound_redis_connection = outbound_redis_connection
 
@@ -340,7 +354,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
         # protocol.
         p.synapse_handler = self.synapse_handler
         p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection
-        p.synapse_stream_name = self.synapse_stream_name
+        p.synapse_stream_prefix = self.synapse_stream_prefix
+        p.synapse_channel_names = self.synapse_channel_names
 
         return p