From c7285607a3652b814c0274025fc8521618d27590 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 6 Mar 2019 11:04:53 +0000 Subject: Revert EDU-batching hacks from matrix-org-hotfixes Firstly: we want to do this in a better way, which is the intention of too many RRs, which means we need to make it happen again. This reverts commits: 8d7c0264b 000d23090 eb0334b07 4d07dc0d1 --- synapse/federation/transaction_queue.py | 55 ++++++--------------------------- 1 file changed, 10 insertions(+), 45 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 549bc944a6..698d4b4f87 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -66,9 +66,6 @@ sent_edus_by_type = Counter( ["type"], ) -# number of seconds to wait to batch up outgoing EDUs -EDU_BATCH_TIME = 5.0 - class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -122,12 +119,6 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} - # In order to batch outgoing EDUs, we delay sending them. This records the time - # when we should send the next batch, by destination. - self.edu_tx_time_by_dest = {} - - self.edu_tx_task_by_dest = {} - LaterGauge( "synapse_federation_transaction_queue_pending_pdus", "", @@ -408,21 +399,7 @@ class TransactionQueue(object): destination = edu.destination - if destination not in self.edu_tx_time_by_dest: - txtime = self.clock.time() + EDU_BATCH_TIME * 1000 - self.edu_tx_time_by_dest[destination] = txtime - - if destination in self.edu_tx_task_by_dest: - # we already have a job queued to send EDUs to this destination - return - - def send_edus(): - del self.edu_tx_task_by_dest[destination] - self._attempt_new_transaction(destination) - - self.edu_tx_task_by_dest[destination] = self.clock.call_later( - EDU_BATCH_TIME, send_edus, - ) + self._attempt_new_transaction(destination) def send_device_messages(self, destination): if destination == self.server_name: @@ -447,7 +424,6 @@ class TransactionQueue(object): Returns: None """ - # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending @@ -501,30 +477,19 @@ class TransactionQueue(object): if leftover_pdus: self.pending_pdus_by_dest[destination] = leftover_pdus - # if we have PDUs to send, we may as well send EDUs too. Otherwise, - # we only send EDUs if their delay is up - if destination in self.edu_tx_time_by_dest and ( - pending_pdus or - self.clock.time() > self.edu_tx_time_by_dest[destination] - ): - del self.edu_tx_time_by_dest[destination] - - pending_edus = self.pending_edus_by_dest.pop(destination, []) - - # We can only include at most 100 EDUs per transactions - pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] - if leftover_edus: - self.edu_tx_time_by_dest[destination] = self.clock.time() - self.pending_edus_by_dest[destination] = leftover_edus + pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) - else: - pending_edus = [] + # We can only include at most 100 EDUs per transactions + pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] + if leftover_edus: + self.pending_edus_by_dest[destination] = leftover_edus pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + pending_edus.extend(device_message_edus) if pending_presence: pending_edus.append( -- cgit 1.5.1