diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 94cc63001e..4662008bfd 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -22,6 +22,7 @@ from twisted.internet import defer
import synapse
import synapse.metrics
+from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
@@ -39,7 +40,6 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.metrics import Measure, measure_func
@@ -329,10 +329,10 @@ class FederationSender(object):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
- domains = await self.state.get_current_hosts_in_room(room_id)
+ domains_set = await self.state.get_current_hosts_in_room(room_id)
domains = [
d
- for d in domains
+ for d in domains_set
if d != self.server_name
and self._federation_shard_config.should_handle(self._instance_name, d)
]
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index dd150f89a6..c09ffcaf4c 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -24,12 +24,12 @@ from synapse.api.errors import (
HttpResponseException,
RequestSendFailed,
)
+from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -337,6 +337,28 @@ class PerDestinationQueue(object):
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
+
+ if e.retry_interval > 60 * 60 * 1000:
+ # we won't retry for another hour!
+ # (this suggests a significant outage)
+ # We drop pending PDUs and EDUs because otherwise they will
+ # rack up indefinitely.
+ # Note that:
+ # - the EDUs that are being dropped here are those that we can
+ # afford to drop (specifically, only typing notifications,
+ # read receipts and presence updates are being dropped here)
+ # - Other EDUs such as to_device messages are queued with a
+ # different mechanism
+ # - this is all volatile state that would be lost if the
+ # federation sender restarted anyway
+
+ # dropping read receipts is a bit sad but should be solved
+ # through another mechanism, because this is all volatile!
+ self._pending_pdus = []
+ self._pending_edus = []
+ self._pending_edus_keyed = {}
+ self._pending_presence = {}
+ self._pending_rrs = {}
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index c7f6cb3d73..9bd534a313 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -15,8 +15,6 @@
import logging
from typing import TYPE_CHECKING, List, Tuple
-from canonicaljson import json
-
from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
@@ -28,6 +26,7 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
+from synapse.util import json_decoder
from synapse.util.metrics import measure_func
if TYPE_CHECKING:
@@ -71,7 +70,7 @@ class TransactionManager(object):
for edu in pending_edus:
context = edu.get_context()
if context:
- span_contexts.append(extract_text_map(json.loads(context)))
+ span_contexts.append(extract_text_map(json_decoder.decode(context)))
if keep_destination:
edu.strip_context()
|