diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 322d695bc7..5c2482e40c 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -16,6 +16,7 @@
import logging
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
+from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IAddress, IConnector
from twisted.internet.protocol import ReconnectingClientFactory
@@ -314,10 +315,21 @@ class ReplicationDataHandler:
self.send_handler.wake_destination(server)
async def wait_for_stream_position(
- self, instance_name: str, stream_name: str, position: int
+ self,
+ instance_name: str,
+ stream_name: str,
+ position: int,
+ raise_on_timeout: bool = True,
) -> None:
"""Wait until this instance has received updates up to and including
the given stream position.
+
+ Args:
+ instance_name
+ stream_name
+ position
+ raise_on_timeout: Whether to raise an exception if we time out
+ waiting for the updates, or if we log an error and return.
"""
if instance_name == self._instance_name:
@@ -345,7 +357,16 @@ class ReplicationDataHandler:
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
- await make_deferred_yieldable(deferred)
+ try:
+ await make_deferred_yieldable(deferred)
+ except defer.TimeoutError:
+ logger.error("Timed out waiting for stream %s", stream_name)
+
+ if raise_on_timeout:
+ raise
+
+ return
+
logger.info(
"Finished waiting for repl stream %r to reach %s", stream_name, position
)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 99f09669f0..9d17eff714 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -199,33 +199,28 @@ class ReplicationStreamer:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
- if stream.NAME == EventsStream.NAME:
- # XXX: We only do this for the EventStream as it
- # turns out that e.g. account data streams share
- # their "current token" with each other, meaning
- # that it is *not* safe to send a POSITION.
-
- # Note: `last_token` may not *actually* be the
- # last token we sent out in a RDATA or POSITION.
- # This can happen if we sent out an RDATA for
- # position X when our current token was say X+1.
- # Other workers will see RDATA for X and then a
- # POSITION with last token of X+1, which will
- # cause them to check if there were any missing
- # updates between X and X+1.
- logger.info(
- "Sending position: %s -> %s",
+
+ # Note: `last_token` may not *actually* be the
+ # last token we sent out in a RDATA or POSITION.
+ # This can happen if we sent out an RDATA for
+ # position X when our current token was say X+1.
+ # Other workers will see RDATA for X and then a
+ # POSITION with last token of X+1, which will
+ # cause them to check if there were any missing
+ # updates between X and X+1.
+ logger.info(
+ "Sending position: %s -> %s",
+ stream.NAME,
+ current_token,
+ )
+ self.command_handler.send_command(
+ PositionCommand(
stream.NAME,
+ self._instance_name,
+ last_token,
current_token,
)
- self.command_handler.send_command(
- PositionCommand(
- stream.NAME,
- self._instance_name,
- last_token,
- current_token,
- )
- )
+ )
continue
# Some streams return multiple rows with the same stream IDs,
|