summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-08-22 16:11:22 +0100
committerGitHub <noreply@github.com>2023-08-22 15:11:22 +0000
commit803f63df1c52237a23cb68c1b2a8402200a7216d (patch)
treec50886529b4d20e959e6980611ede501c3456dcb /synapse/replication
parentDisable `m.3pid_changes` capability when MSC3861 is enabled. (#16134) (diff)
downloadsynapse-803f63df1c52237a23cb68c1b2a8402200a7216d.tar.xz
Fix perf of `wait_for_stream_positions` (#16148)
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/client.py19
1 files changed, 12 insertions, 7 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 04e8cff6ea..3b88dc68ea 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -14,7 +14,9 @@
 """A replication client for use by synapse workers.
 """
 import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple
+
+from sortedcontainers import SortedList
 
 from twisted.internet import defer
 from twisted.internet.defer import Deferred
@@ -86,7 +88,9 @@ class ReplicationDataHandler:
 
         # Map from stream and instance to list of deferreds waiting for the stream to
         # arrive at a particular position. The lists are sorted by stream position.
-        self._streams_to_waiters: Dict[Tuple[str, str], List[Tuple[int, Deferred]]] = {}
+        self._streams_to_waiters: Dict[
+            Tuple[str, str], SortedList[Tuple[int, Deferred]]
+        ] = {}
 
     async def on_rdata(
         self, stream_name: str, instance_name: str, token: int, rows: list
@@ -238,7 +242,9 @@ class ReplicationDataHandler:
         # Notify any waiting deferreds. The list is ordered by position so we
         # just iterate through the list until we reach a position that is
         # greater than the received row position.
-        waiting_list = self._streams_to_waiters.get((stream_name, instance_name), [])
+        waiting_list = self._streams_to_waiters.get((stream_name, instance_name))
+        if not waiting_list:
+            return
 
         # Index of first item with a position after the current token, i.e we
         # have called all deferreds before this index. If not overwritten by
@@ -262,7 +268,7 @@ class ReplicationDataHandler:
 
         # Drop all entries in the waiting list that were called in the above
         # loop. (This maintains the order so no need to resort)
-        waiting_list[:] = waiting_list[index_of_first_deferred_not_called:]
+        del waiting_list[:index_of_first_deferred_not_called]
 
         for deferred in deferreds_to_callback:
             try:
@@ -322,11 +328,10 @@ class ReplicationDataHandler:
         )
 
         waiting_list = self._streams_to_waiters.setdefault(
-            (stream_name, instance_name), []
+            (stream_name, instance_name), SortedList(key=lambda t: t[0])
         )
 
-        waiting_list.append((position, deferred))
-        waiting_list.sort(key=lambda t: t[0])
+        waiting_list.add((position, deferred))
 
         # We measure here to get in flight counts and average waiting time.
         with Measure(self._clock, "repl.wait_for_stream_position"):