diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 30941f5ad6..c3b77419a9 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -66,6 +66,9 @@ sent_edus_by_type = Counter(
["type"],
)
+# number of seconds to wait to batch up outgoing EDUs
+EDU_BATCH_TIME = 5.0
+
class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at
@@ -119,6 +122,12 @@ class TransactionQueue(object):
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
+ # In order to batch outgoing EDUs, we delay sending them. This records the time
+ # when we should send the next batch, by destination.
+ self.edu_tx_time_by_dest = {}
+
+ self.edu_tx_task_by_dest = {}
+
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
@@ -380,7 +389,21 @@ class TransactionQueue(object):
else:
self.pending_edus_by_dest.setdefault(destination, []).append(edu)
- self._attempt_new_transaction(destination)
+ if destination not in self.edu_tx_time_by_dest:
+ txtime = self.clock.time() + EDU_BATCH_TIME * 1000
+ self.edu_tx_time_by_dest[destination] = txtime
+
+ if destination in self.edu_tx_task_by_dest:
+ # we already have a job queued to send EDUs to this destination
+ return
+
+ def send_edus():
+ del self.edu_tx_task_by_dest[destination]
+ self._attempt_new_transaction(destination)
+
+ self.edu_tx_task_by_dest[destination] = self.clock.call_later(
+ EDU_BATCH_TIME, send_edus,
+ )
def send_device_messages(self, destination):
if destination == self.server_name:
@@ -405,6 +428,7 @@ class TransactionQueue(object):
Returns:
None
"""
+
# list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
@@ -458,18 +482,29 @@ class TransactionQueue(object):
if leftover_pdus:
self.pending_pdus_by_dest[destination] = leftover_pdus
- pending_edus = self.pending_edus_by_dest.pop(destination, [])
+ # if we have PDUs to send, we may as well send EDUs too. Otherwise,
+ # we only send EDUs if their delay is up
+ if destination in self.edu_tx_time_by_dest and (
+ pending_pdus or
+ self.clock.time() > self.edu_tx_time_by_dest[destination]
+ ):
+ del self.edu_tx_time_by_dest[destination]
- # We can only include at most 100 EDUs per transactions
- pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
- if leftover_edus:
- self.pending_edus_by_dest[destination] = leftover_edus
+ pending_edus = self.pending_edus_by_dest.pop(destination, [])
- pending_presence = self.pending_presence_by_dest.pop(destination, {})
+ # We can only include at most 100 EDUs per transactions
+ pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
+ if leftover_edus:
+ self.edu_tx_time_by_dest[destination] = self.clock.time()
+ self.pending_edus_by_dest[destination] = leftover_edus
- pending_edus.extend(
- self.pending_edus_keyed_by_dest.pop(destination, {}).values()
- )
+ pending_edus.extend(
+ self.pending_edus_keyed_by_dest.pop(destination, {}).values()
+ )
+ else:
+ pending_edus = []
+
+ pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_edus.extend(device_message_edus)
if pending_presence:
|