summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-01-18 19:35:29 +0000
committerGitHub <noreply@github.com>2023-01-18 19:35:29 +0000
commit9187fd940e2b2bbfd4df7204053cc26b2707aad4 (patch)
treef01de70e9daf00857dff25751e8dbc7c162e5fe5 /synapse/replication/tcp/resource.py
parentChange default room version to 10. Implements MSC3904 (#14111) (diff)
downloadsynapse-9187fd940e2b2bbfd4df7204053cc26b2707aad4.tar.xz
Wait for streams to catch up when processing HTTP replication. (#14820)
This should hopefully mitigate a class of races where data gets out of
sync due a HTTP replication request racing with the replication streams.
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py43
1 files changed, 19 insertions, 24 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 99f09669f0..9d17eff714 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -199,33 +199,28 @@ class ReplicationStreamer:
                             # The token has advanced but there is no data to
                             # send, so we send a `POSITION` to inform other
                             # workers of the updated position.
-                            if stream.NAME == EventsStream.NAME:
-                                # XXX: We only do this for the EventStream as it
-                                # turns out that e.g. account data streams share
-                                # their "current token" with each other, meaning
-                                # that it is *not* safe to send a POSITION.
-
-                                # Note: `last_token` may not *actually* be the
-                                # last token we sent out in a RDATA or POSITION.
-                                # This can happen if we sent out an RDATA for
-                                # position X when our current token was say X+1.
-                                # Other workers will see RDATA for X and then a
-                                # POSITION with last token of X+1, which will
-                                # cause them to check if there were any missing
-                                # updates between X and X+1.
-                                logger.info(
-                                    "Sending position: %s -> %s",
+
+                            # Note: `last_token` may not *actually* be the
+                            # last token we sent out in a RDATA or POSITION.
+                            # This can happen if we sent out an RDATA for
+                            # position X when our current token was say X+1.
+                            # Other workers will see RDATA for X and then a
+                            # POSITION with last token of X+1, which will
+                            # cause them to check if there were any missing
+                            # updates between X and X+1.
+                            logger.info(
+                                "Sending position: %s -> %s",
+                                stream.NAME,
+                                current_token,
+                            )
+                            self.command_handler.send_command(
+                                PositionCommand(
                                     stream.NAME,
+                                    self._instance_name,
+                                    last_token,
                                     current_token,
                                 )
-                                self.command_handler.send_command(
-                                    PositionCommand(
-                                        stream.NAME,
-                                        self._instance_name,
-                                        last_token,
-                                        current_token,
-                                    )
-                                )
+                            )
                             continue
 
                         # Some streams return multiple rows with the same stream IDs,