summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py597
1 files changed, 374 insertions, 223 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c8858b22dd..f1f19666d7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -17,7 +17,6 @@ from typing import (
     TYPE_CHECKING,
     AbstractSet,
     Any,
-    Collection,
     Dict,
     FrozenSet,
     List,
@@ -31,19 +30,30 @@ from typing import (
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import (
+    AccountDataTypes,
+    EventContentFields,
+    EventTypes,
+    Membership,
+)
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
+from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
 from synapse.handlers.relations import BundledAggregations
+from synapse.logging import issue9533_logger
 from synapse.logging.context import current_context
-from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
-from synapse.push.clientformat import format_push_rules_for_user
+from synapse.logging.opentracing import (
+    SynapseTags,
+    log_kv,
+    set_tag,
+    start_active_span,
+    trace,
+)
 from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.roommember import MemberSummary
-from synapse.storage.state import StateFilter
 from synapse.types import (
     DeviceListUpdates,
     JsonDict,
@@ -51,10 +61,12 @@ from synapse.types import (
     Requester,
     RoomStreamToken,
     StateMap,
+    StrCollection,
     StreamKeyType,
     StreamToken,
     UserID,
 )
+from synapse.types.state import StateFilter
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.caches.lrucache import LruCache
@@ -249,6 +261,7 @@ class SyncHandler:
         self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
         self._relations_handler = hs.get_relations_handler()
+        self._push_rules_handler = hs.get_push_rules_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
@@ -256,6 +269,9 @@ class SyncHandler:
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
         self._device_handler = hs.get_device_handler()
+        self._task_scheduler = hs.get_task_scheduler()
+
+        self.should_calculate_push_rules = hs.config.push.enable_push
 
         # TODO: flush cache entries on subsequent sync request.
         #    Once we get the next /sync request (ie, one with the same access token
@@ -278,7 +294,7 @@ class SyncHandler:
             expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
-        self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
+        self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
         self,
@@ -346,13 +362,36 @@ class SyncHandler:
         # (since we now know that the device has received them)
         if since_token is not None:
             since_stream_id = since_token.to_device_key
+            # Fast path: delete a limited number of to-device messages up front.
+            # We do this to avoid the overhead of scheduling a task for every
+            # sync.
+            device_deletion_limit = 100
             deleted = await self.store.delete_messages_for_device(
-                sync_config.user.to_string(), sync_config.device_id, since_stream_id
+                sync_config.user.to_string(),
+                sync_config.device_id,
+                since_stream_id,
+                limit=device_deletion_limit,
             )
             logger.debug(
                 "Deleted %d to-device messages up to %d", deleted, since_stream_id
             )
 
+            # If we hit the limit, schedule a background task to delete the rest.
+            if deleted >= device_deletion_limit:
+                await self._task_scheduler.schedule_task(
+                    DELETE_DEVICE_MSGS_TASK_NAME,
+                    resource_id=sync_config.device_id,
+                    params={
+                        "user_id": sync_config.user.to_string(),
+                        "device_id": sync_config.device_id,
+                        "up_to_stream_id": since_stream_id,
+                    },
+                )
+                logger.debug(
+                    "Deletion of to-device messages up to %d scheduled",
+                    since_stream_id,
+                )
+
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
@@ -373,16 +412,16 @@ class SyncHandler:
                 from_token=since_token,
             )
 
-            # if nothing has happened in any of the users' rooms since /sync was called,
-            # the resultant next_batch will be the same as since_token (since the result
-            # is generated when wait_for_events is first called, and not regenerated
-            # when wait_for_events times out).
-            #
-            # If that happens, we mustn't cache it, so that when the client comes back
-            # with the same cache token, we don't immediately return the same empty
-            # result, causing a tightloop. (#8518)
-            if result.next_batch == since_token:
-                cache_context.should_cache = False
+        # if nothing has happened in any of the users' rooms since /sync was called,
+        # the resultant next_batch will be the same as since_token (since the result
+        # is generated when wait_for_events is first called, and not regenerated
+        # when wait_for_events times out).
+        #
+        # If that happens, we mustn't cache it, so that when the client comes back
+        # with the same cache token, we don't immediately return the same empty
+        # result, causing a tightloop. (#8518)
+        if result.next_batch == since_token:
+            cache_context.should_cache = False
 
         if result:
             if sync_config.filter_collection.lazy_load_members():
@@ -414,12 +453,6 @@ class SyncHandler:
             set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
             return sync_result
 
-    async def push_rules_for_user(self, user: UserID) -> Dict[str, Dict[str, list]]:
-        user_id = user.to_string()
-        rules_raw = await self.store.get_push_rules_for_user(user_id)
-        rules = format_push_rules_for_user(user, rules_raw)
-        return rules
-
     async def ephemeral_by_room(
         self,
         sync_result_builder: "SyncResultBuilder",
@@ -929,6 +962,8 @@ class SyncHandler:
 
                 timeline_state = {}
 
+                # Membership events to fetch that can be found in the room state, or in
+                # the case of partial state rooms, the auth events of timeline events.
                 members_to_fetch = set()
                 first_event_by_sender_map = {}
                 for event in batch.events:
@@ -950,9 +985,19 @@ class SyncHandler:
                     # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
                     # We only need apply this on full state syncs given we disabled
                     # LL for incr syncs in #3840.
-                    members_to_fetch.add(sync_config.user.to_string())
-
-                state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+                    # We don't insert ourselves into `members_to_fetch`, because in some
+                    # rare cases (an empty event batch with a now_token after the user's
+                    # leave in a partial state room which another local user has
+                    # joined), the room state will be missing our membership and there
+                    # is no guarantee that our membership will be in the auth events of
+                    # timeline events when the room is partial stated.
+                    state_filter = StateFilter.from_lazy_load_member_list(
+                        members_to_fetch.union((sync_config.user.to_string(),))
+                    )
+                else:
+                    state_filter = StateFilter.from_lazy_load_member_list(
+                        members_to_fetch
+                    )
 
                 # We are happy to use partial state to compute the `/sync` response.
                 # Since partial state may not include the lazy-loaded memberships we
@@ -974,7 +1019,9 @@ class SyncHandler:
             # sync's timeline and the start of the current sync's timeline.
             # See the docstring above for details.
             state_ids: StateMap[str]
-
+            # We need to know whether the state we fetch may be partial, so check
+            # whether the room is partial stated *before* fetching it.
+            is_partial_state_room = await self.store.is_partial_state_room(room_id)
             if full_state:
                 if batch:
                     state_at_timeline_end = (
@@ -1105,7 +1152,7 @@ class SyncHandler:
             # If we only have partial state for the room, `state_ids` may be missing the
             # memberships we wanted. We attempt to find some by digging through the auth
             # events of timeline events.
-            if lazy_load_members and await self.store.is_partial_state_room(room_id):
+            if lazy_load_members and is_partial_state_room:
                 assert members_to_fetch is not None
                 assert first_event_by_sender_map is not None
 
@@ -1167,7 +1214,7 @@ class SyncHandler:
     async def _find_missing_partial_state_memberships(
         self,
         room_id: str,
-        members_to_fetch: Collection[str],
+        members_to_fetch: StrCollection,
         events_with_membership_auth: Mapping[str, EventBase],
         found_state_ids: StateMap[str],
     ) -> StateMap[str]:
@@ -1212,6 +1259,10 @@ class SyncHandler:
                 continue
 
             event_with_membership_auth = events_with_membership_auth[member]
+            is_create = (
+                event_with_membership_auth.is_state()
+                and event_with_membership_auth.type == EventTypes.Create
+            )
             is_join = (
                 event_with_membership_auth.is_state()
                 and event_with_membership_auth.type == EventTypes.Member
@@ -1219,9 +1270,10 @@ class SyncHandler:
                 and event_with_membership_auth.content.get("membership")
                 == Membership.JOIN
             )
-            if not is_join:
+            if not is_create and not is_join:
                 # The event must include the desired membership as an auth event, unless
-                # it's the first join event for a given user.
+                # it's the `m.room.create` event for a room or the first join event for
+                # a given user.
                 missing_members.add(member)
             auth_event_ids.update(event_with_membership_auth.auth_event_ids())
 
@@ -1276,8 +1328,13 @@ class SyncHandler:
     async def unread_notifs_for_room_id(
         self, room_id: str, sync_config: SyncConfig
     ) -> RoomNotifCounts:
-        with Measure(self.clock, "unread_notifs_for_room_id"):
+        if not self.should_calculate_push_rules:
+            # If push rules have been universally disabled then we know we won't
+            # have any unread counts in the DB, so we may as well skip asking
+            # the DB.
+            return RoomNotifCounts.empty()
 
+        with Measure(self.clock, "unread_notifs_for_room_id"):
             return await self.store.get_unread_event_push_actions_by_room_for_user(
                 room_id,
                 sync_config.user.to_string(),
@@ -1328,7 +1385,10 @@ class SyncHandler:
         membership_change_events = []
         if since_token:
             membership_change_events = await self.store.get_membership_changes_for_user(
-                user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+                user_id,
+                since_token.room_key,
+                now_token.room_key,
+                self.rooms_to_exclude_globally,
             )
 
             mem_last_change_by_room_id: Dict[str, EventBase] = {}
@@ -1363,13 +1423,53 @@ class SyncHandler:
                 else:
                     mutable_joined_room_ids.discard(room_id)
 
-        # Now we have our list of joined room IDs, exclude as configured and freeze
-        joined_room_ids = frozenset(
-            (
+        # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
+        mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
+        if not sync_config.filter_collection.lazy_load_members():
+            # Non-lazy syncs should never include partially stated rooms.
+            # Exclude all partially stated rooms from this sync.
+            results = await self.store.is_partial_state_room_batched(
+                mutable_joined_room_ids
+            )
+            mutable_rooms_to_exclude.update(
+                room_id
+                for room_id, is_partial_state in results.items()
+                if is_partial_state
+            )
+            membership_change_events = [
+                event
+                for event in membership_change_events
+                if not results.get(event.room_id, False)
+            ]
+
+        # Incremental eager syncs should additionally include rooms that
+        # - we are joined to
+        # - are full-stated
+        # - became fully-stated at some point during the sync period
+        #   (These rooms will have been omitted during a previous eager sync.)
+        forced_newly_joined_room_ids: Set[str] = set()
+        if since_token and not sync_config.filter_collection.lazy_load_members():
+            un_partial_stated_rooms = (
+                await self.store.get_un_partial_stated_rooms_between(
+                    since_token.un_partial_stated_rooms_key,
+                    now_token.un_partial_stated_rooms_key,
+                    mutable_joined_room_ids,
+                )
+            )
+            results = await self.store.is_partial_state_room_batched(
+                un_partial_stated_rooms
+            )
+            forced_newly_joined_room_ids.update(
                 room_id
-                for room_id in mutable_joined_room_ids
-                if room_id not in self.rooms_to_exclude
+                for room_id, is_partial_state in results.items()
+                if not is_partial_state
             )
+
+        # Now we have our list of joined room IDs, exclude as configured and freeze
+        joined_room_ids = frozenset(
+            room_id
+            for room_id in mutable_joined_room_ids
+            if room_id not in mutable_rooms_to_exclude
         )
 
         logger.debug(
@@ -1385,45 +1485,76 @@ class SyncHandler:
             since_token=since_token,
             now_token=now_token,
             joined_room_ids=joined_room_ids,
+            excluded_room_ids=frozenset(mutable_rooms_to_exclude),
+            forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
             membership_change_events=membership_change_events,
         )
 
         logger.debug("Fetching account data")
 
-        account_data_by_room = await self._generate_sync_entry_for_account_data(
-            sync_result_builder
-        )
+        # Global account data is included if it is not filtered out.
+        if not sync_config.filter_collection.blocks_all_global_account_data():
+            await self._generate_sync_entry_for_account_data(sync_result_builder)
 
-        logger.debug("Fetching room data")
-
-        res = await self._generate_sync_entry_for_rooms(
-            sync_result_builder, account_data_by_room
+        # Presence data is included if the server has it enabled and not filtered out.
+        include_presence_data = bool(
+            self.hs_config.server.use_presence
+            and not sync_config.filter_collection.blocks_all_presence()
         )
-        newly_joined_rooms, newly_joined_or_invited_or_knocked_users, _, _ = res
-        _, _, newly_left_rooms, newly_left_users = res
+        # Device list updates are sent if a since token is provided.
+        include_device_list_updates = bool(since_token and since_token.device_list_key)
+
+        # If we do not care about the rooms or things which depend on the room
+        # data (namely presence and device list updates), then we can skip
+        # this process completely.
+        device_lists = DeviceListUpdates()
+        if (
+            not sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
+            or include_presence_data
+            or include_device_list_updates
+        ):
+            logger.debug("Fetching room data")
 
-        block_all_presence_data = (
-            since_token is None and sync_config.filter_collection.blocks_all_presence()
-        )
-        if self.hs_config.server.use_presence and not block_all_presence_data:
-            logger.debug("Fetching presence data")
-            await self._generate_sync_entry_for_presence(
-                sync_result_builder,
+            # Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
+            # is used in calculate_user_changes below.
+            (
                 newly_joined_rooms,
-                newly_joined_or_invited_or_knocked_users,
-            )
+                newly_left_rooms,
+            ) = await self._generate_sync_entry_for_rooms(sync_result_builder)
+
+            # Work out which users have joined or left rooms we're in. We use this
+            # to build the presence and device_list parts of the sync response in
+            # `_generate_sync_entry_for_presence` and
+            # `_generate_sync_entry_for_device_list` respectively.
+            if include_presence_data or include_device_list_updates:
+                # This uses the sync_result_builder.joined which is set in
+                # `_generate_sync_entry_for_rooms`, if that didn't find any joined
+                # rooms for some reason it is a no-op.
+                (
+                    newly_joined_or_invited_or_knocked_users,
+                    newly_left_users,
+                ) = sync_result_builder.calculate_user_changes()
+
+                if include_presence_data:
+                    logger.debug("Fetching presence data")
+                    await self._generate_sync_entry_for_presence(
+                        sync_result_builder,
+                        newly_joined_rooms,
+                        newly_joined_or_invited_or_knocked_users,
+                    )
+
+                if include_device_list_updates:
+                    device_lists = await self._generate_sync_entry_for_device_list(
+                        sync_result_builder,
+                        newly_joined_rooms=newly_joined_rooms,
+                        newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
+                        newly_left_rooms=newly_left_rooms,
+                        newly_left_users=newly_left_users,
+                    )
 
         logger.debug("Fetching to-device data")
         await self._generate_sync_entry_for_to_device(sync_result_builder)
 
-        device_lists = await self._generate_sync_entry_for_device_list(
-            sync_result_builder,
-            newly_joined_rooms=newly_joined_rooms,
-            newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
-            newly_left_rooms=newly_left_rooms,
-            newly_left_users=newly_left_users,
-        )
-
         logger.debug("Fetching OTK data")
         device_id = sync_config.device_id
         one_time_keys_count: JsonDict = {}
@@ -1436,7 +1567,7 @@ class SyncHandler:
             one_time_keys_count = await self.store.count_e2e_one_time_keys(
                 user_id, device_id
             )
-            unused_fallback_key_types = (
+            unused_fallback_key_types = list(
                 await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
             )
 
@@ -1492,6 +1623,7 @@ class SyncHandler:
 
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
+        assert since_token is not None
 
         # Take a copy since these fields will be mutated later.
         newly_joined_or_invited_or_knocked_users = set(
@@ -1499,91 +1631,87 @@ class SyncHandler:
         )
         newly_left_users = set(newly_left_users)
 
-        if since_token and since_token.device_list_key:
-            # We want to figure out what user IDs the client should refetch
-            # device keys for, and which users we aren't going to track changes
-            # for anymore.
-            #
-            # For the first step we check:
-            #   a. if any users we share a room with have updated their devices,
-            #      and
-            #   b. we also check if we've joined any new rooms, or if a user has
-            #      joined a room we're in.
-            #
-            # For the second step we just find any users we no longer share a
-            # room with by looking at all users that have left a room plus users
-            # that were in a room we've left.
-
-            users_that_have_changed = set()
-
-            joined_rooms = sync_result_builder.joined_room_ids
-
-            # Step 1a, check for changes in devices of users we share a room
-            # with
-            #
-            # We do this in two different ways depending on what we have cached.
-            # If we already have a list of all the user that have changed since
-            # the last sync then it's likely more efficient to compare the rooms
-            # they're in with the rooms the syncing user is in.
-            #
-            # If we don't have that info cached then we get all the users that
-            # share a room with our user and check if those users have changed.
-            changed_users = self.store.get_cached_device_list_changes(
-                since_token.device_list_key
-            )
-            if changed_users is not None:
-                result = await self.store.get_rooms_for_users(changed_users)
-
-                for changed_user_id, entries in result.items():
-                    # Check if the changed user shares any rooms with the user,
-                    # or if the changed user is the syncing user (as we always
-                    # want to include device list updates of their own devices).
-                    if user_id == changed_user_id or any(
-                        rid in joined_rooms for rid in entries
-                    ):
-                        users_that_have_changed.add(changed_user_id)
-            else:
-                users_that_have_changed = (
-                    await self._device_handler.get_device_changes_in_shared_rooms(
-                        user_id,
-                        sync_result_builder.joined_room_ids,
-                        from_token=since_token,
-                    )
-                )
+        # We want to figure out what user IDs the client should refetch
+        # device keys for, and which users we aren't going to track changes
+        # for anymore.
+        #
+        # For the first step we check:
+        #   a. if any users we share a room with have updated their devices,
+        #      and
+        #   b. we also check if we've joined any new rooms, or if a user has
+        #      joined a room we're in.
+        #
+        # For the second step we just find any users we no longer share a
+        # room with by looking at all users that have left a room plus users
+        # that were in a room we've left.
 
-            # Step 1b, check for newly joined rooms
-            for room_id in newly_joined_rooms:
-                joined_users = await self.store.get_users_in_room(room_id)
-                newly_joined_or_invited_or_knocked_users.update(joined_users)
+        users_that_have_changed = set()
 
-            # TODO: Check that these users are actually new, i.e. either they
-            # weren't in the previous sync *or* they left and rejoined.
-            users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
+        joined_rooms = sync_result_builder.joined_room_ids
 
-            user_signatures_changed = (
-                await self.store.get_users_whose_signatures_changed(
-                    user_id, since_token.device_list_key
+        # Step 1a, check for changes in devices of users we share a room
+        # with
+        #
+        # We do this in two different ways depending on what we have cached.
+        # If we already have a list of all the user that have changed since
+        # the last sync then it's likely more efficient to compare the rooms
+        # they're in with the rooms the syncing user is in.
+        #
+        # If we don't have that info cached then we get all the users that
+        # share a room with our user and check if those users have changed.
+        cache_result = self.store.get_cached_device_list_changes(
+            since_token.device_list_key
+        )
+        if cache_result.hit:
+            changed_users = cache_result.entities
+
+            result = await self.store.get_rooms_for_users(changed_users)
+
+            for changed_user_id, entries in result.items():
+                # Check if the changed user shares any rooms with the user,
+                # or if the changed user is the syncing user (as we always
+                # want to include device list updates of their own devices).
+                if user_id == changed_user_id or any(
+                    rid in joined_rooms for rid in entries
+                ):
+                    users_that_have_changed.add(changed_user_id)
+        else:
+            users_that_have_changed = (
+                await self._device_handler.get_device_changes_in_shared_rooms(
+                    user_id,
+                    sync_result_builder.joined_room_ids,
+                    from_token=since_token,
                 )
             )
-            users_that_have_changed.update(user_signatures_changed)
 
-            # Now find users that we no longer track
-            for room_id in newly_left_rooms:
-                left_users = await self.store.get_users_in_room(room_id)
-                newly_left_users.update(left_users)
+        # Step 1b, check for newly joined rooms
+        for room_id in newly_joined_rooms:
+            joined_users = await self.store.get_users_in_room(room_id)
+            newly_joined_or_invited_or_knocked_users.update(joined_users)
+
+        # TODO: Check that these users are actually new, i.e. either they
+        # weren't in the previous sync *or* they left and rejoined.
+        users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
 
-            # Remove any users that we still share a room with.
-            left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
-            for user_id, entries in left_users_rooms.items():
-                if any(rid in joined_rooms for rid in entries):
-                    newly_left_users.discard(user_id)
+        user_signatures_changed = await self.store.get_users_whose_signatures_changed(
+            user_id, since_token.device_list_key
+        )
+        users_that_have_changed.update(user_signatures_changed)
 
-            return DeviceListUpdates(
-                changed=users_that_have_changed, left=newly_left_users
-            )
-        else:
-            return DeviceListUpdates()
+        # Now find users that we no longer track
+        for room_id in newly_left_rooms:
+            left_users = await self.store.get_users_in_room(room_id)
+            newly_left_users.update(left_users)
 
+        # Remove any users that we still share a room with.
+        left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
+        for user_id, entries in left_users_rooms.items():
+            if any(rid in joined_rooms for rid in entries):
+                newly_left_users.discard(user_id)
+
+        return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
+
+    @trace
     async def _generate_sync_entry_for_to_device(
         self, sync_result_builder: "SyncResultBuilder"
     ) -> None:
@@ -1603,19 +1731,29 @@ class SyncHandler:
             )
 
             for message in messages:
-                # We pop here as we shouldn't be sending the message ID down
-                # `/sync`
-                message_id = message.pop("message_id", None)
-                if message_id:
-                    set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+                log_kv(
+                    {
+                        "event": "to_device_message",
+                        "sender": message["sender"],
+                        "type": message["type"],
+                        EventContentFields.TO_DEVICE_MSGID: message["content"].get(
+                            EventContentFields.TO_DEVICE_MSGID
+                        ),
+                    }
+                )
 
-            logger.debug(
-                "Returning %d to-device messages between %d and %d (current token: %d)",
-                len(messages),
-                since_stream_id,
-                stream_id,
-                now_token.to_device_key,
-            )
+            if messages and issue9533_logger.isEnabledFor(logging.DEBUG):
+                issue9533_logger.debug(
+                    "Returning to-device messages with stream_ids (%d, %d]; now: %d;"
+                    " msgids: %s",
+                    since_stream_id,
+                    stream_id,
+                    now_token.to_device_key,
+                    [
+                        message["content"].get(EventContentFields.TO_DEVICE_MSGID)
+                        for message in messages
+                    ],
+                )
             sync_result_builder.now_token = now_token.copy_and_replace(
                 StreamKeyType.TO_DEVICE, stream_id
             )
@@ -1625,34 +1763,29 @@ class SyncHandler:
 
     async def _generate_sync_entry_for_account_data(
         self, sync_result_builder: "SyncResultBuilder"
-    ) -> Dict[str, Dict[str, JsonDict]]:
-        """Generates the account data portion of the sync response.
+    ) -> None:
+        """Generates the global account data portion of the sync response.
 
         Account data (called "Client Config" in the spec) can be set either globally
         or for a specific room. Account data consists of a list of events which
         accumulate state, much like a room.
 
-        This function retrieves global and per-room account data. The former is written
-        to the given `sync_result_builder`. The latter is returned directly, to be
-        later written to the `sync_result_builder` on a room-by-room basis.
+        This function retrieves global account data and writes it to the given
+        `sync_result_builder`. See `_generate_sync_entry_for_rooms` for handling
+         of per-room account data.
 
         Args:
             sync_result_builder
-
-        Returns:
-            A dictionary whose keys (room ids) map to the per room account data for that
-            room.
         """
         sync_config = sync_result_builder.sync_config
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
         if since_token and not sync_result_builder.full_state:
-            (
-                global_account_data,
-                account_data_by_room,
-            ) = await self.store.get_updated_account_data_for_user(
-                user_id, since_token.account_data_key
+            global_account_data = (
+                await self.store.get_updated_global_account_data_for_user(
+                    user_id, since_token.account_data_key
+                )
             )
 
             push_rules_changed = await self.store.have_push_rules_changed_for_user(
@@ -1660,30 +1793,31 @@ class SyncHandler:
             )
 
             if push_rules_changed:
-                global_account_data["m.push_rules"] = await self.push_rules_for_user(
-                    sync_config.user
-                )
+                global_account_data = dict(global_account_data)
+                global_account_data[
+                    AccountDataTypes.PUSH_RULES
+                ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
         else:
-            (
-                global_account_data,
-                account_data_by_room,
-            ) = await self.store.get_account_data_for_user(sync_config.user.to_string())
-
-            global_account_data["m.push_rules"] = await self.push_rules_for_user(
-                sync_config.user
+            all_global_account_data = await self.store.get_global_account_data_for_user(
+                user_id
             )
 
-        account_data_for_user = await sync_config.filter_collection.filter_account_data(
-            [
-                {"type": account_data_type, "content": content}
-                for account_data_type, content in global_account_data.items()
-            ]
+            global_account_data = dict(all_global_account_data)
+            global_account_data[
+                AccountDataTypes.PUSH_RULES
+            ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
+
+        account_data_for_user = (
+            await sync_config.filter_collection.filter_global_account_data(
+                [
+                    {"type": account_data_type, "content": content}
+                    for account_data_type, content in global_account_data.items()
+                ]
+            )
         )
 
         sync_result_builder.account_data = account_data_for_user
 
-        return account_data_by_room
-
     async def _generate_sync_entry_for_presence(
         self,
         sync_result_builder: "SyncResultBuilder",
@@ -1743,10 +1877,8 @@ class SyncHandler:
         sync_result_builder.presence = presence
 
     async def _generate_sync_entry_for_rooms(
-        self,
-        sync_result_builder: "SyncResultBuilder",
-        account_data_by_room: Dict[str, Dict[str, JsonDict]],
-    ) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
+        self, sync_result_builder: "SyncResultBuilder"
+    ) -> Tuple[AbstractSet[str], AbstractSet[str]]:
         """Generates the rooms portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
@@ -1756,28 +1888,44 @@ class SyncHandler:
 
         Args:
             sync_result_builder
-            account_data_by_room: Dictionary of per room account data
 
         Returns:
-            Returns a 4-tuple describing rooms the user has joined or left, and users who've
-            joined or left rooms any rooms the user is in. This gets used later in
-            `_generate_sync_entry_for_device_list`.
+            Returns a 2-tuple describing rooms the user has joined or left.
 
             Its entries are:
             - newly_joined_rooms
-            - newly_joined_or_invited_or_knocked_users
             - newly_left_rooms
-            - newly_left_users
         """
+
         since_token = sync_result_builder.since_token
+        user_id = sync_result_builder.sync_config.user.to_string()
+
+        blocks_all_rooms = (
+            sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
+        )
+
+        # 0. Start by fetching room account data (if required).
+        if (
+            blocks_all_rooms
+            or sync_result_builder.sync_config.filter_collection.blocks_all_room_account_data()
+        ):
+            account_data_by_room: Mapping[str, Mapping[str, JsonDict]] = {}
+        elif since_token and not sync_result_builder.full_state:
+            account_data_by_room = (
+                await self.store.get_updated_room_account_data_for_user(
+                    user_id, since_token.account_data_key
+                )
+            )
+        else:
+            account_data_by_room = await self.store.get_room_account_data_for_user(
+                user_id
+            )
 
         # 1. Start by fetching all ephemeral events in rooms we've joined (if required).
-        user_id = sync_result_builder.sync_config.user.to_string()
         block_all_room_ephemeral = (
-            since_token is None
-            and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
+            blocks_all_rooms
+            or sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
         )
-
         if block_all_room_ephemeral:
             ephemeral_by_room: Dict[str, List[JsonDict]] = {}
         else:
@@ -1800,19 +1948,21 @@ class SyncHandler:
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
-                        return set(), set(), set(), set()
+                        return set(), set()
 
         # 3. Work out which rooms need reporting in the sync response.
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
-            room_changes = await self._get_rooms_changed(
+            room_changes = await self._get_room_changes_for_incremental_sync(
                 sync_result_builder, ignored_users
             )
             tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
             )
         else:
-            room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
+            room_changes = await self._get_room_changes_for_initial_sync(
+                sync_result_builder, ignored_users
+            )
             tags_by_room = await self.store.get_tags_for_user(user_id)
 
         log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1827,6 +1977,7 @@ class SyncHandler:
         # joined or archived).
         async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
             logger.debug("Generating room entry for %s", room_entry.room_id)
+            # Note that this mutates sync_result_builder.{joined,archived}.
             await self._generate_room_entry(
                 sync_result_builder,
                 room_entry,
@@ -1843,20 +1994,7 @@ class SyncHandler:
         sync_result_builder.invited.extend(invited)
         sync_result_builder.knocked.extend(knocked)
 
-        # 5. Work out which users have joined or left rooms we're in. We use this
-        # to build the device_list part of the sync response in
-        # `_generate_sync_entry_for_device_list`.
-        (
-            newly_joined_or_invited_or_knocked_users,
-            newly_left_users,
-        ) = sync_result_builder.calculate_user_changes()
-
-        return (
-            set(newly_joined_rooms),
-            newly_joined_or_invited_or_knocked_users,
-            set(newly_left_rooms),
-            newly_left_users,
-        )
+        return set(newly_joined_rooms), set(newly_left_rooms)
 
     async def _have_rooms_changed(
         self, sync_result_builder: "SyncResultBuilder"
@@ -1871,7 +2009,7 @@ class SyncHandler:
 
         assert since_token
 
-        if membership_change_events:
+        if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
             return True
 
         stream_id = since_token.room_key.stream
@@ -1880,7 +2018,7 @@ class SyncHandler:
                 return True
         return False
 
-    async def _get_rooms_changed(
+    async def _get_room_changes_for_incremental_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -1918,7 +2056,9 @@ class SyncHandler:
         for event in membership_change_events:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
-        newly_joined_rooms: List[str] = []
+        newly_joined_rooms: List[str] = list(
+            sync_result_builder.forced_newly_joined_room_ids
+        )
         newly_left_rooms: List[str] = []
         room_entries: List[RoomSyncResultBuilder] = []
         invited: List[InvitedSyncResult] = []
@@ -2124,7 +2264,7 @@ class SyncHandler:
             newly_left_rooms,
         )
 
-    async def _get_all_rooms(
+    async def _get_room_changes_for_initial_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -2149,7 +2289,7 @@ class SyncHandler:
         room_list = await self.store.get_rooms_for_local_user_where_membership_is(
             user_id=user_id,
             membership_list=Membership.LIST,
-            excluded_rooms=self.rooms_to_exclude,
+            excluded_rooms=sync_result_builder.excluded_room_ids,
         )
 
         room_entries = []
@@ -2209,8 +2349,8 @@ class SyncHandler:
         sync_result_builder: "SyncResultBuilder",
         room_builder: "RoomSyncResultBuilder",
         ephemeral: List[JsonDict],
-        tags: Optional[Dict[str, Dict[str, Any]]],
-        account_data: Dict[str, JsonDict],
+        tags: Optional[Mapping[str, Mapping[str, Any]]],
+        account_data: Mapping[str, JsonDict],
         always_include: bool = False,
     ) -> None:
         """Populates the `joined` and `archived` section of `sync_result_builder`
@@ -2307,7 +2447,9 @@ class SyncHandler:
 
             account_data_events = []
             if tags is not None:
-                account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
+                account_data_events.append(
+                    {"type": AccountDataTypes.TAG, "content": {"tags": tags}}
+                )
 
             for account_data_type, content in account_data.items():
                 account_data_events.append(
@@ -2518,6 +2660,13 @@ class SyncResultBuilder:
         since_token: The token supplied by user, or None.
         now_token: The token to sync up to.
         joined_room_ids: List of rooms the user is joined to
+        excluded_room_ids: Set of room ids we should omit from the /sync response.
+        forced_newly_joined_room_ids:
+            Rooms that should be presented in the /sync response as if they were
+            newly joined during the sync period, even if that's not the case.
+            (This is useful if the room was previously excluded from a /sync response,
+            and now the client should be made aware of it.)
+            Only used by incremental syncs.
 
         # The following mirror the fields in a sync response
         presence
@@ -2534,6 +2683,8 @@ class SyncResultBuilder:
     since_token: Optional[StreamToken]
     now_token: StreamToken
     joined_room_ids: FrozenSet[str]
+    excluded_room_ids: FrozenSet[str]
+    forced_newly_joined_room_ids: FrozenSet[str]
     membership_change_events: List[EventBase]
 
     presence: List[UserPresenceState] = attr.Factory(list)