diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index dd150f89a6..defc228c23 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -24,12 +24,12 @@ from synapse.api.errors import (
HttpResponseException,
RequestSendFailed,
)
+from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -53,7 +53,7 @@ sent_edus_by_type = Counter(
)
-class PerDestinationQueue(object):
+class PerDestinationQueue:
"""
Manages the per-destination transmission queues.
@@ -92,8 +92,8 @@ class PerDestinationQueue(object):
self._destination = destination
self.transmission_loop_running = False
- # a list of tuples of (pending pdu, order)
- self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ # a list of pending PDUs
+ self._pending_pdus = [] # type: List[EventBase]
# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
@@ -132,14 +132,13 @@ class PerDestinationQueue(object):
+ len(self._pending_edus_keyed)
)
- def send_pdu(self, pdu: EventBase, order: int) -> None:
+ def send_pdu(self, pdu: EventBase) -> None:
"""Add a PDU to the queue, and start the transmission loop if necessary
Args:
pdu: pdu to send
- order
"""
- self._pending_pdus.append((pdu, order))
+ self._pending_pdus.append(pdu)
self.attempt_new_transaction()
def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -185,7 +184,7 @@ class PerDestinationQueue(object):
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
"""
- # list of (pending_pdu, deferred, order)
+
if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
@@ -210,7 +209,7 @@ class PerDestinationQueue(object):
)
async def _transaction_transmission_loop(self) -> None:
- pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ pending_pdus = [] # type: List[EventBase]
try:
self.transmission_loop_running = True
@@ -337,6 +336,28 @@ class PerDestinationQueue(object):
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
+
+ if e.retry_interval > 60 * 60 * 1000:
+ # we won't retry for another hour!
+ # (this suggests a significant outage)
+ # We drop pending PDUs and EDUs because otherwise they will
+ # rack up indefinitely.
+ # Note that:
+ # - the EDUs that are being dropped here are those that we can
+ # afford to drop (specifically, only typing notifications,
+ # read receipts and presence updates are being dropped here)
+ # - Other EDUs such as to_device messages are queued with a
+ # different mechanism
+ # - this is all volatile state that would be lost if the
+ # federation sender restarted anyway
+
+ # dropping read receipts is a bit sad but should be solved
+ # through another mechanism, because this is all volatile!
+ self._pending_pdus = []
+ self._pending_edus = []
+ self._pending_edus_keyed = {}
+ self._pending_presence = {}
+ self._pending_rrs = {}
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
@@ -351,13 +372,13 @@ class PerDestinationQueue(object):
"TX [%s] Failed to send transaction: %s", self._destination, e
)
- for p, _ in pending_pdus:
+ for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
- for p, _ in pending_pdus:
+ for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
|