summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py2
-rw-r--r--synapse/federation/federation_server.py7
-rw-r--r--synapse/federation/persistence.py37
-rw-r--r--synapse/federation/send_queue.py6
-rw-r--r--synapse/federation/sender/__init__.py15
-rw-r--r--synapse/federation/sender/per_destination_queue.py43
-rw-r--r--synapse/federation/sender/transaction_manager.py33
-rw-r--r--synapse/federation/transport/client.py2
-rw-r--r--synapse/federation/transport/server.py4
-rw-r--r--synapse/federation/units.py4
10 files changed, 85 insertions, 68 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 420df2385f..38aa47963f 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -39,7 +39,7 @@ from synapse.types import JsonDict, get_domain_from_id
 logger = logging.getLogger(__name__)
 
 
-class FederationBase(object):
+class FederationBase:
     def __init__(self, hs):
         self.hs = hs
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 11c5d63298..218df884b0 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -28,7 +28,6 @@ from typing import (
     Union,
 )
 
-from canonicaljson import json
 from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
@@ -63,7 +62,7 @@ from synapse.replication.http.federation import (
     ReplicationGetQueryRestServlet,
 )
 from synapse.types import JsonDict, get_domain_from_id
-from synapse.util import glob_to_regex, unwrapFirstError
+from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -551,7 +550,7 @@ class FederationServer(FederationBase):
             for device_id, keys in device_keys.items():
                 for key_id, json_str in keys.items():
                     json_result.setdefault(user_id, {})[device_id] = {
-                        key_id: json.loads(json_str)
+                        key_id: json_decoder.decode(json_str)
                     }
 
         logger.info(
@@ -786,7 +785,7 @@ def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
     return regex.match(server_name)
 
 
-class FederationHandlerRegistry(object):
+class FederationHandlerRegistry:
     """Allows classes to register themselves as handlers for a given EDU or
     query type for incoming federation traffic.
     """
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index d68b4bd670..079e2b2fe0 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -20,13 +20,16 @@ These actions are mostly only used by the :py:mod:`.replication` module.
 """
 
 import logging
+from typing import Optional, Tuple
 
+from synapse.federation.units import Transaction
 from synapse.logging.utils import log_function
+from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
 
 
-class TransactionActions(object):
+class TransactionActions:
     """ Defines persistence actions that relate to handling Transactions.
     """
 
@@ -34,30 +37,32 @@ class TransactionActions(object):
         self.store = datastore
 
     @log_function
-    def have_responded(self, origin, transaction):
-        """ Have we already responded to a transaction with the same id and
+    async def have_responded(
+        self, origin: str, transaction: Transaction
+    ) -> Optional[Tuple[int, JsonDict]]:
+        """Have we already responded to a transaction with the same id and
         origin?
 
         Returns:
-            Deferred: Results in `None` if we have not previously responded to
-            this transaction or a 2-tuple of `(int, dict)` representing the
-            response code and response body.
+            `None` if we have not previously responded to this transaction or a
+            2-tuple of `(int, dict)` representing the response code and response body.
         """
-        if not transaction.transaction_id:
+        transaction_id = transaction.transaction_id  # type: ignore
+        if not transaction_id:
             raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
-        return self.store.get_received_txn_response(transaction.transaction_id, origin)
+        return await self.store.get_received_txn_response(transaction_id, origin)
 
     @log_function
-    def set_response(self, origin, transaction, code, response):
-        """ Persist how we responded to a transaction.
-
-        Returns:
-            Deferred
+    async def set_response(
+        self, origin: str, transaction: Transaction, code: int, response: JsonDict
+    ) -> None:
+        """Persist how we responded to a transaction.
         """
-        if not transaction.transaction_id:
+        transaction_id = transaction.transaction_id  # type: ignore
+        if not transaction_id:
             raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
-        return self.store.set_received_txn_response(
-            transaction.transaction_id, origin, code, response
+        await self.store.set_received_txn_response(
+            transaction_id, origin, code, response
         )
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 2b0ab2dcbf..8e46957d15 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -37,8 +37,8 @@ from sortedcontainers import SortedDict
 
 from twisted.internet import defer
 
+from synapse.api.presence import UserPresenceState
 from synapse.metrics import LaterGauge
-from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 
 from .units import Edu
@@ -46,7 +46,7 @@ from .units import Edu
 logger = logging.getLogger(__name__)
 
 
-class FederationRemoteSendQueue(object):
+class FederationRemoteSendQueue:
     """A drop in replacement for FederationSender"""
 
     def __init__(self, hs):
@@ -365,7 +365,7 @@ class FederationRemoteSendQueue(object):
         )
 
 
-class BaseFederationRow(object):
+class BaseFederationRow:
     """Base class for rows to be sent in the federation stream.
 
     Specifies how to identify, serialize and deserialize the different types.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 94cc63001e..552519e82c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -22,6 +22,7 @@ from twisted.internet import defer
 
 import synapse
 import synapse.metrics
+from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
@@ -39,7 +40,6 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.metrics import Measure, measure_func
 
@@ -56,7 +56,7 @@ sent_pdus_destination_dist_total = Counter(
 )
 
 
-class FederationSender(object):
+class FederationSender:
     def __init__(self, hs: "synapse.server.HomeServer"):
         self.hs = hs
         self.server_name = hs.hostname
@@ -108,8 +108,6 @@ class FederationSender(object):
             ),
         )
 
-        self._order = 1
-
         self._is_processing = False
         self._last_poked_id = -1
 
@@ -272,9 +270,6 @@ class FederationSender(object):
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
 
-        order = self._order
-        self._order += 1
-
         destinations = set(destinations)
         destinations.discard(self.server_name)
         logger.debug("Sending to: %s", str(destinations))
@@ -286,7 +281,7 @@ class FederationSender(object):
         sent_pdus_destination_dist_count.inc()
 
         for destination in destinations:
-            self._get_per_destination_queue(destination).send_pdu(pdu, order)
+            self._get_per_destination_queue(destination).send_pdu(pdu)
 
     async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
@@ -329,10 +324,10 @@ class FederationSender(object):
         room_id = receipt.room_id
 
         # Work out which remote servers should be poked and poke them.
-        domains = await self.state.get_current_hosts_in_room(room_id)
+        domains_set = await self.state.get_current_hosts_in_room(room_id)
         domains = [
             d
-            for d in domains
+            for d in domains_set
             if d != self.server_name
             and self._federation_shard_config.should_handle(self._instance_name, d)
         ]
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index dd150f89a6..defc228c23 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -24,12 +24,12 @@ from synapse.api.errors import (
     HttpResponseException,
     RequestSendFailed,
 )
+from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.units import Edu
 from synapse.handlers.presence import format_user_presence_state
 from synapse.metrics import sent_transactions_counter
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
@@ -53,7 +53,7 @@ sent_edus_by_type = Counter(
 )
 
 
-class PerDestinationQueue(object):
+class PerDestinationQueue:
     """
     Manages the per-destination transmission queues.
 
@@ -92,8 +92,8 @@ class PerDestinationQueue(object):
         self._destination = destination
         self.transmission_loop_running = False
 
-        # a list of tuples of (pending pdu, order)
-        self._pending_pdus = []  # type: List[Tuple[EventBase, int]]
+        # a list of pending PDUs
+        self._pending_pdus = []  # type: List[EventBase]
 
         # XXX this is never actually used: see
         # https://github.com/matrix-org/synapse/issues/7549
@@ -132,14 +132,13 @@ class PerDestinationQueue(object):
             + len(self._pending_edus_keyed)
         )
 
-    def send_pdu(self, pdu: EventBase, order: int) -> None:
+    def send_pdu(self, pdu: EventBase) -> None:
         """Add a PDU to the queue, and start the transmission loop if necessary
 
         Args:
             pdu: pdu to send
-            order
         """
-        self._pending_pdus.append((pdu, order))
+        self._pending_pdus.append(pdu)
         self.attempt_new_transaction()
 
     def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -185,7 +184,7 @@ class PerDestinationQueue(object):
         returns immediately. Otherwise kicks off the process of sending a
         transaction in the background.
         """
-        # list of (pending_pdu, deferred, order)
+
         if self.transmission_loop_running:
             # XXX: this can get stuck on by a never-ending
             # request at which point pending_pdus just keeps growing.
@@ -210,7 +209,7 @@ class PerDestinationQueue(object):
         )
 
     async def _transaction_transmission_loop(self) -> None:
-        pending_pdus = []  # type: List[Tuple[EventBase, int]]
+        pending_pdus = []  # type: List[EventBase]
         try:
             self.transmission_loop_running = True
 
@@ -337,6 +336,28 @@ class PerDestinationQueue(object):
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+
+            if e.retry_interval > 60 * 60 * 1000:
+                # we won't retry for another hour!
+                # (this suggests a significant outage)
+                # We drop pending PDUs and EDUs because otherwise they will
+                # rack up indefinitely.
+                # Note that:
+                # - the EDUs that are being dropped here are those that we can
+                #   afford to drop (specifically, only typing notifications,
+                #   read receipts and presence updates are being dropped here)
+                # - Other EDUs such as to_device messages are queued with a
+                #   different mechanism
+                # - this is all volatile state that would be lost if the
+                #   federation sender restarted anyway
+
+                # dropping read receipts is a bit sad but should be solved
+                # through another mechanism, because this is all volatile!
+                self._pending_pdus = []
+                self._pending_edus = []
+                self._pending_edus_keyed = {}
+                self._pending_presence = {}
+                self._pending_rrs = {}
         except FederationDeniedError as e:
             logger.info(e)
         except HttpResponseException as e:
@@ -351,13 +372,13 @@ class PerDestinationQueue(object):
                 "TX [%s] Failed to send transaction: %s", self._destination, e
             )
 
-            for p, _ in pending_pdus:
+            for p in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
         except Exception:
             logger.exception("TX [%s] Failed to send transaction", self._destination)
-            for p, _ in pending_pdus:
+            for p in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index c7f6cb3d73..c84072ab73 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,9 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, List, Tuple
-
-from canonicaljson import json
+from typing import TYPE_CHECKING, List
 
 from synapse.api.errors import HttpResponseException
 from synapse.events import EventBase
@@ -28,6 +26,7 @@ from synapse.logging.opentracing import (
     tags,
     whitelisted_homeserver,
 )
+from synapse.util import json_decoder
 from synapse.util.metrics import measure_func
 
 if TYPE_CHECKING:
@@ -36,7 +35,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class TransactionManager(object):
+class TransactionManager:
     """Helper class which handles building and sending transactions
 
     shared between PerDestinationQueue objects
@@ -54,11 +53,17 @@ class TransactionManager(object):
 
     @measure_func("_send_new_transaction")
     async def send_new_transaction(
-        self,
-        destination: str,
-        pending_pdus: List[Tuple[EventBase, int]],
-        pending_edus: List[Edu],
-    ):
+        self, destination: str, pdus: List[EventBase], edus: List[Edu],
+    ) -> bool:
+        """
+        Args:
+            destination: The destination to send to (e.g. 'example.org')
+            pdus: In-order list of PDUs to send
+            edus: List of EDUs to send
+
+        Returns:
+            True iff the transaction was successful
+        """
 
         # Make a transaction-sending opentracing span. This span follows on from
         # all the edus in that transaction. This needs to be done since there is
@@ -68,20 +73,14 @@ class TransactionManager(object):
         span_contexts = []
         keep_destination = whitelisted_homeserver(destination)
 
-        for edu in pending_edus:
+        for edu in edus:
             context = edu.get_context()
             if context:
-                span_contexts.append(extract_text_map(json.loads(context)))
+                span_contexts.append(extract_text_map(json_decoder.decode(context)))
             if keep_destination:
                 edu.strip_context()
 
         with start_active_span_follows_from("send_transaction", span_contexts):
-
-            # Sort based on the order field
-            pending_pdus.sort(key=lambda t: t[1])
-            pdus = [x[0] for x in pending_pdus]
-            edus = pending_edus
-
             success = True
 
             logger.debug("TX [%s] _attempt_new_transaction", destination)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 9ea821dbb2..17a10f622e 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -30,7 +30,7 @@ from synapse.logging.utils import log_function
 logger = logging.getLogger(__name__)
 
 
-class TransportLayerClient(object):
+class TransportLayerClient:
     """Sends federation HTTP requests to other servers"""
 
     def __init__(self, hs):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 5e111aa902..9325e0f857 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -100,7 +100,7 @@ class NoAuthenticationError(AuthenticationError):
     pass
 
 
-class Authenticator(object):
+class Authenticator:
     def __init__(self, hs: HomeServer):
         self._clock = hs.get_clock()
         self.keyring = hs.get_keyring()
@@ -228,7 +228,7 @@ def _parse_auth_header(header_bytes):
         )
 
 
-class BaseFederationServlet(object):
+class BaseFederationServlet:
     """Abstract base class for federation servlet classes.
 
     The servlet object should have a PATH attribute which takes the form of a regexp to
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 6b32e0dcbf..64d98fc8f6 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -107,9 +107,7 @@ class Transaction(JsonEncodedObject):
         if "edus" in kwargs and not kwargs["edus"]:
             del kwargs["edus"]
 
-        super(Transaction, self).__init__(
-            transaction_id=transaction_id, pdus=pdus, **kwargs
-        )
+        super().__init__(transaction_id=transaction_id, pdus=pdus, **kwargs)
 
     @staticmethod
     def create_new(pdus, **kwargs):