diff options
author | Erik Johnston <erik@matrix.org> | 2020-05-27 20:06:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-27 20:06:29 +0100 |
commit | 8c5f88fa4d80f48d3080c7d240aa7ceaf454d690 (patch) | |
tree | 305d2c1325e730ceb275824cd0b4c6b1bb0d718c /synapse/app/generic_worker.py | |
parent | allow emails to be passed through SAML (#7385) (diff) | |
parent | Ensure we persist and ack the same token (diff) | |
download | synapse-8c5f88fa4d80f48d3080c7d240aa7ceaf454d690.tar.xz |
Merge pull request #7584 from matrix-org/erikj/save_and_send_fed_token_in_bg
Speed up processing of federation stream RDATA rows.
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r-- | synapse/app/generic_worker.py | 31 |
1 files changed, 24 insertions, 7 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 5afe52f8d4..f3ec2a34ec 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -863,9 +863,24 @@ class FederationSenderHandler(object): a FEDERATION_ACK back to the master, and stores the token that we have processed in `federation_stream_position` so that we can restart where we left off. """ - try: - self.federation_position = token + self.federation_position = token + + # We save and send the ACK to master asynchronously, so we don't block + # processing on persistence. We don't need to do this operation for + # every single RDATA we receive, we just need to do it periodically. + + if self._fed_position_linearizer.is_queued(None): + # There is already a task queued up to save and send the token, so + # no need to queue up another task. + return + + run_as_background_process("_save_and_send_ack", self._save_and_send_ack) + async def _save_and_send_ack(self): + """Save the current federation position in the database and send an ACK + to master with where we're up to. + """ + try: # We linearize here to ensure we don't have races updating the token # # XXX this appears to be redundant, since the ReplicationCommandHandler @@ -875,16 +890,18 @@ class FederationSenderHandler(object): # we're not being re-entered? with (await self._fed_position_linearizer.queue(None)): + # We persist and ack the same position, so we take a copy of it + # here as otherwise it can get modified from underneath us. + current_position = self.federation_position + await self.store.update_federation_out_pos( - "federation", self.federation_position + "federation", current_position ) # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack( - self.federation_position - ) - self._last_ack = self.federation_position + self._hs.get_tcp_replication().send_federation_ack(current_position) + self._last_ack = current_position except Exception: logger.exception("Error updating federation stream position") |