diff options
author | Jonathan de Jong <jonathan@automatia.nl> | 2021-04-14 18:19:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-14 17:19:02 +0100 |
commit | 05e8c70c059f8ebb066e029bc3aa3e0cefef1019 (patch) | |
tree | be614725637438d22494c9b48adf277fecc89445 /synapse/federation/sender/per_destination_queue.py | |
parent | Move some replication processing out of generic_worker (#9796) (diff) | |
download | synapse-05e8c70c059f8ebb066e029bc3aa3e0cefef1019.tar.xz |
Experimental Federation Speedup (#9702)
This basically speeds up federation by "squeezing" each individual dual database call (to destinations and destination_rooms), which previously happened per every event, into one call for an entire batch (100 max). Signed-off-by: Jonathan de Jong <jonathan@automatia.nl>
Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3b053ebcfb..3bb66bce32 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -154,19 +154,22 @@ class PerDestinationQueue: + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu: EventBase) -> None: - """Add a PDU to the queue, and start the transmission loop if necessary + def send_pdus(self, pdus: Iterable[EventBase]) -> None: + """Add PDUs to the queue, and start the transmission loop if necessary Args: - pdu: pdu to send + pdus: pdus to send """ if not self._catching_up or self._last_successful_stream_ordering is None: # only enqueue the PDU if we are not catching up (False) or do not # yet know if we have anything to catch up (None) - self._pending_pdus.append(pdu) + self._pending_pdus.extend(pdus) else: - assert pdu.internal_metadata.stream_ordering - self._catchup_last_skipped = pdu.internal_metadata.stream_ordering + self._catchup_last_skipped = max( + pdu.internal_metadata.stream_ordering + for pdu in pdus + if pdu.internal_metadata.stream_ordering is not None + ) self.attempt_new_transaction() |