summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/federation/transaction_queue.py54
1 files changed, 42 insertions, 12 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py

index 2969c83ac5..18e4d6575b 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py
@@ -66,6 +66,9 @@ sent_edus_by_type = Counter( ["type"], ) +# number of seconds to wait to batch up outgoing EDUs +EDU_BATCH_TIME = 5.0 + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -119,6 +122,12 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} + # In order to batch outgoing EDUs, we delay sending them. This records the time + # when we should send the next batch, by destination. + self.edu_tx_time_by_dest = {} + + self.edu_tx_task_by_dest = {} + LaterGauge( "synapse_federation_transaction_queue_pending_pdus", "", @@ -380,9 +389,18 @@ class TransactionQueue(object): else: self.pending_edus_by_dest.setdefault(destination, []).append(edu) - # this is a bit of a hack, but we delay starting the transmission loop - # in an effort to batch up outgoing EDUs a bit. - self.clock.call_later(5.0, self._attempt_new_transaction, destination) + if destination in self.edu_tx_task_by_dest: + # we already have a job queued to send EDUs to this destination + return + + def send_edus(): + del self.edu_tx_task_by_dest[destination] + self._send_new_transaction(destination) + + self.edu_tx_time_by_dest = self.clock.time() + EDU_BATCH_TIME * 1000 + self.edu_tx_task_by_dest[destination] = self.clock.call_later( + EDU_BATCH_TIME, send_edus, + ) def send_device_messages(self, destination): if destination == self.server_name: @@ -407,6 +425,7 @@ class TransactionQueue(object): Returns: None """ + # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending @@ -460,18 +479,29 @@ class TransactionQueue(object): if leftover_pdus: self.pending_pdus_by_dest[destination] = leftover_pdus - pending_edus = self.pending_edus_by_dest.pop(destination, []) + # if we have PDUs to send, we may as well send EDUs too. Otherwise, + # we only send EDUs if their delay is up + if pending_pdus or ( + destination in self.edu_tx_time_by_dest + and self.clock.time() > self.edu_tx_time_by_dest[destination] + ): + del self.edu_tx_time_by_dest[destination] - # We can only include at most 100 EDUs per transactions - pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] - if leftover_edus: - self.pending_edus_by_dest[destination] = leftover_edus + pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) + # We can only include at most 100 EDUs per transactions + pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] + if leftover_edus: + self.edu_tx_time_by_dest[destination] = self.clock.time() + self.pending_edus_by_dest[destination] = leftover_edus - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + else: + pending_edus = [] + + pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_edus.extend(device_message_edus) if pending_presence: