diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 55630ba9a7..02e5bf6cc8 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -223,14 +223,25 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
return
# Now lets try and call on_<CMD_NAME> function
- try:
- run_as_background_process(
- "replication-" + cmd.get_logcontext_id(),
- getattr(self, "on_%s" % (cmd_name,)),
- cmd,
- )
- except Exception:
- logger.exception("[%s] Failed to handle line: %r", self.id(), line)
+ run_as_background_process(
+ "replication-" + cmd.get_logcontext_id(),
+ self.handle_command,
+ cmd,
+ )
+
+ def handle_command(self, cmd):
+ """Handle a command we have received over the replication stream.
+
+ By default delegates to on_<COMMAND>
+
+ Args:
+ cmd (synapse.replication.tcp.commands.Command): received command
+
+ Returns:
+ Deferred
+ """
+ handler = getattr(self, "on_%s" % (cmd.NAME,))
+ return handler(cmd)
def close(self):
logger.warn("[%s] Closing connection", self.id())
@@ -364,8 +375,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.transport.unregisterProducer()
def __str__(self):
+ addr = None
+ if self.transport:
+ addr = str(self.transport.getPeer())
return "ReplicationConnection<name=%s,conn_id=%s,addr=%s>" % (
- self.name, self.conn_id, self.addr,
+ self.name, self.conn_id, addr,
)
def id(self):
@@ -381,12 +395,11 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS
- def __init__(self, server_name, clock, streamer, addr):
+ def __init__(self, server_name, clock, streamer):
BaseReplicationStreamProtocol.__init__(self, clock) # Old style class
self.server_name = server_name
self.streamer = streamer
- self.addr = addr
# The streams the client has subscribed to and is up to date with
self.replication_streams = set()
|