summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-13 10:27:02 +0100
committerGitHub <noreply@github.com>2020-05-13 10:27:02 +0100
commit7ee24c5674a36dc9cd7163cfdd3e14b74570dc77 (patch)
treea6676da63007e1cdde559db9d74ecd6d1ff5b8cd /synapse/replication/tcp/resource.py
parentFix Redis reconnection logic (#7482) (diff)
downloadsynapse-7ee24c5674a36dc9cd7163cfdd3e14b74570dc77.tar.xz
Have all instances correctly respond to REPLICATE command. (#7475)
Before all streams were only written to from master, so only master needed to respond to `REPLICATE` commands.

Before all instances wrote to the cache invalidation stream, but didn't respond to `REPLICATE`. This was a bug, which could lead to missed rows from cache invalidation stream if an instance is restarted, however all the caches would be empty in that case so it wasn't a problem.
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py39
1 files changed, 3 insertions, 36 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 002171ce7c..41569305df 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -17,7 +17,6 @@
 
 import logging
 import random
-from typing import Dict, List
 
 from prometheus_client import Counter
 
@@ -25,12 +24,6 @@ from twisted.internet.protocol import Factory
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
-from synapse.replication.tcp.streams import (
-    STREAMS_MAP,
-    CachesStream,
-    FederationStream,
-    Stream,
-)
 from synapse.util.metrics import Measure
 
 stream_updates_counter = Counter(
@@ -80,31 +73,7 @@ class ReplicationStreamer(object):
 
         self._replication_torture_level = hs.config.replication_torture_level
 
-        # Work out list of streams that this instance is the source of.
-        self.streams = []  # type: List[Stream]
-
-        # All workers can write to the cache invalidation stream.
-        self.streams.append(CachesStream(hs))
-
-        if hs.config.worker_app is None:
-            for stream in STREAMS_MAP.values():
-                if stream == FederationStream and hs.config.send_federation:
-                    # We only support federation stream if federation sending
-                    # has been disabled on the master.
-                    continue
-
-                if stream == CachesStream:
-                    # We've already added it above.
-                    continue
-
-                self.streams.append(stream(hs))
-
-        self.streams_by_name = {stream.NAME: stream for stream in self.streams}
-
-        # Only bother registering the notifier callback if we have streams to
-        # publish.
-        if self.streams:
-            self.notifier.add_replication_callback(self.on_notifier_poke)
+        self.notifier.add_replication_callback(self.on_notifier_poke)
 
         # Keeps track of whether we are currently checking for updates
         self.is_looping = False
@@ -112,10 +81,8 @@ class ReplicationStreamer(object):
 
         self.command_handler = hs.get_tcp_replication()
 
-    def get_streams(self) -> Dict[str, Stream]:
-        """Get a mapp from stream name to stream instance.
-        """
-        return self.streams_by_name
+        # Set of streams to replicate.
+        self.streams = self.command_handler.get_streams_to_replicate()
 
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the