summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-24 15:06:35 +0000
committerErik Johnston <erik@matrix.org>2020-03-24 15:53:52 +0000
commit4dd08f25012a517a1c294efbf95b72b82d68e230 (patch)
tree8b85815db8141c2106f1d8a6bf80dfd8a67486da /synapse
parentAdd redis support (diff)
downloadsynapse-4dd08f25012a517a1c294efbf95b72b82d68e230.tar.xz
Make ReplicationStreamer work on workers
Diffstat (limited to 'synapse')
-rw-r--r--synapse/replication/tcp/resource.py34
1 files changed, 14 insertions, 20 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 28edbbdac9..b2a1785c08 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -17,20 +17,18 @@
 
 import logging
 import random
-
-from six import itervalues
+from typing import List
 
 from prometheus_client import Counter
 
 from twisted.internet.protocol import Factory
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
+from synapse.replication.tcp.streams import STREAMS_MAP, Stream
+from synapse.replication.tcp.streams.federation import FederationStream
 from synapse.util.metrics import Measure
 
-from .protocol import ServerReplicationStreamProtocol
-from .streams import STREAMS_MAP
-from .streams.federation import FederationStream
-
 stream_updates_counter = Counter(
     "synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]
 )
@@ -67,27 +65,23 @@ class ReplicationStreamer(object):
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
-        self._server_notices_sender = hs.get_server_notices_sender()
 
         self._replication_torture_level = hs.config.replication_torture_level
 
-        # List of streams that clients can subscribe to.
-        # We only support federation stream if federation sending hase been
-        # disabled on the master.
-        self.streams = [
-            stream(hs)
-            for stream in itervalues(STREAMS_MAP)
-            if stream != FederationStream or not hs.config.send_federation
-        ]
+        # Work out list of streams that this instance is the source of.
+        self.streams = []  # type: List[Stream]
+        if hs.config.worker_app is None:
+            for stream in STREAMS_MAP.values():
+                if stream == FederationStream and hs.config.send_federation:
+                    # We only support federation stream if federation sending
+                    # hase been disabled on the master.
+                    continue
 
-        self.streams_by_name = {stream.NAME: stream for stream in self.streams}
+                self.streams.append(stream(hs))
 
-        self.federation_sender = None
-        if not hs.config.send_federation:
-            self.federation_sender = hs.get_federation_sender()
+        self.streams_by_name = {stream.NAME: stream for stream in self.streams}
 
         self.notifier.add_replication_callback(self.on_notifier_poke)