summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py25
-rw-r--r--synapse/replication/tcp/resource.py43
2 files changed, 42 insertions, 26 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 322d695bc7..5c2482e40c 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -16,6 +16,7 @@
 import logging
 from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
 
+from twisted.internet import defer
 from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IAddress, IConnector
 from twisted.internet.protocol import ReconnectingClientFactory
@@ -314,10 +315,21 @@ class ReplicationDataHandler:
             self.send_handler.wake_destination(server)
 
     async def wait_for_stream_position(
-        self, instance_name: str, stream_name: str, position: int
+        self,
+        instance_name: str,
+        stream_name: str,
+        position: int,
+        raise_on_timeout: bool = True,
     ) -> None:
         """Wait until this instance has received updates up to and including
         the given stream position.
+
+        Args:
+            instance_name
+            stream_name
+            position
+            raise_on_timeout: Whether to raise an exception if we time out
+                waiting for the updates, or if we log an error and return.
         """
 
         if instance_name == self._instance_name:
@@ -345,7 +357,16 @@ class ReplicationDataHandler:
         # We measure here to get in flight counts and average waiting time.
         with Measure(self._clock, "repl.wait_for_stream_position"):
             logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
-            await make_deferred_yieldable(deferred)
+            try:
+                await make_deferred_yieldable(deferred)
+            except defer.TimeoutError:
+                logger.error("Timed out waiting for stream %s", stream_name)
+
+                if raise_on_timeout:
+                    raise
+
+                return
+
             logger.info(
                 "Finished waiting for repl stream %r to reach %s", stream_name, position
             )
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 99f09669f0..9d17eff714 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -199,33 +199,28 @@ class ReplicationStreamer:
                             # The token has advanced but there is no data to
                             # send, so we send a `POSITION` to inform other
                             # workers of the updated position.
-                            if stream.NAME == EventsStream.NAME:
-                                # XXX: We only do this for the EventStream as it
-                                # turns out that e.g. account data streams share
-                                # their "current token" with each other, meaning
-                                # that it is *not* safe to send a POSITION.
-
-                                # Note: `last_token` may not *actually* be the
-                                # last token we sent out in a RDATA or POSITION.
-                                # This can happen if we sent out an RDATA for
-                                # position X when our current token was say X+1.
-                                # Other workers will see RDATA for X and then a
-                                # POSITION with last token of X+1, which will
-                                # cause them to check if there were any missing
-                                # updates between X and X+1.
-                                logger.info(
-                                    "Sending position: %s -> %s",
+
+                            # Note: `last_token` may not *actually* be the
+                            # last token we sent out in a RDATA or POSITION.
+                            # This can happen if we sent out an RDATA for
+                            # position X when our current token was say X+1.
+                            # Other workers will see RDATA for X and then a
+                            # POSITION with last token of X+1, which will
+                            # cause them to check if there were any missing
+                            # updates between X and X+1.
+                            logger.info(
+                                "Sending position: %s -> %s",
+                                stream.NAME,
+                                current_token,
+                            )
+                            self.command_handler.send_command(
+                                PositionCommand(
                                     stream.NAME,
+                                    self._instance_name,
+                                    last_token,
                                     current_token,
                                 )
-                                self.command_handler.send_command(
-                                    PositionCommand(
-                                        stream.NAME,
-                                        self._instance_name,
-                                        last_token,
-                                        current_token,
-                                    )
-                                )
+                            )
                             continue
 
                         # Some streams return multiple rows with the same stream IDs,