diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d473576902..552519e82c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -16,14 +16,13 @@
import logging
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
-from six import itervalues
-
from prometheus_client import Counter
from twisted.internet import defer
import synapse
import synapse.metrics
+from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
@@ -41,7 +40,6 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.metrics import Measure, measure_func
@@ -58,7 +56,7 @@ sent_pdus_destination_dist_total = Counter(
)
-class FederationSender(object):
+class FederationSender:
def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.server_name = hs.hostname
@@ -71,6 +69,9 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs)
+ self._instance_name = hs.get_instance_name()
+ self._federation_shard_config = hs.config.worker.federation_shard_config
+
# map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
@@ -107,8 +108,6 @@ class FederationSender(object):
),
)
- self._order = 1
-
self._is_processing = False
self._last_poked_id = -1
@@ -193,7 +192,13 @@ class FederationSender(object):
)
return
- destinations = set(destinations)
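+ # When the federation sender is sharded across instances, only keep the
+ # destinations that this instance is responsible for sending to.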
+ destinations = {
+ d
+ for d in destinations
+ if self._federation_shard_config.should_handle(
+ self._instance_name, d
+ )
+ }
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
@@ -203,7 +208,15 @@ class FederationSender(object):
logger.debug("Sending %s to %r", event, destinations)
- self._send_pdu(event, destinations)
+ if destinations:
+ self._send_pdu(event, destinations)
+
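+ # Record the lag between the event being received/persisted and it
+ # being processed by the federation sender.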
+ now = self.clock.time_msec()
+ ts = await self.store.get_received_ts(event.event_id)
+
+ synapse.metrics.event_processing_lag_by_event.labels(
+ "federation_sender"
+ ).observe((now - ts) / 1000)
async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"):
@@ -218,7 +231,7 @@ class FederationSender(object):
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
- for evs in itervalues(events_by_room)
+ for evs in events_by_room.values()
],
consumeErrors=True,
)
@@ -257,9 +270,6 @@ class FederationSender(object):
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
- order = self._order
- self._order += 1
-
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
@@ -271,10 +281,9 @@ class FederationSender(object):
sent_pdus_destination_dist_count.inc()
for destination in destinations:
- self._get_per_destination_queue(destination).send_pdu(pdu, order)
+ self._get_per_destination_queue(destination).send_pdu(pdu)
- @defer.inlineCallbacks
- def send_read_receipt(self, receipt: ReadReceipt):
+ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Args:
@@ -315,8 +324,13 @@ class FederationSender(object):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
- domains = yield self.state.get_current_hosts_in_room(room_id)
- domains = [d for d in domains if d != self.server_name]
+ domains_set = await self.state.get_current_hosts_in_room(room_id)
+ domains = [
+ d
+ for d in domains_set
+ if d != self.server_name
+ and self._federation_shard_config.should_handle(self._instance_name, d)
+ ]
if not domains:
return
@@ -365,8 +379,7 @@ class FederationSender(object):
queue.flush_read_receipts_for_room(room_id)
@preserve_fn # the caller should not yield on this
- @defer.inlineCallbacks
- def send_presence(self, states: List[UserPresenceState]):
+ async def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
@@ -401,7 +414,7 @@ class FederationSender(object):
if not states_map:
break
- yield self._process_presence_inner(list(states_map.values()))
+ await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
@@ -421,20 +434,29 @@ class FederationSender(object):
for destination in destinations:
if destination == self.server_name:
continue
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ continue
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
- @defer.inlineCallbacks
- def _process_presence_inner(self, states: List[UserPresenceState]):
+ async def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
- hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
+ hosts_and_states = await get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue
+
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ continue
+
self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu(
@@ -456,6 +478,11 @@ class FederationSender(object):
logger.info("Not sending EDU to ourselves")
return
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ return
+
edu = Edu(
origin=self.server_name,
destination=destination,
@@ -472,6 +499,11 @@ class FederationSender(object):
edu: edu to send
key: clobbering key for this edu
"""
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, edu.destination
+ ):
+ return
+
queue = self._get_per_destination_queue(edu.destination)
if key:
queue.send_keyed_edu(edu, key)
@@ -483,6 +515,11 @@ class FederationSender(object):
logger.warning("Not sending device update to ourselves")
return
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ return
+
self._get_per_destination_queue(destination).attempt_new_transaction()
def wake_destination(self, destination: str):
@@ -496,6 +533,11 @@ class FederationSender(object):
logger.warning("Not waking up ourselves")
return
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ return
+
self._get_per_destination_queue(destination).attempt_new_transaction()
@staticmethod
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 4e698981a4..defc228c23 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -24,12 +24,12 @@ from synapse.api.errors import (
HttpResponseException,
RequestSendFailed,
)
+from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -53,7 +53,7 @@ sent_edus_by_type = Counter(
)
-class PerDestinationQueue(object):
+class PerDestinationQueue:
"""
Manages the per-destination transmission queues.
@@ -74,12 +74,26 @@ class PerDestinationQueue(object):
self._clock = hs.get_clock()
self._store = hs.get_datastore()
self._transaction_manager = transaction_manager
+ self._instance_name = hs.get_instance_name()
+ self._federation_shard_config = hs.config.worker.federation_shard_config
+
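+ # Whether this federation sender instance is responsible for sending
+ # transactions to this destination (per the federation shard config).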
+ self._should_send_on_this_instance = True
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ # We don't raise an exception here to avoid taking out any other
+ # processing. We have a guard in `attempt_new_transaction` that
+ # ensures we don't start sending anything.
+ logger.error(
+ "Create a per destination queue for %s on wrong worker", destination,
+ )
+ self._should_send_on_this_instance = False
self._destination = destination
self.transmission_loop_running = False
- # a list of tuples of (pending pdu, order)
- self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ # a list of pending PDUs
+ self._pending_pdus = [] # type: List[EventBase]
# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
@@ -118,18 +132,17 @@ class PerDestinationQueue(object):
+ len(self._pending_edus_keyed)
)
- def send_pdu(self, pdu: EventBase, order: int) -> None:
- """Add a PDU to the queue, and start the transmission loop if neccessary
+ def send_pdu(self, pdu: EventBase) -> None:
+ """Add a PDU to the queue, and start the transmission loop if necessary
Args:
pdu: pdu to send
- order
"""
- self._pending_pdus.append((pdu, order))
+ self._pending_pdus.append(pdu)
self.attempt_new_transaction()
def send_presence(self, states: Iterable[UserPresenceState]) -> None:
- """Add presence updates to the queue. Start the transmission loop if neccessary.
+ """Add presence updates to the queue. Start the transmission loop if necessary.
Args:
states: presence to send
@@ -171,7 +184,7 @@ class PerDestinationQueue(object):
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
"""
- # list of (pending_pdu, deferred, order)
+
if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
@@ -180,6 +193,14 @@ class PerDestinationQueue(object):
logger.debug("TX [%s] Transaction already in progress", self._destination)
return
+ if not self._should_send_on_this_instance:
+ # We don't raise an exception here to avoid taking out any other
+ # processing.
+ logger.error(
+ "Trying to start a transaction to %s on wrong worker", self._destination
+ )
+ return
+
logger.debug("TX [%s] Starting transaction loop", self._destination)
run_as_background_process(
@@ -188,7 +209,7 @@ class PerDestinationQueue(object):
)
async def _transaction_transmission_loop(self) -> None:
- pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ pending_pdus = [] # type: List[EventBase]
try:
self.transmission_loop_running = True
@@ -315,6 +336,28 @@ class PerDestinationQueue(object):
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
+
+ if e.retry_interval > 60 * 60 * 1000:
+ # we won't retry for another hour!
+ # (this suggests a significant outage)
+ # We drop pending PDUs and EDUs because otherwise they will
+ # rack up indefinitely.
+ # Note that:
+ # - the EDUs being dropped here are ones we can afford to lose
+ # (specifically, only typing notifications, read receipts and
+ # presence updates are dropped)
+ # - other EDUs such as to_device messages are queued via a
+ # different mechanism and so are not dropped here
+ # - this is all volatile state that would be lost anyway if the
+ # federation sender restarted
+
+ # Dropping read receipts is a bit sad, but that should be solved
+ # by a separate mechanism, since this state is volatile anyway.
+ self._pending_pdus = []
+ self._pending_edus = []
+ self._pending_edus_keyed = {}
+ self._pending_presence = {}
+ self._pending_rrs = {}
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
@@ -329,13 +372,13 @@ class PerDestinationQueue(object):
"TX [%s] Failed to send transaction: %s", self._destination, e
)
- for p, _ in pending_pdus:
+ for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
- for p, _ in pending_pdus:
+ for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index a2752a54a5..c84072ab73 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -15,8 +15,6 @@
import logging
from typing import TYPE_CHECKING, List
-from canonicaljson import json
-
from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
@@ -28,6 +26,7 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
+from synapse.util import json_decoder
from synapse.util.metrics import measure_func
if TYPE_CHECKING:
@@ -36,7 +35,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-class TransactionManager(object):
+class TransactionManager:
"""Helper class which handles building and sending transactions
shared between PerDestinationQueue objects
@@ -54,33 +53,34 @@ class TransactionManager(object):
@measure_func("_send_new_transaction")
async def send_new_transaction(
- self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
- ):
+ self, destination: str, pdus: List[EventBase], edus: List[Edu],
+ ) -> bool:
+ """
+ Args:
+ destination: The destination to send to (e.g. 'example.org')
+ pdus: In-order list of PDUs to send
+ edus: List of EDUs to send
+
+ Returns:
+ True iff the transaction was successful
+ """
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
# no active span here, so if the edus were not received by the remote the
# span would have no causality and it would be forgotten.
- # The span_contexts is a generator so that it won't be evaluated if
- # opentracing is disabled. (Yay speed!)
span_contexts = []
keep_destination = whitelisted_homeserver(destination)
- for edu in pending_edus:
+ for edu in edus:
context = edu.get_context()
if context:
- span_contexts.append(extract_text_map(json.loads(context)))
+ span_contexts.append(extract_text_map(json_decoder.decode(context)))
if keep_destination:
edu.strip_context()
with start_active_span_follows_from("send_transaction", span_contexts):
-
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[1])
- pdus = [x[0] for x in pending_pdus]
- edus = pending_edus
-
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
|