diff options
author | reivilibre <38398653+reivilibre@users.noreply.github.com> | 2020-09-15 09:07:19 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-15 09:07:19 +0100 |
commit | 576bc37d318f866f11f71e34ce7190aa45b74780 (patch) | |
tree | f85170f877c61b9f5d7837ba5f227b81d5231385 /synapse/storage/databases | |
parent | Use slots in attrs classes where possible (#8296) (diff) | |
download | synapse-576bc37d318f866f11f71e34ce7190aa45b74780.tar.xz |
Catch-up after Federation Outage (split, 4): catch-up loop (#8272)
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/transactions.py | 43 |
1 files changed, 42 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index c0a958252e..091367006e 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -15,7 +15,7 @@ import logging from collections import namedtuple -from typing import Iterable, Optional, Tuple +from typing import Iterable, List, Optional, Tuple from canonicaljson import encode_canonical_json @@ -371,3 +371,44 @@ class TransactionStore(SQLBaseStore): values={"last_successful_stream_ordering": last_successful_stream_ordering}, desc="set_last_successful_stream_ordering", ) + + async def get_catch_up_room_event_ids( + self, destination: str, last_successful_stream_ordering: int, + ) -> List[str]: + """ + Returns at most 50 event IDs and their corresponding stream_orderings + that correspond to the oldest events that have not yet been sent to + the destination. + + Args: + destination: the destination in question + last_successful_stream_ordering: the stream_ordering of the + most-recently successfully-transmitted event to the destination + + Returns: + list of event_ids + """ + return await self.db_pool.runInteraction( + "get_catch_up_room_event_ids", + self._get_catch_up_room_event_ids_txn, + destination, + last_successful_stream_ordering, + ) + + @staticmethod + def _get_catch_up_room_event_ids_txn( + txn, destination: str, last_successful_stream_ordering: int, + ) -> List[str]: + q = """ + SELECT event_id FROM destination_rooms + JOIN events USING (stream_ordering) + WHERE destination = ? + AND stream_ordering > ? + ORDER BY stream_ordering + LIMIT 50 + """ + txn.execute( + q, (destination, last_successful_stream_ordering), + ) + event_ids = [row[0] for row in txn] + return event_ids |