diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index c0a958252e..091367006e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -15,7 +15,7 @@
import logging
from collections import namedtuple
-from typing import Iterable, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple
from canonicaljson import encode_canonical_json
@@ -371,3 +371,44 @@ class TransactionStore(SQLBaseStore):
values={"last_successful_stream_ordering": last_successful_stream_ordering},
desc="set_last_successful_stream_ordering",
)
+
+ async def get_catch_up_room_event_ids(
+ self, destination: str, last_successful_stream_ordering: int,
+ ) -> List[str]:
+ """
+ Returns at most 50 event IDs and their corresponding stream_orderings
+ that correspond to the oldest events that have not yet been sent to
+ the destination.
+
+ Args:
+ destination: the destination in question
+ last_successful_stream_ordering: the stream_ordering of the
+ most-recently successfully-transmitted event to the destination
+
+ Returns:
+ list of event_ids
+ """
+ return await self.db_pool.runInteraction(
+ "get_catch_up_room_event_ids",
+ self._get_catch_up_room_event_ids_txn,
+ destination,
+ last_successful_stream_ordering,
+ )
+
+ @staticmethod
+ def _get_catch_up_room_event_ids_txn(
+ txn, destination: str, last_successful_stream_ordering: int,
+ ) -> List[str]:
+ q = """
+ SELECT event_id FROM destination_rooms
+ JOIN events USING (stream_ordering)
+ WHERE destination = ?
+ AND stream_ordering > ?
+ ORDER BY stream_ordering
+ LIMIT 50
+ """
+ txn.execute(
+ q, (destination, last_successful_stream_ordering),
+ )
+ event_ids = [row[0] for row in txn]
+ return event_ids
|