From cfb3096e33d8087a4b5214945d0b24295bb1c56a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 1 Oct 2020 11:19:09 +0100 Subject: Revert federation-transaction-transmission backoff hacks This reverts b852a8247, 15b2a5081, 28889d8da. I don't think these patches are required any more, and if they are, they should be on mainline, not hidden in our hotfixes branch. Let's try backing them out: if that turns out to be an error, we can PR them properly. --- synapse/federation/sender/__init__.py | 20 +------------------- synapse/federation/sender/per_destination_queue.py | 15 --------------- synapse/federation/sender/transaction_manager.py | 4 ---- 3 files changed, 1 insertion(+), 38 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index b22869501c..552519e82c 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -151,25 +151,10 @@ class FederationSender: "process_event_queue_for_federation", self._process_event_queue_loop ) - async def _process_event_queue_loop(self): - loop_start_time = self.clock.time_msec() + async def _process_event_queue_loop(self) -> None: try: self._is_processing = True while True: - # if we've been going around this loop for a long time without - # catching up, deprioritise transaction transmission. This should mean - # that events get batched into fewer transactions, which is more - # efficient, and hence give us a chance to catch up - if ( - self.clock.time_msec() - loop_start_time > 60 * 1000 - and not self._transaction_manager.deprioritise_transmission - ): - logger.warning( - "Event queue is getting behind: deprioritising transaction " - "transmission" - ) - self._transaction_manager.deprioritise_transmission = True - last_token = await self.store.get_federation_out_pos("events") next_token, events = await self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 @@ -279,9 +264,6 @@ class FederationSender: finally: self._is_processing = False - if self._transaction_manager.deprioritise_transmission: - logger.info("Event queue caught up: re-prioritising transmission") - self._transaction_manager.deprioritise_transmission = False def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index b4da52e7e6..defc228c23 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -15,7 +15,6 @@ # limitations under the License. import datetime import logging -import random from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple from prometheus_client import Counter @@ -40,8 +39,6 @@ if TYPE_CHECKING: # This is defined in the Matrix spec and enforced by the receiver. MAX_EDUS_PER_TRANSACTION = 100 -DEPRIORITISE_SLEEP_TIME = 10 - logger = logging.getLogger(__name__) @@ -223,18 +220,6 @@ class PerDestinationQueue: pending_pdus = [] while True: - if self._transaction_manager.deprioritise_transmission: - # if the event-processing loop has got behind, sleep to give it - # a chance to catch up. Add some randomness so that the transmitters - # don't all wake up in sync. - sleeptime = random.uniform( - DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2 - ) - logger.info( - "TX [%s]: sleeping for %f seconds", self._destination, sleeptime - ) - await self._clock.sleep(sleeptime) - # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index cf472c9f15..c84072ab73 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -51,10 +51,6 @@ class TransactionManager: # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) - # the federation sender sometimes sets this to delay transaction transmission, - # if the sender gets behind. - self.deprioritise_transmission = False - @measure_func("_send_new_transaction") async def send_new_transaction( self, destination: str, pdus: List[EventBase], edus: List[Edu], -- cgit 1.5.1