diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 2098c32a77..0ff2a7199f 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -23,9 +23,11 @@ import platform
if platform.python_implementation() == "PyPy":
import json
+
_json_encoder = json.JSONEncoder()
else:
import simplejson as json
+
_json_encoder = json.JSONEncoder(namedtuple_as_object=False)
logger = logging.getLogger(__name__)
@@ -41,6 +43,7 @@ class Command(object):
The default implementation creates a command of form `<NAME> <data>`
"""
+
NAME = None
def __init__(self, data):
@@ -73,6 +76,7 @@ class ServerCommand(Command):
SERVER <server_name>
"""
+
NAME = "SERVER"
@@ -99,6 +103,7 @@ class RdataCommand(Command):
RDATA presence batch ["@bar:example.com", "online", ...]
RDATA presence 59 ["@baz:example.com", "online", ...]
"""
+
NAME = "RDATA"
def __init__(self, stream_name, token, row):
@@ -110,17 +115,17 @@ class RdataCommand(Command):
def from_line(cls, line):
stream_name, token, row_json = line.split(" ", 2)
return cls(
- stream_name,
- None if token == "batch" else int(token),
- json.loads(row_json)
+ stream_name, None if token == "batch" else int(token), json.loads(row_json)
)
def to_line(self):
- return " ".join((
- self.stream_name,
- str(self.token) if self.token is not None else "batch",
- _json_encoder.encode(self.row),
- ))
+ return " ".join(
+ (
+ self.stream_name,
+ str(self.token) if self.token is not None else "batch",
+ _json_encoder.encode(self.row),
+ )
+ )
def get_logcontext_id(self):
return "RDATA-" + self.stream_name
@@ -133,6 +138,7 @@ class PositionCommand(Command):
Sent to the client after all missing updates for a stream have been sent
to the client and they're now up to date.
"""
+
NAME = "POSITION"
def __init__(self, stream_name, token):
@@ -145,19 +151,21 @@ class PositionCommand(Command):
return cls(stream_name, int(token))
def to_line(self):
- return " ".join((self.stream_name, str(self.token),))
+ return " ".join((self.stream_name, str(self.token)))
class ErrorCommand(Command):
"""Sent by either side if there was an ERROR. The data is a string describing
the error.
"""
+
NAME = "ERROR"
class PingCommand(Command):
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
"""
+
NAME = "PING"
@@ -165,6 +173,7 @@ class NameCommand(Command):
"""Sent by client to inform the server of the client's identity. The data
is the name
"""
+
NAME = "NAME"
@@ -184,6 +193,7 @@ class ReplicateCommand(Command):
REPLICATE ALL NOW
"""
+
NAME = "REPLICATE"
def __init__(self, stream_name, token):
@@ -200,7 +210,7 @@ class ReplicateCommand(Command):
return cls(stream_name, token)
def to_line(self):
- return " ".join((self.stream_name, str(self.token),))
+ return " ".join((self.stream_name, str(self.token)))
def get_logcontext_id(self):
return "REPLICATE-" + self.stream_name
@@ -218,6 +228,7 @@ class UserSyncCommand(Command):
Where <state> is either "start" or "stop"
"""
+
NAME = "USER_SYNC"
def __init__(self, user_id, is_syncing, last_sync_ms):
@@ -235,9 +246,13 @@ class UserSyncCommand(Command):
return cls(user_id, state == "start", int(last_sync_ms))
def to_line(self):
- return " ".join((
- self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
- ))
+ return " ".join(
+ (
+ self.user_id,
+ "start" if self.is_syncing else "end",
+ str(self.last_sync_ms),
+ )
+ )
class FederationAckCommand(Command):
@@ -251,6 +266,7 @@ class FederationAckCommand(Command):
FEDERATION_ACK <token>
"""
+
NAME = "FEDERATION_ACK"
def __init__(self, token):
@@ -268,6 +284,7 @@ class SyncCommand(Command):
"""Used for testing. The client protocol implementation allows waiting
on a SYNC command with a specified data.
"""
+
NAME = "SYNC"
@@ -278,6 +295,7 @@ class RemovePusherCommand(Command):
REMOVE_PUSHER <app_id> <push_key> <user_id>
"""
+
NAME = "REMOVE_PUSHER"
def __init__(self, app_id, push_key, user_id):
@@ -309,6 +327,7 @@ class InvalidateCacheCommand(Command):
Where <keys_json> is a json list.
"""
+
NAME = "INVALIDATE_CACHE"
def __init__(self, cache_func, keys):
@@ -322,9 +341,7 @@ class InvalidateCacheCommand(Command):
return cls(cache_func, json.loads(keys_json))
def to_line(self):
- return " ".join((
- self.cache_func, _json_encoder.encode(self.keys),
- ))
+ return " ".join((self.cache_func, _json_encoder.encode(self.keys)))
class UserIpCommand(Command):
@@ -334,6 +351,7 @@ class UserIpCommand(Command):
USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
"""
+
NAME = "USER_IP"
def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
@@ -350,15 +368,22 @@ class UserIpCommand(Command):
access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
- return cls(
- user_id, access_token, ip, user_agent, device_id, last_seen
- )
+ return cls(user_id, access_token, ip, user_agent, device_id, last_seen)
def to_line(self):
- return self.user_id + " " + _json_encoder.encode((
- self.access_token, self.ip, self.user_agent, self.device_id,
- self.last_seen,
- ))
+ return (
+ self.user_id
+ + " "
+ + _json_encoder.encode(
+ (
+ self.access_token,
+ self.ip,
+ self.user_agent,
+ self.device_id,
+ self.last_seen,
+ )
+ )
+ )
# Map of command name to command type.
|