diff --git a/synapse/rest/client/capabilities.py b/synapse/rest/client/capabilities.py
index 8f3193fb47..7220b75006 100644
--- a/synapse/rest/client/capabilities.py
+++ b/synapse/rest/client/capabilities.py
@@ -74,6 +74,9 @@ class CapabilitiesRestServlet(RestServlet):
"m.get_login_token": {
"enabled": self.config.auth.login_via_existing_enabled,
},
+ "gay.rory.bulk_send_events": {
+ "enabled": True
+ }
}
}
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index f61152c35b..19ba13dd64 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -23,10 +23,12 @@
import logging
import re
+import ijson
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Tuple
from urllib import parse as urlparse
+from twisted.internet import defer
from prometheus_client.core import Histogram
@@ -44,6 +46,7 @@ from synapse.api.errors import (
UnredactedContentDeletedError,
)
from synapse.api.filtering import Filter
+from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig, format_event_for_client_v2
from synapse.http.server import HttpServer
from synapse.http.servlet import (
@@ -469,7 +472,6 @@ class RoomSendEventRestServlet(TransactionRestServlet):
txn_id,
)
-
def _parse_request_delay(
request: SynapseRequest,
max_delay: Optional[int],
@@ -1610,6 +1612,118 @@ class RoomSummaryRestServlet(ResolveRoomIdMixin, RestServlet):
remote_room_hosts,
)
+class RoomBulkSendEventRestServlet(ResolveRoomIdMixin, RestServlet):
+ """
+ Bulk send events to a room.
+
+ This endpoint allows sending multiple events to a room in a single request,
+ avoiding event linearisation issues.
+ """
+
+ PATTERNS = (
+ re.compile(
+ "^/_matrix/client/unstable/gay.rory.bulk_send_events"
+ "/rooms/(?P<room_identifier>[^/]*)/bulk_send_events$"
+ ),
+ )
+ CATEGORY = "Event sending requests"
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+ self._auth = hs.get_auth()
+ self._event_creation_handler = hs.get_event_creation_handler()
+ self._message_handler = hs.get_message_handler()
+
+ async def on_POST(
+ self, request: SynapseRequest, room_identifier: str
+ ) -> Tuple[int, JsonDict]:
+ logger.warning("bulk_send_events: Got bulk send events request")
+ requester = await self._auth.get_user_by_req(request, allow_guest=False)
+ room_id, remote_room_hosts = await self.resolve_room_id(room_identifier)
+
+ force_sync_interval = parse_integer(request, "force_sync_interval", default=250)
+
+ current_state_events = await self._message_handler.get_state_events(
+ room_id=room_id,
+ requester=requester,
+ )
+
+ state_map = {(event["type"], event.get("state_key", "")): event.get("event_id") for event in current_state_events}
+
+ events = ijson.items(
+ request.content,
+ "item"
+ )
+
+ i = 0
+ unpersisted_events = []
+
+ for event_data in events:
+ current_index = i
+ i += 1
+ logger.info("bulk_send_events: Processing event %d: %s", current_index, event_data)
+
+ event_dict: JsonDict = {
+ "type": event_data.get("type"),
+ "content": event_data.get("content", {}),
+ "room_id": room_id,
+ "sender": requester.user.to_string(),
+ }
+
+ if "state_key" in event_data:
+ event_dict["state_key"] = event_data["state_key"]
+
+ # Explicitly handle rate limits in order to avoid compounding effects
+ awaiting_ratelimit = False
+ ratelimit_hit = False
+ while awaiting_ratelimit:
+ can_do_action, ratelimit_expiry = await self._event_creation_handler.request_ratelimiter.can_do_action(requester, update=False)
+ if not can_do_action:
+ # can_do_action returns an absolute timestamp, convert it to a relative time
+ time_to_sleep = ratelimit_expiry - self._event_creation_handler.request_ratelimiter.clock.time()
+ logger.warning("bulk_send_events: Got rate limited in bulk sending events, waiting %ds", time_to_sleep)
+ await self._event_creation_handler.request_ratelimiter.clock.sleep(time_to_sleep)
+ ratelimit_hit = True
+ else:
+ awaiting_ratelimit = False
+ await self._event_creation_handler.request_ratelimiter.can_do_action(requester, update=True)
+
+ event, unpersisted_context = await self._event_creation_handler.create_event(
+ requester,
+ event_dict,
+ for_batch=True,
+ state_map=state_map,
+ )
+ context = await unpersisted_context.persist(event)
+
+ if event.is_state():
+ prev_event = await self._event_creation_handler.deduplicate_state_event(event, context)
+ if prev_event is not None:
+ logger.info(
+ "Not bothering to persist state event %s duplicated by %s",
+ event.event_id,
+ prev_event.event_id,
+ )
+ continue
+ else:
+ state_map[(event_dict["type"], event_dict["state_key"])] = event.event_id
+ logger.warning("bulk_send_events: Updated state_map!")
+
+ unpersisted_events.append((event, context))
+ logger.warning("bulk_send_events: Persisted event %d: %s", current_index, event)
+
+ if ratelimit_hit or len(unpersisted_events) >= force_sync_interval:
+ logger.warning("bulk_send_events: Hit rate limit or max batch size, sending %d events", len(unpersisted_events))
+ await self._event_creation_handler.handle_new_client_event(requester, unpersisted_events, ratelimit=False)
+ unpersisted_events = []
+
+ # Finalize any remaining unpersisted events
+ if(len(unpersisted_events) > 0):
+ await self._event_creation_handler.handle_new_client_event(requester, unpersisted_events, ratelimit=False)
+ unpersisted_events = []
+
+ return 200, {}
+
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
RoomStateEventRestServlet(hs).register(http_server)
@@ -1619,6 +1733,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
JoinRoomAliasServlet(hs).register(http_server)
RoomMembershipRestServlet(hs).register(http_server)
RoomSendEventRestServlet(hs).register(http_server)
+ RoomBulkSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
RoomRedactEventRestServlet(hs).register(http_server)
|