diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ee373ea2ac..76655d5d67 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -56,6 +56,7 @@ from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
@@ -494,6 +495,7 @@ class EventCreationHandler:
self.membership_types_to_include_profile_data_in.add(Membership.INVITE)
self.send_event = ReplicationSendEventRestServlet.make_client(hs)
+ self.send_events = ReplicationSendEventsRestServlet.make_client(hs)
self.request_ratelimiter = hs.get_request_ratelimiter()
@@ -1362,7 +1364,7 @@ class EventCreationHandler:
async def _persist_events_batch(
self,
- requestor: Requester,
+ requester: Requester,
events_and_ctx: List[Tuple[EventBase, EventContext]],
ratelimit: bool = True,
) -> EventBase:
@@ -1378,8 +1380,31 @@ class EventCreationHandler:
event, context
)
try:
+ # If we're a worker we need to hit out to the master.
+ writer_instance = self._events_shard_config.get_instance(event.room_id)
+ if writer_instance != self._instance_name:
+ try:
+ result = await self.send_events(
+ instance_name=writer_instance,
+ store=self.store,
+ requester=requester,
+ events_and_ctx=events_and_ctx,
+ ratelimit=ratelimit,
+ )
+ except SynapseError as e:
+ if e.code == HTTPStatus.CONFLICT:
+ raise PartialStateConflictError()
+ raise
+ stream_id = result["stream_id"]
+
+ # If we newly persisted the event then we need to update its
+ # stream_ordering entry manually (as it was persisted on
+ # another worker).
+ event.internal_metadata.stream_ordering = stream_id
+ return event
+
last_event = await self.persist_and_notify_batched_events(
- requestor, events_and_ctx, ratelimit
+ requester, events_and_ctx, ratelimit
)
except Exception:
# Ensure that we actually remove the entries in the push actions
|