summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-27 19:31:44 +0100
committerErik Johnston <erik@matrix.org>2020-05-27 19:34:07 +0100
commit35c308731d3a014f20a91467830c808358681259 (patch)
treed6db0def2a763cefe67ac50227c70143b3c664ab /synapse/app
parentImprove changelog wording (diff)
downloadsynapse-35c308731d3a014f20a91467830c808358681259.tar.xz
Speed up processing of federation stream RDATA rows.
Instead of storing and sending an ACK for every single row we send
synchronously, we instead do it asynchronously while batching up
updates.
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/generic_worker.py19
1 files changed, 17 insertions, 2 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 5afe52f8d4..28ebf67455 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -863,9 +863,24 @@ class FederationSenderHandler(object):
         a FEDERATION_ACK back to the master, and stores the token that we have processed
          in `federation_stream_position` so that we can restart where we left off.
         """
-        try:
-            self.federation_position = token
+        self.federation_position = token
+
+        # We save and send the ACK to master asynchronously, so we don't block
+        # processing on persistence. We don't need to do this operation for
+        # every single RDATA we receive, we just need to do it periodically.
+
+        if self._fed_position_linearizer.is_queued(None):
+            # There is already a task queued up to save and send the token, so
+            # no need to queue up another task.
+            return
 
+        run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
+
+    async def _save_and_send_ack(self):
+        """Save the current federation position in the database and send an ACK
+        to master with where we're up to.
+        """
+        try:
             # We linearize here to ensure we don't have races updating the token
             #
             # XXX this appears to be redundant, since the ReplicationCommandHandler