From 05e8c70c059f8ebb066e029bc3aa3e0cefef1019 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Wed, 14 Apr 2021 18:19:02 +0200 Subject: Experimental Federation Speedup (#9702) This basically speeds up federation by "squeezing" each individual dual database call (to destinations and destination_rooms), which previously happened per every event, into one call for an entire batch (100 max). Signed-off-by: Jonathan de Jong --- synapse/federation/sender/per_destination_queue.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'synapse/federation/sender/per_destination_queue.py') diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3b053ebcfb..3bb66bce32 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -154,19 +154,22 @@ class PerDestinationQueue: + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu: EventBase) -> None: - """Add a PDU to the queue, and start the transmission loop if necessary + def send_pdus(self, pdus: Iterable[EventBase]) -> None: + """Add PDUs to the queue, and start the transmission loop if necessary Args: - pdu: pdu to send + pdus: pdus to send """ if not self._catching_up or self._last_successful_stream_ordering is None: # only enqueue the PDU if we are not catching up (False) or do not # yet know if we have anything to catch up (None) - self._pending_pdus.append(pdu) + self._pending_pdus.extend(pdus) else: - assert pdu.internal_metadata.stream_ordering - self._catchup_last_skipped = pdu.internal_metadata.stream_ordering + self._catchup_last_skipped = max( + pdu.internal_metadata.stream_ordering + for pdu in pdus + if pdu.internal_metadata.stream_ordering is not None + ) self.attempt_new_transaction() -- cgit 1.5.1