summary refs log tree commit diff
diff options
context:
space:
mode:
author: Richard van der Hoff <richard@matrix.org>, 2019-09-27 16:00:00 +0100
committer: Richard van der Hoff <richard@matrix.org>, 2019-10-10 10:34:08 +0100
commit: b852a8247d1132fae125c3fb813023b6ec3f6cb3 (patch)
tree: 84c95b063f45860fc7d51c42cc61e80a49542c9b
parent: Merge branch 'erikj/cache_memberships' of github.com:matrix-org/synapse into ... (diff)
download: synapse-b852a8247d1132fae125c3fb813023b6ec3f6cb3.tar.xz
Awful hackery to try to get the fed sender to keep up
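Basically, if the federation sender's event-processing loop falls behind (it has
been running for more than 60 seconds without catching up), insert 2-second sleeps
into the per-destination transaction transmission code. This should batch events
into fewer transactions and give the fed sender a chance to catch up.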

Might have to experiment a bit with the numbers.
-rw-r--r-- changelog.d/6126.feature 1
-rw-r--r-- synapse/federation/sender/__init__.py 18
-rw-r--r-- synapse/federation/sender/per_destination_queue.py 5
-rw-r--r-- synapse/federation/sender/transaction_manager.py 4
4 files changed, 28 insertions, 0 deletions
diff --git a/changelog.d/6126.feature b/changelog.d/6126.feature
new file mode 100644

index 0000000000..1207ba6206 --- /dev/null +++ b/changelog.d/6126.feature
@@ -0,0 +1 @@ +Group events into larger federation transactions at times of high traffic. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 2b2ee8612a..f23bbf0e1f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -152,9 +152,24 @@ class FederationSender(object): @defer.inlineCallbacks def _process_event_queue_loop(self): + loop_start_time = self.clock.time_msec() try: self._is_processing = True while True: + # if we've been going around this loop for a long time without + # catching up, deprioritise transaction transmission. This should mean + # that events get batched into fewer transactions, which is more + # efficient, and hence give us a chance to catch up + if ( + self.clock.time_msec() - loop_start_time > 60 * 1000 + and not self._transaction_manager.deprioritise_transmission + ): + logger.warning( + "Event processing loop is getting behind: deprioritising " + "transaction transmission" + ) + self._transaction_manager.deprioritise_transmission = True + last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 @@ -252,6 +267,9 @@ class FederationSender(object): finally: self._is_processing = False + if self._transaction_manager.deprioritise_transmission: + logger.info("Event queue caught up: re-prioritising transmission") + self._transaction_manager.deprioritise_transmission = False def _send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index fad980b893..b890aaf840 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -189,6 +189,11 @@ class PerDestinationQueue(object): pending_pdus = [] while True: + if self._transaction_manager.deprioritise_transmission: + # if the event-processing loop has got behind, sleep to give it + # a chance to catch up + yield self._clock.sleep(2) + # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 5b6c79c51a..69679dbf65 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -49,6 +49,10 @@ class TransactionManager(object): # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) + # the federation sender sometimes sets this to delay transaction transmission, + # if the sender gets behind. + self.deprioritise_transmission = False + @measure_func("_send_new_transaction") @defer.inlineCallbacks def send_new_transaction(self, destination, pending_pdus, pending_edus):