summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorH. Shay <hillerys@element.io>2022-09-16 13:36:41 -0700
committerH. Shay <hillerys@element.io>2022-09-19 11:49:33 -0700
commit404a6e3d797e2322fc8392e3bc7c0b3c954d7c20 (patch)
tree6e5f5bc047bd0548f015b381ca6d88dbc7265f83 /synapse/handlers
parentadd fucntions to persist events as a batch, encapsulate some logic in a helpe... (diff)
downloadsynapse-github/shay/fix_git_hx_batch.tar.xz
add suppport for persisting batched events over replication github/shay/fix_git_hx_batch shay/fix_git_hx_batch
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/message.py29
1 files changed, 27 insertions, 2 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py

index ee373ea2ac..76655d5d67 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -56,6 +56,7 @@ from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet +from synapse.replication.http.send_events import ReplicationSendEventsRestServlet from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter @@ -494,6 +495,7 @@ class EventCreationHandler: self.membership_types_to_include_profile_data_in.add(Membership.INVITE) self.send_event = ReplicationSendEventRestServlet.make_client(hs) + self.send_events = ReplicationSendEventsRestServlet.make_client(hs) self.request_ratelimiter = hs.get_request_ratelimiter() @@ -1362,7 +1364,7 @@ class EventCreationHandler: async def _persist_events_batch( self, - requestor: Requester, + requester: Requester, events_and_ctx: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, ) -> EventBase: @@ -1378,8 +1380,31 @@ class EventCreationHandler: event, context ) try: + # If we're a worker we need to hit out to the master. + writer_instance = self._events_shard_config.get_instance(event.room_id) + if writer_instance != self._instance_name: + try: + result = await self.send_events( + instance_name=writer_instance, + store=self.store, + requester=requester, + events_and_ctx=events_and_ctx, + ratelimit=ratelimit, + ) + except SynapseError as e: + if e.code == HTTPStatus.CONFLICT: + raise PartialStateConflictError() + raise + stream_id = result["stream_id"] + + # If we newly persisted the event then we need to update its + # stream_ordering entry manually (as it was persisted on + # another worker). + event.internal_metadata.stream_ordering = stream_id + return event + last_event = await self.persist_and_notify_batched_events( - requestor, events_and_ctx, ratelimit + requester, events_and_ctx, ratelimit ) except Exception: # Ensure that we actually remove the entries in the push actions