summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r-- changelog.d/18387.feature | 1
-rw-r--r-- synapse/federation/federation_base.py | 34
-rw-r--r-- synapse/federation/federation_client.py | 57
-rw-r--r-- synapse/federation/transport/client.py | 27
-rw-r--r-- synapse/handlers/message.py | 15
-rw-r--r-- synapse/handlers/room_policy.py | 89
-rw-r--r-- synapse/server.py | 5
-rw-r--r-- synapse/types/handlers/policy_server.py | 16
-rw-r--r-- tests/handlers/test_room_policy.py | 226
9 files changed, 469 insertions, 1 deletion
diff --git a/changelog.d/18387.feature b/changelog.d/18387.feature
new file mode 100644

index 0000000000..2d9ff2cea2 --- /dev/null +++ b/changelog.d/18387.feature
@@ -0,0 +1 @@ +Add support for calling Policy Servers ([MSC4284](https://github.com/matrix-org/matrix-spec-proposals/pull/4284)) to mark events as spam. \ No newline at end of file diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 3796bff5e7..45593430e8 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py
@@ -30,6 +30,7 @@ from synapse.crypto.keyring import Keyring from synapse.events import EventBase, make_event_from_dict from synapse.events.utils import prune_event, validate_canonicaljson from synapse.federation.units import filter_pdus_for_valid_depth +from synapse.handlers.room_policy import RoomPolicyHandler from synapse.http.servlet import assert_params_in_dict from synapse.logging.opentracing import log_kv, trace from synapse.types import JsonDict, get_domain_from_id @@ -64,6 +65,24 @@ class FederationBase: self._clock = hs.get_clock() self._storage_controllers = hs.get_storage_controllers() + # We need to define this lazily otherwise we get a cyclic dependency. + # self._policy_handler = hs.get_room_policy_handler() + self._policy_handler: Optional[RoomPolicyHandler] = None + + def _lazily_get_policy_handler(self) -> RoomPolicyHandler: + """Lazily get the room policy handler. + + This is required to avoid an import cycle: RoomPolicyHandler requires a + FederationClient, which requires a FederationBase, which requires a + RoomPolicyHandler. + + Returns: + RoomPolicyHandler: The room policy handler. + """ + if self._policy_handler is None: + self._policy_handler = self.hs.get_room_policy_handler() + return self._policy_handler + @trace async def _check_sigs_and_hash( self, @@ -80,6 +99,10 @@ class FederationBase: Also runs the event through the spam checker; if it fails, redacts the event and flags it as soft-failed. + Also checks that the event is allowed by the policy server, if the room uses + a policy server. If the event is not allowed, the event is flagged as + soft-failed but not redacted. 
+ Args: room_version: The room version of the PDU pdu: the event to be checked @@ -145,6 +168,17 @@ class FederationBase: ) return redacted_event + policy_allowed = await self._lazily_get_policy_handler().is_event_allowed(pdu) + if not policy_allowed: + logger.warning( + "Event not allowed by policy server, soft-failing %s", pdu.event_id + ) + pdu.internal_metadata.soft_failed = True + # Note: we don't redact the event so admins can inspect the event after the + # fact. Other processes may redact the event, but that won't be applied to + # the database copy of the event until the server's config requires it. + return pdu + spam_check = await self._spam_checker_module_callbacks.check_event_for_spam(pdu) if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 9fc5b70e9a..7c485aa7e0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -75,6 +75,7 @@ from synapse.http.client import is_unknown_endpoint from synapse.http.types import QueryParams from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id +from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -423,6 +424,62 @@ class FederationClient(FederationBase): @trace @tag_args + async def get_pdu_policy_recommendation( + self, destination: str, pdu: EventBase, timeout: Optional[int] = None + ) -> str: + """Requests that the destination server (typically a policy server) + check the event and return its recommendation on how to handle the + event. + + If the policy server could not be contacted or the policy server + returned an unknown recommendation, this returns an OK recommendation. + This type fixing behaviour is done because the typical caller will be + in a critical call path and would generally interpret a `None` or similar + response as "weird value; don't care; move on without taking action". We + just frontload that logic here. + + + Args: + destination: The remote homeserver to ask (a policy server) + pdu: The event to check + timeout: How long to try (in ms) the destination for before + giving up. None indicates no timeout. + + Returns: + The policy recommendation, or RECOMMENDATION_OK if the policy server was + uncontactable or returned an unknown recommendation. 
+ """ + + logger.debug( + "get_pdu_policy_recommendation for event_id=%s from %s", + pdu.event_id, + destination, + ) + + try: + res = await self.transport_layer.get_policy_recommendation_for_pdu( + destination, pdu, timeout=timeout + ) + recommendation = res.get("recommendation") + if not isinstance(recommendation, str): + raise InvalidResponseError("recommendation is not a string") + if recommendation not in (RECOMMENDATION_OK, RECOMMENDATION_SPAM): + logger.warning( + "get_pdu_policy_recommendation: unknown recommendation: %s", + recommendation, + ) + return RECOMMENDATION_OK + return recommendation + except Exception as e: + logger.warning( + "get_pdu_policy_recommendation: server %s responded with error, assuming OK recommendation: %s", + destination, + e, + ) + return RECOMMENDATION_OK + + @trace + @tag_args async def get_pdu( self, destinations: Collection[str], diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 206e91ed14..62bf96ce91 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -143,6 +143,33 @@ class TransportLayerClient: destination, path=path, timeout=timeout, try_trailing_slash_on_400=True ) + async def get_policy_recommendation_for_pdu( + self, destination: str, event: EventBase, timeout: Optional[int] = None + ) -> JsonDict: + """Requests the policy recommendation for the given pdu from the given policy server. + + Args: + destination: The host name of the remote homeserver checking the event. + event: The event to check. + timeout: How long to try (in ms) the destination for before giving up. + None indicates no timeout. + + Returns: + The full recommendation object from the remote server. + """ + logger.debug( + "get_policy_recommendation_for_pdu dest=%s, event_id=%s", + destination, + event.event_id, + ) + return await self.client.post_json( + destination=destination, + path=f"/_matrix/policy/unstable/org.matrix.msc4284/event/{event.event_id}/check", + data=event.get_pdu_json(), + ignore_backoff=True, + timeout=timeout, + ) + async def backfill( self, destination: str, room_id: str, event_tuples: Collection[str], limit: int ) -> Optional[Union[JsonDict, list]]: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ff6eb5a514..cb6de02309 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -495,6 +495,7 @@ class EventCreationHandler: self._instance_name = hs.get_instance_name() self._notifier = hs.get_notifier() self._worker_lock_handler = hs.get_worker_locks_handler() + self._policy_handler = hs.get_room_policy_handler() self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state @@ -1108,6 +1109,18 @@ class EventCreationHandler: event.sender, ) + policy_allowed = await self._policy_handler.is_event_allowed(event) + if not policy_allowed: + logger.warning( + "Event not allowed by policy server, rejecting %s", + event.event_id, + ) + raise SynapseError( + 403, + "This message has been rejected as probable spam", + Codes.FORBIDDEN, + ) + spam_check_result = ( await self._spam_checker_module_callbacks.check_event_for_spam( event @@ -1119,7 +1132,7 @@ class EventCreationHandler: [code, dict] = spam_check_result raise SynapseError( 403, - "This message had been rejected as probable spam", + "This message has been rejected as probable spam", code, dict, ) diff --git a/synapse/handlers/room_policy.py b/synapse/handlers/room_policy.py new file mode 100644
index 0000000000..dcfebb128c --- /dev/null +++ b/synapse/handlers/room_policy.py
@@ -0,0 +1,89 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright 2016-2021 The Matrix.org Foundation C.I.C. +# Copyright (C) 2023 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# +# + +import logging +from typing import TYPE_CHECKING + +from synapse.events import EventBase +from synapse.types.handlers.policy_server import RECOMMENDATION_OK +from synapse.util.stringutils import parse_and_validate_server_name + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class RoomPolicyHandler: + def __init__(self, hs: "HomeServer"): + self._hs = hs + self._store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() + self._event_auth_handler = hs.get_event_auth_handler() + self._federation_client = hs.get_federation_client() + + async def is_event_allowed(self, event: EventBase) -> bool: + """Check if the given event is allowed in the room by the policy server. + + Note: This will *always* return True if the room's policy server is Synapse + itself. This is because Synapse can't be a policy server (currently). + + If no policy server is configured in the room, this returns True. Similarly, if + the policy server is invalid in any way (not joined, not a server, etc), this + returns True. + + If a valid and contactable policy server is configured in the room, this returns + True if that server suggests the event is not spammy, and False otherwise. + + Args: + event: The event to check. This should be a fully-formed PDU. + + Returns: + bool: True if the event is allowed in the room, False otherwise. 
+ """ + policy_event = await self._storage_controllers.state.get_current_state_event( + event.room_id, "org.matrix.msc4284.policy", "" + ) + if not policy_event: + return True # no policy server == default allow + + policy_server = policy_event.content.get("via", "") + if policy_server is None or not isinstance(policy_server, str): + return True # no policy server == default allow + + if policy_server == self._hs.hostname: + return True # Synapse itself can't be a policy server (currently) + + try: + parse_and_validate_server_name(policy_server) + except ValueError: + return True # invalid policy server == default allow + + is_in_room = await self._event_auth_handler.is_host_in_room( + event.room_id, policy_server + ) + if not is_in_room: + return True # policy server not in room == default allow + + # At this point, the server appears valid and is in the room, so ask it to check + # the event. + recommendation = await self._federation_client.get_pdu_policy_recommendation( + policy_server, event + ) + if recommendation != RECOMMENDATION_OK: + return False + + return True # default allow diff --git a/synapse/server.py b/synapse/server.py
index bd2faa61b9..2add4d4e6e 100644 --- a/synapse/server.py +++ b/synapse/server.py
@@ -107,6 +107,7 @@ from synapse.handlers.room_member import ( RoomMemberMasterHandler, ) from synapse.handlers.room_member_worker import RoomMemberWorkerHandler +from synapse.handlers.room_policy import RoomPolicyHandler from synapse.handlers.room_summary import RoomSummaryHandler from synapse.handlers.search import SearchHandler from synapse.handlers.send_email import SendEmailHandler @@ -808,6 +809,10 @@ class HomeServer(metaclass=abc.ABCMeta): return OidcHandler(self) @cache_in_self + def get_room_policy_handler(self) -> RoomPolicyHandler: + return RoomPolicyHandler(self) + + @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: return EventClientSerializer(self) diff --git a/synapse/types/handlers/policy_server.py b/synapse/types/handlers/policy_server.py new file mode 100644
index 0000000000..bfc09dabf4 --- /dev/null +++ b/synapse/types/handlers/policy_server.py
@@ -0,0 +1,16 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# + +RECOMMENDATION_OK = "ok" +RECOMMENDATION_SPAM = "spam" diff --git a/tests/handlers/test_room_policy.py b/tests/handlers/test_room_policy.py new file mode 100644
index 0000000000..26642c18ea --- /dev/null +++ b/tests/handlers/test_room_policy.py
@@ -0,0 +1,226 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# +# +from typing import Optional +from unittest import mock + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.events import EventBase, make_event_from_dict +from synapse.rest import admin +from synapse.rest.client import login, room +from synapse.server import HomeServer +from synapse.types import JsonDict, UserID +from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM +from synapse.util import Clock + +from tests import unittest +from tests.test_utils import event_injection + + +class RoomPolicyTestCase(unittest.FederatingHomeserverTestCase): + """Tests room policy handler.""" + + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + # mock out the federation transport client + self.mock_federation_transport_client = mock.Mock( + spec=["get_policy_recommendation_for_pdu"] + ) + self.mock_federation_transport_client.get_policy_recommendation_for_pdu = ( + mock.AsyncMock() + ) + return super().setup_test_homeserver( + federation_transport_client=self.mock_federation_transport_client + ) + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.hs = hs + self.handler = hs.get_room_policy_handler() + main_store = self.hs.get_datastores().main + + # Create a room + self.creator = self.register_user("creator", "test1234") + self.creator_token 
= self.login("creator", "test1234") + self.room_id = self.helper.create_room_as( + room_creator=self.creator, tok=self.creator_token + ) + room_version = self.get_success(main_store.get_room_version(self.room_id)) + + # Create some sample events + self.spammy_event = make_event_from_dict( + room_version=room_version, + internal_metadata_dict={}, + event_dict={ + "room_id": self.room_id, + "type": "m.room.message", + "sender": "@spammy:example.org", + "content": { + "msgtype": "m.text", + "body": "This is a spammy event.", + }, + }, + ) + self.not_spammy_event = make_event_from_dict( + room_version=room_version, + internal_metadata_dict={}, + event_dict={ + "room_id": self.room_id, + "type": "m.room.message", + "sender": "@not_spammy:example.org", + "content": { + "msgtype": "m.text", + "body": "This is a NOT spammy event.", + }, + }, + ) + + # Prepare the policy server mock to decide spam vs not spam on those events + self.call_count = 0 + + async def get_policy_recommendation_for_pdu( + destination: str, + pdu: EventBase, + timeout: Optional[int] = None, + ) -> JsonDict: + self.call_count += 1 + self.assertEqual(destination, self.OTHER_SERVER_NAME) + if pdu.event_id == self.spammy_event.event_id: + return {"recommendation": RECOMMENDATION_SPAM} + elif pdu.event_id == self.not_spammy_event.event_id: + return {"recommendation": RECOMMENDATION_OK} + else: + self.fail("Unexpected event ID") + + self.mock_federation_transport_client.get_policy_recommendation_for_pdu.side_effect = get_policy_recommendation_for_pdu + + def _add_policy_server_to_room(self) -> None: + # Inject a member event into the room + policy_user_id = f"@policy:{self.OTHER_SERVER_NAME}" + self.get_success( + event_injection.inject_member_event( + self.hs, self.room_id, policy_user_id, "join" + ) + ) + self.helper.send_state( + self.room_id, + "org.matrix.msc4284.policy", + { + "via": self.OTHER_SERVER_NAME, + }, + tok=self.creator_token, + state_key="", + ) + + def test_no_policy_event_set(self) -> 
None: + # We don't need to modify the room state at all - we're testing the default + # case where a room doesn't use a policy server. + ok = self.get_success(self.handler.is_event_allowed(self.spammy_event)) + self.assertEqual(ok, True) + self.assertEqual(self.call_count, 0) + + def test_empty_policy_event_set(self) -> None: + self.helper.send_state( + self.room_id, + "org.matrix.msc4284.policy", + { + # empty content (no `via`) + }, + tok=self.creator_token, + state_key="", + ) + + ok = self.get_success(self.handler.is_event_allowed(self.spammy_event)) + self.assertEqual(ok, True) + self.assertEqual(self.call_count, 0) + + def test_nonstring_policy_event_set(self) -> None: + self.helper.send_state( + self.room_id, + "org.matrix.msc4284.policy", + { + "via": 42, # should be a server name + }, + tok=self.creator_token, + state_key="", + ) + + ok = self.get_success(self.handler.is_event_allowed(self.spammy_event)) + self.assertEqual(ok, True) + self.assertEqual(self.call_count, 0) + + def test_self_policy_event_set(self) -> None: + self.helper.send_state( + self.room_id, + "org.matrix.msc4284.policy", + { + # We ignore events when the policy server is ourselves (for now?) 
+ "via": (UserID.from_string(self.creator)).domain, + }, + tok=self.creator_token, + state_key="", + ) + + ok = self.get_success(self.handler.is_event_allowed(self.spammy_event)) + self.assertEqual(ok, True) + self.assertEqual(self.call_count, 0) + + def test_invalid_server_policy_event_set(self) -> None: + self.helper.send_state( + self.room_id, + "org.matrix.msc4284.policy", + { + "via": "|this| is *not* a (valid) server name.com", + }, + tok=self.creator_token, + state_key="", + ) + + ok = self.get_success(self.handler.is_event_allowed(self.spammy_event)) + self.assertEqual(ok, True) + self.assertEqual(self.call_count, 0) + + def test_not_in_room_policy_event_set(self) -> None: + self.helper.send_state( + self.room_id, + "org.matrix.msc4284.policy", + { + "via": f"x.{self.OTHER_SERVER_NAME}", + }, + tok=self.creator_token, + state_key="", + ) + + ok = self.get_success(self.handler.is_event_allowed(self.spammy_event)) + self.assertEqual(ok, True) + self.assertEqual(self.call_count, 0) + + def test_spammy_event_is_spam(self) -> None: + self._add_policy_server_to_room() + + ok = self.get_success(self.handler.is_event_allowed(self.spammy_event)) + self.assertEqual(ok, False) + self.assertEqual(self.call_count, 1) + + def test_not_spammy_event_is_not_spam(self) -> None: + self._add_policy_server_to_room() + + ok = self.get_success(self.handler.is_event_allowed(self.not_spammy_event)) + self.assertEqual(ok, True) + self.assertEqual(self.call_count, 1)