summary refs log tree commit diff
path: root/packages/overlays/matrix-synapse/patches/0014-Add-bulk-send-events-endpoint.patch
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--packages/overlays/matrix-synapse/patches/0014-Add-bulk-send-events-endpoint.patch187
1 files changed, 187 insertions, 0 deletions
diff --git a/packages/overlays/matrix-synapse/patches/0014-Add-bulk-send-events-endpoint.patch b/packages/overlays/matrix-synapse/patches/0014-Add-bulk-send-events-endpoint.patch
new file mode 100644

index 0000000..fdd6030 --- /dev/null +++ b/packages/overlays/matrix-synapse/patches/0014-Add-bulk-send-events-endpoint.patch
@@ -0,0 +1,187 @@ +From 452f38800dd00b8686543099d6a085f9b4210687 Mon Sep 17 00:00:00 2001 +From: Rory& <root@rory.gay> +Date: Sat, 26 Jul 2025 09:50:56 +0200 +Subject: [PATCH 14/14] Add bulk send events endpoint + +--- + synapse/rest/client/capabilities.py | 3 + + synapse/rest/client/room.py | 117 +++++++++++++++++++++++++++- + 2 files changed, 119 insertions(+), 1 deletion(-) + +diff --git a/synapse/rest/client/capabilities.py b/synapse/rest/client/capabilities.py +index 8f3193fb47..7220b75006 100644 +--- a/synapse/rest/client/capabilities.py ++++ b/synapse/rest/client/capabilities.py +@@ -74,6 +74,9 @@ class CapabilitiesRestServlet(RestServlet): + "m.get_login_token": { + "enabled": self.config.auth.login_via_existing_enabled, + }, ++ "gay.rory.bulk_send_events": { ++ "enabled": True ++ } + } + } + +diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py +index f61152c35b..19ba13dd64 100644 +--- a/synapse/rest/client/room.py ++++ b/synapse/rest/client/room.py +@@ -23,10 +23,12 @@ + + import logging + import re ++import ijson + from enum import Enum + from http import HTTPStatus + from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Tuple + from urllib import parse as urlparse ++from twisted.internet import defer + + from prometheus_client.core import Histogram + +@@ -44,6 +46,7 @@ from synapse.api.errors import ( + UnredactedContentDeletedError, + ) + from synapse.api.filtering import Filter ++from synapse.events import EventBase + from synapse.events.utils import SerializeEventConfig, format_event_for_client_v2 + from synapse.http.server import HttpServer + from synapse.http.servlet import ( +@@ -469,7 +472,6 @@ class RoomSendEventRestServlet(TransactionRestServlet): + txn_id, + ) + +- + def _parse_request_delay( + request: SynapseRequest, + max_delay: Optional[int], +@@ -1610,6 +1612,118 @@ class RoomSummaryRestServlet(ResolveRoomIdMixin, RestServlet): + remote_room_hosts, + ) + ++class RoomBulkSendEventRestServlet(ResolveRoomIdMixin, RestServlet): ++ """ ++ Bulk send events to a room. ++ ++ This endpoint allows sending multiple events to a room in a single request, ++ avoiding event linearisation issues. ++ """ ++ ++ PATTERNS = ( ++ re.compile( ++ "^/_matrix/client/unstable/gay.rory.bulk_send_events" ++ "/rooms/(?P<room_identifier>[^/]*)/bulk_send_events$" ++ ), ++ ) ++ CATEGORY = "Event sending requests" ++ ++ def __init__(self, hs: "HomeServer"): ++ super().__init__(hs) ++ self._auth = hs.get_auth() ++ self._event_creation_handler = hs.get_event_creation_handler() ++ self._message_handler = hs.get_message_handler() ++ ++ async def on_POST( ++ self, request: SynapseRequest, room_identifier: str ++ ) -> Tuple[int, JsonDict]: ++ logger.warning("bulk_send_events: Got bulk send events request") ++ requester = await self._auth.get_user_by_req(request, allow_guest=False) ++ room_id, remote_room_hosts = await self.resolve_room_id(room_identifier) ++ ++ force_sync_interval = parse_integer(request, "force_sync_interval", default=250) ++ ++ current_state_events = await self._message_handler.get_state_events( ++ room_id=room_id, ++ requester=requester, ++ ) ++ ++ state_map = {(event["type"], event.get("state_key", "")): event.get("event_id") for event in current_state_events} ++ ++ events = ijson.items( ++ request.content, ++ "item" ++ ) ++ ++ i = 0 ++ unpersisted_events = [] ++ ++ for event_data in events: ++ current_index = i ++ i += 1 ++ logger.info("bulk_send_events: Processing event %d: %s", current_index, event_data) ++ ++ event_dict: JsonDict = { ++ "type": event_data.get("type"), ++ "content": event_data.get("content", {}), ++ "room_id": room_id, ++ "sender": requester.user.to_string(), ++ } ++ ++ if "state_key" in event_data: ++ event_dict["state_key"] = event_data["state_key"] ++ ++ # Explicitly handle rate limits in order to avoid compounding effects ++ awaiting_ratelimit = False ++ ratelimit_hit = False ++ while awaiting_ratelimit: ++ can_do_action, ratelimit_expiry = await self._event_creation_handler.request_ratelimiter.can_do_action(requester, update=False) ++ if not can_do_action: ++ # can_do_action returns an absolute timestamp, convert it to a relative time ++ time_to_sleep = ratelimit_expiry - self._event_creation_handler.request_ratelimiter.clock.time() ++ logger.warning("bulk_send_events: Got rate limited in bulk sending events, waiting %ds", time_to_sleep) ++ await self._event_creation_handler.request_ratelimiter.clock.sleep(time_to_sleep) ++ ratelimit_hit = True ++ else: ++ awaiting_ratelimit = False ++ await self._event_creation_handler.request_ratelimiter.can_do_action(requester, update=True) ++ ++ event, unpersisted_context = await self._event_creation_handler.create_event( ++ requester, ++ event_dict, ++ for_batch=True, ++ state_map=state_map, ++ ) ++ context = await unpersisted_context.persist(event) ++ ++ if event.is_state(): ++ prev_event = await self._event_creation_handler.deduplicate_state_event(event, context) ++ if prev_event is not None: ++ logger.info( ++ "Not bothering to persist state event %s duplicated by %s", ++ event.event_id, ++ prev_event.event_id, ++ ) ++ continue ++ else: ++ state_map[(event_dict["type"], event_dict["state_key"])] = event.event_id ++ logger.warning("bulk_send_events: Updated state_map!") ++ ++ unpersisted_events.append((event, context)) ++ logger.warning("bulk_send_events: Persisted event %d: %s", current_index, event) ++ ++ if ratelimit_hit or len(unpersisted_events) >= force_sync_interval: ++ logger.warning("bulk_send_events: Hit rate limit or max batch size, sending %d events", len(unpersisted_events)) ++ await self._event_creation_handler.handle_new_client_event(requester, unpersisted_events, ratelimit=False) ++ unpersisted_events = [] ++ ++ # Finalize any remaining unpersisted events ++ if(len(unpersisted_events) > 0): ++ await self._event_creation_handler.handle_new_client_event(requester, unpersisted_events, ratelimit=False) ++ unpersisted_events = [] ++ ++ return 200, {} ++ + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + RoomStateEventRestServlet(hs).register(http_server) +@@ -1619,6 +1733,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + JoinRoomAliasServlet(hs).register(http_server) + RoomMembershipRestServlet(hs).register(http_server) + RoomSendEventRestServlet(hs).register(http_server) ++ RoomBulkSendEventRestServlet(hs).register(http_server) + PublicRoomListRestServlet(hs).register(http_server) + RoomStateRestServlet(hs).register(http_server) + RoomRedactEventRestServlet(hs).register(http_server) +-- +2.49.0 +