1 files changed, 9 insertions, 6 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3b053ebcfb..3bb66bce32 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -154,19 +154,22 @@ class PerDestinationQueue:
+ len(self._pending_edus_keyed)
)
- def send_pdu(self, pdu: EventBase) -> None:
- """Add a PDU to the queue, and start the transmission loop if necessary
+ def send_pdus(self, pdus: Iterable[EventBase]) -> None:
+ """Add PDUs to the queue, and start the transmission loop if necessary
Args:
- pdu: pdu to send
+ pdus: pdus to send
"""
if not self._catching_up or self._last_successful_stream_ordering is None:
# only enqueue the PDU if we are not catching up (False) or do not
# yet know if we have anything to catch up (None)
- self._pending_pdus.append(pdu)
+ self._pending_pdus.extend(pdus)
else:
- assert pdu.internal_metadata.stream_ordering
- self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
+ self._catchup_last_skipped = max(
+ pdu.internal_metadata.stream_ordering
+ for pdu in pdus
+ if pdu.internal_metadata.stream_ordering is not None
+ )
self.attempt_new_transaction()
|