summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-27 19:45:42 +0100
committerErik Johnston <erik@matrix.org>2020-05-27 19:45:42 +0100
commitef3934ec8f123f6f553b07471588fbcc7f444cd8 (patch)
tree3249598988a9d6716860048b4346099f98e55790 /synapse/app
parentRemove spurious change (diff)
downloadsynapse-ef3934ec8f123f6f553b07471588fbcc7f444cd8.tar.xz
Ensure we persist and ack the same token
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/generic_worker.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py

index 28ebf67455..f3ec2a34ec 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py
@@ -890,16 +890,18 @@ class FederationSenderHandler(object): # we're not being re-entered? with (await self._fed_position_linearizer.queue(None)): + # We persist and ack the same position, so we take a copy of it + # here as otherwise it can get modified from underneath us. + current_position = self.federation_position + await self.store.update_federation_out_pos( - "federation", self.federation_position + "federation", current_position ) # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack( - self.federation_position - ) - self._last_ack = self.federation_position + self._hs.get_tcp_replication().send_federation_ack(current_position) + self._last_ack = current_position except Exception: logger.exception("Error updating federation stream position")