diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 34 |
1 files changed, 14 insertions, 20 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 28edbbdac9..b2a1785c08 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,20 +17,18 @@ import logging import random - -from six import itervalues +from typing import List from prometheus_client import Counter from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol +from synapse.replication.tcp.streams import STREAMS_MAP, Stream +from synapse.replication.tcp.streams.federation import FederationStream from synapse.util.metrics import Measure -from .protocol import ServerReplicationStreamProtocol -from .streams import STREAMS_MAP -from .streams.federation import FederationStream - stream_updates_counter = Counter( "synapse_replication_tcp_resource_stream_updates", "", ["stream_name"] ) @@ -67,27 +65,23 @@ class ReplicationStreamer(object): def __init__(self, hs): self.store = hs.get_datastore() - self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() self.notifier = hs.get_notifier() - self._server_notices_sender = hs.get_server_notices_sender() self._replication_torture_level = hs.config.replication_torture_level - # List of streams that clients can subscribe to. - # We only support federation stream if federation sending hase been - # disabled on the master. - self.streams = [ - stream(hs) - for stream in itervalues(STREAMS_MAP) - if stream != FederationStream or not hs.config.send_federation - ] + # Work out list of streams that this instance is the source of. + self.streams = [] # type: List[Stream] + if hs.config.worker_app is None: + for stream in STREAMS_MAP.values(): + if stream == FederationStream and hs.config.send_federation: + # We only support federation stream if federation sending + # hase been disabled on the master. + continue - self.streams_by_name = {stream.NAME: stream for stream in self.streams} + self.streams.append(stream(hs)) - self.federation_sender = None - if not hs.config.send_federation: - self.federation_sender = hs.get_federation_sender() + self.streams_by_name = {stream.NAME: stream for stream in self.streams} self.notifier.add_replication_callback(self.on_notifier_poke) |