summary refs log tree commit diff
path: root/synapse/storage/databases/main/transactions.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-10-20 19:51:28 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-10-20 19:51:28 +0100
commit717683212418c965edfcf3e1deddb30e2f778ea3 (patch)
tree6933b815fefb864aed22c7779efaf1c5e3c78660 /synapse/storage/databases/main/transactions.py
parentMerge commit '7141057e8' into anoa/dinsic_release_1_21_x (diff)
parentRemove obsolete __future__ imports (#8337) (diff)
downloadsynapse-717683212418c965edfcf3e1deddb30e2f778ea3.tar.xz
Merge commit '837293c31' into anoa/dinsic_release_1_21_x
* commit '837293c31':
  Remove obsolete __future__ imports (#8337)
  Use admin_patterns for all admin APIs. (#8331)
  Fix a potential bug of UnboundLocalError (#8329)
  Switch metaclass initialization to python 3-compatible syntax (#8326)
  Catch-up after Federation Outage (split, 4): catch-up loop (#8272)
  Use slots in attrs classes where possible (#8296)
  Fix typos in comments.
  Add the topic and avatar to the room details admin API (#8305)
  Improve SAML error messages (#8248)
  Add experimental support for sharding event persister. Again. (#8294)
  Make `StreamToken.room_key` be a `RoomStreamToken` instance. (#8281)
  Use TLSv1.2 for fake servers in tests (#8208)
  Add /_synapse/client to the reverse proxy docs (#8227)
  Clean up `Notifier.on_new_room_event` code path (#8288)
Diffstat (limited to 'synapse/storage/databases/main/transactions.py')
-rw-r--r--synapse/storage/databases/main/transactions.py43
1 files changed, 42 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py

index c0a958252e..091367006e 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -15,7 +15,7 @@ import logging from collections import namedtuple -from typing import Iterable, Optional, Tuple +from typing import Iterable, List, Optional, Tuple from canonicaljson import encode_canonical_json @@ -371,3 +371,44 @@ class TransactionStore(SQLBaseStore): values={"last_successful_stream_ordering": last_successful_stream_ordering}, desc="set_last_successful_stream_ordering", ) + + async def get_catch_up_room_event_ids( + self, destination: str, last_successful_stream_ordering: int, + ) -> List[str]: + """ + Returns at most 50 event IDs and their corresponding stream_orderings + that correspond to the oldest events that have not yet been sent to + the destination. + + Args: + destination: the destination in question + last_successful_stream_ordering: the stream_ordering of the + most-recently successfully-transmitted event to the destination + + Returns: + list of event_ids + """ + return await self.db_pool.runInteraction( + "get_catch_up_room_event_ids", + self._get_catch_up_room_event_ids_txn, + destination, + last_successful_stream_ordering, + ) + + @staticmethod + def _get_catch_up_room_event_ids_txn( + txn, destination: str, last_successful_stream_ordering: int, + ) -> List[str]: + q = """ + SELECT event_id FROM destination_rooms + JOIN events USING (stream_ordering) + WHERE destination = ? + AND stream_ordering > ? + ORDER BY stream_ordering + LIMIT 50 + """ + txn.execute( + q, (destination, last_successful_stream_ordering), + ) + event_ids = [row[0] for row in txn] + return event_ids