summary refs log tree commit diff
path: root/synapse/storage/databases/main/transactions.py
diff options
context:
space:
mode:
authorreivilibre <38398653+reivilibre@users.noreply.github.com>2020-09-15 09:07:19 +0100
committerGitHub <noreply@github.com>2020-09-15 09:07:19 +0100
commit576bc37d318f866f11f71e34ce7190aa45b74780 (patch)
treef85170f877c61b9f5d7837ba5f227b81d5231385 /synapse/storage/databases/main/transactions.py
parentUse slots in attrs classes where possible (#8296) (diff)
downloadsynapse-576bc37d318f866f11f71e34ce7190aa45b74780.tar.xz
Catch-up after Federation Outage (split, 4): catch-up loop (#8272)
Diffstat (limited to 'synapse/storage/databases/main/transactions.py')
-rw-r--r--synapse/storage/databases/main/transactions.py43
1 files changed, 42 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index c0a958252e..091367006e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -15,7 +15,7 @@
 
 import logging
 from collections import namedtuple
-from typing import Iterable, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
@@ -371,3 +371,44 @@ class TransactionStore(SQLBaseStore):
             values={"last_successful_stream_ordering": last_successful_stream_ordering},
             desc="set_last_successful_stream_ordering",
         )
+
+    async def get_catch_up_room_event_ids(
+        self, destination: str, last_successful_stream_ordering: int,
+    ) -> List[str]:
+        """
+        Returns at most 50 event IDs and their corresponding stream_orderings
+        that correspond to the oldest events that have not yet been sent to
+        the destination.
+
+        Args:
+            destination: the destination in question
+            last_successful_stream_ordering: the stream_ordering of the
+                most-recently successfully-transmitted event to the destination
+
+        Returns:
+            list of event_ids
+        """
+        return await self.db_pool.runInteraction(
+            "get_catch_up_room_event_ids",
+            self._get_catch_up_room_event_ids_txn,
+            destination,
+            last_successful_stream_ordering,
+        )
+
+    @staticmethod
+    def _get_catch_up_room_event_ids_txn(
+        txn, destination: str, last_successful_stream_ordering: int,
+    ) -> List[str]:
+        q = """
+                SELECT event_id FROM destination_rooms
+                 JOIN events USING (stream_ordering)
+                WHERE destination = ?
+                  AND stream_ordering > ?
+                ORDER BY stream_ordering
+                LIMIT 50
+            """
+        txn.execute(
+            q, (destination, last_successful_stream_ordering),
+        )
+        event_ids = [row[0] for row in txn]
+        return event_ids