From 452f38800dd00b8686543099d6a085f9b4210687 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 26 Jul 2025 09:50:56 +0200 Subject: [PATCH 14/14] Add bulk send events endpoint --- synapse/rest/client/capabilities.py | 3 + synapse/rest/client/room.py | 117 +++++++++++++++++++++++++++- 2 files changed, 119 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/capabilities.py b/synapse/rest/client/capabilities.py index 8f3193fb47..7220b75006 100644 --- a/synapse/rest/client/capabilities.py +++ b/synapse/rest/client/capabilities.py @@ -74,6 +74,9 @@ class CapabilitiesRestServlet(RestServlet): "m.get_login_token": { "enabled": self.config.auth.login_via_existing_enabled, }, + "gay.rory.bulk_send_events": { + "enabled": True + } } } diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index f61152c35b..19ba13dd64 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -23,10 +23,12 @@ import logging import re +import ijson from enum import Enum from http import HTTPStatus from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Tuple from urllib import parse as urlparse +from twisted.internet import defer from prometheus_client.core import Histogram @@ -44,6 +46,7 @@ from synapse.api.errors import ( UnredactedContentDeletedError, ) from synapse.api.filtering import Filter +from synapse.events import EventBase from synapse.events.utils import SerializeEventConfig, format_event_for_client_v2 from synapse.http.server import HttpServer from synapse.http.servlet import ( @@ -469,7 +472,6 @@ class RoomSendEventRestServlet(TransactionRestServlet): txn_id, ) - def _parse_request_delay( request: SynapseRequest, max_delay: Optional[int], @@ -1610,6 +1612,118 @@ class RoomSummaryRestServlet(ResolveRoomIdMixin, RestServlet): remote_room_hosts, ) +class RoomBulkSendEventRestServlet(ResolveRoomIdMixin, RestServlet): + """ + Bulk send events to a room. + + This endpoint allows sending multiple events to a room in a single request, + avoiding event linearisation issues. + """ + + PATTERNS = ( + re.compile( + "^/_matrix/client/unstable/gay.rory.bulk_send_events" + "/rooms/(?P[^/]*)/bulk_send_events$" + ), + ) + CATEGORY = "Event sending requests" + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + self._auth = hs.get_auth() + self._event_creation_handler = hs.get_event_creation_handler() + self._message_handler = hs.get_message_handler() + + async def on_POST( + self, request: SynapseRequest, room_identifier: str + ) -> Tuple[int, JsonDict]: + logger.warning("bulk_send_events: Got bulk send events request") + requester = await self._auth.get_user_by_req(request, allow_guest=False) + room_id, remote_room_hosts = await self.resolve_room_id(room_identifier) + + force_sync_interval = parse_integer(request, "force_sync_interval", default=250) + + current_state_events = await self._message_handler.get_state_events( + room_id=room_id, + requester=requester, + ) + + state_map = {(event["type"], event.get("state_key", "")): event.get("event_id") for event in current_state_events} + + events = ijson.items( + request.content, + "item" + ) + + i = 0 + unpersisted_events = [] + + for event_data in events: + current_index = i + i += 1 + logger.info("bulk_send_events: Processing event %d: %s", current_index, event_data) + + event_dict: JsonDict = { + "type": event_data.get("type"), + "content": event_data.get("content", {}), + "room_id": room_id, + "sender": requester.user.to_string(), + } + + if "state_key" in event_data: + event_dict["state_key"] = event_data["state_key"] + + # Explicitly handle rate limits in order to avoid compounding effects + awaiting_ratelimit = False + ratelimit_hit = False + while awaiting_ratelimit: + can_do_action, ratelimit_expiry = await self._event_creation_handler.request_ratelimiter.can_do_action(requester, update=False) + if not can_do_action: + # can_do_action returns an absolute timestamp, convert it to a relative time + time_to_sleep = ratelimit_expiry - self._event_creation_handler.request_ratelimiter.clock.time() + logger.warning("bulk_send_events: Got rate limited in bulk sending events, waiting %ds", time_to_sleep) + await self._event_creation_handler.request_ratelimiter.clock.sleep(time_to_sleep) + ratelimit_hit = True + else: + awaiting_ratelimit = False + await self._event_creation_handler.request_ratelimiter.can_do_action(requester, update=True) + + event, unpersisted_context = await self._event_creation_handler.create_event( + requester, + event_dict, + for_batch=True, + state_map=state_map, + ) + context = await unpersisted_context.persist(event) + + if event.is_state(): + prev_event = await self._event_creation_handler.deduplicate_state_event(event, context) + if prev_event is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, + prev_event.event_id, + ) + continue + else: + state_map[(event_dict["type"], event_dict["state_key"])] = event.event_id + logger.warning("bulk_send_events: Updated state_map!") + + unpersisted_events.append((event, context)) + logger.warning("bulk_send_events: Persisted event %d: %s", current_index, event) + + if ratelimit_hit or len(unpersisted_events) >= force_sync_interval: + logger.warning("bulk_send_events: Hit rate limit or max batch size, sending %d events", len(unpersisted_events)) + await self._event_creation_handler.handle_new_client_event(requester, unpersisted_events, ratelimit=False) + unpersisted_events = [] + + # Finalize any remaining unpersisted events + if(len(unpersisted_events) > 0): + await self._event_creation_handler.handle_new_client_event(requester, unpersisted_events, ratelimit=False) + unpersisted_events = [] + + return 200, {} + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: RoomStateEventRestServlet(hs).register(http_server) @@ -1619,6 +1733,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: JoinRoomAliasServlet(hs).register(http_server) RoomMembershipRestServlet(hs).register(http_server) RoomSendEventRestServlet(hs).register(http_server) + RoomBulkSendEventRestServlet(hs).register(http_server) PublicRoomListRestServlet(hs).register(http_server) RoomStateRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server) -- 2.49.0