diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4ebb0e8bc0..f7065517e5 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -152,9 +152,24 @@ class FederationSender(object):
@defer.inlineCallbacks
def _process_event_queue_loop(self):
+ loop_start_time = self.clock.time_msec()
try:
self._is_processing = True
while True:
+ # if we've been going around this loop for a long time without
+ # catching up, deprioritise transaction transmission. This should mean
+ # that events get batched into fewer transactions, which is more
+ # efficient, and hence give us a chance to catch up
+ if (
+ self.clock.time_msec() - loop_start_time > 60 * 1000
+ and not self._transaction_manager.deprioritise_transmission
+ ):
+ logger.warning(
+ "Event queue is getting behind: deprioritising transaction "
+ "transmission"
+ )
+ self._transaction_manager.deprioritise_transmission = True
+
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
@@ -252,6 +267,9 @@ class FederationSender(object):
finally:
self._is_processing = False
+ if self._transaction_manager.deprioritise_transmission:
+ logger.info("Event queue caught up: re-prioritising transmission")
+ self._transaction_manager.deprioritise_transmission = False
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index a5b36b1827..a7c296e880 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,6 +15,7 @@
# limitations under the License.
import datetime
import logging
+import random
from prometheus_client import Counter
@@ -36,6 +37,8 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100
+DEPRIORITISE_SLEEP_TIME = 10
+
logger = logging.getLogger(__name__)
@@ -189,6 +192,18 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
+ if self._transaction_manager.deprioritise_transmission:
+ # if the event-processing loop has got behind, sleep to give it
+ # a chance to catch up. Add some randomness so that the transmitters
+ # don't all wake up in sync.
+ sleeptime = random.uniform(
+ DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2
+ )
+ logger.info(
+ "TX [%s]: sleeping for %f seconds", self._destination, sleeptime
+ )
+ yield self._clock.sleep(sleeptime)
+
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 5fed626d5b..ca558fa242 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -49,6 +49,10 @@ class TransactionManager(object):
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
+ # the federation sender sometimes sets this to delay transaction transmission,
+ # if the sender gets behind.
+ self.deprioritise_transmission = False
+
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index dc95ab2113..8f9d6ac067 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -175,7 +175,7 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
- path = _create_v1_path("/send/%s", transaction.transaction_id)
+ path = _create_v1_path("/send/%s/", transaction.transaction_id)
response = yield self.client.put_json(
transaction.destination,
@@ -184,7 +184,7 @@ class TransportLayerClient(object):
json_data_callback=json_data_callback,
long_retries=True,
backoff_on_404=True, # If we get a 404 the other side has gone
- try_trailing_slash_on_400=True,
+ # try_trailing_slash_on_400=True,
)
return response
|