summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/filtering.py115
-rw-r--r--synapse/federation/federation_client.py80
-rw-r--r--synapse/handlers/appservice.py24
-rw-r--r--synapse/handlers/devicemessage.py31
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/room.py59
-rw-r--r--synapse/handlers/search.py8
-rw-r--r--synapse/handlers/sync.py18
-rw-r--r--synapse/rest/admin/rooms.py26
-rw-r--r--synapse/rest/client/relations.py4
-rw-r--r--synapse/rest/client/room.py10
-rw-r--r--synapse/rest/client/sync.py6
-rw-r--r--synapse/storage/databases/main/appservice.py8
-rw-r--r--synapse/storage/databases/main/deviceinbox.py23
-rw-r--r--synapse/storage/databases/main/relations.py58
-rw-r--r--synapse/storage/databases/main/room.py7
-rw-r--r--synapse/storage/databases/main/stream.py86
17 files changed, 436 insertions, 129 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 4b0a9b2974..13dd6ce248 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -1,7 +1,7 @@
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
 # Copyright 2018-2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -86,6 +86,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
         # cf https://github.com/matrix-org/matrix-doc/pull/2326
         "org.matrix.labels": {"type": "array", "items": {"type": "string"}},
         "org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
+        # MSC3440, filtering by event relations.
+        "io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
+        "io.element.relation_types": {"type": "array", "items": {"type": "string"}},
     },
 }
 
@@ -146,14 +149,16 @@ def matrix_user_id_validator(user_id_str: str) -> UserID:
 
 class Filtering:
     def __init__(self, hs: "HomeServer"):
-        super().__init__()
+        self._hs = hs
         self.store = hs.get_datastore()
 
+        self.DEFAULT_FILTER_COLLECTION = FilterCollection(hs, {})
+
     async def get_user_filter(
         self, user_localpart: str, filter_id: Union[int, str]
     ) -> "FilterCollection":
         result = await self.store.get_user_filter(user_localpart, filter_id)
-        return FilterCollection(result)
+        return FilterCollection(self._hs, result)
 
     def add_user_filter(
         self, user_localpart: str, user_filter: JsonDict
@@ -191,21 +196,22 @@ FilterEvent = TypeVar("FilterEvent", EventBase, UserPresenceState, JsonDict)
 
 
 class FilterCollection:
-    def __init__(self, filter_json: JsonDict):
+    def __init__(self, hs: "HomeServer", filter_json: JsonDict):
         self._filter_json = filter_json
 
         room_filter_json = self._filter_json.get("room", {})
 
         self._room_filter = Filter(
-            {k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")}
+            hs,
+            {k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")},
         )
 
-        self._room_timeline_filter = Filter(room_filter_json.get("timeline", {}))
-        self._room_state_filter = Filter(room_filter_json.get("state", {}))
-        self._room_ephemeral_filter = Filter(room_filter_json.get("ephemeral", {}))
-        self._room_account_data = Filter(room_filter_json.get("account_data", {}))
-        self._presence_filter = Filter(filter_json.get("presence", {}))
-        self._account_data = Filter(filter_json.get("account_data", {}))
+        self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {}))
+        self._room_state_filter = Filter(hs, room_filter_json.get("state", {}))
+        self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {}))
+        self._room_account_data = Filter(hs, room_filter_json.get("account_data", {}))
+        self._presence_filter = Filter(hs, filter_json.get("presence", {}))
+        self._account_data = Filter(hs, filter_json.get("account_data", {}))
 
         self.include_leave = filter_json.get("room", {}).get("include_leave", False)
         self.event_fields = filter_json.get("event_fields", [])
@@ -232,25 +238,37 @@ class FilterCollection:
     def include_redundant_members(self) -> bool:
         return self._room_state_filter.include_redundant_members
 
-    def filter_presence(
+    async def filter_presence(
         self, events: Iterable[UserPresenceState]
     ) -> List[UserPresenceState]:
-        return self._presence_filter.filter(events)
+        return await self._presence_filter.filter(events)
 
-    def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
-        return self._account_data.filter(events)
+    async def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
+        return await self._account_data.filter(events)
 
-    def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
-        return self._room_state_filter.filter(self._room_filter.filter(events))
+    async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
+        return await self._room_state_filter.filter(
+            await self._room_filter.filter(events)
+        )
 
-    def filter_room_timeline(self, events: Iterable[EventBase]) -> List[EventBase]:
-        return self._room_timeline_filter.filter(self._room_filter.filter(events))
+    async def filter_room_timeline(
+        self, events: Iterable[EventBase]
+    ) -> List[EventBase]:
+        return await self._room_timeline_filter.filter(
+            await self._room_filter.filter(events)
+        )
 
-    def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]:
-        return self._room_ephemeral_filter.filter(self._room_filter.filter(events))
+    async def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]:
+        return await self._room_ephemeral_filter.filter(
+            await self._room_filter.filter(events)
+        )
 
-    def filter_room_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
-        return self._room_account_data.filter(self._room_filter.filter(events))
+    async def filter_room_account_data(
+        self, events: Iterable[JsonDict]
+    ) -> List[JsonDict]:
+        return await self._room_account_data.filter(
+            await self._room_filter.filter(events)
+        )
 
     def blocks_all_presence(self) -> bool:
         return (
@@ -274,7 +292,9 @@ class FilterCollection:
 
 
 class Filter:
-    def __init__(self, filter_json: JsonDict):
+    def __init__(self, hs: "HomeServer", filter_json: JsonDict):
+        self._hs = hs
+        self._store = hs.get_datastore()
         self.filter_json = filter_json
 
         self.limit = filter_json.get("limit", 10)
@@ -297,6 +317,20 @@ class Filter:
         self.labels = filter_json.get("org.matrix.labels", None)
         self.not_labels = filter_json.get("org.matrix.not_labels", [])
 
+        # Ideally these would be rejected at the endpoint if they were provided
+        # and not supported, but that would involve modifying the JSON schema
+        # based on the homeserver configuration.
+        if hs.config.experimental.msc3440_enabled:
+            self.relation_senders = self.filter_json.get(
+                "io.element.relation_senders", None
+            )
+            self.relation_types = self.filter_json.get(
+                "io.element.relation_types", None
+            )
+        else:
+            self.relation_senders = None
+            self.relation_types = None
+
     def filters_all_types(self) -> bool:
         return "*" in self.not_types
 
@@ -306,7 +340,7 @@ class Filter:
     def filters_all_rooms(self) -> bool:
         return "*" in self.not_rooms
 
-    def check(self, event: FilterEvent) -> bool:
+    def _check(self, event: FilterEvent) -> bool:
         """Checks whether the filter matches the given event.
 
         Args:
@@ -420,8 +454,30 @@ class Filter:
 
         return room_ids
 
-    def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
-        return list(filter(self.check, events))
+    async def _check_event_relations(
+        self, events: Iterable[FilterEvent]
+    ) -> List[FilterEvent]:
+        # The event IDs to check, mypy doesn't understand the ifinstance check.
+        event_ids = [event.event_id for event in events if isinstance(event, EventBase)]  # type: ignore[attr-defined]
+        event_ids_to_keep = set(
+            await self._store.events_have_relations(
+                event_ids, self.relation_senders, self.relation_types
+            )
+        )
+
+        return [
+            event
+            for event in events
+            if not isinstance(event, EventBase) or event.event_id in event_ids_to_keep
+        ]
+
+    async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
+        result = [event for event in events if self._check(event)]
+
+        if self.relation_senders or self.relation_types:
+            return await self._check_event_relations(result)
+
+        return result
 
     def with_room_ids(self, room_ids: Iterable[str]) -> "Filter":
         """Returns a new filter with the given room IDs appended.
@@ -433,7 +489,7 @@ class Filter:
             filter: A new filter including the given rooms and the old
                     filter's rooms.
         """
-        newFilter = Filter(self.filter_json)
+        newFilter = Filter(self._hs, self.filter_json)
         newFilter.rooms += room_ids
         return newFilter
 
@@ -444,6 +500,3 @@ def _matches_wildcard(actual_value: Optional[str], filter_value: str) -> bool:
         return actual_value.startswith(type_prefix)
     else:
         return actual_value == filter_value
-
-
-DEFAULT_FILTER_COLLECTION = FilterCollection({})
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 670186f548..3b85b135e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -277,6 +277,58 @@ class FederationClient(FederationBase):
 
         return pdus
 
+    async def get_pdu_from_destination_raw(
+        self,
+        destination: str,
+        event_id: str,
+        room_version: RoomVersion,
+        outlier: bool = False,
+        timeout: Optional[int] = None,
+    ) -> Optional[EventBase]:
+        """Requests the PDU with given origin and ID from the remote home
+        server. Does not have any caching or rate limiting!
+
+        Args:
+            destination: Which homeserver to query
+            event_id: event to fetch
+            room_version: version of the room
+            outlier: Indicates whether the PDU is an `outlier`, i.e. if
+                it's from an arbitrary point in the context as opposed to part
+                of the current block of PDUs. Defaults to `False`
+            timeout: How long to try (in ms) each destination for before
+                moving to the next destination. None indicates no timeout.
+
+        Returns:
+            The requested PDU, or None if we were unable to find it.
+
+        Raises:
+            SynapseError, NotRetryingDestination, FederationDeniedError
+        """
+        transaction_data = await self.transport_layer.get_event(
+            destination, event_id, timeout=timeout
+        )
+
+        logger.debug(
+            "retrieved event id %s from %s: %r",
+            event_id,
+            destination,
+            transaction_data,
+        )
+
+        pdu_list: List[EventBase] = [
+            event_from_pdu_json(p, room_version, outlier=outlier)
+            for p in transaction_data["pdus"]
+        ]
+
+        if pdu_list and pdu_list[0]:
+            pdu = pdu_list[0]
+
+            # Check signatures are correct.
+            signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
+            return signed_pdu
+
+        return None
+
     async def get_pdu(
         self,
         destinations: Iterable[str],
@@ -321,30 +373,14 @@ class FederationClient(FederationBase):
                 continue
 
             try:
-                transaction_data = await self.transport_layer.get_event(
-                    destination, event_id, timeout=timeout
-                )
-
-                logger.debug(
-                    "retrieved event id %s from %s: %r",
-                    event_id,
-                    destination,
-                    transaction_data,
+                signed_pdu = await self.get_pdu_from_destination_raw(
+                    destination=destination,
+                    event_id=event_id,
+                    room_version=room_version,
+                    outlier=outlier,
+                    timeout=timeout,
                 )
 
-                pdu_list: List[EventBase] = [
-                    event_from_pdu_json(p, room_version, outlier=outlier)
-                    for p in transaction_data["pdus"]
-                ]
-
-                if pdu_list and pdu_list[0]:
-                    pdu = pdu_list[0]
-
-                    # Check signatures are correct.
-                    signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
-
-                    break
-
                 pdu_attempts[destination] = now
 
             except SynapseError as e:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ddc9105ee9..9abdad262b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -188,7 +188,7 @@ class ApplicationServicesHandler:
         self,
         stream_key: str,
         new_token: Union[int, RoomStreamToken],
-        users: Optional[Collection[Union[str, UserID]]] = None,
+        users: Collection[Union[str, UserID]],
     ) -> None:
         """
         This is called by the notifier in the background when an ephemeral event is handled
@@ -203,7 +203,9 @@ class ApplicationServicesHandler:
                 value for `stream_key` will cause this function to return early.
 
                 Ephemeral events will only be pushed to appservices that have opted into
-                them.
+                receiving them by setting `push_ephemeral` to true in their registration
+                file. Note that while MSC2409 is experimental, this option is called
+                `de.sorunome.msc2409.push_ephemeral`.
 
                 Appservices will only receive ephemeral events that fall within their
                 registered user and room namespaces.
@@ -214,6 +216,7 @@ class ApplicationServicesHandler:
         if not self.notify_appservices:
             return
 
+        # Ignore any unsupported streams
         if stream_key not in ("typing_key", "receipt_key", "presence_key"):
             return
 
@@ -230,18 +233,25 @@ class ApplicationServicesHandler:
         # Additional context: https://github.com/matrix-org/synapse/pull/11137
         assert isinstance(new_token, int)
 
+        # Check whether there are any appservices which have registered to receive
+        # ephemeral events.
+        #
+        # Note that whether these events are actually relevant to these appservices
+        # is decided later on.
         services = [
             service
             for service in self.store.get_app_services()
             if service.supports_ephemeral
         ]
         if not services:
+            # Bail out early if none of the target appservices have explicitly registered
+            # to receive these ephemeral events.
             return
 
         # We only start a new background process if necessary rather than
         # optimistically (to cut down on overhead).
         self._notify_interested_services_ephemeral(
-            services, stream_key, new_token, users or []
+            services, stream_key, new_token, users
         )
 
     @wrap_as_background_process("notify_interested_services_ephemeral")
@@ -252,7 +262,7 @@ class ApplicationServicesHandler:
         new_token: int,
         users: Collection[Union[str, UserID]],
     ) -> None:
-        logger.debug("Checking interested services for %s" % (stream_key))
+        logger.debug("Checking interested services for %s", stream_key)
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
                 if stream_key == "typing_key":
@@ -345,6 +355,9 @@ class ApplicationServicesHandler:
 
         Args:
             service: The application service to check for which events it should receive.
+            new_token: A receipts event stream token. Purely used to double-check that the
+                from_token we pull from the database isn't greater than or equal to this
+                token. Prevents accidentally duplicating work.
 
         Returns:
             A list of JSON dictionaries containing data derived from the read receipts that
@@ -382,6 +395,9 @@ class ApplicationServicesHandler:
         Args:
             service: The application service that ephemeral events are being sent to.
             users: The users that should receive the presence update.
+            new_token: A presence update stream token. Purely used to double-check that the
+                from_token we pull from the database isn't greater than or equal to this
+                token. Prevents accidentally duplicating work.
 
         Returns:
             A list of json dictionaries containing data derived from the presence events
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index b6a2a34ab7..b582266af9 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -89,6 +89,13 @@ class DeviceMessageHandler:
         )
 
     async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
+        """
+        Handle receiving to-device messages from remote homeservers.
+
+        Args:
+            origin: The remote homeserver.
+            content: The JSON dictionary containing the to-device messages.
+        """
         local_messages = {}
         sender_user_id = content["sender"]
         if origin != get_domain_from_id(sender_user_id):
@@ -135,12 +142,16 @@ class DeviceMessageHandler:
                 message_type, sender_user_id, by_device
             )
 
-        stream_id = await self.store.add_messages_from_remote_to_device_inbox(
+        # Add messages to the database.
+        # Retrieve the stream id of the last-processed to-device message.
+        last_stream_id = await self.store.add_messages_from_remote_to_device_inbox(
             origin, message_id, local_messages
         )
 
+        # Notify listeners that there are new to-device messages to process,
+        # handing them the latest stream id.
         self.notifier.on_new_event(
-            "to_device_key", stream_id, users=local_messages.keys()
+            "to_device_key", last_stream_id, users=local_messages.keys()
         )
 
     async def _check_for_unknown_devices(
@@ -195,6 +206,14 @@ class DeviceMessageHandler:
         message_type: str,
         messages: Dict[str, Dict[str, JsonDict]],
     ) -> None:
+        """
+        Handle a request from a user to send to-device message(s).
+
+        Args:
+            requester: The user that is sending the to-device messages.
+            message_type: The type of to-device messages that are being sent.
+            messages: A dictionary containing recipients mapped to messages intended for them.
+        """
         sender_user_id = requester.user.to_string()
 
         message_id = random_string(16)
@@ -257,12 +276,16 @@ class DeviceMessageHandler:
                 "org.matrix.opentracing_context": json_encoder.encode(context),
             }
 
-        stream_id = await self.store.add_messages_to_device_inbox(
+        # Add messages to the database.
+        # Retrieve the stream id of the last-processed to-device message.
+        last_stream_id = await self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
 
+        # Notify listeners that there are new to-device messages to process,
+        # handing them the latest stream id.
         self.notifier.on_new_event(
-            "to_device_key", stream_id, users=local_messages.keys()
+            "to_device_key", last_stream_id, users=local_messages.keys()
         )
 
         if self.federation_sender:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index abfe7be0e3..aa26911aed 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -424,7 +424,7 @@ class PaginationHandler:
 
         if events:
             if event_filter:
-                events = event_filter.filter(events)
+                events = await event_filter.filter(events)
 
             events = await filter_events_for_client(
                 self.storage, user_id, events, is_peeking=(member_event_id is None)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 969eb3b9b0..11af30eee7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -12,8 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Contains functions for performing events on rooms."""
-
+"""Contains functions for performing actions on rooms."""
 import itertools
 import logging
 import math
@@ -31,6 +30,8 @@ from typing import (
     Tuple,
 )
 
+from typing_extensions import TypedDict
+
 from synapse.api.constants import (
     EventContentFields,
     EventTypes,
@@ -1158,8 +1159,10 @@ class RoomContextHandler:
         )
 
         if event_filter:
-            results["events_before"] = event_filter.filter(results["events_before"])
-            results["events_after"] = event_filter.filter(results["events_after"])
+            results["events_before"] = await event_filter.filter(
+                results["events_before"]
+            )
+            results["events_after"] = await event_filter.filter(results["events_after"])
 
         results["events_before"] = await filter_evts(results["events_before"])
         results["events_after"] = await filter_evts(results["events_after"])
@@ -1195,7 +1198,7 @@ class RoomContextHandler:
 
         state_events = list(state[last_event_id].values())
         if event_filter:
-            state_events = event_filter.filter(state_events)
+            state_events = await event_filter.filter(state_events)
 
         results["state"] = await filter_evts(state_events)
 
@@ -1275,6 +1278,13 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
         return self.store.get_room_events_max_id(room_id)
 
 
+class ShutdownRoomResponse(TypedDict):
+    kicked_users: List[str]
+    failed_to_kick_users: List[str]
+    local_aliases: List[str]
+    new_room_id: Optional[str]
+
+
 class RoomShutdownHandler:
 
     DEFAULT_MESSAGE = (
@@ -1300,7 +1310,7 @@ class RoomShutdownHandler:
         new_room_name: Optional[str] = None,
         message: Optional[str] = None,
         block: bool = False,
-    ) -> dict:
+    ) -> ShutdownRoomResponse:
         """
         Shuts down a room. Moves all local users and room aliases automatically
         to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1334,8 +1344,13 @@ class RoomShutdownHandler:
                 Defaults to `Sharing illegal content on this server is not
                 permitted and rooms in violation will be blocked.`
             block:
-                If set to `true`, this room will be added to a blocking list,
-                preventing future attempts to join the room. Defaults to `false`.
+                If set to `True`, users will be prevented from joining the old
+                room. This option can also be used to pre-emptively block a room,
+                even if it's unknown to this homeserver. In this case, the room
+                will be blocked, and no further action will be taken. If `False`,
+                attempting to delete an unknown room is invalid.
+
+                Defaults to `False`.
 
         Returns: a dict containing the following keys:
             kicked_users: An array of users (`user_id`) that were kicked.
@@ -1344,7 +1359,9 @@ class RoomShutdownHandler:
             local_aliases:
                 An array of strings representing the local aliases that were
                 migrated from the old room to the new.
-            new_room_id: A string representing the room ID of the new room.
+            new_room_id:
+                A string representing the room ID of the new room, or None if
+                no such room was created.
         """
 
         if not new_room_name:
@@ -1355,14 +1372,28 @@ class RoomShutdownHandler:
         if not RoomID.is_valid(room_id):
             raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
 
-        if not await self.store.get_room(room_id):
-            raise NotFoundError("Unknown room id %s" % (room_id,))
-
-        # This will work even if the room is already blocked, but that is
-        # desirable in case the first attempt at blocking the room failed below.
+        # Action the block first (even if the room doesn't exist yet)
         if block:
+            # This will work even if the room is already blocked, but that is
+            # desirable in case the first attempt at blocking the room failed below.
             await self.store.block_room(room_id, requester_user_id)
 
+        if not await self.store.get_room(room_id):
+            if block:
+                # We allow you to block an unknown room.
+                return {
+                    "kicked_users": [],
+                    "failed_to_kick_users": [],
+                    "local_aliases": [],
+                    "new_room_id": None,
+                }
+            else:
+                # But if you don't want to preventatively block another room,
+                # this function can't do anything useful.
+                raise NotFoundError(
+                    "Cannot shut down room: unknown room id %s" % (room_id,)
+                )
+
         if new_room_user_id is not None:
             if not self.hs.is_mine_id(new_room_user_id):
                 raise SynapseError(
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 6e4dff8056..ab7eaab2fb 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -180,7 +180,7 @@ class SearchHandler:
                 % (set(group_keys) - {"room_id", "sender"},),
             )
 
-        search_filter = Filter(filter_dict)
+        search_filter = Filter(self.hs, filter_dict)
 
         # TODO: Search through left rooms too
         rooms = await self.store.get_rooms_for_local_user_where_membership_is(
@@ -242,7 +242,7 @@ class SearchHandler:
 
             rank_map.update({r["event"].event_id: r["rank"] for r in results})
 
-            filtered_events = search_filter.filter([r["event"] for r in results])
+            filtered_events = await search_filter.filter([r["event"] for r in results])
 
             events = await filter_events_for_client(
                 self.storage, user.to_string(), filtered_events
@@ -292,7 +292,9 @@ class SearchHandler:
 
                 rank_map.update({r["event"].event_id: r["rank"] for r in results})
 
-                filtered_events = search_filter.filter([r["event"] for r in results])
+                filtered_events = await search_filter.filter(
+                    [r["event"] for r in results]
+                )
 
                 events = await filter_events_for_client(
                     self.storage, user.to_string(), filtered_events
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c7c6d63a9..891435c14d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -510,7 +510,7 @@ class SyncHandler:
             log_kv({"limited": limited})
 
             if potential_recents:
-                recents = sync_config.filter_collection.filter_room_timeline(
+                recents = await sync_config.filter_collection.filter_room_timeline(
                     potential_recents
                 )
                 log_kv({"recents_after_sync_filtering": len(recents)})
@@ -575,8 +575,8 @@ class SyncHandler:
 
                 log_kv({"loaded_recents": len(events)})
 
-                loaded_recents = sync_config.filter_collection.filter_room_timeline(
-                    events
+                loaded_recents = (
+                    await sync_config.filter_collection.filter_room_timeline(events)
                 )
 
                 log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)})
@@ -1015,7 +1015,7 @@ class SyncHandler:
 
         return {
             (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(
+            for e in await sync_config.filter_collection.filter_room_state(
                 list(state.values())
             )
             if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
@@ -1383,7 +1383,7 @@ class SyncHandler:
                 sync_config.user
             )
 
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
+        account_data_for_user = await sync_config.filter_collection.filter_account_data(
             [
                 {"type": account_data_type, "content": content}
                 for account_data_type, content in account_data.items()
@@ -1448,7 +1448,7 @@ class SyncHandler:
             # Deduplicate the presence entries so that there's at most one per user
             presence = list({p.user_id: p for p in presence}.values())
 
-        presence = sync_config.filter_collection.filter_presence(presence)
+        presence = await sync_config.filter_collection.filter_presence(presence)
 
         sync_result_builder.presence = presence
 
@@ -2021,12 +2021,14 @@ class SyncHandler:
                 )
 
             account_data_events = (
-                sync_config.filter_collection.filter_room_account_data(
+                await sync_config.filter_collection.filter_room_account_data(
                     account_data_events
                 )
             )
 
-            ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+            ephemeral = await sync_config.filter_collection.filter_room_ephemeral(
+                ephemeral
+            )
 
             if not (
                 always_include
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 05c5b4bf0c..a2f4edebb8 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 import logging
 from http import HTTPStatus
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 from urllib import parse as urlparse
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
@@ -239,9 +239,22 @@ class RoomRestServlet(RestServlet):
 
         # Purge room
         if purge:
-            await pagination_handler.purge_room(room_id, force=force_purge)
-
-        return 200, ret
+            try:
+                await pagination_handler.purge_room(room_id, force=force_purge)
+            except NotFoundError:
+                if block:
+                    # We can block unknown rooms with this endpoint, in which case
+                    # a failed purge is expected.
+                    pass
+                else:
+                    # But otherwise, we expect this purge to have succeeded.
+                    raise
+
+        # Cast safety: cast away the knowledge that this is a TypedDict.
+        # See https://github.com/python/mypy/issues/4976#issuecomment-579883622
+        # for some discussion on why this is necessary. Either way,
+        # `ret` is an opaque dictionary blob as far as the rest of the app cares.
+        return 200, cast(JsonDict, ret)
 
 
 class RoomMembersRestServlet(RestServlet):
@@ -583,6 +596,7 @@ class RoomEventContextServlet(RestServlet):
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
+        self._hs = hs
         self.clock = hs.get_clock()
         self.room_context_handler = hs.get_room_context_handler()
         self._event_serializer = hs.get_event_client_serializer()
@@ -600,7 +614,9 @@ class RoomEventContextServlet(RestServlet):
         filter_str = parse_string(request, "filter", encoding="utf-8")
         if filter_str:
             filter_json = urlparse.unquote(filter_str)
-            event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
         else:
             event_filter = None
 
diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py
index 58f6699073..184cfbe196 100644
--- a/synapse/rest/client/relations.py
+++ b/synapse/rest/client/relations.py
@@ -298,7 +298,9 @@ class RelationAggregationPaginationServlet(RestServlet):
             raise SynapseError(404, "Unknown parent event.")
 
         if relation_type not in (RelationTypes.ANNOTATION, None):
-            raise SynapseError(400, "Relation type must be 'annotation'")
+            raise SynapseError(
+                400, f"Relation type must be '{RelationTypes.ANNOTATION}'"
+            )
 
         limit = parse_integer(request, "limit", default=5)
         from_token_str = parse_string(request, "from")
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 6a876cfa2f..03a353d53c 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -550,6 +550,7 @@ class RoomMessageListRestServlet(RestServlet):
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
+        self._hs = hs
         self.pagination_handler = hs.get_pagination_handler()
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
@@ -567,7 +568,9 @@ class RoomMessageListRestServlet(RestServlet):
         filter_str = parse_string(request, "filter", encoding="utf-8")
         if filter_str:
             filter_json = urlparse.unquote(filter_str)
-            event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
             if (
                 event_filter
                 and event_filter.filter_json.get("event_format", "client")
@@ -672,6 +675,7 @@ class RoomEventContextServlet(RestServlet):
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
+        self._hs = hs
         self.clock = hs.get_clock()
         self.room_context_handler = hs.get_room_context_handler()
         self._event_serializer = hs.get_event_client_serializer()
@@ -688,7 +692,9 @@ class RoomEventContextServlet(RestServlet):
         filter_str = parse_string(request, "filter", encoding="utf-8")
         if filter_str:
             filter_json = urlparse.unquote(filter_str)
-            event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
         else:
             event_filter = None
 
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 913216a7c4..8c0fdb1940 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -29,7 +29,7 @@ from typing import (
 
 from synapse.api.constants import Membership, PresenceState
 from synapse.api.errors import Codes, StoreError, SynapseError
-from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
+from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.events.utils import (
@@ -150,7 +150,7 @@ class SyncRestServlet(RestServlet):
         request_key = (user, timeout, since, filter_id, full_state, device_id)
 
         if filter_id is None:
-            filter_collection = DEFAULT_FILTER_COLLECTION
+            filter_collection = self.filtering.DEFAULT_FILTER_COLLECTION
         elif filter_id.startswith("{"):
             try:
                 filter_object = json_decoder.decode(filter_id)
@@ -160,7 +160,7 @@ class SyncRestServlet(RestServlet):
             except Exception:
                 raise SynapseError(400, "Invalid filter JSON")
             self.filtering.check_valid_filter(filter_object)
-            filter_collection = FilterCollection(filter_object)
+            filter_collection = FilterCollection(self.hs, filter_object)
         else:
             try:
                 filter_collection = await self.filtering.get_user_filter(
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 2da2659f41..baec35ee27 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -412,16 +412,16 @@ class ApplicationServiceTransactionWorkerStore(
         )
 
     async def set_type_stream_id_for_appservice(
-        self, service: ApplicationService, type: str, pos: Optional[int]
+        self, service: ApplicationService, stream_type: str, pos: Optional[int]
     ) -> None:
-        if type not in ("read_receipt", "presence"):
+        if stream_type not in ("read_receipt", "presence"):
             raise ValueError(
                 "Expected type to be a valid application stream id type, got %s"
-                % (type,)
+                % (stream_type,)
             )
 
         def set_type_stream_id_for_appservice_txn(txn):
-            stream_id_type = "%s_stream_id" % type
+            stream_id_type = "%s_stream_id" % stream_type
             txn.execute(
                 "UPDATE application_services_state SET %s = ? WHERE as_id=?"
                 % stream_id_type,
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 264e625bd7..ae3afdd5d2 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -134,7 +134,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             limit: The maximum number of messages to retrieve.
 
         Returns:
-            A list of messages for the device and where in the stream the messages got to.
+            A tuple containing:
+                * A list of messages for the device.
+                * The max stream token of these messages. There may be more to retrieve
+                  if the given limit was reached.
         """
         has_changed = self._device_inbox_stream_cache.has_entity_changed(
             user_id, last_stream_id
@@ -153,12 +156,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             txn.execute(
                 sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
             )
+
             messages = []
+            stream_pos = current_stream_id
+
             for row in txn:
                 stream_pos = row[0]
                 messages.append(db_to_json(row[1]))
+
+            # If the limit was not reached we know that there's no more data for this
+            # user/device pair up to current_stream_id.
             if len(messages) < limit:
                 stream_pos = current_stream_id
+
             return messages, stream_pos
 
         return await self.db_pool.runInteraction(
@@ -260,13 +270,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 " LIMIT ?"
             )
             txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
+
             messages = []
+            stream_pos = current_stream_id
+
             for row in txn:
                 stream_pos = row[0]
                 messages.append(db_to_json(row[1]))
+
+            # If the limit was not reached we know that there's no more data for this
+            # user/device pair up to current_stream_id.
             if len(messages) < limit:
                 log_kv({"message": "Set stream position to current position"})
                 stream_pos = current_stream_id
+
             return messages, stream_pos
 
         return await self.db_pool.runInteraction(
@@ -372,8 +389,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         """Used to send messages from this server.
 
         Args:
-            local_messages_by_user_and_device:
-                Dictionary of user_id to device_id to message.
+            local_messages_by_user_then_device:
+                Dictionary of recipient user_id to recipient device_id to message.
             remote_messages_by_destination:
                 Dictionary of destination server_name to the EDU JSON to send.
 
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 53576ad52f..907af10995 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -20,7 +20,7 @@ import attr
 from synapse.api.constants import RelationTypes
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import LoggingTransaction
+from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
 from synapse.storage.databases.main.stream import generate_pagination_where_clause
 from synapse.storage.relations import (
     AggregationPaginationToken,
@@ -334,6 +334,62 @@ class RelationsWorkerStore(SQLBaseStore):
 
         return count, latest_event
 
+    async def events_have_relations(
+        self,
+        parent_ids: List[str],
+        relation_senders: Optional[List[str]],
+        relation_types: Optional[List[str]],
+    ) -> List[str]:
+        """Check which events have a relationship from the given senders of the
+        given types.
+
+        Args:
+            parent_ids: The events being annotated
+            relation_senders: The relation senders to check.
+            relation_types: The relation types to check.
+
+        Returns:
+            True if the event has at least one relationship from one of the given senders of the given type.
+        """
+        # If no restrictions are given then the event has the required relations.
+        if not relation_senders and not relation_types:
+            return parent_ids
+
+        sql = """
+            SELECT relates_to_id FROM event_relations
+            INNER JOIN events USING (event_id)
+            WHERE
+                %s;
+        """
+
+        def _get_if_event_has_relations(txn) -> List[str]:
+            clauses: List[str] = []
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "relates_to_id", parent_ids
+            )
+            clauses.append(clause)
+
+            if relation_senders:
+                clause, temp_args = make_in_list_sql_clause(
+                    txn.database_engine, "sender", relation_senders
+                )
+                clauses.append(clause)
+                args.extend(temp_args)
+            if relation_types:
+                clause, temp_args = make_in_list_sql_clause(
+                    txn.database_engine, "relation_type", relation_types
+                )
+                clauses.append(clause)
+                args.extend(temp_args)
+
+            txn.execute(sql % " AND ".join(clauses), args)
+
+            return [row[0] for row in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_if_event_has_relations", _get_if_event_has_relations
+        )
+
     async def has_user_annotated_event(
         self, parent_id: str, event_type: str, aggregation_key: str, sender: str
     ) -> bool:
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index cefc77fa0f..17b398bb69 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1751,7 +1751,12 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         )
 
     async def block_room(self, room_id: str, user_id: str) -> None:
-        """Marks the room as blocked. Can be called multiple times.
+        """Marks the room as blocked.
+
+        Can be called multiple times (though we'll only track the last user to
+        block this room).
+
+        Can be called on a room unknown to this homeserver.
 
         Args:
             room_id: Room to block
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index dc7884b1c0..42dc807d17 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -272,31 +272,37 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
     args = []
 
     if event_filter.types:
-        clauses.append("(%s)" % " OR ".join("type = ?" for _ in event_filter.types))
+        clauses.append(
+            "(%s)" % " OR ".join("event.type = ?" for _ in event_filter.types)
+        )
         args.extend(event_filter.types)
 
     for typ in event_filter.not_types:
-        clauses.append("type != ?")
+        clauses.append("event.type != ?")
         args.append(typ)
 
     if event_filter.senders:
-        clauses.append("(%s)" % " OR ".join("sender = ?" for _ in event_filter.senders))
+        clauses.append(
+            "(%s)" % " OR ".join("event.sender = ?" for _ in event_filter.senders)
+        )
         args.extend(event_filter.senders)
 
     for sender in event_filter.not_senders:
-        clauses.append("sender != ?")
+        clauses.append("event.sender != ?")
         args.append(sender)
 
     if event_filter.rooms:
-        clauses.append("(%s)" % " OR ".join("room_id = ?" for _ in event_filter.rooms))
+        clauses.append(
+            "(%s)" % " OR ".join("event.room_id = ?" for _ in event_filter.rooms)
+        )
         args.extend(event_filter.rooms)
 
     for room_id in event_filter.not_rooms:
-        clauses.append("room_id != ?")
+        clauses.append("event.room_id != ?")
         args.append(room_id)
 
     if event_filter.contains_url:
-        clauses.append("contains_url = ?")
+        clauses.append("event.contains_url = ?")
         args.append(event_filter.contains_url)
 
     # We're only applying the "labels" filter on the database query, because applying the
@@ -307,6 +313,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
         clauses.append("(%s)" % " OR ".join("label = ?" for _ in event_filter.labels))
         args.extend(event_filter.labels)
 
+    # Filter on relation_senders / relation types from the joined tables.
+    if event_filter.relation_senders:
+        clauses.append(
+            "(%s)"
+            % " OR ".join(
+                "related_event.sender = ?" for _ in event_filter.relation_senders
+            )
+        )
+        args.extend(event_filter.relation_senders)
+
+    if event_filter.relation_types:
+        clauses.append(
+            "(%s)"
+            % " OR ".join("relation_type = ?" for _ in event_filter.relation_types)
+        )
+        args.extend(event_filter.relation_types)
+
     return " AND ".join(clauses), args
 
 
@@ -1116,7 +1139,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
         bounds = generate_pagination_where_clause(
             direction=direction,
-            column_names=("topological_ordering", "stream_ordering"),
+            column_names=("event.topological_ordering", "event.stream_ordering"),
             from_token=from_bound,
             to_token=to_bound,
             engine=self.database_engine,
@@ -1133,32 +1156,51 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
         select_keywords = "SELECT"
         join_clause = ""
+        # Using DISTINCT in this SELECT query is quite expensive, because it
+        # requires the engine to sort on the entire (not limited) result set,
+        # i.e. the entire events table. Only use it in scenarios that could result
+        # in the same event ID occurring multiple times in the results.
+        needs_distinct = False
         if event_filter and event_filter.labels:
             # If we're not filtering on a label, then joining on event_labels will
             # return as many row for a single event as the number of labels it has. To
             # avoid this, only join if we're filtering on at least one label.
-            join_clause = """
+            join_clause += """
                 LEFT JOIN event_labels
                 USING (event_id, room_id, topological_ordering)
             """
             if len(event_filter.labels) > 1:
-                # Using DISTINCT in this SELECT query is quite expensive, because it
-                # requires the engine to sort on the entire (not limited) result set,
-                # i.e. the entire events table. We only need to use it when we're
-                # filtering on more than two labels, because that's the only scenario
-                # in which we can possibly to get multiple times the same event ID in
-                # the results.
-                select_keywords += "DISTINCT"
+                # Multiple labels could cause the same event to appear multiple times.
+                needs_distinct = True
+
+        # If there is a filter on relation_senders and relation_types join to the
+        # relations table.
+        if event_filter and (
+            event_filter.relation_senders or event_filter.relation_types
+        ):
+            # Filtering by relations could cause the same event to appear multiple
+            # times (since there's no limit on the number of relations to an event).
+            needs_distinct = True
+            join_clause += """
+                LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
+            """
+            if event_filter.relation_senders:
+                join_clause += """
+                    LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
+                """
+
+        if needs_distinct:
+            select_keywords += " DISTINCT"
 
         sql = """
             %(select_keywords)s
-                event_id, instance_name,
-                topological_ordering, stream_ordering
-            FROM events
+                event.event_id, event.instance_name,
+                event.topological_ordering, event.stream_ordering
+            FROM events AS event
             %(join_clause)s
-            WHERE outlier = ? AND room_id = ? AND %(bounds)s
-            ORDER BY topological_ordering %(order)s,
-            stream_ordering %(order)s LIMIT ?
+            WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
+            ORDER BY event.topological_ordering %(order)s,
+            event.stream_ordering %(order)s LIMIT ?
         """ % {
             "select_keywords": select_keywords,
             "join_clause": join_clause,