1 files changed, 14 insertions, 20 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 28edbbdac9..b2a1785c08 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -17,20 +17,18 @@
import logging
import random
-
-from six import itervalues
+from typing import List
from prometheus_client import Counter
from twisted.internet.protocol import Factory
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
+from synapse.replication.tcp.streams import STREAMS_MAP, Stream
+from synapse.replication.tcp.streams.federation import FederationStream
from synapse.util.metrics import Measure
-from .protocol import ServerReplicationStreamProtocol
-from .streams import STREAMS_MAP
-from .streams.federation import FederationStream
-
stream_updates_counter = Counter(
"synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]
)
@@ -67,27 +65,23 @@ class ReplicationStreamer(object):
def __init__(self, hs):
self.store = hs.get_datastore()
- self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
- self._server_notices_sender = hs.get_server_notices_sender()
self._replication_torture_level = hs.config.replication_torture_level
- # List of streams that clients can subscribe to.
- # We only support federation stream if federation sending hase been
- # disabled on the master.
- self.streams = [
- stream(hs)
- for stream in itervalues(STREAMS_MAP)
- if stream != FederationStream or not hs.config.send_federation
- ]
+ # Work out list of streams that this instance is the source of.
+ self.streams = [] # type: List[Stream]
+ if hs.config.worker_app is None:
+ for stream in STREAMS_MAP.values():
+ if stream == FederationStream and hs.config.send_federation:
+ # We only support federation stream if federation sending
+ # hase been disabled on the master.
+ continue
- self.streams_by_name = {stream.NAME: stream for stream in self.streams}
+ self.streams.append(stream(hs))
- self.federation_sender = None
- if not hs.config.send_federation:
- self.federation_sender = hs.get_federation_sender()
+ self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.notifier.add_replication_callback(self.on_notifier_poke)
|