summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-17 10:32:50 +0100
committerErik Johnston <erik@matrix.org>2024-07-17 13:59:35 +0100
commitde6e3bdee8715669a76b40eaacdc8728cc02d4eb (patch)
tree01cc05de9a9df4ef80b414494893ede57f2d06a3 /synapse
parentHandle initial flag correctly (diff)
downloadsynapse-de6e3bdee8715669a76b40eaacdc8728cc02d4eb.tar.xz
Handle state deltas in non-initial rooms
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sliding_sync.py14
-rw-r--r--synapse/storage/databases/main/state_deltas.py38
2 files changed, 49 insertions, 3 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 32686036b4..6d0d9b425c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1720,9 +1720,17 @@ class SlidingSyncHandler:
                 to_token=to_token,
             )
         else:
-            # TODO: Once we can figure out if we've sent a room down this connection before,
-            # we can return updates instead of the full required state.
-            raise NotImplementedError()
+            assert to_bound is not None
+
+            deltas = await self.store.get_current_state_deltas_for_room(
+                room_id, to_bound, to_token.room_key
+            )
+            # TODO: Filter room state before fetching events
+            # TODO: Handle state resets where event_id is None
+            events = await self.store.get_events(
+                [d.event_id for d in deltas if d.event_id]
+            )
+            room_state = {(s.type, s.state_key): s for s in events.values()}
 
         required_room_state: StateMap[EventBase] = {}
         if required_state_filter != StateFilter.none():
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 036972ac25..cd6cb2c7a9 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -26,6 +26,8 @@ import attr
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import LoggingTransaction
+from synapse.storage.databases.main.stream import _filter_results_by_stream
+from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 logger = logging.getLogger(__name__)
@@ -156,3 +158,39 @@ class StateDeltasStore(SQLBaseStore):
             "get_max_stream_id_in_current_state_deltas",
             self._get_max_stream_id_in_current_state_deltas_txn,
         )
+
+    async def get_current_state_deltas_for_room(
+        self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
+    ) -> List[StateDelta]:
+        """Get the state deltas between that have happened between two
+        tokens."""
+
+        def get_current_state_deltas_for_room_txn(
+            txn: LoggingTransaction,
+        ) -> List[StateDelta]:
+            sql = """
+                SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(
+                sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
+            )
+
+            return [
+                StateDelta(
+                    stream_id=row[1],
+                    room_id=room_id,
+                    event_type=row[2],
+                    state_key=row[3],
+                    event_id=row[4],
+                    prev_event_id=row[5],
+                )
+                for row in txn
+                if _filter_results_by_stream(from_token, to_token, row[0], row[1])
+            ]
+
+        return await self.db_pool.runInteraction(
+            "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
+        )