summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2021-11-17 14:19:27 +0000
committerDavid Robertson <davidr@element.io>2021-11-17 14:19:27 +0000
commit077b74929f8f412395d1156e1b97eb16701059fa (patch)
tree25ecb8e93ec3ba275596bfb31efa45018645d47b /synapse/handlers
parentCorrect target of link to the modules page from the Password Auth Providers p... (diff)
parent1.47.0 (diff)
downloadsynapse-077b74929f8f412395d1156e1b97eb16701059fa.tar.xz
Merge remote-tracking branch 'origin/release-v1.47'
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/admin.py22
-rw-r--r--synapse/handlers/appservice.py91
-rw-r--r--synapse/handlers/auth.py4
-rw-r--r--synapse/handlers/directory.py2
-rw-r--r--synapse/handlers/e2e_keys.py211
-rw-r--r--synapse/handlers/federation_event.py6
-rw-r--r--synapse/handlers/identity.py6
-rw-r--r--synapse/handlers/message.py23
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/handlers/profile.py4
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/room_batch.py2
-rw-r--r--synapse/handlers/room_member.py4
-rw-r--r--synapse/handlers/search.py12
-rw-r--r--synapse/handlers/typing.py6
16 files changed, 262 insertions, 139 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index a53cd62d3c..be3203ac80 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -90,6 +90,7 @@ class AdminHandler:
                 Membership.LEAVE,
                 Membership.BAN,
                 Membership.INVITE,
+                Membership.KNOCK,
             ),
         )
 
@@ -122,6 +123,13 @@ class AdminHandler:
                         invited_state = invite.unsigned["invite_room_state"]
                         writer.write_invite(room_id, invite, invited_state)
 
+                if room.membership == Membership.KNOCK:
+                    event_id = room.event_id
+                    knock = await self.store.get_event(event_id, allow_none=True)
+                    if knock:
+                        knock_state = knock.unsigned["knock_room_state"]
+                        writer.write_knock(room_id, knock, knock_state)
+
                 continue
 
             # We only want to bother fetching events up to the last time they
@@ -239,6 +247,20 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
+    def write_knock(
+        self, room_id: str, event: EventBase, state: StateMap[dict]
+    ) -> None:
+        """Write a knock for the room, with associated knock state.
+
+        Args:
+            room_id: The room ID the knock is for.
+            event: The knock event.
+            state: A subset of the state at the knock, with a subset of the
+                event keys (type, state_key content and sender).
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
     def finished(self) -> Any:
         """Called when all data has successfully been exported and written.
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 36c206dae6..ddc9105ee9 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.storage.databases.main.directory import RoomAliasMapping
 from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.util.async_helpers import Linearizer
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -58,6 +59,10 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
+        self._ephemeral_events_linearizer = Linearizer(
+            name="appservice_ephemeral_events"
+        )
+
     def notify_interested_services(self, max_token: RoomStreamToken) -> None:
         """Notifies (pushes) all application services interested in this event.
 
@@ -182,7 +187,7 @@ class ApplicationServicesHandler:
     def notify_interested_services_ephemeral(
         self,
         stream_key: str,
-        new_token: Optional[int],
+        new_token: Union[int, RoomStreamToken],
         users: Optional[Collection[Union[str, UserID]]] = None,
     ) -> None:
         """
@@ -203,7 +208,7 @@ class ApplicationServicesHandler:
                 Appservices will only receive ephemeral events that fall within their
                 registered user and room namespaces.
 
-            new_token: The latest stream token.
+            new_token: The stream token of the event.
             users: The users that should be informed of the new event, if any.
         """
         if not self.notify_appservices:
@@ -212,6 +217,19 @@ class ApplicationServicesHandler:
         if stream_key not in ("typing_key", "receipt_key", "presence_key"):
             return
 
+        # Assert that new_token is an integer (and not a RoomStreamToken).
+        # All of the supported streams that this function handles use an
+        # integer to track progress (rather than a RoomStreamToken - a
+        # vector clock implementation) as they don't support multiple
+        # stream writers.
+        #
+        # As a result, we simply assert that new_token is an integer.
+        # If we do end up needing to pass a RoomStreamToken down here
+        # in the future, using RoomStreamToken.stream (the minimum stream
+        # position) to convert to an ascending integer value should work.
+        # Additional context: https://github.com/matrix-org/synapse/pull/11137
+        assert isinstance(new_token, int)
+
         services = [
             service
             for service in self.store.get_app_services()
@@ -231,14 +249,13 @@ class ApplicationServicesHandler:
         self,
         services: List[ApplicationService],
         stream_key: str,
-        new_token: Optional[int],
+        new_token: int,
         users: Collection[Union[str, UserID]],
     ) -> None:
         logger.debug("Checking interested services for %s" % (stream_key))
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
-                # Only handle typing if we have the latest token
-                if stream_key == "typing_key" and new_token is not None:
+                if stream_key == "typing_key":
                     # Note that we don't persist the token (via set_type_stream_id_for_appservice)
                     # for typing_key due to performance reasons and due to their highly
                     # ephemeral nature.
@@ -248,26 +265,37 @@ class ApplicationServicesHandler:
                     events = await self._handle_typing(service, new_token)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    continue
 
-                elif stream_key == "receipt_key":
-                    events = await self._handle_receipts(service)
-                    if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
-
-                    # Persist the latest handled stream token for this appservice
-                    await self.store.set_type_stream_id_for_appservice(
-                        service, "read_receipt", new_token
+                # Since we read/update the stream position for this AS/stream
+                with (
+                    await self._ephemeral_events_linearizer.queue(
+                        (service.id, stream_key)
                     )
+                ):
+                    if stream_key == "receipt_key":
+                        events = await self._handle_receipts(service, new_token)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
+
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "read_receipt", new_token
+                        )
 
-                elif stream_key == "presence_key":
-                    events = await self._handle_presence(service, users)
-                    if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    elif stream_key == "presence_key":
+                        events = await self._handle_presence(service, users, new_token)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
 
-                    # Persist the latest handled stream token for this appservice
-                    await self.store.set_type_stream_id_for_appservice(
-                        service, "presence", new_token
-                    )
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "presence", new_token
+                        )
 
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
@@ -304,7 +332,9 @@ class ApplicationServicesHandler:
         )
         return typing
 
-    async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+    async def _handle_receipts(
+        self, service: ApplicationService, new_token: Optional[int]
+    ) -> List[JsonDict]:
         """
         Return the latest read receipts that the given application service should receive.
 
@@ -323,6 +353,12 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         receipts_source = self.event_sources.sources.receipt
         receipts, _ = await receipts_source.get_new_events_as(
             service=service, from_key=from_key
@@ -330,7 +366,10 @@ class ApplicationServicesHandler:
         return receipts
 
     async def _handle_presence(
-        self, service: ApplicationService, users: Collection[Union[str, UserID]]
+        self,
+        service: ApplicationService,
+        users: Collection[Union[str, UserID]],
+        new_token: Optional[int],
     ) -> List[JsonDict]:
         """
         Return the latest presence updates that the given application service should receive.
@@ -353,6 +392,12 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "presence"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         for user in users:
             if isinstance(user, str):
                 user = UserID.from_string(user)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d508d7d32a..60e59d11a0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1989,7 +1989,9 @@ class PasswordAuthProvider:
         self,
         check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
         on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
-        auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None,
+        auth_checkers: Optional[
+            Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
+        ] = None,
     ) -> None:
         # Register check_3pid_auth callback
         if check_3pid_auth is not None:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8567cb0e00..8ca5f60b1c 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -245,7 +245,7 @@ class DirectoryHandler:
                 servers = result.servers
         else:
             try:
-                fed_result = await self.federation.make_query(
+                fed_result: Optional[JsonDict] = await self.federation.make_query(
                     destination=room_alias.domain,
                     query_type="directory",
                     args={"room_alias": room_alias.to_string()},
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d0fb2fc7dc..60c11e3d21 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -201,95 +201,19 @@ class E2eKeysHandler:
                     r[user_id] = remote_queries[user_id]
 
             # Now fetch any devices that we don't have in our cache
-            @trace
-            async def do_remote_query(destination: str) -> None:
-                """This is called when we are querying the device list of a user on
-                a remote homeserver and their device list is not in the device list
-                cache. If we share a room with this user and we're not querying for
-                specific user we will update the cache with their device list.
-                """
-
-                destination_query = remote_queries_not_in_cache[destination]
-
-                # We first consider whether we wish to update the device list cache with
-                # the users device list. We want to track a user's devices when the
-                # authenticated user shares a room with the queried user and the query
-                # has not specified a particular device.
-                # If we update the cache for the queried user we remove them from further
-                # queries. We use the more efficient batched query_client_keys for all
-                # remaining users
-                user_ids_updated = []
-                for (user_id, device_list) in destination_query.items():
-                    if user_id in user_ids_updated:
-                        continue
-
-                    if device_list:
-                        continue
-
-                    room_ids = await self.store.get_rooms_for_user(user_id)
-                    if not room_ids:
-                        continue
-
-                    # We've decided we're sharing a room with this user and should
-                    # probably be tracking their device lists. However, we haven't
-                    # done an initial sync on the device list so we do it now.
-                    try:
-                        if self._is_master:
-                            user_devices = await self.device_handler.device_list_updater.user_device_resync(
-                                user_id
-                            )
-                        else:
-                            user_devices = await self._user_device_resync_client(
-                                user_id=user_id
-                            )
-
-                        user_devices = user_devices["devices"]
-                        user_results = results.setdefault(user_id, {})
-                        for device in user_devices:
-                            user_results[device["device_id"]] = device["keys"]
-                        user_ids_updated.append(user_id)
-                    except Exception as e:
-                        failures[destination] = _exception_to_failure(e)
-
-                if len(destination_query) == len(user_ids_updated):
-                    # We've updated all the users in the query and we do not need to
-                    # make any further remote calls.
-                    return
-
-                # Remove all the users from the query which we have updated
-                for user_id in user_ids_updated:
-                    destination_query.pop(user_id)
-
-                try:
-                    remote_result = await self.federation.query_client_keys(
-                        destination, {"device_keys": destination_query}, timeout=timeout
-                    )
-
-                    for user_id, keys in remote_result["device_keys"].items():
-                        if user_id in destination_query:
-                            results[user_id] = keys
-
-                    if "master_keys" in remote_result:
-                        for user_id, key in remote_result["master_keys"].items():
-                            if user_id in destination_query:
-                                cross_signing_keys["master_keys"][user_id] = key
-
-                    if "self_signing_keys" in remote_result:
-                        for user_id, key in remote_result["self_signing_keys"].items():
-                            if user_id in destination_query:
-                                cross_signing_keys["self_signing_keys"][user_id] = key
-
-                except Exception as e:
-                    failure = _exception_to_failure(e)
-                    failures[destination] = failure
-                    set_tag("error", True)
-                    set_tag("reason", failure)
-
             await make_deferred_yieldable(
                 defer.gatherResults(
                     [
-                        run_in_background(do_remote_query, destination)
-                        for destination in remote_queries_not_in_cache
+                        run_in_background(
+                            self._query_devices_for_destination,
+                            results,
+                            cross_signing_keys,
+                            failures,
+                            destination,
+                            queries,
+                            timeout,
+                        )
+                        for destination, queries in remote_queries_not_in_cache.items()
                     ],
                     consumeErrors=True,
                 ).addErrback(unwrapFirstError)
@@ -301,6 +225,121 @@ class E2eKeysHandler:
 
             return ret
 
+    @trace
+    async def _query_devices_for_destination(
+        self,
+        results: JsonDict,
+        cross_signing_keys: JsonDict,
+        failures: Dict[str, JsonDict],
+        destination: str,
+        destination_query: Dict[str, Iterable[str]],
+        timeout: int,
+    ) -> None:
+        """This is called when we are querying the device list of a user on
+        a remote homeserver and their device list is not in the device list
+        cache. If we share a room with this user and we're not querying for
+        specific user we will update the cache with their device list.
+
+        Args:
+            results: A map from user ID to their device keys, which gets
+                updated with the newly fetched keys.
+            cross_signing_keys: Map from user ID to their cross signing keys,
+                which gets updated with the newly fetched keys.
+            failures: Map of destinations to failures that have occurred while
+                attempting to fetch keys.
+            destination: The remote server to query
+            destination_query: The query dict of devices to query the remote
+                server for.
+            timeout: The timeout for remote HTTP requests.
+        """
+
+        # We first consider whether we wish to update the device list cache with
+        # the users device list. We want to track a user's devices when the
+        # authenticated user shares a room with the queried user and the query
+        # has not specified a particular device.
+        # If we update the cache for the queried user we remove them from further
+        # queries. We use the more efficient batched query_client_keys for all
+        # remaining users
+        user_ids_updated = []
+        for (user_id, device_list) in destination_query.items():
+            if user_id in user_ids_updated:
+                continue
+
+            if device_list:
+                continue
+
+            room_ids = await self.store.get_rooms_for_user(user_id)
+            if not room_ids:
+                continue
+
+            # We've decided we're sharing a room with this user and should
+            # probably be tracking their device lists. However, we haven't
+            # done an initial sync on the device list so we do it now.
+            try:
+                if self._is_master:
+                    resync_results = await self.device_handler.device_list_updater.user_device_resync(
+                        user_id
+                    )
+                else:
+                    resync_results = await self._user_device_resync_client(
+                        user_id=user_id
+                    )
+
+                # Add the device keys to the results.
+                user_devices = resync_results["devices"]
+                user_results = results.setdefault(user_id, {})
+                for device in user_devices:
+                    user_results[device["device_id"]] = device["keys"]
+                user_ids_updated.append(user_id)
+
+                # Add any cross signing keys to the results.
+                master_key = resync_results.get("master_key")
+                self_signing_key = resync_results.get("self_signing_key")
+
+                if master_key:
+                    cross_signing_keys["master_keys"][user_id] = master_key
+
+                if self_signing_key:
+                    cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
+            except Exception as e:
+                failures[destination] = _exception_to_failure(e)
+
+        if len(destination_query) == len(user_ids_updated):
+            # We've updated all the users in the query and we do not need to
+            # make any further remote calls.
+            return
+
+        # Remove all the users from the query which we have updated
+        for user_id in user_ids_updated:
+            destination_query.pop(user_id)
+
+        try:
+            remote_result = await self.federation.query_client_keys(
+                destination, {"device_keys": destination_query}, timeout=timeout
+            )
+
+            for user_id, keys in remote_result["device_keys"].items():
+                if user_id in destination_query:
+                    results[user_id] = keys
+
+            if "master_keys" in remote_result:
+                for user_id, key in remote_result["master_keys"].items():
+                    if user_id in destination_query:
+                        cross_signing_keys["master_keys"][user_id] = key
+
+            if "self_signing_keys" in remote_result:
+                for user_id, key in remote_result["self_signing_keys"].items():
+                    if user_id in destination_query:
+                        cross_signing_keys["self_signing_keys"][user_id] = key
+
+        except Exception as e:
+            failure = _exception_to_failure(e)
+            failures[destination] = failure
+            set_tag("error", True)
+            set_tag("reason", failure)
+
+        return
+
     async def get_cross_signing_keys_from_cache(
         self, query: Iterable[str], from_user_id: Optional[str]
     ) -> Dict[str, Dict[str, dict]]:
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9584d5bd46..1a1cd93b1a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -477,7 +477,7 @@ class FederationEventHandler:
 
     @log_function
     async def backfill(
-        self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
+        self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> None:
         """Trigger a backfill request to `dest` for the given `room_id`
 
@@ -1643,7 +1643,7 @@ class FederationEventHandler:
             event: the event whose auth_events we want
 
         Returns:
-            all of the events in `event.auth_events`, after deduplication
+            all of the events listed in `event.auth_events_ids`, after deduplication
 
         Raises:
             AuthError if we were unable to fetch the auth_events for any reason.
@@ -1916,7 +1916,7 @@ class FederationEventHandler:
         event_pos = PersistedEventPosition(
             self._instance_name, event.internal_metadata.stream_ordering
         )
-        self._notifier.on_new_room_event(
+        await self._notifier.on_new_room_event(
             event, event_pos, max_stream_token, extra_users=extra_users
         )
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 7ef8698a5e..3dbe611f95 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -537,10 +537,6 @@ class IdentityHandler:
         except RequestTimedOutError:
             raise SynapseError(500, "Timed out contacting identity server")
 
-        # It is already checked that public_baseurl is configured since this code
-        # should only be used if account_threepid_delegate_msisdn is true.
-        assert self.hs.config.server.public_baseurl
-
         # we need to tell the client to send the token back to us, since it doesn't
         # otherwise know where to send it, so add submit_url response parameter
         # (see also MSC2078)
@@ -879,6 +875,8 @@ class IdentityHandler:
         }
 
         if room_type is not None:
+            invite_config["room_type"] = room_type
+            # TODO The unstable field is deprecated and should be removed in the future.
             invite_config["org.matrix.msc3288.room_type"] = room_type
 
         # If a custom web client location is available, include it in the request.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2e024b551f..d4c2a6ab7a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1318,6 +1318,8 @@ class EventCreationHandler:
             # user is actually admin or not).
             is_admin_redaction = False
             if event.type == EventTypes.Redaction:
+                assert event.redacts is not None
+
                 original_event = await self.store.get_event(
                     event.redacts,
                     redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1413,6 +1415,8 @@ class EventCreationHandler:
                 )
 
         if event.type == EventTypes.Redaction:
+            assert event.redacts is not None
+
             original_event = await self.store.get_event(
                 event.redacts,
                 redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1500,11 +1504,13 @@ class EventCreationHandler:
                 next_batch_id = event.content.get(
                     EventContentFields.MSC2716_NEXT_BATCH_ID
                 )
-                conflicting_insertion_event_id = (
-                    await self.store.get_insertion_event_by_batch_id(
-                        event.room_id, next_batch_id
+                conflicting_insertion_event_id = None
+                if next_batch_id:
+                    conflicting_insertion_event_id = (
+                        await self.store.get_insertion_event_id_by_batch_id(
+                            event.room_id, next_batch_id
+                        )
                     )
-                )
                 if conflicting_insertion_event_id is not None:
                     # The current insertion event that we're processing is invalid
                     # because an insertion event already exists in the room with the
@@ -1537,13 +1543,16 @@ class EventCreationHandler:
             # If there's an expiry timestamp on the event, schedule its expiry.
             self._message_handler.maybe_schedule_expiry(event)
 
-        def _notify() -> None:
+        async def _notify() -> None:
             try:
-                self.notifier.on_new_room_event(
+                await self.notifier.on_new_room_event(
                     event, event_pos, max_stream_token, extra_users=extra_users
                 )
             except Exception:
-                logger.exception("Error notifying about new room event")
+                logger.exception(
+                    "Error notifying about new room event %s",
+                    event.event_id,
+                )
 
         run_in_background(_notify)
 
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 60ff896386..abfe7be0e3 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -438,7 +438,7 @@ class PaginationHandler:
             }
 
         state = None
-        if event_filter and event_filter.lazy_load_members() and len(events) > 0:
+        if event_filter and event_filter.lazy_load_members and len(events) > 0:
             # TODO: remove redundant members
 
             # FIXME: we also care about invite targets etc.
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fdab50da37..3df872c578 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,7 @@ import synapse.metrics
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.api.presence import UserPresenceState
+from synapse.appservice import ApplicationService
 from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
@@ -1551,6 +1552,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         is_guest: bool = False,
         explicit_room_id: Optional[str] = None,
         include_offline: bool = True,
+        service: Optional[ApplicationService] = None,
     ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index e6c3cf585b..6b5a6ded8b 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -456,7 +456,11 @@ class ProfileHandler:
                 continue
 
             new_name = profile.get("displayname")
+            if not isinstance(new_name, str):
+                new_name = None
             new_avatar = profile.get("avatar_url")
+            if not isinstance(new_avatar, str):
+                new_avatar = None
 
             # We always hit update to update the last_check timestamp
             await self.store.update_remote_profile_cache(user_id, new_name, new_avatar)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index cf01d58ea1..969eb3b9b0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -525,7 +525,7 @@ class RoomCreationHandler:
             ):
                 await self.room_member_handler.update_membership(
                     requester,
-                    UserID.from_string(old_event["state_key"]),
+                    UserID.from_string(old_event.state_key),
                     new_room_id,
                     "ban",
                     ratelimit=False,
@@ -1173,7 +1173,7 @@ class RoomContextHandler:
         else:
             last_event_id = event_id
 
-        if event_filter and event_filter.lazy_load_members():
+        if event_filter and event_filter.lazy_load_members:
             state_filter = StateFilter.from_lazy_load_member_list(
                 ev.sender
                 for ev in itertools.chain(
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 2f5a3e4d19..0723286383 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -355,7 +355,7 @@ class RoomBatchHandler:
         for (event, context) in reversed(events_to_persist):
             await self.event_creation_handler.handle_new_client_event(
                 await self.create_requester_for_user_id_from_app_service(
-                    event["sender"], app_service_requester.app_service
+                    event.sender, app_service_requester.app_service
                 ),
                 event=event,
                 context=context,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 74e6c7eca6..08244b690d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1669,7 +1669,9 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         #
         # the prev_events consist solely of the previous membership event.
         prev_event_ids = [previous_membership_event.event_id]
-        auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids
+        auth_event_ids = (
+            list(previous_membership_event.auth_event_ids()) + prev_event_ids
+        )
 
         event, context = await self.event_creation_handler.create_event(
             requester,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index a3ffa26be8..6e4dff8056 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -249,7 +249,7 @@ class SearchHandler:
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
-            allowed_events = events[: search_filter.limit()]
+            allowed_events = events[: search_filter.limit]
 
             for e in allowed_events:
                 rm = room_groups.setdefault(
@@ -271,13 +271,13 @@ class SearchHandler:
             # We keep looping and we keep filtering until we reach the limit
             # or we run out of things.
             # But only go around 5 times since otherwise synapse will be sad.
-            while len(room_events) < search_filter.limit() and i < 5:
+            while len(room_events) < search_filter.limit and i < 5:
                 i += 1
                 search_result = await self.store.search_rooms(
                     room_ids,
                     search_term,
                     keys,
-                    search_filter.limit() * 2,
+                    search_filter.limit * 2,
                     pagination_token=pagination_token,
                 )
 
@@ -299,9 +299,9 @@ class SearchHandler:
                 )
 
                 room_events.extend(events)
-                room_events = room_events[: search_filter.limit()]
+                room_events = room_events[: search_filter.limit]
 
-                if len(results) < search_filter.limit() * 2:
+                if len(results) < search_filter.limit * 2:
                     pagination_token = None
                     break
                 else:
@@ -311,7 +311,7 @@ class SearchHandler:
                 group = room_groups.setdefault(event.room_id, {"results": []})
                 group["results"].append(event.event_id)
 
-            if room_events and len(room_events) >= search_filter.limit():
+            if room_events and len(room_events) >= search_filter.limit:
                 last_event_id = room_events[-1].event_id
                 pagination_token = results_map[last_event_id]["pagination_token"]
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c411d69924..22c6174821 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -62,8 +62,8 @@ class FollowerTypingHandler:
         if hs.should_send_federation():
             self.federation = hs.get_federation_sender()
 
-        if hs.config.worker.writers.typing != hs.get_instance_name():
-            hs.get_federation_registry().register_instance_for_edu(
+        if hs.get_instance_name() not in hs.config.worker.writers.typing:
+            hs.get_federation_registry().register_instances_for_edu(
                 "m.typing",
                 hs.config.worker.writers.typing,
             )
@@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
-        assert hs.config.worker.writers.typing == hs.get_instance_name()
+        assert hs.get_instance_name() in hs.config.worker.writers.typing
 
         self.auth = hs.get_auth()
         self.notifier = hs.get_notifier()