1 files changed, 19 insertions, 1 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 5b8faea4e7..f3d6b3a1c1 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -150,10 +150,25 @@ class FederationSender(object):
"process_event_queue_for_federation", self._process_event_queue_loop
)
- async def _process_event_queue_loop(self) -> None:
+ async def _process_event_queue_loop(self):
+ loop_start_time = self.clock.time_msec()
try:
self._is_processing = True
while True:
+ # if we've been going around this loop for a long time without
+ # catching up, deprioritise transaction transmission. This should mean
+ # that events get batched into fewer transactions, which is more
+ # efficient, and hence give us a chance to catch up
+ if (
+ self.clock.time_msec() - loop_start_time > 60 * 1000
+ and not self._transaction_manager.deprioritise_transmission
+ ):
+ logger.warning(
+ "Event queue is getting behind: deprioritising transaction "
+ "transmission"
+ )
+ self._transaction_manager.deprioritise_transmission = True
+
last_token = await self.store.get_federation_out_pos("events")
next_token, events = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
@@ -249,6 +264,9 @@ class FederationSender(object):
finally:
self._is_processing = False
+ if self._transaction_manager.deprioritise_transmission:
+ logger.info("Event queue caught up: re-prioritising transmission")
+ self._transaction_manager.deprioritise_transmission = False
def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
|