diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4e9752ccbd..1c3ac03f20 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -28,6 +28,7 @@ from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import unfreeze
from synapse.visibility import filter_events_for_client
+from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler
@@ -312,6 +313,9 @@ class EventCreationHandler(object):
self.server_name = hs.hostname
self.ratelimiter = hs.get_ratelimiter()
self.notifier = hs.get_notifier()
+ self.config = hs.config
+
+ self.http_client = hs.get_simple_http_client()
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -419,12 +423,6 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
)
- if event.type == EventTypes.Message:
- presence = self.hs.get_presence_handler()
- # We don't want to block sending messages on any presence code. This
- # matters as sometimes presence code can take a while.
- preserve_fn(presence.bump_presence_active_time)(user)
-
@defer.inlineCallbacks
def deduplicate_state_event(self, event, context):
"""
@@ -559,6 +557,18 @@ class EventCreationHandler(object):
):
# We now need to go and hit out to wherever we need to hit out to.
+ # If we're a worker we need to hit out to the master.
+ if self.config.worker_app:
+ yield send_event_to_master(
+ self.http_client,
+ host=self.config.worker_replication_host,
+ port=self.config.worker_replication_http_port,
+ requester=requester,
+ event=event,
+ context=context,
+ )
+ return
+
if ratelimit:
yield self.base_handler.ratelimit(requester)
@@ -692,3 +702,9 @@ class EventCreationHandler(object):
)
preserve_fn(_notify)()
+
+ if event.type == EventTypes.Message:
+ presence = self.hs.get_presence_handler()
+ # We don't want to block sending messages on any presence code. This
+ # matters as sometimes presence code can take a while.
+ preserve_fn(presence.bump_presence_active_time)(requester.user)
|