summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-03-19 12:17:28 +0000
committerRichard van der Hoff <richard@matrix.org>2019-03-19 12:17:28 +0000
commit73c6630718de6950b723c18e25eb7c316f08b608 (patch)
treeb4530b81bebf6e179d84b35be2403a713750ce6c
parentMerge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes (diff)
downloadsynapse-73c6630718de6950b723c18e25eb7c316f08b608.tar.xz
Revert "Reinstate EDU-batching hacks"
This reverts commit ed8ccc37377f5ffa0d7d7365747c8897aea6a489.
-rw-r--r--synapse/federation/transaction_queue.py57
1 files changed, 10 insertions, 47 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py

index 549bc944a6..e5e42c647d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py
@@ -66,9 +66,6 @@ sent_edus_by_type = Counter( ["type"], ) -# number of seconds to wait to batch up outgoing EDUs -EDU_BATCH_TIME = 5.0 - class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -122,12 +119,6 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} - # In order to batch outgoing EDUs, we delay sending them. This records the time - # when we should send the next batch, by destination. - self.edu_tx_time_by_dest = {} - - self.edu_tx_task_by_dest = {} - LaterGauge( "synapse_federation_transaction_queue_pending_pdus", "", @@ -406,23 +397,7 @@ class TransactionQueue(object): else: self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu) - destination = edu.destination - - if destination not in self.edu_tx_time_by_dest: - txtime = self.clock.time() + EDU_BATCH_TIME * 1000 - self.edu_tx_time_by_dest[destination] = txtime - - if destination in self.edu_tx_task_by_dest: - # we already have a job queued to send EDUs to this destination - return - - def send_edus(): - del self.edu_tx_task_by_dest[destination] - self._attempt_new_transaction(destination) - - self.edu_tx_task_by_dest[destination] = self.clock.call_later( - EDU_BATCH_TIME, send_edus, - ) + self._attempt_new_transaction(edu.destination) def send_device_messages(self, destination): if destination == self.server_name: @@ -447,7 +422,6 @@ class TransactionQueue(object): Returns: None """ - # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending @@ -501,30 +475,19 @@ class TransactionQueue(object): if leftover_pdus: self.pending_pdus_by_dest[destination] = leftover_pdus - # if we have PDUs to send, we may as well send EDUs too. Otherwise, - # we only send EDUs if their delay is up - if destination in self.edu_tx_time_by_dest and ( - pending_pdus or - self.clock.time() > self.edu_tx_time_by_dest[destination] - ): - del self.edu_tx_time_by_dest[destination] - - pending_edus = self.pending_edus_by_dest.pop(destination, []) - - # We can only include at most 100 EDUs per transactions - pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] - if leftover_edus: - self.edu_tx_time_by_dest[destination] = self.clock.time() - self.pending_edus_by_dest[destination] = leftover_edus + pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) - else: - pending_edus = [] + # We can only include at most 100 EDUs per transactions + pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] + if leftover_edus: + self.pending_edus_by_dest[destination] = leftover_edus pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + pending_edus.extend(device_message_edus) if pending_presence: pending_edus.append(