diff options
author | Erik Johnston <erik@matrix.org> | 2020-05-27 19:45:42 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-05-27 19:45:42 +0100 |
commit | ef3934ec8f123f6f553b07471588fbcc7f444cd8 (patch) | |
tree | 3249598988a9d6716860048b4346099f98e55790 /synapse | |
parent | Remove spurious change (diff) | |
download | synapse-ef3934ec8f123f6f553b07471588fbcc7f444cd8.tar.xz |
Ensure we persist and ack the same token
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/generic_worker.py | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 28ebf67455..f3ec2a34ec 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -890,16 +890,18 @@ class FederationSenderHandler(object): # we're not being re-entered? with (await self._fed_position_linearizer.queue(None)): + # We persist and ack the same position, so we take a copy of it + # here as otherwise it can get modified from underneath us. + current_position = self.federation_position + await self.store.update_federation_out_pos( - "federation", self.federation_position + "federation", current_position ) # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack( - self.federation_position - ) - self._last_ack = self.federation_position + self._hs.get_tcp_replication().send_federation_ack(current_position) + self._last_ack = current_position except Exception: logger.exception("Error updating federation stream position") |