diff --git a/changelog.d/11228.feature b/changelog.d/11228.feature
new file mode 100644
index 0000000000..33c1756b50
--- /dev/null
+++ b/changelog.d/11228.feature
@@ -0,0 +1 @@
+Allow the admin [Delete Room API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#delete-room-api) to block a room without the need to join it.
diff --git a/changelog.d/11236.feature b/changelog.d/11236.feature
new file mode 100644
index 0000000000..e7aeee2aa6
--- /dev/null
+++ b/changelog.d/11236.feature
@@ -0,0 +1 @@
+Support filtering by relation senders & types per [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440).
diff --git a/changelog.d/11242.misc b/changelog.d/11242.misc
new file mode 100644
index 0000000000..3a98259edf
--- /dev/null
+++ b/changelog.d/11242.misc
@@ -0,0 +1 @@
+Split out federated PDU retrieval function into a non-cached version.
diff --git a/changelog.d/11247.misc b/changelog.d/11247.misc
new file mode 100644
index 0000000000..5ce701560e
--- /dev/null
+++ b/changelog.d/11247.misc
@@ -0,0 +1 @@
+Clean up code relating to to-device messages and sending ephemeral events to application services.
\ No newline at end of file
diff --git a/changelog.d/11278.misc b/changelog.d/11278.misc
new file mode 100644
index 0000000000..9b014bc8b4
--- /dev/null
+++ b/changelog.d/11278.misc
@@ -0,0 +1 @@
+Fix a small typo in the error response when a relation type other than 'm.annotation' is passed to `GET /rooms/{room_id}/aggregations/{event_id}`.
\ No newline at end of file
diff --git a/changelog.d/11282.misc b/changelog.d/11282.misc
new file mode 100644
index 0000000000..4720519cbc
--- /dev/null
+++ b/changelog.d/11282.misc
@@ -0,0 +1 @@
+Require all files in synapse/ and tests/ to pass mypy unless specifically excluded.
diff --git a/changelog.d/11285.misc b/changelog.d/11285.misc
new file mode 100644
index 0000000000..4720519cbc
--- /dev/null
+++ b/changelog.d/11285.misc
@@ -0,0 +1 @@
+Require all files in synapse/ and tests/ to pass mypy unless specifically excluded.
diff --git a/changelog.d/11286.doc b/changelog.d/11286.doc
new file mode 100644
index 0000000000..890d7b4ee4
--- /dev/null
+++ b/changelog.d/11286.doc
@@ -0,0 +1 @@
+Fix typo in the word `available` and fix HTTP method (should be `GET`) for the `username_available` admin API. Contributed by Stanislav Motylkov.
diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md
index ab6b82a082..41a4961d00 100644
--- a/docs/admin_api/rooms.md
+++ b/docs/admin_api/rooms.md
@@ -396,13 +396,17 @@ The new room will be created with the user specified by the `new_room_user_id` p
as room administrator and will contain a message explaining what happened. Users invited
to the new room will have power level `-10` by default, and thus be unable to speak.
-If `block` is `True` it prevents new joins to the old room.
+If `block` is `true`, users will be prevented from joining the old room.
+This option can also be used to pre-emptively block a room, even if it's unknown
+to this homeserver. In this case, the room will be blocked, and no further action
+will be taken. If `block` is `false`, attempting to delete an unknown room is
+invalid and will be rejected as a bad request.
This API will remove all trace of the old room from your database after removing
all local users. If `purge` is `true` (the default), all traces of the old room will
be removed from your database after removing all local users. If you do not want
this to happen, set `purge` to `false`.
-Depending on the amount of history being purged a call to the API may take
+Depending on the amount of history being purged, a call to the API may take
several minutes or longer.
The local server will only have the power to move local user and room aliases to
@@ -464,8 +468,9 @@ The following JSON body parameters are available:
`new_room_user_id` in the new room. Ideally this will clearly convey why the
original room was shut down. Defaults to `Sharing illegal content on this server
is not permitted and rooms in violation will be blocked.`
-* `block` - Optional. If set to `true`, this room will be added to a blocking list, preventing
- future attempts to join the room. Defaults to `false`.
+* `block` - Optional. If set to `true`, this room will be added to a blocking list,
+ preventing future attempts to join the room. Rooms can be blocked
+ even if they're not yet known to the homeserver. Defaults to `false`.
* `purge` - Optional. If set to `true`, it will remove all traces of the room from your database.
Defaults to `true`.
* `force_purge` - Optional, and ignored unless `purge` is `true`. If set to `true`, it
@@ -483,7 +488,8 @@ The following fields are returned in the JSON response body:
* `failed_to_kick_users` - An array of users (`user_id`) that that were not kicked.
* `local_aliases` - An array of strings representing the local aliases that were migrated from
the old room to the new.
-* `new_room_id` - A string representing the room ID of the new room.
+* `new_room_id` - A string representing the room ID of the new room, or `null` if
+ no such room was created.
## Undoing room deletions
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index f03539c9f0..16ec33b3c1 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -1107,7 +1107,7 @@ This endpoint will work even if registration is disabled on the server, unlike
The API is:
```
-POST /_synapse/admin/v1/username_availabile?username=$localpart
+GET /_synapse/admin/v1/username_available?username=$localpart
```
The request and response format is the same as the [/_matrix/client/r0/register/available](https://matrix.org/docs/spec/client_server/r0.6.0#get-matrix-client-r0-register-available) API.
diff --git a/mypy.ini b/mypy.ini
index 600402a5d3..1752b82bc5 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -10,82 +10,173 @@ warn_unreachable = True
local_partial_types = True
no_implicit_optional = True
-# To find all folders that pass mypy you run:
-#
-# find synapse/* -type d -not -name __pycache__ -exec bash -c "mypy '{}' > /dev/null" \; -print
-
files =
scripts-dev/sign_json,
- synapse/__init__.py,
- synapse/api,
- synapse/appservice,
- synapse/config,
- synapse/crypto,
- synapse/event_auth.py,
- synapse/events,
- synapse/federation,
- synapse/groups,
- synapse/handlers,
- synapse/http,
- synapse/logging,
- synapse/metrics,
- synapse/module_api,
- synapse/notifier.py,
- synapse/push,
- synapse/replication,
- synapse/rest,
- synapse/server.py,
- synapse/server_notices,
- synapse/spam_checker_api,
- synapse/state,
- synapse/storage/__init__.py,
- synapse/storage/_base.py,
- synapse/storage/background_updates.py,
- synapse/storage/databases/main/appservice.py,
- synapse/storage/databases/main/client_ips.py,
- synapse/storage/databases/main/events.py,
- synapse/storage/databases/main/keys.py,
- synapse/storage/databases/main/pusher.py,
- synapse/storage/databases/main/registration.py,
- synapse/storage/databases/main/relations.py,
- synapse/storage/databases/main/session.py,
- synapse/storage/databases/main/stream.py,
- synapse/storage/databases/main/ui_auth.py,
- synapse/storage/databases/state,
- synapse/storage/database.py,
- synapse/storage/engines,
- synapse/storage/keys.py,
- synapse/storage/persist_events.py,
- synapse/storage/prepare_database.py,
- synapse/storage/purge_events.py,
- synapse/storage/push_rule.py,
- synapse/storage/relations.py,
- synapse/storage/roommember.py,
- synapse/storage/state.py,
- synapse/storage/types.py,
- synapse/storage/util,
- synapse/streams,
- synapse/types.py,
- synapse/util,
- synapse/visibility.py,
- tests/replication,
- tests/test_event_auth.py,
- tests/test_utils,
- tests/handlers/test_password_providers.py,
- tests/handlers/test_room.py,
- tests/handlers/test_room_summary.py,
- tests/handlers/test_send_email.py,
- tests/handlers/test_sync.py,
- tests/handlers/test_user_directory.py,
- tests/rest/client/test_login.py,
- tests/rest/client/test_auth.py,
- tests/rest/client/test_relations.py,
- tests/rest/media/v1/test_filepath.py,
- tests/rest/media/v1/test_oembed.py,
- tests/storage/test_state.py,
- tests/storage/test_user_directory.py,
- tests/util/test_itertools.py,
- tests/util/test_stream_change_cache.py
+ setup.py,
+ synapse/,
+ tests/
+
+# Note: Better exclusion syntax coming in mypy > 0.910
+# https://github.com/python/mypy/pull/11329
+#
+# For now, set the (?x) flag enable "verbose" regexes
+# https://docs.python.org/3/library/re.html#re.X
+exclude = (?x)
+ ^(
+ |synapse/_scripts/register_new_matrix_user.py
+ |synapse/_scripts/review_recent_signups.py
+ |synapse/app/__init__.py
+ |synapse/app/_base.py
+ |synapse/app/admin_cmd.py
+ |synapse/app/appservice.py
+ |synapse/app/client_reader.py
+ |synapse/app/event_creator.py
+ |synapse/app/federation_reader.py
+ |synapse/app/federation_sender.py
+ |synapse/app/frontend_proxy.py
+ |synapse/app/generic_worker.py
+ |synapse/app/homeserver.py
+ |synapse/app/media_repository.py
+ |synapse/app/phone_stats_home.py
+ |synapse/app/pusher.py
+ |synapse/app/synchrotron.py
+ |synapse/app/user_dir.py
+ |synapse/storage/databases/__init__.py
+ |synapse/storage/databases/main/__init__.py
+ |synapse/storage/databases/main/account_data.py
+ |synapse/storage/databases/main/cache.py
+ |synapse/storage/databases/main/censor_events.py
+ |synapse/storage/databases/main/deviceinbox.py
+ |synapse/storage/databases/main/devices.py
+ |synapse/storage/databases/main/directory.py
+ |synapse/storage/databases/main/e2e_room_keys.py
+ |synapse/storage/databases/main/end_to_end_keys.py
+ |synapse/storage/databases/main/event_federation.py
+ |synapse/storage/databases/main/event_push_actions.py
+ |synapse/storage/databases/main/events_bg_updates.py
+ |synapse/storage/databases/main/events_forward_extremities.py
+ |synapse/storage/databases/main/events_worker.py
+ |synapse/storage/databases/main/filtering.py
+ |synapse/storage/databases/main/group_server.py
+ |synapse/storage/databases/main/lock.py
+ |synapse/storage/databases/main/media_repository.py
+ |synapse/storage/databases/main/metrics.py
+ |synapse/storage/databases/main/monthly_active_users.py
+ |synapse/storage/databases/main/openid.py
+ |synapse/storage/databases/main/presence.py
+ |synapse/storage/databases/main/profile.py
+ |synapse/storage/databases/main/purge_events.py
+ |synapse/storage/databases/main/push_rule.py
+ |synapse/storage/databases/main/receipts.py
+ |synapse/storage/databases/main/rejections.py
+ |synapse/storage/databases/main/room.py
+ |synapse/storage/databases/main/room_batch.py
+ |synapse/storage/databases/main/roommember.py
+ |synapse/storage/databases/main/search.py
+ |synapse/storage/databases/main/signatures.py
+ |synapse/storage/databases/main/state.py
+ |synapse/storage/databases/main/state_deltas.py
+ |synapse/storage/databases/main/stats.py
+ |synapse/storage/databases/main/tags.py
+ |synapse/storage/databases/main/transactions.py
+ |synapse/storage/databases/main/user_directory.py
+ |synapse/storage/databases/main/user_erasure_store.py
+ |synapse/storage/schema/
+
+ |tests/api/test_auth.py
+ |tests/api/test_ratelimiting.py
+ |tests/app/test_openid_listener.py
+ |tests/appservice/test_scheduler.py
+ |tests/config/test_cache.py
+ |tests/config/test_tls.py
+ |tests/crypto/test_keyring.py
+ |tests/events/test_presence_router.py
+ |tests/events/test_utils.py
+ |tests/federation/test_federation_catch_up.py
+ |tests/federation/test_federation_sender.py
+ |tests/federation/test_federation_server.py
+ |tests/federation/transport/test_knocking.py
+ |tests/federation/transport/test_server.py
+ |tests/handlers/test_cas.py
+ |tests/handlers/test_directory.py
+ |tests/handlers/test_e2e_keys.py
+ |tests/handlers/test_federation.py
+ |tests/handlers/test_oidc.py
+ |tests/handlers/test_presence.py
+ |tests/handlers/test_profile.py
+ |tests/handlers/test_saml.py
+ |tests/handlers/test_typing.py
+ |tests/http/federation/test_matrix_federation_agent.py
+ |tests/http/federation/test_srv_resolver.py
+ |tests/http/test_fedclient.py
+ |tests/http/test_proxyagent.py
+ |tests/http/test_servlet.py
+ |tests/http/test_site.py
+ |tests/logging/__init__.py
+ |tests/logging/test_terse_json.py
+ |tests/module_api/test_api.py
+ |tests/push/test_email.py
+ |tests/push/test_http.py
+ |tests/push/test_presentable_names.py
+ |tests/push/test_push_rule_evaluator.py
+ |tests/rest/admin/test_admin.py
+ |tests/rest/admin/test_device.py
+ |tests/rest/admin/test_media.py
+ |tests/rest/admin/test_server_notice.py
+ |tests/rest/admin/test_user.py
+ |tests/rest/admin/test_username_available.py
+ |tests/rest/client/test_account.py
+ |tests/rest/client/test_events.py
+ |tests/rest/client/test_filter.py
+ |tests/rest/client/test_groups.py
+ |tests/rest/client/test_register.py
+ |tests/rest/client/test_report_event.py
+ |tests/rest/client/test_rooms.py
+ |tests/rest/client/test_third_party_rules.py
+ |tests/rest/client/test_transactions.py
+ |tests/rest/client/test_typing.py
+ |tests/rest/client/utils.py
+ |tests/rest/key/v2/test_remote_key_resource.py
+ |tests/rest/media/v1/test_base.py
+ |tests/rest/media/v1/test_media_storage.py
+ |tests/rest/media/v1/test_url_preview.py
+ |tests/scripts/test_new_matrix_user.py
+ |tests/server.py
+ |tests/server_notices/test_resource_limits_server_notices.py
+ |tests/state/test_v2.py
+ |tests/storage/test_account_data.py
+ |tests/storage/test_appservice.py
+ |tests/storage/test_background_update.py
+ |tests/storage/test_base.py
+ |tests/storage/test_client_ips.py
+ |tests/storage/test_database.py
+ |tests/storage/test_event_federation.py
+ |tests/storage/test_id_generators.py
+ |tests/storage/test_roommember.py
+ |tests/test_metrics.py
+ |tests/test_phone_home.py
+ |tests/test_server.py
+ |tests/test_state.py
+ |tests/test_terms_auth.py
+ |tests/test_visibility.py
+ |tests/unittest.py
+ |tests/util/caches/test_cached_call.py
+ |tests/util/caches/test_deferred_cache.py
+ |tests/util/caches/test_descriptors.py
+ |tests/util/caches/test_response_cache.py
+ |tests/util/caches/test_ttlcache.py
+ |tests/util/test_async_helpers.py
+ |tests/util/test_batching_queue.py
+ |tests/util/test_dict_cache.py
+ |tests/util/test_expiring_cache.py
+ |tests/util/test_file_consumer.py
+ |tests/util/test_linearizer.py
+ |tests/util/test_logcontext.py
+ |tests/util/test_lrucache.py
+ |tests/util/test_rwlock.py
+ |tests/util/test_wheel_timer.py
+ |tests/utils.py
+ )$
[mypy-synapse.api.*]
disallow_untyped_defs = True
@@ -272,6 +363,9 @@ ignore_missing_imports = True
[mypy-opentracing]
ignore_missing_imports = True
+[mypy-parameterized.*]
+ignore_missing_imports = True
+
[mypy-phonenumbers.*]
ignore_missing_imports = True
diff --git a/setup.py b/setup.py
index 345cff09c3..5d602db240 100755
--- a/setup.py
+++ b/setup.py
@@ -17,6 +17,7 @@
# limitations under the License.
import glob
import os
+from typing import Any, Dict
from setuptools import Command, find_packages, setup
@@ -49,8 +50,6 @@ here = os.path.abspath(os.path.dirname(__file__))
# [1]: http://tox.readthedocs.io/en/2.5.0/example/basic.html#integration-with-setup-py-test-command
# [2]: https://pypi.python.org/pypi/setuptools_trial
class TestCommand(Command):
- user_options = []
-
def initialize_options(self):
pass
@@ -75,7 +74,7 @@ def read_file(path_segments):
def exec_file(path_segments):
"""Execute a single python file to get the variables defined in it"""
- result = {}
+ result: Dict[str, Any] = {}
code = read_file(path_segments)
exec(code, result)
return result
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 4b0a9b2974..13dd6ce248 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -1,7 +1,7 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018-2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -86,6 +86,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
# cf https://github.com/matrix-org/matrix-doc/pull/2326
"org.matrix.labels": {"type": "array", "items": {"type": "string"}},
"org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
+ # MSC3440, filtering by event relations.
+ "io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
+ "io.element.relation_types": {"type": "array", "items": {"type": "string"}},
},
}
@@ -146,14 +149,16 @@ def matrix_user_id_validator(user_id_str: str) -> UserID:
class Filtering:
def __init__(self, hs: "HomeServer"):
- super().__init__()
+ self._hs = hs
self.store = hs.get_datastore()
+ self.DEFAULT_FILTER_COLLECTION = FilterCollection(hs, {})
+
async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]
) -> "FilterCollection":
result = await self.store.get_user_filter(user_localpart, filter_id)
- return FilterCollection(result)
+ return FilterCollection(self._hs, result)
def add_user_filter(
self, user_localpart: str, user_filter: JsonDict
@@ -191,21 +196,22 @@ FilterEvent = TypeVar("FilterEvent", EventBase, UserPresenceState, JsonDict)
class FilterCollection:
- def __init__(self, filter_json: JsonDict):
+ def __init__(self, hs: "HomeServer", filter_json: JsonDict):
self._filter_json = filter_json
room_filter_json = self._filter_json.get("room", {})
self._room_filter = Filter(
- {k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")}
+ hs,
+ {k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")},
)
- self._room_timeline_filter = Filter(room_filter_json.get("timeline", {}))
- self._room_state_filter = Filter(room_filter_json.get("state", {}))
- self._room_ephemeral_filter = Filter(room_filter_json.get("ephemeral", {}))
- self._room_account_data = Filter(room_filter_json.get("account_data", {}))
- self._presence_filter = Filter(filter_json.get("presence", {}))
- self._account_data = Filter(filter_json.get("account_data", {}))
+ self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {}))
+ self._room_state_filter = Filter(hs, room_filter_json.get("state", {}))
+ self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {}))
+ self._room_account_data = Filter(hs, room_filter_json.get("account_data", {}))
+ self._presence_filter = Filter(hs, filter_json.get("presence", {}))
+ self._account_data = Filter(hs, filter_json.get("account_data", {}))
self.include_leave = filter_json.get("room", {}).get("include_leave", False)
self.event_fields = filter_json.get("event_fields", [])
@@ -232,25 +238,37 @@ class FilterCollection:
def include_redundant_members(self) -> bool:
return self._room_state_filter.include_redundant_members
- def filter_presence(
+ async def filter_presence(
self, events: Iterable[UserPresenceState]
) -> List[UserPresenceState]:
- return self._presence_filter.filter(events)
+ return await self._presence_filter.filter(events)
- def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
- return self._account_data.filter(events)
+ async def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
+ return await self._account_data.filter(events)
- def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
- return self._room_state_filter.filter(self._room_filter.filter(events))
+ async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
+ return await self._room_state_filter.filter(
+ await self._room_filter.filter(events)
+ )
- def filter_room_timeline(self, events: Iterable[EventBase]) -> List[EventBase]:
- return self._room_timeline_filter.filter(self._room_filter.filter(events))
+ async def filter_room_timeline(
+ self, events: Iterable[EventBase]
+ ) -> List[EventBase]:
+ return await self._room_timeline_filter.filter(
+ await self._room_filter.filter(events)
+ )
- def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]:
- return self._room_ephemeral_filter.filter(self._room_filter.filter(events))
+ async def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]:
+ return await self._room_ephemeral_filter.filter(
+ await self._room_filter.filter(events)
+ )
- def filter_room_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
- return self._room_account_data.filter(self._room_filter.filter(events))
+ async def filter_room_account_data(
+ self, events: Iterable[JsonDict]
+ ) -> List[JsonDict]:
+ return await self._room_account_data.filter(
+ await self._room_filter.filter(events)
+ )
def blocks_all_presence(self) -> bool:
return (
@@ -274,7 +292,9 @@ class FilterCollection:
class Filter:
- def __init__(self, filter_json: JsonDict):
+ def __init__(self, hs: "HomeServer", filter_json: JsonDict):
+ self._hs = hs
+ self._store = hs.get_datastore()
self.filter_json = filter_json
self.limit = filter_json.get("limit", 10)
@@ -297,6 +317,20 @@ class Filter:
self.labels = filter_json.get("org.matrix.labels", None)
self.not_labels = filter_json.get("org.matrix.not_labels", [])
+ # Ideally these would be rejected at the endpoint if they were provided
+ # and not supported, but that would involve modifying the JSON schema
+ # based on the homeserver configuration.
+ if hs.config.experimental.msc3440_enabled:
+ self.relation_senders = self.filter_json.get(
+ "io.element.relation_senders", None
+ )
+ self.relation_types = self.filter_json.get(
+ "io.element.relation_types", None
+ )
+ else:
+ self.relation_senders = None
+ self.relation_types = None
+
def filters_all_types(self) -> bool:
return "*" in self.not_types
@@ -306,7 +340,7 @@ class Filter:
def filters_all_rooms(self) -> bool:
return "*" in self.not_rooms
- def check(self, event: FilterEvent) -> bool:
+ def _check(self, event: FilterEvent) -> bool:
"""Checks whether the filter matches the given event.
Args:
@@ -420,8 +454,30 @@ class Filter:
return room_ids
- def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
- return list(filter(self.check, events))
+ async def _check_event_relations(
+ self, events: Iterable[FilterEvent]
+ ) -> List[FilterEvent]:
+ # The event IDs to check, mypy doesn't understand the ifinstance check.
+ event_ids = [event.event_id for event in events if isinstance(event, EventBase)] # type: ignore[attr-defined]
+ event_ids_to_keep = set(
+ await self._store.events_have_relations(
+ event_ids, self.relation_senders, self.relation_types
+ )
+ )
+
+ return [
+ event
+ for event in events
+ if not isinstance(event, EventBase) or event.event_id in event_ids_to_keep
+ ]
+
+ async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
+ result = [event for event in events if self._check(event)]
+
+ if self.relation_senders or self.relation_types:
+ return await self._check_event_relations(result)
+
+ return result
def with_room_ids(self, room_ids: Iterable[str]) -> "Filter":
"""Returns a new filter with the given room IDs appended.
@@ -433,7 +489,7 @@ class Filter:
filter: A new filter including the given rooms and the old
filter's rooms.
"""
- newFilter = Filter(self.filter_json)
+ newFilter = Filter(self._hs, self.filter_json)
newFilter.rooms += room_ids
return newFilter
@@ -444,6 +500,3 @@ def _matches_wildcard(actual_value: Optional[str], filter_value: str) -> bool:
return actual_value.startswith(type_prefix)
else:
return actual_value == filter_value
-
-
-DEFAULT_FILTER_COLLECTION = FilterCollection({})
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 670186f548..3b85b135e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -277,6 +277,58 @@ class FederationClient(FederationBase):
return pdus
+ async def get_pdu_from_destination_raw(
+ self,
+ destination: str,
+ event_id: str,
+ room_version: RoomVersion,
+ outlier: bool = False,
+ timeout: Optional[int] = None,
+ ) -> Optional[EventBase]:
+ """Requests the PDU with given origin and ID from the remote home
+ server. Does not have any caching or rate limiting!
+
+ Args:
+ destination: Which homeserver to query
+ event_id: event to fetch
+ room_version: version of the room
+ outlier: Indicates whether the PDU is an `outlier`, i.e. if
+ it's from an arbitrary point in the context as opposed to part
+ of the current block of PDUs. Defaults to `False`
+ timeout: How long to try (in ms) each destination for before
+ moving to the next destination. None indicates no timeout.
+
+ Returns:
+ The requested PDU, or None if we were unable to find it.
+
+ Raises:
+ SynapseError, NotRetryingDestination, FederationDeniedError
+ """
+ transaction_data = await self.transport_layer.get_event(
+ destination, event_id, timeout=timeout
+ )
+
+ logger.debug(
+ "retrieved event id %s from %s: %r",
+ event_id,
+ destination,
+ transaction_data,
+ )
+
+ pdu_list: List[EventBase] = [
+ event_from_pdu_json(p, room_version, outlier=outlier)
+ for p in transaction_data["pdus"]
+ ]
+
+ if pdu_list and pdu_list[0]:
+ pdu = pdu_list[0]
+
+ # Check signatures are correct.
+ signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
+ return signed_pdu
+
+ return None
+
async def get_pdu(
self,
destinations: Iterable[str],
@@ -321,30 +373,14 @@ class FederationClient(FederationBase):
continue
try:
- transaction_data = await self.transport_layer.get_event(
- destination, event_id, timeout=timeout
- )
-
- logger.debug(
- "retrieved event id %s from %s: %r",
- event_id,
- destination,
- transaction_data,
+ signed_pdu = await self.get_pdu_from_destination_raw(
+ destination=destination,
+ event_id=event_id,
+ room_version=room_version,
+ outlier=outlier,
+ timeout=timeout,
)
- pdu_list: List[EventBase] = [
- event_from_pdu_json(p, room_version, outlier=outlier)
- for p in transaction_data["pdus"]
- ]
-
- if pdu_list and pdu_list[0]:
- pdu = pdu_list[0]
-
- # Check signatures are correct.
- signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
-
- break
-
pdu_attempts[destination] = now
except SynapseError as e:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ddc9105ee9..9abdad262b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -188,7 +188,7 @@ class ApplicationServicesHandler:
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
- users: Optional[Collection[Union[str, UserID]]] = None,
+ users: Collection[Union[str, UserID]],
) -> None:
"""
This is called by the notifier in the background when an ephemeral event is handled
@@ -203,7 +203,9 @@ class ApplicationServicesHandler:
value for `stream_key` will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
- them.
+ receiving them by setting `push_ephemeral` to true in their registration
+ file. Note that while MSC2409 is experimental, this option is called
+ `de.sorunome.msc2409.push_ephemeral`.
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
@@ -214,6 +216,7 @@ class ApplicationServicesHandler:
if not self.notify_appservices:
return
+ # Ignore any unsupported streams
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return
@@ -230,18 +233,25 @@ class ApplicationServicesHandler:
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)
+ # Check whether there are any appservices which have registered to receive
+ # ephemeral events.
+ #
+ # Note that whether these events are actually relevant to these appservices
+ # is decided later on.
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services:
+ # Bail out early if none of the target appservices have explicitly registered
+ # to receive these ephemeral events.
return
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
- services, stream_key, new_token, users or []
+ services, stream_key, new_token, users
)
@wrap_as_background_process("notify_interested_services_ephemeral")
@@ -252,7 +262,7 @@ class ApplicationServicesHandler:
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
- logger.debug("Checking interested services for %s" % (stream_key))
+ logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
if stream_key == "typing_key":
@@ -345,6 +355,9 @@ class ApplicationServicesHandler:
Args:
service: The application service to check for which events it should receive.
+ new_token: A receipts event stream token. Purely used to double-check that the
+ from_token we pull from the database isn't greater than or equal to this
+ token. Prevents accidentally duplicating work.
Returns:
A list of JSON dictionaries containing data derived from the read receipts that
@@ -382,6 +395,9 @@ class ApplicationServicesHandler:
Args:
service: The application service that ephemeral events are being sent to.
users: The users that should receive the presence update.
+ new_token: A presence update stream token. Purely used to double-check that the
+ from_token we pull from the database isn't greater than or equal to this
+ token. Prevents accidentally duplicating work.
Returns:
A list of json dictionaries containing data derived from the presence events
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index b6a2a34ab7..b582266af9 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -89,6 +89,13 @@ class DeviceMessageHandler:
)
async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
+ """
+ Handle receiving to-device messages from remote homeservers.
+
+ Args:
+ origin: The remote homeserver.
+ content: The JSON dictionary containing the to-device messages.
+ """
local_messages = {}
sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id):
@@ -135,12 +142,16 @@ class DeviceMessageHandler:
message_type, sender_user_id, by_device
)
- stream_id = await self.store.add_messages_from_remote_to_device_inbox(
+ # Add messages to the database.
+ # Retrieve the stream id of the last-processed to-device message.
+ last_stream_id = await self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
)
+ # Notify listeners that there are new to-device messages to process,
+ # handing them the latest stream id.
self.notifier.on_new_event(
- "to_device_key", stream_id, users=local_messages.keys()
+ "to_device_key", last_stream_id, users=local_messages.keys()
)
async def _check_for_unknown_devices(
@@ -195,6 +206,14 @@ class DeviceMessageHandler:
message_type: str,
messages: Dict[str, Dict[str, JsonDict]],
) -> None:
+ """
+ Handle a request from a user to send to-device message(s).
+
+ Args:
+ requester: The user that is sending the to-device messages.
+ message_type: The type of to-device messages that are being sent.
+ messages: A dictionary containing recipients mapped to messages intended for them.
+ """
sender_user_id = requester.user.to_string()
message_id = random_string(16)
@@ -257,12 +276,16 @@ class DeviceMessageHandler:
"org.matrix.opentracing_context": json_encoder.encode(context),
}
- stream_id = await self.store.add_messages_to_device_inbox(
+ # Add messages to the database.
+ # Retrieve the stream id of the last-processed to-device message.
+ last_stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
+ # Notify listeners that there are new to-device messages to process,
+ # handing them the latest stream id.
self.notifier.on_new_event(
- "to_device_key", stream_id, users=local_messages.keys()
+ "to_device_key", last_stream_id, users=local_messages.keys()
)
if self.federation_sender:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index abfe7be0e3..aa26911aed 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -424,7 +424,7 @@ class PaginationHandler:
if events:
if event_filter:
- events = event_filter.filter(events)
+ events = await event_filter.filter(events)
events = await filter_events_for_client(
self.storage, user_id, events, is_peeking=(member_event_id is None)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 969eb3b9b0..11af30eee7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Contains functions for performing events on rooms."""
-
+"""Contains functions for performing actions on rooms."""
import itertools
import logging
import math
@@ -31,6 +30,8 @@ from typing import (
Tuple,
)
+from typing_extensions import TypedDict
+
from synapse.api.constants import (
EventContentFields,
EventTypes,
@@ -1158,8 +1159,10 @@ class RoomContextHandler:
)
if event_filter:
- results["events_before"] = event_filter.filter(results["events_before"])
- results["events_after"] = event_filter.filter(results["events_after"])
+ results["events_before"] = await event_filter.filter(
+ results["events_before"]
+ )
+ results["events_after"] = await event_filter.filter(results["events_after"])
results["events_before"] = await filter_evts(results["events_before"])
results["events_after"] = await filter_evts(results["events_after"])
@@ -1195,7 +1198,7 @@ class RoomContextHandler:
state_events = list(state[last_event_id].values())
if event_filter:
- state_events = event_filter.filter(state_events)
+ state_events = await event_filter.filter(state_events)
results["state"] = await filter_evts(state_events)
@@ -1275,6 +1278,13 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
return self.store.get_room_events_max_id(room_id)
+class ShutdownRoomResponse(TypedDict):
+ kicked_users: List[str]
+ failed_to_kick_users: List[str]
+ local_aliases: List[str]
+ new_room_id: Optional[str]
+
+
class RoomShutdownHandler:
DEFAULT_MESSAGE = (
@@ -1300,7 +1310,7 @@ class RoomShutdownHandler:
new_room_name: Optional[str] = None,
message: Optional[str] = None,
block: bool = False,
- ) -> dict:
+ ) -> ShutdownRoomResponse:
"""
Shuts down a room. Moves all local users and room aliases automatically
to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1334,8 +1344,13 @@ class RoomShutdownHandler:
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
+ If set to `True`, users will be prevented from joining the old
+ room. This option can also be used to pre-emptively block a room,
+ even if it's unknown to this homeserver. In this case, the room
+ will be blocked, and no further action will be taken. If `False`,
+ attempting to delete an unknown room is invalid.
+
+ Defaults to `False`.
Returns: a dict containing the following keys:
kicked_users: An array of users (`user_id`) that were kicked.
@@ -1344,7 +1359,9 @@ class RoomShutdownHandler:
local_aliases:
An array of strings representing the local aliases that were
migrated from the old room to the new.
- new_room_id: A string representing the room ID of the new room.
+ new_room_id:
+ A string representing the room ID of the new room, or None if
+ no such room was created.
"""
if not new_room_name:
@@ -1355,14 +1372,28 @@ class RoomShutdownHandler:
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
- if not await self.store.get_room(room_id):
- raise NotFoundError("Unknown room id %s" % (room_id,))
-
- # This will work even if the room is already blocked, but that is
- # desirable in case the first attempt at blocking the room failed below.
+ # Action the block first (even if the room doesn't exist yet)
if block:
+ # This will work even if the room is already blocked, but that is
+ # desirable in case the first attempt at blocking the room failed below.
await self.store.block_room(room_id, requester_user_id)
+ if not await self.store.get_room(room_id):
+ if block:
+ # We allow you to block an unknown room.
+ return {
+ "kicked_users": [],
+ "failed_to_kick_users": [],
+ "local_aliases": [],
+ "new_room_id": None,
+ }
+ else:
+ # But if you don't want to preventatively block another room,
+ # this function can't do anything useful.
+ raise NotFoundError(
+ "Cannot shut down room: unknown room id %s" % (room_id,)
+ )
+
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 6e4dff8056..ab7eaab2fb 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -180,7 +180,7 @@ class SearchHandler:
% (set(group_keys) - {"room_id", "sender"},),
)
- search_filter = Filter(filter_dict)
+ search_filter = Filter(self.hs, filter_dict)
# TODO: Search through left rooms too
rooms = await self.store.get_rooms_for_local_user_where_membership_is(
@@ -242,7 +242,7 @@ class SearchHandler:
rank_map.update({r["event"].event_id: r["rank"] for r in results})
- filtered_events = search_filter.filter([r["event"] for r in results])
+ filtered_events = await search_filter.filter([r["event"] for r in results])
events = await filter_events_for_client(
self.storage, user.to_string(), filtered_events
@@ -292,7 +292,9 @@ class SearchHandler:
rank_map.update({r["event"].event_id: r["rank"] for r in results})
- filtered_events = search_filter.filter([r["event"] for r in results])
+ filtered_events = await search_filter.filter(
+ [r["event"] for r in results]
+ )
events = await filter_events_for_client(
self.storage, user.to_string(), filtered_events
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c7c6d63a9..891435c14d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -510,7 +510,7 @@ class SyncHandler:
log_kv({"limited": limited})
if potential_recents:
- recents = sync_config.filter_collection.filter_room_timeline(
+ recents = await sync_config.filter_collection.filter_room_timeline(
potential_recents
)
log_kv({"recents_after_sync_filtering": len(recents)})
@@ -575,8 +575,8 @@ class SyncHandler:
log_kv({"loaded_recents": len(events)})
- loaded_recents = sync_config.filter_collection.filter_room_timeline(
- events
+ loaded_recents = (
+ await sync_config.filter_collection.filter_room_timeline(events)
)
log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)})
@@ -1015,7 +1015,7 @@ class SyncHandler:
return {
(e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(
+ for e in await sync_config.filter_collection.filter_room_state(
list(state.values())
)
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
@@ -1383,7 +1383,7 @@ class SyncHandler:
sync_config.user
)
- account_data_for_user = sync_config.filter_collection.filter_account_data(
+ account_data_for_user = await sync_config.filter_collection.filter_account_data(
[
{"type": account_data_type, "content": content}
for account_data_type, content in account_data.items()
@@ -1448,7 +1448,7 @@ class SyncHandler:
# Deduplicate the presence entries so that there's at most one per user
presence = list({p.user_id: p for p in presence}.values())
- presence = sync_config.filter_collection.filter_presence(presence)
+ presence = await sync_config.filter_collection.filter_presence(presence)
sync_result_builder.presence = presence
@@ -2021,12 +2021,14 @@ class SyncHandler:
)
account_data_events = (
- sync_config.filter_collection.filter_room_account_data(
+ await sync_config.filter_collection.filter_room_account_data(
account_data_events
)
)
- ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+ ephemeral = await sync_config.filter_collection.filter_room_ephemeral(
+ ephemeral
+ )
if not (
always_include
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 05c5b4bf0c..a2f4edebb8 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -13,7 +13,7 @@
# limitations under the License.
import logging
from http import HTTPStatus
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from urllib import parse as urlparse
from synapse.api.constants import EventTypes, JoinRules, Membership
@@ -239,9 +239,22 @@ class RoomRestServlet(RestServlet):
# Purge room
if purge:
- await pagination_handler.purge_room(room_id, force=force_purge)
-
- return 200, ret
+ try:
+ await pagination_handler.purge_room(room_id, force=force_purge)
+ except NotFoundError:
+ if block:
+ # We can block unknown rooms with this endpoint, in which case
+ # a failed purge is expected.
+ pass
+ else:
+ # But otherwise, we expect this purge to have succeeded.
+ raise
+
+ # Cast safety: cast away the knowledge that this is a TypedDict.
+ # See https://github.com/python/mypy/issues/4976#issuecomment-579883622
+ # for some discussion on why this is necessary. Either way,
+ # `ret` is an opaque dictionary blob as far as the rest of the app cares.
+ return 200, cast(JsonDict, ret)
class RoomMembersRestServlet(RestServlet):
@@ -583,6 +596,7 @@ class RoomEventContextServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
+ self._hs = hs
self.clock = hs.get_clock()
self.room_context_handler = hs.get_room_context_handler()
self._event_serializer = hs.get_event_client_serializer()
@@ -600,7 +614,9 @@ class RoomEventContextServlet(RestServlet):
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
- event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+ event_filter: Optional[Filter] = Filter(
+ self._hs, json_decoder.decode(filter_json)
+ )
else:
event_filter = None
diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py
index 58f6699073..184cfbe196 100644
--- a/synapse/rest/client/relations.py
+++ b/synapse/rest/client/relations.py
@@ -298,7 +298,9 @@ class RelationAggregationPaginationServlet(RestServlet):
raise SynapseError(404, "Unknown parent event.")
if relation_type not in (RelationTypes.ANNOTATION, None):
- raise SynapseError(400, "Relation type must be 'annotation'")
+ raise SynapseError(
+ 400, f"Relation type must be '{RelationTypes.ANNOTATION}'"
+ )
limit = parse_integer(request, "limit", default=5)
from_token_str = parse_string(request, "from")
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 6a876cfa2f..03a353d53c 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -550,6 +550,7 @@ class RoomMessageListRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
+ self._hs = hs
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()
@@ -567,7 +568,9 @@ class RoomMessageListRestServlet(RestServlet):
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
- event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+ event_filter: Optional[Filter] = Filter(
+ self._hs, json_decoder.decode(filter_json)
+ )
if (
event_filter
and event_filter.filter_json.get("event_format", "client")
@@ -672,6 +675,7 @@ class RoomEventContextServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
+ self._hs = hs
self.clock = hs.get_clock()
self.room_context_handler = hs.get_room_context_handler()
self._event_serializer = hs.get_event_client_serializer()
@@ -688,7 +692,9 @@ class RoomEventContextServlet(RestServlet):
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
- event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+ event_filter: Optional[Filter] = Filter(
+ self._hs, json_decoder.decode(filter_json)
+ )
else:
event_filter = None
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 913216a7c4..8c0fdb1940 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -29,7 +29,7 @@ from typing import (
from synapse.api.constants import Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
-from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
+from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.events.utils import (
@@ -150,7 +150,7 @@ class SyncRestServlet(RestServlet):
request_key = (user, timeout, since, filter_id, full_state, device_id)
if filter_id is None:
- filter_collection = DEFAULT_FILTER_COLLECTION
+ filter_collection = self.filtering.DEFAULT_FILTER_COLLECTION
elif filter_id.startswith("{"):
try:
filter_object = json_decoder.decode(filter_id)
@@ -160,7 +160,7 @@ class SyncRestServlet(RestServlet):
except Exception:
raise SynapseError(400, "Invalid filter JSON")
self.filtering.check_valid_filter(filter_object)
- filter_collection = FilterCollection(filter_object)
+ filter_collection = FilterCollection(self.hs, filter_object)
else:
try:
filter_collection = await self.filtering.get_user_filter(
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 2da2659f41..baec35ee27 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -412,16 +412,16 @@ class ApplicationServiceTransactionWorkerStore(
)
async def set_type_stream_id_for_appservice(
- self, service: ApplicationService, type: str, pos: Optional[int]
+ self, service: ApplicationService, stream_type: str, pos: Optional[int]
) -> None:
- if type not in ("read_receipt", "presence"):
+ if stream_type not in ("read_receipt", "presence"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
- % (type,)
+ % (stream_type,)
)
def set_type_stream_id_for_appservice_txn(txn):
- stream_id_type = "%s_stream_id" % type
+ stream_id_type = "%s_stream_id" % stream_type
txn.execute(
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
% stream_id_type,
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 264e625bd7..ae3afdd5d2 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -134,7 +134,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
limit: The maximum number of messages to retrieve.
Returns:
- A list of messages for the device and where in the stream the messages got to.
+ A tuple containing:
+ * A list of messages for the device.
+ * The max stream token of these messages. There may be more to retrieve
+ if the given limit was reached.
"""
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_stream_id
@@ -153,12 +156,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
txn.execute(
sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
)
+
messages = []
+ stream_pos = current_stream_id
+
for row in txn:
stream_pos = row[0]
messages.append(db_to_json(row[1]))
+
+ # If the limit was not reached we know that there's no more data for this
+ # user/device pair up to current_stream_id.
if len(messages) < limit:
stream_pos = current_stream_id
+
return messages, stream_pos
return await self.db_pool.runInteraction(
@@ -260,13 +270,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
" LIMIT ?"
)
txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
+
messages = []
+ stream_pos = current_stream_id
+
for row in txn:
stream_pos = row[0]
messages.append(db_to_json(row[1]))
+
+ # If the limit was not reached we know that there's no more data for this
+ # user/device pair up to current_stream_id.
if len(messages) < limit:
log_kv({"message": "Set stream position to current position"})
stream_pos = current_stream_id
+
return messages, stream_pos
return await self.db_pool.runInteraction(
@@ -372,8 +389,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"""Used to send messages from this server.
Args:
- local_messages_by_user_and_device:
- Dictionary of user_id to device_id to message.
+ local_messages_by_user_then_device:
+ Dictionary of recipient user_id to recipient device_id to message.
remote_messages_by_destination:
Dictionary of destination server_name to the EDU JSON to send.
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 53576ad52f..907af10995 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -20,7 +20,7 @@ import attr
from synapse.api.constants import RelationTypes
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import LoggingTransaction
+from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
from synapse.storage.databases.main.stream import generate_pagination_where_clause
from synapse.storage.relations import (
AggregationPaginationToken,
@@ -334,6 +334,62 @@ class RelationsWorkerStore(SQLBaseStore):
return count, latest_event
+ async def events_have_relations(
+ self,
+ parent_ids: List[str],
+ relation_senders: Optional[List[str]],
+ relation_types: Optional[List[str]],
+ ) -> List[str]:
+ """Check which events have a relationship from the given senders of the
+ given types.
+
+ Args:
+ parent_ids: The events being annotated
+ relation_senders: The relation senders to check.
+ relation_types: The relation types to check.
+
+ Returns:
+ True if the event has at least one relationship from one of the given senders of the given type.
+ """
+ # If no restrictions are given then the event has the required relations.
+ if not relation_senders and not relation_types:
+ return parent_ids
+
+ sql = """
+ SELECT relates_to_id FROM event_relations
+ INNER JOIN events USING (event_id)
+ WHERE
+ %s;
+ """
+
+ def _get_if_event_has_relations(txn) -> List[str]:
+ clauses: List[str] = []
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "relates_to_id", parent_ids
+ )
+ clauses.append(clause)
+
+ if relation_senders:
+ clause, temp_args = make_in_list_sql_clause(
+ txn.database_engine, "sender", relation_senders
+ )
+ clauses.append(clause)
+ args.extend(temp_args)
+ if relation_types:
+ clause, temp_args = make_in_list_sql_clause(
+ txn.database_engine, "relation_type", relation_types
+ )
+ clauses.append(clause)
+ args.extend(temp_args)
+
+ txn.execute(sql % " AND ".join(clauses), args)
+
+ return [row[0] for row in txn]
+
+ return await self.db_pool.runInteraction(
+ "get_if_event_has_relations", _get_if_event_has_relations
+ )
+
async def has_user_annotated_event(
self, parent_id: str, event_type: str, aggregation_key: str, sender: str
) -> bool:
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index cefc77fa0f..17b398bb69 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1751,7 +1751,12 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
)
async def block_room(self, room_id: str, user_id: str) -> None:
- """Marks the room as blocked. Can be called multiple times.
+ """Marks the room as blocked.
+
+ Can be called multiple times (though we'll only track the last user to
+ block this room).
+
+ Can be called on a room unknown to this homeserver.
Args:
room_id: Room to block
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index dc7884b1c0..42dc807d17 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -272,31 +272,37 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
args = []
if event_filter.types:
- clauses.append("(%s)" % " OR ".join("type = ?" for _ in event_filter.types))
+ clauses.append(
+ "(%s)" % " OR ".join("event.type = ?" for _ in event_filter.types)
+ )
args.extend(event_filter.types)
for typ in event_filter.not_types:
- clauses.append("type != ?")
+ clauses.append("event.type != ?")
args.append(typ)
if event_filter.senders:
- clauses.append("(%s)" % " OR ".join("sender = ?" for _ in event_filter.senders))
+ clauses.append(
+ "(%s)" % " OR ".join("event.sender = ?" for _ in event_filter.senders)
+ )
args.extend(event_filter.senders)
for sender in event_filter.not_senders:
- clauses.append("sender != ?")
+ clauses.append("event.sender != ?")
args.append(sender)
if event_filter.rooms:
- clauses.append("(%s)" % " OR ".join("room_id = ?" for _ in event_filter.rooms))
+ clauses.append(
+ "(%s)" % " OR ".join("event.room_id = ?" for _ in event_filter.rooms)
+ )
args.extend(event_filter.rooms)
for room_id in event_filter.not_rooms:
- clauses.append("room_id != ?")
+ clauses.append("event.room_id != ?")
args.append(room_id)
if event_filter.contains_url:
- clauses.append("contains_url = ?")
+ clauses.append("event.contains_url = ?")
args.append(event_filter.contains_url)
# We're only applying the "labels" filter on the database query, because applying the
@@ -307,6 +313,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
clauses.append("(%s)" % " OR ".join("label = ?" for _ in event_filter.labels))
args.extend(event_filter.labels)
+ # Filter on relation_senders / relation types from the joined tables.
+ if event_filter.relation_senders:
+ clauses.append(
+ "(%s)"
+ % " OR ".join(
+ "related_event.sender = ?" for _ in event_filter.relation_senders
+ )
+ )
+ args.extend(event_filter.relation_senders)
+
+ if event_filter.relation_types:
+ clauses.append(
+ "(%s)"
+ % " OR ".join("relation_type = ?" for _ in event_filter.relation_types)
+ )
+ args.extend(event_filter.relation_types)
+
return " AND ".join(clauses), args
@@ -1116,7 +1139,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
bounds = generate_pagination_where_clause(
direction=direction,
- column_names=("topological_ordering", "stream_ordering"),
+ column_names=("event.topological_ordering", "event.stream_ordering"),
from_token=from_bound,
to_token=to_bound,
engine=self.database_engine,
@@ -1133,32 +1156,51 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
select_keywords = "SELECT"
join_clause = ""
+ # Using DISTINCT in this SELECT query is quite expensive, because it
+ # requires the engine to sort on the entire (not limited) result set,
+ # i.e. the entire events table. Only use it in scenarios that could result
+ # in the same event ID occurring multiple times in the results.
+ needs_distinct = False
if event_filter and event_filter.labels:
# If we're not filtering on a label, then joining on event_labels will
# return as many row for a single event as the number of labels it has. To
# avoid this, only join if we're filtering on at least one label.
- join_clause = """
+ join_clause += """
LEFT JOIN event_labels
USING (event_id, room_id, topological_ordering)
"""
if len(event_filter.labels) > 1:
- # Using DISTINCT in this SELECT query is quite expensive, because it
- # requires the engine to sort on the entire (not limited) result set,
- # i.e. the entire events table. We only need to use it when we're
- # filtering on more than two labels, because that's the only scenario
- # in which we can possibly to get multiple times the same event ID in
- # the results.
- select_keywords += "DISTINCT"
+ # Multiple labels could cause the same event to appear multiple times.
+ needs_distinct = True
+
+ # If there is a filter on relation_senders and relation_types join to the
+ # relations table.
+ if event_filter and (
+ event_filter.relation_senders or event_filter.relation_types
+ ):
+ # Filtering by relations could cause the same event to appear multiple
+ # times (since there's no limit on the number of relations to an event).
+ needs_distinct = True
+ join_clause += """
+ LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
+ """
+ if event_filter.relation_senders:
+ join_clause += """
+ LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
+ """
+
+ if needs_distinct:
+ select_keywords += " DISTINCT"
sql = """
%(select_keywords)s
- event_id, instance_name,
- topological_ordering, stream_ordering
- FROM events
+ event.event_id, event.instance_name,
+ event.topological_ordering, event.stream_ordering
+ FROM events AS event
%(join_clause)s
- WHERE outlier = ? AND room_id = ? AND %(bounds)s
- ORDER BY topological_ordering %(order)s,
- stream_ordering %(order)s LIMIT ?
+ WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
+ ORDER BY event.topological_ordering %(order)s,
+ event.stream_ordering %(order)s LIMIT ?
""" % {
"select_keywords": select_keywords,
"join_clause": join_clause,
diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py
index f44c91a373..b7fc33dc94 100644
--- a/tests/api/test_filtering.py
+++ b/tests/api/test_filtering.py
@@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from unittest.mock import patch
+
import jsonschema
from synapse.api.constants import EventContentFields
@@ -51,9 +53,8 @@ class FilteringTestCase(unittest.HomeserverTestCase):
{"presence": {"senders": ["@bar;pik.test.com"]}},
]
for filter in invalid_filters:
- with self.assertRaises(SynapseError) as check_filter_error:
+ with self.assertRaises(SynapseError):
self.filtering.check_valid_filter(filter)
- self.assertIsInstance(check_filter_error.exception, SynapseError)
def test_valid_filters(self):
valid_filters = [
@@ -119,12 +120,12 @@ class FilteringTestCase(unittest.HomeserverTestCase):
definition = {"types": ["m.room.message", "org.matrix.foo.bar"]}
event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar")
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_definition_types_works_with_wildcards(self):
definition = {"types": ["m.*", "org.matrix.foo.bar"]}
event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar")
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_definition_types_works_with_unknowns(self):
definition = {"types": ["m.room.message", "org.matrix.foo.bar"]}
@@ -133,24 +134,24 @@ class FilteringTestCase(unittest.HomeserverTestCase):
type="now.for.something.completely.different",
room_id="!foo:bar",
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_not_types_works_with_literals(self):
definition = {"not_types": ["m.room.message", "org.matrix.foo.bar"]}
event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar")
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_not_types_works_with_wildcards(self):
definition = {"not_types": ["m.room.message", "org.matrix.*"]}
event = MockEvent(
sender="@foo:bar", type="org.matrix.custom.event", room_id="!foo:bar"
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_not_types_works_with_unknowns(self):
definition = {"not_types": ["m.*", "org.*"]}
event = MockEvent(sender="@foo:bar", type="com.nom.nom.nom", room_id="!foo:bar")
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_definition_not_types_takes_priority_over_types(self):
definition = {
@@ -158,35 +159,35 @@ class FilteringTestCase(unittest.HomeserverTestCase):
"types": ["m.room.message", "m.room.topic"],
}
event = MockEvent(sender="@foo:bar", type="m.room.topic", room_id="!foo:bar")
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_senders_works_with_literals(self):
definition = {"senders": ["@flibble:wibble"]}
event = MockEvent(
sender="@flibble:wibble", type="com.nom.nom.nom", room_id="!foo:bar"
)
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_definition_senders_works_with_unknowns(self):
definition = {"senders": ["@flibble:wibble"]}
event = MockEvent(
sender="@challenger:appears", type="com.nom.nom.nom", room_id="!foo:bar"
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_not_senders_works_with_literals(self):
definition = {"not_senders": ["@flibble:wibble"]}
event = MockEvent(
sender="@flibble:wibble", type="com.nom.nom.nom", room_id="!foo:bar"
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_not_senders_works_with_unknowns(self):
definition = {"not_senders": ["@flibble:wibble"]}
event = MockEvent(
sender="@challenger:appears", type="com.nom.nom.nom", room_id="!foo:bar"
)
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_definition_not_senders_takes_priority_over_senders(self):
definition = {
@@ -196,14 +197,14 @@ class FilteringTestCase(unittest.HomeserverTestCase):
event = MockEvent(
sender="@misspiggy:muppets", type="m.room.topic", room_id="!foo:bar"
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_rooms_works_with_literals(self):
definition = {"rooms": ["!secretbase:unknown"]}
event = MockEvent(
sender="@foo:bar", type="m.room.message", room_id="!secretbase:unknown"
)
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_definition_rooms_works_with_unknowns(self):
definition = {"rooms": ["!secretbase:unknown"]}
@@ -212,7 +213,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
type="m.room.message",
room_id="!anothersecretbase:unknown",
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_not_rooms_works_with_literals(self):
definition = {"not_rooms": ["!anothersecretbase:unknown"]}
@@ -221,7 +222,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
type="m.room.message",
room_id="!anothersecretbase:unknown",
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_not_rooms_works_with_unknowns(self):
definition = {"not_rooms": ["!secretbase:unknown"]}
@@ -230,7 +231,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
type="m.room.message",
room_id="!anothersecretbase:unknown",
)
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_definition_not_rooms_takes_priority_over_rooms(self):
definition = {
@@ -240,7 +241,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
event = MockEvent(
sender="@foo:bar", type="m.room.message", room_id="!secretbase:unknown"
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_combined_event(self):
definition = {
@@ -256,7 +257,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
type="m.room.message", # yup
room_id="!stage:unknown", # yup
)
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_definition_combined_event_bad_sender(self):
definition = {
@@ -272,7 +273,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
type="m.room.message", # yup
room_id="!stage:unknown", # yup
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_combined_event_bad_room(self):
definition = {
@@ -288,7 +289,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
type="m.room.message", # yup
room_id="!piggyshouse:muppets", # nope
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_definition_combined_event_bad_type(self):
definition = {
@@ -304,7 +305,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
type="muppets.misspiggy.kisses", # nope
room_id="!stage:unknown", # yup
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_filter_labels(self):
definition = {"org.matrix.labels": ["#fun"]}
@@ -315,7 +316,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
content={EventContentFields.LABELS: ["#fun"]},
)
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
event = MockEvent(
sender="@foo:bar",
@@ -324,7 +325,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
content={EventContentFields.LABELS: ["#notfun"]},
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
def test_filter_not_labels(self):
definition = {"org.matrix.not_labels": ["#fun"]}
@@ -335,7 +336,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
content={EventContentFields.LABELS: ["#fun"]},
)
- self.assertFalse(Filter(definition).check(event))
+ self.assertFalse(Filter(self.hs, definition)._check(event))
event = MockEvent(
sender="@foo:bar",
@@ -344,7 +345,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
content={EventContentFields.LABELS: ["#notfun"]},
)
- self.assertTrue(Filter(definition).check(event))
+ self.assertTrue(Filter(self.hs, definition)._check(event))
def test_filter_presence_match(self):
user_filter_json = {"presence": {"types": ["m.*"]}}
@@ -362,7 +363,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
)
)
- results = user_filter.filter_presence(events=events)
+ results = self.get_success(user_filter.filter_presence(events=events))
self.assertEquals(events, results)
def test_filter_presence_no_match(self):
@@ -386,7 +387,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
)
)
- results = user_filter.filter_presence(events=events)
+ results = self.get_success(user_filter.filter_presence(events=events))
self.assertEquals([], results)
def test_filter_room_state_match(self):
@@ -405,7 +406,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
)
)
- results = user_filter.filter_room_state(events=events)
+ results = self.get_success(user_filter.filter_room_state(events=events))
self.assertEquals(events, results)
def test_filter_room_state_no_match(self):
@@ -426,7 +427,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
)
)
- results = user_filter.filter_room_state(events)
+ results = self.get_success(user_filter.filter_room_state(events))
self.assertEquals([], results)
def test_filter_rooms(self):
@@ -441,10 +442,52 @@ class FilteringTestCase(unittest.HomeserverTestCase):
"!not_included:example.com", # Disallowed because not in rooms.
]
- filtered_room_ids = list(Filter(definition).filter_rooms(room_ids))
+ filtered_room_ids = list(Filter(self.hs, definition).filter_rooms(room_ids))
self.assertEquals(filtered_room_ids, ["!allowed:example.com"])
+ @unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
+ def test_filter_relations(self):
+ events = [
+ # An event without a relation.
+ MockEvent(
+ event_id="$no_relation",
+ sender="@foo:bar",
+ type="org.matrix.custom.event",
+ room_id="!foo:bar",
+ ),
+ # An event with a relation.
+ MockEvent(
+ event_id="$with_relation",
+ sender="@foo:bar",
+ type="org.matrix.custom.event",
+ room_id="!foo:bar",
+ ),
+ # Non-EventBase objects get passed through.
+ {},
+ ]
+
+ # For the following tests we patch the datastore method (intead of injecting
+ # events). This is a bit cheeky, but tests the logic of _check_event_relations.
+
+ # Filter for a particular sender.
+ definition = {
+ "io.element.relation_senders": ["@foo:bar"],
+ }
+
+ async def events_have_relations(*args, **kwargs):
+ return ["$with_relation"]
+
+ with patch.object(
+ self.datastore, "events_have_relations", new=events_have_relations
+ ):
+ filtered_events = list(
+ self.get_success(
+ Filter(self.hs, definition)._check_event_relations(events)
+ )
+ )
+ self.assertEquals(filtered_events, events[1:])
+
def test_add_filter(self):
user_filter_json = {"room": {"state": {"types": ["m.*"]}}}
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 1f6a924452..d6f14e2dba 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -272,7 +272,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
make_awaitable(([event], None))
)
- self.handler.notify_interested_services_ephemeral("receipt_key", 580)
+ self.handler.notify_interested_services_ephemeral(
+ "receipt_key", 580, ["@fakerecipient:example.com"]
+ )
self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
interested_service, [event]
)
@@ -300,7 +302,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
make_awaitable(([event], None))
)
- self.handler.notify_interested_services_ephemeral("receipt_key", 579)
+ self.handler.notify_interested_services_ephemeral(
+ "receipt_key", 580, ["@fakerecipient:example.com"]
+ )
self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()
def _mkservice(self, is_interested, protocols=None):
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 339c039914..638186f173 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -13,10 +13,11 @@
# limitations under the License.
from typing import Optional
+from unittest.mock import Mock
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.errors import Codes, ResourceLimitError
-from synapse.api.filtering import DEFAULT_FILTER_COLLECTION
+from synapse.api.filtering import Filtering
from synapse.api.room_versions import RoomVersions
from synapse.handlers.sync import SyncConfig
from synapse.rest import admin
@@ -197,7 +198,7 @@ def generate_sync_config(
_request_key += 1
return SyncConfig(
user=UserID.from_string(user_id),
- filter_collection=DEFAULT_FILTER_COLLECTION,
+ filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION,
is_guest=False,
request_key=("request_key", _request_key),
device_id=device_id,
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 46116644ce..11ec54c82e 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -14,9 +14,12 @@
import json
import urllib.parse
+from http import HTTPStatus
from typing import List, Optional
from unittest.mock import Mock
+from parameterized import parameterized
+
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import Codes
@@ -281,6 +284,31 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self._is_blocked(self.room_id, expect=True)
self._has_no_members(self.room_id)
+ @parameterized.expand([(True,), (False,)])
+ def test_block_unknown_room(self, purge: bool) -> None:
+ """
+ We can block an unknown room. In this case, the `purge` argument
+ should be ignored.
+ """
+ room_id = "!unknown:test"
+
+ # The room isn't already in the blocked rooms table
+ self._is_blocked(room_id, expect=False)
+
+ # Request the room be blocked.
+ channel = self.make_request(
+ "DELETE",
+ f"/_synapse/admin/v1/rooms/{room_id}",
+ {"block": True, "purge": purge},
+ access_token=self.admin_user_tok,
+ )
+
+ # The room is now blocked.
+ self.assertEqual(
+ HTTPStatus.OK, int(channel.result["code"]), msg=channel.result["body"]
+ )
+ self._is_blocked(room_id)
+
def test_shutdown_room_consent(self):
"""Test that we can shutdown rooms with local users who have not
yet accepted the privacy policy. This used to fail when we tried to
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 376853fd65..10a4a4dc5e 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -25,7 +25,12 @@ from urllib import parse as urlparse
from twisted.internet import defer
import synapse.rest.admin
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ Membership,
+ RelationTypes,
+)
from synapse.api.errors import Codes, HttpResponseException
from synapse.handlers.pagination import PurgeStatus
from synapse.rest import admin
@@ -2157,6 +2162,153 @@ class LabelsTestCase(unittest.HomeserverTestCase):
return event_id
+class RelationsTestCase(unittest.HomeserverTestCase):
+ servlets = [
+ synapse.rest.admin.register_servlets_for_client_rest_resource,
+ room.register_servlets,
+ login.register_servlets,
+ ]
+
+ def default_config(self):
+ config = super().default_config()
+ config["experimental_features"] = {"msc3440_enabled": True}
+ return config
+
+ def prepare(self, reactor, clock, homeserver):
+ self.user_id = self.register_user("test", "test")
+ self.tok = self.login("test", "test")
+ self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok)
+
+ self.second_user_id = self.register_user("second", "test")
+ self.second_tok = self.login("second", "test")
+ self.helper.join(
+ room=self.room_id, user=self.second_user_id, tok=self.second_tok
+ )
+
+ self.third_user_id = self.register_user("third", "test")
+ self.third_tok = self.login("third", "test")
+ self.helper.join(room=self.room_id, user=self.third_user_id, tok=self.third_tok)
+
+ # An initial event with a relation from second user.
+ res = self.helper.send_event(
+ room_id=self.room_id,
+ type=EventTypes.Message,
+ content={"msgtype": "m.text", "body": "Message 1"},
+ tok=self.tok,
+ )
+ self.event_id_1 = res["event_id"]
+ self.helper.send_event(
+ room_id=self.room_id,
+ type="m.reaction",
+ content={
+ "m.relates_to": {
+ "rel_type": RelationTypes.ANNOTATION,
+ "event_id": self.event_id_1,
+ "key": "👍",
+ }
+ },
+ tok=self.second_tok,
+ )
+
+ # Another event with a relation from third user.
+ res = self.helper.send_event(
+ room_id=self.room_id,
+ type=EventTypes.Message,
+ content={"msgtype": "m.text", "body": "Message 2"},
+ tok=self.tok,
+ )
+ self.event_id_2 = res["event_id"]
+ self.helper.send_event(
+ room_id=self.room_id,
+ type="m.reaction",
+ content={
+ "m.relates_to": {
+ "rel_type": RelationTypes.REFERENCE,
+ "event_id": self.event_id_2,
+ }
+ },
+ tok=self.third_tok,
+ )
+
+ # An event with no relations.
+ self.helper.send_event(
+ room_id=self.room_id,
+ type=EventTypes.Message,
+ content={"msgtype": "m.text", "body": "No relations"},
+ tok=self.tok,
+ )
+
+ def _filter_messages(self, filter: JsonDict) -> List[JsonDict]:
+ """Make a request to /messages with a filter, returns the chunk of events."""
+ channel = self.make_request(
+ "GET",
+ "/rooms/%s/messages?filter=%s&dir=b" % (self.room_id, json.dumps(filter)),
+ access_token=self.tok,
+ )
+ self.assertEqual(channel.code, 200, channel.result)
+
+ return channel.json_body["chunk"]
+
+ def test_filter_relation_senders(self):
+ # Messages which second user reacted to.
+ filter = {"io.element.relation_senders": [self.second_user_id]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0]["event_id"], self.event_id_1)
+
+ # Messages which third user reacted to.
+ filter = {"io.element.relation_senders": [self.third_user_id]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0]["event_id"], self.event_id_2)
+
+ # Messages which either user reacted to.
+ filter = {
+ "io.element.relation_senders": [self.second_user_id, self.third_user_id]
+ }
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 2, chunk)
+ self.assertCountEqual(
+ [c["event_id"] for c in chunk], [self.event_id_1, self.event_id_2]
+ )
+
+ def test_filter_relation_type(self):
+ # Messages which have annotations.
+ filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0]["event_id"], self.event_id_1)
+
+ # Messages which have references.
+ filter = {"io.element.relation_types": [RelationTypes.REFERENCE]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0]["event_id"], self.event_id_2)
+
+ # Messages which have either annotations or references.
+ filter = {
+ "io.element.relation_types": [
+ RelationTypes.ANNOTATION,
+ RelationTypes.REFERENCE,
+ ]
+ }
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 2, chunk)
+ self.assertCountEqual(
+ [c["event_id"] for c in chunk], [self.event_id_1, self.event_id_2]
+ )
+
+ def test_filter_relation_senders_and_type(self):
+ # Messages which second user reacted to.
+ filter = {
+ "io.element.relation_senders": [self.second_user_id],
+ "io.element.relation_types": [RelationTypes.ANNOTATION],
+ }
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0]["event_id"], self.event_id_1)
+
+
class ContextTestCase(unittest.HomeserverTestCase):
servlets = [
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
new file mode 100644
index 0000000000..ce782c7e1d
--- /dev/null
+++ b/tests/storage/test_stream.py
@@ -0,0 +1,207 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+from synapse.api.constants import EventTypes, RelationTypes
+from synapse.api.filtering import Filter
+from synapse.events import EventBase
+from synapse.rest import admin
+from synapse.rest.client import login, room
+from synapse.types import JsonDict
+
+from tests.unittest import HomeserverTestCase
+
+
+class PaginationTestCase(HomeserverTestCase):
+ """
+ Test the pre-filtering done in the pagination code.
+
+ This is similar to some of the tests in tests.rest.client.test_rooms but here
+ we ensure that the filtering done in the database is applied successfully.
+ """
+
+ servlets = [
+ admin.register_servlets_for_client_rest_resource,
+ room.register_servlets,
+ login.register_servlets,
+ ]
+
+ def default_config(self):
+ config = super().default_config()
+ config["experimental_features"] = {"msc3440_enabled": True}
+ return config
+
+ def prepare(self, reactor, clock, homeserver):
+ self.user_id = self.register_user("test", "test")
+ self.tok = self.login("test", "test")
+ self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok)
+
+ self.second_user_id = self.register_user("second", "test")
+ self.second_tok = self.login("second", "test")
+ self.helper.join(
+ room=self.room_id, user=self.second_user_id, tok=self.second_tok
+ )
+
+ self.third_user_id = self.register_user("third", "test")
+ self.third_tok = self.login("third", "test")
+ self.helper.join(room=self.room_id, user=self.third_user_id, tok=self.third_tok)
+
+ # An initial event with a relation from second user.
+ res = self.helper.send_event(
+ room_id=self.room_id,
+ type=EventTypes.Message,
+ content={"msgtype": "m.text", "body": "Message 1"},
+ tok=self.tok,
+ )
+ self.event_id_1 = res["event_id"]
+ self.helper.send_event(
+ room_id=self.room_id,
+ type="m.reaction",
+ content={
+ "m.relates_to": {
+ "rel_type": RelationTypes.ANNOTATION,
+ "event_id": self.event_id_1,
+ "key": "👍",
+ }
+ },
+ tok=self.second_tok,
+ )
+
+ # Another event with a relation from third user.
+ res = self.helper.send_event(
+ room_id=self.room_id,
+ type=EventTypes.Message,
+ content={"msgtype": "m.text", "body": "Message 2"},
+ tok=self.tok,
+ )
+ self.event_id_2 = res["event_id"]
+ self.helper.send_event(
+ room_id=self.room_id,
+ type="m.reaction",
+ content={
+ "m.relates_to": {
+ "rel_type": RelationTypes.REFERENCE,
+ "event_id": self.event_id_2,
+ }
+ },
+ tok=self.third_tok,
+ )
+
+ # An event with no relations.
+ self.helper.send_event(
+ room_id=self.room_id,
+ type=EventTypes.Message,
+ content={"msgtype": "m.text", "body": "No relations"},
+ tok=self.tok,
+ )
+
+ def _filter_messages(self, filter: JsonDict) -> List[EventBase]:
+ """Make a request to /messages with a filter, returns the chunk of events."""
+
+ from_token = self.get_success(
+ self.hs.get_event_sources().get_current_token_for_pagination()
+ )
+
+ events, next_key = self.get_success(
+ self.hs.get_datastore().paginate_room_events(
+ room_id=self.room_id,
+ from_key=from_token.room_key,
+ to_key=None,
+ direction="b",
+ limit=10,
+ event_filter=Filter(self.hs, filter),
+ )
+ )
+
+ return events
+
+ def test_filter_relation_senders(self):
+ # Messages which second user reacted to.
+ filter = {"io.element.relation_senders": [self.second_user_id]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0].event_id, self.event_id_1)
+
+ # Messages which third user reacted to.
+ filter = {"io.element.relation_senders": [self.third_user_id]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0].event_id, self.event_id_2)
+
+ # Messages which either user reacted to.
+ filter = {
+ "io.element.relation_senders": [self.second_user_id, self.third_user_id]
+ }
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 2, chunk)
+ self.assertCountEqual(
+ [c.event_id for c in chunk], [self.event_id_1, self.event_id_2]
+ )
+
+ def test_filter_relation_type(self):
+ # Messages which have annotations.
+ filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0].event_id, self.event_id_1)
+
+ # Messages which have references.
+ filter = {"io.element.relation_types": [RelationTypes.REFERENCE]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0].event_id, self.event_id_2)
+
+ # Messages which have either annotations or references.
+ filter = {
+ "io.element.relation_types": [
+ RelationTypes.ANNOTATION,
+ RelationTypes.REFERENCE,
+ ]
+ }
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 2, chunk)
+ self.assertCountEqual(
+ [c.event_id for c in chunk], [self.event_id_1, self.event_id_2]
+ )
+
+ def test_filter_relation_senders_and_type(self):
+ # Messages which second user reacted to.
+ filter = {
+ "io.element.relation_senders": [self.second_user_id],
+ "io.element.relation_types": [RelationTypes.ANNOTATION],
+ }
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0].event_id, self.event_id_1)
+
+ def test_duplicate_relation(self):
+ """An event should only be returned once if there are multiple relations to it."""
+ self.helper.send_event(
+ room_id=self.room_id,
+ type="m.reaction",
+ content={
+ "m.relates_to": {
+ "rel_type": RelationTypes.ANNOTATION,
+ "event_id": self.event_id_1,
+ "key": "A",
+ }
+ },
+ tok=self.second_tok,
+ )
+
+ filter = {"io.element.relation_senders": [self.second_user_id]}
+ chunk = self._filter_messages(filter)
+ self.assertEqual(len(chunk), 1, chunk)
+ self.assertEqual(chunk[0].event_id, self.event_id_1)
|