Diffstat (limited to 'synapse/federation')
-rw-r--r--  synapse/federation/federation_base.py                2
-rw-r--r--  synapse/federation/federation_server.py              7
-rw-r--r--  synapse/federation/persistence.py                    37
-rw-r--r--  synapse/federation/send_queue.py                      6
-rw-r--r--  synapse/federation/sender/__init__.py                26
-rw-r--r--  synapse/federation/sender/per_destination_queue.py  177
-rw-r--r--  synapse/federation/sender/transaction_manager.py     33
-rw-r--r--  synapse/federation/transport/client.py                2
-rw-r--r--  synapse/federation/transport/server.py                4
-rw-r--r--  synapse/federation/units.py                           4
10 files changed, 227 insertions, 71 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 420df2385f..38aa47963f 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -39,7 +39,7 @@ from synapse.types import JsonDict, get_domain_from_id
 logger = logging.getLogger(__name__)
 
 
-class FederationBase(object):
+class FederationBase:
     def __init__(self, hs):
         self.hs = hs
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 11c5d63298..218df884b0 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -28,7 +28,6 @@ from typing import (
     Union,
 )
 
-from canonicaljson import json
 from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
@@ -63,7 +62,7 @@ from synapse.replication.http.federation import (
     ReplicationGetQueryRestServlet,
 )
 from synapse.types import JsonDict, get_domain_from_id
-from synapse.util import glob_to_regex, unwrapFirstError
+from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -551,7 +550,7 @@ class FederationServer(FederationBase):
             for device_id, keys in device_keys.items():
                 for key_id, json_str in keys.items():
                     json_result.setdefault(user_id, {})[device_id] = {
-                        key_id: json.loads(json_str)
+                        key_id: json_decoder.decode(json_str)
                     }
 
         logger.info(
@@ -786,7 +785,7 @@ def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
     return regex.match(server_name)
 
 
-class FederationHandlerRegistry(object):
+class FederationHandlerRegistry:
     """Allows classes to register themselves as handlers for a given EDU or
     query type for incoming federation traffic.
     """
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index d68b4bd670..079e2b2fe0 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -20,13 +20,16 @@ These actions are mostly only used by the :py:mod:`.replication` module.
 """
 
 import logging
+from typing import Optional, Tuple
 
+from synapse.federation.units import Transaction
 from synapse.logging.utils import log_function
+from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
 
 
-class TransactionActions(object):
+class TransactionActions:
     """ Defines persistence actions that relate to handling Transactions.
     """
 
@@ -34,30 +37,32 @@ class TransactionActions(object):
         self.store = datastore
 
     @log_function
-    def have_responded(self, origin, transaction):
-        """ Have we already responded to a transaction with the same id and
+    async def have_responded(
+        self, origin: str, transaction: Transaction
+    ) -> Optional[Tuple[int, JsonDict]]:
+        """Have we already responded to a transaction with the same id and
         origin?
 
         Returns:
-            Deferred: Results in `None` if we have not previously responded to
-            this transaction or a 2-tuple of `(int, dict)` representing the
-            response code and response body.
+            `None` if we have not previously responded to this transaction or a
+            2-tuple of `(int, dict)` representing the response code and response body.
         """
-        if not transaction.transaction_id:
+        transaction_id = transaction.transaction_id  # type: ignore
+        if not transaction_id:
             raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
-        return self.store.get_received_txn_response(transaction.transaction_id, origin)
+        return await self.store.get_received_txn_response(transaction_id, origin)
 
     @log_function
-    def set_response(self, origin, transaction, code, response):
-        """ Persist how we responded to a transaction.
-
-        Returns:
-            Deferred
+    async def set_response(
+        self, origin: str, transaction: Transaction, code: int, response: JsonDict
+    ) -> None:
+        """Persist how we responded to a transaction.
         """
-        if not transaction.transaction_id:
+        transaction_id = transaction.transaction_id  # type: ignore
+        if not transaction_id:
             raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
-        return self.store.set_received_txn_response(
-            transaction.transaction_id, origin, code, response
+        await self.store.set_received_txn_response(
+            transaction_id, origin, code, response
         )
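
Since both methods are now coroutines that return values directly rather than Deferreds, callers simply await them. A hedged sketch of that usage (the wrapper function below is illustrative, not the real federation server code):

    # Illustrative caller: deduplicate an incoming transaction using the
    # now-async TransactionActions API, falling back to real processing
    # (elided) on a cache miss.
    async def handle_transaction(origin, transaction, transaction_actions):
        existing = await transaction_actions.have_responded(origin, transaction)
        if existing is not None:
            # we already handled this transaction_id from this origin:
            # replay the stored (code, body) response
            return existing

        code, body = 200, {}  # placeholder for actual PDU/EDU processing
        await transaction_actions.set_response(origin, transaction, code, body)
        return code, body
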
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 2b0ab2dcbf..8e46957d15 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -37,8 +37,8 @@ from sortedcontainers import SortedDict
 
 from twisted.internet import defer
 
+from synapse.api.presence import UserPresenceState
 from synapse.metrics import LaterGauge
-from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 
 from .units import Edu
@@ -46,7 +46,7 @@ from .units import Edu
 logger = logging.getLogger(__name__)
 
 
-class FederationRemoteSendQueue(object):
+class FederationRemoteSendQueue:
     """A drop in replacement for FederationSender"""
 
     def __init__(self, hs):
@@ -365,7 +365,7 @@ class FederationRemoteSendQueue(object):
         )
 
 
-class BaseFederationRow(object):
+class BaseFederationRow:
     """Base class for rows to be sent in the federation stream.
 
     Specifies how to identify, serialize and deserialize the different types.
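
As a rough illustration of what that docstring describes, a hypothetical row type might look like the following. The `TypeId` attribute and `to_data`/`from_data` methods are assumptions about the base class's contract, not taken from this diff.

    # Hypothetical federation stream row (illustration only).
    import attr


    @attr.s(slots=True, frozen=True)
    class ExampleRow:  # would derive from BaseFederationRow in real code
        TypeId = "example"  # identifies the row type on the wire

        payload = attr.ib(type=dict)

        @staticmethod
        def from_data(data):
            # deserialize from the plain dict received over replication
            return ExampleRow(payload=data)

        def to_data(self):
            # serialize back to a plain dict
            return self.payload
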
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 94cc63001e..41a726878d 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -22,6 +22,7 @@ from twisted.internet import defer
 
 import synapse
 import synapse.metrics
+from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
@@ -39,7 +40,6 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.metrics import Measure, measure_func
 
@@ -56,7 +56,7 @@ sent_pdus_destination_dist_total = Counter(
 )
 
 
-class FederationSender(object):
+class FederationSender:
     def __init__(self, hs: "synapse.server.HomeServer"):
         self.hs = hs
         self.server_name = hs.hostname
@@ -108,8 +108,6 @@ class FederationSender(object):
             ),
         )
 
-        self._order = 1
-
         self._is_processing = False
         self._last_poked_id = -1
 
@@ -211,7 +209,7 @@ class FederationSender(object):
                     logger.debug("Sending %s to %r", event, destinations)
 
                     if destinations:
-                        self._send_pdu(event, destinations)
+                        await self._send_pdu(event, destinations)
 
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
@@ -267,14 +265,11 @@ class FederationSender(object):
         finally:
             self._is_processing = False
 
-    def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
 
-        order = self._order
-        self._order += 1
-
         destinations = set(destinations)
         destinations.discard(self.server_name)
         logger.debug("Sending to: %s", str(destinations))
@@ -285,8 +280,15 @@ class FederationSender(object):
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()
 
+        # track the fact that we have a PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations, pdu.room_id, pdu.internal_metadata.stream_ordering,
+        )
+
         for destination in destinations:
-            self._get_per_destination_queue(destination).send_pdu(pdu, order)
+            self._get_per_destination_queue(destination).send_pdu(pdu)
 
     async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
@@ -329,10 +331,10 @@ class FederationSender(object):
         room_id = receipt.room_id
 
         # Work out which remote servers should be poked and poke them.
-        domains = await self.state.get_current_hosts_in_room(room_id)
+        domains_set = await self.state.get_current_hosts_in_room(room_id)
         domains = [
             d
-            for d in domains
+            for d in domains_set
             if d != self.server_name
             and self._federation_shard_config.should_handle(self._instance_name, d)
         ]
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index dd150f89a6..2657767fd1 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
 
 from prometheus_client import Counter
 
@@ -24,12 +24,12 @@ from synapse.api.errors import (
     HttpResponseException,
     RequestSendFailed,
 )
+from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.units import Edu
 from synapse.handlers.presence import format_user_presence_state
 from synapse.metrics import sent_transactions_counter
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
 from synapse.types import ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
@@ -53,7 +53,7 @@ sent_edus_by_type = Counter(
 )
 
 
-class PerDestinationQueue(object):
+class PerDestinationQueue:
     """
     Manages the per-destination transmission queues.
 
@@ -92,8 +92,23 @@ class PerDestinationQueue(object):
         self._destination = destination
         self.transmission_loop_running = False
 
-        # a list of tuples of (pending pdu, order)
-        self._pending_pdus = []  # type: List[Tuple[EventBase, int]]
+        # True whilst we are sending events that the remote homeserver missed
+        # because it was unreachable. We start in this state so we can perform
+        # catch-up at startup.
+        # New events will only be sent once this is finished, at which point
+        # _catching_up is flipped to False.
+        self._catching_up = True  # type: bool
+
+        # The stream_ordering of the most recent PDU that was discarded due to
+        # being in catch-up mode.
+        self._catchup_last_skipped = 0  # type: int
+
+        # Cache of the last successfully-transmitted stream ordering for this
+        # destination (we are the only updater so this is safe)
+        self._last_successful_stream_ordering = None  # type: Optional[int]
+
+        # a list of pending PDUs
+        self._pending_pdus = []  # type: List[EventBase]
 
         # XXX this is never actually used: see
         # https://github.com/matrix-org/synapse/issues/7549
@@ -132,14 +147,19 @@ class PerDestinationQueue(object):
             + len(self._pending_edus_keyed)
         )
 
-    def send_pdu(self, pdu: EventBase, order: int) -> None:
+    def send_pdu(self, pdu: EventBase) -> None:
         """Add a PDU to the queue, and start the transmission loop if necessary
 
         Args:
             pdu: pdu to send
-            order
         """
-        self._pending_pdus.append((pdu, order))
+        if not self._catching_up or self._last_successful_stream_ordering is None:
+            # only enqueue the PDU if we are not catching up (False) or do not
+            # yet know if we have anything to catch up (None)
+            self._pending_pdus.append(pdu)
+        else:
+            self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
+
         self.attempt_new_transaction()
 
     def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -185,7 +205,7 @@ class PerDestinationQueue(object):
         returns immediately. Otherwise kicks off the process of sending a
         transaction in the background.
         """
-        # list of (pending_pdu, deferred, order)
+
         if self.transmission_loop_running:
             # XXX: this can get stuck on by a never-ending
             # request at which point pending_pdus just keeps growing.
@@ -210,7 +230,7 @@ class PerDestinationQueue(object):
         )
 
     async def _transaction_transmission_loop(self) -> None:
-        pending_pdus = []  # type: List[Tuple[EventBase, int]]
+        pending_pdus = []  # type: List[EventBase]
         try:
             self.transmission_loop_running = True
 
@@ -219,6 +239,13 @@ class PerDestinationQueue(object):
             # hence why we throw the result away.
             await get_retry_limiter(self._destination, self._clock, self._store)
 
+            if self._catching_up:
+                # we potentially need to catch-up first
+                await self._catch_up_transmission_loop()
+                if self._catching_up:
+                    # not caught up yet
+                    return
+
             pending_pdus = []
             while True:
                 # We have to keep 2 free slots for presence and rr_edus
@@ -326,6 +353,17 @@ class PerDestinationQueue(object):
 
                     self._last_device_stream_id = device_stream_id
                     self._last_device_list_stream_id = dev_list_id
+
+                    if pending_pdus:
+                        # we sent some PDUs and it was successful, so update our
+                        # last_successful_stream_ordering in the destinations table.
+                        final_pdu = pending_pdus[-1]
+                        last_successful_stream_ordering = (
+                            final_pdu.internal_metadata.stream_ordering
+                        )
+                        await self._store.set_destination_last_successful_stream_ordering(
+                            self._destination, last_successful_stream_ordering
+                        )
                 else:
                     break
         except NotRetryingDestination as e:
@@ -337,6 +375,30 @@ class PerDestinationQueue(object):
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+
+            if e.retry_interval > 60 * 60 * 1000:
+                # we won't retry for another hour!
+                # (this suggests a significant outage)
+                # We drop pending EDUs because otherwise they will
+                # rack up indefinitely.
+                # (Dropping PDUs is already performed by `_start_catching_up`.)
+                # Note that:
+                # - the EDUs that are being dropped here are those that we can
+                #   afford to drop (specifically, only typing notifications,
+                #   read receipts and presence updates are being dropped here)
+                # - Other EDUs such as to_device messages are queued with a
+                #   different mechanism
+                # - this is all volatile state that would be lost if the
+                #   federation sender restarted anyway
+
+                # dropping read receipts is a bit sad but should be solved
+                # through another mechanism, because this is all volatile!
+                self._pending_edus = []
+                self._pending_edus_keyed = {}
+                self._pending_presence = {}
+                self._pending_rrs = {}
+
+            self._start_catching_up()
         except FederationDeniedError as e:
             logger.info(e)
         except HttpResponseException as e:
@@ -346,25 +408,107 @@ class PerDestinationQueue(object):
                 e.code,
                 e,
             )
+
+            self._start_catching_up()
         except RequestSendFailed as e:
             logger.warning(
                 "TX [%s] Failed to send transaction: %s", self._destination, e
             )
 
-            for p, _ in pending_pdus:
+            for p in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._start_catching_up()
         except Exception:
             logger.exception("TX [%s] Failed to send transaction", self._destination)
-            for p, _ in pending_pdus:
+            for p in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._start_catching_up()
         finally:
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False
 
+    async def _catch_up_transmission_loop(self) -> None:
+        first_catch_up_check = self._last_successful_stream_ordering is None
+
+        if first_catch_up_check:
+            # first catchup so get last_successful_stream_ordering from database
+            self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering(
+                self._destination
+            )
+
+        if self._last_successful_stream_ordering is None:
+            # if it's still None, then this means we don't have the information
+            # in our database - we haven't successfully sent a PDU to this server
+            # (at least since the introduction of the feature tracking
+            # last_successful_stream_ordering).
+            # Sadly, this means we can't do anything here as we don't know what
+            # needs catching up — so catching up is futile; let's stop.
+            self._catching_up = False
+            return
+
+        # get at most 50 catchup room/PDUs
+        while True:
+            event_ids = await self._store.get_catch_up_room_event_ids(
+                self._destination, self._last_successful_stream_ordering,
+            )
+
+            if not event_ids:
+                # No more events to catch up on, but we can't ignore the chance
+                # of a race condition, so we check that no new events have been
+                # skipped due to us being in catch-up mode
+
+                if self._catchup_last_skipped > self._last_successful_stream_ordering:
+                    # another event has been skipped because we were in catch-up mode
+                    continue
+
+                # we are done catching up!
+                self._catching_up = False
+                break
+
+            if first_catch_up_check:
+                # as this is our check for needing catch-up, we may have PDUs in
+                # the queue from before we *knew* we had to do catch-up, so
+                # clear those out now.
+                self._start_catching_up()
+
+            # fetch the relevant events from the event store
+            # - redacted behaviour of REDACT is fine, since we only send metadata
+            #   of redacted events to the destination.
+            # - don't need to worry about rejected events as we do not actively
+            #   forward received events over federation.
+            catchup_pdus = await self._store.get_events_as_list(event_ids)
+            if not catchup_pdus:
+                raise AssertionError(
+                    "No events retrieved when we asked for %r. "
+                    "This should not happen." % event_ids
+                )
+
+            if logger.isEnabledFor(logging.INFO):
+                rooms = (p.room_id for p in catchup_pdus)
+                logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+
+            success = await self._transaction_manager.send_new_transaction(
+                self._destination, catchup_pdus, []
+            )
+
+            if not success:
+                return
+
+            sent_transactions_counter.inc()
+            final_pdu = catchup_pdus[-1]
+            self._last_successful_stream_ordering = cast(
+                int, final_pdu.internal_metadata.stream_ordering
+            )
+            await self._store.set_destination_last_successful_stream_ordering(
+                self._destination, self._last_successful_stream_ordering
+            )
+
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
             return
@@ -425,3 +569,12 @@ class PerDestinationQueue(object):
         ]
 
         return (edus, stream_id)
+
+    def _start_catching_up(self) -> None:
+        """
+        Marks this destination as being in catch-up mode.
+
+        This throws away the PDU queue.
+        """
+        self._catching_up = True
+        self._pending_pdus = []
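
Putting the pieces together, the new catch-up behaviour can be summarised as below. This is a condensed illustration of `_catch_up_transmission_loop` with error handling and logging omitted; the store and transaction-manager method names are the ones used in the diff above.

    # Condensed, illustrative version of the catch-up loop (not the real code).
    async def catch_up_sketch(queue):
        last = queue._last_successful_stream_ordering
        if last is None:
            last = await queue._store.get_destination_last_successful_stream_ordering(
                queue._destination
            )
        if last is None:
            # we have never recorded a successful send, so we don't know what
            # to replay: give up on catch-up
            queue._catching_up = False
            return

        while True:
            event_ids = await queue._store.get_catch_up_room_event_ids(
                queue._destination, last
            )
            if not event_ids:
                if queue._catchup_last_skipped > last:
                    # a PDU was skipped while we were looping: check again
                    continue
                queue._catching_up = False  # fully caught up
                return

            pdus = await queue._store.get_events_as_list(event_ids)
            if not await queue._transaction_manager.send_new_transaction(
                queue._destination, pdus, []
            ):
                return  # send failed: stay in catch-up mode and retry later

            last = pdus[-1].internal_metadata.stream_ordering
            queue._last_successful_stream_ordering = last
            await queue._store.set_destination_last_successful_stream_ordering(
                queue._destination, last
            )
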
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index c7f6cb3d73..c84072ab73 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,9 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, List, Tuple
-
-from canonicaljson import json
+from typing import TYPE_CHECKING, List
 
 from synapse.api.errors import HttpResponseException
 from synapse.events import EventBase
@@ -28,6 +26,7 @@ from synapse.logging.opentracing import (
     tags,
     whitelisted_homeserver,
 )
+from synapse.util import json_decoder
 from synapse.util.metrics import measure_func
 
 if TYPE_CHECKING:
@@ -36,7 +35,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class TransactionManager(object):
+class TransactionManager:
     """Helper class which handles building and sending transactions
 
     shared between PerDestinationQueue objects
@@ -54,11 +53,17 @@ class TransactionManager(object):
 
     @measure_func("_send_new_transaction")
     async def send_new_transaction(
-        self,
-        destination: str,
-        pending_pdus: List[Tuple[EventBase, int]],
-        pending_edus: List[Edu],
-    ):
+        self, destination: str, pdus: List[EventBase], edus: List[Edu],
+    ) -> bool:
+        """
+        Args:
+            destination: The destination to send to (e.g. 'example.org')
+            pdus: In-order list of PDUs to send
+            edus: List of EDUs to send
+
+        Returns:
+            True iff the transaction was successful
+        """
 
         # Make a transaction-sending opentracing span. This span follows on from
         # all the edus in that transaction. This needs to be done since there is
@@ -68,20 +73,14 @@ class TransactionManager(object):
         span_contexts = []
         keep_destination = whitelisted_homeserver(destination)
 
-        for edu in pending_edus:
+        for edu in edus:
             context = edu.get_context()
             if context:
-                span_contexts.append(extract_text_map(json.loads(context)))
+                span_contexts.append(extract_text_map(json_decoder.decode(context)))
             if keep_destination:
                 edu.strip_context()
 
         with start_active_span_follows_from("send_transaction", span_contexts):
-
-            # Sort based on the order field
-            pending_pdus.sort(key=lambda t: t[1])
-            pdus = [x[0] for x in pending_pdus]
-            edus = pending_edus
-
             success = True
 
             logger.debug("TX [%s] _attempt_new_transaction", destination)
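
With the order bookkeeping gone, callers pass an already-ordered list of PDUs and check the boolean result. An illustrative call site follows; the wrapper function and its arguments are stand-ins for the per-destination queue's real state, not actual Synapse code.

    # Illustrative call site for the new send_new_transaction signature.
    async def send_once(transaction_manager, destination, pending_pdus, pending_edus):
        success = await transaction_manager.send_new_transaction(
            destination, pending_pdus, pending_edus
        )
        if not success:
            # the transaction failed or was rejected: stop here and let the
            # catch-up machinery replay what the remote missed later on
            return False
        return True
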
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 9ea821dbb2..17a10f622e 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -30,7 +30,7 @@ from synapse.logging.utils import log_function
 logger = logging.getLogger(__name__)
 
 
-class TransportLayerClient(object):
+class TransportLayerClient:
     """Sends federation HTTP requests to other servers"""
 
     def __init__(self, hs):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 5e111aa902..9325e0f857 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -100,7 +100,7 @@ class NoAuthenticationError(AuthenticationError):
     pass
 
 
-class Authenticator(object):
+class Authenticator:
     def __init__(self, hs: HomeServer):
         self._clock = hs.get_clock()
         self.keyring = hs.get_keyring()
@@ -228,7 +228,7 @@ def _parse_auth_header(header_bytes):
         )
 
 
-class BaseFederationServlet(object):
+class BaseFederationServlet:
     """Abstract base class for federation servlet classes.
 
     The servlet object should have a PATH attribute which takes the form of a regexp to
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 6b32e0dcbf..64d98fc8f6 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -107,9 +107,7 @@ class Transaction(JsonEncodedObject):
         if "edus" in kwargs and not kwargs["edus"]:
             del kwargs["edus"]
 
-        super(Transaction, self).__init__(
-            transaction_id=transaction_id, pdus=pdus, **kwargs
-        )
+        super().__init__(transaction_id=transaction_id, pdus=pdus, **kwargs)
 
     @staticmethod
     def create_new(pdus, **kwargs):