summary refs log tree commit diff
path: root/synapse/replication/tcp/commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/commands.py')
-rw-r--r--synapse/replication/tcp/commands.py341
1 files changed, 341 insertions, 0 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
new file mode 100644
index 0000000000..68165cf2dc
--- /dev/null
+++ b/synapse/replication/tcp/commands.py
@@ -0,0 +1,341 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Defines the various valid commands
+
+The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
+allowed to be sent by which side.
+"""
+
+import logging
+import ujson as json
+
+
+logger = logging.getLogger(__name__)
+
+
+class Command(object):
+    """The base command class.
+
+    All subclasses must set the NAME variable which equates to the name of the
+    command on the wire.
+
+    A full command line on the wire is constructed from `NAME + " " + to_line()`
+
+    The default implementation creates a command of form `<NAME> <data>`
+    """
+    NAME = None
+
+    def __init__(self, data):
+        self.data = data
+
+    @classmethod
+    def from_line(cls, line):
+        """Deserialises a line from the wire into this command. `line` does not
+        include the command.
+        """
+        return cls(line)
+
+    def to_line(self):
+        """Serialises the comamnd for the wire. Does not include the command
+        prefix.
+        """
+        return self.data
+
+
+class ServerCommand(Command):
+    """Sent by the server on new connection and includes the server_name.
+
+    Format::
+
+        SERVER <server_name>
+    """
+    NAME = "SERVER"
+
+
+class RdataCommand(Command):
+    """Sent by server when a subscribed stream has an update.
+
+    Format::
+
+        RDATA <stream_name> <token> <row_json>
+
+    The `<token>` may either be a numeric stream id OR "batch". The latter case
+    is used to support sending multiple updates with the same stream ID. This
+    is done by sending an RDATA for each row, with all but the last RDATA having
+    a token of "batch" and the last having the final stream ID.
+
+    The client should batch all incoming RDATA with a token of "batch" (per
+    stream_name) until it sees an RDATA with a numeric stream ID.
+
+    `<token>` of "batch" maps to the instance variable `token` being None.
+
+    An example of a batched series of RDATA::
+
+        RDATA presence batch ["@foo:example.com", "online", ...]
+        RDATA presence batch ["@bar:example.com", "online", ...]
+        RDATA presence 59 ["@baz:example.com", "online", ...]
+    """
+    NAME = "RDATA"
+
+    def __init__(self, stream_name, token, row):
+        self.stream_name = stream_name
+        self.token = token
+        self.row = row
+
+    @classmethod
+    def from_line(cls, line):
+        stream_name, token, row_json = line.split(" ", 2)
+        return cls(
+            stream_name,
+            None if token == "batch" else int(token),
+            json.loads(row_json)
+        )
+
+    def to_line(self):
+        return " ".join((
+            self.stream_name,
+            str(self.token) if self.token is not None else "batch",
+            json.dumps(self.row),
+        ))
+
+
+class PositionCommand(Command):
+    """Sent by the client to tell the client the stream postition without
+    needing to send an RDATA.
+    """
+    NAME = "POSITION"
+
+    def __init__(self, stream_name, token):
+        self.stream_name = stream_name
+        self.token = token
+
+    @classmethod
+    def from_line(cls, line):
+        stream_name, token = line.split(" ", 1)
+        return cls(stream_name, int(token))
+
+    def to_line(self):
+        return " ".join((self.stream_name, str(self.token),))
+
+
+class ErrorCommand(Command):
+    """Sent by either side if there was an ERROR. The data is a string describing
+    the error.
+    """
+    NAME = "ERROR"
+
+
+class PingCommand(Command):
+    """Sent by either side as a keep alive. The data is arbitary (often timestamp)
+    """
+    NAME = "PING"
+
+
+class NameCommand(Command):
+    """Sent by client to inform the server of the client's identity. The data
+    is the name
+    """
+    NAME = "NAME"
+
+
+class ReplicateCommand(Command):
+    """Sent by the client to subscribe to the stream.
+
+    Format::
+
+        REPLICATE <stream_name> <token>
+
+    Where <token> may be either:
+        * a numeric stream_id to stream updates from
+        * "NOW" to stream all subsequent updates.
+
+    The <stream_name> can be "ALL" to subscribe to all known streams, in which
+    case the <token> must be set to "NOW", i.e.::
+
+        REPLICATE ALL NOW
+    """
+    NAME = "REPLICATE"
+
+    def __init__(self, stream_name, token):
+        self.stream_name = stream_name
+        self.token = token
+
+    @classmethod
+    def from_line(cls, line):
+        stream_name, token = line.split(" ", 1)
+        if token in ("NOW", "now"):
+            token = "NOW"
+        else:
+            token = int(token)
+        return cls(stream_name, token)
+
+    def to_line(self):
+        return " ".join((self.stream_name, str(self.token),))
+
+
+class UserSyncCommand(Command):
+    """Sent by the client to inform the server that a user has started or
+    stopped syncing. Used to calculate presence on the master.
+
+    Format::
+
+        USER_SYNC <user_id> <state>
+
+    Where <state> is either "start" or "stop"
+    """
+    NAME = "USER_SYNC"
+
+    def __init__(self, user_id, is_syncing):
+        self.user_id = user_id
+        self.is_syncing = is_syncing
+
+    @classmethod
+    def from_line(cls, line):
+        user_id, state = line.split(" ", 1)
+
+        if state not in ("start", "end"):
+            raise Exception("Invalid USER_SYNC state %r" % (state,))
+
+        return cls(user_id, state == "start")
+
+    def to_line(self):
+        return " ".join((self.user_id, "start" if self.is_syncing else "end"))
+
+
+class FederationAckCommand(Command):
+    """Sent by the client when it has processed up to a given point in the
+    federation stream. This allows the master to drop in-memory caches of the
+    federation stream.
+
+    This must only be sent from one worker (i.e. the one sending federation)
+
+    Format::
+
+        FEDERATION_ACK <token>
+    """
+    NAME = "FEDERATION_ACK"
+
+    def __init__(self, token):
+        self.token = token
+
+    @classmethod
+    def from_line(cls, line):
+        return cls(int(line))
+
+    def to_line(self):
+        return str(self.token)
+
+
+class SyncCommand(Command):
+    """Used for testing. The client protocol implementation allows waiting
+    on a SYNC command with a specified data.
+    """
+    NAME = "SYNC"
+
+
+class RemovePusherCommand(Command):
+    """Sent by the client to request the master remove the given pusher.
+
+    Format::
+
+        REMOVE_PUSHER <app_id> <push_key> <user_id>
+    """
+    NAME = "REMOVE_PUSHER"
+
+    def __init__(self, app_id, push_key, user_id):
+        self.user_id = user_id
+        self.app_id = app_id
+        self.push_key = push_key
+
+    @classmethod
+    def from_line(cls, line):
+        app_id, push_key, user_id = line.split(" ", 2)
+
+        return cls(app_id, push_key, user_id)
+
+    def to_line(self):
+        return " ".join((self.app_id, self.push_key, self.user_id))
+
+
+class InvalidateCacheCommand(Command):
+    """Sent by the client to invalidate an upstream cache.
+
+    THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED ACCEPT FOR THINGS THAT ARE
+    NOT DISASTROUS IF WE DROP ON THE FLOOR.
+
+    Mainly used to invalidate destination retry timing caches.
+
+    Format::
+
+        INVALIDATE_CACHE <cache_func> <keys_json>
+
+    Where <keys_json> is a json list.
+    """
+    NAME = "INVALIDATE_CACHE"
+
+    def __init__(self, cache_func, keys):
+        self.cache_func = cache_func
+        self.keys = keys
+
+    @classmethod
+    def from_line(cls, line):
+        cache_func, keys_json = line.split(" ", 1)
+
+        return cls(cache_func, json.loads(keys_json))
+
+    def to_line(self):
+        return " ".join((self.cache_func, json.dumps(self.keys)))
+
+
+# Map of command name to command type.
+COMMAND_MAP = {
+    cmd.NAME: cmd
+    for cmd in (
+        ServerCommand,
+        RdataCommand,
+        PositionCommand,
+        ErrorCommand,
+        PingCommand,
+        NameCommand,
+        ReplicateCommand,
+        UserSyncCommand,
+        FederationAckCommand,
+        SyncCommand,
+        RemovePusherCommand,
+        InvalidateCacheCommand,
+    )
+}
+
+# The commands the server is allowed to send
+VALID_SERVER_COMMANDS = (
+    ServerCommand.NAME,
+    RdataCommand.NAME,
+    PositionCommand.NAME,
+    ErrorCommand.NAME,
+    PingCommand.NAME,
+    SyncCommand.NAME,
+)
+
+# The commands the client is allowed to send
+VALID_CLIENT_COMMANDS = (
+    NameCommand.NAME,
+    ReplicateCommand.NAME,
+    PingCommand.NAME,
+    UserSyncCommand.NAME,
+    FederationAckCommand.NAME,
+    RemovePusherCommand.NAME,
+    InvalidateCacheCommand.NAME,
+    ErrorCommand.NAME,
+)