diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 036972ac25..da3ebe66b8 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -26,6 +26,8 @@ import attr
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
+from synapse.storage.databases.main.stream import _filter_results_by_stream
+from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
@@ -156,3 +158,38 @@ class StateDeltasStore(SQLBaseStore):
"get_max_stream_id_in_current_state_deltas",
self._get_max_stream_id_in_current_state_deltas_txn,
)
+
+ async def get_current_state_deltas_for_room(
+ self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
+ ) -> List[StateDelta]:
+ """Get the state deltas between two tokens."""
+
+ def get_current_state_deltas_for_room_txn(
+ txn: LoggingTransaction,
+ ) -> List[StateDelta]:
+ sql = """
+ SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
+ FROM current_state_delta_stream
+ WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ """
+ txn.execute(
+ sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
+ )
+
+ return [
+ StateDelta(
+ stream_id=row[1],
+ room_id=room_id,
+ event_type=row[2],
+ state_key=row[3],
+ event_id=row[4],
+ prev_event_id=row[5],
+ )
+ for row in txn
+ if _filter_results_by_stream(from_token, to_token, row[0], row[1])
+ ]
+
+ return await self.db_pool.runInteraction(
+ "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
+ )
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b034361aec..4207e73c7f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -2104,3 +2104,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return RoomStreamToken(stream=last_position.stream - 1)
return None
+
+ def get_rooms_that_might_have_updates(
+ self, room_ids: StrCollection, from_token: RoomStreamToken
+ ) -> StrCollection:
+ """Filters given room IDs down to those that might have updates, i.e.
+ removes rooms that definitely do not have updates.
+ """
+ return self._events_stream_cache.get_entities_changed(
+ room_ids, from_token.stream
+ )
|