diff --git a/changelog.d/16558.bugfix b/changelog.d/16558.bugfix
new file mode 100644
index 0000000000..64f419fd82
--- /dev/null
+++ b/changelog.d/16558.bugfix
@@ -0,0 +1 @@
+Fix ratelimiting of message sending when using workers, where the ratelimit would only be applied after most of the work has been done.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a0b4a93ae8..811a41f161 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -999,7 +999,26 @@ class EventCreationHandler:
raise ShadowBanError()
if ratelimit:
- await self.request_ratelimiter.ratelimit(requester, update=False)
+ room_id = event_dict["room_id"]
+ try:
+ room_version = await self.store.get_room_version(room_id)
+ except NotFoundError:
+ # The room doesn't exist.
+ raise AuthError(403, f"User {requester.user} not in room {room_id}")
+
+ if room_version.updated_redaction_rules:
+ redacts = event_dict["content"].get("redacts")
+ else:
+ redacts = event_dict.get("redacts")
+
+ is_admin_redaction = await self.is_admin_redaction(
+ event_type=event_dict["type"],
+ sender=event_dict["sender"],
+ redacts=redacts,
+ )
+ await self.request_ratelimiter.ratelimit(
+ requester, is_admin_redaction=is_admin_redaction, update=False
+ )
# We limit the number of concurrent event sends in a room so that we
# don't fork the DAG too much. If we don't limit then we can end up in
@@ -1508,6 +1527,18 @@ class EventCreationHandler:
first_event.room_id
)
if writer_instance != self._instance_name:
+ # Ratelimit before sending to the other event persister, to
+ # ensure that we correctly have ratelimits on both the event
+ # creators and event persisters.
+ if ratelimit:
+ for event, _ in events_and_context:
+ is_admin_redaction = await self.is_admin_redaction(
+ event.type, event.sender, event.redacts
+ )
+ await self.request_ratelimiter.ratelimit(
+ requester, is_admin_redaction=is_admin_redaction
+ )
+
try:
result = await self.send_events(
instance_name=writer_instance,
@@ -1538,6 +1569,7 @@ class EventCreationHandler:
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
+
return event
event = await self.persist_and_notify_client_events(
@@ -1696,21 +1728,9 @@ class EventCreationHandler:
# can apply different ratelimiting. We do this by simply checking
# it's not a self-redaction (to avoid having to look up whether the
# user is actually admin or not).
- is_admin_redaction = False
- if event.type == EventTypes.Redaction:
- assert event.redacts is not None
-
- original_event = await self.store.get_event(
- event.redacts,
- redact_behaviour=EventRedactBehaviour.as_is,
- get_prev_content=False,
- allow_rejected=False,
- allow_none=True,
- )
-
- is_admin_redaction = bool(
- original_event and event.sender != original_event.sender
- )
+ is_admin_redaction = await self.is_admin_redaction(
+ event.type, event.sender, event.redacts
+ )
await self.request_ratelimiter.ratelimit(
requester, is_admin_redaction=is_admin_redaction
@@ -1930,6 +1950,27 @@ class EventCreationHandler:
return persisted_events[-1]
+ async def is_admin_redaction(
+ self, event_type: str, sender: str, redacts: Optional[str]
+ ) -> bool:
+ """Return whether the event is a redaction made by an admin, and thus
+ should use a different ratelimiter.
+ """
+ if event_type != EventTypes.Redaction:
+ return False
+
+ assert redacts is not None
+
+ original_event = await self.store.get_event(
+ redacts,
+ redact_behaviour=EventRedactBehaviour.as_is,
+ get_prev_content=False,
+ allow_rejected=False,
+ allow_none=True,
+ )
+
+ return bool(original_event and sender != original_event.sender)
+
async def _maybe_kick_guest_users(
self, event: EventBase, context: EventContext
) -> None:
|