summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/send_queue.py12
-rw-r--r--synapse/federation/sender/__init__.py86
-rw-r--r--synapse/federation/sender/per_destination_queue.py6
-rw-r--r--synapse/handlers/device.py26
-rw-r--r--synapse/handlers/devicemessage.py7
-rw-r--r--synapse/handlers/presence.py16
-rw-r--r--synapse/handlers/typing.py14
-rw-r--r--synapse/module_api/__init__.py2
-rw-r--r--synapse/replication/tcp/client.py8
-rw-r--r--synapse/storage/databases/main/transactions.py26
-rw-r--r--synapse/util/retryutils.py25
11 files changed, 161 insertions, 67 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index fb448f2155..6520795635 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -49,7 +49,7 @@ from synapse.api.presence import UserPresenceState
 from synapse.federation.sender import AbstractFederationSender, FederationSender
 from synapse.metrics import LaterGauge
 from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
 from synapse.util.metrics import Measure
 
 from .units import Edu
@@ -229,7 +229,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         """
         # nothing to do here: the replication listener will handle it.
 
-    def send_presence_to_destinations(
+    async def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
         """As per FederationSender
@@ -245,7 +245,9 @@ class FederationRemoteSendQueue(AbstractFederationSender):
 
         self.notifier.on_new_replication_data()
 
-    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+    async def send_device_messages(
+        self, destinations: StrCollection, immediate: bool = True
+    ) -> None:
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
         # stream.
@@ -463,7 +465,7 @@ class ParsedFederationStreamData:
     edus: Dict[str, List[Edu]]
 
 
-def process_rows_for_federation(
+async def process_rows_for_federation(
     transaction_queue: FederationSender,
     rows: List[FederationStream.FederationStreamRow],
 ) -> None:
@@ -496,7 +498,7 @@ def process_rows_for_federation(
         parsed_row.add_to_buffer(buff)
 
     for state, destinations in buff.presence_destinations:
-        transaction_queue.send_presence_to_destinations(
+        await transaction_queue.send_presence_to_destinations(
             states=[state], destinations=destinations
         )
 
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 97abbdee18..fb20fd8a10 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -147,7 +147,10 @@ from twisted.internet import defer
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
-from synapse.federation.sender.per_destination_queue import PerDestinationQueue
+from synapse.federation.sender.per_destination_queue import (
+    CATCHUP_RETRY_INTERVAL,
+    PerDestinationQueue,
+)
 from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
 from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -161,9 +164,10 @@ from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
 )
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
 from synapse.util import Clock
 from synapse.util.metrics import Measure
+from synapse.util.retryutils import filter_destinations_by_retry_limiter
 
 if TYPE_CHECKING:
     from synapse.events.presence_router import PresenceRouter
@@ -213,7 +217,7 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def send_presence_to_destinations(
+    async def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
         """Send the given presence states to the given destinations.
@@ -242,9 +246,11 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+    async def send_device_messages(
+        self, destinations: StrCollection, immediate: bool = True
+    ) -> None:
         """Tells the sender that a new device message is ready to be sent to the
-        destination. The `immediate` flag specifies whether the messages should
+        destinations. The `immediate` flag specifies whether the messages should
         be tried to be sent immediately, or whether it can be delayed for a
         short while (to aid performance).
         """
@@ -716,6 +722,13 @@ class FederationSender(AbstractFederationSender):
             pdu.internal_metadata.stream_ordering,
         )
 
+        destinations = await filter_destinations_by_retry_limiter(
+            destinations,
+            clock=self.clock,
+            store=self.store,
+            retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+        )
+
         for destination in destinations:
             self._get_per_destination_queue(destination).send_pdu(pdu)
 
@@ -763,12 +776,20 @@ class FederationSender(AbstractFederationSender):
         domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
             room_id
         )
-        domains = [
+        domains: StrCollection = [
             d
             for d in domains_set
             if not self.is_mine_server_name(d)
             and self._federation_shard_config.should_handle(self._instance_name, d)
         ]
+
+        domains = await filter_destinations_by_retry_limiter(
+            domains,
+            clock=self.clock,
+            store=self.store,
+            retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+        )
+
         if not domains:
             return
 
@@ -816,7 +837,7 @@ class FederationSender(AbstractFederationSender):
         for queue in queues:
             queue.flush_read_receipts_for_room(room_id)
 
-    def send_presence_to_destinations(
+    async def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
         """Send the given presence states to the given destinations.
@@ -831,13 +852,20 @@ class FederationSender(AbstractFederationSender):
         for state in states:
             assert self.is_mine_id(state.user_id)
 
+        destinations = await filter_destinations_by_retry_limiter(
+            [
+                d
+                for d in destinations
+                if self._federation_shard_config.should_handle(self._instance_name, d)
+            ],
+            clock=self.clock,
+            store=self.store,
+            retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+        )
+
         for destination in destinations:
             if self.is_mine_server_name(destination):
                 continue
-            if not self._federation_shard_config.should_handle(
-                self._instance_name, destination
-            ):
-                continue
 
             self._get_per_destination_queue(destination).send_presence(
                 states, start_loop=False
@@ -896,21 +924,29 @@ class FederationSender(AbstractFederationSender):
         else:
             queue.send_edu(edu)
 
-    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
-        if self.is_mine_server_name(destination):
-            logger.warning("Not sending device update to ourselves")
-            return
-
-        if not self._federation_shard_config.should_handle(
-            self._instance_name, destination
-        ):
-            return
+    async def send_device_messages(
+        self, destinations: StrCollection, immediate: bool = True
+    ) -> None:
+        destinations = await filter_destinations_by_retry_limiter(
+            [
+                destination
+                for destination in destinations
+                if self._federation_shard_config.should_handle(
+                    self._instance_name, destination
+                )
+                and not self.is_mine_server_name(destination)
+            ],
+            clock=self.clock,
+            store=self.store,
+            retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+        )
 
-        if immediate:
-            self._get_per_destination_queue(destination).attempt_new_transaction()
-        else:
-            self._get_per_destination_queue(destination).mark_new_data()
-            self._destination_wakeup_queue.add_to_queue(destination)
+        for destination in destinations:
+            if immediate:
+                self._get_per_destination_queue(destination).attempt_new_transaction()
+            else:
+                self._get_per_destination_queue(destination).mark_new_data()
+                self._destination_wakeup_queue.add_to_queue(destination)
 
     def wake_destination(self, destination: str) -> None:
         """Called when we want to retry sending transactions to a remote.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 31c5c2b7de..9105ba664c 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -59,6 +59,10 @@ sent_edus_by_type = Counter(
 )
 
 
+# If the retry interval is larger than this then we enter "catchup" mode
+CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
+
+
 class PerDestinationQueue:
     """
     Manages the per-destination transmission queues.
@@ -370,7 +374,7 @@ class PerDestinationQueue:
                 ),
             )
 
-            if e.retry_interval > 60 * 60 * 1000:
+            if e.retry_interval > CATCHUP_RETRY_INTERVAL:
                 # we won't retry for another hour!
                 # (this suggests a significant outage)
                 # We drop pending EDUs because otherwise they will
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5ae427d52c..763f56dfc1 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -836,17 +836,16 @@ class DeviceHandler(DeviceWorkerHandler):
                             user_id,
                             hosts,
                         )
-                        for host in hosts:
-                            self.federation_sender.send_device_messages(
-                                host, immediate=False
-                            )
-                            # TODO: when called, this isn't in a logging context.
-                            # This leads to log spam, sentry event spam, and massive
-                            # memory usage.
-                            # See https://github.com/matrix-org/synapse/issues/12552.
-                            # log_kv(
-                            #     {"message": "sent device update to host", "host": host}
-                            # )
+                        await self.federation_sender.send_device_messages(
+                            hosts, immediate=False
+                        )
+                        # TODO: when called, this isn't in a logging context.
+                        # This leads to log spam, sentry event spam, and massive
+                        # memory usage.
+                        # See https://github.com/matrix-org/synapse/issues/12552.
+                        # log_kv(
+                        #     {"message": "sent device update to host", "host": host}
+                        # )
 
                     if current_stream_id != stream_id:
                         # Clear the set of hosts we've already sent to as we're
@@ -951,8 +950,9 @@ class DeviceHandler(DeviceWorkerHandler):
 
         # Notify things that device lists need to be sent out.
         self.notifier.notify_replication()
-        for host in potentially_changed_hosts:
-            self.federation_sender.send_device_messages(host, immediate=False)
+        await self.federation_sender.send_device_messages(
+            potentially_changed_hosts, immediate=False
+        )
 
 
 def _update_device_from_client_ips(
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 798c7039f9..1c79f7a61e 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -302,10 +302,9 @@ class DeviceMessageHandler:
         )
 
         if self.federation_sender:
-            for destination in remote_messages.keys():
-                # Enqueue a new federation transaction to send the new
-                # device messages to each remote destination.
-                self.federation_sender.send_device_messages(destination)
+            # Enqueue a new federation transaction to send the new
+            # device messages to each remote destination.
+            await self.federation_sender.send_device_messages(remote_messages.keys())
 
     async def get_events_for_dehydrated_device(
         self,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2f841863ae..f31e18328b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -354,7 +354,9 @@ class BasePresenceHandler(abc.ABC):
         )
 
         for destination, host_states in hosts_to_states.items():
-            self._federation.send_presence_to_destinations(host_states, [destination])
+            await self._federation.send_presence_to_destinations(
+                host_states, [destination]
+            )
 
     async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
         """
@@ -936,7 +938,7 @@ class PresenceHandler(BasePresenceHandler):
                 )
 
                 for destination, states in hosts_to_states.items():
-                    self._federation_queue.send_presence_to_destinations(
+                    await self._federation_queue.send_presence_to_destinations(
                         states, [destination]
                     )
 
@@ -1508,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler):
                 or state.status_msg is not None
             ]
 
-            self._federation_queue.send_presence_to_destinations(
+            await self._federation_queue.send_presence_to_destinations(
                 destinations=newly_joined_remote_hosts,
                 states=states,
             )
@@ -1519,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler):
             prev_remote_hosts or newly_joined_remote_hosts
         ):
             local_states = await self.current_state_for_users(newly_joined_local_users)
-            self._federation_queue.send_presence_to_destinations(
+            await self._federation_queue.send_presence_to_destinations(
                 destinations=prev_remote_hosts | newly_joined_remote_hosts,
                 states=list(local_states.values()),
             )
@@ -2182,7 +2184,7 @@ class PresenceFederationQueue:
         index = bisect(self._queue, (clear_before,))
         self._queue = self._queue[index:]
 
-    def send_presence_to_destinations(
+    async def send_presence_to_destinations(
         self, states: Collection[UserPresenceState], destinations: StrCollection
     ) -> None:
         """Send the presence states to the given destinations.
@@ -2202,7 +2204,7 @@ class PresenceFederationQueue:
             return
 
         if self._federation:
-            self._federation.send_presence_to_destinations(
+            await self._federation.send_presence_to_destinations(
                 states=states,
                 destinations=destinations,
             )
@@ -2325,7 +2327,7 @@ class PresenceFederationQueue:
 
         for host, user_ids in hosts_to_users.items():
             states = await self._presence_handler.current_state_for_users(user_ids)
-            self._federation.send_presence_to_destinations(
+            await self._federation.send_presence_to_destinations(
                 states=states.values(),
                 destinations=[host],
             )
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 7aeae5319c..4b4227003d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -26,9 +26,10 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.replication.tcp.streams import TypingStream
 from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, StreamKeyType, UserID
+from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
+from synapse.util.retryutils import filter_destinations_by_retry_limiter
 from synapse.util.wheel_timer import WheelTimer
 
 if TYPE_CHECKING:
@@ -150,8 +151,15 @@ class FollowerTypingHandler:
                 now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
             )
 
-            hosts = await self._storage_controllers.state.get_current_hosts_in_room(
-                member.room_id
+            hosts: StrCollection = (
+                await self._storage_controllers.state.get_current_hosts_in_room(
+                    member.room_id
+                )
+            )
+            hosts = await filter_destinations_by_retry_limiter(
+                hosts,
+                clock=self.clock,
+                store=self.store,
             )
             for domain in hosts:
                 if not self.is_mine_server_name(domain):
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 9ad8e038ae..2f00a7ba20 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -1180,7 +1180,7 @@ class ModuleApi:
 
             # Send to remote destinations.
             destination = UserID.from_string(user).domain
-            presence_handler.get_federation_queue().send_presence_to_destinations(
+            await presence_handler.get_federation_queue().send_presence_to_destinations(
                 presence_events, [destination]
             )
 
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 3b88dc68ea..51285e6d33 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -422,7 +422,7 @@ class FederationSenderHandler:
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
         if stream_name == "federation":
-            send_queue.process_rows_for_federation(self.federation_sender, rows)
+            await send_queue.process_rows_for_federation(self.federation_sender, rows)
             await self.update_token(token)
 
         # ... and when new receipts happen
@@ -439,16 +439,14 @@ class FederationSenderHandler:
                 for row in rows
                 if not row.entity.startswith("@") and not row.is_signature
             }
-            for host in hosts:
-                self.federation_sender.send_device_messages(host, immediate=False)
+            await self.federation_sender.send_device_messages(hosts, immediate=False)
 
         elif stream_name == ToDeviceStream.NAME:
             # The to_device stream includes stuff to be pushed to both local
             # clients and remote servers, so we ignore entities that start with
             # '@' (since they'll be local users rather than destinations).
             hosts = {row.entity for row in rows if not row.entity.startswith("@")}
-            for host in hosts:
-                self.federation_sender.send_device_messages(host)
+            await self.federation_sender.send_device_messages(hosts)
 
     async def _on_new_receipts(
         self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow]
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 860bbf7c0f..efd21b5bfc 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
 
 import logging
 from enum import Enum
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, cast
 
 import attr
 from canonicaljson import encode_canonical_json
@@ -28,8 +28,8 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.types import JsonDict
-from synapse.util.caches.descriptors import cached
+from synapse.types import JsonDict, StrCollection
+from synapse.util.caches.descriptors import cached, cachedList
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -205,6 +205,26 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         else:
             return None
 
+    @cachedList(
+        cached_method_name="get_destination_retry_timings", list_name="destinations"
+    )
+    async def get_destination_retry_timings_batch(
+        self, destinations: StrCollection
+    ) -> Dict[str, Optional[DestinationRetryTimings]]:
+        rows = await self.db_pool.simple_select_many_batch(
+            table="destinations",
+            iterable=destinations,
+            column="destination",
+            retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
+            desc="get_destination_retry_timings_batch",
+        )
+
+        return {
+            row.pop("destination"): DestinationRetryTimings(**row)
+            for row in rows
+            if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"]
+        }
+
     async def set_destination_retry_timings(
         self,
         destination: str,
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 9d2065372c..0e1f907667 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Optional, Type
 from synapse.api.errors import CodeMessageException
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage import DataStore
+from synapse.types import StrCollection
 from synapse.util import Clock
 
 if TYPE_CHECKING:
@@ -116,6 +117,30 @@ async def get_retry_limiter(
     )
 
 
+async def filter_destinations_by_retry_limiter(
+    destinations: StrCollection,
+    clock: Clock,
+    store: DataStore,
+    retry_due_within_ms: int = 0,
+) -> StrCollection:
+    """Filter down the list of destinations to only those that will are either
+    alive or due for a retry (within `retry_due_within_ms`)
+    """
+    if not destinations:
+        return destinations
+
+    retry_timings = await store.get_destination_retry_timings_batch(destinations)
+
+    now = int(clock.time_msec())
+
+    return [
+        destination
+        for destination, timings in retry_timings.items()
+        if timings is None
+        or timings.retry_last_ts + timings.retry_interval <= now + retry_due_within_ms
+    ]
+
+
 class RetryDestinationLimiter:
     def __init__(
         self,