diff --git a/changelog.d/18387.feature b/changelog.d/18387.feature
new file mode 100644
index 0000000000..2d9ff2cea2
--- /dev/null
+++ b/changelog.d/18387.feature
@@ -0,0 +1 @@
+Add support for calling Policy Servers ([MSC4284](https://github.com/matrix-org/matrix-spec-proposals/pull/4284)) to mark events as spam.
\ No newline at end of file
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 3796bff5e7..45593430e8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -30,6 +30,7 @@ from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.federation.units import filter_pdus_for_valid_depth
+from synapse.handlers.room_policy import RoomPolicyHandler
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
@@ -64,6 +65,24 @@ class FederationBase:
self._clock = hs.get_clock()
self._storage_controllers = hs.get_storage_controllers()
+ # We need to define this lazily otherwise we get a cyclic dependency.
+ # self._policy_handler = hs.get_room_policy_handler()
+ self._policy_handler: Optional[RoomPolicyHandler] = None
+
+ def _lazily_get_policy_handler(self) -> RoomPolicyHandler:
+ """Lazily get the room policy handler.
+
+ This is required to avoid an import cycle: RoomPolicyHandler requires a
+ FederationClient, which requires a FederationBase, which requires a
+ RoomPolicyHandler.
+
+ Returns:
+ RoomPolicyHandler: The room policy handler.
+ """
+ if self._policy_handler is None:
+ self._policy_handler = self.hs.get_room_policy_handler()
+ return self._policy_handler
+
@trace
async def _check_sigs_and_hash(
self,
@@ -80,6 +99,10 @@ class FederationBase:
Also runs the event through the spam checker; if it fails, redacts the event
and flags it as soft-failed.
+ Also checks that the event is allowed by the policy server, if the room uses
+ a policy server. If the event is not allowed, the event is flagged as
+ soft-failed but not redacted.
+
Args:
room_version: The room version of the PDU
pdu: the event to be checked
@@ -145,6 +168,17 @@ class FederationBase:
)
return redacted_event
+ policy_allowed = await self._lazily_get_policy_handler().is_event_allowed(pdu)
+ if not policy_allowed:
+ logger.warning(
+ "Event not allowed by policy server, soft-failing %s", pdu.event_id
+ )
+ pdu.internal_metadata.soft_failed = True
+ # Note: we don't redact the event so admins can inspect the event after the
+ # fact. Other processes may redact the event, but that won't be applied to
+ # the database copy of the event until the server's config requires it.
+ return pdu
+
spam_check = await self._spam_checker_module_callbacks.check_event_for_spam(pdu)
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 9fc5b70e9a..7c485aa7e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -75,6 +75,7 @@ from synapse.http.client import is_unknown_endpoint
from synapse.http.types import QueryParams
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id
+from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
@@ -423,6 +424,62 @@ class FederationClient(FederationBase):
@trace
@tag_args
+ async def get_pdu_policy_recommendation(
+ self, destination: str, pdu: EventBase, timeout: Optional[int] = None
+ ) -> str:
+ """Requests that the destination server (typically a policy server)
+ check the event and return its recommendation on how to handle the
+ event.
+
+ If the policy server could not be contacted or the policy server
+ returned an unknown recommendation, this returns an OK recommendation.
+ This type fixing behaviour is done because the typical caller will be
+ in a critical call path and would generally interpret a `None` or similar
+ response as "weird value; don't care; move on without taking action". We
+ just frontload that logic here.
+
+
+ Args:
+ destination: The remote homeserver to ask (a policy server)
+ pdu: The event to check
+ timeout: How long to try (in ms) the destination for before
+ giving up. None indicates no timeout.
+
+ Returns:
+ The policy recommendation, or RECOMMENDATION_OK if the policy server was
+ uncontactable or returned an unknown recommendation.
+ """
+
+ logger.debug(
+ "get_pdu_policy_recommendation for event_id=%s from %s",
+ pdu.event_id,
+ destination,
+ )
+
+ try:
+ res = await self.transport_layer.get_policy_recommendation_for_pdu(
+ destination, pdu, timeout=timeout
+ )
+ recommendation = res.get("recommendation")
+ if not isinstance(recommendation, str):
+ raise InvalidResponseError("recommendation is not a string")
+ if recommendation not in (RECOMMENDATION_OK, RECOMMENDATION_SPAM):
+ logger.warning(
+ "get_pdu_policy_recommendation: unknown recommendation: %s",
+ recommendation,
+ )
+ return RECOMMENDATION_OK
+ return recommendation
+ except Exception as e:
+ logger.warning(
+ "get_pdu_policy_recommendation: server %s responded with error, assuming OK recommendation: %s",
+ destination,
+ e,
+ )
+ return RECOMMENDATION_OK
+
+ @trace
+ @tag_args
async def get_pdu(
self,
destinations: Collection[str],
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 206e91ed14..62bf96ce91 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -143,6 +143,33 @@ class TransportLayerClient:
destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
)
+ async def get_policy_recommendation_for_pdu(
+ self, destination: str, event: EventBase, timeout: Optional[int] = None
+ ) -> JsonDict:
+ """Requests the policy recommendation for the given pdu from the given policy server.
+
+ Args:
+ destination: The host name of the remote homeserver checking the event.
+ event: The event to check.
+ timeout: How long to try (in ms) the destination for before giving up.
+ None indicates no timeout.
+
+ Returns:
+ The full recommendation object from the remote server.
+ """
+ logger.debug(
+ "get_policy_recommendation_for_pdu dest=%s, event_id=%s",
+ destination,
+ event.event_id,
+ )
+ return await self.client.post_json(
+ destination=destination,
+ path=f"/_matrix/policy/unstable/org.matrix.msc4284/event/{event.event_id}/check",
+ data=event.get_pdu_json(),
+ ignore_backoff=True,
+ timeout=timeout,
+ )
+
async def backfill(
self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
) -> Optional[Union[JsonDict, list]]:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ff6eb5a514..cb6de02309 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -495,6 +495,7 @@ class EventCreationHandler:
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()
self._worker_lock_handler = hs.get_worker_locks_handler()
+ self._policy_handler = hs.get_room_policy_handler()
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
@@ -1108,6 +1109,18 @@ class EventCreationHandler:
event.sender,
)
+ policy_allowed = await self._policy_handler.is_event_allowed(event)
+ if not policy_allowed:
+ logger.warning(
+ "Event not allowed by policy server, rejecting %s",
+ event.event_id,
+ )
+ raise SynapseError(
+ 403,
+ "This message has been rejected as probable spam",
+ Codes.FORBIDDEN,
+ )
+
spam_check_result = (
await self._spam_checker_module_callbacks.check_event_for_spam(
event
@@ -1119,7 +1132,7 @@ class EventCreationHandler:
[code, dict] = spam_check_result
raise SynapseError(
403,
- "This message had been rejected as probable spam",
+ "This message has been rejected as probable spam",
code,
dict,
)
diff --git a/synapse/handlers/room_policy.py b/synapse/handlers/room_policy.py
new file mode 100644
index 0000000000..dcfebb128c
--- /dev/null
+++ b/synapse/handlers/room_policy.py
@@ -0,0 +1,89 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright 2016-2021 The Matrix.org Foundation C.I.C.
+# Copyright (C) 2023 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+#
+
+import logging
+from typing import TYPE_CHECKING
+
+from synapse.events import EventBase
+from synapse.types.handlers.policy_server import RECOMMENDATION_OK
+from synapse.util.stringutils import parse_and_validate_server_name
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class RoomPolicyHandler:
+ def __init__(self, hs: "HomeServer"):
+ self._hs = hs
+ self._store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
+ self._event_auth_handler = hs.get_event_auth_handler()
+ self._federation_client = hs.get_federation_client()
+
+ async def is_event_allowed(self, event: EventBase) -> bool:
+ """Check if the given event is allowed in the room by the policy server.
+
+ Note: This will *always* return True if the room's policy server is Synapse
+ itself. This is because Synapse can't be a policy server (currently).
+
+ If no policy server is configured in the room, this returns True. Similarly, if
+ the policy server is invalid in any way (not joined, not a server, etc), this
+ returns True.
+
+ If a valid and contactable policy server is configured in the room, this returns
+ True if that server suggests the event is not spammy, and False otherwise.
+
+ Args:
+ event: The event to check. This should be a fully-formed PDU.
+
+ Returns:
+ bool: True if the event is allowed in the room, False otherwise.
+ """
+ policy_event = await self._storage_controllers.state.get_current_state_event(
+ event.room_id, "org.matrix.msc4284.policy", ""
+ )
+ if not policy_event:
+ return True # no policy server == default allow
+
+ policy_server = policy_event.content.get("via", "")
+ if policy_server is None or not isinstance(policy_server, str):
+ return True # no policy server == default allow
+
+ if policy_server == self._hs.hostname:
+ return True # Synapse itself can't be a policy server (currently)
+
+ try:
+ parse_and_validate_server_name(policy_server)
+ except ValueError:
+ return True # invalid policy server == default allow
+
+ is_in_room = await self._event_auth_handler.is_host_in_room(
+ event.room_id, policy_server
+ )
+ if not is_in_room:
+ return True # policy server not in room == default allow
+
+ # At this point, the server appears valid and is in the room, so ask it to check
+ # the event.
+ recommendation = await self._federation_client.get_pdu_policy_recommendation(
+ policy_server, event
+ )
+ if recommendation != RECOMMENDATION_OK:
+ return False
+
+ return True # default allow
diff --git a/synapse/server.py b/synapse/server.py
index bd2faa61b9..2add4d4e6e 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -107,6 +107,7 @@ from synapse.handlers.room_member import (
RoomMemberMasterHandler,
)
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
+from synapse.handlers.room_policy import RoomPolicyHandler
from synapse.handlers.room_summary import RoomSummaryHandler
from synapse.handlers.search import SearchHandler
from synapse.handlers.send_email import SendEmailHandler
@@ -808,6 +809,10 @@ class HomeServer(metaclass=abc.ABCMeta):
return OidcHandler(self)
@cache_in_self
+ def get_room_policy_handler(self) -> RoomPolicyHandler:
+ return RoomPolicyHandler(self)
+
+ @cache_in_self
def get_event_client_serializer(self) -> EventClientSerializer:
return EventClientSerializer(self)
diff --git a/synapse/types/handlers/policy_server.py b/synapse/types/handlers/policy_server.py
new file mode 100644
index 0000000000..bfc09dabf4
--- /dev/null
+++ b/synapse/types/handlers/policy_server.py
@@ -0,0 +1,16 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2025 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+
+RECOMMENDATION_OK = "ok"
+RECOMMENDATION_SPAM = "spam"
diff --git a/tests/handlers/test_room_policy.py b/tests/handlers/test_room_policy.py
new file mode 100644
index 0000000000..26642c18ea
--- /dev/null
+++ b/tests/handlers/test_room_policy.py
@@ -0,0 +1,226 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2025 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+#
+from typing import Optional
+from unittest import mock
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.events import EventBase, make_event_from_dict
+from synapse.rest import admin
+from synapse.rest.client import login, room
+from synapse.server import HomeServer
+from synapse.types import JsonDict, UserID
+from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
+from synapse.util import Clock
+
+from tests import unittest
+from tests.test_utils import event_injection
+
+
+class RoomPolicyTestCase(unittest.FederatingHomeserverTestCase):
+ """Tests room policy handler."""
+
+ servlets = [
+ admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ ]
+
+ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+ # mock out the federation transport client
+ self.mock_federation_transport_client = mock.Mock(
+ spec=["get_policy_recommendation_for_pdu"]
+ )
+ self.mock_federation_transport_client.get_policy_recommendation_for_pdu = (
+ mock.AsyncMock()
+ )
+ return super().setup_test_homeserver(
+ federation_transport_client=self.mock_federation_transport_client
+ )
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.hs = hs
+ self.handler = hs.get_room_policy_handler()
+ main_store = self.hs.get_datastores().main
+
+ # Create a room
+ self.creator = self.register_user("creator", "test1234")
+ self.creator_token = self.login("creator", "test1234")
+ self.room_id = self.helper.create_room_as(
+ room_creator=self.creator, tok=self.creator_token
+ )
+ room_version = self.get_success(main_store.get_room_version(self.room_id))
+
+ # Create some sample events
+ self.spammy_event = make_event_from_dict(
+ room_version=room_version,
+ internal_metadata_dict={},
+ event_dict={
+ "room_id": self.room_id,
+ "type": "m.room.message",
+ "sender": "@spammy:example.org",
+ "content": {
+ "msgtype": "m.text",
+ "body": "This is a spammy event.",
+ },
+ },
+ )
+ self.not_spammy_event = make_event_from_dict(
+ room_version=room_version,
+ internal_metadata_dict={},
+ event_dict={
+ "room_id": self.room_id,
+ "type": "m.room.message",
+ "sender": "@not_spammy:example.org",
+ "content": {
+ "msgtype": "m.text",
+ "body": "This is a NOT spammy event.",
+ },
+ },
+ )
+
+ # Prepare the policy server mock to decide spam vs not spam on those events
+ self.call_count = 0
+
+ async def get_policy_recommendation_for_pdu(
+ destination: str,
+ pdu: EventBase,
+ timeout: Optional[int] = None,
+ ) -> JsonDict:
+ self.call_count += 1
+ self.assertEqual(destination, self.OTHER_SERVER_NAME)
+ if pdu.event_id == self.spammy_event.event_id:
+ return {"recommendation": RECOMMENDATION_SPAM}
+ elif pdu.event_id == self.not_spammy_event.event_id:
+ return {"recommendation": RECOMMENDATION_OK}
+ else:
+ self.fail("Unexpected event ID")
+
+ self.mock_federation_transport_client.get_policy_recommendation_for_pdu.side_effect = get_policy_recommendation_for_pdu
+
+ def _add_policy_server_to_room(self) -> None:
+ # Inject a member event into the room
+ policy_user_id = f"@policy:{self.OTHER_SERVER_NAME}"
+ self.get_success(
+ event_injection.inject_member_event(
+ self.hs, self.room_id, policy_user_id, "join"
+ )
+ )
+ self.helper.send_state(
+ self.room_id,
+ "org.matrix.msc4284.policy",
+ {
+ "via": self.OTHER_SERVER_NAME,
+ },
+ tok=self.creator_token,
+ state_key="",
+ )
+
+ def test_no_policy_event_set(self) -> None:
+ # We don't need to modify the room state at all - we're testing the default
+ # case where a room doesn't use a policy server.
+ ok = self.get_success(self.handler.is_event_allowed(self.spammy_event))
+ self.assertEqual(ok, True)
+ self.assertEqual(self.call_count, 0)
+
+ def test_empty_policy_event_set(self) -> None:
+ self.helper.send_state(
+ self.room_id,
+ "org.matrix.msc4284.policy",
+ {
+ # empty content (no `via`)
+ },
+ tok=self.creator_token,
+ state_key="",
+ )
+
+ ok = self.get_success(self.handler.is_event_allowed(self.spammy_event))
+ self.assertEqual(ok, True)
+ self.assertEqual(self.call_count, 0)
+
+ def test_nonstring_policy_event_set(self) -> None:
+ self.helper.send_state(
+ self.room_id,
+ "org.matrix.msc4284.policy",
+ {
+ "via": 42, # should be a server name
+ },
+ tok=self.creator_token,
+ state_key="",
+ )
+
+ ok = self.get_success(self.handler.is_event_allowed(self.spammy_event))
+ self.assertEqual(ok, True)
+ self.assertEqual(self.call_count, 0)
+
+ def test_self_policy_event_set(self) -> None:
+ self.helper.send_state(
+ self.room_id,
+ "org.matrix.msc4284.policy",
+ {
+ # We ignore events when the policy server is ourselves (for now?)
+ "via": (UserID.from_string(self.creator)).domain,
+ },
+ tok=self.creator_token,
+ state_key="",
+ )
+
+ ok = self.get_success(self.handler.is_event_allowed(self.spammy_event))
+ self.assertEqual(ok, True)
+ self.assertEqual(self.call_count, 0)
+
+ def test_invalid_server_policy_event_set(self) -> None:
+ self.helper.send_state(
+ self.room_id,
+ "org.matrix.msc4284.policy",
+ {
+ "via": "|this| is *not* a (valid) server name.com",
+ },
+ tok=self.creator_token,
+ state_key="",
+ )
+
+ ok = self.get_success(self.handler.is_event_allowed(self.spammy_event))
+ self.assertEqual(ok, True)
+ self.assertEqual(self.call_count, 0)
+
+ def test_not_in_room_policy_event_set(self) -> None:
+ self.helper.send_state(
+ self.room_id,
+ "org.matrix.msc4284.policy",
+ {
+ "via": f"x.{self.OTHER_SERVER_NAME}",
+ },
+ tok=self.creator_token,
+ state_key="",
+ )
+
+ ok = self.get_success(self.handler.is_event_allowed(self.spammy_event))
+ self.assertEqual(ok, True)
+ self.assertEqual(self.call_count, 0)
+
+ def test_spammy_event_is_spam(self) -> None:
+ self._add_policy_server_to_room()
+
+ ok = self.get_success(self.handler.is_event_allowed(self.spammy_event))
+ self.assertEqual(ok, False)
+ self.assertEqual(self.call_count, 1)
+
+ def test_not_spammy_event_is_not_spam(self) -> None:
+ self._add_policy_server_to_room()
+
+ ok = self.get_success(self.handler.is_event_allowed(self.not_spammy_event))
+ self.assertEqual(ok, True)
+ self.assertEqual(self.call_count, 1)
|