summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py79
-rw-r--r--synapse/federation/sender/per_destination_queue.py50
-rw-r--r--synapse/federation/sender/transaction_manager.py14
3 files changed, 117 insertions, 26 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d473576902..4662008bfd 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -16,14 +16,13 @@
 import logging
 from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
-from six import itervalues
-
 from prometheus_client import Counter
 
 from twisted.internet import defer
 
 import synapse
 import synapse.metrics
+from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
@@ -41,7 +40,6 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.metrics import Measure, measure_func
 
@@ -71,6 +69,9 @@ class FederationSender(object):
 
         self._transaction_manager = TransactionManager(hs)
 
+        self._instance_name = hs.get_instance_name()
+        self._federation_shard_config = hs.config.worker.federation_shard_config
+
         # map from destination to PerDestinationQueue
         self._per_destination_queues = {}  # type: Dict[str, PerDestinationQueue]
 
@@ -193,7 +194,13 @@ class FederationSender(object):
                         )
                         return
 
-                    destinations = set(destinations)
+                    destinations = {
+                        d
+                        for d in destinations
+                        if self._federation_shard_config.should_handle(
+                            self._instance_name, d
+                        )
+                    }
 
                     if send_on_behalf_of is not None:
                         # If we are sending the event on behalf of another server
@@ -203,7 +210,15 @@ class FederationSender(object):
 
                     logger.debug("Sending %s to %r", event, destinations)
 
-                    self._send_pdu(event, destinations)
+                    if destinations:
+                        self._send_pdu(event, destinations)
+
+                        now = self.clock.time_msec()
+                        ts = await self.store.get_received_ts(event.event_id)
+
+                        synapse.metrics.event_processing_lag_by_event.labels(
+                            "federation_sender"
+                        ).observe((now - ts) / 1000)
 
                 async def handle_room_events(events: Iterable[EventBase]) -> None:
                     with Measure(self.clock, "handle_room_events"):
@@ -218,7 +233,7 @@ class FederationSender(object):
                     defer.gatherResults(
                         [
                             run_in_background(handle_room_events, evs)
-                            for evs in itervalues(events_by_room)
+                            for evs in events_by_room.values()
                         ],
                         consumeErrors=True,
                     )
@@ -273,8 +288,7 @@ class FederationSender(object):
         for destination in destinations:
             self._get_per_destination_queue(destination).send_pdu(pdu, order)
 
-    @defer.inlineCallbacks
-    def send_read_receipt(self, receipt: ReadReceipt):
+    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
 
         Args:
@@ -315,8 +329,13 @@ class FederationSender(object):
         room_id = receipt.room_id
 
         # Work out which remote servers should be poked and poke them.
-        domains = yield self.state.get_current_hosts_in_room(room_id)
-        domains = [d for d in domains if d != self.server_name]
+        domains_set = await self.state.get_current_hosts_in_room(room_id)
+        domains = [
+            d
+            for d in domains_set
+            if d != self.server_name
+            and self._federation_shard_config.should_handle(self._instance_name, d)
+        ]
         if not domains:
             return
 
@@ -365,8 +384,7 @@ class FederationSender(object):
             queue.flush_read_receipts_for_room(room_id)
 
     @preserve_fn  # the caller should not yield on this
-    @defer.inlineCallbacks
-    def send_presence(self, states: List[UserPresenceState]):
+    async def send_presence(self, states: List[UserPresenceState]):
         """Send the new presence states to the appropriate destinations.
 
         This actually queues up the presence states ready for sending and
@@ -401,7 +419,7 @@ class FederationSender(object):
                 if not states_map:
                     break
 
-                yield self._process_presence_inner(list(states_map.values()))
+                await self._process_presence_inner(list(states_map.values()))
         except Exception:
             logger.exception("Error sending presence states to servers")
         finally:
@@ -421,20 +439,29 @@ class FederationSender(object):
         for destination in destinations:
             if destination == self.server_name:
                 continue
+            if not self._federation_shard_config.should_handle(
+                self._instance_name, destination
+            ):
+                continue
             self._get_per_destination_queue(destination).send_presence(states)
 
     @measure_func("txnqueue._process_presence")
-    @defer.inlineCallbacks
-    def _process_presence_inner(self, states: List[UserPresenceState]):
+    async def _process_presence_inner(self, states: List[UserPresenceState]):
         """Given a list of states populate self.pending_presence_by_dest and
         poke to send a new transaction to each destination
         """
-        hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
+        hosts_and_states = await get_interested_remotes(self.store, states, self.state)
 
         for destinations, states in hosts_and_states:
             for destination in destinations:
                 if destination == self.server_name:
                     continue
+
+                if not self._federation_shard_config.should_handle(
+                    self._instance_name, destination
+                ):
+                    continue
+
                 self._get_per_destination_queue(destination).send_presence(states)
 
     def build_and_send_edu(
@@ -456,6 +483,11 @@ class FederationSender(object):
             logger.info("Not sending EDU to ourselves")
             return
 
+        if not self._federation_shard_config.should_handle(
+            self._instance_name, destination
+        ):
+            return
+
         edu = Edu(
             origin=self.server_name,
             destination=destination,
@@ -472,6 +504,11 @@ class FederationSender(object):
             edu: edu to send
             key: clobbering key for this edu
         """
+        if not self._federation_shard_config.should_handle(
+            self._instance_name, edu.destination
+        ):
+            return
+
         queue = self._get_per_destination_queue(edu.destination)
         if key:
             queue.send_keyed_edu(edu, key)
@@ -483,6 +520,11 @@ class FederationSender(object):
             logger.warning("Not sending device update to ourselves")
             return
 
+        if not self._federation_shard_config.should_handle(
+            self._instance_name, destination
+        ):
+            return
+
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
     def wake_destination(self, destination: str):
@@ -496,6 +538,11 @@ class FederationSender(object):
             logger.warning("Not waking up ourselves")
             return
 
+        if not self._federation_shard_config.should_handle(
+            self._instance_name, destination
+        ):
+            return
+
         self._get_per_destination_queue(destination).attempt_new_transaction()
 
     @staticmethod
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 4e698981a4..c09ffcaf4c 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -24,12 +24,12 @@ from synapse.api.errors import (
     HttpResponseException,
     RequestSendFailed,
 )
+from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.units import Edu
 from synapse.handlers.presence import format_user_presence_state
 from synapse.metrics import sent_transactions_counter
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
@@ -74,6 +74,20 @@ class PerDestinationQueue(object):
         self._clock = hs.get_clock()
         self._store = hs.get_datastore()
         self._transaction_manager = transaction_manager
+        self._instance_name = hs.get_instance_name()
+        self._federation_shard_config = hs.config.worker.federation_shard_config
+
+        self._should_send_on_this_instance = True
+        if not self._federation_shard_config.should_handle(
+            self._instance_name, destination
+        ):
+            # We don't raise an exception here to avoid taking out any other
+            # processing. We have a guard in `attempt_new_transaction` that
+            # ensure we don't start sending stuff.
+            logger.error(
+                "Create a per destination queue for %s on wrong worker", destination,
+            )
+            self._should_send_on_this_instance = False
 
         self._destination = destination
         self.transmission_loop_running = False
@@ -119,7 +133,7 @@ class PerDestinationQueue(object):
         )
 
     def send_pdu(self, pdu: EventBase, order: int) -> None:
-        """Add a PDU to the queue, and start the transmission loop if neccessary
+        """Add a PDU to the queue, and start the transmission loop if necessary
 
         Args:
             pdu: pdu to send
@@ -129,7 +143,7 @@ class PerDestinationQueue(object):
         self.attempt_new_transaction()
 
     def send_presence(self, states: Iterable[UserPresenceState]) -> None:
-        """Add presence updates to the queue. Start the transmission loop if neccessary.
+        """Add presence updates to the queue. Start the transmission loop if necessary.
 
         Args:
             states: presence to send
@@ -180,6 +194,14 @@ class PerDestinationQueue(object):
             logger.debug("TX [%s] Transaction already in progress", self._destination)
             return
 
+        if not self._should_send_on_this_instance:
+            # We don't raise an exception here to avoid taking out any other
+            # processing.
+            logger.error(
+                "Trying to start a transaction to %s on wrong worker", self._destination
+            )
+            return
+
         logger.debug("TX [%s] Starting transaction loop", self._destination)
 
         run_as_background_process(
@@ -315,6 +337,28 @@ class PerDestinationQueue(object):
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+
+            if e.retry_interval > 60 * 60 * 1000:
+                # we won't retry for another hour!
+                # (this suggests a significant outage)
+                # We drop pending PDUs and EDUs because otherwise they will
+                # rack up indefinitely.
+                # Note that:
+                # - the EDUs that are being dropped here are those that we can
+                #   afford to drop (specifically, only typing notifications,
+                #   read receipts and presence updates are being dropped here)
+                # - Other EDUs such as to_device messages are queued with a
+                #   different mechanism
+                # - this is all volatile state that would be lost if the
+                #   federation sender restarted anyway
+
+                # dropping read receipts is a bit sad but should be solved
+                # through another mechanism, because this is all volatile!
+                self._pending_pdus = []
+                self._pending_edus = []
+                self._pending_edus_keyed = {}
+                self._pending_presence = {}
+                self._pending_rrs = {}
         except FederationDeniedError as e:
             logger.info(e)
         except HttpResponseException as e:
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index a2752a54a5..9bd534a313 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,9 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, List
-
-from canonicaljson import json
+from typing import TYPE_CHECKING, List, Tuple
 
 from synapse.api.errors import HttpResponseException
 from synapse.events import EventBase
@@ -28,6 +26,7 @@ from synapse.logging.opentracing import (
     tags,
     whitelisted_homeserver,
 )
+from synapse.util import json_decoder
 from synapse.util.metrics import measure_func
 
 if TYPE_CHECKING:
@@ -54,15 +53,16 @@ class TransactionManager(object):
 
     @measure_func("_send_new_transaction")
     async def send_new_transaction(
-        self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
+        self,
+        destination: str,
+        pending_pdus: List[Tuple[EventBase, int]],
+        pending_edus: List[Edu],
     ):
 
         # Make a transaction-sending opentracing span. This span follows on from
         # all the edus in that transaction. This needs to be done since there is
         # no active span here, so if the edus were not received by the remote the
         # span would have no causality and it would be forgotten.
-        # The span_contexts is a generator so that it won't be evaluated if
-        # opentracing is disabled. (Yay speed!)
 
         span_contexts = []
         keep_destination = whitelisted_homeserver(destination)
@@ -70,7 +70,7 @@ class TransactionManager(object):
         for edu in pending_edus:
             context = edu.get_context()
             if context:
-                span_contexts.append(extract_text_map(json.loads(context)))
+                span_contexts.append(extract_text_map(json_decoder.decode(context)))
             if keep_destination:
                 edu.strip_context()