summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/transactions.py66
1 files changed, 64 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 99cffff50c..97aed1500e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -218,6 +218,7 @@ class TransactionStore(SQLBaseStore):
                         retry_interval = EXCLUDED.retry_interval
                     WHERE
                         EXCLUDED.retry_interval = 0
+                        OR destinations.retry_interval IS NULL
                         OR destinations.retry_interval < EXCLUDED.retry_interval
             """
 
@@ -249,7 +250,11 @@ class TransactionStore(SQLBaseStore):
                     "retry_interval": retry_interval,
                 },
             )
-        elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
+        elif (
+            retry_interval == 0
+            or prev_row["retry_interval"] is None
+            or prev_row["retry_interval"] < retry_interval
+        ):
             self.db_pool.simple_update_one_txn(
                 txn,
                 "destinations",
@@ -397,7 +402,7 @@ class TransactionStore(SQLBaseStore):
 
     @staticmethod
     def _get_catch_up_room_event_ids_txn(
-        txn, destination: str, last_successful_stream_ordering: int,
+        txn: LoggingTransaction, destination: str, last_successful_stream_ordering: int,
     ) -> List[str]:
         q = """
                 SELECT event_id FROM destination_rooms
@@ -412,3 +417,60 @@ class TransactionStore(SQLBaseStore):
         )
         event_ids = [row[0] for row in txn]
         return event_ids
+
+    async def get_catch_up_outstanding_destinations(
+        self, after_destination: Optional[str]
+    ) -> List[str]:
+        """
+        Gets at most 25 destinations which have outstanding PDUs to be caught up,
+        and are not being backed off from
+        Args:
+            after_destination:
+                If provided, all destinations must be lexicographically greater
+                than this one.
+
+        Returns:
+            list of up to 25 destinations with outstanding catch-up.
+                These are the lexicographically first destinations which are
+                lexicographically greater than after_destination (if provided).
+        """
+        time = self.hs.get_clock().time_msec()
+
+        return await self.db_pool.runInteraction(
+            "get_catch_up_outstanding_destinations",
+            self._get_catch_up_outstanding_destinations_txn,
+            time,
+            after_destination,
+        )
+
+    @staticmethod
+    def _get_catch_up_outstanding_destinations_txn(
+        txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
+    ) -> List[str]:
+        q = """
+            SELECT destination FROM destinations
+                WHERE destination IN (
+                    SELECT destination FROM destination_rooms
+                        WHERE destination_rooms.stream_ordering >
+                            destinations.last_successful_stream_ordering
+                )
+                AND destination > ?
+                AND (
+                    retry_last_ts IS NULL OR
+                    retry_last_ts + retry_interval < ?
+                )
+                ORDER BY destination
+                LIMIT 25
+        """
+        txn.execute(
+            q,
+            (
+                # everything is lexicographically greater than "" so this gives
+                # us the first batch of up to 25.
+                after_destination or "",
+                now_time_ms,
+            ),
+        )
+
+        destinations = [row[0] for row in txn]
+        return destinations