summary refs log tree commit diff
path: root/synapse/storage/databases/main/room.py
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2023-01-23 15:44:39 +0000
committerGitHub <noreply@github.com>2023-01-23 15:44:39 +0000
commit80d44060c99e87c84da72fdfcaa9a508d38a26b4 (patch)
treed26f9ab6427271c9b299f2dd4aab00702303aa6b /synapse/storage/databases/main/room.py
parentBump ruff from 0.0.224 to 0.0.230 (#14897) (diff)
downloadsynapse-80d44060c99e87c84da72fdfcaa9a508d38a26b4.tar.xz
Faster joins: omit partial rooms from eager syncs until the resync completes (#14870)
* Allow `AbstractSet` in `StrCollection`

Or else frozensets are excluded. This will be useful in an upcoming
commit where I plan to change a function that accepts `List[str]` to
accept `StrCollection` instead.

* `rooms_to_exclude` -> `rooms_to_exclude_globally`

I am about to make use of this exclusion mechanism to exclude rooms for
a specific user and a specific sync. This rename helps to clarify the
distinction between the global config and the rooms to exclude for a
specific sync.

* Better function names for internal sync methods

* Track a list of excluded rooms on SyncResultBuilder

I plan to feed a list of partially stated rooms for this sync to ignore

* Exclude partial state rooms during eager sync

using the mechanism established in the previous commit

* Track un-partial-state stream in sync tokens

So that we can work out which rooms have become fully-stated during a
given sync period.

* Fix mutation of `@cached` return value

This was fouling up a complement test added alongside this PR.
Excluding a room would mean the set of forgotten rooms in the cache
would be extended. This means that room could be erroneously considered
forgotten in the future.

Introduced in #12310, Synapse 1.57.0. I don't think this had any
user-visible side effects (until now).

* SyncResultBuilder: track rooms to force as newly joined

Similar plan as before. We've omitted rooms from certain sync responses;
now we establish the mechanism to reintroduce them into future syncs.

* Read new field, to present rooms as newly joined

* Force un-partial-stated rooms to be newly-joined

for eager incremental syncs only, provided they're still fully stated

* Notify user stream listeners to wake up long polling syncs

* Changelog

* Typo fix

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Unnecessary list cast

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Rephrase comment

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Another comment

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Fixup merge(?)

* Poke notifier when receiving un-partial-stated msg over replication

* Fixup merge whoops

Thanks MV :)

Co-authored-by: Mathieu Velen <mathieuv@matrix.org>

Co-authored-by: Mathieu Velten <mathieuv@matrix.org>
Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>
Diffstat (limited to 'synapse/storage/databases/main/room.py')
-rw-r--r--synapse/storage/databases/main/room.py47
1 files changed, 41 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 6a65b2a89b..3aa7b94560 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -26,6 +26,7 @@ from typing import (
     Mapping,
     Optional,
     Sequence,
+    Set,
     Tuple,
     Union,
     cast,
@@ -1294,10 +1295,44 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             instance_name
         )
 
+    async def get_un_partial_stated_rooms_between(
+        self, last_id: int, current_id: int, room_ids: Collection[str]
+    ) -> Set[str]:
+        """Get all rooms that got un partial stated between `last_id` exclusive and
+        `current_id` inclusive.
+
+        Returns:
+            The list of room ids.
+        """
+
+        if last_id == current_id:
+            return set()
+
+        def _get_un_partial_stated_rooms_between_txn(
+            txn: LoggingTransaction,
+        ) -> Set[str]:
+            sql = """
+                SELECT DISTINCT room_id FROM un_partial_stated_room_stream
+                WHERE ? < stream_id AND stream_id <= ? AND
+            """
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", room_ids
+            )
+
+            txn.execute(sql + clause, [last_id, current_id] + args)
+
+            return {r[0] for r in txn}
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_rooms_between",
+            _get_un_partial_stated_rooms_between_txn,
+        )
+
     async def get_un_partial_stated_rooms_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
-        """Get updates for caches replication stream.
+        """Get updates for un partial stated rooms replication stream.
 
         Args:
             instance_name: The writer we want to fetch updates from. Unused
@@ -2304,16 +2339,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             (room_id,),
         )
 
-    async def clear_partial_state_room(self, room_id: str) -> bool:
+    async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
         """Clears the partial state flag for a room.
 
         Args:
             room_id: The room whose partial state flag is to be cleared.
 
         Returns:
-            `True` if the partial state flag has been cleared successfully.
+            The corresponding stream id for the un-partial-stated rooms stream.
 
-            `False` if the partial state flag could not be cleared because the room
+            `None` if the partial state flag could not be cleared because the room
             still contains events with partial state.
         """
         try:
@@ -2324,7 +2359,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                     room_id,
                     un_partial_state_room_stream_id,
                 )
-                return True
+                return un_partial_state_room_stream_id
         except self.db_pool.engine.module.IntegrityError as e:
             # Assume that any `IntegrityError`s are due to partial state events.
             logger.info(
@@ -2332,7 +2367,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                 room_id,
                 e,
             )
-            return False
+            return None
 
     def _clear_partial_state_room_txn(
         self,