diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 424854efbe..200f667fdf 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -18,16 +18,12 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from twisted.internet import defer
from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IAddress, IConnector
-from twisted.internet.protocol import ReconnectingClientFactory
-from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership, ReceiptTypes
from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import (
AccountDataStream,
DeviceListsStream,
@@ -53,7 +49,6 @@ from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
if TYPE_CHECKING:
- from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -62,52 +57,6 @@ logger = logging.getLogger(__name__)
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
-class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
- """Factory for building connections to the master. Will reconnect if the
- connection is lost.
-
- Accepts a handler that is passed to `ClientReplicationStreamProtocol`.
- """
-
- initialDelay = 0.1
- maxDelay = 1 # Try at least once every N seconds
-
- def __init__(
- self,
- hs: "HomeServer",
- client_name: str,
- command_handler: "ReplicationCommandHandler",
- ):
- self.client_name = client_name
- self.command_handler = command_handler
- self.server_name = hs.config.server.server_name
- self.hs = hs
- self._clock = hs.get_clock() # As self.clock is defined in super class
-
- hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
-
- def startedConnecting(self, connector: IConnector) -> None:
- logger.info("Connecting to replication: %r", connector.getDestination())
-
- def buildProtocol(self, addr: IAddress) -> ClientReplicationStreamProtocol:
- logger.info("Connected to replication: %r", addr)
- return ClientReplicationStreamProtocol(
- self.hs,
- self.client_name,
- self.server_name,
- self._clock,
- self.command_handler,
- )
-
- def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None:
- logger.error("Lost replication conn: %r", reason)
- ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
-
- def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None:
- logger.error("Failed to connect to replication: %r", reason)
- ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
-
-
class ReplicationDataHandler:
"""Handles incoming stream updates from replication.
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index d03a53d764..2290b3e6fe 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -625,23 +625,6 @@ class ReplicationCommandHandler:
self._notifier.notify_remote_server_up(cmd.data)
- # We relay to all other connections to ensure every instance gets the
- # notification.
- #
- # When configured to use redis we'll always only have one connection and
- # so this is a no-op (all instances will have already received the same
- # REMOTE_SERVER_UP command).
- #
- # For direct TCP connections this will relay to all other connections
- # connected to us. When on master this will correctly fan out to all
- # other direct TCP clients and on workers there'll only be the one
- # connection to master.
- #
- # (The logic here should also be sound if we have a mix of Redis and
- # direct TCP connections so long as there is only one traffic route
- # between two instances, but that is not currently supported).
- self.send_command(cmd, ignore_conn=conn)
-
def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
self._connections.append(connection)
@@ -689,21 +672,14 @@ class ReplicationCommandHandler:
"""
return bool(self._connections)
- def send_command(
- self, cmd: Command, ignore_conn: Optional[IReplicationConnection] = None
- ) -> None:
+ def send_command(self, cmd: Command) -> None:
"""Send a command to all connected connections.
Args:
cmd
- ignore_conn: If set don't send command to the given connection.
- Used when relaying commands from one connection to all others.
"""
if self._connections:
for connection in self._connections:
- if connection == ignore_conn:
- continue
-
try:
connection.send_command(cmd)
except Exception:
|