summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-10-27 12:52:40 +0100
committerGitHub <noreply@github.com>2023-10-27 12:52:40 +0100
commit928e9648578d24dc9f5ed3476d629fca9a64b22a (patch)
tree7c5cb9bccf17cec1106a8c4fb3e5112fc3cd02ff
parentReduce replication traffic due to reflected cache stream POSITION (#16557) (diff)
downloadsynapse-928e9648578d24dc9f5ed3476d629fca9a64b22a.tar.xz
Fix cross-worker ratelimiting (#16558)
c.f. #16481
-rw-r--r--changelog.d/16558.bugfix1
-rw-r--r--synapse/handlers/message.py73
2 files changed, 58 insertions, 16 deletions
diff --git a/changelog.d/16558.bugfix b/changelog.d/16558.bugfix
new file mode 100644
index 0000000000..64f419fd82
--- /dev/null
+++ b/changelog.d/16558.bugfix
@@ -0,0 +1 @@
+Fix ratelimiting of message sending when using workers, where the ratelimit would only be applied after most of the work has been done.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a0b4a93ae8..811a41f161 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -999,7 +999,26 @@ class EventCreationHandler:
             raise ShadowBanError()
 
         if ratelimit:
-            await self.request_ratelimiter.ratelimit(requester, update=False)
+            room_id = event_dict["room_id"]
+            try:
+                room_version = await self.store.get_room_version(room_id)
+            except NotFoundError:
+                # The room doesn't exist.
+                raise AuthError(403, f"User {requester.user} not in room {room_id}")
+
+            if room_version.updated_redaction_rules:
+                redacts = event_dict["content"].get("redacts")
+            else:
+                redacts = event_dict.get("redacts")
+
+            is_admin_redaction = await self.is_admin_redaction(
+                event_type=event_dict["type"],
+                sender=event_dict["sender"],
+                redacts=redacts,
+            )
+            await self.request_ratelimiter.ratelimit(
+                requester, is_admin_redaction=is_admin_redaction, update=False
+            )
 
         # We limit the number of concurrent event sends in a room so that we
         # don't fork the DAG too much. If we don't limit then we can end up in
@@ -1508,6 +1527,18 @@ class EventCreationHandler:
                 first_event.room_id
             )
             if writer_instance != self._instance_name:
+                # Ratelimit before sending to the other event persister, to
+                # ensure that we correctly have ratelimits on both the event
+                # creators and event persisters.
+                if ratelimit:
+                    for event, _ in events_and_context:
+                        is_admin_redaction = await self.is_admin_redaction(
+                            event.type, event.sender, event.redacts
+                        )
+                        await self.request_ratelimiter.ratelimit(
+                            requester, is_admin_redaction=is_admin_redaction
+                        )
+
                 try:
                     result = await self.send_events(
                         instance_name=writer_instance,
@@ -1538,6 +1569,7 @@ class EventCreationHandler:
                     # stream_ordering entry manually (as it was persisted on
                     # another worker).
                     event.internal_metadata.stream_ordering = stream_id
+
                 return event
 
             event = await self.persist_and_notify_client_events(
@@ -1696,21 +1728,9 @@ class EventCreationHandler:
                 # can apply different ratelimiting. We do this by simply checking
                 # it's not a self-redaction (to avoid having to look up whether the
                 # user is actually admin or not).
-                is_admin_redaction = False
-                if event.type == EventTypes.Redaction:
-                    assert event.redacts is not None
-
-                    original_event = await self.store.get_event(
-                        event.redacts,
-                        redact_behaviour=EventRedactBehaviour.as_is,
-                        get_prev_content=False,
-                        allow_rejected=False,
-                        allow_none=True,
-                    )
-
-                    is_admin_redaction = bool(
-                        original_event and event.sender != original_event.sender
-                    )
+                is_admin_redaction = await self.is_admin_redaction(
+                    event.type, event.sender, event.redacts
+                )
 
                 await self.request_ratelimiter.ratelimit(
                     requester, is_admin_redaction=is_admin_redaction
@@ -1930,6 +1950,27 @@ class EventCreationHandler:
 
         return persisted_events[-1]
 
+    async def is_admin_redaction(
+        self, event_type: str, sender: str, redacts: Optional[str]
+    ) -> bool:
+        """Return whether the event is a redaction made by an admin, and thus
+        should use a different ratelimiter.
+        """
+        if event_type != EventTypes.Redaction:
+            return False
+
+        assert redacts is not None
+
+        original_event = await self.store.get_event(
+            redacts,
+            redact_behaviour=EventRedactBehaviour.as_is,
+            get_prev_content=False,
+            allow_rejected=False,
+            allow_none=True,
+        )
+
+        return bool(original_event and sender != original_event.sender)
+
     async def _maybe_kick_guest_users(
         self, event: EventBase, context: EventContext
     ) -> None: