summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-02-24 14:39:50 +0000
committerGitHub <noreply@github.com>2023-02-24 14:39:50 +0000
commitb2357a898cdd1f4a2222609abfe471801ea88dcd (patch)
treeb02eb9a2d99e5bbf66743fe644c4629f731e25e8 /synapse/replication/tcp/resource.py
parentTweak changelog (diff)
downloadsynapse-b2357a898cdd1f4a2222609abfe471801ea88dcd.tar.xz
Fix bug where 5s delays would occasionally happen. (#15150)
This only affects deployments using workers.
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py18
1 files changed, 18 insertions, 0 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 9d17eff714..347467d863 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -238,6 +238,24 @@ class ReplicationStreamer:
                             except Exception:
                                 logger.exception("Failed to replicate")
 
+                        # The last token we send may not match the current
+                        # token, in which case we want to send out a `POSITION`
+                        # to tell other workers the actual current position.
+                        if updates[-1][0] < current_token:
+                            logger.info(
+                                "Sending position: %s -> %s",
+                                stream.NAME,
+                                current_token,
+                            )
+                            self.command_handler.send_command(
+                                PositionCommand(
+                                    stream.NAME,
+                                    self._instance_name,
+                                    updates[-1][0],
+                                    current_token,
+                                )
+                            )
+
             logger.debug("No more pending updates, breaking poke loop")
         finally:
             self.pending_updates = False