diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 39 |
1 files changed, 3 insertions, 36 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 002171ce7c..41569305df 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,7 +17,6 @@ import logging import random -from typing import Dict, List from prometheus_client import Counter @@ -25,12 +24,6 @@ from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol -from synapse.replication.tcp.streams import ( - STREAMS_MAP, - CachesStream, - FederationStream, - Stream, -) from synapse.util.metrics import Measure stream_updates_counter = Counter( @@ -80,31 +73,7 @@ class ReplicationStreamer(object): self._replication_torture_level = hs.config.replication_torture_level - # Work out list of streams that this instance is the source of. - self.streams = [] # type: List[Stream] - - # All workers can write to the cache invalidation stream. - self.streams.append(CachesStream(hs)) - - if hs.config.worker_app is None: - for stream in STREAMS_MAP.values(): - if stream == FederationStream and hs.config.send_federation: - # We only support federation stream if federation sending - # has been disabled on the master. - continue - - if stream == CachesStream: - # We've already added it above. - continue - - self.streams.append(stream(hs)) - - self.streams_by_name = {stream.NAME: stream for stream in self.streams} - - # Only bother registering the notifier callback if we have streams to - # publish. - if self.streams: - self.notifier.add_replication_callback(self.on_notifier_poke) + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False @@ -112,10 +81,8 @@ class ReplicationStreamer(object): self.command_handler = hs.get_tcp_replication() - def get_streams(self) -> Dict[str, Stream]: - """Get a mapp from stream name to stream instance. - """ - return self.streams_by_name + # Set of streams to replicate. + self.streams = self.command_handler.get_streams_to_replicate() def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the |