summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorH. Shay <hillerys@element.io>2023-03-06 12:48:59 -0800
committerH. Shay <hillerys@element.io>2023-03-06 12:48:59 -0800
commit7fc487421f19d8841c36481701655bc31bfcd79f (patch)
treee5c4e634fa957a561675156c868c561c36a71d66 /synapse/replication/tcp/resource.py
parentadd clearer return values (diff)
parentPass the requester during event serialization. (#15174) (diff)
downloadsynapse-7fc487421f19d8841c36481701655bc31bfcd79f.tar.xz
Merge branch 'develop' into shay/rework_module
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py18
1 files changed, 18 insertions, 0 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py

index 9d17eff714..347467d863 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -238,6 +238,24 @@ class ReplicationStreamer: except Exception: logger.exception("Failed to replicate") + # The last token we send may not match the current + # token, in which case we want to send out a `POSITION` + # to tell other workers the actual current position. + if updates[-1][0] < current_token: + logger.info( + "Sending position: %s -> %s", + stream.NAME, + current_token, + ) + self.command_handler.send_command( + PositionCommand( + stream.NAME, + self._instance_name, + updates[-1][0], + current_token, + ) + ) + logger.debug("No more pending updates, breaking poke loop") finally: self.pending_updates = False