summary refs log tree commit diff
path: root/synapse/replication/tcp/commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/commands.py')
-rw-r--r--synapse/replication/tcp/commands.py71
1 files changed, 48 insertions, 23 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 2098c32a77..0ff2a7199f 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -23,9 +23,11 @@ import platform
 
 if platform.python_implementation() == "PyPy":
     import json
+
     _json_encoder = json.JSONEncoder()
 else:
     import simplejson as json
+
     _json_encoder = json.JSONEncoder(namedtuple_as_object=False)
 
 logger = logging.getLogger(__name__)
@@ -41,6 +43,7 @@ class Command(object):
 
     The default implementation creates a command of form `<NAME> <data>`
     """
+
     NAME = None
 
     def __init__(self, data):
@@ -73,6 +76,7 @@ class ServerCommand(Command):
 
         SERVER <server_name>
     """
+
     NAME = "SERVER"
 
 
@@ -99,6 +103,7 @@ class RdataCommand(Command):
         RDATA presence batch ["@bar:example.com", "online", ...]
         RDATA presence 59 ["@baz:example.com", "online", ...]
     """
+
     NAME = "RDATA"
 
     def __init__(self, stream_name, token, row):
@@ -110,17 +115,17 @@ class RdataCommand(Command):
     def from_line(cls, line):
         stream_name, token, row_json = line.split(" ", 2)
         return cls(
-            stream_name,
-            None if token == "batch" else int(token),
-            json.loads(row_json)
+            stream_name, None if token == "batch" else int(token), json.loads(row_json)
         )
 
     def to_line(self):
-        return " ".join((
-            self.stream_name,
-            str(self.token) if self.token is not None else "batch",
-            _json_encoder.encode(self.row),
-        ))
+        return " ".join(
+            (
+                self.stream_name,
+                str(self.token) if self.token is not None else "batch",
+                _json_encoder.encode(self.row),
+            )
+        )
 
     def get_logcontext_id(self):
         return "RDATA-" + self.stream_name
@@ -133,6 +138,7 @@ class PositionCommand(Command):
     Sent to the client after all missing updates for a stream have been sent
     to the client and they're now up to date.
     """
+
     NAME = "POSITION"
 
     def __init__(self, stream_name, token):
@@ -145,19 +151,21 @@ class PositionCommand(Command):
         return cls(stream_name, int(token))
 
     def to_line(self):
-        return " ".join((self.stream_name, str(self.token),))
+        return " ".join((self.stream_name, str(self.token)))
 
 
 class ErrorCommand(Command):
     """Sent by either side if there was an ERROR. The data is a string describing
     the error.
     """
+
     NAME = "ERROR"
 
 
 class PingCommand(Command):
     """Sent by either side as a keep alive. The data is arbitary (often timestamp)
     """
+
     NAME = "PING"
 
 
@@ -165,6 +173,7 @@ class NameCommand(Command):
     """Sent by client to inform the server of the client's identity. The data
     is the name
     """
+
     NAME = "NAME"
 
 
@@ -184,6 +193,7 @@ class ReplicateCommand(Command):
 
         REPLICATE ALL NOW
     """
+
     NAME = "REPLICATE"
 
     def __init__(self, stream_name, token):
@@ -200,7 +210,7 @@ class ReplicateCommand(Command):
         return cls(stream_name, token)
 
     def to_line(self):
-        return " ".join((self.stream_name, str(self.token),))
+        return " ".join((self.stream_name, str(self.token)))
 
     def get_logcontext_id(self):
         return "REPLICATE-" + self.stream_name
@@ -218,6 +228,7 @@ class UserSyncCommand(Command):
 
     Where <state> is either "start" or "stop"
     """
+
     NAME = "USER_SYNC"
 
     def __init__(self, user_id, is_syncing, last_sync_ms):
@@ -235,9 +246,13 @@ class UserSyncCommand(Command):
         return cls(user_id, state == "start", int(last_sync_ms))
 
     def to_line(self):
-        return " ".join((
-            self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
-        ))
+        return " ".join(
+            (
+                self.user_id,
+                "start" if self.is_syncing else "end",
+                str(self.last_sync_ms),
+            )
+        )
 
 
 class FederationAckCommand(Command):
@@ -251,6 +266,7 @@ class FederationAckCommand(Command):
 
         FEDERATION_ACK <token>
     """
+
     NAME = "FEDERATION_ACK"
 
     def __init__(self, token):
@@ -268,6 +284,7 @@ class SyncCommand(Command):
     """Used for testing. The client protocol implementation allows waiting
     on a SYNC command with a specified data.
     """
+
     NAME = "SYNC"
 
 
@@ -278,6 +295,7 @@ class RemovePusherCommand(Command):
 
         REMOVE_PUSHER <app_id> <push_key> <user_id>
     """
+
     NAME = "REMOVE_PUSHER"
 
     def __init__(self, app_id, push_key, user_id):
@@ -309,6 +327,7 @@ class InvalidateCacheCommand(Command):
 
     Where <keys_json> is a json list.
     """
+
     NAME = "INVALIDATE_CACHE"
 
     def __init__(self, cache_func, keys):
@@ -322,9 +341,7 @@ class InvalidateCacheCommand(Command):
         return cls(cache_func, json.loads(keys_json))
 
     def to_line(self):
-        return " ".join((
-            self.cache_func, _json_encoder.encode(self.keys),
-        ))
+        return " ".join((self.cache_func, _json_encoder.encode(self.keys)))
 
 
 class UserIpCommand(Command):
@@ -334,6 +351,7 @@ class UserIpCommand(Command):
 
         USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
     """
+
     NAME = "USER_IP"
 
     def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
@@ -350,15 +368,22 @@ class UserIpCommand(Command):
 
         access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
 
-        return cls(
-            user_id, access_token, ip, user_agent, device_id, last_seen
-        )
+        return cls(user_id, access_token, ip, user_agent, device_id, last_seen)
 
     def to_line(self):
-        return self.user_id + " " + _json_encoder.encode((
-            self.access_token, self.ip, self.user_agent, self.device_id,
-            self.last_seen,
-        ))
+        return (
+            self.user_id
+            + " "
+            + _json_encoder.encode(
+                (
+                    self.access_token,
+                    self.ip,
+                    self.user_agent,
+                    self.device_id,
+                    self.last_seen,
+                )
+            )
+        )
 
 
 # Map of command name to command type.