summary refs log tree commit diff
path: root/synapse/federation/sender/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender/__init__.py')
-rw-r--r--synapse/federation/sender/__init__.py20
1 files changed, 19 insertions, 1 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py

index 5b8faea4e7..f3d6b3a1c1 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -150,10 +150,25 @@ class FederationSender(object): "process_event_queue_for_federation", self._process_event_queue_loop ) - async def _process_event_queue_loop(self) -> None: + async def _process_event_queue_loop(self): + loop_start_time = self.clock.time_msec() try: self._is_processing = True while True: + # if we've been going around this loop for a long time without + # catching up, deprioritise transaction transmission. This should mean + # that events get batched into fewer transactions, which is more + # efficient, and hence give us a chance to catch up + if ( + self.clock.time_msec() - loop_start_time > 60 * 1000 + and not self._transaction_manager.deprioritise_transmission + ): + logger.warning( + "Event queue is getting behind: deprioritising transaction " + "transmission" + ) + self._transaction_manager.deprioritise_transmission = True + last_token = await self.store.get_federation_out_pos("events") next_token, events = await self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 @@ -249,6 +264,9 @@ class FederationSender(object): finally: self._is_processing = False + if self._transaction_manager.deprioritise_transmission: + logger.info("Event queue caught up: re-prioritising transmission") + self._transaction_manager.deprioritise_transmission = False def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have