summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/auth.py2
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/device.py28
-rw-r--r--synapse/handlers/directory.py6
-rw-r--r--synapse/handlers/event_auth.py5
-rw-r--r--synapse/handlers/federation.py9
-rw-r--r--synapse/handlers/federation_event.py7
-rw-r--r--synapse/handlers/message.py7
-rw-r--r--synapse/handlers/profile.py10
-rw-r--r--synapse/handlers/push_rules.py18
-rw-r--r--synapse/handlers/read_marker.py5
-rw-r--r--synapse/handlers/register.py27
-rw-r--r--synapse/handlers/room.py10
-rw-r--r--synapse/handlers/room_member.py179
-rw-r--r--synapse/handlers/room_member_worker.py3
-rw-r--r--synapse/handlers/sso.py9
-rw-r--r--synapse/handlers/sync.py20
-rw-r--r--synapse/handlers/typing.py3
18 files changed, 269 insertions, 83 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 1e89447044..59e340974d 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -212,7 +212,7 @@ class AuthHandler:
         self._password_enabled_for_login = hs.config.auth.password_enabled_for_login
         self._password_enabled_for_reauth = hs.config.auth.password_enabled_for_reauth
         self._password_localdb_enabled = hs.config.auth.password_localdb_enabled
-        self._third_party_rules = hs.get_third_party_event_rules()
+        self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
 
         # Ratelimiter for failed auth during UIA. Uses same ratelimit config
         # as per `rc_login.failed_attempts`.
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index bd5867491b..f299b89a1b 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -39,11 +39,11 @@ class DeactivateAccountHandler:
         self._profile_handler = hs.get_profile_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
         self._server_name = hs.hostname
-        self._third_party_rules = hs.get_third_party_event_rules()
+        self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
 
         # Flag that indicates whether the process to part users from rooms is running
         self._user_parter_running = False
-        self._third_party_rules = hs.get_third_party_event_rules()
+        self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
 
         # Start the user parter loop so it can resume parting users from rooms where
         # it left off (if it has work left to do).
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b9d3b7fbc6..5d12a39e26 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -75,10 +75,14 @@ class DeviceWorkerHandler:
         self.store = hs.get_datastores().main
         self.notifier = hs.get_notifier()
         self.state = hs.get_state_handler()
+        self._appservice_handler = hs.get_application_service_handler()
         self._state_storage = hs.get_storage_controllers().state
         self._auth_handler = hs.get_auth_handler()
         self.server_name = hs.hostname
         self._msc3852_enabled = hs.config.experimental.msc3852_enabled
+        self._query_appservices_for_keys = (
+            hs.config.experimental.msc3984_appservice_key_query
+        )
 
         self.device_list_updater = DeviceListWorkerUpdater(hs)
 
@@ -328,6 +332,30 @@ class DeviceWorkerHandler:
             user_id, "self_signing"
         )
 
+        # Check if the application services have any results.
+        if self._query_appservices_for_keys:
+            # Query the appservice for all devices for this user.
+            query: Dict[str, Optional[List[str]]] = {user_id: None}
+
+            # Query the appservices for any keys.
+            appservice_results = await self._appservice_handler.query_keys(query)
+
+            # Merge results, overriding anything from the database.
+            appservice_devices = appservice_results.get("device_keys", {}).get(
+                user_id, {}
+            )
+
+            # Filter the database results to only those devices that the appservice has
+            # *not* responded with.
+            devices = [d for d in devices if d["device_id"] not in appservice_devices]
+            # Append the appservice response by wrapping each result in another dictionary.
+            devices.extend(
+                {"device_id": device_id, "keys": device}
+                for device_id, device in appservice_devices.items()
+            )
+
+            # TODO Handle cross-signing keys.
+
         return {
             "user_id": user_id,
             "stream_id": stream_id,
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 5e8316e2e5..1e0623c7f8 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -52,7 +52,9 @@ class DirectoryHandler:
         self.config = hs.config
         self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
         self.require_membership = hs.config.server.require_membership_for_aliases
-        self.third_party_event_rules = hs.get_third_party_event_rules()
+        self._third_party_event_rules = (
+            hs.get_module_api_callbacks().third_party_event_rules
+        )
         self.server_name = hs.hostname
 
         self.federation = hs.get_federation_client()
@@ -503,7 +505,7 @@ class DirectoryHandler:
             # Check if publishing is blocked by a third party module
             allowed_by_third_party_rules = (
                 await (
-                    self.third_party_event_rules.check_visibility_can_be_modified(
+                    self._third_party_event_rules.check_visibility_can_be_modified(
                         room_id, visibility
                     )
                 )
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index 0db0bd7304..3e37c0cbe2 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -29,7 +29,7 @@ from synapse.event_auth import (
 )
 from synapse.events import EventBase
 from synapse.events.builder import EventBuilder
-from synapse.types import StateMap, StrCollection, get_domain_from_id
+from synapse.types import StateMap, StrCollection
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -47,6 +47,7 @@ class EventAuthHandler:
         self._store = hs.get_datastores().main
         self._state_storage_controller = hs.get_storage_controllers().state
         self._server_name = hs.hostname
+        self._is_mine_id = hs.is_mine_id
 
     async def check_auth_rules_from_context(
         self,
@@ -247,7 +248,7 @@ class EventAuthHandler:
         if not await self.is_user_in_rooms(allowed_rooms, user_id):
             # If this is a remote request, the user might be in an allowed room
             # that we do not know about.
-            if get_domain_from_id(user_id) != self._server_name:
+            if not self._is_mine_id(user_id):
                 for room_id in allowed_rooms:
                     if not await self._store.is_host_joined(room_id, self._server_name):
                         raise SynapseError(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d1a88cc604..19dec4812f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -141,6 +141,7 @@ class FederationHandler:
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
         self.is_mine_id = hs.is_mine_id
+        self.is_mine_server_name = hs.is_mine_server_name
         self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
         self.event_creation_handler = hs.get_event_creation_handler()
         self.event_builder_factory = hs.get_event_builder_factory()
@@ -169,7 +170,9 @@ class FederationHandler:
 
         self._room_backfill = Linearizer("room_backfill")
 
-        self.third_party_event_rules = hs.get_third_party_event_rules()
+        self._third_party_event_rules = (
+            hs.get_module_api_callbacks().third_party_event_rules
+        )
 
         # Tracks running partial state syncs by room ID.
         # Partial state syncs currently only run on the main process, so it's okay to
@@ -451,7 +454,7 @@ class FederationHandler:
 
             for dom in domains:
                 # We don't want to ask our own server for information we don't have
-                if dom == self.server_name:
+                if self.is_mine_server_name(dom):
                     continue
 
                 try:
@@ -1253,7 +1256,7 @@ class FederationHandler:
             unpersisted_context,
         ) = await self.event_creation_handler.create_new_client_event(builder=builder)
 
-        event_allowed, _ = await self.third_party_event_rules.check_event_allowed(
+        event_allowed, _ = await self._third_party_event_rules.check_event_allowed(
             event, unpersisted_context
         )
         if not event_allowed:
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 06609fab93..06343d40e4 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -157,10 +157,13 @@ class FederationEventHandler:
         self._get_room_member_handler = hs.get_room_member_handler
 
         self._federation_client = hs.get_federation_client()
-        self._third_party_event_rules = hs.get_third_party_event_rules()
+        self._third_party_event_rules = (
+            hs.get_module_api_callbacks().third_party_event_rules
+        )
         self._notifier = hs.get_notifier()
 
         self._is_mine_id = hs.is_mine_id
+        self._is_mine_server_name = hs.is_mine_server_name
         self._server_name = hs.hostname
         self._instance_name = hs.get_instance_name()
 
@@ -686,7 +689,7 @@ class FederationEventHandler:
         server from invalid events (there is probably no point in trying to
         re-fetch invalid events from every other HS in the room.)
         """
-        if dest == self._server_name:
+        if self._is_mine_server_name(dest):
             raise SynapseError(400, "Can't backfill from self.")
 
         events = await self._federation_client.backfill(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ac1932a7f9..0b61c2272b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -77,7 +77,6 @@ from synapse.util.metrics import measure_func
 from synapse.visibility import get_effective_room_visibility_from_state
 
 if TYPE_CHECKING:
-    from synapse.events.third_party_rules import ThirdPartyEventRules
     from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
@@ -509,8 +508,8 @@ class EventCreationHandler:
         self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
 
         self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
-        self.third_party_event_rules: "ThirdPartyEventRules" = (
-            self.hs.get_third_party_event_rules()
+        self._third_party_event_rules = (
+            self.hs.get_module_api_callbacks().third_party_event_rules
         )
 
         self._block_events_without_consent_error = (
@@ -1314,7 +1313,7 @@ class EventCreationHandler:
         if requester:
             context.app_service = requester.app_service
 
-        res, new_content = await self.third_party_event_rules.check_event_allowed(
+        res, new_content = await self._third_party_event_rules.check_event_allowed(
             event, context
         )
         if res is False:
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 440d3f4acd..a9160c87e3 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -59,9 +59,9 @@ class ProfileHandler:
         self.max_avatar_size = hs.config.server.max_avatar_size
         self.allowed_avatar_mimetypes = hs.config.server.allowed_avatar_mimetypes
 
-        self.server_name = hs.config.server.server_name
+        self._is_mine_server_name = hs.is_mine_server_name
 
-        self._third_party_rules = hs.get_third_party_event_rules()
+        self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
 
     async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
         target_user = UserID.from_string(user_id)
@@ -170,8 +170,8 @@ class ProfileHandler:
             displayname_to_set = None
 
         # If the admin changes the display name of a user, the requesting user cannot send
-        # the join event to update the displayname in the rooms.
-        # This must be done by the target user himself.
+        # the join event to update the display name in the rooms.
+        # This must be done by the target user themselves.
         if by_admin:
             requester = create_requester(
                 target_user,
@@ -309,7 +309,7 @@ class ProfileHandler:
         else:
             server_name = host
 
-        if server_name == self.server_name:
+        if self._is_mine_server_name(server_name):
             media_info = await self.store.get_local_media(media_id)
         else:
             media_info = await self.store.get_cached_remote_media(server_name, media_id)
diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 1219672a59..7ed88a3611 100644
--- a/synapse/handlers/push_rules.py
+++ b/synapse/handlers/push_rules.py
@@ -11,14 +11,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import TYPE_CHECKING, List, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 
 import attr
 
 from synapse.api.errors import SynapseError, UnrecognizedRequestError
+from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.push_rule import RuleNotFoundException
 from synapse.synapse_rust.push import get_base_rule_ids
-from synapse.types import JsonDict
+from synapse.types import JsonDict, UserID
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -115,6 +116,17 @@ class PushRulesHandler:
         stream_id = self._main_store.get_max_push_rules_stream_id()
         self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id])
 
+    async def push_rules_for_user(
+        self, user: UserID
+    ) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
+        """
+        Push rules aren't really account data, but get formatted as such for /sync.
+        """
+        user_id = user.to_string()
+        rules_raw = await self._main_store.get_push_rules_for_user(user_id)
+        rules = format_push_rules_for_user(user, rules_raw)
+        return rules
+
 
 def check_actions(actions: List[Union[str, JsonDict]]) -> None:
     """Check if the given actions are spec compliant.
@@ -129,6 +141,8 @@ def check_actions(actions: List[Union[str, JsonDict]]) -> None:
         raise InvalidRuleException("No actions found")
 
     for a in actions:
+        # "dont_notify" and "coalesce" are legacy actions. They are allowed, but
+        # ignored (resulting in no action from the pusher).
         if a in ["notify", "dont_notify", "coalesce"]:
             pass
         elif isinstance(a, dict) and "set_tweak" in a:
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 05122fd5a6..6d35e61880 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -15,6 +15,7 @@
 import logging
 from typing import TYPE_CHECKING
 
+from synapse.api.constants import ReceiptTypes
 from synapse.util.async_helpers import Linearizer
 
 if TYPE_CHECKING:
@@ -42,7 +43,7 @@ class ReadMarkerHandler:
 
         async with self.read_marker_linearizer.queue((room_id, user_id)):
             existing_read_marker = await self.store.get_account_data_for_room_and_type(
-                user_id, room_id, "m.fully_read"
+                user_id, room_id, ReceiptTypes.FULLY_READ
             )
 
             should_update = True
@@ -56,5 +57,5 @@ class ReadMarkerHandler:
             if should_update:
                 content = {"event_id": event_id}
                 await self.account_data_handler.add_account_data_to_room(
-                    user_id, room_id, "m.fully_read", content
+                    user_id, room_id, ReceiptTypes.FULLY_READ, content
                 )
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 61c4b833bd..c80946c2e9 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -46,7 +46,7 @@ from synapse.replication.http.register import (
     ReplicationRegisterServlet,
 )
 from synapse.spam_checker_api import RegistrationBehaviour
-from synapse.types import RoomAlias, UserID, create_requester
+from synapse.types import GUEST_USER_ID_PATTERN, RoomAlias, UserID, create_requester
 from synapse.types.state import StateFilter
 
 if TYPE_CHECKING:
@@ -143,10 +143,15 @@ class RegistrationHandler:
         assigned_user_id: Optional[str] = None,
         inhibit_user_in_use_error: bool = False,
     ) -> None:
-        if types.contains_invalid_mxid_characters(localpart):
+        if types.contains_invalid_mxid_characters(
+            localpart, self.hs.config.experimental.msc4009_e164_mxids
+        ):
+            extra_chars = (
+                "=_-./+" if self.hs.config.experimental.msc4009_e164_mxids else "=_-./"
+            )
             raise SynapseError(
                 400,
-                "User ID can only contain characters a-z, 0-9, or '=_-./'",
+                f"User ID can only contain characters a-z, 0-9, or '{extra_chars}'",
                 Codes.INVALID_USERNAME,
             )
 
@@ -195,16 +200,12 @@ class RegistrationHandler:
                         errcode=Codes.FORBIDDEN,
                     )
 
-        if guest_access_token is None:
-            try:
-                int(localpart)
-                raise SynapseError(
-                    400,
-                    "Numeric user IDs are reserved for guest users.",
-                    errcode=Codes.INVALID_USERNAME,
-                )
-            except ValueError:
-                pass
+        if guest_access_token is None and GUEST_USER_ID_PATTERN.fullmatch(localpart):
+            raise SynapseError(
+                400,
+                "Numeric user IDs are reserved for guest users.",
+                errcode=Codes.INVALID_USERNAME,
+            )
 
     async def register_user(
         self,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index efd9612d90..5e1702d78a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -160,7 +160,9 @@ class RoomCreationHandler:
         )
         self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
 
-        self.third_party_event_rules = hs.get_third_party_event_rules()
+        self._third_party_event_rules = (
+            hs.get_module_api_callbacks().third_party_event_rules
+        )
 
     async def upgrade_room(
         self, requester: Requester, old_room_id: str, new_version: RoomVersion
@@ -742,7 +744,7 @@ class RoomCreationHandler:
 
         # Let the third party rules modify the room creation config if needed, or abort
         # the room creation entirely with an exception.
-        await self.third_party_event_rules.on_create_room(
+        await self._third_party_event_rules.on_create_room(
             requester, config, is_requester_admin=is_requester_admin
         )
 
@@ -879,7 +881,7 @@ class RoomCreationHandler:
         # Check whether this visibility value is blocked by a third party module
         allowed_by_third_party_rules = (
             await (
-                self.third_party_event_rules.check_visibility_can_be_modified(
+                self._third_party_event_rules.check_visibility_can_be_modified(
                     room_id, visibility
                 )
             )
@@ -1731,7 +1733,7 @@ class RoomShutdownHandler:
         self.room_member_handler = hs.get_room_member_handler()
         self._room_creation_handler = hs.get_room_creation_handler()
         self._replication = hs.get_replication_data_handler()
-        self._third_party_rules = hs.get_third_party_event_rules()
+        self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
         self.event_creation_handler = hs.get_event_creation_handler()
         self.store = hs.get_datastores().main
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed805d6ec8..af0ca5c26d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -16,7 +16,7 @@ import abc
 import logging
 import random
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse import types
 from synapse.api.constants import (
@@ -38,7 +38,10 @@ from synapse.event_auth import get_named_level, get_power_level_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
+from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.logging import opentracing
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.types import (
     JsonDict,
@@ -97,7 +100,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         self.clock = hs.get_clock()
         self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
-        self.third_party_event_rules = hs.get_third_party_event_rules()
+        self._third_party_event_rules = (
+            hs.get_module_api_callbacks().third_party_event_rules
+        )
         self._server_notices_mxid = self.config.servernotices.server_notices_mxid
         self._enable_lookup = hs.config.registration.enable_3pid_lookup
         self.allow_per_room_profiles = self.config.server.allow_per_room_profiles
@@ -280,9 +285,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         """
         raise NotImplementedError()
 
-    @abc.abstractmethod
     async def forget(self, user: UserID, room_id: str) -> None:
-        raise NotImplementedError()
+        user_id = user.to_string()
+
+        member = await self._storage_controllers.state.get_current_state_event(
+            room_id=room_id, event_type=EventTypes.Member, state_key=user_id
+        )
+        membership = member.membership if member else None
+
+        if membership is not None and membership not in [
+            Membership.LEAVE,
+            Membership.BAN,
+        ]:
+            raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
+
+        # In normal case this call is only required if `membership` is not `None`.
+        # But: After the last member had left the room, the background update
+        # `_background_remove_left_rooms` is deleting rows related to this room from
+        # the table `current_state_events` and `get_current_state_events` is `None`.
+        await self.store.forget(user_id, room_id)
 
     async def ratelimit_multiple_invites(
         self,
@@ -1541,7 +1562,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         # can't just rely on the standard ratelimiting of events.
         await self._third_party_invite_limiter.ratelimit(requester)
 
-        can_invite = await self.third_party_event_rules.check_threepid_can_be_invited(
+        can_invite = await self._third_party_event_rules.check_threepid_can_be_invited(
             medium, address, room_id
         )
         if not can_invite:
@@ -2046,25 +2067,141 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         """Implements RoomMemberHandler._user_left_room"""
         user_left_room(self.distributor, target, room_id)
 
-    async def forget(self, user: UserID, room_id: str) -> None:
-        user_id = user.to_string()
 
-        member = await self._storage_controllers.state.get_current_state_event(
-            room_id=room_id, event_type=EventTypes.Member, state_key=user_id
-        )
-        membership = member.membership if member else None
+class RoomForgetterHandler(StateDeltasHandler):
+    """Forgets rooms when they are left, when enabled in the homeserver config.
 
-        if membership is not None and membership not in [
-            Membership.LEAVE,
-            Membership.BAN,
-        ]:
-            raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
+    For the purposes of this feature, kicks, bans and "leaves" via state resolution
+    weirdness are all considered to be leaves.
 
-        # In normal case this call is only required if `membership` is not `None`.
-        # But: After the last member had left the room, the background update
-        # `_background_remove_left_rooms` is deleting rows related to this room from
-        # the table `current_state_events` and `get_current_state_events` is `None`.
-        await self.store.forget(user_id, room_id)
+    Derived from `StatsHandler` and `UserDirectoryHandler`.
+    """
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._hs = hs
+        self._store = hs.get_datastores().main
+        self._storage_controllers = hs.get_storage_controllers()
+        self._clock = hs.get_clock()
+        self._notifier = hs.get_notifier()
+        self._room_member_handler = hs.get_room_member_handler()
+
+        # The current position in the current_state_delta stream
+        self.pos: Optional[int] = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        if hs.config.worker.run_background_tasks:
+            self._notifier.add_replication_callback(self.notify_new_event)
+
+            # We kick this off to pick up outstanding work from before the last restart.
+            self._clock.call_later(0, self.notify_new_event)
+
+    def notify_new_event(self) -> None:
+        """Called when there may be more deltas to process"""
+        if self._is_processing:
+            return
+
+        self._is_processing = True
+
+        async def process() -> None:
+            try:
+                await self._unsafe_process()
+            finally:
+                self._is_processing = False
+
+        run_as_background_process("room_forgetter.notify_new_event", process)
+
+    async def _unsafe_process(self) -> None:
+        # If self.pos is None then means we haven't fetched it from DB
+        if self.pos is None:
+            self.pos = await self._store.get_room_forgetter_stream_pos()
+            room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+            if self.pos > room_max_stream_ordering:
+                # apparently, we've processed more events than exist in the database!
+                # this can happen if events are removed with history purge or similar.
+                logger.warning(
+                    "Event stream ordering appears to have gone backwards (%i -> %i): "
+                    "rewinding room forgetter processor",
+                    self.pos,
+                    room_max_stream_ordering,
+                )
+                self.pos = room_max_stream_ordering
+
+        if not self._hs.config.room.forget_on_leave:
+            # Update the processing position, so that if the server admin turns the
+            # feature on at a later date, we don't decide to forget every room that
+            # has ever been left in the past.
+            self.pos = self._store.get_room_max_stream_ordering()
+            await self._store.update_room_forgetter_stream_pos(self.pos)
+            return
+
+        # Loop round handling deltas until we're up to date
+
+        while True:
+            # Be sure to read the max stream_ordering *before* checking if there are any outstanding
+            # deltas, since there is otherwise a chance that we could miss updates which arrive
+            # after we check the deltas.
+            room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+            if self.pos == room_max_stream_ordering:
+                break
+
+            logger.debug(
+                "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering
+            )
+            (
+                max_pos,
+                deltas,
+            ) = await self._storage_controllers.state.get_current_state_deltas(
+                self.pos, room_max_stream_ordering
+            )
+
+            logger.debug("Handling %d state deltas", len(deltas))
+            await self._handle_deltas(deltas)
+
+            self.pos = max_pos
+
+            # Expose current event processing position to prometheus
+            event_processing_positions.labels("room_forgetter").set(max_pos)
+
+            await self._store.update_room_forgetter_stream_pos(max_pos)
+
+    async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
+        """Called with the state deltas to process"""
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            if typ != EventTypes.Member:
+                continue
+
+            if not self._hs.is_mine_id(state_key):
+                continue
+
+            change = await self._get_key_change(
+                prev_event_id,
+                event_id,
+                key_name="membership",
+                public_value=Membership.JOIN,
+            )
+            is_leave = change is MatchChange.now_false
+
+            if is_leave:
+                try:
+                    await self._room_member_handler.forget(
+                        UserID.from_string(state_key), room_id
+                    )
+                except SynapseError as e:
+                    if e.code == 400:
+                        # The user is back in the room.
+                        pass
+                    else:
+                        raise
 
 
 def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 76e36b8a6d..e8ff1ad063 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -137,6 +137,3 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
         await self._notify_change_client(
             user_id=target.to_string(), room_id=room_id, change="left"
         )
-
-    async def forget(self, target: UserID, room_id: str) -> None:
-        raise RuntimeError("Cannot forget rooms on workers.")
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index c28325323c..25fd2eb3a1 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -194,6 +194,7 @@ class SsoHandler:
         self._clock = hs.get_clock()
         self._store = hs.get_datastores().main
         self._server_name = hs.hostname
+        self._is_mine_server_name = hs.is_mine_server_name
         self._registration_handler = hs.get_registration_handler()
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
@@ -224,6 +225,8 @@ class SsoHandler:
 
         self._consent_at_registration = hs.config.consent.user_consent_at_registration
 
+        self._e164_mxids = hs.config.experimental.msc4009_e164_mxids
+
     def register_identity_provider(self, p: SsoIdentityProvider) -> None:
         p_id = p.idp_id
         assert p_id not in self._identity_providers
@@ -710,7 +713,7 @@ class SsoHandler:
         # Since the localpart is provided via a potentially untrusted module,
         # ensure the MXID is valid before registering.
         if not attributes.localpart or contains_invalid_mxid_characters(
-            attributes.localpart
+            attributes.localpart, self._e164_mxids
         ):
             raise MappingException("localpart is invalid: %s" % (attributes.localpart,))
 
@@ -802,7 +805,7 @@ class SsoHandler:
             if profile["avatar_url"] is not None:
                 server_name = profile["avatar_url"].split("/")[-2]
                 media_id = profile["avatar_url"].split("/")[-1]
-                if server_name == self._server_name:
+                if self._is_mine_server_name(server_name):
                     media = await self._media_repo.store.get_local_media(media_id)
                     if media is not None and upload_name == media["upload_name"]:
                         logger.info("skipping saving the user avatar")
@@ -943,7 +946,7 @@ class SsoHandler:
             localpart,
         )
 
-        if contains_invalid_mxid_characters(localpart):
+        if contains_invalid_mxid_characters(localpart, self._e164_mxids):
             raise SynapseError(400, "localpart is invalid: %s" % (localpart,))
         user_id = UserID(localpart, self._server_name).to_string()
         user_infos = await self._store.get_users_by_id_case_insensitive(user_id)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 64d298408d..c010405be6 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -50,7 +50,6 @@ from synapse.logging.opentracing import (
     start_active_span,
     trace,
 )
-from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.roommember import MemberSummary
@@ -261,6 +260,7 @@ class SyncHandler:
         self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
         self._relations_handler = hs.get_relations_handler()
+        self._push_rules_handler = hs.get_push_rules_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
@@ -428,12 +428,6 @@ class SyncHandler:
             set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
             return sync_result
 
-    async def push_rules_for_user(self, user: UserID) -> Dict[str, Dict[str, list]]:
-        user_id = user.to_string()
-        rules_raw = await self.store.get_push_rules_for_user(user_id)
-        rules = format_push_rules_for_user(user, rules_raw)
-        return rules
-
     async def ephemeral_by_room(
         self,
         sync_result_builder: "SyncResultBuilder",
@@ -1777,18 +1771,18 @@ class SyncHandler:
 
             if push_rules_changed:
                 global_account_data = dict(global_account_data)
-                global_account_data["m.push_rules"] = await self.push_rules_for_user(
-                    sync_config.user
-                )
+                global_account_data[
+                    AccountDataTypes.PUSH_RULES
+                ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
         else:
             all_global_account_data = await self.store.get_global_account_data_for_user(
                 user_id
             )
 
             global_account_data = dict(all_global_account_data)
-            global_account_data["m.push_rules"] = await self.push_rules_for_user(
-                sync_config.user
-            )
+            global_account_data[
+                AccountDataTypes.PUSH_RULES
+            ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
 
         account_data_for_user = (
             await sync_config.filter_collection.filter_global_account_data(
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 39ae44ea95..7aeae5319c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -68,6 +68,7 @@ class FollowerTypingHandler:
         self.server_name = hs.config.server.server_name
         self.clock = hs.get_clock()
         self.is_mine_id = hs.is_mine_id
+        self.is_mine_server_name = hs.is_mine_server_name
 
         self.federation = None
         if hs.should_send_federation():
@@ -153,7 +154,7 @@ class FollowerTypingHandler:
                 member.room_id
             )
             for domain in hosts:
-                if domain != self.server_name:
+                if not self.is_mine_server_name(domain):
                     logger.debug("sending typing update to %s", domain)
                     self.federation.build_and_send_edu(
                         destination=domain,