summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_server.py48
1 files changed, 44 insertions, 4 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 884b5d60b4..b8232e5257 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -110,6 +110,7 @@ class FederationServer(FederationBase):
 
         self.handler = hs.get_federation_handler()
         self.storage = hs.get_storage()
+        self._spam_checker = hs.get_spam_checker()
         self._federation_event_handler = hs.get_federation_event_handler()
         self.state = hs.get_state_handler()
         self._event_auth_handler = hs.get_event_auth_handler()
@@ -1019,6 +1020,12 @@ class FederationServer(FederationBase):
         except SynapseError as e:
             raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)
 
+        if await self._spam_checker.should_drop_federated_event(pdu):
+            logger.warning(
+                "Unstaged federated event contains spam, dropping %s", pdu.event_id
+            )
+            return
+
         # Add the event to our staging area
         await self.store.insert_received_event_to_staging(origin, pdu)
 
@@ -1032,6 +1039,41 @@ class FederationServer(FederationBase):
                 pdu.room_id, room_version, lock, origin, pdu
             )
 
+    async def _get_next_nonspam_staged_event_for_room(
+        self, room_id: str, room_version: RoomVersion
+    ) -> Optional[Tuple[str, EventBase]]:
+        """Fetch the first non-spam event from staging queue.
+
+        Args:
+            room_id: the room to fetch the first non-spam event in.
+            room_version: the version of the room.
+
+        Returns:
+            The first non-spam event in that room.
+        """
+
+        while True:
+            # We need to do this check outside the lock to avoid a race between
+            # a new event being inserted by another instance and it attempting
+            # to acquire the lock.
+            next = await self.store.get_next_staged_event_for_room(
+                room_id, room_version
+            )
+
+            if next is None:
+                return None
+
+            origin, event = next
+
+            if await self._spam_checker.should_drop_federated_event(event):
+                logger.warning(
+                    "Staged federated event contains spam, dropping %s",
+                    event.event_id,
+                )
+                continue
+
+            return next
+
     @wrap_as_background_process("_process_incoming_pdus_in_room_inner")
     async def _process_incoming_pdus_in_room_inner(
         self,
@@ -1109,12 +1151,10 @@ class FederationServer(FederationBase):
                         (self._clock.time_msec() - received_ts) / 1000
                     )
 
-            # We need to do this check outside the lock to avoid a race between
-            # a new event being inserted by another instance and it attempting
-            # to acquire the lock.
-            next = await self.store.get_next_staged_event_for_room(
+            next = await self._get_next_nonspam_staged_event_for_room(
                 room_id, room_version
             )
+
             if not next:
                 break