summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7584.misc1
-rw-r--r--synapse/app/generic_worker.py31
-rw-r--r--synapse/util/async_helpers.py12
-rw-r--r--tests/util/test_linearizer.py32
4 files changed, 69 insertions, 7 deletions
diff --git a/changelog.d/7584.misc b/changelog.d/7584.misc
new file mode 100644
index 0000000000..55d1689f77
--- /dev/null
+++ b/changelog.d/7584.misc
@@ -0,0 +1 @@
+Speed up processing of federation stream RDATA rows.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 5afe52f8d4..f3ec2a34ec 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -863,9 +863,24 @@ class FederationSenderHandler(object):
         a FEDERATION_ACK back to the master, and stores the token that we have processed
          in `federation_stream_position` so that we can restart where we left off.
         """
-        try:
-            self.federation_position = token
+        self.federation_position = token
+
+        # We save and send the ACK to master asynchronously, so we don't block
+        # processing on persistence. We don't need to do this operation for
+        # every single RDATA we receive, we just need to do it periodically.
+
+        if self._fed_position_linearizer.is_queued(None):
+            # There is already a task queued up to save and send the token, so
+            # no need to queue up another task.
+            return
+
+        run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
 
+    async def _save_and_send_ack(self):
+        """Save the current federation position in the database and send an ACK
+        to master with where we're up to.
+        """
+        try:
             # We linearize here to ensure we don't have races updating the token
             #
             # XXX this appears to be redundant, since the ReplicationCommandHandler
@@ -875,16 +890,18 @@ class FederationSenderHandler(object):
             # we're not being re-entered?
 
             with (await self._fed_position_linearizer.queue(None)):
+                # We persist and ack the same position, so we take a copy of it
+                # here as otherwise it can get modified from underneath us.
+                current_position = self.federation_position
+
                 await self.store.update_federation_out_pos(
-                    "federation", self.federation_position
+                    "federation", current_position
                 )
 
                 # We ACK this token over replication so that the master can drop
                 # its in memory queues
-                self._hs.get_tcp_replication().send_federation_ack(
-                    self.federation_position
-                )
-                self._last_ack = self.federation_position
+                self._hs.get_tcp_replication().send_federation_ack(current_position)
+                self._last_ack = current_position
         except Exception:
             logger.exception("Error updating federation stream position")
 
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 581dffd8a0..f7af2bca7f 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -225,6 +225,18 @@ class Linearizer(object):
             {}
         )  # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
 
+    def is_queued(self, key) -> bool:
+        """Checks whether there is a process queued up waiting
+        """
+        entry = self.key_to_defer.get(key)
+        if not entry:
+            # No entry so nothing is waiting.
+            return False
+
+        # There are waiting deferreds only in the OrderedDict of deferreds is
+        # non-empty.
+        return bool(entry[1])
+
     def queue(self, key):
         # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
         # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py
index 852ef23185..ca3858b184 100644
--- a/tests/util/test_linearizer.py
+++ b/tests/util/test_linearizer.py
@@ -45,6 +45,38 @@ class LinearizerTestCase(unittest.TestCase):
         with (yield d2):
             pass
 
+    @defer.inlineCallbacks
+    def test_linearizer_is_queued(self):
+        linearizer = Linearizer()
+
+        key = object()
+
+        d1 = linearizer.queue(key)
+        cm1 = yield d1
+
+        # Since d1 gets called immediately, "is_queued" should return false.
+        self.assertFalse(linearizer.is_queued(key))
+
+        d2 = linearizer.queue(key)
+        self.assertFalse(d2.called)
+
+        # Now d2 is queued up behind successful completion of cm1
+        self.assertTrue(linearizer.is_queued(key))
+
+        with cm1:
+            self.assertFalse(d2.called)
+
+            # cm1 still not done, so d2 still queued.
+            self.assertTrue(linearizer.is_queued(key))
+
+        # And now d2 is called and nothing is in the queue again
+        self.assertFalse(linearizer.is_queued(key))
+
+        with (yield d2):
+            self.assertFalse(linearizer.is_queued(key))
+
+        self.assertFalse(linearizer.is_queued(key))
+
     def test_lots_of_queued_things(self):
         # we have one slow thing, and lots of fast things queued up behind it.
         # it should *not* explode the stack.