summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
author Richard van der Hoff <1389908+richvdh@users.noreply.github.com> 2024-03-22 13:24:11 +0000
committer GitHub <noreply@github.com> 2024-03-22 13:24:11 +0000
commit b5322b4daf2e13310200e57eb427568cb6a92ddf (patch)
tree 8a8aed36f136df07d0fea011581bc4cadba821af /synapse/storage
parent Add OIDC config to add extra parameters to the authorize URL (#16971) (diff)
download synapse-b5322b4daf2e13310200e57eb427568cb6a92ddf.tar.xz
Ensure that pending to-device events are sent over federation at startup (#16925)
Fixes https://github.com/element-hq/synapse/issues/16680, as well as a
related bug, where servers which we had *never* successfully sent an
event to would not be retried.

In order to fix the case of pending to-device messages, we hook into the
existing `wake_destinations_needing_catchup` process, by extending it to
look for destinations that have pending to-device messages. The
federation transmission loop then attempts to send the pending to-device
messages as normal.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/transactions.py99
1 files changed, 79 insertions, 20 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index c91c44818f..08e0241f68 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -423,8 +423,11 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         self, after_destination: Optional[str]
     ) -> List[str]:
         """
-        Gets at most 25 destinations which have outstanding PDUs to be caught up,
-        and are not being backed off from
+        Get a list of destinations we should retry transaction sending to.
+
+        Returns up to 25 destinations which have outstanding PDUs or to-device messages,
+        and are not subject to a backoff.
+
         Args:
             after_destination:
                 If provided, all destinations must be lexicographically greater
@@ -448,30 +451,86 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
     def _get_catch_up_outstanding_destinations_txn(
         txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
     ) -> List[str]:
+        # We're looking for destinations which satisfy either of the following
+        # conditions:
+        #
+        #   * There is at least one room where we have an event that we have not yet
+        #     sent to them, indicated by a row in `destination_rooms` with a
+        #     `stream_ordering` greater than the `last_successful_stream_ordering`
+        #     (if any) in `destinations`, or:
+        #
+        #   * There is at least one to-device message outstanding for the destination,
+        #     indicated by a row in `device_federation_outbox`.
+        #
+        # Of course, that may produce destinations where we are already busy sending
+        # the relevant PDU or to-device message, but in that case, waking up the
+        # sender will just be a no-op.
+        #
+        # From those two lists, we need to *exclude* destinations which are subject
+        # to a backoff (ie, where `destinations.retry_last_ts + destinations.retry_interval`
+        # is in the future). There is also an edge-case where, if the server was
+        # previously shut down in the middle of the first send attempt to a given
+        # destination, there may be no row in `destinations` at all; we need to include
+        # such destinations in the output, which means we need to left-join rather
+        # than inner-join against `destinations`.
+        #
+        # The two sources of destinations (`destination_rooms` and
+        # `device_federation_outbox`) are queried separately and UNIONed; but the list
+        # may be very long, and we don't want to return all the rows at once. We
+        # therefore sort the output and just return the first 25 rows. Obviously that
+        # means there is no point in either of the inner queries returning more than
+        # 25 results, since any further results are certain to be dropped by the outer
+        # LIMIT. In order to help the query-optimiser understand that, we *also* sort
+        # and limit the *inner* queries, hence we express them as CTEs rather than
+        # sub-queries.
+        #
+        # (NB: we make sure to do the top-level sort and limit on the database, rather
+        # than making two queries and combining the result in Python. We could otherwise
+        # suffer from slight differences in sort order between Python and the database,
+        # which would make the `after_destination` condition unreliable.)
+
         q = """
-            SELECT DISTINCT destination FROM destinations
-            INNER JOIN destination_rooms USING (destination)
-                WHERE
-                    stream_ordering > last_successful_stream_ordering
-                    AND destination > ?
-                    AND (
-                        retry_last_ts IS NULL OR
-                        retry_last_ts + retry_interval < ?
-                    )
-                    ORDER BY destination
-                    LIMIT 25
+        WITH pdu_destinations AS (
+            SELECT DISTINCT destination FROM destination_rooms
+            LEFT JOIN destinations USING (destination)
+            WHERE
+                destination > ?
+                AND destination_rooms.stream_ordering > COALESCE(destinations.last_successful_stream_ordering, 0)
+                AND (
+                    destinations.retry_last_ts IS NULL OR
+                    destinations.retry_last_ts + destinations.retry_interval < ?
+                )
+            ORDER BY destination
+            LIMIT 25
+        ), to_device_destinations AS (
+            SELECT DISTINCT destination FROM device_federation_outbox
+            LEFT JOIN destinations USING (destination)
+            WHERE
+               destination > ?
+               AND (
+                    destinations.retry_last_ts IS NULL OR
+                    destinations.retry_last_ts + destinations.retry_interval < ?
+               )
+            ORDER BY destination
+            LIMIT 25
+        )
+
+        SELECT destination FROM pdu_destinations
+        UNION SELECT destination FROM to_device_destinations
+            ORDER BY destination
+            LIMIT 25
         """
+
+        # everything is lexicographically greater than "" so this gives
+        # us the first batch of up to 25.
+        after_destination = after_destination or ""
+
         txn.execute(
             q,
-            (
-                # everything is lexicographically greater than "" so this gives
-                # us the first batch of up to 25.
-                after_destination or "",
-                now_time_ms,
-            ),
+            (after_destination, now_time_ms, after_destination, now_time_ms),
         )
-
         destinations = [row[0] for row in txn]
+
         return destinations
 
     async def get_destinations_paginate(