diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 4e698981a4..defc228c23 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -24,12 +24,12 @@ from synapse.api.errors import (
HttpResponseException,
RequestSendFailed,
)
+from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -53,7 +53,7 @@ sent_edus_by_type = Counter(
)
-class PerDestinationQueue(object):
+class PerDestinationQueue:
"""
Manages the per-destination transmission queues.
@@ -74,12 +74,26 @@ class PerDestinationQueue(object):
self._clock = hs.get_clock()
self._store = hs.get_datastore()
self._transaction_manager = transaction_manager
+ self._instance_name = hs.get_instance_name()
+ self._federation_shard_config = hs.config.worker.federation_shard_config
+
+ self._should_send_on_this_instance = True
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ # We don't raise an exception here to avoid taking out any other
+ # processing. We have a guard in `attempt_new_transaction` that
+ # ensures we don't start sending stuff.
+ logger.error(
+ "Create a per destination queue for %s on wrong worker", destination,
+ )
+ self._should_send_on_this_instance = False
self._destination = destination
self.transmission_loop_running = False
- # a list of tuples of (pending pdu, order)
- self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ # a list of pending PDUs
+ self._pending_pdus = [] # type: List[EventBase]
# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
@@ -118,18 +132,17 @@ class PerDestinationQueue(object):
+ len(self._pending_edus_keyed)
)
- def send_pdu(self, pdu: EventBase, order: int) -> None:
- """Add a PDU to the queue, and start the transmission loop if neccessary
+ def send_pdu(self, pdu: EventBase) -> None:
+ """Add a PDU to the queue, and start the transmission loop if necessary
Args:
pdu: pdu to send
- order
"""
- self._pending_pdus.append((pdu, order))
+ self._pending_pdus.append(pdu)
self.attempt_new_transaction()
def send_presence(self, states: Iterable[UserPresenceState]) -> None:
- """Add presence updates to the queue. Start the transmission loop if neccessary.
+ """Add presence updates to the queue. Start the transmission loop if necessary.
Args:
states: presence to send
@@ -171,7 +184,7 @@ class PerDestinationQueue(object):
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
"""
- # list of (pending_pdu, deferred, order)
+
if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
@@ -180,6 +193,14 @@ class PerDestinationQueue(object):
logger.debug("TX [%s] Transaction already in progress", self._destination)
return
+ if not self._should_send_on_this_instance:
+ # We don't raise an exception here to avoid taking out any other
+ # processing.
+ logger.error(
+ "Trying to start a transaction to %s on wrong worker", self._destination
+ )
+ return
+
logger.debug("TX [%s] Starting transaction loop", self._destination)
run_as_background_process(
@@ -188,7 +209,7 @@ class PerDestinationQueue(object):
)
async def _transaction_transmission_loop(self) -> None:
- pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ pending_pdus = [] # type: List[EventBase]
try:
self.transmission_loop_running = True
@@ -315,6 +336,28 @@ class PerDestinationQueue(object):
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
+
+ if e.retry_interval > 60 * 60 * 1000:
+ # we won't retry for another hour!
+ # (this suggests a significant outage)
+ # We drop pending PDUs and EDUs because otherwise they will
+ # rack up indefinitely.
+ # Note that:
+ # - the EDUs that are being dropped here are those that we can
+ # afford to drop (specifically, only typing notifications,
+ # read receipts and presence updates are being dropped here)
+ # - Other EDUs such as to_device messages are queued with a
+ # different mechanism
+ # - this is all volatile state that would be lost if the
+ # federation sender restarted anyway
+
+ # dropping read receipts is a bit sad but should be solved
+ # through another mechanism, because this is all volatile!
+ self._pending_pdus = []
+ self._pending_edus = []
+ self._pending_edus_keyed = {}
+ self._pending_presence = {}
+ self._pending_rrs = {}
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
@@ -329,13 +372,13 @@ class PerDestinationQueue(object):
"TX [%s] Failed to send transaction: %s", self._destination, e
)
- for p, _ in pending_pdus:
+ for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
- for p, _ in pending_pdus:
+ for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
|