diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 33 |
1 files changed, 8 insertions, 25 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 33d2f589ac..41569305df 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,7 +17,6 @@ import logging import random -from typing import Dict, List from prometheus_client import Counter @@ -25,7 +24,6 @@ from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol -from synapse.replication.tcp.streams import STREAMS_MAP, FederationStream, Stream from synapse.util.metrics import Measure stream_updates_counter = Counter( @@ -71,26 +69,11 @@ class ReplicationStreamer(object): self.store = hs.get_datastore() self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self._instance_name = hs.get_instance_name() self._replication_torture_level = hs.config.replication_torture_level - # Work out list of streams that this instance is the source of. - self.streams = [] # type: List[Stream] - if hs.config.worker_app is None: - for stream in STREAMS_MAP.values(): - if stream == FederationStream and hs.config.send_federation: - # We only support federation stream if federation sending - # hase been disabled on the master. - continue - - self.streams.append(stream(hs)) - - self.streams_by_name = {stream.NAME: stream for stream in self.streams} - - # Only bother registering the notifier callback if we have streams to - # publish. - if self.streams: - self.notifier.add_replication_callback(self.on_notifier_poke) + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False @@ -98,10 +81,8 @@ class ReplicationStreamer(object): self.command_handler = hs.get_tcp_replication() - def get_streams(self) -> Dict[str, Stream]: - """Get a mapp from stream name to stream instance. - """ - return self.streams_by_name + # Set of streams to replicate. + self.streams = self.command_handler.get_streams_to_replicate() def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the @@ -145,7 +126,9 @@ class ReplicationStreamer(object): random.shuffle(all_streams) for stream in all_streams: - if stream.last_token == stream.current_token(): + if stream.last_token == stream.current_token( + self._instance_name + ): continue if self._replication_torture_level: @@ -157,7 +140,7 @@ class ReplicationStreamer(object): "Getting stream: %s: %s -> %s", stream.NAME, stream.last_token, - stream.current_token(), + stream.current_token(self._instance_name), ) try: updates, current_token, limited = await stream.get_updates() |