summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-03-13 14:42:11 +0000
committerRichard van der Hoff <richard@matrix.org>2019-03-13 14:42:11 +0000
commited8ccc37377f5ffa0d7d7365747c8897aea6a489 (patch)
tree8e5a079907c1d76f8dcdfcf4b015dbd9fb6b4bf4
parentMerge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes (diff)
downloadsynapse-ed8ccc37377f5ffa0d7d7365747c8897aea6a489.tar.xz
Reinstate EDU-batching hacks
This reverts commit c7285607a3652b814c0274025fc8521618d27590.
-rw-r--r--synapse/federation/transaction_queue.py57
1 files changed, 47 insertions, 10 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py

index e5e42c647d..549bc944a6 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py
@@ -66,6 +66,9 @@ sent_edus_by_type = Counter( ["type"], ) +# number of seconds to wait to batch up outgoing EDUs +EDU_BATCH_TIME = 5.0 + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -119,6 +122,12 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} + # In order to batch outgoing EDUs, we delay sending them. This records the time + # when we should send the next batch, by destination. + self.edu_tx_time_by_dest = {} + + self.edu_tx_task_by_dest = {} + LaterGauge( "synapse_federation_transaction_queue_pending_pdus", "", @@ -397,7 +406,23 @@ class TransactionQueue(object): else: self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu) - self._attempt_new_transaction(edu.destination) + destination = edu.destination + + if destination not in self.edu_tx_time_by_dest: + txtime = self.clock.time() + EDU_BATCH_TIME * 1000 + self.edu_tx_time_by_dest[destination] = txtime + + if destination in self.edu_tx_task_by_dest: + # we already have a job queued to send EDUs to this destination + return + + def send_edus(): + del self.edu_tx_task_by_dest[destination] + self._attempt_new_transaction(destination) + + self.edu_tx_task_by_dest[destination] = self.clock.call_later( + EDU_BATCH_TIME, send_edus, + ) def send_device_messages(self, destination): if destination == self.server_name: @@ -422,6 +447,7 @@ class TransactionQueue(object): Returns: None """ + # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending @@ -475,18 +501,29 @@ class TransactionQueue(object): if leftover_pdus: self.pending_pdus_by_dest[destination] = leftover_pdus - pending_edus = self.pending_edus_by_dest.pop(destination, []) + # if we have PDUs to send, we may as well send EDUs too. Otherwise, + # we only send EDUs if their delay is up + if destination in self.edu_tx_time_by_dest and ( + pending_pdus or + self.clock.time() > self.edu_tx_time_by_dest[destination] + ): + del self.edu_tx_time_by_dest[destination] - # We can only include at most 100 EDUs per transactions - pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] - if leftover_edus: - self.pending_edus_by_dest[destination] = leftover_edus + pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) + # We can only include at most 100 EDUs per transactions + pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] + if leftover_edus: + self.edu_tx_time_by_dest[destination] = self.clock.time() + self.pending_edus_by_dest[destination] = leftover_edus - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + else: + pending_edus = [] + + pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_edus.extend(device_message_edus) if pending_presence: