summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7146.misc1
-rw-r--r--synapse/app/generic_worker.py13
-rw-r--r--synapse/replication/tcp/resource.py33
3 files changed, 25 insertions, 22 deletions
diff --git a/changelog.d/7146.misc b/changelog.d/7146.misc
new file mode 100644
index 0000000000..facde06959
--- /dev/null
+++ b/changelog.d/7146.misc
@@ -0,0 +1 @@
+Run replication streamers on workers.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 2a56fe0bd5..d125327f08 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -960,17 +960,22 @@ def start(config_options):
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    ss = GenericWorkerServer(
+    hs = GenericWorkerServer(
         config.server_name,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
     )
 
-    setup_logging(ss, config, use_worker_options=True)
+    setup_logging(hs, config, use_worker_options=True)
+
+    hs.setup()
+
+    # Ensure the replication streamer is always started in case we write to any
+    # streams. Will no-op if no streams can be written to by this worker.
+    hs.get_replication_streamer()
 
-    ss.setup()
     reactor.addSystemEventTrigger(
-        "before", "startup", _base.start, ss, config.worker_listeners
+        "before", "startup", _base.start, hs, config.worker_listeners
     )
 
     _base.start_worker_reactor("synapse-generic-worker", config)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index b2d6baa2a2..33d2f589ac 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -17,9 +17,7 @@
 
 import logging
 import random
-from typing import Dict
-
-from six import itervalues
+from typing import Dict, List
 
 from prometheus_client import Counter
 
@@ -71,29 +69,28 @@ class ReplicationStreamer(object):
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
-        self._server_notices_sender = hs.get_server_notices_sender()
 
         self._replication_torture_level = hs.config.replication_torture_level
 
-        # List of streams that clients can subscribe to.
-        # We only support federation stream if federation sending hase been
-        # disabled on the master.
-        self.streams = [
-            stream(hs)
-            for stream in itervalues(STREAMS_MAP)
-            if stream != FederationStream or not hs.config.send_federation
-        ]
+        # Work out list of streams that this instance is the source of.
+        self.streams = []  # type: List[Stream]
+        if hs.config.worker_app is None:
+            for stream in STREAMS_MAP.values():
+                if stream == FederationStream and hs.config.send_federation:
+                    # We only support federation stream if federation sending
+                    # hase been disabled on the master.
+                    continue
 
-        self.streams_by_name = {stream.NAME: stream for stream in self.streams}
+                self.streams.append(stream(hs))
 
-        self.federation_sender = None
-        if not hs.config.send_federation:
-            self.federation_sender = hs.get_federation_sender()
+        self.streams_by_name = {stream.NAME: stream for stream in self.streams}
 
-        self.notifier.add_replication_callback(self.on_notifier_poke)
+        # Only bother registering the notifier callback if we have streams to
+        # publish.
+        if self.streams:
+            self.notifier.add_replication_callback(self.on_notifier_poke)
 
         # Keeps track of whether we are currently checking for updates
         self.is_looping = False