diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 884b5d60b4..b8232e5257 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -110,6 +110,7 @@ class FederationServer(FederationBase):
self.handler = hs.get_federation_handler()
self.storage = hs.get_storage()
+ self._spam_checker = hs.get_spam_checker()
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
@@ -1019,6 +1020,12 @@ class FederationServer(FederationBase):
except SynapseError as e:
raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)
+ if await self._spam_checker.should_drop_federated_event(pdu):
+ logger.warning(
+ "Unstaged federated event contains spam, dropping %s", pdu.event_id
+ )
+ return
+
# Add the event to our staging area
await self.store.insert_received_event_to_staging(origin, pdu)
@@ -1032,6 +1039,41 @@ class FederationServer(FederationBase):
pdu.room_id, room_version, lock, origin, pdu
)
+ async def _get_next_nonspam_staged_event_for_room(
+ self, room_id: str, room_version: RoomVersion
+ ) -> Optional[Tuple[str, EventBase]]:
+ """Fetch the first non-spam event from staging queue.
+
+ Args:
+ room_id: the room to fetch the first non-spam event in.
+ room_version: the version of the room.
+
+ Returns:
+ The first non-spam event in that room.
+ """
+
+ while True:
+ # We need to do this check outside the lock to avoid a race between
+ # a new event being inserted by another instance and it attempting
+ # to acquire the lock.
+ next = await self.store.get_next_staged_event_for_room(
+ room_id, room_version
+ )
+
+ if next is None:
+ return None
+
+ origin, event = next
+
+ if await self._spam_checker.should_drop_federated_event(event):
+ logger.warning(
+ "Staged federated event contains spam, dropping %s",
+ event.event_id,
+ )
+ continue
+
+ return next
+
@wrap_as_background_process("_process_incoming_pdus_in_room_inner")
async def _process_incoming_pdus_in_room_inner(
self,
@@ -1109,12 +1151,10 @@ class FederationServer(FederationBase):
(self._clock.time_msec() - received_ts) / 1000
)
- # We need to do this check outside the lock to avoid a race between
- # a new event being inserted by another instance and it attempting
- # to acquire the lock.
- next = await self.store.get_next_staged_event_for_room(
+ next = await self._get_next_nonspam_staged_event_for_room(
room_id, room_version
)
+
if not next:
break
|