summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py138
-rw-r--r--synapse/handlers/auth.py44
-rw-r--r--synapse/handlers/device.py8
-rw-r--r--synapse/handlers/federation.py27
-rw-r--r--synapse/handlers/federation_event.py34
-rw-r--r--synapse/handlers/message.py20
-rw-r--r--synapse/handlers/oidc.py4
-rw-r--r--synapse/handlers/room.py5
-rw-r--r--synapse/handlers/room_batch.py44
-rw-r--r--synapse/handlers/room_member.py31
-rw-r--r--synapse/handlers/search.py29
-rw-r--r--synapse/handlers/send_email.py2
-rw-r--r--synapse/handlers/sync.py4
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/handlers/ui_auth/__init__.py2
15 files changed, 317 insertions, 79 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7833e77e2b..a42c3558e4 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -55,6 +55,9 @@ class ApplicationServicesHandler:
         self.clock = hs.get_clock()
         self.notify_appservices = hs.config.appservice.notify_appservices
         self.event_sources = hs.get_event_sources()
+        self._msc2409_to_device_messages_enabled = (
+            hs.config.experimental.msc2409_to_device_messages_enabled
+        )
 
         self.current_max = 0
         self.is_processing = False
@@ -132,7 +135,9 @@ class ApplicationServicesHandler:
 
                         # Fork off pushes to these services
                         for service in services:
-                            self.scheduler.submit_event_for_as(service, event)
+                            self.scheduler.enqueue_for_appservice(
+                                service, events=[event]
+                            )
 
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
@@ -199,8 +204,9 @@ class ApplicationServicesHandler:
         Args:
             stream_key: The stream the event came from.
 
-                `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
-                value for `stream_key` will cause this function to return early.
+                `stream_key` can be "typing_key", "receipt_key", "presence_key" or
+                "to_device_key". Any other value for `stream_key` will cause this function
+                to return early.
 
                 Ephemeral events will only be pushed to appservices that have opted into
                 receiving them by setting `push_ephemeral` to true in their registration
@@ -216,8 +222,15 @@ class ApplicationServicesHandler:
         if not self.notify_appservices:
             return
 
-        # Ignore any unsupported streams
-        if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+        # Notify appservices of updates in ephemeral event streams.
+        # Only the following streams are currently supported.
+        # FIXME: We should use constants for these values.
+        if stream_key not in (
+            "typing_key",
+            "receipt_key",
+            "presence_key",
+            "to_device_key",
+        ):
             return
 
         # Assert that new_token is an integer (and not a RoomStreamToken).
@@ -233,6 +246,13 @@ class ApplicationServicesHandler:
         # Additional context: https://github.com/matrix-org/synapse/pull/11137
         assert isinstance(new_token, int)
 
+        # Ignore to-device messages if the feature flag is not enabled
+        if (
+            stream_key == "to_device_key"
+            and not self._msc2409_to_device_messages_enabled
+        ):
+            return
+
         # Check whether there are any appservices which have registered to receive
         # ephemeral events.
         #
@@ -266,7 +286,7 @@ class ApplicationServicesHandler:
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
                 if stream_key == "typing_key":
-                    # Note that we don't persist the token (via set_type_stream_id_for_appservice)
+                    # Note that we don't persist the token (via set_appservice_stream_type_pos)
                     # for typing_key due to performance reasons and due to their highly
                     # ephemeral nature.
                     #
@@ -274,7 +294,7 @@ class ApplicationServicesHandler:
                     # and, if they apply to this application service, send it off.
                     events = await self._handle_typing(service, new_token)
                     if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
                     continue
 
                 # Since we read/update the stream position for this AS/stream
@@ -285,28 +305,37 @@ class ApplicationServicesHandler:
                 ):
                     if stream_key == "receipt_key":
                         events = await self._handle_receipts(service, new_token)
-                        if events:
-                            self.scheduler.submit_ephemeral_events_for_as(
-                                service, events
-                            )
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
-                        await self.store.set_type_stream_id_for_appservice(
+                        await self.store.set_appservice_stream_type_pos(
                             service, "read_receipt", new_token
                         )
 
                     elif stream_key == "presence_key":
                         events = await self._handle_presence(service, users, new_token)
-                        if events:
-                            self.scheduler.submit_ephemeral_events_for_as(
-                                service, events
-                            )
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
-                        await self.store.set_type_stream_id_for_appservice(
+                        await self.store.set_appservice_stream_type_pos(
                             service, "presence", new_token
                         )
 
+                    elif stream_key == "to_device_key":
+                        # Retrieve a list of to-device message events, as well as the
+                        # maximum stream token of the messages we were able to retrieve.
+                        to_device_messages = await self._get_to_device_messages(
+                            service, new_token, users
+                        )
+                        self.scheduler.enqueue_for_appservice(
+                            service, to_device_messages=to_device_messages
+                        )
+
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_appservice_stream_type_pos(
+                            service, "to_device", new_token
+                        )
+
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
     ) -> List[JsonDict]:
@@ -440,6 +469,79 @@ class ApplicationServicesHandler:
 
         return events
 
+    async def _get_to_device_messages(
+        self,
+        service: ApplicationService,
+        new_token: int,
+        users: Collection[Union[str, UserID]],
+    ) -> List[JsonDict]:
+        """
+        Given an application service, determine which events it should receive
+        from those between the last-recorded to-device message stream token for this
+        appservice and the given stream token.
+
+        Args:
+            service: The application service to check for which events it should receive.
+            new_token: The latest to-device event stream token.
+            users: The users to be notified for the new to-device messages
+                (ie, the recipients of the messages).
+
+        Returns:
+            A list of JSON dictionaries containing data derived from the to-device events
+                that should be sent to the given application service.
+        """
+        # Get the stream token that this application service has processed up until
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "to_device"
+        )
+
+        # Filter out users that this appservice is not interested in
+        users_appservice_is_interested_in: List[str] = []
+        for user in users:
+            # FIXME: We should do this farther up the call stack. We currently repeat
+            #  this operation in _handle_presence.
+            if isinstance(user, UserID):
+                user = user.to_string()
+
+            if service.is_interested_in_user(user):
+                users_appservice_is_interested_in.append(user)
+
+        if not users_appservice_is_interested_in:
+            # Return early if the AS was not interested in any of these users
+            return []
+
+        # Retrieve the to-device messages for each user
+        recipient_device_to_messages = await self.store.get_messages_for_user_devices(
+            users_appservice_is_interested_in,
+            from_key,
+            new_token,
+        )
+
+        # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
+        # to the event JSON so that the application service will know which user/device
+        # combination this messages was intended for.
+        #
+        # So we mangle this dict into a flat list of to-device messages with the relevant
+        # user ID and device ID embedded inside each message dict.
+        message_payload: List[JsonDict] = []
+        for (
+            user_id,
+            device_id,
+        ), messages in recipient_device_to_messages.items():
+            for message_json in messages:
+                # Remove 'message_id' from the to-device message, as it's an internal ID
+                message_json.pop("message_id", None)
+
+                message_payload.append(
+                    {
+                        "to_user_id": user_id,
+                        "to_device_id": device_id,
+                        **message_json,
+                    }
+                )
+
+        return message_payload
+
     async def query_user_exists(self, user_id: str) -> bool:
         """Check if any application service knows this user_id exists.
 
@@ -547,7 +649,7 @@ class ApplicationServicesHandler:
         """Retrieve a list of application services interested in this event.
 
         Args:
-            event: The event to check. Can be None if alias_list is not.
+            event: The event to check.
         Returns:
             A list of services interested in this event based on the service regex.
         """
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index e32c93e234..6959d1aa7e 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -2064,6 +2064,7 @@ GET_USERNAME_FOR_REGISTRATION_CALLBACK = Callable[
     [JsonDict, JsonDict],
     Awaitable[Optional[str]],
 ]
+IS_3PID_ALLOWED_CALLBACK = Callable[[str, str, bool], Awaitable[bool]]
 
 
 class PasswordAuthProvider:
@@ -2079,6 +2080,7 @@ class PasswordAuthProvider:
         self.get_username_for_registration_callbacks: List[
             GET_USERNAME_FOR_REGISTRATION_CALLBACK
         ] = []
+        self.is_3pid_allowed_callbacks: List[IS_3PID_ALLOWED_CALLBACK] = []
 
         # Mapping from login type to login parameters
         self._supported_login_types: Dict[str, Iterable[str]] = {}
@@ -2090,6 +2092,7 @@ class PasswordAuthProvider:
         self,
         check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
         on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
+        is_3pid_allowed: Optional[IS_3PID_ALLOWED_CALLBACK] = None,
         auth_checkers: Optional[
             Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
         ] = None,
@@ -2145,6 +2148,9 @@ class PasswordAuthProvider:
                 get_username_for_registration,
             )
 
+        if is_3pid_allowed is not None:
+            self.is_3pid_allowed_callbacks.append(is_3pid_allowed)
+
     def get_supported_login_types(self) -> Mapping[str, Iterable[str]]:
         """Get the login types supported by this password provider
 
@@ -2343,3 +2349,41 @@ class PasswordAuthProvider:
                 raise SynapseError(code=500, msg="Internal Server Error")
 
         return None
+
+    async def is_3pid_allowed(
+        self,
+        medium: str,
+        address: str,
+        registration: bool,
+    ) -> bool:
+        """Check if the user can be allowed to bind a 3PID on this homeserver.
+
+        Args:
+            medium: The medium of the 3PID.
+            address: The address of the 3PID.
+            registration: Whether the 3PID is being bound when registering a new user.
+
+        Returns:
+            Whether the 3PID is allowed to be bound on this homeserver
+        """
+        for callback in self.is_3pid_allowed_callbacks:
+            try:
+                res = await callback(medium, address, registration)
+
+                if res is False:
+                    return res
+                elif not isinstance(res, bool):
+                    # mypy complains that this line is unreachable because it assumes the
+                    # data returned by the module fits the expected type. We just want
+                    # to make sure this is the case.
+                    logger.warning(  # type: ignore[unreachable]
+                        "Ignoring non-string value returned by"
+                        " is_3pid_allowed callback %s: %s",
+                        callback,
+                        res,
+                    )
+            except Exception as e:
+                logger.error("Module raised an exception in is_3pid_allowed: %s", e)
+                raise SynapseError(code=500, msg="Internal Server Error")
+
+        return True
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b184a48cb1..36c05f8363 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -495,13 +495,11 @@ class DeviceHandler(DeviceWorkerHandler):
                 "Notifying about update %r/%r, ID: %r", user_id, device_id, position
             )
 
-        room_ids = await self.store.get_rooms_for_user(user_id)
-
         # specify the user ID too since the user should always get their own device list
         # updates, even if they aren't in any rooms.
-        self.notifier.on_new_event(
-            "device_list_key", position, users=[user_id], rooms=room_ids
-        )
+        users_to_notify = users_who_share_room.union({user_id})
+
+        self.notifier.on_new_event("device_list_key", position, users=users_to_notify)
 
         if hosts:
             logger.info(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a37ae0ca09..c0f642005f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -166,9 +166,14 @@ class FederationHandler:
         oldest_events_with_depth = (
             await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
         )
-        insertion_events_to_be_backfilled = (
-            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
-        )
+
+        insertion_events_to_be_backfilled: Dict[str, int] = {}
+        if self.hs.config.experimental.msc2716_enabled:
+            insertion_events_to_be_backfilled = (
+                await self.store.get_insertion_event_backward_extremities_in_room(
+                    room_id
+                )
+            )
         logger.debug(
             "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
             oldest_events_with_depth,
@@ -271,11 +276,12 @@ class FederationHandler:
         ]
 
         logger.debug(
-            "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
+            "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
             room_id,
             current_depth,
             limit,
             max_depth,
+            len(sorted_extremeties_tuple),
             sorted_extremeties_tuple,
             filtered_sorted_extremeties_tuple,
         )
@@ -1047,6 +1053,19 @@ class FederationHandler:
         limit = min(limit, 100)
 
         events = await self.store.get_backfill_events(room_id, pdu_list, limit)
+        logger.debug(
+            "on_backfill_request: backfill events=%s",
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in events
+            ],
+        )
 
         events = await filter_events_for_server(self.storage, origin, events)
 
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 3905f60b3a..9edc7369d6 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -508,7 +508,11 @@ class FederationEventHandler:
                     f"room {ev.room_id}, when we were backfilling in {room_id}"
                 )
 
-        await self._process_pulled_events(dest, events, backfilled=True)
+        await self._process_pulled_events(
+            dest,
+            events,
+            backfilled=True,
+        )
 
     async def _get_missing_events_for_pdu(
         self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@@ -626,11 +630,24 @@ class FederationEventHandler:
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
         """
+        logger.debug(
+            "processing pulled backfilled=%s events=%s",
+            backfilled,
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in events
+            ],
+        )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         sorted_events = sorted(events, key=lambda x: x.depth)
-
         for ev in sorted_events:
             with nested_logging_context(ev.event_id):
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -992,6 +1009,8 @@ class FederationEventHandler:
 
         await self._run_push_actions_and_persist_event(event, context, backfilled)
 
+        await self._handle_marker_event(origin, event)
+
         if backfilled or context.rejected:
             return
 
@@ -1071,8 +1090,6 @@ class FederationEventHandler:
                     event.sender,
                 )
 
-        await self._handle_marker_event(origin, event)
-
     async def _resync_device(self, sender: str) -> None:
         """We have detected that the device list for the given user may be out
         of sync, so we try and resync them.
@@ -1323,7 +1340,14 @@ class FederationEventHandler:
             return event, context
 
         events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
-        await self.persist_events_and_notify(room_id, tuple(events_to_persist))
+        await self.persist_events_and_notify(
+            room_id,
+            tuple(events_to_persist),
+            # Mark these events backfilled as they're historic events that will
+            # eventually be backfilled. For example, missing events we fetch
+            # during backfill should be marked as backfilled as well.
+            backfilled=True,
+        )
 
     async def _check_event_auth(
         self,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b37250aa38..9267e586a8 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -490,12 +490,12 @@ class EventCreationHandler:
         requester: Requester,
         event_dict: dict,
         txn_id: Optional[str] = None,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
-        allow_no_prev_events: bool = False,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """
@@ -510,6 +510,10 @@ class EventCreationHandler:
             requester
             event_dict: An entire event
             txn_id
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -604,10 +608,10 @@ class EventCreationHandler:
         event, context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
+            allow_no_prev_events=allow_no_prev_events,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
             depth=depth,
-            allow_no_prev_events=allow_no_prev_events,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -764,6 +768,7 @@ class EventCreationHandler:
         self,
         requester: Requester,
         event_dict: dict,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         ratelimit: bool = True,
@@ -781,6 +786,10 @@ class EventCreationHandler:
         Args:
             requester: The requester sending the event.
             event_dict: An entire event.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 The event IDs to use as the prev events.
                 Should normally be left as None to automatically request them
@@ -880,16 +889,20 @@ class EventCreationHandler:
         self,
         builder: EventBuilder,
         requester: Optional[Requester] = None,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
-        allow_no_prev_events: bool = False,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
         Args:
             builder:
             requester:
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -908,7 +921,6 @@ class EventCreationHandler:
         Returns:
             Tuple of created event, context
         """
-
         # Strip down the auth_event_ids to only what we need to auth the event.
         # For example, we don't need extra m.room.member that don't match event.sender
         full_state_ids_at_event = None
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index deb3539751..8f71d975e9 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -544,9 +544,9 @@ class OidcProvider:
         """
         metadata = await self.load_metadata()
         token_endpoint = metadata.get("token_endpoint")
-        raw_headers = {
+        raw_headers: Dict[str, str] = {
             "Content-Type": "application/x-www-form-urlencoded",
-            "User-Agent": self._http_client.user_agent,
+            "User-Agent": self._http_client.user_agent.decode("ascii"),
             "Accept": "application/json",
         }
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 1420d67729..a990727fc5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -694,11 +694,6 @@ class RoomCreationHandler:
 
         if not is_requester_admin and not (
             await self.spam_checker.user_may_create_room(user_id)
-            and await self.spam_checker.user_may_create_room_with_invites(
-                user_id,
-                invite_list,
-                invite_3pid_list,
-            )
         ):
             raise SynapseError(
                 403, "You are not permitted to create rooms", Codes.FORBIDDEN
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index f880aa93d2..f8137ec04c 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -13,10 +13,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-def generate_fake_event_id() -> str:
-    return "$fake_" + random_string(43)
-
-
 class RoomBatchHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
@@ -182,11 +178,12 @@ class RoomBatchHandler:
         state_event_ids_at_start = []
         auth_event_ids = initial_auth_event_ids.copy()
 
-        # Make the state events float off on their own so we don't have a
-        # bunch of `@mxid joined the room` noise between each batch
-        prev_event_id_for_state_chain = generate_fake_event_id()
+        # Make the state events float off on their own by specifying no
+        # prev_events for the first one in the chain so we don't have a bunch of
+        # `@mxid joined the room` noise between each batch.
+        prev_event_ids_for_state_chain: List[str] = []
 
-        for state_event in state_events_at_start:
+        for index, state_event in enumerate(state_events_at_start):
             assert_params_in_dict(
                 state_event, ["type", "origin_server_ts", "content", "sender"]
             )
@@ -222,7 +219,10 @@ class RoomBatchHandler:
                     content=event_dict["content"],
                     outlier=True,
                     historical=True,
-                    prev_event_ids=[prev_event_id_for_state_chain],
+                    # Only the first event in the chain should be floating.
+                    # The rest should hang off each other in a chain.
+                    allow_no_prev_events=index == 0,
+                    prev_event_ids=prev_event_ids_for_state_chain,
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
@@ -242,7 +242,10 @@ class RoomBatchHandler:
                     event_dict,
                     outlier=True,
                     historical=True,
-                    prev_event_ids=[prev_event_id_for_state_chain],
+                    # Only the first event in the chain should be floating.
+                    # The rest should hang off each other in a chain.
+                    allow_no_prev_events=index == 0,
+                    prev_event_ids=prev_event_ids_for_state_chain,
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
@@ -253,7 +256,7 @@ class RoomBatchHandler:
             state_event_ids_at_start.append(event_id)
             auth_event_ids.append(event_id)
             # Connect all the state in a floating chain
-            prev_event_id_for_state_chain = event_id
+            prev_event_ids_for_state_chain = [event_id]
 
         return state_event_ids_at_start
 
@@ -261,7 +264,6 @@ class RoomBatchHandler:
         self,
         events_to_create: List[JsonDict],
         room_id: str,
-        initial_prev_event_ids: List[str],
         inherited_depth: int,
         auth_event_ids: List[str],
         app_service_requester: Requester,
@@ -277,9 +279,6 @@ class RoomBatchHandler:
             events_to_create: List of historical events to create in JSON
                 dictionary format.
             room_id: Room where you want the events persisted in.
-            initial_prev_event_ids: These will be the prev_events for the first
-                event created. Each event created afterwards will point to the
-                previous event created.
             inherited_depth: The depth to create the events at (you will
                 probably by calling inherit_depth_from_prev_ids(...)).
             auth_event_ids: Define which events allow you to create the given
@@ -291,11 +290,14 @@ class RoomBatchHandler:
         """
         assert app_service_requester.app_service
 
-        prev_event_ids = initial_prev_event_ids.copy()
+        # Make the historical event chain float off on its own by specifying no
+        # prev_events for the first event in the chain which causes the HS to
+        # ask for the state at the start of the batch later.
+        prev_event_ids: List[str] = []
 
         event_ids = []
         events_to_persist = []
-        for ev in events_to_create:
+        for index, ev in enumerate(events_to_create):
             assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
 
             assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
@@ -319,6 +321,9 @@ class RoomBatchHandler:
                     ev["sender"], app_service_requester.app_service
                 ),
                 event_dict,
+                # Only the first event in the chain should be floating.
+                # The rest should hang off each other in a chain.
+                allow_no_prev_events=index == 0,
                 prev_event_ids=event_dict.get("prev_events"),
                 auth_event_ids=auth_event_ids,
                 historical=True,
@@ -370,7 +375,6 @@ class RoomBatchHandler:
         events_to_create: List[JsonDict],
         room_id: str,
         batch_id_to_connect_to: str,
-        initial_prev_event_ids: List[str],
         inherited_depth: int,
         auth_event_ids: List[str],
         app_service_requester: Requester,
@@ -385,9 +389,6 @@ class RoomBatchHandler:
             room_id: Room where you want the events created in.
             batch_id_to_connect_to: The batch_id from the insertion event you
                 want this batch to connect to.
-            initial_prev_event_ids: These will be the prev_events for the first
-                event created. Each event created afterwards will point to the
-                previous event created.
             inherited_depth: The depth to create the events at (you will
                 probably by calling inherit_depth_from_prev_ids(...)).
             auth_event_ids: Define which events allow you to create the given
@@ -436,7 +437,6 @@ class RoomBatchHandler:
         event_ids = await self.persist_historical_events(
             events_to_create=events_to_create,
             room_id=room_id,
-            initial_prev_event_ids=initial_prev_event_ids,
             inherited_depth=inherited_depth,
             auth_event_ids=auth_event_ids,
             app_service_requester=app_service_requester,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 3dd5e1b6e4..bf1a47efb0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -116,6 +116,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             burst_count=hs.config.ratelimiting.rc_invites_per_user.burst_count,
         )
 
+        self._third_party_invite_limiter = Ratelimiter(
+            store=self.store,
+            clock=self.clock,
+            rate_hz=hs.config.ratelimiting.rc_third_party_invite.per_second,
+            burst_count=hs.config.ratelimiting.rc_third_party_invite.burst_count,
+        )
+
         self.request_ratelimiter = hs.get_request_ratelimiter()
 
     @abc.abstractmethod
@@ -261,7 +268,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         target: UserID,
         room_id: str,
         membership: str,
-        prev_event_ids: List[str],
+        allow_no_prev_events: bool = False,
+        prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         txn_id: Optional[str] = None,
         ratelimit: bool = True,
@@ -279,8 +287,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             target:
             room_id:
             membership:
-            prev_event_ids: The event IDs to use as the prev events
 
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
+            prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
                 Should normally be left as None, which will cause them to be calculated
@@ -337,6 +349,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 "membership": membership,
             },
             txn_id=txn_id,
+            allow_no_prev_events=allow_no_prev_events,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
             require_consent=require_consent,
@@ -439,6 +452,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
@@ -463,6 +477,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
@@ -497,6 +515,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 require_consent=require_consent,
                 outlier=outlier,
                 historical=historical,
+                allow_no_prev_events=allow_no_prev_events,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
             )
@@ -518,6 +537,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
@@ -544,6 +564,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
@@ -673,6 +697,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 membership=effective_membership_state,
                 txn_id=txn_id,
                 ratelimit=ratelimit,
+                allow_no_prev_events=allow_no_prev_events,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
                 content=content,
@@ -1295,7 +1320,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         # We need to rate limit *before* we send out any 3PID invites, so we
         # can't just rely on the standard ratelimiting of events.
-        await self.request_ratelimiter.ratelimit(requester)
+        await self._third_party_invite_limiter.ratelimit(requester)
 
         can_invite = await self.third_party_event_rules.check_threepid_can_be_invited(
             medium, address, room_id
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 02bb5ae72f..41cb809078 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -43,6 +43,8 @@ class SearchHandler:
         self.state_store = self.storage.state
         self.auth = hs.get_auth()
 
+        self._msc3666_enabled = hs.config.experimental.msc3666_enabled
+
     async def get_old_rooms_from_upgraded_room(self, room_id: str) -> Iterable[str]:
         """Retrieves room IDs of old rooms in the history of an upgraded room.
 
@@ -238,8 +240,6 @@ class SearchHandler:
 
             results = search_result["results"]
 
-            results_map = {r["event"].event_id: r for r in results}
-
             rank_map.update({r["event"].event_id: r["rank"] for r in results})
 
             filtered_events = await search_filter.filter([r["event"] for r in results])
@@ -420,12 +420,29 @@ class SearchHandler:
 
         time_now = self.clock.time_msec()
 
+        aggregations = None
+        if self._msc3666_enabled:
+            aggregations = await self.store.get_bundled_aggregations(
+                # Generate an iterable of EventBase for all the events that will be
+                # returned, including contextual events.
+                itertools.chain(
+                    # The events_before and events_after for each context.
+                    itertools.chain.from_iterable(
+                        itertools.chain(context["events_before"], context["events_after"])  # type: ignore[arg-type]
+                        for context in contexts.values()
+                    ),
+                    # The returned events.
+                    allowed_events,
+                ),
+                user.to_string(),
+            )
+
         for context in contexts.values():
             context["events_before"] = self._event_serializer.serialize_events(
-                context["events_before"], time_now  # type: ignore[arg-type]
+                context["events_before"], time_now, bundle_aggregations=aggregations  # type: ignore[arg-type]
             )
             context["events_after"] = self._event_serializer.serialize_events(
-                context["events_after"], time_now  # type: ignore[arg-type]
+                context["events_after"], time_now, bundle_aggregations=aggregations  # type: ignore[arg-type]
             )
 
         state_results = {}
@@ -442,7 +459,9 @@ class SearchHandler:
             results.append(
                 {
                     "rank": rank_map[e.event_id],
-                    "result": self._event_serializer.serialize_event(e, time_now),
+                    "result": self._event_serializer.serialize_event(
+                        e, time_now, bundle_aggregations=aggregations
+                    ),
                     "context": contexts.get(e.event_id, {}),
                 }
             )
diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py
index 1a062a784c..a305a66860 100644
--- a/synapse/handlers/send_email.py
+++ b/synapse/handlers/send_email.py
@@ -106,7 +106,7 @@ async def _sendmail(
         factory = build_sender_factory(hostname=smtphost if enable_tls else None)
 
     reactor.connectTCP(
-        smtphost,  # type: ignore[arg-type]
+        smtphost,
         smtpport,
         factory,
         timeout=30,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c72ed7c290..aa9a76f8a9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1348,8 +1348,8 @@ class SyncHandler:
         if sync_result_builder.since_token is not None:
             since_stream_id = int(sync_result_builder.since_token.to_device_key)
 
-        if since_stream_id != int(now_token.to_device_key):
-            messages, stream_id = await self.store.get_new_messages_for_device(
+        if device_id is not None and since_stream_id != int(now_token.to_device_key):
+            messages, stream_id = await self.store.get_messages_for_device(
                 user_id, device_id, since_stream_id, now_token.to_device_key
             )
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index e43c22832d..e4bed1c937 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -446,7 +446,7 @@ class TypingWriterHandler(FollowerTypingHandler):
 
 class TypingNotificationEventSource(EventSource[int, JsonDict]):
     def __init__(self, hs: "HomeServer"):
-        self.hs = hs
+        self._main_store = hs.get_datastore()
         self.clock = hs.get_clock()
         # We can't call get_typing_handler here because there's a cycle:
         #
@@ -487,7 +487,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
                     continue
 
                 if not await service.matches_user_in_member_list(
-                    room_id, handler.store
+                    room_id, self._main_store
                 ):
                     continue
 
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
index 13b0c61d2e..56eee4057f 100644
--- a/synapse/handlers/ui_auth/__init__.py
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -38,4 +38,4 @@ class UIAuthSessionDataConstants:
     # used during registration to store the registration token used (if required) so that:
     # - we can prevent a token being used twice by one session
     # - we can 'use up' the token after registration has successfully completed
-    REGISTRATION_TOKEN = "org.matrix.msc3231.login.registration_token"
+    REGISTRATION_TOKEN = "m.login.registration_token"