diff --git a/changelog.d/14881.misc b/changelog.d/14881.misc
new file mode 100644
index 0000000000..be89d092b6
--- /dev/null
+++ b/changelog.d/14881.misc
@@ -0,0 +1 @@
+Reduce max time we wait for stream positions.
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 709327b97f..908f3f1db7 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -352,7 +352,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
instance_name=instance_name,
stream_name=stream_name,
position=position,
- raise_on_timeout=False,
)
return result
@@ -414,7 +413,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
instance_name=content[_STREAM_POSITION_KEY]["instance_name"],
stream_name=stream_name,
position=position,
- raise_on_timeout=False,
)
if self.CACHE:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6e242c5749..493f616679 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -59,7 +59,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# How long we allow callers to wait for replication updates before timing out.
-_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30
+_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
@@ -326,7 +326,6 @@ class ReplicationDataHandler:
instance_name: str,
stream_name: str,
position: int,
- raise_on_timeout: bool = True,
) -> None:
"""Wait until this instance has received updates up to and including
the given stream position.
@@ -335,8 +334,6 @@ class ReplicationDataHandler:
instance_name
stream_name
position
- raise_on_timeout: Whether to raise an exception if we time out
- waiting for the updates, or if we log an error and return.
"""
if instance_name == self._instance_name:
@@ -365,19 +362,23 @@ class ReplicationDataHandler:
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
- logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
+ logger.info(
+ "Waiting for repl stream %r to reach %s (%s)",
+ stream_name,
+ position,
+ instance_name,
+ )
try:
await make_deferred_yieldable(deferred)
except defer.TimeoutError:
logger.error("Timed out waiting for stream %s", stream_name)
-
- if raise_on_timeout:
- raise
-
return
logger.info(
- "Finished waiting for repl stream %r to reach %s", stream_name, position
+ "Finished waiting for repl stream %r to reach %s (%s)",
+ stream_name,
+ position,
+ instance_name,
)
def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
|