Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/auth.py               122
-rw-r--r--  synapse/handlers/device.py             102
-rw-r--r--  synapse/handlers/directory.py           19
-rw-r--r--  synapse/handlers/event_auth.py           9
-rw-r--r--  synapse/handlers/federation.py          45
-rw-r--r--  synapse/handlers/federation_event.py    45
-rw-r--r--  synapse/handlers/message.py             40
-rw-r--r--  synapse/handlers/oidc.py               131
-rw-r--r--  synapse/handlers/register.py             3
-rw-r--r--  synapse/handlers/room.py                28
-rw-r--r--  synapse/handlers/room_member.py         62
-rw-r--r--  synapse/handlers/sync.py                33
12 files changed, 199 insertions, 440 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index fbafbbee6b..3d83236b0c 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -37,9 +37,7 @@ from typing import (
 
 import attr
 import bcrypt
-import pymacaroons
 import unpaddedbase64
-from pymacaroons.exceptions import MacaroonVerificationFailedException
 
 from twisted.internet.defer import CancelledError
 from twisted.web.server import Request
@@ -69,7 +67,7 @@ from synapse.storage.roommember import ProfileInfo
 from synapse.types import JsonDict, Requester, UserID
 from synapse.util import stringutils as stringutils
 from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
-from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
+from synapse.util.macaroons import LoginTokenAttributes
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.stringutils import base62_encode
 from synapse.util.threepids import canonicalise_email
@@ -81,6 +79,8 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+INVALID_USERNAME_OR_PASSWORD = "Invalid username or password"
+
 
 def convert_client_dict_legacy_fields_to_identifier(
     submission: JsonDict,
@@ -178,25 +178,13 @@ class SsoLoginExtraAttributes:
     extra_attributes: JsonDict
 
 
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class LoginTokenAttributes:
-    """Data we store in a short-term login token"""
-
-    user_id: str
-
-    auth_provider_id: str
-    """The SSO Identity Provider that the user authenticated with, to get this token."""
-
-    auth_provider_session_id: Optional[str]
-    """The session ID advertised by the SSO Identity Provider."""
-
-
 class AuthHandler:
     SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
 
     def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastores().main
         self.auth = hs.get_auth()
+        self.auth_blocking = hs.get_auth_blocking()
         self.clock = hs.get_clock()
         self.checkers: Dict[str, UserInteractiveAuthChecker] = {}
         for auth_checker_class in INTERACTIVE_AUTH_CHECKERS:
@@ -983,7 +971,7 @@ class AuthHandler:
             not is_appservice_ghost
             or self.hs.config.appservice.track_appservice_user_ips
         ):
-            await self.auth.check_auth_blocking(user_id)
+            await self.auth_blocking.check_auth_blocking(user_id)
 
         access_token = self.generate_access_token(target_user_id_obj)
         await self.store.add_access_token_to_user(
@@ -1215,7 +1203,9 @@ class AuthHandler:
                     await self._failed_login_attempts_ratelimiter.can_do_action(
                         None, (medium, address)
                     )
-                raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+                raise LoginError(
+                    403, msg=INVALID_USERNAME_OR_PASSWORD, errcode=Codes.FORBIDDEN
+                )
 
             identifier_dict = {"type": "m.id.user", "user": user_id}
 
@@ -1341,7 +1331,7 @@ class AuthHandler:
 
         # We raise a 403 here, but note that if we're doing user-interactive
         # login, it turns all LoginErrors into a 401 anyway.
-        raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN)
+        raise LoginError(403, msg=INVALID_USERNAME_OR_PASSWORD, errcode=Codes.FORBIDDEN)
 
     async def check_password_provider_3pid(
         self, medium: str, address: str, password: str
@@ -1435,7 +1425,7 @@ class AuthHandler:
         except Exception:
             raise AuthError(403, "Invalid login token", errcode=Codes.FORBIDDEN)
 
-        await self.auth.check_auth_blocking(res.user_id)
+        await self.auth_blocking.check_auth_blocking(res.user_id)
         return res
 
     async def delete_access_token(self, access_token: str) -> None:
@@ -1826,98 +1816,6 @@ class AuthHandler:
         return urllib.parse.urlunparse(url_parts)
 
 
-@attr.s(slots=True, auto_attribs=True)
-class MacaroonGenerator:
-    hs: "HomeServer"
-
-    def generate_guest_access_token(self, user_id: str) -> str:
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = access")
-        # Include a nonce, to make sure that each login gets a different
-        # access token.
-        macaroon.add_first_party_caveat(
-            "nonce = %s" % (stringutils.random_string_with_symbols(16),)
-        )
-        macaroon.add_first_party_caveat("guest = true")
-        return macaroon.serialize()
-
-    def generate_short_term_login_token(
-        self,
-        user_id: str,
-        auth_provider_id: str,
-        auth_provider_session_id: Optional[str] = None,
-        duration_in_ms: int = (2 * 60 * 1000),
-    ) -> str:
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = login")
-        now = self.hs.get_clock().time_msec()
-        expiry = now + duration_in_ms
-        macaroon.add_first_party_caveat("time < %d" % (expiry,))
-        macaroon.add_first_party_caveat("auth_provider_id = %s" % (auth_provider_id,))
-        if auth_provider_session_id is not None:
-            macaroon.add_first_party_caveat(
-                "auth_provider_session_id = %s" % (auth_provider_session_id,)
-            )
-        return macaroon.serialize()
-
-    def verify_short_term_login_token(self, token: str) -> LoginTokenAttributes:
-        """Verify a short-term-login macaroon
-
-        Checks that the given token is a valid, unexpired short-term-login token
-        minted by this server.
-
-        Args:
-            token: the login token to verify
-
-        Returns:
-            the user_id that this token is valid for
-
-        Raises:
-            MacaroonVerificationFailedException if the verification failed
-        """
-        macaroon = pymacaroons.Macaroon.deserialize(token)
-        user_id = get_value_from_macaroon(macaroon, "user_id")
-        auth_provider_id = get_value_from_macaroon(macaroon, "auth_provider_id")
-
-        auth_provider_session_id: Optional[str] = None
-        try:
-            auth_provider_session_id = get_value_from_macaroon(
-                macaroon, "auth_provider_session_id"
-            )
-        except MacaroonVerificationFailedException:
-            pass
-
-        v = pymacaroons.Verifier()
-        v.satisfy_exact("gen = 1")
-        v.satisfy_exact("type = login")
-        v.satisfy_general(lambda c: c.startswith("user_id = "))
-        v.satisfy_general(lambda c: c.startswith("auth_provider_id = "))
-        v.satisfy_general(lambda c: c.startswith("auth_provider_session_id = "))
-        satisfy_expiry(v, self.hs.get_clock().time_msec)
-        v.verify(macaroon, self.hs.config.key.macaroon_secret_key)
-
-        return LoginTokenAttributes(
-            user_id=user_id,
-            auth_provider_id=auth_provider_id,
-            auth_provider_session_id=auth_provider_session_id,
-        )
-
-    def generate_delete_pusher_token(self, user_id: str) -> str:
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = delete_pusher")
-        return macaroon.serialize()
-
-    def _generate_base_macaroon(self, user_id: str) -> pymacaroons.Macaroon:
-        macaroon = pymacaroons.Macaroon(
-            location=self.hs.config.server.server_name,
-            identifier="key",
-            key=self.hs.config.key.macaroon_secret_key,
-        )
-        macaroon.add_first_party_caveat("gen = 1")
-        macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
-        return macaroon
-
-
 def load_legacy_password_auth_providers(hs: "HomeServer") -> None:
     module_api = hs.get_module_api()
     for module, config in hs.config.authproviders.password_providers:
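
The auth.py changes above move macaroon handling out of the handler: the local MacaroonGenerator and LoginTokenAttributes definitions are deleted in favour of synapse.util.macaroons, password-login failures now share a single INVALID_USERNAME_OR_PASSWORD message, and auth blocking is checked through the new hs.get_auth_blocking() accessor. A minimal sketch of a caller under those assumptions (the relocated MacaroonGenerator is assumed to keep the verify_short_term_login_token method shown in the removed code; login_with_token is a hypothetical helper):

from typing import TYPE_CHECKING

from synapse.util.macaroons import LoginTokenAttributes

if TYPE_CHECKING:
    from synapse.server import HomeServer


async def login_with_token(hs: "HomeServer", token: str) -> str:
    # The generator now lives on the HomeServer (see the oidc.py hunks below)
    # instead of being defined in this module; method name assumed unchanged.
    generator = hs.get_macaroon_generator()
    res: LoginTokenAttributes = generator.verify_short_term_login_token(token)

    # Auth blocking has been split off from Auth into its own object.
    await hs.get_auth_blocking().check_auth_blocking(res.user_id)
    return res.user_id
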
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a0cbeedc30..c05a170c55 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -123,23 +123,28 @@ class DeviceWorkerHandler:
 
         return device
 
-    @trace
-    @measure_func("device.get_user_ids_changed")
-    async def get_user_ids_changed(
-        self, user_id: str, from_token: StreamToken
-    ) -> JsonDict:
-        """Get list of users that have had the devices updated, or have newly
-        joined a room, that `user_id` may be interested in.
+    async def get_device_changes_in_shared_rooms(
+        self, user_id: str, room_ids: Collection[str], from_token: StreamToken
+    ) -> Collection[str]:
+        """Get the set of users whose devices have changed who share a room with
+        the given user.
         """
+        changed_users = await self.store.get_device_list_changes_in_rooms(
+            room_ids, from_token.device_list_key
+        )
 
-        set_tag("user_id", user_id)
-        set_tag("from_token", from_token)
-        now_room_key = self.store.get_room_max_token()
+        if changed_users is not None:
+            # We also check if the given user has changed their device. If
+            # they're in no rooms then the above query won't include them.
+            changed = await self.store.get_users_whose_devices_changed(
+                from_token.device_list_key, [user_id]
+            )
+            changed_users.update(changed)
+            return changed_users
 
-        room_ids = await self.store.get_rooms_for_user(user_id)
+        # If the DB returned None then the `from_token` is too old, so we fall
+        # back on looking for device updates for all users.
 
-        # First we check if any devices have changed for users that we share
-        # rooms with.
         users_who_share_room = await self.store.get_users_who_share_room_with_user(
             user_id
         )
@@ -153,6 +158,27 @@ class DeviceWorkerHandler:
             from_token.device_list_key, tracked_users
         )
 
+        return changed
+
+    @trace
+    @measure_func("device.get_user_ids_changed")
+    async def get_user_ids_changed(
+        self, user_id: str, from_token: StreamToken
+    ) -> JsonDict:
+        """Get list of users that have had the devices updated, or have newly
+        joined a room, that `user_id` may be interested in.
+        """
+
+        set_tag("user_id", user_id)
+        set_tag("from_token", from_token)
+        now_room_key = self.store.get_room_max_token()
+
+        room_ids = await self.store.get_rooms_for_user(user_id)
+
+        changed = await self.get_device_changes_in_shared_rooms(
+            user_id, room_ids, from_token
+        )
+
         # Then work out if any users have since joined
         rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
 
@@ -237,10 +263,19 @@ class DeviceWorkerHandler:
                         break
 
         if possibly_changed or possibly_left:
-            # Take the intersection of the users whose devices may have changed
-            # and those that actually still share a room with the user
-            possibly_joined = possibly_changed & users_who_share_room
-            possibly_left = (possibly_changed | possibly_left) - users_who_share_room
+            possibly_joined = possibly_changed
+            possibly_left = possibly_changed | possibly_left
+
+            # Double check if we still share rooms with the given user.
+            users_rooms = await self.store.get_rooms_for_users_with_stream_ordering(
+                possibly_left
+            )
+            for changed_user_id, entries in users_rooms.items():
+                if any(e.room_id in room_ids for e in entries):
+                    possibly_left.discard(changed_user_id)
+                else:
+                    possibly_joined.discard(changed_user_id)
+
         else:
             possibly_joined = set()
             possibly_left = set()
@@ -398,35 +433,6 @@ class DeviceHandler(DeviceWorkerHandler):
             await self.delete_devices(user_id, user_devices)
 
     @trace
-    async def delete_device(self, user_id: str, device_id: str) -> None:
-        """Delete the given device
-
-        Args:
-            user_id: The user to delete the device from.
-            device_id: The device to delete.
-        """
-
-        try:
-            await self.store.delete_device(user_id, device_id)
-        except errors.StoreError as e:
-            if e.code == 404:
-                # no match
-                set_tag("error", True)
-                log_kv(
-                    {"reason": "User doesn't have device id.", "device_id": device_id}
-                )
-            else:
-                raise
-
-        await self._auth_handler.delete_access_tokens_for_user(
-            user_id, device_id=device_id
-        )
-
-        await self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id)
-
-        await self.notify_device_update(user_id, [device_id])
-
-    @trace
     async def delete_all_devices_for_user(
         self, user_id: str, except_device_id: Optional[str] = None
     ) -> None:
@@ -591,7 +597,7 @@ class DeviceHandler(DeviceWorkerHandler):
             user_id, device_id, device_data
         )
         if old_device_id is not None:
-            await self.delete_device(user_id, old_device_id)
+            await self.delete_devices(user_id, [old_device_id])
         return device_id
 
     async def get_dehydrated_device(
@@ -638,7 +644,7 @@ class DeviceHandler(DeviceWorkerHandler):
         await self.store.update_device(user_id, device_id, old_device["display_name"])
         # can't call self.delete_device because that will clobber the
         # access token so call the storage layer directly
-        await self.store.delete_device(user_id, old_device_id)
+        await self.store.delete_devices(user_id, [old_device_id])
         await self.store.delete_e2e_keys_by_device(
             user_id=user_id, device_id=old_device_id
         )
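
With the single-device delete_device helper removed, callers go through the batch delete_devices API instead, as the dehydration code above now does. A hypothetical caller migrating to the new call shape:

# Sketch only: `device_handler` is whatever DeviceHandler instance the caller
# already holds; the batch method takes a list even for a single device.
async def remove_one_device(device_handler, user_id: str, device_id: str) -> None:
    # Previously: await device_handler.delete_device(user_id, device_id)
    await device_handler.delete_devices(user_id, [device_id])
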
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 1459a046de..8b0f16f965 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -28,6 +28,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.appservice import ApplicationService
+from synapse.module_api import NOT_SPAM
 from synapse.storage.databases.main.directory import RoomAliasMapping
 from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
 
@@ -141,10 +142,15 @@ class DirectoryHandler:
                         403, "You must be in the room to create an alias for it"
                     )
 
-            if not await self.spam_checker.user_may_create_room_alias(
+            spam_check = await self.spam_checker.user_may_create_room_alias(
                 user_id, room_alias
-            ):
-                raise AuthError(403, "This user is not permitted to create this alias")
+            )
+            if spam_check != self.spam_checker.NOT_SPAM:
+                raise AuthError(
+                    403,
+                    "This user is not permitted to create this alias",
+                    spam_check,
+                )
 
             if not self.config.roomdirectory.is_alias_creation_allowed(
                 user_id, room_id, room_alias_str
@@ -430,9 +436,12 @@ class DirectoryHandler:
         """
         user_id = requester.user.to_string()
 
-        if not await self.spam_checker.user_may_publish_room(user_id, room_id):
+        spam_check = await self.spam_checker.user_may_publish_room(user_id, room_id)
+        if spam_check != NOT_SPAM:
             raise AuthError(
-                403, "This user is not permitted to publish rooms to the room list"
+                403,
+                "This user is not permitted to publish rooms to the room list",
+                spam_check,
             )
 
         if requester.is_guest:
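
The directory changes switch the spam-checker callbacks from returning a bool to returning either NOT_SPAM or an error code, which the handler forwards to the client as the Matrix errcode. A minimal sketch of the new calling convention, using the user_may_publish_room callback from the hunk above (the helper name is illustrative):

from synapse.api.errors import AuthError
from synapse.module_api import NOT_SPAM


async def assert_may_publish(spam_checker, user_id: str, room_id: str) -> None:
    spam_check = await spam_checker.user_may_publish_room(user_id, room_id)
    if spam_check != NOT_SPAM:
        # The value returned by the module becomes the errcode of the 403.
        raise AuthError(
            403,
            "This user is not permitted to publish rooms to the room list",
            spam_check,
        )
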
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index 6bed464351..a2dd9c7efa 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -23,7 +23,10 @@ from synapse.api.constants import (
 )
 from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.api.room_versions import RoomVersion
-from synapse.event_auth import check_auth_rules_for_event
+from synapse.event_auth import (
+    check_state_dependent_auth_rules,
+    check_state_independent_auth_rules,
+)
 from synapse.events import EventBase
 from synapse.events.builder import EventBuilder
 from synapse.events.snapshot import EventContext
@@ -48,14 +51,14 @@ class EventAuthHandler:
 
     async def check_auth_rules_from_context(
         self,
-        room_version_obj: RoomVersion,
         event: EventBase,
         context: EventContext,
     ) -> None:
         """Check an event passes the auth rules at its own auth events"""
+        await check_state_independent_auth_rules(self._store, event)
         auth_event_ids = event.auth_event_ids()
         auth_events_by_id = await self._store.get_events(auth_event_ids)
-        check_auth_rules_for_event(room_version_obj, event, auth_events_by_id.values())
+        check_state_dependent_auth_rules(event, auth_events_by_id.values())
 
     def compute_auth_events(
         self,
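
check_auth_rules_from_context now runs the auth rules in two phases: an async state-independent pass that needs the datastore, followed by a state-dependent pass against the event's own auth events. A sketch of the same two-phase flow outside the handler, using the signatures from the hunk above (store and event stand for the usual datastore and EventBase objects):

from synapse.event_auth import (
    check_state_dependent_auth_rules,
    check_state_independent_auth_rules,
)


async def check_event_against_its_auth_events(store, event) -> None:
    # Phase 1: rules that do not depend on room state; needs the store so it
    # can look up the referenced auth events itself.
    await check_state_independent_auth_rules(store, event)

    # Phase 2: rules evaluated against the auth events the event claims.
    auth_events_by_id = await store.get_events(event.auth_event_ids())
    check_state_dependent_auth_rules(event, auth_events_by_id.values())
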
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 6a143440d3..34cc5ecd11 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -59,6 +59,7 @@ from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.module_api import NOT_SPAM
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
     ReplicationStoreRoomOnOutlierMembershipRestServlet,
@@ -545,6 +546,7 @@ class FederationHandler:
             if ret.partial_state:
                 # TODO(faster_joins): roll this back if we don't manage to start the
                 #   background resync (eg process_remote_join fails)
+                #   https://github.com/matrix-org/synapse/issues/12998
                 await self.store.store_partial_state_room(room_id, ret.servers_in_room)
 
             max_stream_id = await self._federation_event_handler.process_remote_join(
@@ -799,9 +801,7 @@ class FederationHandler:
 
         # The remote hasn't signed it yet, obviously. We'll do the full checks
         # when we get the event back in `on_send_join_request`
-        await self._event_auth_handler.check_auth_rules_from_context(
-            room_version, event, context
-        )
+        await self._event_auth_handler.check_auth_rules_from_context(event, context)
         return event
 
     async def on_invite_request(
@@ -821,11 +821,14 @@ class FederationHandler:
         if self.hs.config.server.block_non_admin_invites:
             raise SynapseError(403, "This server does not accept room invites")
 
-        if not await self.spam_checker.user_may_invite(
+        spam_check = await self.spam_checker.user_may_invite(
             event.sender, event.state_key, event.room_id
-        ):
+        )
+        if spam_check != NOT_SPAM:
             raise SynapseError(
-                403, "This user is not permitted to send invites to this server/user"
+                403,
+                "This user is not permitted to send invites to this server/user",
+                spam_check,
             )
 
         membership = event.content.get("membership")
@@ -972,9 +975,7 @@ class FederationHandler:
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_leave_request`
-            await self._event_auth_handler.check_auth_rules_from_context(
-                room_version_obj, event, context
-            )
+            await self._event_auth_handler.check_auth_rules_from_context(event, context)
         except AuthError as e:
             logger.warning("Failed to create new leave %r because %s", event, e)
             raise e
@@ -1033,9 +1034,7 @@ class FederationHandler:
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_knock_request`
-            await self._event_auth_handler.check_auth_rules_from_context(
-                room_version_obj, event, context
-            )
+            await self._event_auth_handler.check_auth_rules_from_context(event, context)
         except AuthError as e:
             logger.warning("Failed to create new knock %r because %s", event, e)
             raise e
@@ -1206,9 +1205,9 @@ class FederationHandler:
             event.internal_metadata.send_on_behalf_of = self.hs.hostname
 
             try:
-                validate_event_for_room_version(room_version_obj, event)
+                validate_event_for_room_version(event)
                 await self._event_auth_handler.check_auth_rules_from_context(
-                    room_version_obj, event, context
+                    event, context
                 )
             except AuthError as e:
                 logger.warning("Denying new third party invite %r because %s", event, e)
@@ -1258,10 +1257,8 @@ class FederationHandler:
         )
 
         try:
-            validate_event_for_room_version(room_version_obj, event)
-            await self._event_auth_handler.check_auth_rules_from_context(
-                room_version_obj, event, context
-            )
+            validate_event_for_room_version(event)
+            await self._event_auth_handler.check_auth_rules_from_context(event, context)
         except AuthError as e:
             logger.warning("Denying third party invite %r because %s", event, e)
             raise e
@@ -1506,14 +1503,17 @@ class FederationHandler:
         # TODO(faster_joins): do we need to lock to avoid races? What happens if other
         #   worker processes kick off a resync in parallel? Perhaps we should just elect
         #   a single worker to do the resync.
+        #   https://github.com/matrix-org/synapse/issues/12994
         #
         # TODO(faster_joins): what happens if we leave the room during a resync? if we
         #   really leave, that might mean we have difficulty getting the room state over
         #   federation.
+        #   https://github.com/matrix-org/synapse/issues/12802
         #
         # TODO(faster_joins): we need some way of prioritising which homeservers in
         #   `other_destinations` to try first, otherwise we'll spend ages trying dead
         #   homeservers for large rooms.
+        #   https://github.com/matrix-org/synapse/issues/12999
 
         if initial_destination is None and len(other_destinations) == 0:
             raise ValueError(
@@ -1543,9 +1543,11 @@ class FederationHandler:
                 # all the events are updated, so we can update current state and
                 # clear the lazy-loading flag.
                 logger.info("Updating current state for %s", room_id)
+                # TODO(faster_joins): support workers
+                #   https://github.com/matrix-org/synapse/issues/12994
                 assert (
                     self._storage_controllers.persistence is not None
-                ), "TODO(faster_joins): support for workers"
+                ), "worker-mode deployments not currently supported here"
                 await self._storage_controllers.persistence.update_current_state(
                     room_id
                 )
@@ -1559,6 +1561,8 @@ class FederationHandler:
                     )
 
                     # TODO(faster_joins) update room stats and user directory?
+                    #   https://github.com/matrix-org/synapse/issues/12814
+                    #   https://github.com/matrix-org/synapse/issues/12815
                     return
 
                 # we raced against more events arriving with partial state. Go round
@@ -1566,6 +1570,8 @@ class FederationHandler:
                 # TODO(faster_joins): there is still a race here, whereby incoming events which raced
                 #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
                 #   having partial state.
+                #   https://github.com/matrix-org/synapse/issues/12988
+                #
                 continue
 
             events = await self.store.get_events_as_list(
@@ -1588,6 +1594,7 @@ class FederationHandler:
                             #   indefinitely is also not the right thing to do if we can
                             #   reach all homeservers and they all claim they don't have
                             #   the state we want.
+                            #   https://github.com/matrix-org/synapse/issues/13000
                             logger.error(
                                 "Failed to get state for %s at %s from %s because %s, "
                                 "giving up!",
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 87a0608359..b7c54e642f 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -50,13 +50,14 @@ from synapse.api.errors import (
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
 from synapse.event_auth import (
     auth_types_for_event,
-    check_auth_rules_for_event,
+    check_state_dependent_auth_rules,
+    check_state_independent_auth_rules,
     validate_event_for_room_version,
 )
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_client import InvalidResponseError
-from synapse.logging.context import nested_logging_context, run_in_background
+from synapse.logging.context import nested_logging_context
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
@@ -532,6 +533,7 @@ class FederationEventHandler:
                 #
                 # TODO(faster_joins): we probably need to be more intelligent, and
                 #    exclude partial-state prev_events from consideration
+                #    https://github.com/matrix-org/synapse/issues/13001
                 logger.warning(
                     "%s still has partial state: can't de-partial-state it yet",
                     event.event_id,
@@ -777,6 +779,7 @@ class FederationEventHandler:
             state_ids = await self._resolve_state_at_missing_prevs(origin, event)
             # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
             #   not return partial state
+            #   https://github.com/matrix-org/synapse/issues/13002
 
             await self._process_received_pdu(
                 origin, event, state_ids=state_ids, backfilled=backfilled
@@ -1428,10 +1431,9 @@ class FederationEventHandler:
             allow_rejected=True,
         )
 
-        room_version = await self._store.get_room_version_id(room_id)
-        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+        events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []
 
-        def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
+        async def prep(event: EventBase) -> None:
             with nested_logging_context(suffix=event.event_id):
                 auth = []
                 for auth_event_id in event.auth_event_ids():
@@ -1445,7 +1447,7 @@ class FederationEventHandler:
                             event,
                             auth_event_id,
                         )
-                        return None
+                        return
                     auth.append(ae)
 
                 # we're not bothering about room state, so flag the event as an outlier.
@@ -1453,18 +1455,21 @@ class FederationEventHandler:
 
                 context = EventContext.for_outlier(self._storage_controllers)
                 try:
-                    validate_event_for_room_version(room_version_obj, event)
-                    check_auth_rules_for_event(room_version_obj, event, auth)
+                    validate_event_for_room_version(event)
+                    await check_state_independent_auth_rules(self._store, event)
+                    check_state_dependent_auth_rules(event, auth)
                 except AuthError as e:
                     logger.warning("Rejecting %r because %s", event, e)
                     context.rejected = RejectedReason.AUTH_ERROR
 
-            return event, context
+            events_and_contexts_to_persist.append((event, context))
+
+        for event in fetched_events:
+            await prep(event)
 
-        events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
         await self.persist_events_and_notify(
             room_id,
-            tuple(events_to_persist),
+            events_and_contexts_to_persist,
             # Mark these events backfilled as they're historic events that will
             # eventually be backfilled. For example, missing events we fetch
             # during backfill should be marked as backfilled as well.
@@ -1497,11 +1502,8 @@ class FederationEventHandler:
         assert not event.internal_metadata.outlier
 
         # first of all, check that the event itself is valid.
-        room_version = await self._store.get_room_version_id(event.room_id)
-        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
         try:
-            validate_event_for_room_version(room_version_obj, event)
+            validate_event_for_room_version(event)
         except AuthError as e:
             logger.warning("While validating received event %r: %s", event, e)
             # TODO: use a different rejected reason here?
@@ -1519,7 +1521,8 @@ class FederationEventHandler:
 
         # ... and check that the event passes auth at those auth events.
         try:
-            check_auth_rules_for_event(room_version_obj, event, claimed_auth_events)
+            await check_state_independent_auth_rules(self._store, event)
+            check_state_dependent_auth_rules(event, claimed_auth_events)
         except AuthError as e:
             logger.warning(
                 "While checking auth of %r against auth_events: %s", event, e
@@ -1567,9 +1570,7 @@ class FederationEventHandler:
             auth_events_for_auth = calculated_auth_event_map
 
         try:
-            check_auth_rules_for_event(
-                room_version_obj, event, auth_events_for_auth.values()
-            )
+            check_state_dependent_auth_rules(event, auth_events_for_auth.values())
         except AuthError as e:
             logger.warning("Failed auth resolution for %r because %s", event, e)
             context.rejected = RejectedReason.AUTH_ERROR
@@ -1669,7 +1670,7 @@ class FederationEventHandler:
         )
 
         try:
-            check_auth_rules_for_event(room_version_obj, event, current_auth_events)
+            check_state_dependent_auth_rules(event, current_auth_events)
         except AuthError as e:
             logger.warning(
                 "Soft-failing %r (from %s) because %s",
@@ -1963,9 +1964,7 @@ class FederationEventHandler:
                 event.room_id, [(event, context)], backfilled=backfilled
             )
         except Exception:
-            run_in_background(
-                self._store.remove_push_actions_from_staging, event.event_id
-            )
+            await self._store.remove_push_actions_from_staging(event.event_id)
             raise
 
     async def persist_events_and_notify(
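
Two behavioural changes above are easy to miss: the outlier prep step becomes an async function that appends to events_and_contexts_to_persist instead of a generator, and the push-action staging cleanup is awaited inside the exception handler rather than being kicked off with run_in_background. A sketch of the cleanup pattern (persist_fn is a stand-in for the persistence call used here):

async def persist_with_cleanup(store, persist_fn, event, context) -> None:
    try:
        await persist_fn(event.room_id, [(event, context)], backfilled=False)
    except Exception:
        # Awaited so the staged push actions are removed before the error
        # propagates, rather than racing in the background.
        await store.remove_push_actions_from_staging(event.event_id)
        raise
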
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f455158a2c..189f52fe5a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -42,7 +42,7 @@ from synapse.api.errors import (
     SynapseError,
     UnsupportedRoomVersionError,
 )
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.api.urls import ConsentURIBuilder
 from synapse.event_auth import validate_event_for_room_version
 from synapse.events import EventBase, relation_from_event
@@ -444,7 +444,7 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
 class EventCreationHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
-        self.auth = hs.get_auth()
+        self.auth_blocking = hs.get_auth_blocking()
         self._event_auth_handler = hs.get_event_auth_handler()
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
@@ -605,7 +605,7 @@ class EventCreationHandler:
         Returns:
             Tuple of created event, Context
         """
-        await self.auth.check_auth_blocking(requester=requester)
+        await self.auth_blocking.check_auth_blocking(requester=requester)
 
         if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
             room_version_id = event_dict["content"]["room_version"]
@@ -954,14 +954,12 @@ class EventCreationHandler:
                             "Spam-check module returned invalid error value. Expecting [code, dict], got %s",
                             spam_check_result,
                         )
-                        spam_check_result = Codes.FORBIDDEN
 
-                if isinstance(spam_check_result, Codes):
-                    raise SynapseError(
-                        403,
-                        "This message has been rejected as probable spam",
-                        spam_check_result,
-                    )
+                        raise SynapseError(
+                            403,
+                            "This message has been rejected as probable spam",
+                            Codes.FORBIDDEN,
+                        )
 
                 # Backwards compatibility: if the return value is not an error code, it
                 # means the module returned an error message to be included in the
@@ -1102,6 +1100,7 @@ class EventCreationHandler:
             #
             # TODO(faster_joins): figure out how this works, and make sure that the
             #   old state is complete.
+            #   https://github.com/matrix-org/synapse/issues/13003
             metadata = await self.store.get_metadata_for_events(state_event_ids)
 
             state_map_for_event: MutableStateMap[str] = {}
@@ -1273,23 +1272,6 @@ class EventCreationHandler:
                 )
                 return prev_event
 
-        if event.is_state() and (event.type, event.state_key) == (
-            EventTypes.Create,
-            "",
-        ):
-            room_version_id = event.content.get(
-                "room_version", RoomVersions.V1.identifier
-            )
-            maybe_room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version_id)
-            if not maybe_room_version_obj:
-                raise UnsupportedRoomVersionError(
-                    "Attempt to create a room with unsupported room version %s"
-                    % (room_version_id,)
-                )
-            room_version_obj = maybe_room_version_obj
-        else:
-            room_version_obj = await self.store.get_room_version(event.room_id)
-
         if event.internal_metadata.is_out_of_band_membership():
             # the only sort of out-of-band-membership events we expect to see here are
             # invite rejections and rescinded knocks that we have generated ourselves.
@@ -1297,9 +1279,9 @@ class EventCreationHandler:
             assert event.content["membership"] == Membership.LEAVE
         else:
             try:
-                validate_event_for_room_version(room_version_obj, event)
+                validate_event_for_room_version(event)
                 await self._event_auth_handler.check_auth_rules_from_context(
-                    room_version_obj, event, context
+                    event, context
                 )
             except AuthError as err:
                 logger.warning("Denying new event %r because %s", event, err)
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index 9de61d554f..d7a8226900 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -18,7 +18,6 @@ from typing import TYPE_CHECKING, Any, Dict, Generic, List, Optional, TypeVar, U
 from urllib.parse import urlencode, urlparse
 
 import attr
-import pymacaroons
 from authlib.common.security import generate_token
 from authlib.jose import JsonWebToken, jwt
 from authlib.oauth2.auth import ClientAuth
@@ -44,7 +43,7 @@ from synapse.logging.context import make_deferred_yieldable
 from synapse.types import JsonDict, UserID, map_username_to_mxid_localpart
 from synapse.util import Clock, json_decoder
 from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
-from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
+from synapse.util.macaroons import MacaroonGenerator, OidcSessionData
 from synapse.util.templates import _localpart_from_email_filter
 
 if TYPE_CHECKING:
@@ -105,9 +104,10 @@ class OidcHandler:
         # we should not have been instantiated if there is no configured provider.
         assert provider_confs
 
-        self._token_generator = OidcSessionTokenGenerator(hs)
+        self._macaroon_generator = hs.get_macaroon_generator()
         self._providers: Dict[str, "OidcProvider"] = {
-            p.idp_id: OidcProvider(hs, self._token_generator, p) for p in provider_confs
+            p.idp_id: OidcProvider(hs, self._macaroon_generator, p)
+            for p in provider_confs
         }
 
     async def load_metadata(self) -> None:
@@ -216,7 +216,7 @@ class OidcHandler:
 
         # Deserialize the session token and verify it.
         try:
-            session_data = self._token_generator.verify_oidc_session_token(
+            session_data = self._macaroon_generator.verify_oidc_session_token(
                 session, state
             )
         except (MacaroonInitException, MacaroonDeserializationException, KeyError) as e:
@@ -271,12 +271,12 @@ class OidcProvider:
     def __init__(
         self,
         hs: "HomeServer",
-        token_generator: "OidcSessionTokenGenerator",
+        macaroon_generator: MacaroonGenerator,
         provider: OidcProviderConfig,
     ):
         self._store = hs.get_datastores().main
 
-        self._token_generator = token_generator
+        self._macaroon_generator = macaroon_generator
 
         self._config = provider
         self._callback_url: str = hs.config.oidc.oidc_callback_url
@@ -761,7 +761,7 @@ class OidcProvider:
         if not client_redirect_url:
             client_redirect_url = b""
 
-        cookie = self._token_generator.generate_oidc_session_token(
+        cookie = self._macaroon_generator.generate_oidc_session_token(
             state=state,
             session_data=OidcSessionData(
                 idp_id=self.idp_id,
@@ -1112,121 +1112,6 @@ class JwtClientSecret:
         return self._cached_secret
 
 
-class OidcSessionTokenGenerator:
-    """Methods for generating and checking OIDC Session cookies."""
-
-    def __init__(self, hs: "HomeServer"):
-        self._clock = hs.get_clock()
-        self._server_name = hs.hostname
-        self._macaroon_secret_key = hs.config.key.macaroon_secret_key
-
-    def generate_oidc_session_token(
-        self,
-        state: str,
-        session_data: "OidcSessionData",
-        duration_in_ms: int = (60 * 60 * 1000),
-    ) -> str:
-        """Generates a signed token storing data about an OIDC session.
-
-        When Synapse initiates an authorization flow, it creates a random state
-        and a random nonce. Those parameters are given to the provider and
-        should be verified when the client comes back from the provider.
-        It is also used to store the client_redirect_url, which is used to
-        complete the SSO login flow.
-
-        Args:
-            state: The ``state`` parameter passed to the OIDC provider.
-            session_data: data to include in the session token.
-            duration_in_ms: An optional duration for the token in milliseconds.
-                Defaults to an hour.
-
-        Returns:
-            A signed macaroon token with the session information.
-        """
-        macaroon = pymacaroons.Macaroon(
-            location=self._server_name,
-            identifier="key",
-            key=self._macaroon_secret_key,
-        )
-        macaroon.add_first_party_caveat("gen = 1")
-        macaroon.add_first_party_caveat("type = session")
-        macaroon.add_first_party_caveat("state = %s" % (state,))
-        macaroon.add_first_party_caveat("idp_id = %s" % (session_data.idp_id,))
-        macaroon.add_first_party_caveat("nonce = %s" % (session_data.nonce,))
-        macaroon.add_first_party_caveat(
-            "client_redirect_url = %s" % (session_data.client_redirect_url,)
-        )
-        macaroon.add_first_party_caveat(
-            "ui_auth_session_id = %s" % (session_data.ui_auth_session_id,)
-        )
-        now = self._clock.time_msec()
-        expiry = now + duration_in_ms
-        macaroon.add_first_party_caveat("time < %d" % (expiry,))
-
-        return macaroon.serialize()
-
-    def verify_oidc_session_token(
-        self, session: bytes, state: str
-    ) -> "OidcSessionData":
-        """Verifies and extract an OIDC session token.
-
-        This verifies that a given session token was issued by this homeserver
-        and extract the nonce and client_redirect_url caveats.
-
-        Args:
-            session: The session token to verify
-            state: The state the OIDC provider gave back
-
-        Returns:
-            The data extracted from the session cookie
-
-        Raises:
-            KeyError if an expected caveat is missing from the macaroon.
-        """
-        macaroon = pymacaroons.Macaroon.deserialize(session)
-
-        v = pymacaroons.Verifier()
-        v.satisfy_exact("gen = 1")
-        v.satisfy_exact("type = session")
-        v.satisfy_exact("state = %s" % (state,))
-        v.satisfy_general(lambda c: c.startswith("nonce = "))
-        v.satisfy_general(lambda c: c.startswith("idp_id = "))
-        v.satisfy_general(lambda c: c.startswith("client_redirect_url = "))
-        v.satisfy_general(lambda c: c.startswith("ui_auth_session_id = "))
-        satisfy_expiry(v, self._clock.time_msec)
-
-        v.verify(macaroon, self._macaroon_secret_key)
-
-        # Extract the session data from the token.
-        nonce = get_value_from_macaroon(macaroon, "nonce")
-        idp_id = get_value_from_macaroon(macaroon, "idp_id")
-        client_redirect_url = get_value_from_macaroon(macaroon, "client_redirect_url")
-        ui_auth_session_id = get_value_from_macaroon(macaroon, "ui_auth_session_id")
-        return OidcSessionData(
-            nonce=nonce,
-            idp_id=idp_id,
-            client_redirect_url=client_redirect_url,
-            ui_auth_session_id=ui_auth_session_id,
-        )
-
-
-@attr.s(frozen=True, slots=True, auto_attribs=True)
-class OidcSessionData:
-    """The attributes which are stored in a OIDC session cookie"""
-
-    # the Identity Provider being used
-    idp_id: str
-
-    # The `nonce` parameter passed to the OIDC provider.
-    nonce: str
-
-    # The URL the client gave when it initiated the flow. ("" if this is a UI Auth)
-    client_redirect_url: str
-
-    # The session ID of the ongoing UI Auth ("" if this is a login)
-    ui_auth_session_id: str
-
-
 class UserAttributeDict(TypedDict):
     localpart: Optional[str]
     confirm_localpart: bool
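
The OIDC handler drops its private OidcSessionTokenGenerator and OidcSessionData definitions and uses the shared MacaroonGenerator from synapse.util.macaroons. A sketch of creating and verifying a session cookie with the relocated helpers, assuming the moved OidcSessionData keeps the fields of the removed class; all values are placeholders:

from synapse.util.macaroons import MacaroonGenerator, OidcSessionData


def roundtrip_session_cookie(
    macaroon_generator: MacaroonGenerator, state: str
) -> OidcSessionData:
    cookie = macaroon_generator.generate_oidc_session_token(
        state=state,
        session_data=OidcSessionData(
            idp_id="oidc",  # placeholder identity provider ID
            nonce="abcd1234",  # placeholder nonce
            client_redirect_url="https://client.example/after-login",
            ui_auth_session_id="",  # empty when this is a login, not UI auth
        ),
    )
    # Verification checks the signature, expiry and the matching `state`
    # (the removed code took the cookie as bytes, so encode here).
    return macaroon_generator.verify_oidc_session_token(cookie.encode("utf-8"), state)
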
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 338204287f..c77d181722 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -91,6 +91,7 @@ class RegistrationHandler:
         self.clock = hs.get_clock()
         self.hs = hs
         self.auth = hs.get_auth()
+        self.auth_blocking = hs.get_auth_blocking()
         self._auth_handler = hs.get_auth_handler()
         self.profile_handler = hs.get_profile_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
@@ -276,7 +277,7 @@ class RegistrationHandler:
 
         # do not check_auth_blocking if the call is coming through the Admin API
         if not by_admin:
-            await self.auth.check_auth_blocking(threepid=threepid)
+            await self.auth_blocking.check_auth_blocking(threepid=threepid)
 
         if localpart is not None:
             await self.check_username(localpart, guest_access_token=guest_access_token)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 520663f172..75c0be8c36 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -62,6 +62,7 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.handlers.federation import get_domains_from_state
 from synapse.handlers.relations import BundledAggregations
+from synapse.module_api import NOT_SPAM
 from synapse.rest.admin._base import assert_user_is_admin
 from synapse.storage.state import StateFilter
 from synapse.streams import EventSource
@@ -109,6 +110,7 @@ class RoomCreationHandler:
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self.auth = hs.get_auth()
+        self.auth_blocking = hs.get_auth_blocking()
         self.clock = hs.get_clock()
         self.hs = hs
         self.spam_checker = hs.get_spam_checker()
@@ -226,10 +228,9 @@ class RoomCreationHandler:
                 },
             },
         )
-        old_room_version = await self.store.get_room_version(old_room_id)
-        validate_event_for_room_version(old_room_version, tombstone_event)
+        validate_event_for_room_version(tombstone_event)
         await self._event_auth_handler.check_auth_rules_from_context(
-            old_room_version, tombstone_event, tombstone_context
+            tombstone_event, tombstone_context
         )
 
         # Upgrade the room
@@ -437,10 +438,9 @@ class RoomCreationHandler:
         """
         user_id = requester.user.to_string()
 
-        if not await self.spam_checker.user_may_create_room(user_id):
-            raise SynapseError(
-                403, "You are not permitted to create rooms", Codes.FORBIDDEN
-            )
+        spam_check = await self.spam_checker.user_may_create_room(user_id)
+        if spam_check != NOT_SPAM:
+            raise SynapseError(403, "You are not permitted to create rooms", spam_check)
 
         creation_content: JsonDict = {
             "room_version": new_room_version.identifier,
@@ -707,7 +707,7 @@ class RoomCreationHandler:
         """
         user_id = requester.user.to_string()
 
-        await self.auth.check_auth_blocking(requester=requester)
+        await self.auth_blocking.check_auth_blocking(requester=requester)
 
         if (
             self._server_notices_mxid is not None
@@ -727,12 +727,12 @@ class RoomCreationHandler:
         invite_3pid_list = config.get("invite_3pid", [])
         invite_list = config.get("invite", [])
 
-        if not is_requester_admin and not (
-            await self.spam_checker.user_may_create_room(user_id)
-        ):
-            raise SynapseError(
-                403, "You are not permitted to create rooms", Codes.FORBIDDEN
-            )
+        if not is_requester_admin:
+            spam_check = await self.spam_checker.user_may_create_room(user_id)
+            if spam_check != NOT_SPAM:
+                raise SynapseError(
+                    403, "You are not permitted to create rooms", spam_check
+                )
 
         if ratelimit:
             await self.request_ratelimiter.ratelimit(requester)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index d1199a0644..bf6bae1232 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -26,18 +26,13 @@ from synapse.api.constants import (
     GuestAccess,
     Membership,
 )
-from synapse.api.errors import (
-    AuthError,
-    Codes,
-    LimitExceededError,
-    ShadowBanError,
-    SynapseError,
-)
+from synapse.api.errors import AuthError, Codes, ShadowBanError, SynapseError
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.event_auth import get_named_level, get_power_level_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
+from synapse.module_api import NOT_SPAM
 from synapse.storage.state import StateFilter
 from synapse.types import (
     JsonDict,
@@ -379,16 +374,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             # Only rate-limit if the user actually joined the room, otherwise we'll end
             # up blocking profile updates.
             if newly_joined and ratelimit:
-                time_now_s = self.clock.time()
-                (
-                    allowed,
-                    time_allowed,
-                ) = await self._join_rate_limiter_local.can_do_action(requester)
-
-                if not allowed:
-                    raise LimitExceededError(
-                        retry_after_ms=int(1000 * (time_allowed - time_now_s))
-                    )
+                await self._join_rate_limiter_local.ratelimit(requester)
 
         result_event = await self.event_creation_handler.handle_new_client_event(
             requester,
@@ -683,7 +669,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             if target_id == self._server_notices_mxid:
                 raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
-            block_invite = False
+            block_invite_code = None
 
             if (
                 self._server_notices_mxid is not None
@@ -701,16 +687,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                         "Blocking invite: user is not admin and non-admin "
                         "invites disabled"
                     )
-                    block_invite = True
+                    block_invite_code = Codes.FORBIDDEN
 
-                if not await self.spam_checker.user_may_invite(
+                spam_check = await self.spam_checker.user_may_invite(
                     requester.user.to_string(), target_id, room_id
-                ):
+                )
+                if spam_check != NOT_SPAM:
                     logger.info("Blocking invite due to spam checker")
-                    block_invite = True
+                    block_invite_code = spam_check
 
-            if block_invite:
-                raise SynapseError(403, "Invites have been disabled on this server")
+            if block_invite_code is not None:
+                raise SynapseError(
+                    403, "Invites have been disabled on this server", block_invite_code
+                )
 
         # An empty prev_events list is allowed as long as the auth_event_ids are present
         if prev_event_ids is not None:
@@ -818,11 +807,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 # We assume that if the spam checker allowed the user to create
                 # a room then they're allowed to join it.
                 and not new_room
-                and not await self.spam_checker.user_may_join_room(
+            ):
+                spam_check = await self.spam_checker.user_may_join_room(
                     target.to_string(), room_id, is_invited=inviter is not None
                 )
-            ):
-                raise SynapseError(403, "Not allowed to join this room")
+                if spam_check != NOT_SPAM:
+                    raise SynapseError(403, "Not allowed to join this room", spam_check)
 
             # Check if a remote join should be performed.
             remote_join, remote_room_hosts = await self._should_perform_remote_join(
@@ -830,19 +820,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             )
             if remote_join:
                 if ratelimit:
-                    time_now_s = self.clock.time()
-                    (
-                        allowed,
-                        time_allowed,
-                    ) = await self._join_rate_limiter_remote.can_do_action(
+                    await self._join_rate_limiter_remote.ratelimit(
                         requester,
                     )
 
-                    if not allowed:
-                        raise LimitExceededError(
-                            retry_after_ms=int(1000 * (time_allowed - time_now_s))
-                        )
-
                 inviter = await self._get_inviter(target.to_string(), room_id)
                 if inviter and not self.hs.is_mine(inviter):
                     remote_room_hosts.append(inviter.domain)
@@ -1369,13 +1350,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             )
         else:
             # Check if the spamchecker(s) allow this invite to go through.
-            if not await self.spam_checker.user_may_send_3pid_invite(
+            spam_check = await self.spam_checker.user_may_send_3pid_invite(
                 inviter_userid=requester.user.to_string(),
                 medium=medium,
                 address=address,
                 room_id=room_id,
-            ):
-                raise SynapseError(403, "Cannot send threepid invite")
+            )
+            if spam_check != NOT_SPAM:
+                raise SynapseError(403, "Cannot send threepid invite", spam_check)
 
             stream_id = await self._make_and_store_3pid_invite(
                 requester,
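
The join paths above replace the manual can_do_action / LimitExceededError dance with the ratelimiter's ratelimit() method, which, judging by the removed import and raise, raises the limit error itself. A minimal sketch:

async def rate_limited_join(join_rate_limiter, requester) -> None:
    # Raises the appropriate limit-exceeded error internally when the
    # requester is over the configured join rate limit.
    await join_rate_limiter.ratelimit(requester)
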
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b4ead79f97..d42a414c90 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tup
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import EventTypes, Membership, ReceiptTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -237,9 +237,10 @@ class SyncHandler:
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
-        self.auth = hs.get_auth()
+        self.auth_blocking = hs.get_auth_blocking()
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
+        self._device_handler = hs.get_device_handler()
 
         # TODO: flush cache entries on subsequent sync request.
         #    Once we get the next /sync request (ie, one with the same access token
@@ -280,7 +281,7 @@ class SyncHandler:
         # not been exceeded (if not part of the group by this point, almost certain
         # auth_blocking will occur)
         user_id = sync_config.user.to_string()
-        await self.auth.check_auth_blocking(requester=requester)
+        await self.auth_blocking.check_auth_blocking(requester=requester)
 
         res = await self.response_cache.wrap(
             sync_config.request_key,
@@ -1054,14 +1055,10 @@ class SyncHandler:
         self, room_id: str, sync_config: SyncConfig
     ) -> NotifCounts:
         with Measure(self.clock, "unread_notifs_for_room_id"):
-            last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
-                user_id=sync_config.user.to_string(),
-                room_id=room_id,
-                receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
-            )
 
             return await self.store.get_unread_event_push_actions_by_room_for_user(
-                room_id, sync_config.user.to_string(), last_unread_event_id
+                room_id,
+                sync_config.user.to_string(),
             )
 
     async def generate_sync_result(
@@ -1272,21 +1269,11 @@ class SyncHandler:
                     ):
                         users_that_have_changed.add(changed_user_id)
             else:
-                users_who_share_room = (
-                    await self.store.get_users_who_share_room_with_user(user_id)
-                )
-
-                # Always tell the user about their own devices. We check as the user
-                # ID is almost certainly already included (unless they're not in any
-                # rooms) and taking a copy of the set is relatively expensive.
-                if user_id not in users_who_share_room:
-                    users_who_share_room = set(users_who_share_room)
-                    users_who_share_room.add(user_id)
-
-                tracked_users = users_who_share_room
                 users_that_have_changed = (
-                    await self.store.get_users_whose_devices_changed(
-                        since_token.device_list_key, tracked_users
+                    await self._device_handler.get_device_changes_in_shared_rooms(
+                        user_id,
+                        sync_result_builder.joined_room_ids,
+                        from_token=since_token,
                     )
                 )
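
Finally, sync no longer computes the changed-device user set itself: it asks the device handler (obtained via hs.get_device_handler(), as added in the constructor above) for device changes in the rooms the syncing user shares. A sketch of the delegation:

async def users_with_changed_devices(device_handler, user_id, joined_room_ids, since_token):
    # Falls back internally to the all-shared-rooms scan when `since_token` is
    # too old for the room-scoped device-change index (see device.py above).
    return await device_handler.get_device_changes_in_shared_rooms(
        user_id, joined_room_ids, from_token=since_token
    )
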