Diffstat (limited to 'synapse')
-rw-r--r--  synapse/api/auth/internal.py | 2
-rw-r--r--  synapse/api/presence.py | 4
-rw-r--r--  synapse/federation/federation_client.py | 4
-rw-r--r--  synapse/federation/federation_server.py | 9
-rw-r--r--  synapse/federation/send_queue.py | 2
-rw-r--r--  synapse/federation/sender/__init__.py | 2
-rw-r--r--  synapse/federation/transport/client.py | 2
-rw-r--r--  synapse/handlers/admin.py | 4
-rw-r--r--  synapse/handlers/appservice.py | 4
-rw-r--r--  synapse/handlers/device.py | 1
-rw-r--r--  synapse/handlers/federation.py | 15
-rw-r--r--  synapse/handlers/initial_sync.py | 3
-rw-r--r--  synapse/handlers/message.py | 10
-rw-r--r--  synapse/handlers/push_rules.py | 6
-rw-r--r--  synapse/handlers/receipts.py | 25
-rw-r--r--  synapse/handlers/room.py | 3
-rw-r--r--  synapse/handlers/room_member.py | 45
-rw-r--r--  synapse/handlers/sync.py | 2
-rw-r--r--  synapse/http/server.py | 2
-rw-r--r--  synapse/media/_base.py | 50
-rw-r--r--  synapse/media/media_repository.py | 13
-rw-r--r--  synapse/notifier.py | 17
-rw-r--r--  synapse/push/__init__.py | 4
-rw-r--r--  synapse/push/emailpusher.py | 2
-rw-r--r--  synapse/push/httppusher.py | 2
-rw-r--r--  synapse/push/pusherpool.py | 12
-rw-r--r--  synapse/replication/tcp/client.py | 4
-rw-r--r--  synapse/replication/tcp/commands.py | 29
-rw-r--r--  synapse/rest/admin/__init__.py | 2
-rw-r--r--  synapse/rest/admin/federation.py | 8
-rw-r--r--  synapse/rest/media/config_resource.py | 13
-rw-r--r--  synapse/rest/media/download_resource.py | 40
-rw-r--r--  synapse/rest/media/media_repository_resource.py | 33
-rw-r--r--  synapse/rest/media/preview_url_resource.py | 26
-rw-r--r--  synapse/rest/media/thumbnail_resource.py | 133
-rw-r--r--  synapse/rest/media/upload_resource.py | 14
-rw-r--r--  synapse/storage/database.py | 6
-rw-r--r--  synapse/storage/databases/main/__init__.py | 20
-rw-r--r--  synapse/storage/databases/main/account_data.py | 18
-rw-r--r--  synapse/storage/databases/main/appservice.py | 29
-rw-r--r--  synapse/storage/databases/main/devices.py | 12
-rw-r--r--  synapse/storage/databases/main/e2e_room_keys.py | 9
-rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py | 22
-rw-r--r--  synapse/storage/databases/main/events.py | 9
-rw-r--r--  synapse/storage/databases/main/media_repository.py | 30
-rw-r--r--  synapse/storage/databases/main/presence.py | 76
-rw-r--r--  synapse/storage/databases/main/purge_events.py | 5
-rw-r--r--  synapse/storage/databases/main/pusher.py | 121
-rw-r--r--  synapse/storage/databases/main/receipts.py | 78
-rw-r--r--  synapse/storage/databases/main/registration.py | 133
-rw-r--r--  synapse/storage/databases/main/room.py | 42
-rw-r--r--  synapse/storage/databases/main/roommember.py | 10
-rw-r--r--  synapse/storage/databases/main/search.py | 20
-rw-r--r--  synapse/storage/databases/main/stream.py | 22
-rw-r--r--  synapse/storage/databases/main/task_scheduler.py | 44
-rw-r--r--  synapse/storage/databases/main/transactions.py | 27
-rw-r--r--  synapse/storage/schema/__init__.py | 4
-rw-r--r--  synapse/storage/schema/main/delta/82/03_drop_old_tables.sql | 24
-rw-r--r--  synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql | 20
-rw-r--r--  synapse/streams/events.py | 15
-rw-r--r--  synapse/types/__init__.py | 191
61 files changed, 879 insertions(+), 655 deletions(-)
diff --git a/synapse/api/auth/internal.py b/synapse/api/auth/internal.py
index a75f6f2cc4..36ee9c8b8f 100644
--- a/synapse/api/auth/internal.py
+++ b/synapse/api/auth/internal.py
@@ -115,7 +115,7 @@ class InternalAuth(BaseAuth):
         Once get_user_by_req has set up the opentracing span, this does the actual work.
         """
         try:
-            ip_addr = request.getClientAddress().host
+            ip_addr = request.get_client_ip_if_available()
             user_agent = get_request_user_agent(request)
 
             access_token = self.get_access_token_from_request(request)
diff --git a/synapse/api/presence.py b/synapse/api/presence.py
index b78f419994..afef6712e1 100644
--- a/synapse/api/presence.py
+++ b/synapse/api/presence.py
@@ -80,10 +80,6 @@ class UserPresenceState:
     def as_dict(self) -> JsonDict:
         return attr.asdict(self)
 
-    @staticmethod
-    def from_dict(d: JsonDict) -> "UserPresenceState":
-        return UserPresenceState(**d)
-
     def copy_and_replace(self, **kwargs: Any) -> "UserPresenceState":
         return attr.evolve(self, **kwargs)
 
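Since the removed `from_dict` was only a thin wrapper around the constructor, callers can rebuild the state by splatting the dict directly (see the `send_queue.py` hunk below). A minimal round-trip sketch, using a cut-down stand-in for the real attrs class:

```python
import attr

@attr.s(slots=True, frozen=True, auto_attribs=True)
class UserPresenceState:
    """Cut-down stand-in for synapse.api.presence.UserPresenceState."""
    user_id: str
    state: str

    def as_dict(self) -> dict:
        return attr.asdict(self)

original = UserPresenceState(user_id="@alice:example.com", state="online")
# The removed from_dict(d) was just UserPresenceState(**d):
restored = UserPresenceState(**original.as_dict())
assert restored == original
```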
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c8bc46415d..1a7fa175ec 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1402,7 +1402,7 @@ class FederationClient(FederationBase):
             The remote homeserver return some state from the room. The response
             dictionary is in the form:
 
-            {"knock_state_events": [<state event dict>, ...]}
+            {"knock_room_state": [<state event dict>, ...]}
 
             The list of state events may be empty.
 
@@ -1429,7 +1429,7 @@ class FederationClient(FederationBase):
             The remote homeserver can optionally return some state from the room. The response
             dictionary is in the form:
 
-            {"knock_state_events": [<state event dict>, ...]}
+            {"knock_room_state": [<state event dict>, ...]}
 
             The list of state events may be empty.
         """
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ec8e770430..6ac8d16095 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -850,14 +850,7 @@ class FederationServer(FederationBase):
                 context, self._room_prejoin_state_types
             )
         )
-        return {
-            "knock_room_state": stripped_room_state,
-            # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
-            # Thus, we also populate a 'knock_state_events' with the same content to
-            # support old instances.
-            # See https://github.com/matrix-org/synapse/issues/14088.
-            "knock_state_events": stripped_room_state,
-        }
+        return {"knock_room_state": stripped_room_state}
 
     async def _on_send_membership_event(
         self, origin: str, content: JsonDict, membership_type: str, room_id: str
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 6520795635..525968bcba 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -395,7 +395,7 @@ class PresenceDestinationsRow(BaseFederationRow):
     @staticmethod
     def from_data(data: JsonDict) -> "PresenceDestinationsRow":
         return PresenceDestinationsRow(
-            state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
+            state=UserPresenceState(**data["state"]), destinations=data["dests"]
         )
 
     def to_data(self) -> JsonDict:
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index fb20fd8a10..7b6b1da090 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -67,7 +67,7 @@ The loop continues so long as there is anything to send. At each iteration of th
 
 When the `PerDestinationQueue` has the catch-up flag set, the *Catch-Up Transmission Loop*
 (`_catch_up_transmission_loop`) is used in lieu of the regular `_transaction_transmission_loop`.
-(Only once the catch-up mode has been exited can the regular tranaction transmission behaviour
+(Only once the catch-up mode has been exited can the regular transaction transmission behaviour
 be resumed.)
 
 *Catch-Up Mode*, entered upon Synapse startup or once a homeserver has fallen behind due to
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index b5e4b2680e..fab4800717 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -431,7 +431,7 @@ class TransportLayerClient:
             The remote homeserver can optionally return some state from the room. The response
             dictionary is in the form:
 
-            {"knock_state_events": [<state event dict>, ...]}
+            {"knock_room_state": [<state event dict>, ...]}
 
             The list of state events may be empty.
         """
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index ba9704a065..97fd1fd427 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -171,8 +171,8 @@ class AdminHandler:
             else:
                 stream_ordering = room.stream_ordering
 
-            from_key = RoomStreamToken(0, 0)
-            to_key = RoomStreamToken(None, stream_ordering)
+            from_key = RoomStreamToken(topological=0, stream=0)
+            to_key = RoomStreamToken(stream=stream_ordering)
 
             # Events that we've processed in this room
             written_events: Set[str] = set()
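Constructing `RoomStreamToken` with keyword arguments makes explicit which part of the token is being set, and lets the topological part default instead of being passed as a bare positional `None`. A short sketch (assuming a Synapse checkout on `PYTHONPATH`):

```python
from synapse.types import RoomStreamToken

# A token pinned to the very start of the room's timeline:
from_key = RoomStreamToken(topological=0, stream=0)

# A live token carrying only a stream ordering; the topological part
# defaults to None rather than being passed positionally.
to_key = RoomStreamToken(stream=42)
```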
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7de7bd3289..c200a45f3a 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -216,7 +216,7 @@ class ApplicationServicesHandler:
 
     def notify_interested_services_ephemeral(
         self,
-        stream_key: str,
+        stream_key: StreamKeyType,
         new_token: Union[int, RoomStreamToken],
         users: Collection[Union[str, UserID]],
     ) -> None:
@@ -326,7 +326,7 @@ class ApplicationServicesHandler:
     async def _notify_interested_services_ephemeral(
         self,
         services: List[ApplicationService],
-        stream_key: str,
+        stream_key: StreamKeyType,
         new_token: int,
         users: Collection[Union[str, UserID]],
     ) -> None:
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 86ad96d030..50df4f2b06 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -845,7 +845,6 @@ class DeviceHandler(DeviceWorkerHandler):
                     else:
                         assert max_stream_id == stream_id
                         # Avoid moving `room_id` backwards.
-                        pass
 
                     if self._handle_new_device_update_new_data:
                         continue
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 29cd45550a..9d72794e8b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -868,19 +868,10 @@ class FederationHandler:
         # This is a bit of a hack and is cribbing off of invites. Basically we
         # store the room state here and retrieve it again when this event appears
         # in the invitee's sync stream. It is stripped out for all other local users.
-        stripped_room_state = (
-            knock_response.get("knock_room_state")
-            # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
-            # Thus, we also check for a 'knock_state_events' to support old instances.
-            # See https://github.com/matrix-org/synapse/issues/14088.
-            or knock_response.get("knock_state_events")
-        )
+        stripped_room_state = knock_response.get("knock_room_state")
 
         if stripped_room_state is None:
-            raise KeyError(
-                "Missing 'knock_room_state' (or legacy 'knock_state_events') field in "
-                "send_knock response"
-            )
+            raise KeyError("Missing 'knock_room_state' field in send_knock response")
 
         event.unsigned["knock_room_state"] = stripped_room_state
 
@@ -1506,7 +1497,6 @@ class FederationHandler:
                     # in the meantime and context needs to be recomputed, so let's do so.
                     if i == max_retries - 1:
                         raise e
-                    pass
         else:
             destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)}
 
@@ -1582,7 +1572,6 @@ class FederationHandler:
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
     async def add_display_name_to_third_party_invite(
         self,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 5737f8014d..c34bd7db95 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -192,8 +192,7 @@ class InitialSyncHandler:
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = RoomStreamToken(
-                        None,
-                        event.stream_ordering,
+                        stream=event.stream_ordering,
                     )
                     deferred_room_state = run_in_background(
                         self._state_storage_controller.get_state_for_events,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 44dbbf81dd..8de4b8e816 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -693,13 +693,9 @@ class EventCreationHandler:
         if require_consent and not is_exempt:
             await self.assert_accepted_privacy_policy(requester)
 
-        # Save the access token ID, the device ID and the transaction ID in the event
-        # internal metadata. This is useful to determine if we should echo the
-        # transaction_id in events.
+        # Save the device ID and the transaction ID in the event internal metadata.
+        # This is useful to determine if we should echo the transaction_id in events.
         # See `synapse.events.utils.EventClientSerializer.serialize_event`
-        if requester.access_token_id is not None:
-            builder.internal_metadata.token_id = requester.access_token_id
-
         if requester.device_id is not None:
             builder.internal_metadata.device_id = requester.device_id
 
@@ -1133,7 +1129,6 @@ class EventCreationHandler:
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
         # we know it was persisted, so must have a stream ordering
         assert ev.internal_metadata.stream_ordering
@@ -2038,7 +2033,6 @@ class EventCreationHandler:
                         # in the meantime and context needs to be recomputed, so let's do so.
                         if i == max_retries - 1:
                             raise e
-                        pass
                 return True
             except AuthError:
                 logger.info(
diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 7ed88a3611..87b428ab1c 100644
--- a/synapse/handlers/push_rules.py
+++ b/synapse/handlers/push_rules.py
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, UnrecognizedRequestError
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.push_rule import RuleNotFoundException
 from synapse.synapse_rust.push import get_base_rule_ids
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -114,7 +114,9 @@ class PushRulesHandler:
             user_id: the user ID the change is for.
         """
         stream_id = self._main_store.get_max_push_rules_stream_id()
-        self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id])
+        self._notifier.on_new_event(
+            StreamKeyType.PUSH_RULES, stream_id, users=[user_id]
+        )
 
     async def push_rules_for_user(
         self, user: UserID
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a7a29b758b..69ac468f75 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -130,11 +130,10 @@ class ReceiptsHandler:
 
     async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
         """Takes a list of receipts, stores them and informs the notifier."""
-        min_batch_id: Optional[int] = None
-        max_batch_id: Optional[int] = None
 
+        receipts_persisted: List[ReadReceipt] = []
         for receipt in receipts:
-            res = await self.store.insert_receipt(
+            stream_id = await self.store.insert_receipt(
                 receipt.room_id,
                 receipt.receipt_type,
                 receipt.user_id,
@@ -143,30 +142,26 @@ class ReceiptsHandler:
                 receipt.data,
             )
 
-            if not res:
-                # res will be None if this receipt is 'old'
+            if stream_id is None:
+                # stream_id will be None if this receipt is 'old'
                 continue
 
-            stream_id, max_persisted_id = res
+            receipts_persisted.append(receipt)
 
-            if min_batch_id is None or stream_id < min_batch_id:
-                min_batch_id = stream_id
-            if max_batch_id is None or max_persisted_id > max_batch_id:
-                max_batch_id = max_persisted_id
-
-        # Either both of these should be None or neither.
-        if min_batch_id is None or max_batch_id is None:
+        if not receipts_persisted:
             # no new receipts
             return False
 
-        affected_room_ids = list({r.room_id for r in receipts})
+        max_batch_id = self.store.get_max_receipt_stream_id()
+
+        affected_room_ids = list({r.room_id for r in receipts_persisted})
 
         self.notifier.on_new_event(
             StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
         )
         # Note that the min here shouldn't be relied upon to be accurate.
         await self.hs.get_pusherpool().on_new_receipts(
-            min_batch_id, max_batch_id, affected_room_ids
+            {r.user_id for r in receipts_persisted}
         )
 
         return True
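The rewritten batching keeps only the receipts the store actually persisted, notifies once at the stream maximum, and hands the pushers user IDs instead of a (min, max) stream-id window. A runnable toy of the new flow (the store here is an invented stand-in, not Synapse's real API):

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional, Set

@dataclass(frozen=True)
class ReadReceipt:
    room_id: str
    user_id: str
    event_id: str

class ToyReceiptStore:
    """Invented stand-in for the receipts store."""
    def __init__(self) -> None:
        self._stream_id = 0
        self._seen: Set[ReadReceipt] = set()

    async def insert_receipt(self, receipt: ReadReceipt) -> Optional[int]:
        if receipt in self._seen:
            return None  # an 'old' receipt: nothing was persisted
        self._seen.add(receipt)
        self._stream_id += 1
        return self._stream_id

    def get_max_receipt_stream_id(self) -> int:
        return self._stream_id

async def handle_new_receipts(store: ToyReceiptStore,
                              receipts: List[ReadReceipt]) -> bool:
    persisted = [r for r in receipts
                 if await store.insert_receipt(r) is not None]
    if not persisted:
        return False
    # One notification at the stream maximum covers the whole batch, and
    # the pushers now receive user IDs rather than a stream-id window.
    print("notify @", store.get_max_receipt_stream_id(),
          "users:", {r.user_id for r in persisted})
    return True

asyncio.run(handle_new_receipts(
    ToyReceiptStore(), [ReadReceipt("!r:x", "@a:x", "$e1")]
))
```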
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a0c3b16819..97c9f01245 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -261,7 +261,6 @@ class RoomCreationHandler:
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
         # This is to satisfy mypy and should never happen
         raise PartialStateConflictError()
@@ -1708,7 +1707,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
 
         if from_key.topological:
             logger.warning("Stream has topological part!!!! %r", from_key)
-            from_key = RoomStreamToken(None, from_key.stream)
+            from_key = RoomStreamToken(stream=from_key.stream)
 
         app_service = self.store.get_app_service_by_user_id(user.to_string())
         if app_service:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 90343c2306..130eee7e1d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -382,8 +382,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         and persist a new event for the new membership change.
 
         Args:
-            requester:
-            target:
+            requester: User requesting the membership change, i.e. the sender of the
+                desired membership event.
+            target: User whose membership should change, i.e. the state_key of the
+                desired membership event.
             room_id:
             membership:
 
@@ -415,7 +417,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         Returns:
             Tuple of event ID and stream ordering position
         """
-
         user_id = target.to_string()
 
         if content is None:
@@ -475,21 +476,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                     (EventTypes.Member, user_id), None
                 )
 
-                if event.membership == Membership.JOIN:
-                    newly_joined = True
-                    if prev_member_event_id:
-                        prev_member_event = await self.store.get_event(
-                            prev_member_event_id
-                        )
-                        newly_joined = prev_member_event.membership != Membership.JOIN
-
-                    # Only rate-limit if the user actually joined the room, otherwise we'll end
-                    # up blocking profile updates.
-                    if newly_joined and ratelimit:
-                        await self._join_rate_limiter_local.ratelimit(requester)
-                        await self._join_rate_per_room_limiter.ratelimit(
-                            requester, key=room_id, update=False
-                        )
                 with opentracing.start_active_span("handle_new_client_event"):
                     result_event = (
                         await self.event_creation_handler.handle_new_client_event(
@@ -514,7 +500,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
         # we know it was persisted, so should have a stream ordering
         assert result_event.internal_metadata.stream_ordering
@@ -618,6 +603,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         Raises:
             ShadowBanError if a shadow-banned requester attempts to send an invite.
         """
+        if ratelimit:
+            if action == Membership.JOIN:
+                # Only rate-limit if the user isn't already joined to the room, otherwise
+                # we'll end up blocking profile updates.
+                (
+                    current_membership,
+                    _,
+                ) = await self.store.get_local_current_membership_for_user_in_room(
+                    requester.user.to_string(),
+                    room_id,
+                )
+                if current_membership != Membership.JOIN:
+                    await self._join_rate_limiter_local.ratelimit(requester)
+                    await self._join_rate_per_room_limiter.ratelimit(
+                        requester, key=room_id, update=False
+                    )
+            elif action == Membership.INVITE:
+                await self.ratelimit_invite(requester, room_id, target.to_string())
+
         if action == Membership.INVITE and requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
             await self.clock.sleep(random.randint(1, 10))
@@ -794,8 +798,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         if effective_membership_state == Membership.INVITE:
             target_id = target.to_string()
-            if ratelimit:
-                await self.ratelimit_invite(requester, room_id, target_id)
 
             # block any attempts to invite the server notices mxid
             if target_id == self._server_notices_mxid:
@@ -2002,7 +2004,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
                 # in the meantime and context needs to be recomputed, so let's do so.
                 if i == max_retries - 1:
                     raise e
-                pass
 
         # we know it was persisted, so must have a stream ordering
         assert result_event.internal_metadata.stream_ordering
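The join rate-limit now fires at the entry point, keyed off the caller's current membership, so join-to-join transitions (profile updates) skip the limiter entirely. A schematic, runnable toy of that ordering (the limiter and membership store are invented stand-ins):

```python
import asyncio

class ToyLimiter:
    """Invented stand-in for Synapse's request ratelimiter."""
    def __init__(self, burst: int) -> None:
        self.burst, self.count = burst, 0

    async def ratelimit(self) -> None:
        self.count += 1
        if self.count > self.burst:
            raise RuntimeError("ratelimited")

async def update_membership(memberships, limiter, user, room, action) -> None:
    if action == "join" and memberships.get((user, room)) != "join":
        # Only genuine joins hit the limiter; join -> join transitions
        # (profile updates) fall through untouched.
        await limiter.ratelimit()
    memberships[(user, room)] = action

async def main() -> None:
    memberships, limiter = {}, ToyLimiter(burst=1)
    await update_membership(memberships, limiter, "@a:x", "!r:x", "join")
    # Second call is a profile-update-style re-join: the limiter is skipped.
    await update_membership(memberships, limiter, "@a:x", "!r:x", "join")

asyncio.run(main())
```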
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7bd42f635f..744e080309 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -2333,7 +2333,7 @@ class SyncHandler:
                             continue
 
                 leave_token = now_token.copy_and_replace(
-                    StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
+                    StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
                 )
                 room_entries.append(
                     RoomSyncResultBuilder(
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 3bbf91298e..1e4e56f36b 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -266,7 +266,7 @@ class HttpServer(Protocol):
     def register_paths(
         self,
         method: str,
-        path_patterns: Iterable[Pattern],
+        path_patterns: Iterable[Pattern[str]],
         callback: ServletCallback,
         servlet_classname: str,
     ) -> None:
diff --git a/synapse/media/_base.py b/synapse/media/_base.py
index 80c448de2b..13345acf75 100644
--- a/synapse/media/_base.py
+++ b/synapse/media/_base.py
@@ -26,11 +26,11 @@ from twisted.internet.interfaces import IConsumer
 from twisted.protocols.basic import FileSender
 from twisted.web.server import Request
 
-from synapse.api.errors import Codes, SynapseError, cs_error
+from synapse.api.errors import Codes, cs_error
 from synapse.http.server import finish_request, respond_with_json
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import make_deferred_yieldable
-from synapse.util.stringutils import is_ascii, parse_and_validate_server_name
+from synapse.util.stringutils import is_ascii
 
 logger = logging.getLogger(__name__)
 
@@ -84,52 +84,12 @@ INLINE_CONTENT_TYPES = [
 ]
 
 
-def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]:
-    """Parses the server name, media ID and optional file name from the request URI
-
-    Also performs some rough validation on the server name.
-
-    Args:
-        request: The `Request`.
-
-    Returns:
-        A tuple containing the parsed server name, media ID and optional file name.
-
-    Raises:
-        SynapseError(404): if parsing or validation fail for any reason
-    """
-    try:
-        # The type on postpath seems incorrect in Twisted 21.2.0.
-        postpath: List[bytes] = request.postpath  # type: ignore
-        assert postpath
-
-        # This allows users to append e.g. /test.png to the URL. Useful for
-        # clients that parse the URL to see content type.
-        server_name_bytes, media_id_bytes = postpath[:2]
-        server_name = server_name_bytes.decode("utf-8")
-        media_id = media_id_bytes.decode("utf8")
-
-        # Validate the server name, raising if invalid
-        parse_and_validate_server_name(server_name)
-
-        file_name = None
-        if len(postpath) > 2:
-            try:
-                file_name = urllib.parse.unquote(postpath[-1].decode("utf-8"))
-            except UnicodeDecodeError:
-                pass
-        return server_name, media_id, file_name
-    except Exception:
-        raise SynapseError(
-            404, "Invalid media id token %r" % (request.postpath,), Codes.UNKNOWN
-        )
-
-
 def respond_404(request: SynapseRequest) -> None:
+    assert request.path is not None
     respond_with_json(
         request,
         404,
-        cs_error("Not found %r" % (request.postpath,), code=Codes.NOT_FOUND),
+        cs_error("Not found '%s'" % (request.path.decode(),), code=Codes.NOT_FOUND),
         send_cors=True,
     )
 
@@ -372,7 +332,7 @@ class ThumbnailInfo:
     # Content type of thumbnail, e.g. image/png
     type: str
     # The size of the media file, in bytes.
-    length: Optional[int] = None
+    length: int
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py
index 1b7b014f9a..7fd46901f7 100644
--- a/synapse/media/media_repository.py
+++ b/synapse/media/media_repository.py
@@ -48,6 +48,7 @@ from synapse.media.filepath import MediaFilePaths
 from synapse.media.media_storage import MediaStorage
 from synapse.media.storage_provider import StorageProviderWrapper
 from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
+from synapse.media.url_previewer import UrlPreviewer
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID
 from synapse.util.async_helpers import Linearizer
@@ -114,7 +115,7 @@ class MediaRepository:
             )
             storage_providers.append(provider)
 
-        self.media_storage = MediaStorage(
+        self.media_storage: MediaStorage = MediaStorage(
             self.hs, self.primary_base_path, self.filepaths, storage_providers
         )
 
@@ -142,6 +143,13 @@ class MediaRepository:
                 MEDIA_RETENTION_CHECK_PERIOD_MS,
             )
 
+        if hs.config.media.url_preview_enabled:
+            self.url_previewer: Optional[UrlPreviewer] = UrlPreviewer(
+                hs, self, self.media_storage
+            )
+        else:
+            self.url_previewer = None
+
     def _start_update_recently_accessed(self) -> Deferred:
         return run_as_background_process(
             "update_recently_accessed_media", self._update_recently_accessed
@@ -616,6 +624,7 @@ class MediaRepository:
                         height=t_height,
                         method=t_method,
                         type=t_type,
+                        length=t_byte_source.tell(),
                     ),
                 )
 
@@ -686,6 +695,7 @@ class MediaRepository:
                         height=t_height,
                         method=t_method,
                         type=t_type,
+                        length=t_byte_source.tell(),
                     ),
                 )
 
@@ -831,6 +841,7 @@ class MediaRepository:
                         height=t_height,
                         method=t_method,
                         type=t_type,
+                        length=t_byte_source.tell(),
                     ),
                 )
 
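`ThumbnailInfo.length` becomes a required `int` (see the `_base.py` hunk above), populated here from the final position of the thumbnail byte stream. A small illustration of why `tell()` yields the byte count, using an in-memory buffer as a stand-in for the thumbnailer's output:

```python
import io

t_byte_source = io.BytesIO()
t_byte_source.write(b"\x89PNG\r\n\x1a\n" + b"\x00" * 120)  # fake thumbnail bytes

# After writing, the stream position equals the number of bytes produced,
# which is the value now stored as ThumbnailInfo.length.
assert t_byte_source.tell() == 128
```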
diff --git a/synapse/notifier.py b/synapse/notifier.py
index fc39e5c963..99e7715896 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -126,7 +126,7 @@ class _NotifierUserStream:
 
     def notify(
         self,
-        stream_key: str,
+        stream_key: StreamKeyType,
         stream_id: Union[int, RoomStreamToken],
         time_now_ms: int,
     ) -> None:
@@ -454,7 +454,7 @@ class Notifier:
 
     def on_new_event(
         self,
-        stream_key: str,
+        stream_key: StreamKeyType,
         new_token: Union[int, RoomStreamToken],
         users: Optional[Collection[Union[str, UserID]]] = None,
         rooms: Optional[StrCollection] = None,
@@ -655,30 +655,29 @@ class Notifier:
             events: List[Union[JsonDict, EventBase]] = []
             end_token = from_token
 
-            for name, source in self.event_sources.sources.get_sources():
-                keyname = "%s_key" % name
-                before_id = getattr(before_token, keyname)
-                after_id = getattr(after_token, keyname)
+            for keyname, source in self.event_sources.sources.get_sources():
+                before_id = before_token.get_field(keyname)
+                after_id = after_token.get_field(keyname)
                 if before_id == after_id:
                     continue
 
                 new_events, new_key = await source.get_new_events(
                     user=user,
-                    from_key=getattr(from_token, keyname),
+                    from_key=from_token.get_field(keyname),
                     limit=limit,
                     is_guest=is_peeking,
                     room_ids=room_ids,
                     explicit_room_id=explicit_room_id,
                 )
 
-                if name == "room":
+                if keyname == StreamKeyType.ROOM:
                     new_events = await filter_events_for_client(
                         self._storage_controllers,
                         user.to_string(),
                         new_events,
                         is_peeking=is_peeking,
                     )
-                elif name == "presence":
+                elif keyname == StreamKeyType.PRESENCE:
                     now = self.clock.time_msec()
                     new_events[:] = [
                         {
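Replacing the string-built `getattr(token, "%s_key" % name)` lookups with a typed `get_field(StreamKeyType)` accessor lets the type checker verify every stream-key access. A toy version of the idea (both classes cut down from the real token and enum in `synapse.types`):

```python
from dataclasses import dataclass
from enum import Enum

class StreamKeyType(Enum):
    """Abbreviated sketch; the real enum lives in synapse.types."""
    ROOM = "room_key"
    PRESENCE = "presence_key"

@dataclass(frozen=True)
class ToyToken:
    room_key: int
    presence_key: int

    def get_field(self, key: StreamKeyType) -> int:
        # One typed accessor instead of getattr(token, "%s_key" % name).
        return getattr(self, key.value)

token = ToyToken(room_key=10, presence_key=3)
assert token.get_field(StreamKeyType.ROOM) == 10
```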
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 9e3a98741a..4d405f2a0c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -101,7 +101,7 @@ if TYPE_CHECKING:
 class PusherConfig:
     """Parameters necessary to configure a pusher."""
 
-    id: Optional[str]
+    id: Optional[int]
     user_name: str
 
     profile_tag: str
@@ -182,7 +182,7 @@ class Pusher(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
+    def on_new_receipts(self) -> None:
         raise NotImplementedError()
 
     @abc.abstractmethod
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 1710dd51b9..cf45fd09a8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -99,7 +99,7 @@ class EmailPusher(Pusher):
                 pass
             self.timed_call = None
 
-    def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
+    def on_new_receipts(self) -> None:
         # We could wake up and cancel the timer but there tend to be quite a
         # lot of read receipts so it's probably less work to just let the
         # timer fire
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 50027680cb..725910a659 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -160,7 +160,7 @@ class HttpPusher(Pusher):
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
+    def on_new_receipts(self) -> None:
         # Note that the min here shouldn't be relied upon to be accurate.
 
         # We could check the receipts are actually m.read receipts here,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 6517e3566f..15a2cc932f 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -292,20 +292,12 @@ class PusherPool:
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
-    async def on_new_receipts(
-        self, min_stream_id: int, max_stream_id: int, affected_room_ids: Iterable[str]
-    ) -> None:
+    async def on_new_receipts(self, users_affected: StrCollection) -> None:
         if not self.pushers:
             # nothing to do here.
             return
 
         try:
-            # Need to subtract 1 from the minimum because the lower bound here
-            # is not inclusive
-            users_affected = await self.store.get_users_sent_receipts_between(
-                min_stream_id - 1, max_stream_id
-            )
-
             for u in users_affected:
                 # Don't push if the user account has expired
                 expired = await self._account_validity_handler.is_user_expired(u)
@@ -314,7 +306,7 @@ class PusherPool:
 
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        p.on_new_receipts(min_stream_id, max_stream_id)
+                        p.on_new_receipts()
 
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index f4f2b29e96..d5337fe588 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -129,9 +129,7 @@ class ReplicationDataHandler:
             self.notifier.on_new_event(
                 StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows]
             )
-            await self._pusher_pool.on_new_receipts(
-                token, token, {row.room_id for row in rows}
-            )
+            await self._pusher_pool.on_new_receipts({row.user_id for row in rows})
         elif stream_name == ToDeviceStream.NAME:
             entities = [row.entity for row in rows if row.entity.startswith("@")]
             if entities:
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index e616b5e1c8..0f0f851b79 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -18,7 +18,7 @@ allowed to be sent by which side.
 """
 import abc
 import logging
-from typing import Optional, Tuple, Type, TypeVar
+from typing import List, Optional, Tuple, Type, TypeVar
 
 from synapse.replication.tcp.streams._base import StreamRow
 from synapse.util import json_decoder, json_encoder
@@ -74,6 +74,8 @@ SC = TypeVar("SC", bound="_SimpleCommand")
 class _SimpleCommand(Command):
     """An implementation of Command whose argument is just a 'data' string."""
 
+    __slots__ = ["data"]
+
     def __init__(self, data: str):
         self.data = data
 
@@ -122,6 +124,8 @@ class RdataCommand(Command):
         RDATA presence master 59 ["@baz:example.com", "online", ...]
     """
 
+    __slots__ = ["stream_name", "instance_name", "token", "row"]
+
     NAME = "RDATA"
 
     def __init__(
@@ -179,6 +183,8 @@ class PositionCommand(Command):
     of the stream.
     """
 
+    __slots__ = ["stream_name", "instance_name", "prev_token", "new_token"]
+
     NAME = "POSITION"
 
     def __init__(
@@ -235,6 +241,8 @@ class ReplicateCommand(Command):
         REPLICATE
     """
 
+    __slots__: List[str] = []
+
     NAME = "REPLICATE"
 
     def __init__(self) -> None:
@@ -264,6 +272,8 @@ class UserSyncCommand(Command):
     Where <state> is either "start" or "end"
     """
 
+    __slots__ = ["instance_id", "user_id", "device_id", "is_syncing", "last_sync_ms"]
+
     NAME = "USER_SYNC"
 
     def __init__(
@@ -316,6 +326,8 @@ class ClearUserSyncsCommand(Command):
         CLEAR_USER_SYNC <instance_id>
     """
 
+    __slots__ = ["instance_id"]
+
     NAME = "CLEAR_USER_SYNC"
 
     def __init__(self, instance_id: str):
@@ -343,6 +355,8 @@ class FederationAckCommand(Command):
         FEDERATION_ACK <instance_name> <token>
     """
 
+    __slots__ = ["instance_name", "token"]
+
     NAME = "FEDERATION_ACK"
 
     def __init__(self, instance_name: str, token: int):
@@ -368,6 +382,15 @@ class UserIpCommand(Command):
         USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
     """
 
+    __slots__ = [
+        "user_id",
+        "access_token",
+        "ip",
+        "user_agent",
+        "device_id",
+        "last_seen",
+    ]
+
     NAME = "USER_IP"
 
     def __init__(
@@ -423,8 +446,6 @@ class RemoteServerUpCommand(_SimpleCommand):
     """Sent when a worker has detected that a remote server is no longer
     "down" and retry timings should be reset.
 
-    If sent from a client the server will relay to all other workers.
-
     Format::
 
         REMOTE_SERVER_UP <server>
@@ -441,6 +462,8 @@ class LockReleasedCommand(Command):
         LOCK_RELEASED ["<instance_name>", "<lock_name>", "<lock_key>"]
     """
 
+    __slots__ = ["instance_name", "lock_name", "lock_key"]
+
     NAME = "LOCK_RELEASED"
 
     def __init__(
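Declaring `__slots__` on the command classes drops the per-instance `__dict__`, which adds up for a replication stream that allocates many short-lived commands, and also turns attribute typos into hard errors. A minimal demonstration (sizes vary by interpreter):

```python
import sys

class PlainCommand:
    def __init__(self, data: str) -> None:
        self.data = data

class SlottedCommand:
    __slots__ = ["data"]

    def __init__(self, data: str) -> None:
        self.data = data

plain, slotted = PlainCommand("x"), SlottedCommand("x")
# Slotted instances carry no per-instance __dict__, so attribute storage
# is fixed-size and a typo like `slotted.dtaa = 1` raises AttributeError
# instead of silently creating a new attribute.
assert not hasattr(slotted, "__dict__")
print(sys.getsizeof(plain) + sys.getsizeof(plain.__dict__),  # dict-backed
      sys.getsizeof(slotted))                                # slots-backed
```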
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index e42dade246..9bd0d764f8 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -146,7 +146,7 @@ class PurgeHistoryRestServlet(RestServlet):
             # RoomStreamToken expects [int] not Optional[int]
             assert event.internal_metadata.stream_ordering is not None
             room_token = RoomStreamToken(
-                event.depth, event.internal_metadata.stream_ordering
+                topological=event.depth, stream=event.internal_metadata.stream_ordering
             )
             token = await room_token.to_string(self.store)
 
diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py
index e0ee55bd0e..8a617af599 100644
--- a/synapse/rest/admin/federation.py
+++ b/synapse/rest/admin/federation.py
@@ -198,7 +198,13 @@ class DestinationMembershipRestServlet(RestServlet):
         rooms, total = await self._store.get_destination_rooms_paginate(
             destination, start, limit, direction
         )
-        response = {"rooms": rooms, "total": total}
+        response = {
+            "rooms": [
+                {"room_id": room_id, "stream_ordering": stream_ordering}
+                for room_id, stream_ordering in rooms
+            ],
+            "total": total,
+        }
         if (start + limit) < total:
             response["next_token"] = str(start + len(rooms))
 
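With the tuple rows serialized explicitly, the endpoint's JSON body has a stable, documented shape. An illustrative payload (room IDs and orderings invented):

```python
# Illustrative GET response for the destination-rooms admin endpoint:
response = {
    "rooms": [
        {"room_id": "!abc:example.com", "stream_ordering": 1234},
        {"room_id": "!def:example.com", "stream_ordering": 1260},
    ],
    "total": 2,
    # "next_token" is only present when more pages remain.
}
```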
diff --git a/synapse/rest/media/config_resource.py b/synapse/rest/media/config_resource.py
index a95804d327..dbf5133c72 100644
--- a/synapse/rest/media/config_resource.py
+++ b/synapse/rest/media/config_resource.py
@@ -14,17 +14,19 @@
 # limitations under the License.
 #
 
+import re
 from typing import TYPE_CHECKING
 
-from synapse.http.server import DirectServeJsonResource, respond_with_json
+from synapse.http.server import respond_with_json
+from synapse.http.servlet import RestServlet
 from synapse.http.site import SynapseRequest
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
-class MediaConfigResource(DirectServeJsonResource):
-    isLeaf = True
+class MediaConfigResource(RestServlet):
+    PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/config$")]
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
@@ -33,9 +35,6 @@ class MediaConfigResource(DirectServeJsonResource):
         self.auth = hs.get_auth()
         self.limits_dict = {"m.upload.size": config.media.max_upload_size}
 
-    async def _async_render_GET(self, request: SynapseRequest) -> None:
+    async def on_GET(self, request: SynapseRequest) -> None:
         await self.auth.get_user_by_req(request)
         respond_with_json(request, 200, self.limits_dict, send_cors=True)
-
-    async def _async_render_OPTIONS(self, request: SynapseRequest) -> None:
-        respond_with_json(request, 200, {}, send_cors=True)
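This commit converts the media endpoints from Twisted resource-tree leaves (`DirectServeJsonResource` plus `putChild`) to `RestServlet`s matched by regex, like the rest of Synapse's REST layer; OPTIONS handling then comes from the shared machinery rather than per-resource overrides. A toy of the registration pattern (the HTTP-server class here is a stand-in, not Synapse's):

```python
import re
from typing import Callable, Dict, List, Pattern, Tuple

class ToyHttpServer:
    """Stand-in for synapse.http.server.HttpServer."""
    def __init__(self) -> None:
        self.routes: List[Tuple[str, Pattern[str], Callable]] = []

    def register_paths(
        self, method: str, patterns: List[Pattern[str]],
        callback: Callable, servlet_classname: str,
    ) -> None:
        for pattern in patterns:
            self.routes.append((method, pattern, callback))

class ConfigServlet:
    PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/config$")]

    def register(self, http_server: ToyHttpServer) -> None:
        http_server.register_paths("GET", self.PATTERNS, self.on_GET, "config")

    def on_GET(self, request: object) -> Dict[str, int]:
        return {"m.upload.size": 50 * 1024 * 1024}

server = ToyHttpServer()
ConfigServlet().register(server)
assert server.routes[0][1].match("/_matrix/media/v3/config")
```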
diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py
index 3c618ef60a..65b9ff52fa 100644
--- a/synapse/rest/media/download_resource.py
+++ b/synapse/rest/media/download_resource.py
@@ -13,16 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING
+import re
+from typing import TYPE_CHECKING, Optional
 
-from synapse.http.server import (
-    DirectServeJsonResource,
-    set_corp_headers,
-    set_cors_headers,
-)
-from synapse.http.servlet import parse_boolean
+from synapse.http.server import set_corp_headers, set_cors_headers
+from synapse.http.servlet import RestServlet, parse_boolean
 from synapse.http.site import SynapseRequest
-from synapse.media._base import parse_media_id, respond_404
+from synapse.media._base import respond_404
+from synapse.util.stringutils import parse_and_validate_server_name
 
 if TYPE_CHECKING:
     from synapse.media.media_repository import MediaRepository
@@ -31,15 +29,28 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class DownloadResource(DirectServeJsonResource):
-    isLeaf = True
+class DownloadResource(RestServlet):
+    PATTERNS = [
+        re.compile(
+            "/_matrix/media/(r0|v3|v1)/download/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)(/(?P<file_name>[^/]*))?$"
+        )
+    ]
 
     def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"):
         super().__init__()
         self.media_repo = media_repo
         self._is_mine_server_name = hs.is_mine_server_name
 
-    async def _async_render_GET(self, request: SynapseRequest) -> None:
+    async def on_GET(
+        self,
+        request: SynapseRequest,
+        server_name: str,
+        media_id: str,
+        file_name: Optional[str] = None,
+    ) -> None:
+        # Validate the server name, raising if invalid
+        parse_and_validate_server_name(server_name)
+
         set_cors_headers(request)
         set_corp_headers(request)
         request.setHeader(
@@ -58,9 +69,8 @@ class DownloadResource(DirectServeJsonResource):
             b"Referrer-Policy",
             b"no-referrer",
         )
-        server_name, media_id, name = parse_media_id(request)
         if self._is_mine_server_name(server_name):
-            await self.media_repo.get_local_media(request, media_id, name)
+            await self.media_repo.get_local_media(request, media_id, file_name)
         else:
             allow_remote = parse_boolean(request, "allow_remote", default=True)
             if not allow_remote:
@@ -72,4 +82,6 @@ class DownloadResource(DirectServeJsonResource):
                 respond_404(request)
                 return
 
-            await self.media_repo.get_remote_media(request, server_name, media_id, name)
+            await self.media_repo.get_remote_media(
+                request, server_name, media_id, file_name
+            )
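Named capture groups in the servlet pattern replace the removed `parse_media_id()` hand-parsing of `request.postpath`: the framework passes `server_name`, `media_id` and the optional `file_name` straight into `on_GET`. The regex itself can be exercised directly:

```python
import re

PATTERN = re.compile(
    "/_matrix/media/(r0|v3|v1)/download/"
    "(?P<server_name>[^/]*)/(?P<media_id>[^/]*)(/(?P<file_name>[^/]*))?$"
)

m = PATTERN.match("/_matrix/media/v3/download/example.com/abc123/cat.png")
assert m is not None
print(m.group("server_name"), m.group("media_id"), m.group("file_name"))
# -> example.com abc123 cat.png   (file_name is None when the path omits it)
```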
diff --git a/synapse/rest/media/media_repository_resource.py b/synapse/rest/media/media_repository_resource.py
index 5ebaa3b032..2089bb1029 100644
--- a/synapse/rest/media/media_repository_resource.py
+++ b/synapse/rest/media/media_repository_resource.py
@@ -15,7 +15,7 @@
 from typing import TYPE_CHECKING
 
 from synapse.config._base import ConfigError
-from synapse.http.server import UnrecognizedRequestResource
+from synapse.http.server import HttpServer, JsonResource
 
 from .config_resource import MediaConfigResource
 from .download_resource import DownloadResource
@@ -27,7 +27,7 @@ if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
-class MediaRepositoryResource(UnrecognizedRequestResource):
+class MediaRepositoryResource(JsonResource):
     """File uploading and downloading.
 
     Uploads are POSTed to a resource which returns a token which is used to GET
@@ -70,6 +70,11 @@ class MediaRepositoryResource(UnrecognizedRequestResource):
     width and height are close to the requested size and the aspect matches
     the requested size. The client should scale the image if it needs to fit
     within a given rectangle.
+
+    This gets mounted at various points under /_matrix/media, including:
+       * /_matrix/media/r0
+       * /_matrix/media/v1
+       * /_matrix/media/v3
     """
 
     def __init__(self, hs: "HomeServer"):
@@ -77,17 +82,23 @@ class MediaRepositoryResource(UnrecognizedRequestResource):
         if not hs.config.media.can_load_media_repo:
             raise ConfigError("Synapse is not configured to use a media repo.")
 
-        super().__init__()
+        JsonResource.__init__(self, hs, canonical_json=False)
+        self.register_servlets(self, hs)
+
+    @staticmethod
+    def register_servlets(http_server: HttpServer, hs: "HomeServer") -> None:
         media_repo = hs.get_media_repository()
 
-        self.putChild(b"upload", UploadResource(hs, media_repo))
-        self.putChild(b"download", DownloadResource(hs, media_repo))
-        self.putChild(
-            b"thumbnail", ThumbnailResource(hs, media_repo, media_repo.media_storage)
+        # Note that many of these should not exist as v1 endpoints, but empirically
+        # a lot of traffic still goes to them.
+
+        UploadResource(hs, media_repo).register(http_server)
+        DownloadResource(hs, media_repo).register(http_server)
+        ThumbnailResource(hs, media_repo, media_repo.media_storage).register(
+            http_server
         )
         if hs.config.media.url_preview_enabled:
-            self.putChild(
-                b"preview_url",
-                PreviewUrlResource(hs, media_repo, media_repo.media_storage),
+            PreviewUrlResource(hs, media_repo, media_repo.media_storage).register(
+                http_server
             )
-        self.putChild(b"config", MediaConfigResource(hs))
+        MediaConfigResource(hs).register(http_server)
diff --git a/synapse/rest/media/preview_url_resource.py b/synapse/rest/media/preview_url_resource.py
index 58513c4be4..c8acb65dca 100644
--- a/synapse/rest/media/preview_url_resource.py
+++ b/synapse/rest/media/preview_url_resource.py
@@ -13,24 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import re
 from typing import TYPE_CHECKING
 
-from synapse.http.server import (
-    DirectServeJsonResource,
-    respond_with_json,
-    respond_with_json_bytes,
-)
-from synapse.http.servlet import parse_integer, parse_string
+from synapse.http.server import respond_with_json_bytes
+from synapse.http.servlet import RestServlet, parse_integer, parse_string
 from synapse.http.site import SynapseRequest
 from synapse.media.media_storage import MediaStorage
-from synapse.media.url_previewer import UrlPreviewer
 
 if TYPE_CHECKING:
     from synapse.media.media_repository import MediaRepository
     from synapse.server import HomeServer
 
 
-class PreviewUrlResource(DirectServeJsonResource):
+class PreviewUrlResource(RestServlet):
     """
     The `GET /_matrix/media/r0/preview_url` endpoint provides a generic preview API
     for URLs which outputs Open Graph (https://ogp.me/) responses (with some Matrix
@@ -48,7 +44,7 @@ class PreviewUrlResource(DirectServeJsonResource):
       * Matrix cannot be used to distribute the metadata between homeservers.
     """
 
-    isLeaf = True
+    PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/preview_url$")]
 
     def __init__(
         self,
@@ -62,14 +58,10 @@ class PreviewUrlResource(DirectServeJsonResource):
         self.clock = hs.get_clock()
         self.media_repo = media_repo
         self.media_storage = media_storage
+        assert self.media_repo.url_previewer is not None
+        self.url_previewer = self.media_repo.url_previewer
 
-        self._url_previewer = UrlPreviewer(hs, media_repo, media_storage)
-
-    async def _async_render_OPTIONS(self, request: SynapseRequest) -> None:
-        request.setHeader(b"Allow", b"OPTIONS, GET")
-        respond_with_json(request, 200, {}, send_cors=True)
-
-    async def _async_render_GET(self, request: SynapseRequest) -> None:
+    async def on_GET(self, request: SynapseRequest) -> None:
         # XXX: if get_user_by_req fails, what should we do in an async render?
         requester = await self.auth.get_user_by_req(request)
         url = parse_string(request, "url", required=True)
@@ -77,5 +69,5 @@ class PreviewUrlResource(DirectServeJsonResource):
         if ts is None:
             ts = self.clock.time_msec()
 
-        og = await self._url_previewer.preview(url, requester.user, ts)
+        og = await self.url_previewer.preview(url, requester.user, ts)
         respond_with_json_bytes(request, 200, og, send_cors=True)
diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py
index 661e604b85..85b6bdbe72 100644
--- a/synapse/rest/media/thumbnail_resource.py
+++ b/synapse/rest/media/thumbnail_resource.py
@@ -13,29 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+import re
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from synapse.api.errors import Codes, SynapseError, cs_error
 from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP
-from synapse.http.server import (
-    DirectServeJsonResource,
-    respond_with_json,
-    set_corp_headers,
-    set_cors_headers,
-)
-from synapse.http.servlet import parse_integer, parse_string
+from synapse.http.server import respond_with_json, set_corp_headers, set_cors_headers
+from synapse.http.servlet import RestServlet, parse_integer, parse_string
 from synapse.http.site import SynapseRequest
 from synapse.media._base import (
     FileInfo,
     ThumbnailInfo,
-    parse_media_id,
     respond_404,
     respond_with_file,
     respond_with_responder,
 )
 from synapse.media.media_storage import MediaStorage
+from synapse.util.stringutils import parse_and_validate_server_name
 
 if TYPE_CHECKING:
     from synapse.media.media_repository import MediaRepository
@@ -44,8 +39,12 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class ThumbnailResource(DirectServeJsonResource):
-    isLeaf = True
+class ThumbnailResource(RestServlet):
+    PATTERNS = [
+        re.compile(
+            "/_matrix/media/(r0|v3|v1)/thumbnail/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$"
+        )
+    ]
 
     def __init__(
         self,
@@ -60,12 +59,17 @@ class ThumbnailResource(DirectServeJsonResource):
         self.media_storage = media_storage
         self.dynamic_thumbnails = hs.config.media.dynamic_thumbnails
         self._is_mine_server_name = hs.is_mine_server_name
+        self._server_name = hs.hostname
         self.prevent_media_downloads_from = hs.config.media.prevent_media_downloads_from
 
-    async def _async_render_GET(self, request: SynapseRequest) -> None:
+    async def on_GET(
+        self, request: SynapseRequest, server_name: str, media_id: str
+    ) -> None:
+        # Validate the server name, raising if invalid
+        parse_and_validate_server_name(server_name)
+
         set_cors_headers(request)
         set_corp_headers(request)
-        server_name, media_id, _ = parse_media_id(request)
         width = parse_integer(request, "width", required=True)
         height = parse_integer(request, "height", required=True)
         method = parse_string(request, "method", "scale")
@@ -155,30 +159,24 @@ class ThumbnailResource(DirectServeJsonResource):
 
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
         for info in thumbnail_infos:
-            t_w = info["thumbnail_width"] == desired_width
-            t_h = info["thumbnail_height"] == desired_height
-            t_method = info["thumbnail_method"] == desired_method
-            t_type = info["thumbnail_type"] == desired_type
+            t_w = info.width == desired_width
+            t_h = info.height == desired_height
+            t_method = info.method == desired_method
+            t_type = info.type == desired_type
 
             if t_w and t_h and t_method and t_type:
                 file_info = FileInfo(
                     server_name=None,
                     file_id=media_id,
                     url_cache=media_info["url_cache"],
-                    thumbnail=ThumbnailInfo(
-                        width=info["thumbnail_width"],
-                        height=info["thumbnail_height"],
-                        type=info["thumbnail_type"],
-                        method=info["thumbnail_method"],
-                    ),
+                    thumbnail=info,
                 )
 
-                t_type = file_info.thumbnail_type
-                t_length = info["thumbnail_length"]
-
                 responder = await self.media_storage.fetch_media(file_info)
                 if responder:
-                    await respond_with_responder(request, responder, t_type, t_length)
+                    await respond_with_responder(
+                        request, responder, info.type, info.length
+                    )
                     return
 
         logger.debug("We don't have a thumbnail of that size. Generating")
@@ -218,29 +216,23 @@ class ThumbnailResource(DirectServeJsonResource):
         file_id = media_info["filesystem_id"]
 
         for info in thumbnail_infos:
-            t_w = info["thumbnail_width"] == desired_width
-            t_h = info["thumbnail_height"] == desired_height
-            t_method = info["thumbnail_method"] == desired_method
-            t_type = info["thumbnail_type"] == desired_type
+            t_w = info.width == desired_width
+            t_h = info.height == desired_height
+            t_method = info.method == desired_method
+            t_type = info.type == desired_type
 
             if t_w and t_h and t_method and t_type:
                 file_info = FileInfo(
                     server_name=server_name,
                     file_id=media_info["filesystem_id"],
-                    thumbnail=ThumbnailInfo(
-                        width=info["thumbnail_width"],
-                        height=info["thumbnail_height"],
-                        type=info["thumbnail_type"],
-                        method=info["thumbnail_method"],
-                    ),
+                    thumbnail=info,
                 )
 
-                t_type = file_info.thumbnail_type
-                t_length = info["thumbnail_length"]
-
                 responder = await self.media_storage.fetch_media(file_info)
                 if responder:
-                    await respond_with_responder(request, responder, t_type, t_length)
+                    await respond_with_responder(
+                        request, responder, info.type, info.length
+                    )
                     return
 
         logger.debug("We don't have a thumbnail of that size. Generating")
@@ -300,7 +292,7 @@ class ThumbnailResource(DirectServeJsonResource):
         desired_height: int,
         desired_method: str,
         desired_type: str,
-        thumbnail_infos: List[Dict[str, Any]],
+        thumbnail_infos: List[ThumbnailInfo],
         media_id: str,
         file_id: str,
         url_cache: bool,
@@ -315,7 +307,7 @@ class ThumbnailResource(DirectServeJsonResource):
             desired_height: The desired height; the returned thumbnail may be larger than this.
             desired_method: The desired method used to generate the thumbnail.
             desired_type: The desired content-type of the thumbnail.
-            thumbnail_infos: A list of dictionaries of candidate thumbnails.
+            thumbnail_infos: A list of thumbnail infos of candidate thumbnails.
             file_id: The ID of the media that a thumbnail is being requested for.
             url_cache: True if this is from a URL cache.
             server_name: The server name, if this is a remote thumbnail.
@@ -418,13 +410,14 @@ class ThumbnailResource(DirectServeJsonResource):
             # `dynamic_thumbnails` is disabled.
             logger.info("Failed to find any generated thumbnails")
 
+            assert request.path is not None
             respond_with_json(
                 request,
                 400,
                 cs_error(
-                    "Cannot find any thumbnails for the requested media (%r). This might mean the media is not a supported_media_format=(%s) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)"
+                    "Cannot find any thumbnails for the requested media ('%s'). This might mean the media is not a supported_media_format=(%s) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)"
                     % (
-                        request.postpath,
+                        request.path.decode(),
                         ", ".join(THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP.keys()),
                     ),
                     code=Codes.UNKNOWN,
@@ -438,7 +431,7 @@ class ThumbnailResource(DirectServeJsonResource):
         desired_height: int,
         desired_method: str,
         desired_type: str,
-        thumbnail_infos: List[Dict[str, Any]],
+        thumbnail_infos: List[ThumbnailInfo],
         file_id: str,
         url_cache: bool,
         server_name: Optional[str],
@@ -451,7 +444,7 @@ class ThumbnailResource(DirectServeJsonResource):
             desired_height: The desired height; the returned thumbnail may be larger than this.
             desired_method: The desired method used to generate the thumbnail.
             desired_type: The desired content-type of the thumbnail.
-            thumbnail_infos: A list of dictionaries of candidate thumbnails.
+            thumbnail_infos: A list of thumbnail infos of candidate thumbnails.
             file_id: The ID of the media that a thumbnail is being requested for.
             url_cache: True if this is from a URL cache.
             server_name: The server name, if this is a remote thumbnail.
@@ -469,21 +462,25 @@ class ThumbnailResource(DirectServeJsonResource):
 
         if desired_method == "crop":
             # Thumbnails that match equal or larger sizes of desired width/height.
-            crop_info_list: List[Tuple[int, int, int, bool, int, Dict[str, Any]]] = []
+            crop_info_list: List[
+                Tuple[int, int, int, bool, Optional[int], ThumbnailInfo]
+            ] = []
             # Other thumbnails.
-            crop_info_list2: List[Tuple[int, int, int, bool, int, Dict[str, Any]]] = []
+            crop_info_list2: List[
+                Tuple[int, int, int, bool, Optional[int], ThumbnailInfo]
+            ] = []
             for info in thumbnail_infos:
                 # Skip thumbnails generated with different methods.
-                if info["thumbnail_method"] != "crop":
+                if info.method != "crop":
                     continue
 
-                t_w = info["thumbnail_width"]
-                t_h = info["thumbnail_height"]
+                t_w = info.width
+                t_h = info.height
                 aspect_quality = abs(d_w * t_h - d_h * t_w)
                 min_quality = 0 if d_w <= t_w and d_h <= t_h else 1
                 size_quality = abs((d_w - t_w) * (d_h - t_h))
-                type_quality = desired_type != info["thumbnail_type"]
-                length_quality = info["thumbnail_length"]
+                type_quality = desired_type != info.type
+                length_quality = info.length
                 if t_w >= d_w or t_h >= d_h:
                     crop_info_list.append(
                         (
@@ -508,7 +505,7 @@ class ThumbnailResource(DirectServeJsonResource):
                     )
             # Pick the most appropriate thumbnail. Some values of `desired_width` and
             # `desired_height` may result in a tie, in which case we avoid comparing on
-            # the thumbnail info dictionary and pick the thumbnail that appears earlier
+            # the thumbnail info and pick the thumbnail that appears earlier
             # in the list of candidates.
             if crop_info_list:
                 thumbnail_info = min(crop_info_list, key=lambda t: t[:-1])[-1]
@@ -516,20 +513,20 @@ class ThumbnailResource(DirectServeJsonResource):
                 thumbnail_info = min(crop_info_list2, key=lambda t: t[:-1])[-1]
         elif desired_method == "scale":
             # Thumbnails that match equal or larger sizes of desired width/height.
-            info_list: List[Tuple[int, bool, int, Dict[str, Any]]] = []
+            info_list: List[Tuple[int, bool, Optional[int], ThumbnailInfo]] = []
             # Other thumbnails.
-            info_list2: List[Tuple[int, bool, int, Dict[str, Any]]] = []
+            info_list2: List[Tuple[int, bool, Optional[int], ThumbnailInfo]] = []
 
             for info in thumbnail_infos:
                 # Skip thumbnails generated with different methods.
-                if info["thumbnail_method"] != "scale":
+                if info.method != "scale":
                     continue
 
-                t_w = info["thumbnail_width"]
-                t_h = info["thumbnail_height"]
+                t_w = info.width
+                t_h = info.height
                 size_quality = abs((d_w - t_w) * (d_h - t_h))
-                type_quality = desired_type != info["thumbnail_type"]
-                length_quality = info["thumbnail_length"]
+                type_quality = desired_type != info.type
+                length_quality = info.length
                 if t_w >= d_w or t_h >= d_h:
                     info_list.append((size_quality, type_quality, length_quality, info))
                 else:
@@ -538,7 +535,7 @@ class ThumbnailResource(DirectServeJsonResource):
                     )
             # Pick the most appropriate thumbnail. Some values of `desired_width` and
             # `desired_height` may result in a tie, in which case we avoid comparing on
-            # the thumbnail info dictionary and pick the thumbnail that appears earlier
+            # the thumbnail info and pick the thumbnail that appears earlier
             # in the list of candidates.
             if info_list:
                 thumbnail_info = min(info_list, key=lambda t: t[:-1])[-1]
@@ -550,13 +547,7 @@ class ThumbnailResource(DirectServeJsonResource):
                 file_id=file_id,
                 url_cache=url_cache,
                 server_name=server_name,
-                thumbnail=ThumbnailInfo(
-                    width=thumbnail_info["thumbnail_width"],
-                    height=thumbnail_info["thumbnail_height"],
-                    type=thumbnail_info["thumbnail_type"],
-                    method=thumbnail_info["thumbnail_method"],
-                    length=thumbnail_info["thumbnail_length"],
-                ),
+                thumbnail=thumbnail_info,
             )
 
         # No matching thumbnail was found.
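
A note on the selection trick above: `min(..., key=lambda t: t[:-1])` compares only the quality prefix of each candidate tuple, never the trailing info object, so ties fall back to list order rather than attempting to compare `ThumbnailInfo` instances. A minimal standalone sketch (the `Thumb` class below is a stand-in, not Synapse's `ThumbnailInfo`):

    from dataclasses import dataclass
    from typing import List, Optional, Tuple

    @dataclass
    class Thumb:  # hypothetical stand-in for ThumbnailInfo
        width: int
        height: int
        length: Optional[int]

    # (size_quality, type_quality, length_quality, info), as in the scale branch
    candidates: List[Tuple[int, bool, Optional[int], Thumb]] = [
        (0, False, 100, Thumb(64, 64, 100)),
        (0, False, 100, Thumb(64, 64, 100)),  # identical quality prefix: a tie
    ]
    # Comparing t[:-1] never touches the Thumb objects, so min() keeps the
    # earliest candidate on a tie instead of raising on comparison.
    best = min(candidates, key=lambda t: t[:-1])[-1]
    assert best is candidates[0][-1]
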
diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py
index 043e8d6077..949326d85d 100644
--- a/synapse/rest/media/upload_resource.py
+++ b/synapse/rest/media/upload_resource.py
@@ -14,11 +14,12 @@
 # limitations under the License.
 
 import logging
+import re
 from typing import IO, TYPE_CHECKING, Dict, List, Optional
 
 from synapse.api.errors import Codes, SynapseError
-from synapse.http.server import DirectServeJsonResource, respond_with_json
-from synapse.http.servlet import parse_bytes_from_args
+from synapse.http.server import respond_with_json
+from synapse.http.servlet import RestServlet, parse_bytes_from_args
 from synapse.http.site import SynapseRequest
 from synapse.media.media_storage import SpamMediaException
 
@@ -29,8 +30,8 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class UploadResource(DirectServeJsonResource):
-    isLeaf = True
+class UploadResource(RestServlet):
+    PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload")]
 
     def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"):
         super().__init__()
@@ -43,10 +44,7 @@ class UploadResource(DirectServeJsonResource):
         self.max_upload_size = hs.config.media.max_upload_size
         self.clock = hs.get_clock()
 
-    async def _async_render_OPTIONS(self, request: SynapseRequest) -> None:
-        respond_with_json(request, 200, {}, send_cors=True)
-
-    async def _async_render_POST(self, request: SynapseRequest) -> None:
+    async def on_POST(self, request: SynapseRequest) -> None:
         requester = await self.auth.get_user_by_req(request)
         raw_content_length = request.getHeader("Content-Length")
         if raw_content_length is None:
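
A quick standalone check of the `PATTERNS` regex introduced above, assuming the three listed path versions are the only intended ones:

    import re

    pattern = re.compile("/_matrix/media/(r0|v3|v1)/upload")
    assert pattern.match("/_matrix/media/v3/upload")
    assert pattern.match("/_matrix/media/r0/upload")
    assert pattern.match("/_matrix/media/v1/upload")
    assert not pattern.match("/_matrix/media/v2/upload")  # unlisted version
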
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ca894edd5a..7d8af5c610 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -2418,7 +2418,7 @@ class DatabasePool:
         keyvalues: Optional[Dict[str, Any]] = None,
         exclude_keyvalues: Optional[Dict[str, Any]] = None,
         order_direction: str = "ASC",
-    ) -> List[Dict[str, Any]]:
+    ) -> List[Tuple[Any, ...]]:
         """
         Executes a SELECT query on the named table with start and limit,
         of row numbers, which may return zero or more rows from start to limit,
@@ -2447,7 +2447,7 @@ class DatabasePool:
             order_direction: Whether the results should be ordered "ASC" or "DESC".
 
         Returns:
-            The result as a list of dictionaries.
+            The result as a list of tuples.
         """
         if order_direction not in ["ASC", "DESC"]:
             raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
@@ -2474,7 +2474,7 @@ class DatabasePool:
         )
         txn.execute(sql, arg_list + [limit, start])
 
-        return cls.cursor_to_dict(txn)
+        return txn.fetchall()
 
     async def simple_search_list(
         self,
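
The return-type change above means callers now unpack positional tuples instead of indexing rows by column name. A standalone sqlite3 sketch of the consuming side (table and columns are illustrative, not Synapse's schema):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (name TEXT, admin INTEGER)")
    conn.execute("INSERT INTO users VALUES ('@alice:example.org', 1)")
    rows = conn.execute(
        "SELECT name, admin FROM users ORDER BY name ASC LIMIT ? OFFSET ?",
        (10, 0),
    ).fetchall()
    for name, admin in rows:  # replaces row["name"], row["admin"]
        print(name, bool(admin))
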
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 101403578c..dfcbf0a175 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -142,26 +142,6 @@ class DataStore(
 
         super().__init__(database, db_conn, hs)
 
-    async def get_users(self) -> List[JsonDict]:
-        """Function to retrieve a list of users in users table.
-
-        Returns:
-            A list of dictionaries representing users.
-        """
-        return await self.db_pool.simple_select_list(
-            table="users",
-            keyvalues={},
-            retcols=[
-                "name",
-                "password_hash",
-                "is_guest",
-                "admin",
-                "user_type",
-                "deactivated",
-            ],
-            desc="get_users",
-        )
-
     async def get_users_paginate(
         self,
         start: int,
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 80f146dd53..39498d52c6 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -103,6 +103,13 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
             "AccountDataAndTagsChangeCache", account_max
         )
 
+        self.db_pool.updates.register_background_index_update(
+            update_name="room_account_data_index_room_id",
+            index_name="room_account_data_room_id",
+            table="room_account_data",
+            columns=("room_id",),
+        )
+
         self.db_pool.updates.register_background_update_handler(
             "delete_account_data_for_deactivated_users",
             self._delete_account_data_for_deactivated_users,
@@ -151,10 +158,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
                 sql += " AND content != '{}'"
 
             txn.execute(sql, (user_id,))
-            rows = self.db_pool.cursor_to_dict(txn)
 
             return {
-                row["account_data_type"]: db_to_json(row["content"]) for row in rows
+                account_data_type: db_to_json(content)
+                for account_data_type, content in txn
             }
 
         return await self.db_pool.runInteraction(
@@ -196,13 +203,12 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
                 sql += " AND content != '{}'"
 
             txn.execute(sql, (user_id,))
-            rows = self.db_pool.cursor_to_dict(txn)
 
             by_room: Dict[str, Dict[str, JsonDict]] = {}
-            for row in rows:
-                room_data = by_room.setdefault(row["room_id"], {})
+            for room_id, account_data_type, content in txn:
+                room_data = by_room.setdefault(room_id, {})
 
-                room_data[row["account_data_type"]] = db_to_json(row["content"])
+                room_data[account_data_type] = db_to_json(content)
 
             return by_room
 
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 0553a0621a..073a99cd84 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -14,17 +14,7 @@
 # limitations under the License.
 import logging
 import re
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Dict,
-    List,
-    Optional,
-    Pattern,
-    Sequence,
-    Tuple,
-    cast,
-)
+from typing import TYPE_CHECKING, List, Optional, Pattern, Sequence, Tuple, cast
 
 from synapse.appservice import (
     ApplicationService,
@@ -353,21 +343,15 @@ class ApplicationServiceTransactionWorkerStore(
 
         def _get_oldest_unsent_txn(
             txn: LoggingTransaction,
-        ) -> Optional[Dict[str, Any]]:
+        ) -> Optional[Tuple[int, str]]:
             # Monotonically increasing txn ids, so just select the smallest
             # one in the txns table (we delete them when they are sent)
             txn.execute(
-                "SELECT * FROM application_services_txns WHERE as_id=?"
+                "SELECT txn_id, event_ids FROM application_services_txns WHERE as_id=?"
                 " ORDER BY txn_id ASC LIMIT 1",
                 (service.id,),
             )
-            rows = self.db_pool.cursor_to_dict(txn)
-            if not rows:
-                return None
-
-            entry = rows[0]
-
-            return entry
+            return cast(Optional[Tuple[int, str]], txn.fetchone())
 
         entry = await self.db_pool.runInteraction(
             "get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn
@@ -376,8 +360,9 @@ class ApplicationServiceTransactionWorkerStore(
         if not entry:
             return None
 
-        event_ids = db_to_json(entry["event_ids"])
+        txn_id, event_ids_str = entry
 
+        event_ids = db_to_json(event_ids_str)
         events = await self.get_events_as_list(event_ids)
 
         # TODO: to-device messages, one-time key counts, device list summaries and unused
@@ -385,7 +370,7 @@ class ApplicationServiceTransactionWorkerStore(
         #       We likely want to populate those for reliability.
         return AppServiceTransaction(
             service=service,
-            id=entry["txn_id"],
+            id=txn_id,
             events=events,
             ephemeral=[],
             to_device_messages=[],
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index df596f35f9..9f3804a504 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1413,13 +1413,13 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
 
         def get_devices_not_accessed_since_txn(
             txn: LoggingTransaction,
-        ) -> List[Dict[str, str]]:
+        ) -> List[Tuple[str, str]]:
             sql = """
                 SELECT user_id, device_id
                 FROM devices WHERE last_seen < ? AND hidden = FALSE
             """
             txn.execute(sql, (since_ms,))
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(List[Tuple[str, str]], txn.fetchall())
 
         rows = await self.db_pool.runInteraction(
             "get_devices_not_accessed_since",
@@ -1427,11 +1427,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
         )
 
         devices: Dict[str, List[str]] = {}
-        for row in rows:
+        for user_id, device_id in rows:
             # Remote devices are never stale from our point of view.
-            if self.hs.is_mine_id(row["user_id"]):
-                user_devices = devices.setdefault(row["user_id"], [])
-                user_devices.append(row["device_id"])
+            if self.hs.is_mine_id(user_id):
+                user_devices = devices.setdefault(user_id, [])
+                user_devices.append(device_id)
 
         return devices
 
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index d01f28cc80..aac4cfb054 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -53,6 +53,13 @@ class EndToEndRoomKeyBackgroundStore(SQLBaseStore):
     ):
         super().__init__(database, db_conn, hs)
 
+        self.db_pool.updates.register_background_index_update(
+            update_name="e2e_room_keys_index_room_id",
+            index_name="e2e_room_keys_room_id",
+            table="e2e_room_keys",
+            columns=("room_id",),
+        )
+
         self.db_pool.updates.register_background_update_handler(
             "delete_e2e_backup_keys_for_deactivated_users",
             self._delete_e2e_backup_keys_for_deactivated_users,
@@ -208,7 +215,7 @@ class EndToEndRoomKeyStore(EndToEndRoomKeyBackgroundStore):
                     "message": "Set room key",
                     "room_id": room_id,
                     "session_id": session_id,
-                    StreamKeyType.ROOM: room_key,
+                    StreamKeyType.ROOM.value: room_key,
                 }
             )
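
The `.value` fix above matters because a plain `Enum` member is not a JSON-serializable dict key, while its string value is. An illustrative-only demonstration (the enum below is a stand-in, not Synapse's `StreamKeyType`):

    import json
    from enum import Enum

    class StreamKey(Enum):  # stand-in; illustrative only
        ROOM = "room_key"

    print(json.dumps({StreamKey.ROOM.value: 1}))  # {"room_key": 1}
    try:
        json.dumps({StreamKey.ROOM: 1})  # Enum member as key
    except TypeError as e:
        print("not serializable:", e)
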
 
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 89fac23f93..749ae54e20 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -921,14 +921,10 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
                 }
 
             txn.execute(sql, params)
-            rows = self.db_pool.cursor_to_dict(txn)
 
-            for row in rows:
-                user_id = row["user_id"]
-                key_type = row["keytype"]
-                key = db_to_json(row["keydata"])
+            for user_id, key_type, key_data, _ in txn:
                 user_keys = result.setdefault(user_id, {})
-                user_keys[key_type] = key
+                user_keys[key_type] = db_to_json(key_data)
 
         return result
 
@@ -988,13 +984,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
                 query_params.extend(item)
 
             txn.execute(sql, query_params)
-            rows = self.db_pool.cursor_to_dict(txn)
 
             # and add the signatures to the appropriate keys
-            for row in rows:
-                key_id: str = row["key_id"]
-                target_user_id: str = row["target_user_id"]
-                target_device_id: str = row["target_device_id"]
+            for target_user_id, target_device_id, key_id, signature in txn:
                 key_type = devices[(target_user_id, target_device_id)]
                 # We need to copy everything, because the result may have come
                 # from the cache.  dict.copy only does a shallow copy, so we
@@ -1012,13 +1004,11 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
                     ].copy()
                     if from_user_id in signatures:
                         user_sigs = signatures[from_user_id] = signatures[from_user_id]
-                        user_sigs[key_id] = row["signature"]
+                        user_sigs[key_id] = signature
                     else:
-                        signatures[from_user_id] = {key_id: row["signature"]}
+                        signatures[from_user_id] = {key_id: signature}
                 else:
-                    target_user_key["signatures"] = {
-                        from_user_id: {key_id: row["signature"]}
-                    }
+                    target_user_key["signatures"] = {from_user_id: {key_id: signature}}
 
         return keys
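
The shallow-copy comment above is the crux of this block: `dict.copy` shares nested dicts, so a cached result could be mutated in place unless the nested level is copied too. A standalone illustration:

    cached = {"keys": {"ed25519:1": "abc"}}

    shallow = cached.copy()
    shallow["keys"]["ed25519:1"] = "TAMPERED"  # mutates the cached dict too!
    assert cached["keys"]["ed25519:1"] == "TAMPERED"

    cached = {"keys": {"ed25519:1": "abc"}}
    safe = cached.copy()
    safe["keys"] = safe["keys"].copy()  # copy the nested level before editing
    safe["keys"]["ed25519:1"] = "TAMPERED"
    assert cached["keys"]["ed25519:1"] == "abc"
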
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 790d058c43..d4dcdb898c 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1654,8 +1654,6 @@ class PersistEventsStore:
     ) -> None:
         to_prefill = []
 
-        rows = []
-
         ev_map = {e.event_id: e for e, _ in events_and_contexts}
         if not ev_map:
             return
@@ -1676,10 +1674,9 @@ class PersistEventsStore:
         )
 
         txn.execute(sql + clause, args)
-        rows = self.db_pool.cursor_to_dict(txn)
-        for row in rows:
-            event = ev_map[row["event_id"]]
-            if not row["rejects"] and not row["redacts"]:
+        for event_id, redacts, rejects in txn:
+            event = ev_map[event_id]
+            if not rejects and not redacts:
                 to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
 
         async def external_prefill() -> None:
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 8cebeb5189..2e6b176bd2 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -28,6 +28,7 @@ from typing import (
 
 from synapse.api.constants import Direction
 from synapse.logging.opentracing import trace
+from synapse.media._base import ThumbnailInfo
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -435,8 +436,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="store_url_cache",
         )
 
-    async def get_local_media_thumbnails(self, media_id: str) -> List[Dict[str, Any]]:
-        return await self.db_pool.simple_select_list(
+    async def get_local_media_thumbnails(self, media_id: str) -> List[ThumbnailInfo]:
+        rows = await self.db_pool.simple_select_list(
             "local_media_repository_thumbnails",
             {"media_id": media_id},
             (
@@ -448,6 +449,16 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             ),
             desc="get_local_media_thumbnails",
         )
+        return [
+            ThumbnailInfo(
+                width=row["thumbnail_width"],
+                height=row["thumbnail_height"],
+                method=row["thumbnail_method"],
+                type=row["thumbnail_type"],
+                length=row["thumbnail_length"],
+            )
+            for row in rows
+        ]
 
     @trace
     async def store_local_thumbnail(
@@ -556,8 +567,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
 
     async def get_remote_media_thumbnails(
         self, origin: str, media_id: str
-    ) -> List[Dict[str, Any]]:
-        return await self.db_pool.simple_select_list(
+    ) -> List[ThumbnailInfo]:
+        rows = await self.db_pool.simple_select_list(
             "remote_media_cache_thumbnails",
             {"media_origin": origin, "media_id": media_id},
             (
@@ -566,10 +577,19 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "thumbnail_method",
                 "thumbnail_type",
                 "thumbnail_length",
-                "filesystem_id",
             ),
             desc="get_remote_media_thumbnails",
         )
+        return [
+            ThumbnailInfo(
+                width=row["thumbnail_width"],
+                height=row["thumbnail_height"],
+                method=row["thumbnail_method"],
+                type=row["thumbnail_type"],
+                length=row["thumbnail_length"],
+            )
+            for row in rows
+        ]
 
     @trace
     async def get_remote_media_thumbnail(
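
A minimal sketch of the row-to-object conversion above, with a dataclass standing in for the attrs-based `ThumbnailInfo` (the field shape is assumed to mirror what the diff uses):

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass(frozen=True)
    class ThumbnailInfo:  # stand-in for synapse.media._base.ThumbnailInfo
        width: int
        height: int
        method: str
        type: str
        length: Optional[int] = None

    rows = [
        {
            "thumbnail_width": 32,
            "thumbnail_height": 32,
            "thumbnail_method": "crop",
            "thumbnail_type": "image/png",
            "thumbnail_length": 1024,
        }
    ]
    thumbnails: List[ThumbnailInfo] = [
        ThumbnailInfo(
            width=row["thumbnail_width"],
            height=row["thumbnail_height"],
            method=row["thumbnail_method"],
            type=row["thumbnail_type"],
            length=row["thumbnail_length"],
        )
        for row in rows
    ]
    assert thumbnails[0].type == "image/png"  # attribute access, not row["..."]
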
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 194b4e031f..519f05fb60 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -20,6 +20,7 @@ from typing import (
     Mapping,
     Optional,
     Tuple,
+    Union,
     cast,
 )
 
@@ -385,28 +386,47 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
         limit = 100
         offset = 0
         while True:
-            rows = await self.db_pool.runInteraction(
-                "get_presence_for_all_users",
-                self.db_pool.simple_select_list_paginate_txn,
-                "presence_stream",
-                orderby="stream_id",
-                start=offset,
-                limit=limit,
-                exclude_keyvalues=exclude_keyvalues,
-                retcols=(
-                    "user_id",
-                    "state",
-                    "last_active_ts",
-                    "last_federation_update_ts",
-                    "last_user_sync_ts",
-                    "status_msg",
-                    "currently_active",
+            rows = cast(
+                List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]],
+                await self.db_pool.runInteraction(
+                    "get_presence_for_all_users",
+                    self.db_pool.simple_select_list_paginate_txn,
+                    "presence_stream",
+                    orderby="stream_id",
+                    start=offset,
+                    limit=limit,
+                    exclude_keyvalues=exclude_keyvalues,
+                    retcols=(
+                        "user_id",
+                        "state",
+                        "last_active_ts",
+                        "last_federation_update_ts",
+                        "last_user_sync_ts",
+                        "status_msg",
+                        "currently_active",
+                    ),
+                    order_direction="ASC",
                 ),
-                order_direction="ASC",
             )
 
-            for row in rows:
-                users_to_state[row["user_id"]] = UserPresenceState(**row)
+            for (
+                user_id,
+                state,
+                last_active_ts,
+                last_federation_update_ts,
+                last_user_sync_ts,
+                status_msg,
+                currently_active,
+            ) in rows:
+                users_to_state[user_id] = UserPresenceState(
+                    user_id=user_id,
+                    state=state,
+                    last_active_ts=last_active_ts,
+                    last_federation_update_ts=last_federation_update_ts,
+                    last_user_sync_ts=last_user_sync_ts,
+                    status_msg=status_msg,
+                    currently_active=bool(currently_active),
+                )
 
             # We've run out of updates to query
             if len(rows) < limit:
@@ -434,13 +454,21 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
 
         txn = db_conn.cursor()
         txn.execute(sql, (PresenceState.OFFLINE,))
-        rows = self.db_pool.cursor_to_dict(txn)
+        rows = txn.fetchall()
         txn.close()
 
-        for row in rows:
-            row["currently_active"] = bool(row["currently_active"])
-
-        return [UserPresenceState(**row) for row in rows]
+        return [
+            UserPresenceState(
+                user_id=user_id,
+                state=state,
+                last_active_ts=last_active_ts,
+                last_federation_update_ts=last_federation_update_ts,
+                last_user_sync_ts=last_user_sync_ts,
+                status_msg=status_msg,
+                currently_active=bool(currently_active),
+            )
+            for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, currently_active in rows
+        ]
 
     def take_presence_startup_info(self) -> List[UserPresenceState]:
         active_on_startup = self._presence_on_startup
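
The paging loop above fetches fixed-size batches and stops on the first short page. A self-contained sketch of that pattern, where `fetch_page` stands in for `simple_select_list_paginate_txn`:

    from typing import List, Tuple

    DATA: List[Tuple[str, str]] = [(f"@u{i}:hs", "online") for i in range(250)]

    def fetch_page(offset: int, limit: int) -> List[Tuple[str, str]]:
        return DATA[offset : offset + limit]

    limit, offset, seen = 100, 0, 0
    while True:
        rows = fetch_page(offset, limit)
        seen += len(rows)
        if len(rows) < limit:  # a short page means we've run out of rows
            break
        offset += limit
    assert seen == 250
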
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index dea0e0458c..1e11bf2706 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -89,6 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # furthermore, we might already have the table from a previous (failed)
         # purge attempt, so let's drop the table first.
 
+        if isinstance(self.database_engine, PostgresEngine):
+            # Disable statement timeouts for this transaction; purging rooms can
+            # take a while!
+            txn.execute("SET LOCAL statement_timeout = 0")
+
         txn.execute("DROP TABLE IF EXISTS events_to_purge")
 
         txn.execute(
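
`SET LOCAL` above scopes the override to the current transaction, so the default `statement_timeout` returns automatically at COMMIT/ROLLBACK. An illustrative-only sketch with a fake cursor (the real code runs against a psycopg2 cursor):

    class FakeTxn:  # stand-in cursor; illustrative only
        def execute(self, sql: str) -> None:
            print("SQL>", sql)

    def begin_purge(txn: FakeTxn, is_postgres: bool) -> None:
        if is_postgres:
            # 0 disables the per-statement timeout for the rest of this
            # transaction only (SET LOCAL, as opposed to plain SET).
            txn.execute("SET LOCAL statement_timeout = 0")
        txn.execute("DROP TABLE IF EXISTS events_to_purge")

    begin_purge(FakeTxn(), is_postgres=True)
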
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 87e28e22d3..c7eb7fc478 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -47,6 +47,27 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# The type of a row in the pushers table.
+PusherRow = Tuple[
+    int,  # id
+    str,  # user_name
+    Optional[int],  # access_token
+    str,  # profile_tag
+    str,  # kind
+    str,  # app_id
+    str,  # app_display_name
+    str,  # device_display_name
+    str,  # pushkey
+    int,  # ts
+    str,  # lang
+    str,  # data
+    int,  # last_stream_ordering
+    int,  # last_success
+    int,  # failing_since
+    bool,  # enabled
+    str,  # device_id
+]
+
 
 class PusherWorkerStore(SQLBaseStore):
     def __init__(
@@ -83,30 +104,66 @@ class PusherWorkerStore(SQLBaseStore):
             self._remove_deleted_email_pushers,
         )
 
-    def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
+    def _decode_pushers_rows(
+        self,
+        rows: Iterable[PusherRow],
+    ) -> Iterator[PusherConfig]:
         """JSON-decode the data in the rows returned from the `pushers` table
 
         Drops any rows whose data cannot be decoded
         """
-        for r in rows:
-            data_json = r["data"]
+        for (
+            id,
+            user_name,
+            access_token,
+            profile_tag,
+            kind,
+            app_id,
+            app_display_name,
+            device_display_name,
+            pushkey,
+            ts,
+            lang,
+            data,
+            last_stream_ordering,
+            last_success,
+            failing_since,
+            enabled,
+            device_id,
+        ) in rows:
             try:
-                r["data"] = db_to_json(data_json)
+                data_json = db_to_json(data)
             except Exception as e:
                 logger.warning(
                     "Invalid JSON in data for pusher %d: %s, %s",
-                    r["id"],
-                    data_json,
+                    id,
+                    data,
                     e.args[0],
                 )
                 continue
 
-            # If we're using SQLite, then boolean values are integers. This is
-            # troublesome since some code using the return value of this method might
-            # expect it to be a boolean, or will expose it to clients (in responses).
-            r["enabled"] = bool(r["enabled"])
-
-            yield PusherConfig(**r)
+            yield PusherConfig(
+                id=id,
+                user_name=user_name,
+                profile_tag=profile_tag,
+                kind=kind,
+                app_id=app_id,
+                app_display_name=app_display_name,
+                device_display_name=device_display_name,
+                pushkey=pushkey,
+                ts=ts,
+                lang=lang,
+                data=data_json,
+                last_stream_ordering=last_stream_ordering,
+                last_success=last_success,
+                failing_since=failing_since,
+                # If we're using SQLite, then boolean values are integers. This is
+                # troublesome since some code using the return value of this method might
+                # expect it to be a boolean, or will expose it to clients (in responses).
+                enabled=bool(enabled),
+                device_id=device_id,
+                access_token=access_token,
+            )
 
     def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
@@ -136,7 +193,7 @@ class PusherWorkerStore(SQLBaseStore):
             The pushers for which the given columns have the given values.
         """
 
-        def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+        def get_pushers_by_txn(txn: LoggingTransaction) -> List[PusherRow]:
             # We could technically use simple_select_list here, but we need to call
             # COALESCE on the 'enabled' column. While it is technically possible to give
             # simple_select_list the whole `COALESCE(...) AS ...` as a column name, it
@@ -154,7 +211,7 @@ class PusherWorkerStore(SQLBaseStore):
 
             txn.execute(sql, list(keyvalues.values()))
 
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(List[PusherRow], txn.fetchall())
 
         ret = await self.db_pool.runInteraction(
             desc="get_pushers_by",
@@ -164,14 +221,22 @@ class PusherWorkerStore(SQLBaseStore):
         return self._decode_pushers_rows(ret)
 
     async def get_enabled_pushers(self) -> Iterator[PusherConfig]:
-        def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]:
-            txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)")
-            rows = self.db_pool.cursor_to_dict(txn)
-
-            return self._decode_pushers_rows(rows)
+        def get_enabled_pushers_txn(txn: LoggingTransaction) -> List[PusherRow]:
+            txn.execute(
+                """
+                SELECT id, user_name, access_token, profile_tag, kind, app_id,
+                    app_display_name, device_display_name, pushkey, ts, lang, data,
+                    last_stream_ordering, last_success, failing_since,
+                    enabled, device_id
+                FROM pushers WHERE COALESCE(enabled, TRUE)
+                """
+            )
+            return cast(List[PusherRow], txn.fetchall())
 
-        return await self.db_pool.runInteraction(
-            "get_enabled_pushers", get_enabled_pushers_txn
+        return self._decode_pushers_rows(
+            await self.db_pool.runInteraction(
+                "get_enabled_pushers", get_enabled_pushers_txn
+            )
         )
 
     async def get_all_updated_pushers_rows(
@@ -304,7 +369,7 @@ class PusherWorkerStore(SQLBaseStore):
         )
 
     async def get_throttle_params_by_room(
-        self, pusher_id: str
+        self, pusher_id: int
     ) -> Dict[str, ThrottleParams]:
         res = await self.db_pool.simple_select_list(
             "pusher_throttle",
@@ -323,7 +388,7 @@ class PusherWorkerStore(SQLBaseStore):
         return params_by_room
 
     async def set_throttle_params(
-        self, pusher_id: str, room_id: str, params: ThrottleParams
+        self, pusher_id: int, room_id: str, params: ThrottleParams
     ) -> None:
         await self.db_pool.simple_upsert(
             "pusher_throttle",
@@ -534,7 +599,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
                 (last_pusher_id, batch_size),
             )
 
-            rows = self.db_pool.cursor_to_dict(txn)
+            rows = txn.fetchall()
             if len(rows) == 0:
                 return 0
 
@@ -550,19 +615,19 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
                 txn=txn,
                 table="pushers",
                 key_names=("id",),
-                key_values=[(row["pusher_id"],) for row in rows],
+                key_values=[(row[0],) for row in rows],
                 value_names=("device_id", "access_token"),
                 # If there was already a device_id on the pusher, we only want to clear
                 # the access_token column, so we keep the existing device_id. Otherwise,
                 # we set the device_id we got from joining the access_tokens table.
                 value_values=[
-                    (row["pusher_device_id"] or row["token_device_id"], None)
-                    for row in rows
+                    (pusher_device_id or token_device_id, None)
+                    for _, pusher_device_id, token_device_id in rows
                 ],
             )
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["pusher_id"]}
+                txn, "set_device_id_for_pushers", {"pusher_id": rows[-1][0]}
             )
 
             return len(rows)
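
A standalone sqlite3 check of the "booleans are integers on SQLite" note above: the stored flag comes back as 1/0, so `bool()` is applied before the value can leak into client responses.

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE pushers (id INTEGER, enabled BOOLEAN)")
    conn.execute("INSERT INTO pushers VALUES (1, ?)", (True,))
    (enabled,) = conn.execute("SELECT enabled FROM pushers").fetchone()
    assert enabled == 1 and type(enabled) is int   # SQLite returns an integer
    assert type(bool(enabled)) is bool             # coerce before exposing it
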
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 0231f9407b..b2645ab43c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -313,25 +313,25 @@ class ReceiptsWorkerStore(SQLBaseStore):
     ) -> Sequence[JsonMapping]:
         """See get_linearized_receipts_for_room"""
 
-        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+        def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]:
             if from_key:
                 sql = (
-                    "SELECT * FROM receipts_linearized WHERE"
+                    "SELECT receipt_type, user_id, event_id, data"
+                    " FROM receipts_linearized WHERE"
                     " room_id = ? AND stream_id > ? AND stream_id <= ?"
                 )
 
                 txn.execute(sql, (room_id, from_key, to_key))
             else:
                 sql = (
-                    "SELECT * FROM receipts_linearized WHERE"
+                    "SELECT receipt_type, user_id, event_id, data"
+                    " FROM receipts_linearized WHERE"
                     " room_id = ? AND stream_id <= ?"
                 )
 
                 txn.execute(sql, (room_id, to_key))
 
-            rows = self.db_pool.cursor_to_dict(txn)
-
-            return rows
+            return cast(List[Tuple[str, str, str, str]], txn.fetchall())
 
         rows = await self.db_pool.runInteraction("get_linearized_receipts_for_room", f)
 
@@ -339,10 +339,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
             return []
 
         content: JsonDict = {}
-        for row in rows:
-            content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[
-                row["user_id"]
-            ] = db_to_json(row["data"])
+        for receipt_type, user_id, event_id, data in rows:
+            content.setdefault(event_id, {}).setdefault(receipt_type, {})[
+                user_id
+            ] = db_to_json(data)
 
         return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}]
 
@@ -357,10 +357,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
         if not room_ids:
             return {}
 
-        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+        def f(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[str, str, str, str, Optional[str], str]]:
             if from_key:
                 sql = """
-                    SELECT * FROM receipts_linearized WHERE
+                    SELECT room_id, receipt_type, user_id, event_id, thread_id, data
+                    FROM receipts_linearized WHERE
                     stream_id > ? AND stream_id <= ? AND
                 """
                 clause, args = make_in_list_sql_clause(
@@ -370,7 +373,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 txn.execute(sql + clause, [from_key, to_key] + list(args))
             else:
                 sql = """
-                    SELECT * FROM receipts_linearized WHERE
+                    SELECT room_id, receipt_type, user_id, event_id, thread_id, data
+                    FROM receipts_linearized WHERE
                     stream_id <= ? AND
                 """
 
@@ -380,29 +384,31 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
                 txn.execute(sql + clause, [to_key] + list(args))
 
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(
+                List[Tuple[str, str, str, str, Optional[str], str]], txn.fetchall()
+            )
 
         txn_results = await self.db_pool.runInteraction(
             "_get_linearized_receipts_for_rooms", f
         )
 
         results: JsonDict = {}
-        for row in txn_results:
+        for room_id, receipt_type, user_id, event_id, thread_id, data in txn_results:
             # We want a single event per room, since we want to batch the
             # receipts by room, event and type.
             room_event = results.setdefault(
-                row["room_id"],
-                {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}},
+                room_id,
+                {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}},
             )
 
             # The content is of the form:
             # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
-            event_entry = room_event["content"].setdefault(row["event_id"], {})
-            receipt_type = event_entry.setdefault(row["receipt_type"], {})
+            event_entry = room_event["content"].setdefault(event_id, {})
+            receipt_type_dict = event_entry.setdefault(receipt_type, {})
 
-            receipt_type[row["user_id"]] = db_to_json(row["data"])
-            if row["thread_id"]:
-                receipt_type[row["user_id"]]["thread_id"] = row["thread_id"]
+            receipt_type_dict[user_id] = db_to_json(data)
+            if thread_id:
+                receipt_type_dict[user_id]["thread_id"] = thread_id
 
         results = {
             room_id: [results[room_id]] if room_id in results else []
@@ -428,10 +434,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
             A dictionary of roomids to a list of receipts.
         """
 
-        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+        def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]:
             if from_key:
                 sql = """
-                    SELECT * FROM receipts_linearized WHERE
+                    SELECT room_id, receipt_type, user_id, event_id, data
+                    FROM receipts_linearized WHERE
                     stream_id > ? AND stream_id <= ?
                     ORDER BY stream_id DESC
                     LIMIT 100
@@ -439,7 +446,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 txn.execute(sql, [from_key, to_key])
             else:
                 sql = """
-                    SELECT * FROM receipts_linearized WHERE
+                    SELECT room_id, receipt_type, user_id, event_id, data
+                    FROM receipts_linearized WHERE
                     stream_id <= ?
                     ORDER BY stream_id DESC
                     LIMIT 100
@@ -447,27 +455,27 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
                 txn.execute(sql, [to_key])
 
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(List[Tuple[str, str, str, str, str]], txn.fetchall())
 
         txn_results = await self.db_pool.runInteraction(
             "get_linearized_receipts_for_all_rooms", f
         )
 
         results: JsonDict = {}
-        for row in txn_results:
+        for room_id, receipt_type, user_id, event_id, data in txn_results:
             # We want a single event per room, since we want to batch the
             # receipts by room, event and type.
             room_event = results.setdefault(
-                row["room_id"],
-                {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}},
+                room_id,
+                {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}},
             )
 
             # The content is of the form:
             # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
-            event_entry = room_event["content"].setdefault(row["event_id"], {})
-            receipt_type = event_entry.setdefault(row["receipt_type"], {})
+            event_entry = room_event["content"].setdefault(event_id, {})
+            receipt_type_dict = event_entry.setdefault(receipt_type, {})
 
-            receipt_type[row["user_id"]] = db_to_json(row["data"])
+            receipt_type_dict[user_id] = db_to_json(data)
 
         return results
 
@@ -742,7 +750,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
         event_ids: List[str],
         thread_id: Optional[str],
         data: dict,
-    ) -> Optional[Tuple[int, int]]:
+    ) -> Optional[int]:
         """Insert a receipt, either from local client or remote server.
 
         Automatically does conversion between linearized and graph
@@ -804,9 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             data,
         )
 
-        max_persisted_id = self._receipts_id_gen.get_current_token()
-
-        return stream_id, max_persisted_id
+        return stream_id
 
     async def _insert_graph_receipt(
         self,
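
A minimal sketch of the nested batching documented above, producing the {"$event": {"receipt_type": {"@user": <receipt>}}} shape from flat rows; `json.loads` stands in for `db_to_json`:

    import json
    from typing import Any, Dict

    rows = [
        ("$foo:bar", "m.read", "@alice:hs", '{"ts": 1}'),
        ("$foo:bar", "m.read", "@bob:hs", '{"ts": 2}'),
    ]
    content: Dict[str, Any] = {}
    for event_id, receipt_type, user_id, data in rows:
        event_entry = content.setdefault(event_id, {})
        receipt_type_dict = event_entry.setdefault(receipt_type, {})
        receipt_type_dict[user_id] = json.loads(data)
    assert set(content["$foo:bar"]["m.read"]) == {"@alice:hs", "@bob:hs"}
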
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index cc964604e2..64a2c31a5d 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -195,7 +195,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
     async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]:
         """Returns info about the user account, if it exists."""
 
-        def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]:
+        def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[UserInfo]:
             # We could technically use simple_select_one here, but it would not perform
             # the COALESCEs (unless hacked into the column names), which could yield
             # confusing results.
@@ -213,35 +213,46 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 (user_id,),
             )
 
-            rows = self.db_pool.cursor_to_dict(txn)
-
-            if len(rows) == 0:
+            row = txn.fetchone()
+            if not row:
                 return None
 
-            return rows[0]
+            (
+                name,
+                is_guest,
+                admin,
+                consent_version,
+                consent_ts,
+                consent_server_notice_sent,
+                appservice_id,
+                creation_ts,
+                user_type,
+                deactivated,
+                shadow_banned,
+                approved,
+                locked,
+            ) = row
+
+            return UserInfo(
+                appservice_id=appservice_id,
+                consent_server_notice_sent=consent_server_notice_sent,
+                consent_version=consent_version,
+                consent_ts=consent_ts,
+                creation_ts=creation_ts,
+                is_admin=bool(admin),
+                is_deactivated=bool(deactivated),
+                is_guest=bool(is_guest),
+                is_shadow_banned=bool(shadow_banned),
+                user_id=UserID.from_string(name),
+                user_type=user_type,
+                approved=bool(approved),
+                locked=bool(locked),
+            )
 
-        row = await self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             desc="get_user_by_id",
             func=get_user_by_id_txn,
         )
-        if row is None:
-            return None
-
-        return UserInfo(
-            appservice_id=row["appservice_id"],
-            consent_server_notice_sent=row["consent_server_notice_sent"],
-            consent_version=row["consent_version"],
-            consent_ts=row["consent_ts"],
-            creation_ts=row["creation_ts"],
-            is_admin=bool(row["admin"]),
-            is_deactivated=bool(row["deactivated"]),
-            is_guest=bool(row["is_guest"]),
-            is_shadow_banned=bool(row["shadow_banned"]),
-            user_id=UserID.from_string(row["name"]),
-            user_type=row["user_type"],
-            approved=bool(row["approved"]),
-            locked=bool(row["locked"]),
-        )
 
     async def is_trial_user(self, user_id: str) -> bool:
         """Checks if user is in the "trial" period, i.e. within the first
@@ -579,16 +590,31 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         """
 
         txn.execute(sql, (token,))
-        rows = self.db_pool.cursor_to_dict(txn)
-
-        if rows:
-            row = rows[0]
-
-            # This field is nullable, ensure it comes out as a boolean
-            if row["token_used"] is None:
-                row["token_used"] = False
+        row = txn.fetchone()
 
-            return TokenLookupResult(**row)
+        if row:
+            (
+                user_id,
+                is_guest,
+                shadow_banned,
+                token_id,
+                device_id,
+                valid_until_ms,
+                token_owner,
+                token_used,
+            ) = row
+
+            return TokenLookupResult(
+                user_id=user_id,
+                is_guest=is_guest,
+                shadow_banned=shadow_banned,
+                token_id=token_id,
+                device_id=device_id,
+                valid_until_ms=valid_until_ms,
+                token_owner=token_owner,
+                # This field is nullable, ensure it comes out as a boolean
+                token_used=bool(token_used),
+            )
 
         return None
 
@@ -833,11 +859,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         """Counts all users registered on the homeserver."""
 
         def _count_users(txn: LoggingTransaction) -> int:
-            txn.execute("SELECT COUNT(*) AS users FROM users")
-            rows = self.db_pool.cursor_to_dict(txn)
-            if rows:
-                return rows[0]["users"]
-            return 0
+            txn.execute("SELECT COUNT(*) FROM users")
+            row = txn.fetchone()
+            assert row is not None
+            return row[0]
 
         return await self.db_pool.runInteraction("count_users", _count_users)
 
@@ -891,11 +916,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         """Counts all users without a special user_type registered on the homeserver."""
 
         def _count_users(txn: LoggingTransaction) -> int:
-            txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null")
-            rows = self.db_pool.cursor_to_dict(txn)
-            if rows:
-                return rows[0]["users"]
-            return 0
+            txn.execute("SELECT COUNT(*) FROM users where user_type is null")
+            row = txn.fetchone()
+            assert row is not None
+            return row[0]
 
         return await self.db_pool.runInteraction("count_real_users", _count_users)
 
@@ -1252,12 +1276,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
             )
             txn.execute(sql, [])
 
-            res = self.db_pool.cursor_to_dict(txn)
-            if res:
-                for user in res:
-                    self.set_expiration_date_for_user_txn(
-                        txn, user["name"], use_delta=True
-                    )
+            for (name,) in txn.fetchall():
+                self.set_expiration_date_for_user_txn(txn, name, use_delta=True)
 
         await self.db_pool.runInteraction(
             "get_users_with_no_expiration_date",
@@ -1963,11 +1983,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 (user_id,),
             )
 
-            rows = self.db_pool.cursor_to_dict(txn)
+            row = txn.fetchone()
+            assert row is not None
 
             # We cast to bool because the value returned by the database engine might
             # be an integer if we're using SQLite.
-            return bool(rows[0]["approved"])
+            return bool(row[0])
 
         return await self.db_pool.runInteraction(
             desc="is_user_pending_approval",
@@ -2045,22 +2066,22 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
                 (last_user, batch_size),
             )
 
-            rows = self.db_pool.cursor_to_dict(txn)
+            rows = txn.fetchall()
 
             if not rows:
                 return True, 0
 
             rows_processed_nb = 0
 
-            for user in rows:
-                if not user["count_tokens"] and not user["count_threepids"]:
-                    self.set_user_deactivated_status_txn(txn, user["name"], True)
+            for name, count_tokens, count_threepids in rows:
+                if not count_tokens and not count_threepids:
+                    self.set_user_deactivated_status_txn(txn, name, True)
                     rows_processed_nb += 1
 
             logger.info("Marked %d rows as deactivated", rows_processed_nb)
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+                txn, "users_set_deactivated_flag", {"user_id": rows[-1][0]}
             )
 
             if batch_size > len(rows):
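
A standalone sqlite3 sketch of the COUNT(*) pattern above: an aggregate query always yields exactly one row, so `fetchone()` is asserted non-None and the single column is read positionally.

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (name TEXT, user_type TEXT)")
    row = conn.execute(
        "SELECT COUNT(*) FROM users WHERE user_type IS NULL"
    ).fetchone()
    assert row is not None  # COUNT(*) always returns exactly one row
    print(row[0])  # 0
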
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 719e11aea6..1d4d99932b 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -831,7 +831,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
         def get_retention_policy_for_room_txn(
             txn: LoggingTransaction,
-        ) -> List[Dict[str, Optional[int]]]:
+        ) -> Optional[Tuple[Optional[int], Optional[int]]]:
             txn.execute(
                 """
                 SELECT min_lifetime, max_lifetime FROM room_retention
@@ -841,7 +841,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
                 (room_id,),
             )
 
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(Optional[Tuple[Optional[int], Optional[int]]], txn.fetchone())
 
         ret = await self.db_pool.runInteraction(
             "get_retention_policy_for_room",
@@ -856,8 +856,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
                 max_lifetime=self.config.retention.retention_default_max_lifetime,
             )
 
-        min_lifetime = ret[0]["min_lifetime"]
-        max_lifetime = ret[0]["max_lifetime"]
+        min_lifetime, max_lifetime = ret
 
         # If one of the room's policy's attributes isn't defined, use the matching
         # attribute from the default policy.
@@ -1162,14 +1161,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
             txn.execute(sql, args)
 
-            rows = self.db_pool.cursor_to_dict(txn)
-            rooms_dict = {}
-
-            for row in rows:
-                rooms_dict[row["room_id"]] = RetentionPolicy(
-                    min_lifetime=row["min_lifetime"],
-                    max_lifetime=row["max_lifetime"],
+            rooms_dict = {
+                room_id: RetentionPolicy(
+                    min_lifetime=min_lifetime,
+                    max_lifetime=max_lifetime,
                 )
+                for room_id, min_lifetime, max_lifetime in txn
+            }
 
             if include_null:
                 # If required, do a second query that retrieves all of the rooms we know
@@ -1178,13 +1176,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
                 txn.execute(sql)
 
-                rows = self.db_pool.cursor_to_dict(txn)
-
                 # If a room isn't already in the dict (i.e. it doesn't have a retention
                 # policy in its state), add it with a null policy.
-                for row in rows:
-                    if row["room_id"] not in rooms_dict:
-                        rooms_dict[row["room_id"]] = RetentionPolicy()
+                for (room_id,) in txn:
+                    if room_id not in rooms_dict:
+                        rooms_dict[room_id] = RetentionPolicy()
 
             return rooms_dict
 
@@ -1703,24 +1699,24 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
                 (last_room, batch_size),
             )
 
-            rows = self.db_pool.cursor_to_dict(txn)
+            rows = txn.fetchall()
 
             if not rows:
                 return True
 
-            for row in rows:
-                if not row["json"]:
+            for room_id, event_id, json in rows:
+                if not json:
                     retention_policy = {}
                 else:
-                    ev = db_to_json(row["json"])
+                    ev = db_to_json(json)
                     retention_policy = ev["content"]
 
                 self.db_pool.simple_insert_txn(
                     txn=txn,
                     table="room_retention",
                     values={
-                        "room_id": row["room_id"],
-                        "event_id": row["event_id"],
+                        "room_id": room_id,
+                        "event_id": event_id,
                         "min_lifetime": retention_policy.get("min_lifetime"),
                         "max_lifetime": retention_policy.get("max_lifetime"),
                     },
@@ -1729,7 +1725,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
             logger.info("Inserted %d rows into room_retention", len(rows))
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
+                txn, "insert_room_retention", {"room_id": rows[-1][0]}
             )
 
             if batch_size > len(rows):
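
The retention-policy hunk above goes one step further and iterates the cursor
itself (for room_id, min_lifetime, max_lifetime in txn) instead of
materialising a row list first: a DB-API cursor is an iterator of row tuples,
so a dict comprehension can consume it directly. A minimal sketch under the
same sqlite3 assumption, with a stand-in for the RetentionPolicy attrs class:

    import sqlite3
    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class RetentionPolicy:  # stand-in for synapse.types.RetentionPolicy
        min_lifetime: Optional[int] = None
        max_lifetime: Optional[int] = None

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE room_retention (room_id TEXT, min_lifetime INTEGER, max_lifetime INTEGER)"
    )
    conn.execute("INSERT INTO room_retention VALUES ('!a:hs', 60000, NULL)")

    cur = conn.execute("SELECT room_id, min_lifetime, max_lifetime FROM room_retention")

    # The cursor yields tuples one by one; no fetchall()/cursor_to_dict pass.
    rooms = {
        room_id: RetentionPolicy(min_lifetime=min_lifetime, max_lifetime=max_lifetime)
        for room_id, min_lifetime, max_lifetime in cur
    }
    print(rooms)
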
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index e93573f315..bbe08368db 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -1349,18 +1349,16 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
 
             txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
 
-            rows = self.db_pool.cursor_to_dict(txn)
+            rows = txn.fetchall()
             if not rows:
                 return 0
 
-            min_stream_id = rows[-1]["stream_ordering"]
+            min_stream_id = rows[-1][0]
 
             to_update = []
-            for row in rows:
-                event_id = row["event_id"]
-                room_id = row["room_id"]
+            for _, event_id, room_id, json in rows:
                 try:
-                    event_json = db_to_json(row["json"])
+                    event_json = db_to_json(json)
                     content = event_json["content"]
                 except Exception:
                     continue
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index a7aae661d8..1d69c4a5f0 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -179,22 +179,24 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
             # store_search_entries_txn with a generator function, but that
             # would mean having two cursors open on the database at once.
             # Instead we just build a list of results.
-            rows = self.db_pool.cursor_to_dict(txn)
+            rows = txn.fetchall()
             if not rows:
                 return 0
 
-            min_stream_id = rows[-1]["stream_ordering"]
+            min_stream_id = rows[-1][0]
 
             event_search_rows = []
-            for row in rows:
+            for (
+                stream_ordering,
+                event_id,
+                room_id,
+                etype,
+                json,
+                origin_server_ts,
+            ) in rows:
                 try:
-                    event_id = row["event_id"]
-                    room_id = row["room_id"]
-                    etype = row["type"]
-                    stream_ordering = row["stream_ordering"]
-                    origin_server_ts = row["origin_server_ts"]
                     try:
-                        event_json = db_to_json(row["json"])
+                        event_json = db_to_json(json)
                         content = event_json["content"]
                     except Exception:
                         continue
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 5a3611c415..ea06e4eee0 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -266,7 +266,7 @@ def generate_next_token(
         # when we are going backwards so we subtract one from the
         # stream part.
         last_stream_ordering -= 1
-    return RoomStreamToken(last_topo_ordering, last_stream_ordering)
+    return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering)
 
 
 def _make_generic_sql_bound(
@@ -558,7 +558,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 if p > min_pos
             }
 
-        return RoomStreamToken(None, min_pos, immutabledict(positions))
+        return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions))
 
     async def get_room_events_stream_for_rooms(
         self,
@@ -708,7 +708,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ret.reverse()
 
         if rows:
-            key = RoomStreamToken(None, min(r.stream_ordering for r in rows))
+            key = RoomStreamToken(stream=min(r.stream_ordering for r in rows))
         else:
             # Assume we didn't get anything because there was nothing to
             # get.
@@ -969,7 +969,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         topo = await self.db_pool.runInteraction(
             "_get_max_topological_txn", self._get_max_topological_txn, room_id
         )
-        return RoomStreamToken(topo, stream_ordering)
+        return RoomStreamToken(topological=topo, stream=stream_ordering)
 
     @overload
     def get_stream_id_for_event_txn(
@@ -1033,7 +1033,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcols=("stream_ordering", "topological_ordering"),
             desc="get_topological_token_for_event",
         )
-        return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
+        return RoomStreamToken(
+            topological=row["topological_ordering"], stream=row["stream_ordering"]
+        )
 
     async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
         """Gets the topological token in a room after or at the given stream
@@ -1114,8 +1116,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             else:
                 topo = None
             internal = event.internal_metadata
-            internal.before = RoomStreamToken(topo, stream - 1)
-            internal.after = RoomStreamToken(topo, stream)
+            internal.before = RoomStreamToken(topological=topo, stream=stream - 1)
+            internal.after = RoomStreamToken(topological=topo, stream=stream)
             internal.order = (int(topo) if topo else 0, int(stream))
 
     async def get_events_around(
@@ -1191,11 +1193,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         # Paginating backwards includes the event at the token, but paginating
         # forward doesn't.
         before_token = RoomStreamToken(
-            results["topological_ordering"] - 1, results["stream_ordering"]
+            topological=results["topological_ordering"] - 1,
+            stream=results["stream_ordering"],
         )
 
         after_token = RoomStreamToken(
-            results["topological_ordering"], results["stream_ordering"]
+            topological=results["topological_ordering"],
+            stream=results["stream_ordering"],
         )
 
         rows, start_token = self._paginate_room_events_txn(
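
Every RoomStreamToken construction in this file switches to keyword arguments
because the token attributes become keyword-only in the types/__init__.py
hunks further down; a positional call like RoomStreamToken(None, 0) would now
raise a TypeError. A minimal sketch of the attrs kw_only pattern (a toy class,
not the real token):

    import attr

    @attr.s(frozen=True, slots=True)
    class Token:  # toy analogue of RoomStreamToken
        topological = attr.ib(default=None, kw_only=True)
        stream = attr.ib(kw_only=True)

    Token(stream=5)                 # ok: topological defaults to None
    Token(topological=3, stream=5)  # ok
    # Token(None, 5)                # TypeError: positional args are rejected
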
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 5c5372a825..5555b53575 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import TYPE_CHECKING, Any, Dict, List, Optional
+from typing import TYPE_CHECKING, Any, List, Optional, Tuple, cast
 
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
@@ -27,6 +27,8 @@ from synapse.util import json_encoder
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
+ScheduledTaskRow = Tuple[str, str, str, int, str, Optional[str], Optional[str], Optional[str]]
+
 
 class TaskSchedulerWorkerStore(SQLBaseStore):
     def __init__(
@@ -38,13 +40,18 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
         super().__init__(database, db_conn, hs)
 
     @staticmethod
-    def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
-        row["status"] = TaskStatus(row["status"])
-        if row["params"] is not None:
-            row["params"] = db_to_json(row["params"])
-        if row["result"] is not None:
-            row["result"] = db_to_json(row["result"])
-        return ScheduledTask(**row)
+    def _convert_row_to_task(row: ScheduledTaskRow) -> ScheduledTask:
+        task_id, action, status, timestamp, resource_id, params, result, error = row
+        return ScheduledTask(
+            id=task_id,
+            action=action,
+            status=TaskStatus(status),
+            timestamp=timestamp,
+            resource_id=resource_id,
+            params=db_to_json(params) if params is not None else None,
+            result=db_to_json(result) if result is not None else None,
+            error=error,
+        )
 
     async def get_scheduled_tasks(
         self,
@@ -68,7 +75,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
         Returns: a list of `ScheduledTask`, ordered by increasing timestamps
         """
 
-        def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+        def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[ScheduledTaskRow]:
             clauses: List[str] = []
             args: List[Any] = []
             if resource_id:
@@ -101,7 +108,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
                 args.append(limit)
 
             txn.execute(sql, args)
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(List[ScheduledTaskRow], txn.fetchall())
 
         rows = await self.db_pool.runInteraction(
             "get_scheduled_tasks", get_scheduled_tasks_txn
@@ -193,7 +200,22 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
             desc="get_scheduled_task",
         )
 
-        return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None
+        return (
+            TaskSchedulerWorkerStore._convert_row_to_task(
+                (
+                    row["id"],
+                    row["action"],
+                    row["status"],
+                    row["timestamp"],
+                    row["resource_id"],
+                    row["params"],
+                    row["result"],
+                    row["error"],
+                )
+            )
+            if row
+            else None
+        )
 
     async def delete_scheduled_task(self, id: str) -> None:
         """Delete a specific task from its id.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 8f70eff809..f35757280d 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -526,7 +526,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         start: int,
         limit: int,
         direction: Direction = Direction.FORWARDS,
-    ) -> Tuple[List[JsonDict], int]:
+    ) -> Tuple[List[Tuple[str, int]], int]:
         """Function to retrieve a paginated list of destination's rooms.
         This will return a json list of rooms and the
         total number of rooms.
@@ -537,12 +537,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             limit: number of rows to retrieve
             direction: sort ascending or descending by room_id
         Returns:
-            A tuple of a dict of rooms and a count of total rooms.
+            A tuple of a list of room tuples and a count of total rooms.
+
+            Each room tuple is `(room_id, stream_ordering)`.
         """
 
         def get_destination_rooms_paginate_txn(
             txn: LoggingTransaction,
-        ) -> Tuple[List[JsonDict], int]:
+        ) -> Tuple[List[Tuple[str, int]], int]:
             if direction == Direction.BACKWARDS:
                 order = "DESC"
             else:
@@ -556,14 +558,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             txn.execute(sql, [destination])
             count = cast(Tuple[int], txn.fetchone())[0]
 
-            rooms = self.db_pool.simple_select_list_paginate_txn(
-                txn=txn,
-                table="destination_rooms",
-                orderby="room_id",
-                start=start,
-                limit=limit,
-                retcols=("room_id", "stream_ordering"),
-                order_direction=order,
+            rooms = cast(
+                List[Tuple[str, int]],
+                self.db_pool.simple_select_list_paginate_txn(
+                    txn=txn,
+                    table="destination_rooms",
+                    orderby="room_id",
+                    start=start,
+                    limit=limit,
+                    retcols=("room_id", "stream_ordering"),
+                    order_direction=order,
+                ),
             )
             return rooms, count
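
The cast() wrapped around simple_select_list_paginate_txn only narrows the
static type: the helper is annotated with a generic row-tuple return, while
this caller knows its two retcols make each row a (str, int). A minimal
illustration of the idiom with a hypothetical helper:

    from typing import Any, List, Tuple, cast

    def select_rows() -> List[Tuple[Any, ...]]:
        """Hypothetical helper with a deliberately broad return type."""
        return [("!room:hs", 42)]

    # No runtime effect: cast() just tells the type checker the concrete
    # shape implied by the columns that were selected.
    rooms = cast(List[Tuple[str, int]], select_rows())
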
 
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 5b50bd66bc..de89de7d74 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -125,8 +125,8 @@ Changes in SCHEMA_VERSION = 82
 
 
 SCHEMA_COMPAT_VERSION = (
-    # The `event_txn_id_device_id` must be written to for new events.
-    80
+    # The event_txn_id table and tables from MSC2716 no longer exist.
+    82
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 
diff --git a/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql b/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql
new file mode 100644
index 0000000000..149020bbd7
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql
@@ -0,0 +1,24 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop the old event transaction ID table; the event_txn_id_device_id table
+-- should be used instead.
+DROP TABLE IF EXISTS event_txn_id;
+
+-- Drop tables related to MSC2716 since the implementation is being removed
+DROP TABLE insertion_events;
+DROP TABLE insertion_event_edges;
+DROP TABLE insertion_event_extremities;
+DROP TABLE batch_events;
diff --git a/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql
new file mode 100644
index 0000000000..fc948166e6
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql
@@ -0,0 +1,20 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8204, 'e2e_room_keys_index_room_id', '{}');
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8204, 'room_account_data_index_room_id', '{}');
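
These rows only enqueue work: a data store must also register a handler under
the same update_name for the index to actually be built in the background. A
hedged sketch of what such a registration plausibly looks like, using
Synapse's register_background_index_update helper (the index name and column
choices here are assumptions for illustration, not taken from this diff):

    class ExampleStore:
        """Skeleton stand-in for a Synapse data store class."""

        def __init__(self, db_pool) -> None:
            self.db_pool = db_pool
            # The first argument must match the update_name inserted into
            # background_updates by the delta above.
            self.db_pool.updates.register_background_index_update(
                "e2e_room_keys_index_room_id",
                index_name="e2e_room_keys_room_id",  # assumed name
                table="e2e_room_keys",
                columns=("room_id",),
            )
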
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index d7084d2358..609a0978a9 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import TYPE_CHECKING, Iterator, Tuple
+from typing import TYPE_CHECKING, Sequence, Tuple
 
 import attr
 
@@ -23,7 +23,7 @@ from synapse.handlers.room import RoomEventSource
 from synapse.handlers.typing import TypingNotificationEventSource
 from synapse.logging.opentracing import trace
 from synapse.streams import EventSource
-from synapse.types import StreamToken
+from synapse.types import StreamKeyType, StreamToken
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -37,9 +37,14 @@ class _EventSourcesInner:
     receipt: ReceiptEventSource
     account_data: AccountDataEventSource
 
-    def get_sources(self) -> Iterator[Tuple[str, EventSource]]:
-        for attribute in attr.fields(_EventSourcesInner):
-            yield attribute.name, getattr(self, attribute.name)
+    def get_sources(self) -> Sequence[Tuple[StreamKeyType, EventSource]]:
+        return [
+            (StreamKeyType.ROOM, self.room),
+            (StreamKeyType.PRESENCE, self.presence),
+            (StreamKeyType.TYPING, self.typing),
+            (StreamKeyType.RECEIPT, self.receipt),
+            (StreamKeyType.ACCOUNT_DATA, self.account_data),
+        ]
 
 
 class EventSources:
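
Returning an explicit list keyed by StreamKeyType, instead of reflecting over
the attrs fields, gives every consumer a typed (key, source) pair. A toy
sketch of consuming such a list, with trimmed stand-ins for the enum and the
event sources:

    from enum import Enum
    from typing import List, Tuple

    class StreamKeyType(Enum):  # trimmed stand-in
        ROOM = "room_key"
        PRESENCE = "presence_key"

    class FakeSource:
        def __init__(self, name: str) -> None:
            self.name = name

    def get_sources() -> List[Tuple[StreamKeyType, FakeSource]]:
        # Explicit enumeration: adding a source means touching this list,
        # but each entry carries a real enum key rather than a bare string.
        return [
            (StreamKeyType.ROOM, FakeSource("room")),
            (StreamKeyType.PRESENCE, FakeSource("presence")),
        ]

    for key, source in get_sources():
        print(key.value, source.name)
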
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 76b0e3e694..09a88c86a7 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -22,8 +22,8 @@ from typing import (
     Any,
     ClassVar,
     Dict,
-    Final,
     List,
+    Literal,
     Mapping,
     Match,
     MutableMapping,
@@ -34,6 +34,7 @@ from typing import (
     Type,
     TypeVar,
     Union,
+    overload,
 )
 
 import attr
@@ -60,6 +61,8 @@ from synapse.util.cancellation import cancellable
 from synapse.util.stringutils import parse_and_validate_server_name
 
 if TYPE_CHECKING:
+    from typing_extensions import Self
+
     from synapse.appservice.api import ApplicationService
     from synapse.storage.databases.main import DataStore, PurgeEventsStore
     from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
@@ -436,7 +439,78 @@ def map_username_to_mxid_localpart(
 
 
 @attr.s(frozen=True, slots=True, order=False)
-class RoomStreamToken:
+class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
+    """An abstract stream token class for streams that supports multiple
+    writers.
+
+    This works by keeping track of the stream position of each writer,
+    represented by a default `stream` attribute and a map of instance name to
+    stream position of any writers that are ahead of the default stream
+    position.
+    """
+
+    stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True)
+
+    instance_map: "immutabledict[str, int]" = attr.ib(
+        factory=immutabledict,
+        validator=attr.validators.deep_mapping(
+            key_validator=attr.validators.instance_of(str),
+            value_validator=attr.validators.instance_of(int),
+            mapping_validator=attr.validators.instance_of(immutabledict),
+        ),
+        kw_only=True,
+    )
+
+    @classmethod
+    @abc.abstractmethod
+    async def parse(cls, store: "DataStore", string: str) -> "Self":
+        """Parse the string representation of the token."""
+        ...
+
+    @abc.abstractmethod
+    async def to_string(self, store: "DataStore") -> str:
+        """Serialize the token into its string representation."""
+        ...
+
+    def copy_and_advance(self, other: "Self") -> "Self":
+        """Return a new token such that if an event is after both this token and
+        the other token, then it's after the returned token too.
+        """
+
+        max_stream = max(self.stream, other.stream)
+
+        instance_map = {
+            instance: max(
+                self.instance_map.get(instance, self.stream),
+                other.instance_map.get(instance, other.stream),
+            )
+            for instance in set(self.instance_map).union(other.instance_map)
+        }
+
+        return attr.evolve(
+            self, stream=max_stream, instance_map=immutabledict(instance_map)
+        )
+
+    def get_max_stream_pos(self) -> int:
+        """Get the maximum stream position referenced in this token.
+
+        The corresponding "min" position is, by definition, just `self.stream`.
+
+        This is used to handle tokens that have non-empty `instance_map`, and so
+        reference stream positions after the `self.stream` position.
+        """
+        return max(self.instance_map.values(), default=self.stream)
+
+    def get_stream_pos_for_instance(self, instance_name: str) -> int:
+        """Get the stream position that the given writer was at at this token."""
+
+        # If we don't have an entry for the instance we can assume that it was
+        # at `self.stream`.
+        return self.instance_map.get(instance_name, self.stream)
+
+
+@attr.s(frozen=True, slots=True, order=False)
+class RoomStreamToken(AbstractMultiWriterStreamToken):
     """Tokens are positions between events. The token "s1" comes after event 1.
 
             s0    s1
@@ -513,16 +587,8 @@ class RoomStreamToken:
 
     topological: Optional[int] = attr.ib(
         validator=attr.validators.optional(attr.validators.instance_of(int)),
-    )
-    stream: int = attr.ib(validator=attr.validators.instance_of(int))
-
-    instance_map: "immutabledict[str, int]" = attr.ib(
-        factory=immutabledict,
-        validator=attr.validators.deep_mapping(
-            key_validator=attr.validators.instance_of(str),
-            value_validator=attr.validators.instance_of(int),
-            mapping_validator=attr.validators.instance_of(immutabledict),
-        ),
+        kw_only=True,
+        default=None,
     )
 
     def __attrs_post_init__(self) -> None:
@@ -582,17 +648,7 @@ class RoomStreamToken:
         if self.topological or other.topological:
             raise Exception("Can't advance topological tokens")
 
-        max_stream = max(self.stream, other.stream)
-
-        instance_map = {
-            instance: max(
-                self.instance_map.get(instance, self.stream),
-                other.instance_map.get(instance, other.stream),
-            )
-            for instance in set(self.instance_map).union(other.instance_map)
-        }
-
-        return RoomStreamToken(None, max_stream, immutabledict(instance_map))
+        return super().copy_and_advance(other)
 
     def as_historical_tuple(self) -> Tuple[int, int]:
         """Returns a tuple of `(topological, stream)` for historical tokens.
@@ -618,16 +674,6 @@ class RoomStreamToken:
         # at `self.stream`.
         return self.instance_map.get(instance_name, self.stream)
 
-    def get_max_stream_pos(self) -> int:
-        """Get the maximum stream position referenced in this token.
-
-        The corresponding "min" position is, by definition just `self.stream`.
-
-        This is used to handle tokens that have non-empty `instance_map`, and so
-        reference stream positions after the `self.stream` position.
-        """
-        return max(self.instance_map.values(), default=self.stream)
-
     async def to_string(self, store: "DataStore") -> str:
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
@@ -649,20 +695,20 @@ class RoomStreamToken:
             return "s%d" % (self.stream,)
 
 
-class StreamKeyType:
+class StreamKeyType(Enum):
     """Known stream types.
 
     A stream is a list of entities ordered by an incrementing "stream token".
     """
 
-    ROOM: Final = "room_key"
-    PRESENCE: Final = "presence_key"
-    TYPING: Final = "typing_key"
-    RECEIPT: Final = "receipt_key"
-    ACCOUNT_DATA: Final = "account_data_key"
-    PUSH_RULES: Final = "push_rules_key"
-    TO_DEVICE: Final = "to_device_key"
-    DEVICE_LIST: Final = "device_list_key"
+    ROOM = "room_key"
+    PRESENCE = "presence_key"
+    TYPING = "typing_key"
+    RECEIPT = "receipt_key"
+    ACCOUNT_DATA = "account_data_key"
+    PUSH_RULES = "push_rules_key"
+    TO_DEVICE = "to_device_key"
+    DEVICE_LIST = "device_list_key"
     UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
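
Turning StreamKeyType into a real Enum works because each member's value is
still the attribute name used on StreamToken, which is exactly what the
copy_and_replace and get_field changes below rely on via key.value. A trimmed
standalone sketch of that coupling (toy classes, not the real token):

    from enum import Enum

    import attr

    class StreamKeyType(Enum):  # trimmed stand-in
        PRESENCE = "presence_key"
        TYPING = "typing_key"

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class Token:  # toy analogue of StreamToken
        presence_key: int
        typing_key: int

        def get_field(self, key: StreamKeyType) -> int:
            # The enum value doubles as the attribute name.
            return getattr(self, key.value)

        def copy_and_replace(self, key: StreamKeyType, new_value: int) -> "Token":
            return attr.evolve(self, **{key.value: new_value})

    t = Token(presence_key=1, typing_key=2)
    print(t.get_field(StreamKeyType.TYPING))              # 2
    print(t.copy_and_replace(StreamKeyType.PRESENCE, 9))  # presence_key=9
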
 
 
@@ -784,7 +830,7 @@ class StreamToken:
     def room_stream_id(self) -> int:
         return self.room_key.stream
 
-    def copy_and_advance(self, key: str, new_value: Any) -> "StreamToken":
+    def copy_and_advance(self, key: StreamKeyType, new_value: Any) -> "StreamToken":
         """Advance the given key in the token to a new value if and only if the
         new value is after the old value.
 
@@ -797,35 +843,68 @@ class StreamToken:
             return new_token
 
         new_token = self.copy_and_replace(key, new_value)
-        new_id = int(getattr(new_token, key))
-        old_id = int(getattr(self, key))
+        new_id = new_token.get_field(key)
+        old_id = self.get_field(key)
 
         if old_id < new_id:
             return new_token
         else:
             return self
 
-    def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken":
-        return attr.evolve(self, **{key: new_value})
+    def copy_and_replace(self, key: StreamKeyType, new_value: Any) -> "StreamToken":
+        return attr.evolve(self, **{key.value: new_value})
 
+    @overload
+    def get_field(self, key: Literal[StreamKeyType.ROOM]) -> RoomStreamToken:
+        ...
 
-StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    @overload
+    def get_field(
+        self,
+        key: Literal[
+            StreamKeyType.ACCOUNT_DATA,
+            StreamKeyType.DEVICE_LIST,
+            StreamKeyType.PRESENCE,
+            StreamKeyType.PUSH_RULES,
+            StreamKeyType.RECEIPT,
+            StreamKeyType.TO_DEVICE,
+            StreamKeyType.TYPING,
+            StreamKeyType.UN_PARTIAL_STATED_ROOMS,
+        ],
+    ) -> int:
+        ...
 
+    @overload
+    def get_field(self, key: StreamKeyType) -> Union[int, RoomStreamToken]:
+        ...
 
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class PersistedEventPosition:
-    """Position of a newly persisted event with instance that persisted it.
+    def get_field(self, key: StreamKeyType) -> Union[int, RoomStreamToken]:
+        """Returns the stream ID for the given key."""
+        return getattr(self, key.value)
 
-    This can be used to test whether the event is persisted before or after a
-    RoomStreamToken.
-    """
+
+StreamToken.START = StreamToken(RoomStreamToken(stream=0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class PersistedPosition:
+    """Position of a newly persisted row with instance that persisted it."""
 
     instance_name: str
     stream: int
 
-    def persisted_after(self, token: RoomStreamToken) -> bool:
+    def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool:
         return token.get_stream_pos_for_instance(self.instance_name) < self.stream
 
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class PersistedEventPosition(PersistedPosition):
+    """Position of a newly persisted event with instance that persisted it.
+
+    This can be used to test whether the event is persisted before or after a
+    RoomStreamToken.
+    """
+
     def to_room_stream_token(self) -> RoomStreamToken:
         """Converts the position to a room stream token such that events
         persisted in the same room after this position will be after the
@@ -836,7 +915,7 @@ class PersistedEventPosition:
         """
         # Doing the naive thing satisfies the desired properties described in
         # the docstring.
-        return RoomStreamToken(None, self.stream)
+        return RoomStreamToken(stream=self.stream)
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
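
persisted_after compares this row's stream position against the position the
token records for the specific writer that persisted it, falling back to the
token's default stream when that writer has no entry; the row is after the
token only if its position is strictly greater. A worked standalone sketch on
plain values:

    def persisted_after(instance_name, row_stream, token_stream, instance_map):
        """Same comparison as PersistedPosition.persisted_after above."""
        writer_pos = instance_map.get(instance_name, token_stream)
        return writer_pos < row_stream

    # Token: default stream 10, but writer "w1" already at 15.
    print(persisted_after("w1", 12, 10, {"w1": 15}))  # False: w1 had passed 12
    print(persisted_after("w2", 12, 10, {"w1": 15}))  # True: w2 assumed at 10
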