summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-01-20 21:04:33 +0000
committerGitHub <noreply@github.com>2023-01-20 21:04:33 +0000
commit0ec12a37538d0df07d96cfc9cf5f5208f7453607 (patch)
treef5d217dac0a6cdf5d7734a25724a8f903a134d76 /synapse/replication
parentAlways notify replication when a stream advances (#14877) (diff)
downloadsynapse-0ec12a37538d0df07d96cfc9cf5f5208f7453607.tar.xz
Reduce max time we wait for stream positions (#14881)
Now that we wait for stream positions whenever we do a HTTP replication
hit, we need to be less brutal in the case where we do timeout (as we
have bugs around this).
Diffstat (limited to '')
-rw-r--r--synapse/replication/http/_base.py2
-rw-r--r--synapse/replication/tcp/client.py21
2 files changed, 11 insertions, 12 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 709327b97f..908f3f1db7 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -352,7 +352,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                         instance_name=instance_name,
                         stream_name=stream_name,
                         position=position,
-                        raise_on_timeout=False,
                     )
 
                 return result
@@ -414,7 +413,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                 instance_name=content[_STREAM_POSITION_KEY]["instance_name"],
                 stream_name=stream_name,
                 position=position,
-                raise_on_timeout=False,
             )
 
         if self.CACHE:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6e242c5749..493f616679 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -59,7 +59,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 # How long we allow callers to wait for replication updates before timing out.
-_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30
+_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
 
 
 class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
@@ -326,7 +326,6 @@ class ReplicationDataHandler:
         instance_name: str,
         stream_name: str,
         position: int,
-        raise_on_timeout: bool = True,
     ) -> None:
         """Wait until this instance has received updates up to and including
         the given stream position.
@@ -335,8 +334,6 @@ class ReplicationDataHandler:
             instance_name
             stream_name
             position
-            raise_on_timeout: Whether to raise an exception if we time out
-                waiting for the updates, or if we log an error and return.
         """
 
         if instance_name == self._instance_name:
@@ -365,19 +362,23 @@ class ReplicationDataHandler:
 
         # We measure here to get in flight counts and average waiting time.
         with Measure(self._clock, "repl.wait_for_stream_position"):
-            logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
+            logger.info(
+                "Waiting for repl stream %r to reach %s (%s)",
+                stream_name,
+                position,
+                instance_name,
+            )
             try:
                 await make_deferred_yieldable(deferred)
             except defer.TimeoutError:
                 logger.error("Timed out waiting for stream %s", stream_name)
-
-                if raise_on_timeout:
-                    raise
-
                 return
 
             logger.info(
-                "Finished waiting for repl stream %r to reach %s", stream_name, position
+                "Finished waiting for repl stream %r to reach %s (%s)",
+                stream_name,
+                position,
+                instance_name,
             )
 
     def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: