diff options
author | Erik Johnston <erik@matrix.org> | 2020-04-28 13:34:12 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-28 13:34:12 +0100 |
commit | 38919b521e209d5020dba3e8394d8213b07f6bf9 (patch) | |
tree | 1f78b1226aeccf8fe9f1c7c520b0109201432c29 /synapse/replication/tcp | |
parent | Fix incorrect metrics reporting for renew_attestations (#7344) (diff) | |
download | synapse-38919b521e209d5020dba3e8394d8213b07f6bf9.tar.xz |
Run replication streamers on workers (#7146)
Currently we never write to streams from workers, but that will change soon
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/resource.py | 33 |
1 files changed, 15 insertions, 18 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index b2d6baa2a2..33d2f589ac 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,9 +17,7 @@ import logging import random -from typing import Dict - -from six import itervalues +from typing import Dict, List from prometheus_client import Counter @@ -71,29 +69,28 @@ class ReplicationStreamer(object): def __init__(self, hs): self.store = hs.get_datastore() - self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() self.notifier = hs.get_notifier() - self._server_notices_sender = hs.get_server_notices_sender() self._replication_torture_level = hs.config.replication_torture_level - # List of streams that clients can subscribe to. - # We only support federation stream if federation sending hase been - # disabled on the master. - self.streams = [ - stream(hs) - for stream in itervalues(STREAMS_MAP) - if stream != FederationStream or not hs.config.send_federation - ] + # Work out list of streams that this instance is the source of. + self.streams = [] # type: List[Stream] + if hs.config.worker_app is None: + for stream in STREAMS_MAP.values(): + if stream == FederationStream and hs.config.send_federation: + # We only support federation stream if federation sending + # hase been disabled on the master. + continue - self.streams_by_name = {stream.NAME: stream for stream in self.streams} + self.streams.append(stream(hs)) - self.federation_sender = None - if not hs.config.send_federation: - self.federation_sender = hs.get_federation_sender() + self.streams_by_name = {stream.NAME: stream for stream in self.streams} - self.notifier.add_replication_callback(self.on_notifier_poke) + # Only bother registering the notifier callback if we have streams to + # publish. + if self.streams: + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False |