summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-29 22:45:48 +0100
committerGitHub <noreply@github.com>2024-07-29 22:45:48 +0100
commitbe4a16ff445c9dfba04aeaed695afb3a56e204f7 (patch)
tree62de0ad9059e727a072ea06743afbf21765991ae /synapse/storage
parentRefactor Sliding Sync tests to better utilize the `SlidingSyncBase.do_sync(..... (diff)
downloadsynapse-be4a16ff445c9dfba04aeaed695afb3a56e204f7.tar.xz
Sliding Sync: Track whether we have sent rooms down to clients (#17447)
The basic idea is that we introduce a new token for a sliding sync
connection, which stores the mapping of room to room "status" (i.e. have
we sent the room down?). This token allows us to handle duplicate
requests properly. In future it can be used to store more
"per-connection" information safely.

In future this should be migrated into the DB, so its important that we
try to reduce the number of syncs where we need to update the
per-connection information. In this PoC this only happens when we: a)
send down a set of room for the first time, or b) we have previously
sent down a room and there are updates but we are not sending the room
down the sync (due to not falling in a list range)

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/state_deltas.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 036972ac25..da3ebe66b8 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -26,6 +26,8 @@ import attr
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import LoggingTransaction
+from synapse.storage.databases.main.stream import _filter_results_by_stream
+from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 logger = logging.getLogger(__name__)
@@ -156,3 +158,38 @@ class StateDeltasStore(SQLBaseStore):
             "get_max_stream_id_in_current_state_deltas",
             self._get_max_stream_id_in_current_state_deltas_txn,
         )
+
+    async def get_current_state_deltas_for_room(
+        self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
+    ) -> List[StateDelta]:
+        """Get the state deltas between two tokens."""
+
+        def get_current_state_deltas_for_room_txn(
+            txn: LoggingTransaction,
+        ) -> List[StateDelta]:
+            sql = """
+                SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(
+                sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
+            )
+
+            return [
+                StateDelta(
+                    stream_id=row[1],
+                    room_id=room_id,
+                    event_type=row[2],
+                    state_key=row[3],
+                    event_id=row[4],
+                    prev_event_id=row[5],
+                )
+                for row in txn
+                if _filter_results_by_stream(from_token, to_token, row[0], row[1])
+            ]
+
+        return await self.db_pool.runInteraction(
+            "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
+        )