summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/sender/__init__.py18
-rw-r--r--synapse/federation/sender/per_destination_queue.py15
-rw-r--r--synapse/federation/sender/transaction_manager.py4
-rw-r--r--synapse/federation/transport/client.py4
4 files changed, 39 insertions, 2 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py

index 4ebb0e8bc0..f7065517e5 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -152,9 +152,24 @@ class FederationSender(object): @defer.inlineCallbacks def _process_event_queue_loop(self): + loop_start_time = self.clock.time_msec() try: self._is_processing = True while True: + # if we've been going around this loop for a long time without + # catching up, deprioritise transaction transmission. This should mean + # that events get batched into fewer transactions, which is more + # efficient, and hence give us a chance to catch up + if ( + self.clock.time_msec() - loop_start_time > 60 * 1000 + and not self._transaction_manager.deprioritise_transmission + ): + logger.warning( + "Event queue is getting behind: deprioritising transaction " + "transmission" + ) + self._transaction_manager.deprioritise_transmission = True + last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 @@ -252,6 +267,9 @@ class FederationSender(object): finally: self._is_processing = False + if self._transaction_manager.deprioritise_transmission: + logger.info("Event queue caught up: re-prioritising transmission") + self._transaction_manager.deprioritise_transmission = False def _send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index a5b36b1827..a7c296e880 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,6 +15,7 @@ # limitations under the License. import datetime import logging +import random from prometheus_client import Counter @@ -36,6 +37,8 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter # This is defined in the Matrix spec and enforced by the receiver. MAX_EDUS_PER_TRANSACTION = 100 +DEPRIORITISE_SLEEP_TIME = 10 + logger = logging.getLogger(__name__) @@ -189,6 +192,18 @@ class PerDestinationQueue(object): pending_pdus = [] while True: + if self._transaction_manager.deprioritise_transmission: + # if the event-processing loop has got behind, sleep to give it + # a chance to catch up. Add some randomness so that the transmitters + # don't all wake up in sync. + sleeptime = random.uniform( + DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2 + ) + logger.info( + "TX [%s]: sleeping for %f seconds", self._destination, sleeptime + ) + yield self._clock.sleep(sleeptime) + # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 5fed626d5b..ca558fa242 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -49,6 +49,10 @@ class TransactionManager(object): # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) + # the federation sender sometimes sets this to delay transaction transmission, + # if the sender gets behind. + self.deprioritise_transmission = False + @measure_func("_send_new_transaction") @defer.inlineCallbacks def send_new_transaction(self, destination, pending_pdus, pending_edus): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index dc95ab2113..8f9d6ac067 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -175,7 +175,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - path = _create_v1_path("/send/%s", transaction.transaction_id) + path = _create_v1_path("/send/%s/", transaction.transaction_id) response = yield self.client.put_json( transaction.destination, @@ -184,7 +184,7 @@ class TransportLayerClient(object): json_data_callback=json_data_callback, long_retries=True, backoff_on_404=True, # If we get a 404 the other side has gone - try_trailing_slash_on_400=True, + # try_trailing_slash_on_400=True, ) return response