summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2021-12-06 16:26:14 +0000
committerBrendan Abolivier <babolivier@matrix.org>2021-12-06 16:26:14 +0000
commitbbf27eb0455101c1319edb9ab4bbe6c4731ea14e (patch)
treed13e9e9eee3c290b7e722c0db1dce7e01edf4c73 /synapse/handlers
parentMerge tag 'v1.46.0' into babolivier/dinsic_1.48.0 (diff)
parent1.47.0 (diff)
downloadsynapse-bbf27eb0455101c1319edb9ab4bbe6c4731ea14e.tar.xz
Merge tag 'v1.47.0' into babolivier/dinsic_1.48.0
Synapse 1.47.0 (2021-11-17)
===========================

No significant changes since 1.47.0rc3.

Synapse 1.47.0rc3 (2021-11-16)
==============================

Bugfixes
--------

- Fix a bug introduced in 1.47.0rc1 which caused worker processes to not halt startup in the presence of outstanding database migrations. ([\#11346](https://github.com/matrix-org/synapse/issues/11346))
- Fix a bug introduced in 1.47.0rc1 which prevented the 'remove deleted devices from `device_inbox` column' background process from running when updating from a recent Synapse version. ([\#11303](https://github.com/matrix-org/synapse/issues/11303), [\#11353](https://github.com/matrix-org/synapse/issues/11353))

Synapse 1.47.0rc2 (2021-11-10)
==============================

This fixes an issue with publishing the Debian packages for 1.47.0rc1.
It is otherwise identical to 1.47.0rc1.

Synapse 1.47.0rc1 (2021-11-09)
==============================

Deprecations and Removals
-------------------------

- The `user_may_create_room_with_invites` module callback is now deprecated. Please refer to the [upgrade notes](https://matrix-org.github.io/synapse/develop/upgrade#upgrading-to-v1470) for more information. ([\#11206](https://github.com/matrix-org/synapse/issues/11206))
- Remove deprecated admin API to delete rooms (`POST /_synapse/admin/v1/rooms/<room_id>/delete`). ([\#11213](https://github.com/matrix-org/synapse/issues/11213))

Features
--------

- Advertise support for Client-Server API r0.6.1. ([\#11097](https://github.com/matrix-org/synapse/issues/11097))
- Add search by room ID and room alias to the List Room admin API. ([\#11099](https://github.com/matrix-org/synapse/issues/11099))
- Add an `on_new_event` third-party rules callback to allow Synapse modules to act after an event has been sent into a room. ([\#11126](https://github.com/matrix-org/synapse/issues/11126))
- Add a module API method to update a user's membership in a room. ([\#11147](https://github.com/matrix-org/synapse/issues/11147))
- Add metrics for thread pool usage. ([\#11178](https://github.com/matrix-org/synapse/issues/11178))
- Support the stable room type field for [MSC3288](https://github.com/matrix-org/matrix-doc/pull/3288). ([\#11187](https://github.com/matrix-org/synapse/issues/11187))
- Add a module API method to retrieve the current state of a room. ([\#11204](https://github.com/matrix-org/synapse/issues/11204))
- Calculate a default value for `public_baseurl` based on `server_name`. ([\#11210](https://github.com/matrix-org/synapse/issues/11210))
- Add support for serving `/.well-known/matrix/server` files, to redirect federation traffic to port 443. ([\#11211](https://github.com/matrix-org/synapse/issues/11211))
- Add admin APIs to pause, start and check the status of background updates. ([\#11263](https://github.com/matrix-org/synapse/issues/11263))

Bugfixes
--------

- Fix a long-standing bug which allowed hidden devices to receive to-device messages, resulting in unnecessary database bloat. ([\#10097](https://github.com/matrix-org/synapse/issues/10097))
- Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine. ([\#10969](https://github.com/matrix-org/synapse/issues/10969), [\#11212](https://github.com/matrix-org/synapse/issues/11212))
- Do not accept events if a third-party rule `check_event_allowed` callback raises an exception. ([\#11033](https://github.com/matrix-org/synapse/issues/11033))
- Fix long-standing bug where verification requests could fail in certain cases if a federation whitelist was in place but did not include your own homeserver. ([\#11129](https://github.com/matrix-org/synapse/issues/11129))
- Allow an empty list of `state_events_at_start` to be sent when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint and the author of the historical messages is already part of the current room state at the given `?prev_event_id`. ([\#11188](https://github.com/matrix-org/synapse/issues/11188))
- Fix a bug introduced in Synapse 1.45.0 which prevented the `synapse_review_recent_signups` script from running. Contributed by @samuel-p. ([\#11191](https://github.com/matrix-org/synapse/issues/11191))
- Delete `to_device` messages for hidden devices that will never be read, reducing database size. ([\#11199](https://github.com/matrix-org/synapse/issues/11199))
- Fix a long-standing bug wherein a missing `Content-Type` header when downloading remote media would cause Synapse to throw an error. ([\#11200](https://github.com/matrix-org/synapse/issues/11200))
- Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper. ([\#11207](https://github.com/matrix-org/synapse/issues/11207))
- Fix a bug introduced in Synapse 1.35.0 which made it impossible to join rooms that return a `send_join` response containing floats. ([\#11217](https://github.com/matrix-org/synapse/issues/11217))
- Fix long-standing bug where cross signing keys were not included in the response to `/r0/keys/query` the first time a remote user was queried. ([\#11234](https://github.com/matrix-org/synapse/issues/11234))
- Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection. ([\#11240](https://github.com/matrix-org/synapse/issues/11240))
- Fix a bug preventing Synapse from being rolled back to an earlier version when using workers. ([\#11255](https://github.com/matrix-org/synapse/issues/11255), [\#11276](https://github.com/matrix-org/synapse/issues/11276))
- Fix a bug introduced in Synapse 1.37.1 which caused a remote event being processed by a worker to not get processed on restart if the worker was killed. ([\#11262](https://github.com/matrix-org/synapse/issues/11262))
- Only allow old Element/Riot Android clients to send read receipts without a request body. All other clients must include a request body as required by the specification. Contributed by @rogersheu. ([\#11157](https://github.com/matrix-org/synapse/issues/11157))

Updates to the Docker image
---------------------------

- Avoid changing user ID when started as a non-root user, and no explicit `UID` is set. ([\#11209](https://github.com/matrix-org/synapse/issues/11209))

Improved Documentation
----------------------

- Improve example HAProxy config in the docs to properly handle HTTP `Host` headers with port information. This is required for federation over port 443 to work correctly. ([\#11128](https://github.com/matrix-org/synapse/issues/11128))
- Add documentation for using Authentik as an OpenID Connect Identity Provider. Contributed by @samip5. ([\#11151](https://github.com/matrix-org/synapse/issues/11151))
- Clarify lack of support for Windows. ([\#11198](https://github.com/matrix-org/synapse/issues/11198))
- Improve code formatting and fix a few typos in docs. Contributed by @sumnerevans at Beeper. ([\#11221](https://github.com/matrix-org/synapse/issues/11221))
- Add documentation for using LemonLDAP as an OpenID Connect Identity Provider. Contributed by @l00ptr. ([\#11257](https://github.com/matrix-org/synapse/issues/11257))

Internal Changes
----------------

- Add type annotations for the `log_function` decorator. ([\#10943](https://github.com/matrix-org/synapse/issues/10943))
- Add type hints to `synapse.events`. ([\#11098](https://github.com/matrix-org/synapse/issues/11098))
- Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code. ([\#11137](https://github.com/matrix-org/synapse/issues/11137))
- Add type hints so that `synapse.http` passes `mypy` checks. ([\#11164](https://github.com/matrix-org/synapse/issues/11164))
- Update scripts to pass Shellcheck lints. ([\#11166](https://github.com/matrix-org/synapse/issues/11166))
- Add knock information in admin export. Contributed by Rafael Gonçalves. ([\#11171](https://github.com/matrix-org/synapse/issues/11171))
- Add tests to check that `ClientIpStore.get_last_client_ip_by_device` and `get_user_ip_and_agents` combine database and in-memory data correctly. ([\#11179](https://github.com/matrix-org/synapse/issues/11179))
- Refactor `Filter` to check different fields depending on the data type. ([\#11194](https://github.com/matrix-org/synapse/issues/11194))
- Improve type hints for the relations datastore. ([\#11205](https://github.com/matrix-org/synapse/issues/11205))
- Replace outdated links in the pull request checklist with links to the rendered documentation. ([\#11225](https://github.com/matrix-org/synapse/issues/11225))
- Fix a bug in unit test `test_block_room_and_not_purge`. ([\#11226](https://github.com/matrix-org/synapse/issues/11226))
- In `ObservableDeferred`, run observers in the order they were registered. ([\#11229](https://github.com/matrix-org/synapse/issues/11229))
- Minor speed up to start up times and getting updates for groups by adding missing index to `local_group_updates.stream_id`. ([\#11231](https://github.com/matrix-org/synapse/issues/11231))
- Add `twine` and `towncrier` as dev dependencies, as they're used by the release script. ([\#11233](https://github.com/matrix-org/synapse/issues/11233))
- Allow `stream_writers.typing` config to be a list of one worker. ([\#11237](https://github.com/matrix-org/synapse/issues/11237))
- Remove debugging statement in tests. ([\#11239](https://github.com/matrix-org/synapse/issues/11239))
- Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers. ([\#11244](https://github.com/matrix-org/synapse/issues/11244))
- Add an additional test for the `cachedList` method decorator. ([\#11246](https://github.com/matrix-org/synapse/issues/11246))
- Make minor correction to the type of `auth_checkers` callbacks. ([\#11253](https://github.com/matrix-org/synapse/issues/11253))
- Clean up trivial aspects of the Debian package build tooling. ([\#11269](https://github.com/matrix-org/synapse/issues/11269), [\#11273](https://github.com/matrix-org/synapse/issues/11273))
- Blacklist new SyTest that checks that key uploads are valid pending the validation being implemented in Synapse. ([\#11270](https://github.com/matrix-org/synapse/issues/11270))
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/admin.py22
-rw-r--r--synapse/handlers/appservice.py91
-rw-r--r--synapse/handlers/auth.py4
-rw-r--r--synapse/handlers/directory.py2
-rw-r--r--synapse/handlers/e2e_keys.py211
-rw-r--r--synapse/handlers/federation_event.py6
-rw-r--r--synapse/handlers/identity.py6
-rw-r--r--synapse/handlers/message.py23
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/handlers/profile.py4
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/room_batch.py2
-rw-r--r--synapse/handlers/room_member.py4
-rw-r--r--synapse/handlers/search.py12
-rw-r--r--synapse/handlers/typing.py6
16 files changed, 262 insertions, 139 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index a53cd62d3c..be3203ac80 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -90,6 +90,7 @@ class AdminHandler:
                 Membership.LEAVE,
                 Membership.BAN,
                 Membership.INVITE,
+                Membership.KNOCK,
             ),
         )
 
@@ -122,6 +123,13 @@ class AdminHandler:
                         invited_state = invite.unsigned["invite_room_state"]
                         writer.write_invite(room_id, invite, invited_state)
 
+                if room.membership == Membership.KNOCK:
+                    event_id = room.event_id
+                    knock = await self.store.get_event(event_id, allow_none=True)
+                    if knock:
+                        knock_state = knock.unsigned["knock_room_state"]
+                        writer.write_knock(room_id, knock, knock_state)
+
                 continue
 
             # We only want to bother fetching events up to the last time they
@@ -239,6 +247,20 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
+    def write_knock(
+        self, room_id: str, event: EventBase, state: StateMap[dict]
+    ) -> None:
+        """Write a knock for the room, with associated knock state.
+
+        Args:
+            room_id: The room ID the knock is for.
+            event: The knock event.
+            state: A subset of the state at the knock, with a subset of the
+                event keys (type, state_key content and sender).
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
     def finished(self) -> Any:
         """Called when all data has successfully been exported and written.
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 36c206dae6..ddc9105ee9 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.storage.databases.main.directory import RoomAliasMapping
 from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.util.async_helpers import Linearizer
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -58,6 +59,10 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
+        self._ephemeral_events_linearizer = Linearizer(
+            name="appservice_ephemeral_events"
+        )
+
     def notify_interested_services(self, max_token: RoomStreamToken) -> None:
         """Notifies (pushes) all application services interested in this event.
 
@@ -182,7 +187,7 @@ class ApplicationServicesHandler:
     def notify_interested_services_ephemeral(
         self,
         stream_key: str,
-        new_token: Optional[int],
+        new_token: Union[int, RoomStreamToken],
         users: Optional[Collection[Union[str, UserID]]] = None,
     ) -> None:
         """
@@ -203,7 +208,7 @@ class ApplicationServicesHandler:
                 Appservices will only receive ephemeral events that fall within their
                 registered user and room namespaces.
 
-            new_token: The latest stream token.
+            new_token: The stream token of the event.
             users: The users that should be informed of the new event, if any.
         """
         if not self.notify_appservices:
@@ -212,6 +217,19 @@ class ApplicationServicesHandler:
         if stream_key not in ("typing_key", "receipt_key", "presence_key"):
             return
 
+        # Assert that new_token is an integer (and not a RoomStreamToken).
+        # All of the supported streams that this function handles use an
+        # integer to track progress (rather than a RoomStreamToken - a
+        # vector clock implementation) as they don't support multiple
+        # stream writers.
+        #
+        # As a result, we simply assert that new_token is an integer.
+        # If we do end up needing to pass a RoomStreamToken down here
+        # in the future, using RoomStreamToken.stream (the minimum stream
+        # position) to convert to an ascending integer value should work.
+        # Additional context: https://github.com/matrix-org/synapse/pull/11137
+        assert isinstance(new_token, int)
+
         services = [
             service
             for service in self.store.get_app_services()
@@ -231,14 +249,13 @@ class ApplicationServicesHandler:
         self,
         services: List[ApplicationService],
         stream_key: str,
-        new_token: Optional[int],
+        new_token: int,
         users: Collection[Union[str, UserID]],
     ) -> None:
         logger.debug("Checking interested services for %s" % (stream_key))
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
-                # Only handle typing if we have the latest token
-                if stream_key == "typing_key" and new_token is not None:
+                if stream_key == "typing_key":
                     # Note that we don't persist the token (via set_type_stream_id_for_appservice)
                     # for typing_key due to performance reasons and due to their highly
                     # ephemeral nature.
@@ -248,26 +265,37 @@ class ApplicationServicesHandler:
                     events = await self._handle_typing(service, new_token)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    continue
 
-                elif stream_key == "receipt_key":
-                    events = await self._handle_receipts(service)
-                    if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
-
-                    # Persist the latest handled stream token for this appservice
-                    await self.store.set_type_stream_id_for_appservice(
-                        service, "read_receipt", new_token
+                # Since we read/update the stream position for this AS/stream
+                with (
+                    await self._ephemeral_events_linearizer.queue(
+                        (service.id, stream_key)
                     )
+                ):
+                    if stream_key == "receipt_key":
+                        events = await self._handle_receipts(service, new_token)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
+
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "read_receipt", new_token
+                        )
 
-                elif stream_key == "presence_key":
-                    events = await self._handle_presence(service, users)
-                    if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    elif stream_key == "presence_key":
+                        events = await self._handle_presence(service, users, new_token)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
 
-                    # Persist the latest handled stream token for this appservice
-                    await self.store.set_type_stream_id_for_appservice(
-                        service, "presence", new_token
-                    )
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "presence", new_token
+                        )
 
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
@@ -304,7 +332,9 @@ class ApplicationServicesHandler:
         )
         return typing
 
-    async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+    async def _handle_receipts(
+        self, service: ApplicationService, new_token: Optional[int]
+    ) -> List[JsonDict]:
         """
         Return the latest read receipts that the given application service should receive.
 
@@ -323,6 +353,12 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         receipts_source = self.event_sources.sources.receipt
         receipts, _ = await receipts_source.get_new_events_as(
             service=service, from_key=from_key
@@ -330,7 +366,10 @@ class ApplicationServicesHandler:
         return receipts
 
     async def _handle_presence(
-        self, service: ApplicationService, users: Collection[Union[str, UserID]]
+        self,
+        service: ApplicationService,
+        users: Collection[Union[str, UserID]],
+        new_token: Optional[int],
     ) -> List[JsonDict]:
         """
         Return the latest presence updates that the given application service should receive.
@@ -353,6 +392,12 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "presence"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         for user in users:
             if isinstance(user, str):
                 user = UserID.from_string(user)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d508d7d32a..60e59d11a0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1989,7 +1989,9 @@ class PasswordAuthProvider:
         self,
         check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
         on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
-        auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None,
+        auth_checkers: Optional[
+            Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
+        ] = None,
     ) -> None:
         # Register check_3pid_auth callback
         if check_3pid_auth is not None:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8567cb0e00..8ca5f60b1c 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -245,7 +245,7 @@ class DirectoryHandler:
                 servers = result.servers
         else:
             try:
-                fed_result = await self.federation.make_query(
+                fed_result: Optional[JsonDict] = await self.federation.make_query(
                     destination=room_alias.domain,
                     query_type="directory",
                     args={"room_alias": room_alias.to_string()},
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d0fb2fc7dc..60c11e3d21 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -201,95 +201,19 @@ class E2eKeysHandler:
                     r[user_id] = remote_queries[user_id]
 
             # Now fetch any devices that we don't have in our cache
-            @trace
-            async def do_remote_query(destination: str) -> None:
-                """This is called when we are querying the device list of a user on
-                a remote homeserver and their device list is not in the device list
-                cache. If we share a room with this user and we're not querying for
-                specific user we will update the cache with their device list.
-                """
-
-                destination_query = remote_queries_not_in_cache[destination]
-
-                # We first consider whether we wish to update the device list cache with
-                # the users device list. We want to track a user's devices when the
-                # authenticated user shares a room with the queried user and the query
-                # has not specified a particular device.
-                # If we update the cache for the queried user we remove them from further
-                # queries. We use the more efficient batched query_client_keys for all
-                # remaining users
-                user_ids_updated = []
-                for (user_id, device_list) in destination_query.items():
-                    if user_id in user_ids_updated:
-                        continue
-
-                    if device_list:
-                        continue
-
-                    room_ids = await self.store.get_rooms_for_user(user_id)
-                    if not room_ids:
-                        continue
-
-                    # We've decided we're sharing a room with this user and should
-                    # probably be tracking their device lists. However, we haven't
-                    # done an initial sync on the device list so we do it now.
-                    try:
-                        if self._is_master:
-                            user_devices = await self.device_handler.device_list_updater.user_device_resync(
-                                user_id
-                            )
-                        else:
-                            user_devices = await self._user_device_resync_client(
-                                user_id=user_id
-                            )
-
-                        user_devices = user_devices["devices"]
-                        user_results = results.setdefault(user_id, {})
-                        for device in user_devices:
-                            user_results[device["device_id"]] = device["keys"]
-                        user_ids_updated.append(user_id)
-                    except Exception as e:
-                        failures[destination] = _exception_to_failure(e)
-
-                if len(destination_query) == len(user_ids_updated):
-                    # We've updated all the users in the query and we do not need to
-                    # make any further remote calls.
-                    return
-
-                # Remove all the users from the query which we have updated
-                for user_id in user_ids_updated:
-                    destination_query.pop(user_id)
-
-                try:
-                    remote_result = await self.federation.query_client_keys(
-                        destination, {"device_keys": destination_query}, timeout=timeout
-                    )
-
-                    for user_id, keys in remote_result["device_keys"].items():
-                        if user_id in destination_query:
-                            results[user_id] = keys
-
-                    if "master_keys" in remote_result:
-                        for user_id, key in remote_result["master_keys"].items():
-                            if user_id in destination_query:
-                                cross_signing_keys["master_keys"][user_id] = key
-
-                    if "self_signing_keys" in remote_result:
-                        for user_id, key in remote_result["self_signing_keys"].items():
-                            if user_id in destination_query:
-                                cross_signing_keys["self_signing_keys"][user_id] = key
-
-                except Exception as e:
-                    failure = _exception_to_failure(e)
-                    failures[destination] = failure
-                    set_tag("error", True)
-                    set_tag("reason", failure)
-
             await make_deferred_yieldable(
                 defer.gatherResults(
                     [
-                        run_in_background(do_remote_query, destination)
-                        for destination in remote_queries_not_in_cache
+                        run_in_background(
+                            self._query_devices_for_destination,
+                            results,
+                            cross_signing_keys,
+                            failures,
+                            destination,
+                            queries,
+                            timeout,
+                        )
+                        for destination, queries in remote_queries_not_in_cache.items()
                     ],
                     consumeErrors=True,
                 ).addErrback(unwrapFirstError)
@@ -301,6 +225,121 @@ class E2eKeysHandler:
 
             return ret
 
+    @trace
+    async def _query_devices_for_destination(
+        self,
+        results: JsonDict,
+        cross_signing_keys: JsonDict,
+        failures: Dict[str, JsonDict],
+        destination: str,
+        destination_query: Dict[str, Iterable[str]],
+        timeout: int,
+    ) -> None:
+        """This is called when we are querying the device list of a user on
+        a remote homeserver and their device list is not in the device list
+        cache. If we share a room with this user and we're not querying for
+        specific user we will update the cache with their device list.
+
+        Args:
+            results: A map from user ID to their device keys, which gets
+                updated with the newly fetched keys.
+            cross_signing_keys: Map from user ID to their cross signing keys,
+                which gets updated with the newly fetched keys.
+            failures: Map of destinations to failures that have occurred while
+                attempting to fetch keys.
+            destination: The remote server to query
+            destination_query: The query dict of devices to query the remote
+                server for.
+            timeout: The timeout for remote HTTP requests.
+        """
+
+        # We first consider whether we wish to update the device list cache with
+        # the users device list. We want to track a user's devices when the
+        # authenticated user shares a room with the queried user and the query
+        # has not specified a particular device.
+        # If we update the cache for the queried user we remove them from further
+        # queries. We use the more efficient batched query_client_keys for all
+        # remaining users
+        user_ids_updated = []
+        for (user_id, device_list) in destination_query.items():
+            if user_id in user_ids_updated:
+                continue
+
+            if device_list:
+                continue
+
+            room_ids = await self.store.get_rooms_for_user(user_id)
+            if not room_ids:
+                continue
+
+            # We've decided we're sharing a room with this user and should
+            # probably be tracking their device lists. However, we haven't
+            # done an initial sync on the device list so we do it now.
+            try:
+                if self._is_master:
+                    resync_results = await self.device_handler.device_list_updater.user_device_resync(
+                        user_id
+                    )
+                else:
+                    resync_results = await self._user_device_resync_client(
+                        user_id=user_id
+                    )
+
+                # Add the device keys to the results.
+                user_devices = resync_results["devices"]
+                user_results = results.setdefault(user_id, {})
+                for device in user_devices:
+                    user_results[device["device_id"]] = device["keys"]
+                user_ids_updated.append(user_id)
+
+                # Add any cross signing keys to the results.
+                master_key = resync_results.get("master_key")
+                self_signing_key = resync_results.get("self_signing_key")
+
+                if master_key:
+                    cross_signing_keys["master_keys"][user_id] = master_key
+
+                if self_signing_key:
+                    cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
+            except Exception as e:
+                failures[destination] = _exception_to_failure(e)
+
+        if len(destination_query) == len(user_ids_updated):
+            # We've updated all the users in the query and we do not need to
+            # make any further remote calls.
+            return
+
+        # Remove all the users from the query which we have updated
+        for user_id in user_ids_updated:
+            destination_query.pop(user_id)
+
+        try:
+            remote_result = await self.federation.query_client_keys(
+                destination, {"device_keys": destination_query}, timeout=timeout
+            )
+
+            for user_id, keys in remote_result["device_keys"].items():
+                if user_id in destination_query:
+                    results[user_id] = keys
+
+            if "master_keys" in remote_result:
+                for user_id, key in remote_result["master_keys"].items():
+                    if user_id in destination_query:
+                        cross_signing_keys["master_keys"][user_id] = key
+
+            if "self_signing_keys" in remote_result:
+                for user_id, key in remote_result["self_signing_keys"].items():
+                    if user_id in destination_query:
+                        cross_signing_keys["self_signing_keys"][user_id] = key
+
+        except Exception as e:
+            failure = _exception_to_failure(e)
+            failures[destination] = failure
+            set_tag("error", True)
+            set_tag("reason", failure)
+
+        return
+
     async def get_cross_signing_keys_from_cache(
         self, query: Iterable[str], from_user_id: Optional[str]
     ) -> Dict[str, Dict[str, dict]]:
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9584d5bd46..1a1cd93b1a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -477,7 +477,7 @@ class FederationEventHandler:
 
     @log_function
     async def backfill(
-        self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
+        self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> None:
         """Trigger a backfill request to `dest` for the given `room_id`
 
@@ -1643,7 +1643,7 @@ class FederationEventHandler:
             event: the event whose auth_events we want
 
         Returns:
-            all of the events in `event.auth_events`, after deduplication
+            all of the events listed in `event.auth_events_ids`, after deduplication
 
         Raises:
             AuthError if we were unable to fetch the auth_events for any reason.
@@ -1916,7 +1916,7 @@ class FederationEventHandler:
         event_pos = PersistedEventPosition(
             self._instance_name, event.internal_metadata.stream_ordering
         )
-        self._notifier.on_new_room_event(
+        await self._notifier.on_new_room_event(
             event, event_pos, max_stream_token, extra_users=extra_users
         )
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 156db1d2b9..91c8c61c25 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -596,10 +596,6 @@ class IdentityHandler:
         except RequestTimedOutError:
             raise SynapseError(500, "Timed out contacting identity server")
 
-        # It is already checked that public_baseurl is configured since this code
-        # should only be used if account_threepid_delegate_msisdn is true.
-        assert self.hs.config.server.public_baseurl
-
         # we need to tell the client to send the token back to us, since it doesn't
         # otherwise know where to send it, so add submit_url response parameter
         # (see also MSC2078)
@@ -1020,6 +1016,8 @@ class IdentityHandler:
         }
 
         if room_type is not None:
+            invite_config["room_type"] = room_type
+            # TODO The unstable field is deprecated and should be removed in the future.
             invite_config["org.matrix.msc3288.room_type"] = room_type
 
         # If a custom web client location is available, include it in the request.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2e024b551f..d4c2a6ab7a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1318,6 +1318,8 @@ class EventCreationHandler:
             # user is actually admin or not).
             is_admin_redaction = False
             if event.type == EventTypes.Redaction:
+                assert event.redacts is not None
+
                 original_event = await self.store.get_event(
                     event.redacts,
                     redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1413,6 +1415,8 @@ class EventCreationHandler:
                 )
 
         if event.type == EventTypes.Redaction:
+            assert event.redacts is not None
+
             original_event = await self.store.get_event(
                 event.redacts,
                 redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1500,11 +1504,13 @@ class EventCreationHandler:
                 next_batch_id = event.content.get(
                     EventContentFields.MSC2716_NEXT_BATCH_ID
                 )
-                conflicting_insertion_event_id = (
-                    await self.store.get_insertion_event_by_batch_id(
-                        event.room_id, next_batch_id
+                conflicting_insertion_event_id = None
+                if next_batch_id:
+                    conflicting_insertion_event_id = (
+                        await self.store.get_insertion_event_id_by_batch_id(
+                            event.room_id, next_batch_id
+                        )
                     )
-                )
                 if conflicting_insertion_event_id is not None:
                     # The current insertion event that we're processing is invalid
                     # because an insertion event already exists in the room with the
@@ -1537,13 +1543,16 @@ class EventCreationHandler:
             # If there's an expiry timestamp on the event, schedule its expiry.
             self._message_handler.maybe_schedule_expiry(event)
 
-        def _notify() -> None:
+        async def _notify() -> None:
             try:
-                self.notifier.on_new_room_event(
+                await self.notifier.on_new_room_event(
                     event, event_pos, max_stream_token, extra_users=extra_users
                 )
             except Exception:
-                logger.exception("Error notifying about new room event")
+                logger.exception(
+                    "Error notifying about new room event %s",
+                    event.event_id,
+                )
 
         run_in_background(_notify)
 
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 60ff896386..abfe7be0e3 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -438,7 +438,7 @@ class PaginationHandler:
             }
 
         state = None
-        if event_filter and event_filter.lazy_load_members() and len(events) > 0:
+        if event_filter and event_filter.lazy_load_members and len(events) > 0:
             # TODO: remove redundant members
 
             # FIXME: we also care about invite targets etc.
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fdab50da37..3df872c578 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,7 @@ import synapse.metrics
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.api.presence import UserPresenceState
+from synapse.appservice import ApplicationService
 from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
@@ -1551,6 +1552,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         is_guest: bool = False,
         explicit_room_id: Optional[str] = None,
         include_offline: bool = True,
+        service: Optional[ApplicationService] = None,
     ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 86639b0ce8..aa47354a6a 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -676,7 +676,11 @@ class ProfileHandler:
                 continue
 
             new_name = profile.get("displayname")
+            if not isinstance(new_name, str):
+                new_name = None
             new_avatar = profile.get("avatar_url")
+            if not isinstance(new_avatar, str):
+                new_avatar = None
 
             # We always hit update to update the last_check timestamp
             await self.store.update_remote_profile_cache(user_id, new_name, new_avatar)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index cf01d58ea1..969eb3b9b0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -525,7 +525,7 @@ class RoomCreationHandler:
             ):
                 await self.room_member_handler.update_membership(
                     requester,
-                    UserID.from_string(old_event["state_key"]),
+                    UserID.from_string(old_event.state_key),
                     new_room_id,
                     "ban",
                     ratelimit=False,
@@ -1173,7 +1173,7 @@ class RoomContextHandler:
         else:
             last_event_id = event_id
 
-        if event_filter and event_filter.lazy_load_members():
+        if event_filter and event_filter.lazy_load_members:
             state_filter = StateFilter.from_lazy_load_member_list(
                 ev.sender
                 for ev in itertools.chain(
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 2f5a3e4d19..0723286383 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -355,7 +355,7 @@ class RoomBatchHandler:
         for (event, context) in reversed(events_to_persist):
             await self.event_creation_handler.handle_new_client_event(
                 await self.create_requester_for_user_id_from_app_service(
-                    event["sender"], app_service_requester.app_service
+                    event.sender, app_service_requester.app_service
                 ),
                 event=event,
                 context=context,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 8fcab486d8..281630367e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1679,7 +1679,9 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         #
         # the prev_events consist solely of the previous membership event.
         prev_event_ids = [previous_membership_event.event_id]
-        auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids
+        auth_event_ids = (
+            list(previous_membership_event.auth_event_ids()) + prev_event_ids
+        )
 
         event, context = await self.event_creation_handler.create_event(
             requester,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index a3ffa26be8..6e4dff8056 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -249,7 +249,7 @@ class SearchHandler:
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
-            allowed_events = events[: search_filter.limit()]
+            allowed_events = events[: search_filter.limit]
 
             for e in allowed_events:
                 rm = room_groups.setdefault(
@@ -271,13 +271,13 @@ class SearchHandler:
             # We keep looping and we keep filtering until we reach the limit
             # or we run out of things.
             # But only go around 5 times since otherwise synapse will be sad.
-            while len(room_events) < search_filter.limit() and i < 5:
+            while len(room_events) < search_filter.limit and i < 5:
                 i += 1
                 search_result = await self.store.search_rooms(
                     room_ids,
                     search_term,
                     keys,
-                    search_filter.limit() * 2,
+                    search_filter.limit * 2,
                     pagination_token=pagination_token,
                 )
 
@@ -299,9 +299,9 @@ class SearchHandler:
                 )
 
                 room_events.extend(events)
-                room_events = room_events[: search_filter.limit()]
+                room_events = room_events[: search_filter.limit]
 
-                if len(results) < search_filter.limit() * 2:
+                if len(results) < search_filter.limit * 2:
                     pagination_token = None
                     break
                 else:
@@ -311,7 +311,7 @@ class SearchHandler:
                 group = room_groups.setdefault(event.room_id, {"results": []})
                 group["results"].append(event.event_id)
 
-            if room_events and len(room_events) >= search_filter.limit():
+            if room_events and len(room_events) >= search_filter.limit:
                 last_event_id = room_events[-1].event_id
                 pagination_token = results_map[last_event_id]["pagination_token"]
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c411d69924..22c6174821 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -62,8 +62,8 @@ class FollowerTypingHandler:
         if hs.should_send_federation():
             self.federation = hs.get_federation_sender()
 
-        if hs.config.worker.writers.typing != hs.get_instance_name():
-            hs.get_federation_registry().register_instance_for_edu(
+        if hs.get_instance_name() not in hs.config.worker.writers.typing:
+            hs.get_federation_registry().register_instances_for_edu(
                 "m.typing",
                 hs.config.worker.writers.typing,
             )
@@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
-        assert hs.config.worker.writers.typing == hs.get_instance_name()
+        assert hs.get_instance_name() in hs.config.worker.writers.typing
 
         self.auth = hs.get_auth()
         self.notifier = hs.get_notifier()