summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-03-06 11:04:53 +0000
committerRichard van der Hoff <richard@matrix.org>2019-03-06 11:04:53 +0000
commitc7285607a3652b814c0274025fc8521618d27590 (patch)
tree118b539f78137f31bc7495c7cbae76a7c2de4ac7 /synapse/federation/transaction_queue.py
parentFix outbound federation (diff)
downloadsynapse-c7285607a3652b814c0274025fc8521618d27590.tar.xz
Revert EDU-batching hacks from matrix-org-hotfixes
Firstly: we want to do this in a better way, which is the intention of
too many RRs, which means we need to make it happen again.

This reverts commits: 8d7c0264b 000d23090 eb0334b07 4d07dc0d1
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py55
1 files changed, 10 insertions, 45 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py

index 549bc944a6..698d4b4f87 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py
@@ -66,9 +66,6 @@ sent_edus_by_type = Counter( ["type"], ) -# number of seconds to wait to batch up outgoing EDUs -EDU_BATCH_TIME = 5.0 - class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -122,12 +119,6 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} - # In order to batch outgoing EDUs, we delay sending them. This records the time - # when we should send the next batch, by destination. - self.edu_tx_time_by_dest = {} - - self.edu_tx_task_by_dest = {} - LaterGauge( "synapse_federation_transaction_queue_pending_pdus", "", @@ -408,21 +399,7 @@ class TransactionQueue(object): destination = edu.destination - if destination not in self.edu_tx_time_by_dest: - txtime = self.clock.time() + EDU_BATCH_TIME * 1000 - self.edu_tx_time_by_dest[destination] = txtime - - if destination in self.edu_tx_task_by_dest: - # we already have a job queued to send EDUs to this destination - return - - def send_edus(): - del self.edu_tx_task_by_dest[destination] - self._attempt_new_transaction(destination) - - self.edu_tx_task_by_dest[destination] = self.clock.call_later( - EDU_BATCH_TIME, send_edus, - ) + self._attempt_new_transaction(destination) def send_device_messages(self, destination): if destination == self.server_name: @@ -447,7 +424,6 @@ class TransactionQueue(object): Returns: None """ - # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending @@ -501,30 +477,19 @@ class TransactionQueue(object): if leftover_pdus: self.pending_pdus_by_dest[destination] = leftover_pdus - # if we have PDUs to send, we may as well send EDUs too. Otherwise, - # we only send EDUs if their delay is up - if destination in self.edu_tx_time_by_dest and ( - pending_pdus or - self.clock.time() > self.edu_tx_time_by_dest[destination] - ): - del self.edu_tx_time_by_dest[destination] - - pending_edus = self.pending_edus_by_dest.pop(destination, []) - - # We can only include at most 100 EDUs per transactions - pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] - if leftover_edus: - self.edu_tx_time_by_dest[destination] = self.clock.time() - self.pending_edus_by_dest[destination] = leftover_edus + pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) - else: - pending_edus = [] + # We can only include at most 100 EDUs per transactions + pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] + if leftover_edus: + self.pending_edus_by_dest[destination] = leftover_edus pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + pending_edus.extend(device_message_edus) if pending_presence: pending_edus.append(