summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-04-22 13:07:41 +0100
committerGitHub <noreply@github.com>2020-04-22 13:07:41 +0100
commit51f7eaf908a84fcaf231899e2bf1beae14ae72c0 (patch)
treee7e14016f42d021542d1703415cf6e06804310eb /synapse/replication/tcp/protocol.py
parentReduce logging verbosity of URL cache cleanup. (#7295) (diff)
downloadsynapse-51f7eaf908a84fcaf231899e2bf1beae14ae72c0.tar.xz
Add ability to run replication protocol over redis. (#7040)
This is configured via the `redis` config options.
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py38
1 files changed, 12 insertions, 26 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 9276ed2965..7240acb0a2 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -63,7 +63,6 @@ from twisted.python.failure import Failure
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.commands import (
-    COMMAND_MAP,
     VALID_CLIENT_COMMANDS,
     VALID_SERVER_COMMANDS,
     Command,
@@ -72,6 +71,7 @@ from synapse.replication.tcp.commands import (
     PingCommand,
     ReplicateCommand,
     ServerCommand,
+    parse_command_from_line,
 )
 from synapse.types import Collection
 from synapse.util import Clock
@@ -210,38 +210,24 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
         linestr = line.decode("utf-8")
 
-        # split at the first " ", handling one-word commands
-        idx = linestr.index(" ")
-        if idx >= 0:
-            cmd_name = linestr[:idx]
-            rest_of_line = linestr[idx + 1 :]
-        else:
-            cmd_name = linestr
-            rest_of_line = ""
+        try:
+            cmd = parse_command_from_line(linestr)
+        except Exception as e:
+            logger.exception("[%s] failed to parse line: %r", self.id(), linestr)
+            self.send_error("failed to parse line: %r (%r):" % (e, linestr))
+            return
 
-        if cmd_name not in self.VALID_INBOUND_COMMANDS:
-            logger.error("[%s] invalid command %s", self.id(), cmd_name)
-            self.send_error("invalid command: %s", cmd_name)
+        if cmd.NAME not in self.VALID_INBOUND_COMMANDS:
+            logger.error("[%s] invalid command %s", self.id(), cmd.NAME)
+            self.send_error("invalid command: %s", cmd.NAME)
             return
 
         self.last_received_command = self.clock.time_msec()
 
-        self.inbound_commands_counter[cmd_name] = (
-            self.inbound_commands_counter[cmd_name] + 1
+        self.inbound_commands_counter[cmd.NAME] = (
+            self.inbound_commands_counter[cmd.NAME] + 1
         )
 
-        cmd_cls = COMMAND_MAP[cmd_name]
-        try:
-            cmd = cmd_cls.from_line(rest_of_line)
-        except Exception as e:
-            logger.exception(
-                "[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
-            )
-            self.send_error(
-                "failed to parse line for  %r: %r (%r):" % (cmd_name, e, rest_of_line)
-            )
-            return
-
         # Now lets try and call on_<CMD_NAME> function
         run_as_background_process(
             "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd