summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2023-01-23 15:44:39 +0000
committerGitHub <noreply@github.com>2023-01-23 15:44:39 +0000
commit80d44060c99e87c84da72fdfcaa9a508d38a26b4 (patch)
treed26f9ab6427271c9b299f2dd4aab00702303aa6b /synapse/handlers
parentBump ruff from 0.0.224 to 0.0.230 (#14897) (diff)
downloadsynapse-80d44060c99e87c84da72fdfcaa9a508d38a26b4.tar.xz
Faster joins: omit partial rooms from eager syncs until the resync completes (#14870)
* Allow `AbstractSet` in `StrCollection`

Or else frozensets are excluded. This will be useful in an upcoming
commit where I plan to change a function that accepts `List[str]` to
accept `StrCollection` instead.

* `rooms_to_exclude` -> `rooms_to_exclude_globally`

I am about to make use of this exclusion mechanism to exclude rooms for
a specific user and a specific sync. This rename helps to clarify the
distinction between the global config and the rooms to exclude for a
specific sync.

* Better function names for internal sync methods

* Track a list of excluded rooms on SyncResultBuilder

I plan to feed a list of partially stated rooms for this sync to ignore

* Exclude partial state rooms during eager sync

using the mechanism established in the previous commit

* Track un-partial-state stream in sync tokens

So that we can work out which rooms have become fully-stated during a
given sync period.

* Fix mutation of `@cached` return value

This was fouling up a complement test added alongside this PR.
Excluding a room would mean the set of forgotten rooms in the cache
would be extended. This means that room could be erroneously considered
forgotten in the future.

Introduced in #12310, Synapse 1.57.0. I don't think this had any
user-visible side effects (until now).

* SyncResultBuilder: track rooms to force as newly joined

Similar plan as before. We've omitted rooms from certain sync responses;
now we establish the mechanism to reintroduce them into future syncs.

* Read new field, to present rooms as newly joined

* Force un-partial-stated rooms to be newly-joined

for eager incremental syncs only, provided they're still fully stated

* Notify user stream listeners to wake up long polling syncs

* Changelog

* Typo fix

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Unnecessary list cast

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Rephrase comment

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Another comment

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Fixup merge(?)

* Poke notifier when receiving un-partial-stated msg over replication

* Fixup merge whoops

Thanks MV :)

Co-authored-by: Mathieu Velen <mathieuv@matrix.org>

Co-authored-by: Mathieu Velten <mathieuv@matrix.org>
Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py15
-rw-r--r--synapse/handlers/sync.py65
2 files changed, 60 insertions, 20 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3217127865..233f8c113d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1868,22 +1868,17 @@ class FederationHandler:
 
                 async with self._is_partial_state_room_linearizer.queue(room_id):
                     logger.info("Clearing partial-state flag for %s", room_id)
-                    success = await self.store.clear_partial_state_room(room_id)
+                    new_stream_id = await self.store.clear_partial_state_room(room_id)
 
-                    # Poke the notifier so that other workers see the write to
-                    # the un-partial-stated rooms stream.
-                    self._notifier.notify_replication()
-
-                if success:
+                if new_stream_id is not None:
                     logger.info("State resync complete for %s", room_id)
                     self._storage_controllers.state.notify_room_un_partial_stated(
                         room_id
                     )
 
-                    # Poke the notifier so that other workers see the write to
-                    # the un-partial-stated rooms stream.
-                    self._notifier.notify_replication()
-
+                    await self._notifier.on_un_partial_stated_room(
+                        room_id, new_stream_id
+                    )
                     return
 
                 # we raced against more events arriving with partial state. Go round
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 78d488f2b1..ee11764567 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -290,7 +290,7 @@ class SyncHandler:
             expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
-        self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
+        self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
         self,
@@ -1340,7 +1340,10 @@ class SyncHandler:
         membership_change_events = []
         if since_token:
             membership_change_events = await self.store.get_membership_changes_for_user(
-                user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+                user_id,
+                since_token.room_key,
+                now_token.room_key,
+                self.rooms_to_exclude_globally,
             )
 
             mem_last_change_by_room_id: Dict[str, EventBase] = {}
@@ -1375,12 +1378,39 @@ class SyncHandler:
                 else:
                     mutable_joined_room_ids.discard(room_id)
 
+        # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
+        mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
+        if not sync_config.filter_collection.lazy_load_members():
+            # Non-lazy syncs should never include partially stated rooms.
+            # Exclude all partially stated rooms from this sync.
+            for room_id in mutable_joined_room_ids:
+                if await self.store.is_partial_state_room(room_id):
+                    mutable_rooms_to_exclude.add(room_id)
+
+        # Incremental eager syncs should additionally include rooms that
+        # - we are joined to
+        # - are full-stated
+        # - became fully-stated at some point during the sync period
+        #   (These rooms will have been omitted during a previous eager sync.)
+        forced_newly_joined_room_ids = set()
+        if since_token and not sync_config.filter_collection.lazy_load_members():
+            un_partial_stated_rooms = (
+                await self.store.get_un_partial_stated_rooms_between(
+                    since_token.un_partial_stated_rooms_key,
+                    now_token.un_partial_stated_rooms_key,
+                    mutable_joined_room_ids,
+                )
+            )
+            for room_id in un_partial_stated_rooms:
+                if not await self.store.is_partial_state_room(room_id):
+                    forced_newly_joined_room_ids.add(room_id)
+
         # Now we have our list of joined room IDs, exclude as configured and freeze
         joined_room_ids = frozenset(
             (
                 room_id
                 for room_id in mutable_joined_room_ids
-                if room_id not in self.rooms_to_exclude
+                if room_id not in mutable_rooms_to_exclude
             )
         )
 
@@ -1397,6 +1427,8 @@ class SyncHandler:
             since_token=since_token,
             now_token=now_token,
             joined_room_ids=joined_room_ids,
+            excluded_room_ids=frozenset(mutable_rooms_to_exclude),
+            forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
             membership_change_events=membership_change_events,
         )
 
@@ -1834,14 +1866,16 @@ class SyncHandler:
         # 3. Work out which rooms need reporting in the sync response.
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
-            room_changes = await self._get_rooms_changed(
+            room_changes = await self._get_room_changes_for_incremental_sync(
                 sync_result_builder, ignored_users
             )
             tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
             )
         else:
-            room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
+            room_changes = await self._get_room_changes_for_initial_sync(
+                sync_result_builder, ignored_users
+            )
             tags_by_room = await self.store.get_tags_for_user(user_id)
 
         log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1900,7 +1934,7 @@ class SyncHandler:
 
         assert since_token
 
-        if membership_change_events:
+        if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
             return True
 
         stream_id = since_token.room_key.stream
@@ -1909,7 +1943,7 @@ class SyncHandler:
                 return True
         return False
 
-    async def _get_rooms_changed(
+    async def _get_room_changes_for_incremental_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -1947,7 +1981,9 @@ class SyncHandler:
         for event in membership_change_events:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
-        newly_joined_rooms: List[str] = []
+        newly_joined_rooms: List[str] = list(
+            sync_result_builder.forced_newly_joined_room_ids
+        )
         newly_left_rooms: List[str] = []
         room_entries: List[RoomSyncResultBuilder] = []
         invited: List[InvitedSyncResult] = []
@@ -2153,7 +2189,7 @@ class SyncHandler:
             newly_left_rooms,
         )
 
-    async def _get_all_rooms(
+    async def _get_room_changes_for_initial_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -2178,7 +2214,7 @@ class SyncHandler:
         room_list = await self.store.get_rooms_for_local_user_where_membership_is(
             user_id=user_id,
             membership_list=Membership.LIST,
-            excluded_rooms=self.rooms_to_exclude,
+            excluded_rooms=sync_result_builder.excluded_room_ids,
         )
 
         room_entries = []
@@ -2549,6 +2585,13 @@ class SyncResultBuilder:
         since_token: The token supplied by user, or None.
         now_token: The token to sync up to.
         joined_room_ids: List of rooms the user is joined to
+        excluded_room_ids: Set of room ids we should omit from the /sync response.
+        forced_newly_joined_room_ids:
+            Rooms that should be presented in the /sync response as if they were
+            newly joined during the sync period, even if that's not the case.
+            (This is useful if the room was previously excluded from a /sync response,
+            and now the client should be made aware of it.)
+            Only used by incremental syncs.
 
         # The following mirror the fields in a sync response
         presence
@@ -2565,6 +2608,8 @@ class SyncResultBuilder:
     since_token: Optional[StreamToken]
     now_token: StreamToken
     joined_room_ids: FrozenSet[str]
+    excluded_room_ids: FrozenSet[str]
+    forced_newly_joined_room_ids: FrozenSet[str]
     membership_change_events: List[EventBase]
 
     presence: List[UserPresenceState] = attr.Factory(list)