summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py28
1 files changed, 22 insertions, 6 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4e9752ccbd..1c3ac03f20 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -28,6 +28,7 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
 from synapse.visibility import filter_events_for_client
+from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
@@ -312,6 +313,9 @@ class EventCreationHandler(object):
         self.server_name = hs.hostname
         self.ratelimiter = hs.get_ratelimiter()
         self.notifier = hs.get_notifier()
+        self.config = hs.config
+
+        self.http_client = hs.get_simple_http_client()
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -419,12 +423,6 @@ class EventCreationHandler(object):
             ratelimit=ratelimit,
         )
 
-        if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
-            # We don't want to block sending messages on any presence code. This
-            # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(user)
-
     @defer.inlineCallbacks
     def deduplicate_state_event(self, event, context):
         """
@@ -559,6 +557,18 @@ class EventCreationHandler(object):
     ):
         # We now need to go and hit out to wherever we need to hit out to.
 
+        # If we're a worker we need to hit out to the master.
+        if self.config.worker_app:
+            yield send_event_to_master(
+                self.http_client,
+                host=self.config.worker_replication_host,
+                port=self.config.worker_replication_http_port,
+                requester=requester,
+                event=event,
+                context=context,
+            )
+            return
+
         if ratelimit:
             yield self.base_handler.ratelimit(requester)
 
@@ -692,3 +702,9 @@ class EventCreationHandler(object):
             )
 
         preserve_fn(_notify)()
+
+        if event.type == EventTypes.Message:
+            presence = self.hs.get_presence_handler()
+            # We don't want to block sending messages on any presence code. This
+            # matters as sometimes presence code can take a while.
+            preserve_fn(presence.bump_presence_active_time)(requester.user)