summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/14881.misc1
-rw-r--r--synapse/replication/http/_base.py2
-rw-r--r--synapse/replication/tcp/client.py21
3 files changed, 12 insertions, 12 deletions
diff --git a/changelog.d/14881.misc b/changelog.d/14881.misc
new file mode 100644
index 0000000000..be89d092b6
--- /dev/null
+++ b/changelog.d/14881.misc
@@ -0,0 +1 @@
+Reduce max time we wait for stream positions.
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 709327b97f..908f3f1db7 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -352,7 +352,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                         instance_name=instance_name,
                         stream_name=stream_name,
                         position=position,
-                        raise_on_timeout=False,
                     )
 
                 return result
@@ -414,7 +413,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                 instance_name=content[_STREAM_POSITION_KEY]["instance_name"],
                 stream_name=stream_name,
                 position=position,
-                raise_on_timeout=False,
             )
 
         if self.CACHE:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6e242c5749..493f616679 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -59,7 +59,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 # How long we allow callers to wait for replication updates before timing out.
-_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30
+_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
 
 
 class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
@@ -326,7 +326,6 @@ class ReplicationDataHandler:
         instance_name: str,
         stream_name: str,
         position: int,
-        raise_on_timeout: bool = True,
     ) -> None:
         """Wait until this instance has received updates up to and including
         the given stream position.
@@ -335,8 +334,6 @@ class ReplicationDataHandler:
             instance_name
             stream_name
             position
-            raise_on_timeout: Whether to raise an exception if we time out
-                waiting for the updates, or if we log an error and return.
         """
 
         if instance_name == self._instance_name:
@@ -365,19 +362,23 @@ class ReplicationDataHandler:
 
         # We measure here to get in flight counts and average waiting time.
         with Measure(self._clock, "repl.wait_for_stream_position"):
-            logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
+            logger.info(
+                "Waiting for repl stream %r to reach %s (%s)",
+                stream_name,
+                position,
+                instance_name,
+            )
             try:
                 await make_deferred_yieldable(deferred)
             except defer.TimeoutError:
                 logger.error("Timed out waiting for stream %s", stream_name)
-
-                if raise_on_timeout:
-                    raise
-
                 return
 
             logger.info(
-                "Finished waiting for repl stream %r to reach %s", stream_name, position
+                "Finished waiting for repl stream %r to reach %s (%s)",
+                stream_name,
+                position,
+                instance_name,
             )
 
     def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: