summary refs log tree commit diff
path: root/synapse/federation/sender/per_destination_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
-rw-r--r--synapse/federation/sender/per_destination_queue.py177
1 files changed, 165 insertions, 12 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py

index dd150f89a6..bc99af3fdd 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@ # limitations under the License. import datetime import logging -from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast from prometheus_client import Counter @@ -24,12 +24,12 @@ from synapse.api.errors import ( HttpResponseException, RequestSendFailed, ) +from synapse.api.presence import UserPresenceState from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage.presence import UserPresenceState from synapse.types import ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter @@ -53,7 +53,7 @@ sent_edus_by_type = Counter( ) -class PerDestinationQueue(object): +class PerDestinationQueue: """ Manages the per-destination transmission queues. @@ -92,8 +92,23 @@ class PerDestinationQueue(object): self._destination = destination self.transmission_loop_running = False - # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + # True whilst we are sending events that the remote homeserver missed + # because it was unreachable. We start in this state so we can perform + # catch-up at startup. + # New events will only be sent once this is finished, at which point + # _catching_up is flipped to False. + self._catching_up = True # type: bool + + # The stream_ordering of the most recent PDU that was discarded due to + # being in catch-up mode. + self._catchup_last_skipped = 0 # type: int + + # Cache of the last successfully-transmitted stream ordering for this + # destination (we are the only updater so this is safe) + self._last_successful_stream_ordering = None # type: Optional[int] + + # a list of pending PDUs + self._pending_pdus = [] # type: List[EventBase] # XXX this is never actually used: see # https://github.com/matrix-org/synapse/issues/7549 @@ -132,14 +147,19 @@ class PerDestinationQueue(object): + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu: EventBase, order: int) -> None: + def send_pdu(self, pdu: EventBase) -> None: """Add a PDU to the queue, and start the transmission loop if necessary Args: pdu: pdu to send - order """ - self._pending_pdus.append((pdu, order)) + if not self._catching_up or self._last_successful_stream_ordering is None: + # only enqueue the PDU if we are not catching up (False) or do not + # yet know if we have anything to catch up (None) + self._pending_pdus.append(pdu) + else: + self._catchup_last_skipped = pdu.internal_metadata.stream_ordering + self.attempt_new_transaction() def send_presence(self, states: Iterable[UserPresenceState]) -> None: @@ -185,7 +205,7 @@ class PerDestinationQueue(object): returns immediately. Otherwise kicks off the process of sending a transaction in the background. """ - # list of (pending_pdu, deferred, order) + if self.transmission_loop_running: # XXX: this can get stuck on by a never-ending # request at which point pending_pdus just keeps growing. @@ -210,7 +230,7 @@ class PerDestinationQueue(object): ) async def _transaction_transmission_loop(self) -> None: - pending_pdus = [] # type: List[Tuple[EventBase, int]] + pending_pdus = [] # type: List[EventBase] try: self.transmission_loop_running = True @@ -219,6 +239,13 @@ class PerDestinationQueue(object): # hence why we throw the result away. await get_retry_limiter(self._destination, self._clock, self._store) + if self._catching_up: + # we potentially need to catch-up first + await self._catch_up_transmission_loop() + if self._catching_up: + # not caught up yet + return + pending_pdus = [] while True: # We have to keep 2 free slots for presence and rr_edus @@ -326,6 +353,17 @@ class PerDestinationQueue(object): self._last_device_stream_id = device_stream_id self._last_device_list_stream_id = dev_list_id + + if pending_pdus: + # we sent some PDUs and it was successful, so update our + # last_successful_stream_ordering in the destinations table. + final_pdu = pending_pdus[-1] + last_successful_stream_ordering = ( + final_pdu.internal_metadata.stream_ordering + ) + await self._store.set_destination_last_successful_stream_ordering( + self._destination, last_successful_stream_ordering + ) else: break except NotRetryingDestination as e: @@ -337,6 +375,30 @@ class PerDestinationQueue(object): (e.retry_last_ts + e.retry_interval) / 1000.0 ), ) + + if e.retry_interval > 60 * 60 * 1000: + # we won't retry for another hour! + # (this suggests a significant outage) + # We drop pending EDUs because otherwise they will + # rack up indefinitely. + # (Dropping PDUs is already performed by `_start_catching_up`.) + # Note that: + # - the EDUs that are being dropped here are those that we can + # afford to drop (specifically, only typing notifications, + # read receipts and presence updates are being dropped here) + # - Other EDUs such as to_device messages are queued with a + # different mechanism + # - this is all volatile state that would be lost if the + # federation sender restarted anyway + + # dropping read receipts is a bit sad but should be solved + # through another mechanism, because this is all volatile! + self._pending_edus = [] + self._pending_edus_keyed = {} + self._pending_presence = {} + self._pending_rrs = {} + + self._start_catching_up() except FederationDeniedError as e: logger.info(e) except HttpResponseException as e: @@ -346,25 +408,107 @@ class PerDestinationQueue(object): e.code, e, ) + + self._start_catching_up() except RequestSendFailed as e: logger.warning( "TX [%s] Failed to send transaction: %s", self._destination, e ) - for p, _ in pending_pdus: + for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) + + self._start_catching_up() except Exception: logger.exception("TX [%s] Failed to send transaction", self._destination) - for p, _ in pending_pdus: + for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) + + self._start_catching_up() finally: # We want to be *very* sure we clear this after we stop processing self.transmission_loop_running = False + async def _catch_up_transmission_loop(self) -> None: + first_catch_up_check = self._last_successful_stream_ordering is None + + if first_catch_up_check: + # first catchup so get last_successful_stream_ordering from database + self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering( + self._destination + ) + + if self._last_successful_stream_ordering is None: + # if it's still None, then this means we don't have the information + # in our database ­ we haven't successfully sent a PDU to this server + # (at least since the introduction of the feature tracking + # last_successful_stream_ordering). + # Sadly, this means we can't do anything here as we don't know what + # needs catching up — so catching up is futile; let's stop. + self._catching_up = False + return + + # get at most 50 catchup room/PDUs + while True: + event_ids = await self._store.get_catch_up_room_event_ids( + self._destination, self._last_successful_stream_ordering, + ) + + if not event_ids: + # No more events to catch up on, but we can't ignore the chance + # of a race condition, so we check that no new events have been + # skipped due to us being in catch-up mode + + if self._catchup_last_skipped > self._last_successful_stream_ordering: + # another event has been skipped because we were in catch-up mode + continue + + # we are done catching up! + self._catching_up = False + break + + if first_catch_up_check: + # as this is our check for needing catch-up, we may have PDUs in + # the queue from before we *knew* we had to do catch-up, so + # clear those out now. + self._start_catching_up() + + # fetch the relevant events from the event store + # - redacted behaviour of REDACT is fine, since we only send metadata + # of redacted events to the destination. + # - don't need to worry about rejected events as we do not actively + # forward received events over federation. + catchup_pdus = await self._store.get_events_as_list(event_ids) + if not catchup_pdus: + raise AssertionError( + "No events retrieved when we asked for %r. " + "This should not happen." % event_ids + ) + + if logger.isEnabledFor(logging.INFO): + rooms = [p.room_id for p in catchup_pdus] + logger.info("Catching up rooms to %s: %r", self._destination, rooms) + + success = await self._transaction_manager.send_new_transaction( + self._destination, catchup_pdus, [] + ) + + if not success: + return + + sent_transactions_counter.inc() + final_pdu = catchup_pdus[-1] + self._last_successful_stream_ordering = cast( + int, final_pdu.internal_metadata.stream_ordering + ) + await self._store.set_destination_last_successful_stream_ordering( + self._destination, self._last_successful_stream_ordering + ) + def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: if not self._pending_rrs: return @@ -425,3 +569,12 @@ class PerDestinationQueue(object): ] return (edus, stream_id) + + def _start_catching_up(self) -> None: + """ + Marks this destination as being in catch-up mode. + + This throws away the PDU queue. + """ + self._catching_up = True + self._pending_pdus = []