summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2023-10-16 15:42:54 -0400
committerPatrick Cloke <patrickc@matrix.org>2023-10-16 15:42:54 -0400
commitc1878cd4ae7fec1d7b090b4b45698ce36bcd3726 (patch)
tree2de86373f8b7516a69ac910597a2a514ec41afd5 /synapse/handlers
parentMerge branch 'develop' into clokep/db-upgrades (diff)
parentUpdate the release script to remind releaser to check for special release not... (diff)
downloadsynapse-c1878cd4ae7fec1d7b090b4b45698ce36bcd3726.tar.xz
Merge remote-tracking branch 'origin/develop' into clokep/db-upgrades
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/account_validity.py4
-rw-r--r--synapse/handlers/admin.py8
-rw-r--r--synapse/handlers/appservice.py4
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/device.py1
-rw-r--r--synapse/handlers/federation.py15
-rw-r--r--synapse/handlers/initial_sync.py3
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/presence.py32
-rw-r--r--synapse/handlers/push_rules.py6
-rw-r--r--synapse/handlers/receipts.py25
-rw-r--r--synapse/handlers/room.py3
-rw-r--r--synapse/handlers/room_member.py66
-rw-r--r--synapse/handlers/stats.py64
-rw-r--r--synapse/handlers/sync.py24
-rw-r--r--synapse/handlers/user_directory.py34
16 files changed, 126 insertions, 169 deletions
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index f1a7a05df6..6c2a49a3b9 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -212,8 +212,8 @@ class AccountValidityHandler:
 
         addresses = []
         for threepid in threepids:
-            if threepid["medium"] == "email":
-                addresses.append(threepid["address"])
+            if threepid.medium == "email":
+                addresses.append(threepid.address)
 
         return addresses
 
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index ba9704a065..2c2baeac67 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -16,6 +16,8 @@ import abc
 import logging
 from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set
 
+import attr
+
 from synapse.api.constants import Direction, Membership
 from synapse.events import EventBase
 from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo
@@ -93,7 +95,7 @@ class AdminHandler:
         ]
         user_info_dict["displayname"] = profile.display_name
         user_info_dict["avatar_url"] = profile.avatar_url
-        user_info_dict["threepids"] = threepids
+        user_info_dict["threepids"] = [attr.asdict(t) for t in threepids]
         user_info_dict["external_ids"] = external_ids
         user_info_dict["erased"] = await self._store.is_user_erased(user.to_string())
 
@@ -171,8 +173,8 @@ class AdminHandler:
             else:
                 stream_ordering = room.stream_ordering
 
-            from_key = RoomStreamToken(0, 0)
-            to_key = RoomStreamToken(None, stream_ordering)
+            from_key = RoomStreamToken(topological=0, stream=0)
+            to_key = RoomStreamToken(stream=stream_ordering)
 
             # Events that we've processed in this room
             written_events: Set[str] = set()
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7de7bd3289..c200a45f3a 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -216,7 +216,7 @@ class ApplicationServicesHandler:
 
     def notify_interested_services_ephemeral(
         self,
-        stream_key: str,
+        stream_key: StreamKeyType,
         new_token: Union[int, RoomStreamToken],
         users: Collection[Union[str, UserID]],
     ) -> None:
@@ -326,7 +326,7 @@ class ApplicationServicesHandler:
     async def _notify_interested_services_ephemeral(
         self,
         services: List[ApplicationService],
-        stream_key: str,
+        stream_key: StreamKeyType,
         new_token: int,
         users: Collection[Union[str, UserID]],
     ) -> None:
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 67adeae6a7..6a8f8f2fd1 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -117,9 +117,9 @@ class DeactivateAccountHandler:
 
         # Remove any local threepid associations for this account.
         local_threepids = await self.store.user_get_threepids(user_id)
-        for threepid in local_threepids:
+        for local_threepid in local_threepids:
             await self._auth_handler.delete_local_threepid(
-                user_id, threepid["medium"], threepid["address"]
+                user_id, local_threepid.medium, local_threepid.address
             )
 
         # delete any devices belonging to the user, which will also
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 86ad96d030..50df4f2b06 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -845,7 +845,6 @@ class DeviceHandler(DeviceWorkerHandler):
                     else:
                         assert max_stream_id == stream_id
                         # Avoid moving `room_id` backwards.
-                        pass
 
                     if self._handle_new_device_update_new_data:
                         continue
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 29cd45550a..9d72794e8b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -868,19 +868,10 @@ class FederationHandler:
         # This is a bit of a hack and is cribbing off of invites. Basically we
         # store the room state here and retrieve it again when this event appears
         # in the invitee's sync stream. It is stripped out for all other local users.
-        stripped_room_state = (
-            knock_response.get("knock_room_state")
-            # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
-            # Thus, we also check for a 'knock_state_events' to support old instances.
-            # See https://github.com/matrix-org/synapse/issues/14088.
-            or knock_response.get("knock_state_events")
-        )
+        stripped_room_state = knock_response.get("knock_room_state")
 
         if stripped_room_state is None:
-            raise KeyError(
-                "Missing 'knock_room_state' (or legacy 'knock_state_events') field in "
-                "send_knock response"
-            )
+            raise KeyError("Missing 'knock_room_state' field in send_knock response")
 
         event.unsigned["knock_room_state"] = stripped_room_state
 
@@ -1506,7 +1497,6 @@ class FederationHandler:
                     # in the meantime and context needs to be recomputed, so let's do so.
                     if i == max_retries - 1:
                         raise e
-                    pass
         else:
             destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)}
 
@@ -1582,7 +1572,6 @@ class FederationHandler:
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
     async def add_display_name_to_third_party_invite(
         self,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 5737f8014d..c34bd7db95 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -192,8 +192,7 @@ class InitialSyncHandler:
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = RoomStreamToken(
-                        None,
-                        event.stream_ordering,
+                        stream=event.stream_ordering,
                     )
                     deferred_room_state = run_in_background(
                         self._state_storage_controller.get_state_for_events,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 44dbbf81dd..41a35ce510 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1133,7 +1133,6 @@ class EventCreationHandler:
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
         # we know it was persisted, so must have a stream ordering
         assert ev.internal_metadata.stream_ordering
@@ -2038,7 +2037,6 @@ class EventCreationHandler:
                         # in the meantime and context needs to be recomputed, so let's do so.
                         if i == max_retries - 1:
                             raise e
-                        pass
                 return True
             except AuthError:
                 logger.info(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7c7cda3e95..dfc0b9db07 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -110,6 +110,7 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates
 from synapse.replication.tcp.commands import ClearUserSyncsCommand
 from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
 from synapse.storage.databases.main import DataStore
+from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.streams import EventSource
 from synapse.types import (
     JsonDict,
@@ -1499,9 +1500,9 @@ class PresenceHandler(BasePresenceHandler):
                 # We may get multiple deltas for different rooms, but we want to
                 # handle them on a room by room basis, so we batch them up by
                 # room.
-                deltas_by_room: Dict[str, List[JsonDict]] = {}
+                deltas_by_room: Dict[str, List[StateDelta]] = {}
                 for delta in deltas:
-                    deltas_by_room.setdefault(delta["room_id"], []).append(delta)
+                    deltas_by_room.setdefault(delta.room_id, []).append(delta)
 
                 for room_id, deltas_for_room in deltas_by_room.items():
                     await self._handle_state_delta(room_id, deltas_for_room)
@@ -1513,7 +1514,7 @@ class PresenceHandler(BasePresenceHandler):
                     max_pos
                 )
 
-    async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
+    async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None:
         """Process current state deltas for the room to find new joins that need
         to be handled.
         """
@@ -1524,31 +1525,30 @@ class PresenceHandler(BasePresenceHandler):
         newly_joined_users = set()
 
         for delta in deltas:
-            assert room_id == delta["room_id"]
+            assert room_id == delta.room_id
 
-            typ = delta["type"]
-            state_key = delta["state_key"]
-            event_id = delta["event_id"]
-            prev_event_id = delta["prev_event_id"]
-
-            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+            logger.debug(
+                "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
+            )
 
             # Drop any event that isn't a membership join
-            if typ != EventTypes.Member:
+            if delta.event_type != EventTypes.Member:
                 continue
 
-            if event_id is None:
+            if delta.event_id is None:
                 # state has been deleted, so this is not a join. We only care about
                 # joins.
                 continue
 
-            event = await self.store.get_event(event_id, allow_none=True)
+            event = await self.store.get_event(delta.event_id, allow_none=True)
             if not event or event.content.get("membership") != Membership.JOIN:
                 # We only care about joins
                 continue
 
-            if prev_event_id:
-                prev_event = await self.store.get_event(prev_event_id, allow_none=True)
+            if delta.prev_event_id:
+                prev_event = await self.store.get_event(
+                    delta.prev_event_id, allow_none=True
+                )
                 if (
                     prev_event
                     and prev_event.content.get("membership") == Membership.JOIN
@@ -1556,7 +1556,7 @@ class PresenceHandler(BasePresenceHandler):
                     # Ignore changes to join events.
                     continue
 
-            newly_joined_users.add(state_key)
+            newly_joined_users.add(delta.state_key)
 
         if not newly_joined_users:
             # If nobody has joined then there's nothing to do.
diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 7ed88a3611..87b428ab1c 100644
--- a/synapse/handlers/push_rules.py
+++ b/synapse/handlers/push_rules.py
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, UnrecognizedRequestError
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.push_rule import RuleNotFoundException
 from synapse.synapse_rust.push import get_base_rule_ids
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -114,7 +114,9 @@ class PushRulesHandler:
             user_id: the user ID the change is for.
         """
         stream_id = self._main_store.get_max_push_rules_stream_id()
-        self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id])
+        self._notifier.on_new_event(
+            StreamKeyType.PUSH_RULES, stream_id, users=[user_id]
+        )
 
     async def push_rules_for_user(
         self, user: UserID
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a7a29b758b..69ac468f75 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -130,11 +130,10 @@ class ReceiptsHandler:
 
     async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
         """Takes a list of receipts, stores them and informs the notifier."""
-        min_batch_id: Optional[int] = None
-        max_batch_id: Optional[int] = None
 
+        receipts_persisted: List[ReadReceipt] = []
         for receipt in receipts:
-            res = await self.store.insert_receipt(
+            stream_id = await self.store.insert_receipt(
                 receipt.room_id,
                 receipt.receipt_type,
                 receipt.user_id,
@@ -143,30 +142,26 @@ class ReceiptsHandler:
                 receipt.data,
             )
 
-            if not res:
-                # res will be None if this receipt is 'old'
+            if stream_id is None:
+                # stream_id will be None if this receipt is 'old'
                 continue
 
-            stream_id, max_persisted_id = res
+            receipts_persisted.append(receipt)
 
-            if min_batch_id is None or stream_id < min_batch_id:
-                min_batch_id = stream_id
-            if max_batch_id is None or max_persisted_id > max_batch_id:
-                max_batch_id = max_persisted_id
-
-        # Either both of these should be None or neither.
-        if min_batch_id is None or max_batch_id is None:
+        if not receipts_persisted:
             # no new receipts
             return False
 
-        affected_room_ids = list({r.room_id for r in receipts})
+        max_batch_id = self.store.get_max_receipt_stream_id()
+
+        affected_room_ids = list({r.room_id for r in receipts_persisted})
 
         self.notifier.on_new_event(
             StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
         )
         # Note that the min here shouldn't be relied upon to be accurate.
         await self.hs.get_pusherpool().on_new_receipts(
-            min_batch_id, max_batch_id, affected_room_ids
+            {r.user_id for r in receipts_persisted}
         )
 
         return True
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a0c3b16819..97c9f01245 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -261,7 +261,6 @@ class RoomCreationHandler:
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
         # This is to satisfy mypy and should never happen
         raise PartialStateConflictError()
@@ -1708,7 +1707,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
 
         if from_key.topological:
             logger.warning("Stream has topological part!!!! %r", from_key)
-            from_key = RoomStreamToken(None, from_key.stream)
+            from_key = RoomStreamToken(stream=from_key.stream)
 
         app_service = self.store.get_app_service_by_user_id(user.to_string())
         if app_service:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 90343c2306..918eb203e2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -16,7 +16,7 @@ import abc
 import logging
 import random
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
 
 from synapse import types
 from synapse.api.constants import (
@@ -44,6 +44,7 @@ from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
 from synapse.logging import opentracing
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.types import (
     JsonDict,
     Requester,
@@ -382,8 +383,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         and persist a new event for the new membership change.
 
         Args:
-            requester:
-            target:
+            requester: User requesting the membership change, i.e. the sender of the
+                desired membership event.
+            target: Use whose membership should change, i.e. the state_key of the
+                desired membership event.
             room_id:
             membership:
 
@@ -415,7 +418,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         Returns:
             Tuple of event ID and stream ordering position
         """
-
         user_id = target.to_string()
 
         if content is None:
@@ -475,21 +477,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                     (EventTypes.Member, user_id), None
                 )
 
-                if event.membership == Membership.JOIN:
-                    newly_joined = True
-                    if prev_member_event_id:
-                        prev_member_event = await self.store.get_event(
-                            prev_member_event_id
-                        )
-                        newly_joined = prev_member_event.membership != Membership.JOIN
-
-                    # Only rate-limit if the user actually joined the room, otherwise we'll end
-                    # up blocking profile updates.
-                    if newly_joined and ratelimit:
-                        await self._join_rate_limiter_local.ratelimit(requester)
-                        await self._join_rate_per_room_limiter.ratelimit(
-                            requester, key=room_id, update=False
-                        )
                 with opentracing.start_active_span("handle_new_client_event"):
                     result_event = (
                         await self.event_creation_handler.handle_new_client_event(
@@ -514,7 +501,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
         # we know it was persisted, so should have a stream ordering
         assert result_event.internal_metadata.stream_ordering
@@ -618,6 +604,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         Raises:
             ShadowBanError if a shadow-banned requester attempts to send an invite.
         """
+        if ratelimit:
+            if action == Membership.JOIN:
+                # Only rate-limit if the user isn't already joined to the room, otherwise
+                # we'll end up blocking profile updates.
+                (
+                    current_membership,
+                    _,
+                ) = await self.store.get_local_current_membership_for_user_in_room(
+                    requester.user.to_string(),
+                    room_id,
+                )
+                if current_membership != Membership.JOIN:
+                    await self._join_rate_limiter_local.ratelimit(requester)
+                    await self._join_rate_per_room_limiter.ratelimit(
+                        requester, key=room_id, update=False
+                    )
+            elif action == Membership.INVITE:
+                await self.ratelimit_invite(requester, room_id, target.to_string())
+
         if action == Membership.INVITE and requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
             await self.clock.sleep(random.randint(1, 10))
@@ -794,8 +799,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         if effective_membership_state == Membership.INVITE:
             target_id = target.to_string()
-            if ratelimit:
-                await self.ratelimit_invite(requester, room_id, target_id)
 
             # block any attempts to invite the server notices mxid
             if target_id == self._server_notices_mxid:
@@ -2002,7 +2005,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
         # we know it was persisted, so must have a stream ordering
         assert result_event.internal_metadata.stream_ordering
@@ -2145,24 +2147,18 @@ class RoomForgetterHandler(StateDeltasHandler):
 
             await self._store.update_room_forgetter_stream_pos(max_pos)
 
-    async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
+    async def _handle_deltas(self, deltas: List[StateDelta]) -> None:
         """Called with the state deltas to process"""
         for delta in deltas:
-            typ = delta["type"]
-            state_key = delta["state_key"]
-            room_id = delta["room_id"]
-            event_id = delta["event_id"]
-            prev_event_id = delta["prev_event_id"]
-
-            if typ != EventTypes.Member:
+            if delta.event_type != EventTypes.Member:
                 continue
 
-            if not self._hs.is_mine_id(state_key):
+            if not self._hs.is_mine_id(delta.state_key):
                 continue
 
             change = await self._get_key_change(
-                prev_event_id,
-                event_id,
+                delta.prev_event_id,
+                delta.event_id,
                 key_name="membership",
                 public_value=Membership.JOIN,
             )
@@ -2171,7 +2167,7 @@ class RoomForgetterHandler(StateDeltasHandler):
             if is_leave:
                 try:
                     await self._room_member_handler.forget(
-                        UserID.from_string(state_key), room_id
+                        UserID.from_string(delta.state_key), delta.room_id
                     )
                 except SynapseError as e:
                     if e.code == 400:
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 3dde19fc81..817b41aa37 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -27,6 +27,7 @@ from typing import (
 from synapse.api.constants import EventContentFields, EventTypes, Membership
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.types import JsonDict
 
 if TYPE_CHECKING:
@@ -142,7 +143,7 @@ class StatsHandler:
             self.pos = max_pos
 
     async def _handle_deltas(
-        self, deltas: Iterable[JsonDict]
+        self, deltas: Iterable[StateDelta]
     ) -> Tuple[Dict[str, CounterType[str]], Dict[str, CounterType[str]]]:
         """Called with the state deltas to process
 
@@ -157,51 +158,50 @@ class StatsHandler:
         room_to_state_updates: Dict[str, Dict[str, Any]] = {}
 
         for delta in deltas:
-            typ = delta["type"]
-            state_key = delta["state_key"]
-            room_id = delta["room_id"]
-            event_id = delta["event_id"]
-            stream_id = delta["stream_id"]
-            prev_event_id = delta["prev_event_id"]
-
-            logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
+            logger.debug(
+                "Handling: %r, %r %r, %s",
+                delta.room_id,
+                delta.event_type,
+                delta.state_key,
+                delta.event_id,
+            )
 
-            token = await self.store.get_earliest_token_for_stats("room", room_id)
+            token = await self.store.get_earliest_token_for_stats("room", delta.room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
-            if token is not None and token >= stream_id:
+            if token is not None and token >= delta.stream_id:
                 logger.debug(
                     "Ignoring: %s as earlier than this room's initial ingestion event",
-                    event_id,
+                    delta.event_id,
                 )
                 continue
 
-            if event_id is None and prev_event_id is None:
+            if delta.event_id is None and delta.prev_event_id is None:
                 logger.error(
                     "event ID is None and so is the previous event ID. stream_id: %s",
-                    stream_id,
+                    delta.stream_id,
                 )
                 continue
 
             event_content: JsonDict = {}
 
-            if event_id is not None:
-                event = await self.store.get_event(event_id, allow_none=True)
+            if delta.event_id is not None:
+                event = await self.store.get_event(delta.event_id, allow_none=True)
                 if event:
                     event_content = event.content or {}
 
             # All the values in this dict are deltas (RELATIVE changes)
-            room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
+            room_stats_delta = room_to_stats_deltas.setdefault(delta.room_id, Counter())
 
-            room_state = room_to_state_updates.setdefault(room_id, {})
+            room_state = room_to_state_updates.setdefault(delta.room_id, {})
 
-            if prev_event_id is None:
+            if delta.prev_event_id is None:
                 # this state event doesn't overwrite another,
                 # so it is a new effective/current state event
                 room_stats_delta["current_state_events"] += 1
 
-            if typ == EventTypes.Member:
+            if delta.event_type == EventTypes.Member:
                 # we could use StateDeltasHandler._get_key_change here but it's
                 # a bit inefficient given we're not testing for a specific
                 # result; might as well just grab the prev_membership and
@@ -210,9 +210,9 @@ class StatsHandler:
                 # in the absence of a previous event because we do not want to
                 # reduce the leave count when a new-to-the-room user joins.
                 prev_membership = None
-                if prev_event_id is not None:
+                if delta.prev_event_id is not None:
                     prev_event = await self.store.get_event(
-                        prev_event_id, allow_none=True
+                        delta.prev_event_id, allow_none=True
                     )
                     if prev_event:
                         prev_event_content = prev_event.content
@@ -256,7 +256,7 @@ class StatsHandler:
                 else:
                     raise ValueError("%r is not a valid membership" % (membership,))
 
-                user_id = state_key
+                user_id = delta.state_key
                 if self.is_mine_id(user_id):
                     # this accounts for transitions like leave → ban and so on.
                     has_changed_joinedness = (prev_membership == Membership.JOIN) != (
@@ -272,30 +272,30 @@ class StatsHandler:
 
                         room_stats_delta["local_users_in_room"] += membership_delta
 
-            elif typ == EventTypes.Create:
+            elif delta.event_type == EventTypes.Create:
                 room_state["is_federatable"] = (
                     event_content.get(EventContentFields.FEDERATE, True) is True
                 )
                 room_type = event_content.get(EventContentFields.ROOM_TYPE)
                 if isinstance(room_type, str):
                     room_state["room_type"] = room_type
-            elif typ == EventTypes.JoinRules:
+            elif delta.event_type == EventTypes.JoinRules:
                 room_state["join_rules"] = event_content.get("join_rule")
-            elif typ == EventTypes.RoomHistoryVisibility:
+            elif delta.event_type == EventTypes.RoomHistoryVisibility:
                 room_state["history_visibility"] = event_content.get(
                     "history_visibility"
                 )
-            elif typ == EventTypes.RoomEncryption:
+            elif delta.event_type == EventTypes.RoomEncryption:
                 room_state["encryption"] = event_content.get("algorithm")
-            elif typ == EventTypes.Name:
+            elif delta.event_type == EventTypes.Name:
                 room_state["name"] = event_content.get("name")
-            elif typ == EventTypes.Topic:
+            elif delta.event_type == EventTypes.Topic:
                 room_state["topic"] = event_content.get("topic")
-            elif typ == EventTypes.RoomAvatar:
+            elif delta.event_type == EventTypes.RoomAvatar:
                 room_state["avatar"] = event_content.get("url")
-            elif typ == EventTypes.CanonicalAlias:
+            elif delta.event_type == EventTypes.CanonicalAlias:
                 room_state["canonical_alias"] = event_content.get("alias")
-            elif typ == EventTypes.GuestAccess:
+            elif delta.event_type == EventTypes.GuestAccess:
                 room_state["guest_access"] = event_content.get(
                     EventContentFields.GUEST_ACCESS
                 )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7bd42f635f..60b4d95cd7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,7 +40,6 @@ from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
-from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging import issue9533_logger
 from synapse.logging.context import current_context
@@ -363,36 +362,15 @@ class SyncHandler:
         # (since we now know that the device has received them)
         if since_token is not None:
             since_stream_id = since_token.to_device_key
-            # Fast path: delete a limited number of to-device messages up front.
-            # We do this to avoid the overhead of scheduling a task for every
-            # sync.
-            device_deletion_limit = 100
             deleted = await self.store.delete_messages_for_device(
                 sync_config.user.to_string(),
                 sync_config.device_id,
                 since_stream_id,
-                limit=device_deletion_limit,
             )
             logger.debug(
                 "Deleted %d to-device messages up to %d", deleted, since_stream_id
             )
 
-            # If we hit the limit, schedule a background task to delete the rest.
-            if deleted >= device_deletion_limit:
-                await self._task_scheduler.schedule_task(
-                    DELETE_DEVICE_MSGS_TASK_NAME,
-                    resource_id=sync_config.device_id,
-                    params={
-                        "user_id": sync_config.user.to_string(),
-                        "device_id": sync_config.device_id,
-                        "up_to_stream_id": since_stream_id,
-                    },
-                )
-                logger.debug(
-                    "Deletion of to-device messages up to %d scheduled",
-                    since_stream_id,
-                )
-
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
@@ -2333,7 +2311,7 @@ class SyncHandler:
                             continue
 
                 leave_token = now_token.copy_and_replace(
-                    StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
+                    StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
                 )
                 room_entries.append(
                     RoomSyncResultBuilder(
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a0f5568000..75717ba4f9 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,7 +14,7 @@
 
 import logging
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple
 
 from twisted.internet.interfaces import IDelayedCall
 
@@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Memb
 from synapse.api.errors import Codes, SynapseError
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.storage.databases.main.user_directory import SearchResult
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import UserID
@@ -247,32 +248,31 @@ class UserDirectoryHandler(StateDeltasHandler):
 
                 await self.store.update_user_directory_stream_pos(max_pos)
 
-    async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
+    async def _handle_deltas(self, deltas: List[StateDelta]) -> None:
         """Called with the state deltas to process"""
         for delta in deltas:
-            typ = delta["type"]
-            state_key = delta["state_key"]
-            room_id = delta["room_id"]
-            event_id: Optional[str] = delta["event_id"]
-            prev_event_id: Optional[str] = delta["prev_event_id"]
-
-            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+            logger.debug(
+                "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
+            )
 
             # For join rule and visibility changes we need to check if the room
             # may have become public or not and add/remove the users in said room
-            if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
+            if delta.event_type in (
+                EventTypes.RoomHistoryVisibility,
+                EventTypes.JoinRules,
+            ):
                 await self._handle_room_publicity_change(
-                    room_id, prev_event_id, event_id, typ
+                    delta.room_id, delta.prev_event_id, delta.event_id, delta.event_type
                 )
-            elif typ == EventTypes.Member:
+            elif delta.event_type == EventTypes.Member:
                 await self._handle_room_membership_event(
-                    room_id,
-                    prev_event_id,
-                    event_id,
-                    state_key,
+                    delta.room_id,
+                    delta.prev_event_id,
+                    delta.event_id,
+                    delta.state_key,
                 )
             else:
-                logger.debug("Ignoring irrelevant type: %r", typ)
+                logger.debug("Ignoring irrelevant type: %r", delta.event_type)
 
     async def _handle_room_publicity_change(
         self,