diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e57a926032..883f2fd2ec 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -57,7 +57,6 @@ from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
from synapse.rest.health import HealthResource
@@ -290,16 +289,6 @@ class SynapseHomeServer(HomeServer):
manhole_settings=self.config.server.manhole_settings,
manhole_globals={"hs": self},
)
- elif listener.type == "replication":
- services = listen_tcp(
- listener.bind_addresses,
- listener.port,
- ReplicationStreamProtocolFactory(self),
- )
- for s in services:
- self.get_reactor().addSystemEventTrigger(
- "before", "shutdown", s.stopListening
- )
elif listener.type == "metrics":
if not self.config.metrics.enable_metrics:
logger.warning(
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 085fe22c51..c91df636d9 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -36,6 +36,12 @@ from ._util import validate_config
logger = logging.Logger(__name__)
+DIRECT_TCP_ERROR = """
+Using direct TCP replication for workers is no longer supported.
+
+Please see https://matrix-org.github.io/synapse/latest/upgrade.html#direct-tcp-replication-is-no-longer-supported-migrate-to-redis
+"""
+
# by default, we attempt to listen on both '::' *and* '0.0.0.0' because some OSes
# (Windows, macOS, other BSD/Linux where net.ipv6.bindv6only is set) will only listen
# on IPv6 when '::' is set.
@@ -165,7 +171,6 @@ KNOWN_LISTENER_TYPES = {
"http",
"metrics",
"manhole",
- "replication",
}
KNOWN_RESOURCES = {
@@ -515,7 +520,9 @@ class ServerConfig(Config):
):
raise ConfigError("allowed_avatar_mimetypes must be a list")
- self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])]
+ self.listeners = [
+ parse_listener_def(i, x) for i, x in enumerate(config.get("listeners", []))
+ ]
# no_tls is not really supported any more, but let's grandfather it in
# here.
@@ -880,9 +887,12 @@ def read_gc_thresholds(
)
-def parse_listener_def(listener: Any) -> ListenerConfig:
+def parse_listener_def(num: int, listener: Any) -> ListenerConfig:
"""parse a listener config from the config file"""
listener_type = listener["type"]
+ # Raise a helpful error if direct TCP replication is still configured.
+ if listener_type == "replication":
+ raise ConfigError(DIRECT_TCP_ERROR, ("listeners", str(num), "type"))
port = listener.get("port")
if not isinstance(port, int):
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index f2716422b5..0fb725dd8f 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -27,7 +27,7 @@ from ._base import (
RoutableShardedWorkerHandlingConfig,
ShardedWorkerHandlingConfig,
)
-from .server import ListenerConfig, parse_listener_def
+from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def
_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
@@ -128,7 +128,8 @@ class WorkerConfig(Config):
self.worker_app = None
self.worker_listeners = [
- parse_listener_def(x) for x in config.get("worker_listeners", [])
+ parse_listener_def(i, x)
+ for i, x in enumerate(config.get("worker_listeners", []))
]
self.worker_daemonize = bool(config.get("worker_daemonize"))
self.worker_pid_file = config.get("worker_pid_file")
@@ -142,7 +143,8 @@ class WorkerConfig(Config):
self.worker_replication_host = config.get("worker_replication_host", None)
# The port on the main synapse for TCP replication
- self.worker_replication_port = config.get("worker_replication_port", None)
+ if "worker_replication_port" in config:
+ raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",))
# The port on the main synapse for HTTP replication endpoint
self.worker_replication_http_port = config.get("worker_replication_http_port")
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e1cbfa50eb..0f166d16aa 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -35,7 +35,6 @@ from twisted.internet.protocol import ReconnectingClientFactory
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
@@ -332,46 +331,31 @@ class ReplicationCommandHandler:
def start_replication(self, hs: "HomeServer") -> None:
"""Helper method to start replication."""
- if hs.config.redis.redis_enabled:
- from synapse.replication.tcp.redis import (
- RedisDirectTcpReplicationClientFactory,
- )
+ from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory
- # First let's ensure that we have a ReplicationStreamer started.
- hs.get_replication_streamer()
+ # First let's ensure that we have a ReplicationStreamer started.
+ hs.get_replication_streamer()
- # We need two connections to redis, one for the subscription stream and
- # one to send commands to (as you can't send further redis commands to a
- # connection after SUBSCRIBE is called).
+ # We need two connections to redis, one for the subscription stream and
+ # one to send commands to (as you can't send further redis commands to a
+ # connection after SUBSCRIBE is called).
- # First create the connection for sending commands.
- outbound_redis_connection = hs.get_outbound_redis_connection()
+ # First create the connection for sending commands.
+ outbound_redis_connection = hs.get_outbound_redis_connection()
- # Now create the factory/connection for the subscription stream.
- self._factory = RedisDirectTcpReplicationClientFactory(
- hs,
- outbound_redis_connection,
- channel_names=self._channels_to_subscribe_to,
- )
- hs.get_reactor().connectTCP(
- hs.config.redis.redis_host,
- hs.config.redis.redis_port,
- self._factory,
- timeout=30,
- bindAddress=None,
- )
- else:
- client_name = hs.get_instance_name()
- self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
- host = hs.config.worker.worker_replication_host
- port = hs.config.worker.worker_replication_port
- hs.get_reactor().connectTCP(
- host,
- port,
- self._factory,
- timeout=30,
- bindAddress=None,
- )
+ # Now create the factory/connection for the subscription stream.
+ self._factory = RedisDirectTcpReplicationClientFactory(
+ hs,
+ outbound_redis_connection,
+ channel_names=self._channels_to_subscribe_to,
+ )
+ hs.get_reactor().connectTCP(
+ hs.config.redis.redis_host,
+ hs.config.redis.redis_port,
+ self._factory,
+ timeout=30,
+ bindAddress=None,
+ )
def get_streams(self) -> Dict[str, Stream]:
"""Get a map from stream name to all streams."""
|