summary refs log tree commit diff
path: root/synapse/federation/sender/per_destination_queue.py
diff options
context:
space:
mode:
authorJonathan de Jong <jonathan@automatia.nl>2021-04-14 18:19:02 +0200
committerGitHub <noreply@github.com>2021-04-14 17:19:02 +0100
commit05e8c70c059f8ebb066e029bc3aa3e0cefef1019 (patch)
treebe614725637438d22494c9b48adf277fecc89445 /synapse/federation/sender/per_destination_queue.py
parentMove some replication processing out of generic_worker (#9796) (diff)
downloadsynapse-05e8c70c059f8ebb066e029bc3aa3e0cefef1019.tar.xz
Experimental Federation Speedup (#9702)
This basically speeds up federation by "squeezing" each individual dual database call (to destinations and destination_rooms), which previously happened per every event, into one call for an entire batch (100 max).

Signed-off-by: Jonathan de Jong <jonathan@automatia.nl>
Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
-rw-r--r--synapse/federation/sender/per_destination_queue.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3b053ebcfb..3bb66bce32 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -154,19 +154,22 @@ class PerDestinationQueue:
             + len(self._pending_edus_keyed)
         )
 
-    def send_pdu(self, pdu: EventBase) -> None:
-        """Add a PDU to the queue, and start the transmission loop if necessary
+    def send_pdus(self, pdus: Iterable[EventBase]) -> None:
+        """Add PDUs to the queue, and start the transmission loop if necessary
 
         Args:
-            pdu: pdu to send
+            pdus: pdus to send
         """
         if not self._catching_up or self._last_successful_stream_ordering is None:
             # only enqueue the PDU if we are not catching up (False) or do not
             # yet know if we have anything to catch up (None)
-            self._pending_pdus.append(pdu)
+            self._pending_pdus.extend(pdus)
         else:
-            assert pdu.internal_metadata.stream_ordering
-            self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
+            self._catchup_last_skipped = max(
+                pdu.internal_metadata.stream_ordering
+                for pdu in pdus
+                if pdu.internal_metadata.stream_ordering is not None
+            )
 
         self.attempt_new_transaction()