summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/_base.py47
-rw-r--r--synapse/replication/slave/storage/client_ips.py2
-rw-r--r--synapse/replication/tcp/client.py6
-rw-r--r--synapse/replication/tcp/external_cache.py4
-rw-r--r--synapse/replication/tcp/handler.py6
-rw-r--r--synapse/replication/tcp/redis.py8
-rw-r--r--synapse/replication/tcp/resource.py10
-rw-r--r--synapse/replication/tcp/streams/_base.py12
8 files changed, 66 insertions, 29 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2e697c74a6..f1abb98653 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple
 
 from prometheus_client import Counter, Gauge
 
+from twisted.internet.error import ConnectError, DNSLookupError
 from twisted.web.server import Request
 
 from synapse.api.errors import HttpResponseException, SynapseError
@@ -87,6 +88,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
             `_handle_request` must return a Deferred.
         RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
             is received.
+        RETRY_ON_CONNECT_ERROR (bool): Whether or not to retry the request when
+            a connection error is received.
+        RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when
+            receiving connection errors, each will backoff exponentially longer.
     """
 
     NAME: str = abc.abstractproperty()  # type: ignore
@@ -94,6 +99,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
     METHOD = "POST"
     CACHE = True
     RETRY_ON_TIMEOUT = True
+    RETRY_ON_CONNECT_ERROR = True
+    RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5  # =63s (2^6-1)
 
     def __init__(self, hs: "HomeServer"):
         if self.CACHE:
@@ -236,18 +243,20 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                     "/".join(url_args),
                 )
 
+                headers: Dict[bytes, List[bytes]] = {}
+                # Add an authorization header, if configured.
+                if replication_secret:
+                    headers[b"Authorization"] = [b"Bearer " + replication_secret]
+                opentracing.inject_header_dict(headers, check_destination=False)
+
                 try:
+                    # Keep track of attempts made so we can bail if we don't manage to
+                    # connect to the target after N tries.
+                    attempts = 0
                     # We keep retrying the same request for timeouts. This is so that we
                     # have a good idea that the request has either succeeded or failed
                     # on the master, and so whether we should clean up or not.
                     while True:
-                        headers: Dict[bytes, List[bytes]] = {}
-                        # Add an authorization header, if configured.
-                        if replication_secret:
-                            headers[b"Authorization"] = [
-                                b"Bearer " + replication_secret
-                            ]
-                        opentracing.inject_header_dict(headers, check_destination=False)
                         try:
                             result = await request_func(uri, data, headers=headers)
                             break
@@ -255,11 +264,27 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                             if not cls.RETRY_ON_TIMEOUT:
                                 raise
 
-                        logger.warning("%s request timed out; retrying", cls.NAME)
+                            logger.warning("%s request timed out; retrying", cls.NAME)
+
+                            # If we timed out we probably don't need to worry about backing
+                            # off too much, but lets just wait a little anyway.
+                            await clock.sleep(1)
+                        except (ConnectError, DNSLookupError) as e:
+                            if not cls.RETRY_ON_CONNECT_ERROR:
+                                raise
+                            if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS:
+                                raise
+
+                            delay = 2 ** attempts
+                            logger.warning(
+                                "%s request connection failed; retrying in %ds: %r",
+                                cls.NAME,
+                                delay,
+                                e,
+                            )
 
-                        # If we timed out we probably don't need to worry about backing
-                        # off too much, but lets just wait a little anyway.
-                        await clock.sleep(1)
+                            await clock.sleep(delay)
+                            attempts += 1
                 except HttpResponseException as e:
                     # We convert to SynapseError as we know that it was a SynapseError
                     # on the main process that we should send to the client. (And
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index b5b84c09ae..14706a0817 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -54,6 +54,6 @@ class SlavedClientIpStore(BaseSlavedStore):
 
         self.client_ip_last_seen.set(key, now)
 
-        self.hs.get_tcp_replication().send_user_ip(
+        self.hs.get_replication_command_handler().send_user_ip(
             user_id, access_token, ip, user_agent, device_id, now
         )
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 1b8479b0b4..deeaaec4e6 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -380,7 +380,7 @@ class FederationSenderHandler:
             # changes.
             hosts = {row.entity for row in rows if not row.entity.startswith("@")}
             for host in hosts:
-                self.federation_sender.send_device_messages(host)
+                self.federation_sender.send_device_messages(host, immediate=False)
 
         elif stream_name == ToDeviceStream.NAME:
             # The to_device stream includes stuff to be pushed to both local
@@ -462,6 +462,8 @@ class FederationSenderHandler:
 
                 # We ACK this token over replication so that the master can drop
                 # its in memory queues
-                self._hs.get_tcp_replication().send_federation_ack(current_position)
+                self._hs.get_replication_command_handler().send_federation_ack(
+                    current_position
+                )
         except Exception:
             logger.exception("Error updating federation stream position")
diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index aaf91e5e02..bf7d017968 100644
--- a/synapse/replication/tcp/external_cache.py
+++ b/synapse/replication/tcp/external_cache.py
@@ -21,7 +21,7 @@ from synapse.logging.context import make_deferred_yieldable
 from synapse.util import json_decoder, json_encoder
 
 if TYPE_CHECKING:
-    from txredisapi import RedisProtocol
+    from txredisapi import ConnectionHandler
 
     from synapse.server import HomeServer
 
@@ -63,7 +63,7 @@ class ExternalCache:
     def __init__(self, hs: "HomeServer"):
         if hs.config.redis.redis_enabled:
             self._redis_connection: Optional[
-                "RedisProtocol"
+                "ConnectionHandler"
             ] = hs.get_outbound_redis_connection()
         else:
             self._redis_connection = None
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0d2013a3cf..b217c35f99 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -295,9 +295,7 @@ class ReplicationCommandHandler:
             raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
 
     def start_replication(self, hs: "HomeServer") -> None:
-        """Helper method to start a replication connection to the remote server
-        using TCP.
-        """
+        """Helper method to start replication."""
         if hs.config.redis.redis_enabled:
             from synapse.replication.tcp.redis import (
                 RedisDirectTcpReplicationClientFactory,
@@ -711,7 +709,7 @@ class ReplicationCommandHandler:
         self.send_command(RemoteServerUpCommand(server))
 
     def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
-        """Called when a new update is available to stream to clients.
+        """Called when a new update is available to stream to Redis subscribers.
 
         We need to check if the client is interested in the stream or not
         """
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 3170f7c59b..989c5be032 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -93,7 +93,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
 
     synapse_handler: "ReplicationCommandHandler"
     synapse_stream_name: str
-    synapse_outbound_redis_connection: txredisapi.RedisProtocol
+    synapse_outbound_redis_connection: txredisapi.ConnectionHandler
 
     def __init__(self, *args: Any, **kwargs: Any):
         super().__init__(*args, **kwargs)
@@ -313,7 +313,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
     protocol = RedisSubscriber
 
     def __init__(
-        self, hs: "HomeServer", outbound_redis_connection: txredisapi.RedisProtocol
+        self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler
     ):
 
         super().__init__(
@@ -325,7 +325,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
             password=hs.config.redis.redis_password,
         )
 
-        self.synapse_handler = hs.get_tcp_replication()
+        self.synapse_handler = hs.get_replication_command_handler()
         self.synapse_stream_name = hs.hostname
 
         self.synapse_outbound_redis_connection = outbound_redis_connection
@@ -353,7 +353,7 @@ def lazyConnection(
     reconnect: bool = True,
     password: Optional[str] = None,
     replyTimeout: int = 30,
-) -> txredisapi.RedisProtocol:
+) -> txredisapi.ConnectionHandler:
     """Creates a connection to Redis that is lazily set up and reconnects if the
     connections is lost.
     """
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 494e42a2be..c6870df8f9 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory):
     """Factory for new replication connections."""
 
     def __init__(self, hs: "HomeServer"):
-        self.command_handler = hs.get_tcp_replication()
+        self.command_handler = hs.get_replication_command_handler()
         self.clock = hs.get_clock()
         self.server_name = hs.config.server.server_name
 
@@ -67,8 +67,8 @@ class ReplicationStreamProtocolFactory(ServerFactory):
 class ReplicationStreamer:
     """Handles replication connections.
 
-    This needs to be poked when new replication data may be available. When new
-    data is available it will propagate to all connected clients.
+    This needs to be poked when new replication data may be available.
+    When new data is available it will propagate to all Redis subscribers.
     """
 
     def __init__(self, hs: "HomeServer"):
@@ -85,7 +85,7 @@ class ReplicationStreamer:
         self.is_looping = False
         self.pending_updates = False
 
-        self.command_handler = hs.get_tcp_replication()
+        self.command_handler = hs.get_replication_command_handler()
 
         # Set of streams to replicate.
         self.streams = self.command_handler.get_streams_to_replicate()
@@ -109,7 +109,7 @@ class ReplicationStreamer:
 
     def on_notifier_poke(self) -> None:
         """Checks if there is actually any new data and sends it to the
-        connections if there are.
+        Redis subscribers if there are.
 
         This should get called each time new data is available, even if it
         is currently being executed, so that nothing gets missed
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 23d631a769..495f2f0285 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -316,7 +316,19 @@ class PresenceFederationStream(Stream):
 class TypingStream(Stream):
     @attr.s(slots=True, frozen=True, auto_attribs=True)
     class TypingStreamRow:
+        """
+        An entry in the typing stream.
+        Describes all the users that are 'typing' right now in one room.
+
+        When a user stops typing, it will be streamed as a new update with that
+        user absent; you can think of the `user_ids` list as overwriting the
+        entire list that was there previously.
+        """
+
+        # The room that this update is for.
         room_id: str
+
+        # All the users that are 'typing' right now in the specified room.
         user_ids: List[str]
 
     NAME = "typing"