diff --git a/changelog.d/18019.feature b/changelog.d/18019.feature
new file mode 100644
index 0000000000..74e22df74a
--- /dev/null
+++ b/changelog.d/18019.feature
@@ -0,0 +1 @@
+Define ratelimit configuration for delayed event management.
diff --git a/demo/start.sh b/demo/start.sh
index 7636c41f1f..e010302bf4 100755
--- a/demo/start.sh
+++ b/demo/start.sh
@@ -142,6 +142,9 @@ for port in 8080 8081 8082; do
per_user:
per_second: 1000
burst_count: 1000
+ rc_delayed_event_mgmt:
+ per_second: 1000
+ burst_count: 1000
RC
)
echo "${ratelimiting}" >> "$port.config"
diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2
index ac0c4bb851..9ab8fedcae 100644
--- a/docker/complement/conf/workers-shared-extra.yaml.j2
+++ b/docker/complement/conf/workers-shared-extra.yaml.j2
@@ -94,6 +94,10 @@ rc_presence:
per_second: 9999
burst_count: 9999
+rc_delayed_event_mgmt:
+ per_second: 9999
+ burst_count: 9999
+
federation_rr_transactions_per_room_per_second: 9999
allow_device_name_lookup_over_federation: true
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 8d9a71fb5f..8523c5f65f 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -1947,6 +1947,29 @@ rc_presence:
burst_count: 1
```
---
+### `rc_delayed_event_mgmt`
+
+Ratelimiting settings for delayed event management.
+
+This is a ratelimiting option that ratelimits
+attempts to restart, cancel, or view delayed events
+based on the sending client's account and device ID.
+It defaults to: `per_second: 1`, `burst_count: 5`.
+
+Attempts to create or send delayed events are ratelimited not by this setting, but by `rc_message`.
+
+Setting this to a high value allows clients to make delayed event management requests often
+(such as repeatedly restarting a delayed event with a short timeout,
+or restarting several different delayed events all at once)
+without the risk of being ratelimited.
+
+Example configuration:
+```yaml
+rc_delayed_event_mgmt:
+ per_second: 2
+ burst_count: 20
+```
+---
### `federation_rr_transactions_per_room_per_second`
Sets outgoing federation transaction frequency for sending read-receipts,
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 06af4da3c5..eb1dc2dacb 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -234,3 +234,9 @@ class RatelimitConfig(Config):
"rc_presence.per_user",
defaults={"per_second": 0.1, "burst_count": 1},
)
+
+ self.rc_delayed_event_mgmt = RatelimitSettings.parse(
+ config,
+ "rc_delayed_event_mgmt",
+ defaults={"per_second": 1, "burst_count": 5},
+ )
diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py
index 3c88a96fd3..b3f40809a1 100644
--- a/synapse/handlers/delayed_events.py
+++ b/synapse/handlers/delayed_events.py
@@ -19,6 +19,7 @@ from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
+from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
from synapse.metrics import event_processing_positions
@@ -57,10 +58,19 @@ class DelayedEventsHandler:
self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config
self._clock = hs.get_clock()
- self._request_ratelimiter = hs.get_request_ratelimiter()
self._event_creation_handler = hs.get_event_creation_handler()
self._room_member_handler = hs.get_room_member_handler()
+ self._request_ratelimiter = hs.get_request_ratelimiter()
+
+ # Ratelimiter for management of existing delayed events,
+ # keyed by the sending user ID & device ID.
+ self._delayed_event_mgmt_ratelimiter = Ratelimiter(
+ store=self._store,
+ clock=self._clock,
+ cfg=self._config.ratelimiting.rc_delayed_event_mgmt,
+ )
+
self._next_delayed_event_call: Optional[IDelayedCall] = None
# The current position in the current_state_delta stream
@@ -227,6 +237,9 @@ class DelayedEventsHandler:
Raises:
SynapseError: if the delayed event fails validation checks.
"""
+ # Use standard request limiter for scheduling new delayed events.
+ # TODO: Instead apply ratelimiting based on the scheduled send time.
+ # See https://github.com/element-hq/synapse/issues/18021
await self._request_ratelimiter.ratelimit(requester)
self._event_creation_handler.validator.validate_builder(
@@ -285,7 +298,10 @@ class DelayedEventsHandler:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
- await self._request_ratelimiter.ratelimit(requester)
+ await self._delayed_event_mgmt_ratelimiter.ratelimit(
+ requester,
+ (requester.user.to_string(), requester.device_id),
+ )
await self._initialized_from_db
next_send_ts = await self._store.cancel_delayed_event(
@@ -308,7 +324,10 @@ class DelayedEventsHandler:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
- await self._request_ratelimiter.ratelimit(requester)
+ await self._delayed_event_mgmt_ratelimiter.ratelimit(
+ requester,
+ (requester.user.to_string(), requester.device_id),
+ )
await self._initialized_from_db
next_send_ts = await self._store.restart_delayed_event(
@@ -332,6 +351,8 @@ class DelayedEventsHandler:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
+ # Use standard request limiter for sending delayed events on-demand,
+ # as an on-demand send is similar to sending a regular event.
await self._request_ratelimiter.ratelimit(requester)
await self._initialized_from_db
@@ -415,7 +436,10 @@ class DelayedEventsHandler:
async def get_all_for_user(self, requester: Requester) -> List[JsonDict]:
"""Return all pending delayed events requested by the given user."""
- await self._request_ratelimiter.ratelimit(requester)
+ await self._delayed_event_mgmt_ratelimiter.ratelimit(
+ requester,
+ (requester.user.to_string(), requester.device_id),
+ )
return await self._store.get_all_delayed_events_for_user(
requester.user.localpart
)
diff --git a/tests/rest/client/test_delayed_events.py b/tests/rest/client/test_delayed_events.py
index 1793b38c4a..2c938390c8 100644
--- a/tests/rest/client/test_delayed_events.py
+++ b/tests/rest/client/test_delayed_events.py
@@ -109,6 +109,27 @@ class DelayedEventsTestCase(HomeserverTestCase):
)
self.assertEqual(setter_expected, content.get(setter_key), content)
+ @unittest.override_config(
+ {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
+ )
+ def test_get_delayed_events_ratelimit(self) -> None:
+ args = ("GET", PATH_PREFIX)
+
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+ # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+ self.get_success(
+ self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+ )
+
+ # Test that the request isn't ratelimited anymore.
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
def test_update_delayed_event_without_id(self) -> None:
channel = self.make_request(
"POST",
@@ -206,6 +227,46 @@ class DelayedEventsTestCase(HomeserverTestCase):
expect_code=HTTPStatus.NOT_FOUND,
)
+ @unittest.override_config(
+ {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
+ )
+ def test_cancel_delayed_event_ratelimit(self) -> None:
+ delay_ids = []
+ for _ in range(2):
+ channel = self.make_request(
+ "POST",
+ _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
+ {},
+ )
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+ delay_id = channel.json_body.get("delay_id")
+ self.assertIsNotNone(delay_id)
+ delay_ids.append(delay_id)
+
+ channel = self.make_request(
+ "POST",
+ f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+ {"action": "cancel"},
+ )
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
+ args = (
+ "POST",
+ f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+ {"action": "cancel"},
+ )
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+ # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+ self.get_success(
+ self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+ )
+
+ # Test that the request isn't ratelimited anymore.
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
def test_send_delayed_state_event(self) -> None:
state_key = "to_send_on_request"
@@ -250,6 +311,44 @@ class DelayedEventsTestCase(HomeserverTestCase):
)
self.assertEqual(setter_expected, content.get(setter_key), content)
+ @unittest.override_config({"rc_message": {"per_second": 3.5, "burst_count": 4}})
+ def test_send_delayed_event_ratelimit(self) -> None:
+ delay_ids = []
+ for _ in range(2):
+ channel = self.make_request(
+ "POST",
+ _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
+ {},
+ )
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+ delay_id = channel.json_body.get("delay_id")
+ self.assertIsNotNone(delay_id)
+ delay_ids.append(delay_id)
+
+ channel = self.make_request(
+ "POST",
+ f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+ {"action": "send"},
+ )
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
+ args = (
+ "POST",
+ f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+ {"action": "send"},
+ )
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+ # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+ self.get_success(
+ self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+ )
+
+ # Test that the request isn't ratelimited anymore.
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
def test_restart_delayed_state_event(self) -> None:
state_key = "to_send_on_restarted_timeout"
@@ -309,6 +408,46 @@ class DelayedEventsTestCase(HomeserverTestCase):
)
self.assertEqual(setter_expected, content.get(setter_key), content)
+ @unittest.override_config(
+ {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
+ )
+ def test_restart_delayed_event_ratelimit(self) -> None:
+ delay_ids = []
+ for _ in range(2):
+ channel = self.make_request(
+ "POST",
+ _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
+ {},
+ )
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+ delay_id = channel.json_body.get("delay_id")
+ self.assertIsNotNone(delay_id)
+ delay_ids.append(delay_id)
+
+ channel = self.make_request(
+ "POST",
+ f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+ {"action": "restart"},
+ )
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
+ args = (
+ "POST",
+ f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+ {"action": "restart"},
+ )
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+ # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+ self.get_success(
+ self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+ )
+
+ # Test that the request isn't ratelimited anymore.
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
def test_delayed_state_events_are_cancelled_by_more_recent_state(self) -> None:
state_key = "to_be_cancelled"
@@ -374,3 +513,7 @@ def _get_path_for_delayed_state(
room_id: str, event_type: str, state_key: str, delay_ms: int
) -> str:
return f"rooms/{room_id}/state/{event_type}/{state_key}?org.matrix.msc4140.delay={delay_ms}"
+
+
+def _get_path_for_delayed_send(room_id: str, event_type: str, delay_ms: int) -> str:
+ return f"rooms/{room_id}/send/{event_type}?org.matrix.msc4140.delay={delay_ms}"
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index a7108b905a..dd8350ddd1 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -2399,6 +2399,41 @@ class RoomDelayedEventTestCase(RoomBase):
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+ @unittest.override_config(
+ {
+ "max_event_delay_duration": "24h",
+ "rc_message": {"per_second": 1, "burst_count": 2},
+ }
+ )
+ def test_add_delayed_event_ratelimit(self) -> None:
+ """Test that requests to schedule new delayed events are ratelimited by a RateLimiter,
+ which ratelimits them correctly, including by not limiting when the requester is
+ exempt from ratelimiting.
+ """
+
+ # Test that new delayed events are correctly ratelimited.
+ args = (
+ "POST",
+ (
+ "rooms/%s/send/m.room.message?org.matrix.msc4140.delay=2000"
+ % self.room_id
+ ).encode("ascii"),
+ {"body": "test", "msgtype": "m.text"},
+ )
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+ # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+ self.get_success(
+ self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+ )
+
+ # Test that the new delayed events aren't ratelimited anymore.
+ channel = self.make_request(*args)
+ self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
class RoomSearchTestCase(unittest.HomeserverTestCase):
servlets = [
|