summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-27 19:31:44 +0100
committerErik Johnston <erik@matrix.org>2020-05-27 19:34:07 +0100
commit35c308731d3a014f20a91467830c808358681259 (patch)
treed6db0def2a763cefe67ac50227c70143b3c664ab /synapse/util
parentImprove changelog wording (diff)
downloadsynapse-35c308731d3a014f20a91467830c808358681259.tar.xz
Speed up processing of federation stream RDATA rows.
Instead of storing and sending an ACK for every single row we send
synchronously, we instead do it asynchronously while batching up
updates.
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/async_helpers.py12
1 files changed, 12 insertions, 0 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 581dffd8a0..f7af2bca7f 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -225,6 +225,18 @@ class Linearizer(object):
             {}
         )  # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
 
+    def is_queued(self, key) -> bool:
+        """Checks whether there is a process queued up waiting
+        """
+        entry = self.key_to_defer.get(key)
+        if not entry:
+            # No entry so nothing is waiting.
+            return False
+
+        # There are waiting deferreds only in the OrderedDict of deferreds is
+        # non-empty.
+        return bool(entry[1])
+
     def queue(self, key):
         # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
         # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not