diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 424854efbe..200f667fdf 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -18,16 +18,12 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from twisted.internet import defer
from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IAddress, IConnector
-from twisted.internet.protocol import ReconnectingClientFactory
-from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership, ReceiptTypes
from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import (
AccountDataStream,
DeviceListsStream,
@@ -53,7 +49,6 @@ from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
if TYPE_CHECKING:
- from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -62,52 +57,6 @@ logger = logging.getLogger(__name__)
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
-class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
- """Factory for building connections to the master. Will reconnect if the
- connection is lost.
-
- Accepts a handler that is passed to `ClientReplicationStreamProtocol`.
- """
-
- initialDelay = 0.1
- maxDelay = 1 # Try at least once every N seconds
-
- def __init__(
- self,
- hs: "HomeServer",
- client_name: str,
- command_handler: "ReplicationCommandHandler",
- ):
- self.client_name = client_name
- self.command_handler = command_handler
- self.server_name = hs.config.server.server_name
- self.hs = hs
- self._clock = hs.get_clock() # As self.clock is defined in super class
-
- hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
-
- def startedConnecting(self, connector: IConnector) -> None:
- logger.info("Connecting to replication: %r", connector.getDestination())
-
- def buildProtocol(self, addr: IAddress) -> ClientReplicationStreamProtocol:
- logger.info("Connected to replication: %r", addr)
- return ClientReplicationStreamProtocol(
- self.hs,
- self.client_name,
- self.server_name,
- self._clock,
- self.command_handler,
- )
-
- def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None:
- logger.error("Lost replication conn: %r", reason)
- ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
-
- def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None:
- logger.error("Failed to connect to replication: %r", reason)
- ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
-
-
class ReplicationDataHandler:
"""Handles incoming stream updates from replication.
|