diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 9276ed2965..7240acb0a2 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -63,7 +63,6 @@ from twisted.python.failure import Failure
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
- COMMAND_MAP,
VALID_CLIENT_COMMANDS,
VALID_SERVER_COMMANDS,
Command,
@@ -72,6 +71,7 @@ from synapse.replication.tcp.commands import (
PingCommand,
ReplicateCommand,
ServerCommand,
+ parse_command_from_line,
)
from synapse.types import Collection
from synapse.util import Clock
@@ -210,38 +210,24 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
linestr = line.decode("utf-8")
- # split at the first " ", handling one-word commands
- idx = linestr.index(" ")
- if idx >= 0:
- cmd_name = linestr[:idx]
- rest_of_line = linestr[idx + 1 :]
- else:
- cmd_name = linestr
- rest_of_line = ""
+ try:
+ cmd = parse_command_from_line(linestr)
+ except Exception as e:
+ logger.exception("[%s] failed to parse line: %r", self.id(), linestr)
+ self.send_error("failed to parse line: %r (%r):" % (e, linestr))
+ return
- if cmd_name not in self.VALID_INBOUND_COMMANDS:
- logger.error("[%s] invalid command %s", self.id(), cmd_name)
- self.send_error("invalid command: %s", cmd_name)
+ if cmd.NAME not in self.VALID_INBOUND_COMMANDS:
+ logger.error("[%s] invalid command %s", self.id(), cmd.NAME)
+ self.send_error("invalid command: %s", cmd.NAME)
return
self.last_received_command = self.clock.time_msec()
- self.inbound_commands_counter[cmd_name] = (
- self.inbound_commands_counter[cmd_name] + 1
+ self.inbound_commands_counter[cmd.NAME] = (
+ self.inbound_commands_counter[cmd.NAME] + 1
)
- cmd_cls = COMMAND_MAP[cmd_name]
- try:
- cmd = cmd_cls.from_line(rest_of_line)
- except Exception as e:
- logger.exception(
- "[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
- )
- self.send_error(
- "failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
- )
- return
-
# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
|