summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/auth.py4
-rw-r--r--synapse/handlers/device.py10
-rw-r--r--synapse/handlers/e2e_keys.py12
-rw-r--r--synapse/handlers/e2e_room_keys.py15
-rw-r--r--synapse/handlers/events.py9
-rw-r--r--synapse/handlers/federation.py19
-rw-r--r--synapse/handlers/initial_sync.py51
-rw-r--r--synapse/handlers/message.py41
-rw-r--r--synapse/handlers/pagination.py5
-rw-r--r--synapse/handlers/receipts.py6
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/handlers/room_member.py9
-rw-r--r--synapse/handlers/sync.py74
13 files changed, 146 insertions, 111 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 61607cf2ba..84724b207c 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -997,9 +997,7 @@ class AuthHandler:
         # really don't want is active access_tokens without a record of the
         # device, so we double-check it here.
         if device_id is not None:
-            try:
-                await self.store.get_device(user_id, device_id)
-            except StoreError:
+            if await self.store.get_device(user_id, device_id) is None:
                 await self.store.delete_access_token(access_token)
                 raise StoreError(400, "Login raced against device deletion")
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 82ee11e921..7665425232 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -106,10 +106,10 @@ class DeviceWorkerHandler:
         Raises:
             errors.NotFoundError: if the device was not found
         """
-        try:
-            device = await self.store.get_device(user_id, device_id)
-        except errors.StoreError:
-            raise errors.NotFoundError
+        device = await self.store.get_device(user_id, device_id)
+        if device is None:
+            raise errors.NotFoundError()
+
         ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
         _update_device_from_client_ips(device, ips)
 
@@ -602,6 +602,8 @@ class DeviceHandler(DeviceWorkerHandler):
             access_token, device_id
         )
         old_device = await self.store.get_device(user_id, old_device_id)
+        if old_device is None:
+            raise errors.NotFoundError()
         await self.store.update_device(user_id, device_id, old_device["display_name"])
         # can't call self.delete_device because that will clobber the
         # access token so call the storage layer directly
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 60c11e3d21..14360b4e40 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -65,8 +65,12 @@ class E2eKeysHandler:
         else:
             # Only register this edu handler on master as it requires writing
             # device updates to the db
-            #
-            # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+            federation_registry.register_edu_handler(
+                "m.signing_key_update",
+                self._edu_updater.incoming_signing_key_update,
+            )
+            # also handle the unstable version
+            # FIXME: remove this when enough servers have upgraded
             federation_registry.register_edu_handler(
                 "org.matrix.signing_key_update",
                 self._edu_updater.incoming_signing_key_update,
@@ -576,7 +580,9 @@ class E2eKeysHandler:
             log_kv(
                 {"message": "Did not update one_time_keys", "reason": "no keys given"}
             )
-        fallback_keys = keys.get("org.matrix.msc2732.fallback_keys", None)
+        fallback_keys = keys.get("fallback_keys") or keys.get(
+            "org.matrix.msc2732.fallback_keys"
+        )
         if fallback_keys and isinstance(fallback_keys, dict):
             log_kv(
                 {
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 31742236a9..12614b2c5d 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -14,7 +14,9 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, Dict, Optional
+
+from typing_extensions import Literal
 
 from synapse.api.errors import (
     Codes,
@@ -24,6 +26,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.logging.opentracing import log_kv, trace
+from synapse.storage.databases.main.e2e_room_keys import RoomKey
 from synapse.types import JsonDict
 from synapse.util.async_helpers import Linearizer
 
@@ -58,7 +61,9 @@ class E2eRoomKeysHandler:
         version: str,
         room_id: Optional[str] = None,
         session_id: Optional[str] = None,
-    ) -> List[JsonDict]:
+    ) -> Dict[
+        Literal["rooms"], Dict[str, Dict[Literal["sessions"], Dict[str, RoomKey]]]
+    ]:
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
         room, or a given session.
         See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
@@ -72,8 +77,8 @@ class E2eRoomKeysHandler:
         Raises:
             NotFoundError: if the backup version does not exist
         Returns:
-            A list of dicts giving the session_data and message metadata for
-            these room keys.
+            A dict giving the session_data and message metadata for these room keys.
+            `{"rooms": {room_id: {"sessions": {session_id: room_key}}}}`
         """
 
         # we deliberately take the lock to get keys so that changing the version
@@ -273,7 +278,7 @@ class E2eRoomKeysHandler:
 
     @staticmethod
     def _should_replace_room_key(
-        current_room_key: Optional[JsonDict], room_key: JsonDict
+        current_room_key: Optional[RoomKey], room_key: RoomKey
     ) -> bool:
         """
         Determine whether to replace a given current_room_key (if any)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 32b0254c5f..1b996c420d 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -79,13 +79,14 @@ class EventStreamHandler:
                 # thundering herds on restart.
                 timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
 
-            events, tokens = await self.notifier.get_events_for(
+            stream_result = await self.notifier.get_events_for(
                 auth_user,
                 pagin_config,
                 timeout,
                 is_guest=is_guest,
                 explicit_room_id=room_id,
             )
+            events = stream_result.events
 
             time_now = self.clock.time_msec()
 
@@ -122,14 +123,12 @@ class EventStreamHandler:
                 events,
                 time_now,
                 as_client_event=as_client_event,
-                # Don't bundle aggregations as this is a deprecated API.
-                bundle_aggregations=False,
             )
 
             chunk = {
                 "chunk": chunks,
-                "start": await tokens[0].to_string(self.store),
-                "end": await tokens[1].to_string(self.store),
+                "start": await stream_result.start_token.to_string(self.store),
+                "end": await stream_result.end_token.to_string(self.store),
             }
 
             return chunk
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1ea837d082..26b8e3f43c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -360,31 +360,34 @@ class FederationHandler:
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
         resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
-        states = await make_deferred_yieldable(
+        states_list = await make_deferred_yieldable(
             defer.gatherResults(
                 [resolve(room_id, [e]) for e in event_ids], consumeErrors=True
             )
         )
 
-        # dict[str, dict[tuple, str]], a map from event_id to state map of
-        # event_ids.
-        states = dict(zip(event_ids, [s.state for s in states]))
+        # A map from event_id to state map of event_ids.
+        state_ids: Dict[str, StateMap[str]] = dict(
+            zip(event_ids, [s.state for s in states_list])
+        )
 
         state_map = await self.store.get_events(
-            [e_id for ids in states.values() for e_id in ids.values()],
+            [e_id for ids in state_ids.values() for e_id in ids.values()],
             get_prev_content=False,
         )
-        states = {
+
+        # A map from event_id to state map of events.
+        state_events: Dict[str, StateMap[EventBase]] = {
             key: {
                 k: state_map[e_id]
                 for k, e_id in state_dict.items()
                 if e_id in state_map
             }
-            for key, state_dict in states.items()
+            for key, state_dict in state_ids.items()
         }
 
         for e_id in event_ids:
-            likely_extremeties_domains = get_domains_from_state(states[e_id])
+            likely_extremeties_domains = get_domains_from_state(state_events[e_id])
 
             success = await try_backfill(
                 [
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 9cd21e7f2b..601bab67f9 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,21 +13,27 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple
-
-from twisted.internet import defer
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 
 from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import SynapseError
+from synapse.events import EventBase
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
 from synapse.handlers.receipts import ReceiptEventSource
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.storage.roommember import RoomsForUser
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+    JsonDict,
+    Requester,
+    RoomStreamToken,
+    StateMap,
+    StreamToken,
+    UserID,
+)
 from synapse.util import unwrapFirstError
-from synapse.util.async_helpers import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute, gather_results
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.visibility import filter_events_for_client
 
@@ -167,8 +173,6 @@ class InitialSyncHandler:
                 d["invite"] = await self._event_serializer.serialize_event(
                     invite_event,
                     time_now,
-                    # Don't bundle aggregations as this is a deprecated API.
-                    bundle_aggregations=False,
                     as_client_event=as_client_event,
                 )
 
@@ -190,14 +194,13 @@ class InitialSyncHandler:
                     )
                     deferred_room_state = run_in_background(
                         self.state_store.get_state_for_events, [event.event_id]
-                    )
-                    deferred_room_state.addCallback(
-                        lambda states: states[event.event_id]
+                    ).addCallback(
+                        lambda states: cast(StateMap[EventBase], states[event.event_id])
                     )
 
                 (messages, token), current_state = await make_deferred_yieldable(
-                    defer.gatherResults(
-                        [
+                    gather_results(
+                        (
                             run_in_background(
                                 self.store.get_recent_events_for_room,
                                 event.room_id,
@@ -205,7 +208,7 @@ class InitialSyncHandler:
                                 end_token=room_end_token,
                             ),
                             deferred_room_state,
-                        ]
+                        )
                     )
                 ).addErrback(unwrapFirstError)
 
@@ -222,8 +225,6 @@ class InitialSyncHandler:
                         await self._event_serializer.serialize_events(
                             messages,
                             time_now=time_now,
-                            # Don't bundle aggregations as this is a deprecated API.
-                            bundle_aggregations=False,
                             as_client_event=as_client_event,
                         )
                     ),
@@ -234,8 +235,6 @@ class InitialSyncHandler:
                 d["state"] = await self._event_serializer.serialize_events(
                     current_state.values(),
                     time_now=time_now,
-                    # Don't bundle aggregations as this is a deprecated API.
-                    bundle_aggregations=False,
                     as_client_event=as_client_event,
                 )
 
@@ -377,9 +376,7 @@ class InitialSyncHandler:
             "messages": {
                 "chunk": (
                     # Don't bundle aggregations as this is a deprecated API.
-                    await self._event_serializer.serialize_events(
-                        messages, time_now, bundle_aggregations=False
-                    )
+                    await self._event_serializer.serialize_events(messages, time_now)
                 ),
                 "start": await start_token.to_string(self.store),
                 "end": await end_token.to_string(self.store),
@@ -387,7 +384,7 @@ class InitialSyncHandler:
             "state": (
                 # Don't bundle aggregations as this is a deprecated API.
                 await self._event_serializer.serialize_events(
-                    room_state.values(), time_now, bundle_aggregations=False
+                    room_state.values(), time_now
                 )
             ),
             "presence": [],
@@ -408,7 +405,7 @@ class InitialSyncHandler:
         time_now = self.clock.time_msec()
         # Don't bundle aggregations as this is a deprecated API.
         state = await self._event_serializer.serialize_events(
-            current_state.values(), time_now, bundle_aggregations=False
+            current_state.values(), time_now
         )
 
         now_token = self.hs.get_event_sources().get_current_token()
@@ -454,8 +451,8 @@ class InitialSyncHandler:
             return receipts
 
         presence, receipts, (messages, token) = await make_deferred_yieldable(
-            defer.gatherResults(
-                [
+            gather_results(
+                (
                     run_in_background(get_presence),
                     run_in_background(get_receipts),
                     run_in_background(
@@ -464,7 +461,7 @@ class InitialSyncHandler:
                         limit=limit,
                         end_token=now_token.room_key,
                     ),
-                ],
+                ),
                 consumeErrors=True,
             ).addErrback(unwrapFirstError)
         )
@@ -483,9 +480,7 @@ class InitialSyncHandler:
             "messages": {
                 "chunk": (
                     # Don't bundle aggregations as this is a deprecated API.
-                    await self._event_serializer.serialize_events(
-                        messages, time_now, bundle_aggregations=False
-                    )
+                    await self._event_serializer.serialize_events(messages, time_now)
                 ),
                 "start": await start_token.to_string(self.store),
                 "end": await end_token.to_string(self.store),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 87f671708c..1a7190085a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,7 +21,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
-from twisted.internet import defer
 from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
@@ -57,7 +56,7 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
 from synapse.util import json_decoder, json_encoder, log_failure
-from synapse.util.async_helpers import Linearizer, unwrapFirstError
+from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
@@ -247,7 +246,9 @@ class MessageHandler:
                 room_state = room_state_events[membership_event_id]
 
         now = self.clock.time_msec()
-        events = await self._event_serializer.serialize_events(room_state.values(), now)
+        events = await self._event_serializer.serialize_events(
+            room_state.values(), now, bundle_aggregations=True
+        )
         return events
 
     async def get_joined_members(self, requester: Requester, room_id: str) -> dict:
@@ -496,6 +497,7 @@ class EventCreationHandler:
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
+        allow_no_prev_events: bool = False,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """
@@ -607,6 +609,7 @@ class EventCreationHandler:
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
             depth=depth,
+            allow_no_prev_events=allow_no_prev_events,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -882,6 +885,7 @@ class EventCreationHandler:
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
+        allow_no_prev_events: bool = False,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
@@ -912,6 +916,7 @@ class EventCreationHandler:
         full_state_ids_at_event = None
         if auth_event_ids is not None:
             # If auth events are provided, prev events must be also.
+            # prev_event_ids could be an empty array though.
             assert prev_event_ids is not None
 
             # Copy the full auth state before it stripped down
@@ -943,14 +948,22 @@ class EventCreationHandler:
         else:
             prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
 
-        # we now ought to have some prev_events (unless it's a create event).
-        #
-        # do a quick sanity check here, rather than waiting until we've created the
+        # Do a quick sanity check here, rather than waiting until we've created the
         # event and then try to auth it (which fails with a somewhat confusing "No
         # create event in auth events")
-        assert (
-            builder.type == EventTypes.Create or len(prev_event_ids) > 0
-        ), "Attempting to create an event with no prev_events"
+        if allow_no_prev_events:
+            # We allow events with no `prev_events` but it better have some `auth_events`
+            assert (
+                builder.type == EventTypes.Create
+                # Allow an event to have empty list of prev_event_ids
+                # only if it has auth_event_ids.
+                or auth_event_ids
+            ), "Attempting to create a non-m.room.create event with no prev_events or auth_event_ids"
+        else:
+            # we now ought to have some prev_events (unless it's a create event).
+            assert (
+                builder.type == EventTypes.Create or prev_event_ids
+            ), "Attempting to create a non-m.room.create event with no prev_events"
 
         event = await builder.build(
             prev_event_ids=prev_event_ids,
@@ -1156,9 +1169,9 @@ class EventCreationHandler:
 
         # We now persist the event (and update the cache in parallel, since we
         # don't want to block on it).
-        result = await make_deferred_yieldable(
-            defer.gatherResults(
-                [
+        result, _ = await make_deferred_yieldable(
+            gather_results(
+                (
                     run_in_background(
                         self._persist_event,
                         requester=requester,
@@ -1170,12 +1183,12 @@ class EventCreationHandler:
                     run_in_background(
                         self.cache_joined_hosts_for_event, event, context
                     ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
-                ],
+                ),
                 consumeErrors=True,
             )
         ).addErrback(unwrapFirstError)
 
-        return result[0]
+        return result
 
     async def _persist_event(
         self,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 4f42438053..7469cc55a2 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -542,7 +542,10 @@ class PaginationHandler:
         chunk = {
             "chunk": (
                 await self._event_serializer.serialize_events(
-                    events, time_now, as_client_event=as_client_event
+                    events,
+                    time_now,
+                    bundle_aggregations=True,
+                    as_client_event=as_client_event,
                 )
             ),
             "start": await from_token.to_string(self.store),
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 4911a11535..5cb1ff749d 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -14,7 +14,7 @@
 import logging
 from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 
-from synapse.api.constants import ReadReceiptEventFields
+from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes
 from synapse.appservice import ApplicationService
 from synapse.streams import EventSource
 from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
@@ -178,7 +178,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
 
             for event_id in content.keys():
                 event_content = content.get(event_id, {})
-                m_read = event_content.get("m.read", {})
+                m_read = event_content.get(ReceiptTypes.READ, {})
 
                 # If m_read is missing copy over the original event_content as there is nothing to process here
                 if not m_read:
@@ -206,7 +206,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
 
                 # Set new users unless empty
                 if len(new_users.keys()) > 0:
-                    new_event["content"][event_id] = {"m.read": new_users}
+                    new_event["content"][event_id] = {ReceiptTypes.READ: new_users}
 
             # Append new_event to visible_events unless empty
             if len(new_event["content"].keys()) > 0:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ead2198e14..b9c1cbffa5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -172,7 +172,7 @@ class RoomCreationHandler:
         user_id = requester.user.to_string()
 
         # Check if this room is already being upgraded by another person
-        for key in self._upgrade_response_cache.pending_result_cache:
+        for key in self._upgrade_response_cache.keys():
             if key[0] == old_room_id and key[1] != user_id:
                 # Two different people are trying to upgrade the same room.
                 # Send the second an error.
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a6dbff637f..6aa910dd10 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -658,7 +658,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             if block_invite:
                 raise SynapseError(403, "Invites have been disabled on this server")
 
-        if prev_event_ids:
+        # An empty prev_events list is allowed as long as the auth_event_ids are present
+        if prev_event_ids is not None:
             return await self._local_membership_update(
                 requester=requester,
                 target=target,
@@ -1019,7 +1020,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         # Add new room to the room directory if the old room was there
         # Remove old room from the room directory
         old_room = await self.store.get_room(old_room_id)
-        if old_room and old_room["is_public"]:
+        if old_room is not None and old_room["is_public"]:
             await self.store.set_room_is_public(old_room_id, False)
             await self.store.set_room_is_public(room_id, True)
 
@@ -1030,7 +1031,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         local_group_ids = await self.store.get_local_groups_for_room(old_room_id)
         for group_id in local_group_ids:
             # Add new the new room to those groups
-            await self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
+            await self.store.add_room_to_group(
+                group_id, room_id, old_room is not None and old_room["is_public"]
+            )
 
             # Remove the old room from those groups
             await self.store.remove_room_from_group(group_id, old_room_id)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f3039c3c3f..7baf3f199c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,7 +28,7 @@ from typing import (
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership, ReceiptTypes
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -36,6 +36,7 @@ from synapse.events import EventBase
 from synapse.logging.context import current_context
 from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.storage.databases.main.event_push_actions import NotifCounts
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -421,7 +422,7 @@ class SyncHandler:
         span to track the sync. See `generate_sync_result` for the next part of your
         indoctrination.
         """
-        with start_active_span("current_sync_for_user"):
+        with start_active_span("sync.current_sync_for_user"):
             log_kv({"since_token": since_token})
             sync_result = await self.generate_sync_result(
                 sync_config, since_token, full_state
@@ -1041,18 +1042,17 @@ class SyncHandler:
 
     async def unread_notifs_for_room_id(
         self, room_id: str, sync_config: SyncConfig
-    ) -> Dict[str, int]:
+    ) -> NotifCounts:
         with Measure(self.clock, "unread_notifs_for_room_id"):
             last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
                 user_id=sync_config.user.to_string(),
                 room_id=room_id,
-                receipt_type="m.read",
+                receipt_type=ReceiptTypes.READ,
             )
 
-            notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
+            return await self.store.get_unread_event_push_actions_by_room_for_user(
                 room_id, sync_config.user.to_string(), last_unread_event_id
             )
-            return notifs
 
     async def generate_sync_result(
         self,
@@ -1585,7 +1585,8 @@ class SyncHandler:
             )
             logger.debug("Generated room entry for %s", room_entry.room_id)
 
-        await concurrently_execute(handle_room_entries, room_entries, 10)
+        with start_active_span("sync.generate_room_entries"):
+            await concurrently_execute(handle_room_entries, room_entries, 10)
 
         sync_result_builder.invited.extend(invited)
         sync_result_builder.knocked.extend(knocked)
@@ -1662,20 +1663,20 @@ class SyncHandler:
     ) -> _RoomChanges:
         """Determine the changes in rooms to report to the user.
 
-        Ideally, we want to report all events whose stream ordering `s` lies in the
-        range `since_token < s <= now_token`, where the two tokens are read from the
-        sync_result_builder.
+        This function is a first pass at generating the rooms part of the sync response.
+        It determines which rooms have changed during the sync period, and categorises
+        them into four buckets: "knock", "invite", "join" and "leave".
 
-        If there are too many events in that range to report, things get complicated.
-        In this situation we return a truncated list of the most recent events, and
-        indicate in the response that there is a "gap" of omitted events. Additionally:
+        1. Finds all membership changes for the user in the sync period (from
+           `since_token` up to `now_token`).
+        2. Uses those to place the room in one of the four categories above.
+        3. Builds a `_RoomChanges` struct to record this, and return that struct.
 
-        - we include a "state_delta", to describe the changes in state over the gap,
-        - we include all membership events applying to the user making the request,
-          even those in the gap.
-
-        See the spec for the rationale:
-            https://spec.matrix.org/v1.1/client-server-api/#syncing
+        For rooms classified as "knock", "invite" or "leave", we just need to report
+        a single membership event in the eventual /sync response. For "join" we need
+        to fetch additional non-membership events, e.g. messages in the room. That is
+        more complicated, so instead we report an intermediary `RoomSyncResultBuilder`
+        struct, and leave the additional work to `_generate_room_entry`.
 
         The sync_result_builder is not modified by this function.
         """
@@ -1686,16 +1687,6 @@ class SyncHandler:
 
         assert since_token
 
-        # The spec
-        #     https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
-        # notes that membership events need special consideration:
-        #
-        # > When a sync is limited, the server MUST return membership events for events
-        # > in the gap (between since and the start of the returned timeline), regardless
-        # > as to whether or not they are redundant.
-        #
-        # We fetch such events here, but we only seem to use them for categorising rooms
-        # as newly joined, newly left, invited or knocked.
         # TODO: we've already called this function and ran this query in
         #       _have_rooms_changed. We could keep the results in memory to avoid a
         #       second query, at the cost of more complicated source code.
@@ -2009,6 +2000,23 @@ class SyncHandler:
         """Populates the `joined` and `archived` section of `sync_result_builder`
         based on the `room_builder`.
 
+        Ideally, we want to report all events whose stream ordering `s` lies in the
+        range `since_token < s <= now_token`, where the two tokens are read from the
+        sync_result_builder.
+
+        If there are too many events in that range to report, things get complicated.
+        In this situation we return a truncated list of the most recent events, and
+        indicate in the response that there is a "gap" of omitted events. Lots of this
+        is handled in `_load_filtered_recents`, but some of is handled in this method.
+
+        Additionally:
+        - we include a "state_delta", to describe the changes in state over the gap,
+        - we include all membership events applying to the user making the request,
+          even those in the gap.
+
+        See the spec for the rationale:
+            https://spec.matrix.org/v1.1/client-server-api/#syncing
+
         Args:
             sync_result_builder
             ignored_users: Set of users ignored by user.
@@ -2038,7 +2046,7 @@ class SyncHandler:
         since_token = room_builder.since_token
         upto_token = room_builder.upto_token
 
-        with start_active_span("generate_room_entry"):
+        with start_active_span("sync.generate_room_entry"):
             set_tag("room_id", room_id)
             log_kv({"events": len(events or ())})
 
@@ -2166,10 +2174,10 @@ class SyncHandler:
                 if room_sync or always_include:
                     notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
 
-                    unread_notifications["notification_count"] = notifs["notify_count"]
-                    unread_notifications["highlight_count"] = notifs["highlight_count"]
+                    unread_notifications["notification_count"] = notifs.notify_count
+                    unread_notifications["highlight_count"] = notifs.highlight_count
 
-                    room_sync.unread_count = notifs["unread_count"]
+                    room_sync.unread_count = notifs.unread_count
 
                     sync_result_builder.joined.append(room_sync)