From 576bc37d318f866f11f71e34ce7190aa45b74780 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 15 Sep 2020 09:07:19 +0100 Subject: Catch-up after Federation Outage (split, 4): catch-up loop (#8272) --- synapse/storage/databases/main/transactions.py | 43 +++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index c0a958252e..091367006e 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -15,7 +15,7 @@ import logging from collections import namedtuple -from typing import Iterable, Optional, Tuple +from typing import Iterable, List, Optional, Tuple from canonicaljson import encode_canonical_json @@ -371,3 +371,44 @@ class TransactionStore(SQLBaseStore): values={"last_successful_stream_ordering": last_successful_stream_ordering}, desc="set_last_successful_stream_ordering", ) + + async def get_catch_up_room_event_ids( + self, destination: str, last_successful_stream_ordering: int, + ) -> List[str]: + """ + Returns at most 50 event IDs and their corresponding stream_orderings + that correspond to the oldest events that have not yet been sent to + the destination. + + Args: + destination: the destination in question + last_successful_stream_ordering: the stream_ordering of the + most-recently successfully-transmitted event to the destination + + Returns: + list of event_ids + """ + return await self.db_pool.runInteraction( + "get_catch_up_room_event_ids", + self._get_catch_up_room_event_ids_txn, + destination, + last_successful_stream_ordering, + ) + + @staticmethod + def _get_catch_up_room_event_ids_txn( + txn, destination: str, last_successful_stream_ordering: int, + ) -> List[str]: + q = """ + SELECT event_id FROM destination_rooms + JOIN events USING (stream_ordering) + WHERE destination = ? + AND stream_ordering > ? + ORDER BY stream_ordering + LIMIT 50 + """ + txn.execute( + q, (destination, last_successful_stream_ordering), + ) + event_ids = [row[0] for row in txn] + return event_ids -- cgit 1.4.1