summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/homeserver.py11
-rw-r--r--synapse/config/server.py16
-rw-r--r--synapse/config/workers.py8
-rw-r--r--synapse/replication/tcp/handler.py58
4 files changed, 39 insertions, 54 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e57a926032..883f2fd2ec 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -57,7 +57,6 @@ from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
 from synapse.rest.admin import AdminRestResource
 from synapse.rest.health import HealthResource
@@ -290,16 +289,6 @@ class SynapseHomeServer(HomeServer):
                     manhole_settings=self.config.server.manhole_settings,
                     manhole_globals={"hs": self},
                 )
-            elif listener.type == "replication":
-                services = listen_tcp(
-                    listener.bind_addresses,
-                    listener.port,
-                    ReplicationStreamProtocolFactory(self),
-                )
-                for s in services:
-                    self.get_reactor().addSystemEventTrigger(
-                        "before", "shutdown", s.stopListening
-                    )
             elif listener.type == "metrics":
                 if not self.config.metrics.enable_metrics:
                     logger.warning(
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 085fe22c51..c91df636d9 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -36,6 +36,12 @@ from ._util import validate_config
 
 logger = logging.Logger(__name__)
 
+DIRECT_TCP_ERROR = """
+Using direct TCP replication for workers is no longer supported.
+
+Please see https://matrix-org.github.io/synapse/latest/upgrade.html#direct-tcp-replication-is-no-longer-supported-migrate-to-redis
+"""
+
 # by default, we attempt to listen on both '::' *and* '0.0.0.0' because some OSes
 # (Windows, macOS, other BSD/Linux where net.ipv6.bindv6only is set) will only listen
 # on IPv6 when '::' is set.
@@ -165,7 +171,6 @@ KNOWN_LISTENER_TYPES = {
     "http",
     "metrics",
     "manhole",
-    "replication",
 }
 
 KNOWN_RESOURCES = {
@@ -515,7 +520,9 @@ class ServerConfig(Config):
         ):
             raise ConfigError("allowed_avatar_mimetypes must be a list")
 
-        self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])]
+        self.listeners = [
+            parse_listener_def(i, x) for i, x in enumerate(config.get("listeners", []))
+        ]
 
         # no_tls is not really supported any more, but let's grandfather it in
         # here.
@@ -880,9 +887,12 @@ def read_gc_thresholds(
         )
 
 
-def parse_listener_def(listener: Any) -> ListenerConfig:
+def parse_listener_def(num: int, listener: Any) -> ListenerConfig:
     """parse a listener config from the config file"""
     listener_type = listener["type"]
+    # Raise a helpful error if direct TCP replication is still configured.
+    if listener_type == "replication":
+        raise ConfigError(DIRECT_TCP_ERROR, ("listeners", str(num), "type"))
 
     port = listener.get("port")
     if not isinstance(port, int):
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index f2716422b5..0fb725dd8f 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -27,7 +27,7 @@ from ._base import (
     RoutableShardedWorkerHandlingConfig,
     ShardedWorkerHandlingConfig,
 )
-from .server import ListenerConfig, parse_listener_def
+from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def
 
 _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
 The send_federation config option must be disabled in the main
@@ -128,7 +128,8 @@ class WorkerConfig(Config):
             self.worker_app = None
 
         self.worker_listeners = [
-            parse_listener_def(x) for x in config.get("worker_listeners", [])
+            parse_listener_def(i, x)
+            for i, x in enumerate(config.get("worker_listeners", []))
         ]
         self.worker_daemonize = bool(config.get("worker_daemonize"))
         self.worker_pid_file = config.get("worker_pid_file")
@@ -142,7 +143,8 @@ class WorkerConfig(Config):
         self.worker_replication_host = config.get("worker_replication_host", None)
 
         # The port on the main synapse for TCP replication
-        self.worker_replication_port = config.get("worker_replication_port", None)
+        if "worker_replication_port" in config:
+            raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",))
 
         # The port on the main synapse for HTTP replication endpoint
         self.worker_replication_http_port = config.get("worker_replication_http_port")
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e1cbfa50eb..0f166d16aa 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -35,7 +35,6 @@ from twisted.internet.protocol import ReconnectingClientFactory
 
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
 from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
     Command,
@@ -332,46 +331,31 @@ class ReplicationCommandHandler:
 
     def start_replication(self, hs: "HomeServer") -> None:
         """Helper method to start replication."""
-        if hs.config.redis.redis_enabled:
-            from synapse.replication.tcp.redis import (
-                RedisDirectTcpReplicationClientFactory,
-            )
+        from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory
 
-            # First let's ensure that we have a ReplicationStreamer started.
-            hs.get_replication_streamer()
+        # First let's ensure that we have a ReplicationStreamer started.
+        hs.get_replication_streamer()
 
-            # We need two connections to redis, one for the subscription stream and
-            # one to send commands to (as you can't send further redis commands to a
-            # connection after SUBSCRIBE is called).
+        # We need two connections to redis, one for the subscription stream and
+        # one to send commands to (as you can't send further redis commands to a
+        # connection after SUBSCRIBE is called).
 
-            # First create the connection for sending commands.
-            outbound_redis_connection = hs.get_outbound_redis_connection()
+        # First create the connection for sending commands.
+        outbound_redis_connection = hs.get_outbound_redis_connection()
 
-            # Now create the factory/connection for the subscription stream.
-            self._factory = RedisDirectTcpReplicationClientFactory(
-                hs,
-                outbound_redis_connection,
-                channel_names=self._channels_to_subscribe_to,
-            )
-            hs.get_reactor().connectTCP(
-                hs.config.redis.redis_host,
-                hs.config.redis.redis_port,
-                self._factory,
-                timeout=30,
-                bindAddress=None,
-            )
-        else:
-            client_name = hs.get_instance_name()
-            self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
-            host = hs.config.worker.worker_replication_host
-            port = hs.config.worker.worker_replication_port
-            hs.get_reactor().connectTCP(
-                host,
-                port,
-                self._factory,
-                timeout=30,
-                bindAddress=None,
-            )
+        # Now create the factory/connection for the subscription stream.
+        self._factory = RedisDirectTcpReplicationClientFactory(
+            hs,
+            outbound_redis_connection,
+            channel_names=self._channels_to_subscribe_to,
+        )
+        hs.get_reactor().connectTCP(
+            hs.config.redis.redis_host,
+            hs.config.redis.redis_port,
+            self._factory,
+            timeout=30,
+            bindAddress=None,
+        )
 
     def get_streams(self) -> Dict[str, Stream]:
         """Get a map from stream name to all streams."""