diff --git a/changelog.d/12744.feature b/changelog.d/12744.feature
new file mode 100644
index 0000000000..9836d94f8c
--- /dev/null
+++ b/changelog.d/12744.feature
@@ -0,0 +1 @@
+Add a `drop_federated_event` callback to `SpamChecker` to disregard inbound federated events before they take up much processing power, in an emergency.
diff --git a/docs/modules/spam_checker_callbacks.md b/docs/modules/spam_checker_callbacks.md
index 472d957180..27c5a0ed5c 100644
--- a/docs/modules/spam_checker_callbacks.md
+++ b/docs/modules/spam_checker_callbacks.md
@@ -249,6 +249,24 @@ callback returns `False`, Synapse falls through to the next one. The value of th
callback that does not return `False` will be used. If this happens, Synapse will not call
any of the subsequent implementations of this callback.
+### `should_drop_federated_event`
+
+_First introduced in Synapse v1.60.0_
+
+```python
+async def should_drop_federated_event(event: "synapse.events.EventBase") -> bool
+```
+
+Called when checking whether a remote server can federate an event with us. **Returning
+`True` from this function will silently drop a federated event and split-brain our view
+of a room's DAG, and thus you shouldn't use this callback unless you know what you are
+doing.**
+
+If multiple modules implement this callback, they will be considered in order. If a
+callback returns `False`, Synapse falls through to the next one. The value of the first
+callback that does not return `False` will be used. If this happens, Synapse will not call
+any of the subsequent implementations of this callback.
+
## Example
The example below is a module that implements the spam checker callback
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index f30207376a..61bcbe2abe 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -44,6 +44,10 @@ CHECK_EVENT_FOR_SPAM_CALLBACK = Callable[
["synapse.events.EventBase"],
Awaitable[Union[bool, str]],
]
+SHOULD_DROP_FEDERATED_EVENT_CALLBACK = Callable[
+ ["synapse.events.EventBase"],
+ Awaitable[Union[bool, str]],
+]
USER_MAY_JOIN_ROOM_CALLBACK = Callable[[str, str, bool], Awaitable[bool]]
USER_MAY_INVITE_CALLBACK = Callable[[str, str, str], Awaitable[bool]]
USER_MAY_SEND_3PID_INVITE_CALLBACK = Callable[[str, str, str, str], Awaitable[bool]]
@@ -168,6 +172,9 @@ class SpamChecker:
self.clock = hs.get_clock()
self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
+ self._should_drop_federated_event_callbacks: List[
+ SHOULD_DROP_FEDERATED_EVENT_CALLBACK
+ ] = []
self._user_may_join_room_callbacks: List[USER_MAY_JOIN_ROOM_CALLBACK] = []
self._user_may_invite_callbacks: List[USER_MAY_INVITE_CALLBACK] = []
self._user_may_send_3pid_invite_callbacks: List[
@@ -191,6 +198,9 @@ class SpamChecker:
def register_callbacks(
self,
check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None,
+ should_drop_federated_event: Optional[
+ SHOULD_DROP_FEDERATED_EVENT_CALLBACK
+ ] = None,
user_may_join_room: Optional[USER_MAY_JOIN_ROOM_CALLBACK] = None,
user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None,
user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None,
@@ -209,6 +219,11 @@ class SpamChecker:
if check_event_for_spam is not None:
self._check_event_for_spam_callbacks.append(check_event_for_spam)
+ if should_drop_federated_event is not None:
+ self._should_drop_federated_event_callbacks.append(
+ should_drop_federated_event
+ )
+
if user_may_join_room is not None:
self._user_may_join_room_callbacks.append(user_may_join_room)
@@ -268,6 +283,31 @@ class SpamChecker:
return False
+ async def should_drop_federated_event(
+ self, event: "synapse.events.EventBase"
+ ) -> Union[bool, str]:
+ """Checks if a given federated event is considered "spammy" by this
+ server.
+
+ If the server considers an event spammy, it will be silently dropped,
+ and in doing so will split-brain our view of the room's DAG.
+
+ Args:
+ event: the event to be checked
+
+ Returns:
+ True if the event should be silently dropped
+ """
+ for callback in self._should_drop_federated_event_callbacks:
+ with Measure(
+ self.clock, "{}.{}".format(callback.__module__, callback.__qualname__)
+ ):
+ res: Union[bool, str] = await delay_cancellation(callback(event))
+ if res:
+ return res
+
+ return False
+
async def user_may_join_room(
self, user_id: str, room_id: str, is_invited: bool
) -> bool:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 884b5d60b4..b8232e5257 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -110,6 +110,7 @@ class FederationServer(FederationBase):
self.handler = hs.get_federation_handler()
self.storage = hs.get_storage()
+ self._spam_checker = hs.get_spam_checker()
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
@@ -1019,6 +1020,12 @@ class FederationServer(FederationBase):
except SynapseError as e:
raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)
+ if await self._spam_checker.should_drop_federated_event(pdu):
+ logger.warning(
+ "Unstaged federated event contains spam, dropping %s", pdu.event_id
+ )
+ return
+
# Add the event to our staging area
await self.store.insert_received_event_to_staging(origin, pdu)
@@ -1032,6 +1039,41 @@ class FederationServer(FederationBase):
pdu.room_id, room_version, lock, origin, pdu
)
+ async def _get_next_nonspam_staged_event_for_room(
+ self, room_id: str, room_version: RoomVersion
+ ) -> Optional[Tuple[str, EventBase]]:
+ """Fetch the first non-spam event from staging queue.
+
+ Args:
+ room_id: the room to fetch the first non-spam event in.
+ room_version: the version of the room.
+
+ Returns:
+ The first non-spam event in that room.
+ """
+
+ while True:
+ # We need to do this check outside the lock to avoid a race between
+ # a new event being inserted by another instance and it attempting
+ # to acquire the lock.
+ next = await self.store.get_next_staged_event_for_room(
+ room_id, room_version
+ )
+
+ if next is None:
+ return None
+
+ origin, event = next
+
+ if await self._spam_checker.should_drop_federated_event(event):
+ logger.warning(
+ "Staged federated event contains spam, dropping %s",
+ event.event_id,
+ )
+ continue
+
+ return next
+
@wrap_as_background_process("_process_incoming_pdus_in_room_inner")
async def _process_incoming_pdus_in_room_inner(
self,
@@ -1109,12 +1151,10 @@ class FederationServer(FederationBase):
(self._clock.time_msec() - received_ts) / 1000
)
- # We need to do this check outside the lock to avoid a race between
- # a new event being inserted by another instance and it attempting
- # to acquire the lock.
- next = await self.store.get_next_staged_event_for_room(
+ next = await self._get_next_nonspam_staged_event_for_room(
room_id, room_version
)
+
if not next:
break
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 73f92d2df8..c4f661bb93 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -47,6 +47,7 @@ from synapse.events.spamcheck import (
CHECK_MEDIA_FILE_FOR_SPAM_CALLBACK,
CHECK_REGISTRATION_FOR_SPAM_CALLBACK,
CHECK_USERNAME_FOR_SPAM_CALLBACK,
+ SHOULD_DROP_FEDERATED_EVENT_CALLBACK,
USER_MAY_CREATE_ROOM_ALIAS_CALLBACK,
USER_MAY_CREATE_ROOM_CALLBACK,
USER_MAY_INVITE_CALLBACK,
@@ -234,6 +235,9 @@ class ModuleApi:
self,
*,
check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None,
+ should_drop_federated_event: Optional[
+ SHOULD_DROP_FEDERATED_EVENT_CALLBACK
+ ] = None,
user_may_join_room: Optional[USER_MAY_JOIN_ROOM_CALLBACK] = None,
user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None,
user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None,
@@ -254,6 +258,7 @@ class ModuleApi:
"""
return self._spam_checker.register_callbacks(
check_event_for_spam=check_event_for_spam,
+ should_drop_federated_event=should_drop_federated_event,
user_may_join_room=user_may_join_room,
user_may_invite=user_may_invite,
user_may_send_3pid_invite=user_may_send_3pid_invite,
|