diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2e697c74a6..f1abb98653 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple
from prometheus_client import Counter, Gauge
+from twisted.internet.error import ConnectError, DNSLookupError
from twisted.web.server import Request
from synapse.api.errors import HttpResponseException, SynapseError
@@ -87,6 +88,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
`_handle_request` must return a Deferred.
RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
is received.
+ RETRY_ON_CONNECT_ERROR (bool): Whether or not to retry the request when
+ a connection error is received.
+ RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when
+ receiving connection errors; each retry will back off exponentially longer.
"""
NAME: str = abc.abstractproperty() # type: ignore
@@ -94,6 +99,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
+ RETRY_ON_CONNECT_ERROR = True
+ RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5 # =63s (2^6-1)
def __init__(self, hs: "HomeServer"):
if self.CACHE:
@@ -236,18 +243,20 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
"/".join(url_args),
)
+ headers: Dict[bytes, List[bytes]] = {}
+ # Add an authorization header, if configured.
+ if replication_secret:
+ headers[b"Authorization"] = [b"Bearer " + replication_secret]
+ opentracing.inject_header_dict(headers, check_destination=False)
+
try:
+ # Keep track of attempts made so we can bail if we don't manage to
+ # connect to the target after N tries.
+ attempts = 0
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed
# on the master, and so whether we should clean up or not.
while True:
- headers: Dict[bytes, List[bytes]] = {}
- # Add an authorization header, if configured.
- if replication_secret:
- headers[b"Authorization"] = [
- b"Bearer " + replication_secret
- ]
- opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
@@ -255,11 +264,27 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
if not cls.RETRY_ON_TIMEOUT:
raise
- logger.warning("%s request timed out; retrying", cls.NAME)
+ logger.warning("%s request timed out; retrying", cls.NAME)
+
+ # If we timed out we probably don't need to worry about backing
+ # off too much, but let's just wait a little anyway.
+ await clock.sleep(1)
+ except (ConnectError, DNSLookupError) as e:
+ if not cls.RETRY_ON_CONNECT_ERROR:
+ raise
+ if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS:
+ raise
+
+ delay = 2 ** attempts
+ logger.warning(
+ "%s request connection failed; retrying in %ds: %r",
+ cls.NAME,
+ delay,
+ e,
+ )
- # If we timed out we probably don't need to worry about backing
- # off too much, but lets just wait a little anyway.
- await clock.sleep(1)
+ await clock.sleep(delay)
+ attempts += 1
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index b5b84c09ae..14706a0817 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -54,6 +54,6 @@ class SlavedClientIpStore(BaseSlavedStore):
self.client_ip_last_seen.set(key, now)
- self.hs.get_tcp_replication().send_user_ip(
+ self.hs.get_replication_command_handler().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 1b8479b0b4..deeaaec4e6 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -380,7 +380,7 @@ class FederationSenderHandler:
# changes.
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
for host in hosts:
- self.federation_sender.send_device_messages(host)
+ self.federation_sender.send_device_messages(host, immediate=False)
elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local
@@ -462,6 +462,8 @@ class FederationSenderHandler:
# We ACK this token over replication so that the master can drop
# its in memory queues
- self._hs.get_tcp_replication().send_federation_ack(current_position)
+ self._hs.get_replication_command_handler().send_federation_ack(
+ current_position
+ )
except Exception:
logger.exception("Error updating federation stream position")
diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index aaf91e5e02..bf7d017968 100644
--- a/synapse/replication/tcp/external_cache.py
+++ b/synapse/replication/tcp/external_cache.py
@@ -21,7 +21,7 @@ from synapse.logging.context import make_deferred_yieldable
from synapse.util import json_decoder, json_encoder
if TYPE_CHECKING:
- from txredisapi import RedisProtocol
+ from txredisapi import ConnectionHandler
from synapse.server import HomeServer
@@ -63,7 +63,7 @@ class ExternalCache:
def __init__(self, hs: "HomeServer"):
if hs.config.redis.redis_enabled:
self._redis_connection: Optional[
- "RedisProtocol"
+ "ConnectionHandler"
] = hs.get_outbound_redis_connection()
else:
self._redis_connection = None
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0d2013a3cf..b217c35f99 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -295,9 +295,7 @@ class ReplicationCommandHandler:
raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
def start_replication(self, hs: "HomeServer") -> None:
- """Helper method to start a replication connection to the remote server
- using TCP.
- """
+ """Helper method to start replication."""
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
@@ -711,7 +709,7 @@ class ReplicationCommandHandler:
self.send_command(RemoteServerUpCommand(server))
def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
- """Called when a new update is available to stream to clients.
+ """Called when a new update is available to stream to Redis subscribers.
We need to check if the client is interested in the stream or not
"""
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 3170f7c59b..989c5be032 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -93,7 +93,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
synapse_handler: "ReplicationCommandHandler"
synapse_stream_name: str
- synapse_outbound_redis_connection: txredisapi.RedisProtocol
+ synapse_outbound_redis_connection: txredisapi.ConnectionHandler
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
@@ -313,7 +313,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
protocol = RedisSubscriber
def __init__(
- self, hs: "HomeServer", outbound_redis_connection: txredisapi.RedisProtocol
+ self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler
):
super().__init__(
@@ -325,7 +325,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
password=hs.config.redis.redis_password,
)
- self.synapse_handler = hs.get_tcp_replication()
+ self.synapse_handler = hs.get_replication_command_handler()
self.synapse_stream_name = hs.hostname
self.synapse_outbound_redis_connection = outbound_redis_connection
@@ -353,7 +353,7 @@ def lazyConnection(
reconnect: bool = True,
password: Optional[str] = None,
replyTimeout: int = 30,
-) -> txredisapi.RedisProtocol:
+) -> txredisapi.ConnectionHandler:
"""Creates a connection to Redis that is lazily set up and reconnects if the
connections is lost.
"""
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 494e42a2be..c6870df8f9 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory):
"""Factory for new replication connections."""
def __init__(self, hs: "HomeServer"):
- self.command_handler = hs.get_tcp_replication()
+ self.command_handler = hs.get_replication_command_handler()
self.clock = hs.get_clock()
self.server_name = hs.config.server.server_name
@@ -67,8 +67,8 @@ class ReplicationStreamProtocolFactory(ServerFactory):
class ReplicationStreamer:
"""Handles replication connections.
- This needs to be poked when new replication data may be available. When new
- data is available it will propagate to all connected clients.
+ This needs to be poked when new replication data may be available.
+ When new data is available it will propagate to all Redis subscribers.
"""
def __init__(self, hs: "HomeServer"):
@@ -85,7 +85,7 @@ class ReplicationStreamer:
self.is_looping = False
self.pending_updates = False
- self.command_handler = hs.get_tcp_replication()
+ self.command_handler = hs.get_replication_command_handler()
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()
@@ -109,7 +109,7 @@ class ReplicationStreamer:
def on_notifier_poke(self) -> None:
"""Checks if there is actually any new data and sends it to the
- connections if there are.
+ Redis subscribers if there is.
This should get called each time new data is available, even if it
is currently being executed, so that nothing gets missed
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 23d631a769..495f2f0285 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -316,7 +316,19 @@ class PresenceFederationStream(Stream):
class TypingStream(Stream):
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TypingStreamRow:
+ """
+ An entry in the typing stream.
+ Describes all the users that are 'typing' right now in one room.
+
+ When a user stops typing, it will be streamed as a new update with that
+ user absent; you can think of the `user_ids` list as overwriting the
+ entire list that was there previously.
+ """
+
+ # The room that this update is for.
room_id: str
+
+ # All the users that are 'typing' right now in the specified room.
user_ids: List[str]
NAME = "typing"
|