summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-27 19:45:42 +0100
committerErik Johnston <erik@matrix.org>2020-05-27 19:45:42 +0100
commitef3934ec8f123f6f553b07471588fbcc7f444cd8 (patch)
tree3249598988a9d6716860048b4346099f98e55790 /synapse/app/generic_worker.py
parentRemove spurious change (diff)
downloadsynapse-ef3934ec8f123f6f553b07471588fbcc7f444cd8.tar.xz
Ensure we persist and ack the same token
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 28ebf67455..f3ec2a34ec 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -890,16 +890,18 @@ class FederationSenderHandler(object):
             # we're not being re-entered?
 
             with (await self._fed_position_linearizer.queue(None)):
+                # We persist and ack the same position, so we take a copy of it
+                # here as otherwise it can get modified from underneath us.
+                current_position = self.federation_position
+
                 await self.store.update_federation_out_pos(
-                    "federation", self.federation_position
+                    "federation", current_position
                 )
 
                 # We ACK this token over replication so that the master can drop
                 # its in memory queues
-                self._hs.get_tcp_replication().send_federation_ack(
-                    self.federation_position
-                )
-                self._last_ack = self.federation_position
+                self._hs.get_tcp_replication().send_federation_ack(current_position)
+                self._last_ack = current_position
         except Exception:
             logger.exception("Error updating federation stream position")