diff options
author | Erik Johnston <erik@matrix.org> | 2023-01-20 21:04:33 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-20 21:04:33 +0000 |
commit | 0ec12a37538d0df07d96cfc9cf5f5208f7453607 (patch) | |
tree | f5d217dac0a6cdf5d7734a25724a8f903a134d76 /synapse/replication | |
parent | Always notify replication when a stream advances (#14877) (diff) | |
download | synapse-0ec12a37538d0df07d96cfc9cf5f5208f7453607.tar.xz |
Reduce max time we wait for stream positions (#14881)
Now that we wait for stream positions whenever we do a HTTP replication hit, we need to be less brutal in the case where we do timeout (as we have bugs around this).
Diffstat (limited to '')
-rw-r--r-- | synapse/replication/http/_base.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/client.py | 21 |
2 files changed, 11 insertions, 12 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 709327b97f..908f3f1db7 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -352,7 +352,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): instance_name=instance_name, stream_name=stream_name, position=position, - raise_on_timeout=False, ) return result @@ -414,7 +413,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): instance_name=content[_STREAM_POSITION_KEY]["instance_name"], stream_name=stream_name, position=position, - raise_on_timeout=False, ) if self.CACHE: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 6e242c5749..493f616679 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -59,7 +59,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) # How long we allow callers to wait for replication updates before timing out. -_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30 +_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5 class DirectTcpReplicationClientFactory(ReconnectingClientFactory): @@ -326,7 +326,6 @@ class ReplicationDataHandler: instance_name: str, stream_name: str, position: int, - raise_on_timeout: bool = True, ) -> None: """Wait until this instance has received updates up to and including the given stream position. @@ -335,8 +334,6 @@ class ReplicationDataHandler: instance_name stream_name position - raise_on_timeout: Whether to raise an exception if we time out - waiting for the updates, or if we log an error and return. """ if instance_name == self._instance_name: @@ -365,19 +362,23 @@ class ReplicationDataHandler: # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"): - logger.info("Waiting for repl stream %r to reach %s", stream_name, position) + logger.info( + "Waiting for repl stream %r to reach %s (%s)", + stream_name, + position, + instance_name, + ) try: await make_deferred_yieldable(deferred) except defer.TimeoutError: logger.error("Timed out waiting for stream %s", stream_name) - - if raise_on_timeout: - raise - return logger.info( - "Finished waiting for repl stream %r to reach %s", stream_name, position + "Finished waiting for repl stream %r to reach %s (%s)", + stream_name, + position, + instance_name, ) def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: |