summary refs log tree commit diff
path: root/synapse/storage/databases/main/state_deltas.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-30 09:31:42 +0100
committerErik Johnston <erik@matrix.org>2024-07-30 09:31:42 +0100
commit9cdfb4e08d7fd7edd3f0705410ba978778b4a6d7 (patch)
treef0057eac637b410a75e0e34bca82b1175a356fa2 /synapse/storage/databases/main/state_deltas.py
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentOnly send rooms with updates down sliding sync (#17479) (diff)
downloadsynapse-9cdfb4e08d7fd7edd3f0705410ba978778b4a6d7.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage/databases/main/state_deltas.py')
-rw-r--r--synapse/storage/databases/main/state_deltas.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py

index 036972ac25..da3ebe66b8 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py
@@ -26,6 +26,8 @@ import attr from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction +from synapse.storage.databases.main.stream import _filter_results_by_stream +from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -156,3 +158,38 @@ class StateDeltasStore(SQLBaseStore): "get_max_stream_id_in_current_state_deltas", self._get_max_stream_id_in_current_state_deltas_txn, ) + + async def get_current_state_deltas_for_room( + self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken + ) -> List[StateDelta]: + """Get the state deltas between two tokens.""" + + def get_current_state_deltas_for_room_txn( + txn: LoggingTransaction, + ) -> List[StateDelta]: + sql = """ + SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE room_id = ? AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute( + sql, (room_id, from_token.stream, to_token.get_max_stream_pos()) + ) + + return [ + StateDelta( + stream_id=row[1], + room_id=room_id, + event_type=row[2], + state_key=row[3], + event_id=row[4], + prev_event_id=row[5], + ) + for row in txn + if _filter_results_by_stream(from_token, to_token, row[0], row[1]) + ] + + return await self.db_pool.runInteraction( + "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn + )