From 92312aa3e6353be91585c5941b7740f9aa9ae037 Mon Sep 17 00:00:00 2001 From: Benedict Lau Date: Wed, 1 Mar 2017 01:30:11 -0500 Subject: Clarify doc for SQLite to PostgreSQL port --- docs/postgres.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'docs') diff --git a/docs/postgres.rst b/docs/postgres.rst index 402ff9a4de..b592801e93 100644 --- a/docs/postgres.rst +++ b/docs/postgres.rst @@ -112,9 +112,9 @@ script one last time, e.g. if the SQLite database is at ``homeserver.db`` run:: synapse_port_db --sqlite-database homeserver.db \ - --postgres-config database_config.yaml + --postgres-config homeserver-postgres.yaml Once that has completed, change the synapse config to point at the PostgreSQL -database configuration file using the ``database_config`` parameter (see -`Synapse Config`_) and restart synapse. Synapse should now be running against +database configuration file ``homeserver-postgres.yaml`` (i.e. rename it to +``homeserver.yaml``) and restart synapse. Synapse should now be running against PostgreSQL. -- cgit 1.4.1 From e0ff66251f71cb46aea30a187edc1dc027760b9e Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 15 Mar 2017 12:22:18 +0000 Subject: add setting (on by default) to support TURN for guests --- docs/turn-howto.rst | 38 ++++++++++++++++++++++++++++++++++---- synapse/config/voip.py | 8 ++++++++ synapse/rest/client/v1/voip.py | 5 ++++- 3 files changed, 46 insertions(+), 5 deletions(-) (limited to 'docs') diff --git a/docs/turn-howto.rst b/docs/turn-howto.rst index 04c0100715..e48628ce6e 100644 --- a/docs/turn-howto.rst +++ b/docs/turn-howto.rst @@ -50,14 +50,37 @@ You may be able to setup coturn via your package manager, or set it up manually pwgen -s 64 1 - 5. Ensure youe firewall allows traffic into the TURN server on + 5. Consider your security settings. TURN lets users request a relay + which will connect to arbitrary IP addresses and ports. At the least + we recommend: + + # VoIP traffic is all UDP. There is no reason to let users connect to arbitrary TCP endpoints via the relay. + no-tcp-relay + + # don't let the relay ever try to connect to private IP address ranges within your network (if any) + # given the turn server is likely behind your firewall, remember to include any privileged public IPs too. + denied-peer-ip=10.0.0.0-10.255.255.255 + denied-peer-ip=192.168.0.0-192.168.255.255 + denied-peer-ip=172.16.0.0-172.31.255.255 + + # special case the turn server itself so that client->TURN->TURN->client flows work + allowed-peer-ip=10.0.0.1 + + # consider whether you want to limit the quota of relayed streams per user (or total) to avoid risk of DoS. + user-quota=12 # 4 streams per video call, so 12 streams = 3 simultaneous relayed calls per user. + total-quota=1200 + + Ideally coturn should refuse to relay traffic which isn't SRTP; + see https://github.com/matrix-org/synapse/issues/2009 + + 6. Ensure your firewall allows traffic into the TURN server on the ports you've configured it to listen on (remember to allow - both TCP and UDP if you've enabled both). + both TCP and UDP TURN traffic) - 6. If you've configured coturn to support TLS/DTLS, generate or + 7. If you've configured coturn to support TLS/DTLS, generate or import your private key and certificate. - 7. Start the turn server:: + 8. Start the turn server:: bin/turnserver -o @@ -83,12 +106,19 @@ Your home server configuration file needs the following extra keys: to refresh credentials. The TURN REST API specification recommends one day (86400000). + 4. "turn_allow_guests": Whether to allow guest users to use the TURN + server. This is enabled by default, as otherwise VoIP will not + work reliably for guests. However, it does introduce a security risk + as it lets guests connect to arbitrary endpoints without having gone + through a CAPTCHA or similar to register a real account. + As an example, here is the relevant section of the config file for matrix.org:: turn_uris: [ "turn:turn.matrix.org:3478?transport=udp", "turn:turn.matrix.org:3478?transport=tcp" ] turn_shared_secret: n0t4ctuAllymatr1Xd0TorgSshar3d5ecret4obvIousreAsons turn_user_lifetime: 86400000 + turn_allow_guests: True Now, restart synapse:: diff --git a/synapse/config/voip.py b/synapse/config/voip.py index eeb693027b..c93c92d177 100644 --- a/synapse/config/voip.py +++ b/synapse/config/voip.py @@ -23,6 +23,7 @@ class VoipConfig(Config): self.turn_username = config.get("turn_username") self.turn_password = config.get("turn_password") self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"]) + self.turn_allow_guests = config.get("turn_allow_guests") or True def default_config(self, **kwargs): return """\ @@ -41,4 +42,11 @@ class VoipConfig(Config): # How long generated TURN credentials last turn_user_lifetime: "1h" + + # Whether guests should be allowed to use the TURN server. + # This is defaults to True, otherwise VoIP will be unreliable for guests. + # However, it does introduce a slight security risk as it allows users to + # connect to arbitrary endpoints without having first signed up for a + # valid account (e.g. by passing a CAPTCHA). + turn_allow_guests: True """ diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py index 03141c623c..c43b30b73a 100644 --- a/synapse/rest/client/v1/voip.py +++ b/synapse/rest/client/v1/voip.py @@ -28,7 +28,10 @@ class VoipRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): - requester = yield self.auth.get_user_by_req(request) + requester = yield self.auth.get_user_by_req( + request, + self.hs.config.turn_allow_guests + ) turnUris = self.hs.config.turn_uris turnSecret = self.hs.config.turn_shared_secret -- cgit 1.4.1 From 74506934356dcb10b1704e3e66d4648e99ba6308 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 15:40:37 +0100 Subject: Initial TCP protocol implementation This defines the low level TCP replication protocol --- docs/tcp_replication.rst | 174 +++++++++++ synapse/replication/tcp/__init__.py | 32 ++ synapse/replication/tcp/commands.py | 341 ++++++++++++++++++++ synapse/replication/tcp/protocol.py | 601 ++++++++++++++++++++++++++++++++++++ 4 files changed, 1148 insertions(+) create mode 100644 docs/tcp_replication.rst create mode 100644 synapse/replication/tcp/commands.py create mode 100644 synapse/replication/tcp/protocol.py (limited to 'docs') diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst new file mode 100644 index 0000000000..946add2849 --- /dev/null +++ b/docs/tcp_replication.rst @@ -0,0 +1,174 @@ +TCP Replication +=============== + +This describes the TCP replication protocol that replaces the HTTP protocol. + +Motivation +---------- + +The HTTP API used long poll from the workers to the master, this has the problem +of causing a lot of duplicate work on the server. This TCP protocol aims to +solve. + +Overview +-------- + +The protocol is based on fire and forget, line based commands. An example flow +would be (where '>' indicates master->worker and '<' worker->master flows):: + + > SERVER example.com + < REPLICATE events 53 + > RDATA events 54 ["$foo1:bar.com", ...] + > RDATA events 55 ["$foo4:bar.com", ...] + +The example shows the server accepting a new connection and sending its identity +with the ``SERVER`` command, followed by the client asking to subscribe to the +``events`` stream from the token ``53``. The server then periodically sends ``RDATA`` +commands which have the format ``RDATA ```, where the +format of ```` is defined by the individual streams. + +Error reporting happens by either the client or server sending an `ERROR` +command, and usually the connection will be closed. + + +Since the protocol is a simple line based, its possible to manually connect to +the server using a tool like netcat. A few things should be noted when manually +using the protocol: + * When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can + be used to get all future updates. The special stream name ``ALL`` can be used + with ``NOW`` to subscribe to all available streams. + * The federation stream is only available if federation sending has been + disabled on the main process. + * The server will only time connections out that have sent a ``PING`` command. + If a ping is sent then the connection will be closed if no further commands + are receieved within 15s. Both the client and server protocol implementations + will send an initial PING on connection and ensure at least one command every + 5s is sent (not necessarily ``PING``). + * ``RDATA`` commands *usually* include a numeric token, however if the stream + has multiple rows to replicate per token the server will send multiple + ``RDATA`` commands, with all but the last having a token of ``batch``. See + the documentation on ``commands.RdataCommand`` for further details. + + +Architecture +------------ + +The basic structure of the protocol is line based, where the initial word of +each line specifies the command. The rest of the line is parsed based on the +command. For example, the `RDATA` command is defined as:: + + RDATA + +(Note that `` may contains spaces, but cannot contain newlines.) + +Blank lines are ignored. + + +Keep alives +~~~~~~~~~~~ + +Both sides are expected to send at least one command every 5s or so, and +should send a ``PING`` command if necessary. If either side do not receive a +command within e.g. 15s then the connection should be closed. + +Because the server may be connected to manually using e.g. netcat, the timeouts +aren't enabled until an initial ``PING`` command is seen. Both the client and +server implementations below send a ``PING`` command immediately on connection to +ensure the timeouts are enabled. + +This ensures that both sides can quickly realize if the tcp connection has gone +and handle the situation appropriately. + + +Start up +~~~~~~~~ + +When a new connection is made, the server: + * Sends a ``SERVER`` command, which includes the identity of the server, allowing + the client to detect if its connected to the expected server + * Sends a ``PING`` command as above, to enable the client to time out connections + promptly. + +The client: + * Sends a ``NAME`` command, allowing the server to associate a human friendly + name with the connection. This is optional. + * Sends a ``PING`` as above + * For each stream the client wishes to subscribe to it sends a ``REPLICATE`` + with the stream_name and token it wants to subscribe from. + * On receipt of a ``SERVER`` command, checks that the server name matches the + expected server name. + + +Error handling +~~~~~~~~~~~~~~ + +If either side detects an error it can send an ``ERROR`` command and close the +connection. + +If the client side loses the connection to the server it should reconnect, +following the steps above. + + +Congestion +~~~~~~~~~~ + +If the server sends messages faster than the client can consume them the server +will first buffer a (fairly large) number of commands and then disconnect the +client. This ensures that we don't queue up an unbounded number of commands in +memory and gives us a potential oppurtunity to squawk loudly. When/if the client +recovers it can reconnect to the server and ask for missed messages. + + +Reliability +~~~~~~~~~~~ + +In general the replication stream should be consisdered an unreliable transport +since e.g. commands are not resent if the connection disappears. + +The exception to that are the replication streams, i.e. RDATA commands, since +these include tokens which can be used to restart the stream on connection +errors. + +The client should keep track of the token in the last RDATA command received +for each stream so that on reconneciton it can start streaming from the correct +place. Note: not all RDATA have valid tokens due to batching. See +``RdataCommand`` for more details. + + +Example +~~~~~~~ + +An example iteraction is shown below. Each line is prefixed with '>' or '<' to +indicate which side is sending, these are *not* included on the wire:: + + * connection established * + > SERVER localhost:8823 + > PING 1490197665618 + < NAME synapse.app.appservice + < PING 1490197665618 + < REPLICATE events 1 + < REPLICATE backfill 1 + < REPLICATE caches 1 + > POSITION events 1 + > POSITION backfill 1 + > POSITION caches 1 + > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] + > RDATA events 14 ["$149019767112vOHxz:localhost:8823", + "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null] + < PING 1490197675618 + > ERROR server stopping + * connection closed by server * + +The ``POSITION`` command sent by the server is used to set the clients position +without needing to send data with the ``RDATA`` command. + + +An example of a batched set of ``RDATA`` is:: + + > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513] + > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513] + > RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513] + > RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513] + +In this case the client shouldn't advance their caches token until it sees the +the last ``RDATA``. diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py index 451dae3b6c..8b4e886d4e 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py @@ -12,3 +12,35 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +"""This module implements the TCP replication protocol used by synapse to +communicate between the master process and its workers (when they're enabled). + +The protocol is based on fire and forget, line based commands. An example flow +would be (where '>' indicates master->worker and '<' worker->master flows):: + + > SERVER example.com + < REPLICATE events 53 + > RDATA events 54 ["$foo1:bar.com", ...] + > RDATA events 55 ["$foo4:bar.com", ...] + +The example shows the server accepting a new connection and sending its identity +with the `SERVER` command, followed by the client asking to subscribe to the +`events` stream from the token `53`. The server then periodically sends `RDATA` +commands which have the format `RDATA `, where the +format of `` is defined by the individual streams. + +Error reporting happens by either the client or server sending an `ERROR` +command, and usually the connection will be closed. + + +Structure of the module: + * client.py - the client classes used for workers to connect to master + * command.py - the definitions of all the valid commands + * protocol.py - contains bot the client and server protocol implementations, + these should not be used directly + * resource.py - the server classes that accepts and handle client connections + * streams.py - the definitons of all the valid streams + +Further details can be found in docs/tcp_replication.rst +""" diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py new file mode 100644 index 0000000000..68165cf2dc --- /dev/null +++ b/synapse/replication/tcp/commands.py @@ -0,0 +1,341 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines the various valid commands + +The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are +allowed to be sent by which side. +""" + +import logging +import ujson as json + + +logger = logging.getLogger(__name__) + + +class Command(object): + """The base command class. + + All subclasses must set the NAME variable which equates to the name of the + command on the wire. + + A full command line on the wire is constructed from `NAME + " " + to_line()` + + The default implementation creates a command of form ` ` + """ + NAME = None + + def __init__(self, data): + self.data = data + + @classmethod + def from_line(cls, line): + """Deserialises a line from the wire into this command. `line` does not + include the command. + """ + return cls(line) + + def to_line(self): + """Serialises the comamnd for the wire. Does not include the command + prefix. + """ + return self.data + + +class ServerCommand(Command): + """Sent by the server on new connection and includes the server_name. + + Format:: + + SERVER + """ + NAME = "SERVER" + + +class RdataCommand(Command): + """Sent by server when a subscribed stream has an update. + + Format:: + + RDATA + + The `` may either be a numeric stream id OR "batch". The latter case + is used to support sending multiple updates with the same stream ID. This + is done by sending an RDATA for each row, with all but the last RDATA having + a token of "batch" and the last having the final stream ID. + + The client should batch all incoming RDATA with a token of "batch" (per + stream_name) until it sees an RDATA with a numeric stream ID. + + `` of "batch" maps to the instance variable `token` being None. + + An example of a batched series of RDATA:: + + RDATA presence batch ["@foo:example.com", "online", ...] + RDATA presence batch ["@bar:example.com", "online", ...] + RDATA presence 59 ["@baz:example.com", "online", ...] + """ + NAME = "RDATA" + + def __init__(self, stream_name, token, row): + self.stream_name = stream_name + self.token = token + self.row = row + + @classmethod + def from_line(cls, line): + stream_name, token, row_json = line.split(" ", 2) + return cls( + stream_name, + None if token == "batch" else int(token), + json.loads(row_json) + ) + + def to_line(self): + return " ".join(( + self.stream_name, + str(self.token) if self.token is not None else "batch", + json.dumps(self.row), + )) + + +class PositionCommand(Command): + """Sent by the client to tell the client the stream postition without + needing to send an RDATA. + """ + NAME = "POSITION" + + def __init__(self, stream_name, token): + self.stream_name = stream_name + self.token = token + + @classmethod + def from_line(cls, line): + stream_name, token = line.split(" ", 1) + return cls(stream_name, int(token)) + + def to_line(self): + return " ".join((self.stream_name, str(self.token),)) + + +class ErrorCommand(Command): + """Sent by either side if there was an ERROR. The data is a string describing + the error. + """ + NAME = "ERROR" + + +class PingCommand(Command): + """Sent by either side as a keep alive. The data is arbitary (often timestamp) + """ + NAME = "PING" + + +class NameCommand(Command): + """Sent by client to inform the server of the client's identity. The data + is the name + """ + NAME = "NAME" + + +class ReplicateCommand(Command): + """Sent by the client to subscribe to the stream. + + Format:: + + REPLICATE + + Where may be either: + * a numeric stream_id to stream updates from + * "NOW" to stream all subsequent updates. + + The can be "ALL" to subscribe to all known streams, in which + case the must be set to "NOW", i.e.:: + + REPLICATE ALL NOW + """ + NAME = "REPLICATE" + + def __init__(self, stream_name, token): + self.stream_name = stream_name + self.token = token + + @classmethod + def from_line(cls, line): + stream_name, token = line.split(" ", 1) + if token in ("NOW", "now"): + token = "NOW" + else: + token = int(token) + return cls(stream_name, token) + + def to_line(self): + return " ".join((self.stream_name, str(self.token),)) + + +class UserSyncCommand(Command): + """Sent by the client to inform the server that a user has started or + stopped syncing. Used to calculate presence on the master. + + Format:: + + USER_SYNC + + Where is either "start" or "stop" + """ + NAME = "USER_SYNC" + + def __init__(self, user_id, is_syncing): + self.user_id = user_id + self.is_syncing = is_syncing + + @classmethod + def from_line(cls, line): + user_id, state = line.split(" ", 1) + + if state not in ("start", "end"): + raise Exception("Invalid USER_SYNC state %r" % (state,)) + + return cls(user_id, state == "start") + + def to_line(self): + return " ".join((self.user_id, "start" if self.is_syncing else "end")) + + +class FederationAckCommand(Command): + """Sent by the client when it has processed up to a given point in the + federation stream. This allows the master to drop in-memory caches of the + federation stream. + + This must only be sent from one worker (i.e. the one sending federation) + + Format:: + + FEDERATION_ACK + """ + NAME = "FEDERATION_ACK" + + def __init__(self, token): + self.token = token + + @classmethod + def from_line(cls, line): + return cls(int(line)) + + def to_line(self): + return str(self.token) + + +class SyncCommand(Command): + """Used for testing. The client protocol implementation allows waiting + on a SYNC command with a specified data. + """ + NAME = "SYNC" + + +class RemovePusherCommand(Command): + """Sent by the client to request the master remove the given pusher. + + Format:: + + REMOVE_PUSHER + """ + NAME = "REMOVE_PUSHER" + + def __init__(self, app_id, push_key, user_id): + self.user_id = user_id + self.app_id = app_id + self.push_key = push_key + + @classmethod + def from_line(cls, line): + app_id, push_key, user_id = line.split(" ", 2) + + return cls(app_id, push_key, user_id) + + def to_line(self): + return " ".join((self.app_id, self.push_key, self.user_id)) + + +class InvalidateCacheCommand(Command): + """Sent by the client to invalidate an upstream cache. + + THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED ACCEPT FOR THINGS THAT ARE + NOT DISASTROUS IF WE DROP ON THE FLOOR. + + Mainly used to invalidate destination retry timing caches. + + Format:: + + INVALIDATE_CACHE + + Where is a json list. + """ + NAME = "INVALIDATE_CACHE" + + def __init__(self, cache_func, keys): + self.cache_func = cache_func + self.keys = keys + + @classmethod + def from_line(cls, line): + cache_func, keys_json = line.split(" ", 1) + + return cls(cache_func, json.loads(keys_json)) + + def to_line(self): + return " ".join((self.cache_func, json.dumps(self.keys))) + + +# Map of command name to command type. +COMMAND_MAP = { + cmd.NAME: cmd + for cmd in ( + ServerCommand, + RdataCommand, + PositionCommand, + ErrorCommand, + PingCommand, + NameCommand, + ReplicateCommand, + UserSyncCommand, + FederationAckCommand, + SyncCommand, + RemovePusherCommand, + InvalidateCacheCommand, + ) +} + +# The commands the server is allowed to send +VALID_SERVER_COMMANDS = ( + ServerCommand.NAME, + RdataCommand.NAME, + PositionCommand.NAME, + ErrorCommand.NAME, + PingCommand.NAME, + SyncCommand.NAME, +) + +# The commands the client is allowed to send +VALID_CLIENT_COMMANDS = ( + NameCommand.NAME, + ReplicateCommand.NAME, + PingCommand.NAME, + UserSyncCommand.NAME, + FederationAckCommand.NAME, + RemovePusherCommand.NAME, + InvalidateCacheCommand.NAME, + ErrorCommand.NAME, +) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py new file mode 100644 index 0000000000..c1dc91bdb7 --- /dev/null +++ b/synapse/replication/tcp/protocol.py @@ -0,0 +1,601 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module contains the implementation of both the client and server +protocols. + +The basic structure of the protocol is line based, where the initial word of +each line specifies the command. The rest of the line is parsed based on the +command. For example, the `RDATA` command is defined as:: + + RDATA + +(Note that `` may contains spaces, but cannot contain newlines.) + +Blank lines are ignored. + +# Example + +An example iteraction is shown below. Each line is prefixed with '>' or '<' to +indicate which side is sending, these are *not* included on the wire:: + + * connection established * + > SERVER localhost:8823 + > PING 1490197665618 + < NAME synapse.app.appservice + < PING 1490197665618 + < REPLICATE events 1 + < REPLICATE backfill 1 + < REPLICATE caches 1 + > POSITION events 1 + > POSITION backfill 1 + > POSITION caches 1 + > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] + > RDATA events 14 ["$149019767112vOHxz:localhost:8823", + "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null] + < PING 1490197675618 + > ERROR server stopping + * connection closed by server * +""" + +from twisted.internet import defer +from twisted.protocols.basic import LineOnlyReceiver + +from commands import ( + COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS, + ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand, + NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand, +) +from streams import STREAMS_MAP + +from synapse.util.stringutils import random_string + +import logging +import synapse.metrics +import struct +import fcntl + + +metrics = synapse.metrics.get_metrics_for(__name__) + +inbound_commands_counter = metrics.register_counter( + "inbound_commands", labels=["command", "name", "conn_id"], +) +outbound_commands_counter = metrics.register_counter( + "outbound_commands", labels=["command", "name", "conn_id"], +) + + +# A list of all connected protocols. This allows us to send metrics about the +# connections. +connected_connections = [] + + +logger = logging.getLogger(__name__) + + +PING_TIME = 5000 + + +class ConnectionStates(object): + CONNECTING = "connecting" + ESTABLISHED = "established" + PAUSED = "paused" + CLOSED = "closed" + + +class BaseReplicationStreamProtocol(LineOnlyReceiver): + """Base replication protocol shared between client and server. + + Reads lines (ignoring blank ones) and parses them into command classes, + asserting that they are valid for the given direction, i.e. server commands + are only sent by the server. + + On receiving a new command it calls `on_` with the parsed + command. + + It also sends `PING` periodically, and correctly times out remote connections + (if they send a `PING` command) + """ + delimiter = b'\n' + + VALID_INBOUND_COMMANDS = [] # Valid commands we expect to receive + VALID_OUTBOUND_COMMANDS = [] # Valid commans we can send + + max_line_buffer = 10000 + + def __init__(self, clock): + self.clock = clock + + self.last_received_command = self.clock.time_msec() + self.last_sent_command = 0 + self.time_we_closed = None # When we requested the connection be closed + + self.received_ping = False # Have we reecived a ping from the other side + + self.state = ConnectionStates.CONNECTING + + self.name = "anon" # The name sent by a client. + self.conn_id = random_string(5) # To dedupe in case of name clashes. + + # List of pending commands to send once we've established the connection + self.pending_commands = [] + + # The LoopingCall for sending pings. + self._send_ping_loop = None + + def connectionMade(self): + logger.info("[%s] Connection established", self.id()) + + self.state = ConnectionStates.ESTABLISHED + + connected_connections.append(self) # Register connection for metrics + + self.transport.registerProducer(self, True) # For the *Producing callbacks + + self._send_pending_commands() + + # Starts sending pings + self._send_ping_loop = self.clock.looping_call(self.send_ping, 5000) + + # Always send the initial PING so that the other side knows that they + # can time us out. + self.send_command(PingCommand(self.clock.time_msec())) + + def send_ping(self): + """Periodically sends a ping and checks if we should close the connection + due to the other side timing out. + """ + now = self.clock.time_msec() + + if self.time_we_closed: + if now - self.time_we_closed > PING_TIME * 3: + logger.info( + "[%s] Failed to close connection gracefully, aborting", self.id() + ) + self.transport.abortConnection() + else: + if now - self.last_sent_command >= PING_TIME: + self.send_command(PingCommand(now)) + + if self.received_ping and now - self.last_received_command > PING_TIME * 3: + logger.info( + "[%s] Connection hasn't received command in %r ms. Closing.", + self.id(), now - self.last_received_command + ) + self.send_error("ping timeout") + + def lineReceived(self, line): + """Called when we've received a line + """ + if line.strip() == "": + # Ignore blank lines + return + + line = line.decode("utf-8") + cmd_name, rest_of_line = line.split(" ", 1) + + if cmd_name not in self.VALID_INBOUND_COMMANDS: + logger.error("[%s] invalid command %s", self.id(), cmd_name) + self.send_error("invalid command: %s", cmd_name) + return + + self.last_received_command = self.clock.time_msec() + + inbound_commands_counter.inc(cmd_name, self.name, self.conn_id) + + cmd_cls = COMMAND_MAP[cmd_name] + try: + cmd = cmd_cls.from_line(rest_of_line) + except Exception as e: + logger.exception( + "[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line + ) + self.send_error( + "failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line) + ) + return + + # Now lets try and call on_ function + try: + getattr(self, "on_%s" % (cmd_name,))(cmd) + except Exception: + logger.exception("[%s] Failed to handle line: %r", self.id(), line) + + def close(self): + self.time_we_closed = self.clock.time_msec() + self.transport.loseConnection() + self.on_connection_closed() + + def send_error(self, error_string, *args): + """Send an error to remote and close the connection. + """ + self.send_command(ErrorCommand(error_string % args)) + self.close() + + def send_command(self, cmd, do_buffer=True): + """Send a command if connection has been established. + + Args: + cmd (Command) + do_buffer (bool): Whether to buffer the message or always attempt + to send the command. This is mostly used to send an error + message if we're about to close the connection due our buffers + becoming full. + """ + if self.state == ConnectionStates.CLOSED: + logger.info("[%s] Not sending, connection closed", self.id()) + return + + if do_buffer and self.state != ConnectionStates.ESTABLISHED: + self._queue_command(cmd) + return + + outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id) + + string = "%s %s" % (cmd.NAME, cmd.to_line(),) + if "\n" in string: + raise Exception("Unexpected newline in command: %r", string) + + self.sendLine(string.encode("utf-8")) + + self.last_sent_command = self.clock.time_msec() + + def _queue_command(self, cmd): + """Queue the command until the connection is ready to write to again. + """ + logger.info("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd) + self.pending_commands.append(cmd) + + if len(self.pending_commands) > self.max_line_buffer: + # The other side is failing to keep up and out buffers are becoming + # full, so lets close the connection. + # XXX: should we squawk more loudly? + logger.error("[%s] Remote failed to keep up", self.id()) + self.send_command(ErrorCommand("Failed to keep up"), do_buffer=False) + self.close() + + def _send_pending_commands(self): + """Send any queued commandes + """ + pending = self.pending_commands + self.pending_commands = [] + for cmd in pending: + self.send_command(cmd) + + def on_PING(self, line): + self.received_ping = True + + def on_ERROR(self, cmd): + logger.error("[%s] Remote reported error: %r", self.id(), cmd.data) + + def pauseProducing(self): + """This is called when both the kernel send buffer and the twisted + tcp connection send buffers have become full. + + We don't actually have any control over those sizes, so we buffer some + commands ourselves before knifing the connection due to the remote + failing to keep up. + """ + logger.info("[%s] Pause producing", self.id()) + self.state = ConnectionStates.PAUSED + + def resumeProducing(self): + """The remote has caught up after we started buffering! + """ + logger.info("[%s] Resume producing", self.id()) + self.state = ConnectionStates.ESTABLISHED + self._send_pending_commands() + + def stopProducing(self): + """We're never going to send any more data (normally because either + we or the remote has closed the connection) + """ + logger.info("[%s] Stop producing", self.id()) + self.on_connection_closed() + + def connectionLost(self, reason): + logger.info("[%s] Replication connection closed: %r", self.id(), reason) + + try: + # Remove us from list of connections to be monitored + connected_connections.remove(self) + except ValueError: + pass + + # Stop the looping call sending pings. + if self._send_ping_loop and self._send_ping_loop.running: + self._send_ping_loop.stop() + + self.on_connection_closed() + + def on_connection_closed(self): + logger.info("[%s] Connection was closed", self.id()) + + self.state = ConnectionStates.CLOSED + self.pending_commands = [] + + if self.transport: + self.transport.unregisterProducer() + + def __str__(self): + return "ReplicationConnection" % ( + self.name, self.conn_id, self.addr, + ) + + def id(self): + return "%s-%s" % (self.name, self.conn_id) + + +class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): + VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS + VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS + + def __init__(self, server_name, clock, streamer, addr): + BaseReplicationStreamProtocol.__init__(self, clock) # Old style class + + self.server_name = server_name + self.streamer = streamer + self.addr = addr + + # The streams the client has subscribed to and is up to date with + self.replication_streams = set() + + # The streams the client is currently subscribing to. + self.connecting_streams = set() + + # Map from stream name to list of updates to send once we've finished + # subscribing the client to the stream. + self.pending_rdata = {} + + def connectionMade(self): + self.send_command(ServerCommand(self.server_name)) + BaseReplicationStreamProtocol.connectionMade(self) + self.streamer.new_connection(self) + + def on_NAME(self, cmd): + self.name = cmd.data + + def on_USER_SYNC(self, cmd): + self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing) + + def on_REPLICATE(self, cmd): + stream_name = cmd.stream_name + token = cmd.token + + if stream_name == "ALL": + # Subscribe to all streams we're publishing to. + for stream in self.streamer.streams_by_name.iterkeys(): + self.subscribe_to_stream(stream, token) + else: + self.subscribe_to_stream(stream_name, token) + + def on_FEDERATION_ACK(self, cmd): + self.streamer.federation_ack(cmd.token) + + def on_REMOVE_PUSHER(self, cmd): + self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) + + def onINVALIDATE_CACHE(self, cmd): + self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + + @defer.inlineCallbacks + def subscribe_to_stream(self, stream_name, token): + """Subscribe the remote to a streams. + + This invloves checking if they've missed anything and sending those + updates down if they have. During that time new updates for the stream + are queued and sent once we've sent down any missed updates. + """ + self.replication_streams.discard(stream_name) + self.connecting_streams.add(stream_name) + + try: + # Get missing updates + updates, current_token = yield self.streamer.get_stream_updates( + stream_name, token, + ) + + # Send all the missing updates + for update in updates: + token, row = update[0], update[1] + self.send_command(RdataCommand(stream_name, token, row)) + + # Now we can send any updates that came in while we were subscribing + pending_rdata = self.pending_rdata.pop(stream_name, []) + for token, update in pending_rdata: + self.send_command(RdataCommand(stream_name, token, update)) + + # We send a POSITION command to ensure that they have an up to + # date token (especially useful if we didn't send any updates + # above) + self.send_command(PositionCommand(stream_name, current_token)) + + # They're now fully subscribed + self.replication_streams.add(stream_name) + except Exception as e: + logger.exception("[%s] Failed to handle REPLICATE command", self.id()) + self.send_error("failed to handle replicate: %r", e) + finally: + self.connecting_streams.discard(stream_name) + + def stream_update(self, stream_name, token, data): + """Called when a new update is available to stream to clients. + + We need to check if the client is interested in the stream or not + """ + if stream_name in self.replication_streams: + # The client is subscribed to the stream + self.send_command(RdataCommand(stream_name, token, data)) + elif stream_name in self.connecting_streams: + # The client is being subscribed to the stream + logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token) + self.pending_rdata.setdefault(stream_name, []).append((token, data)) + else: + # The client isn't subscribed + logger.debug("[%s] Dropping RDATA %r %r", self.id(), stream_name, token) + + def send_sync(self, data): + self.send_command(SyncCommand(data)) + + def on_connection_closed(self): + BaseReplicationStreamProtocol.on_connection_closed(self) + logger.info("[%s] Replication connection closed", self.id()) + self.streamer.lost_connection(self) + + +class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): + VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS + VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS + + def __init__(self, client_name, server_name, clock, handler): + BaseReplicationStreamProtocol.__init__(self, clock) + + self.client_name = client_name + self.server_name = server_name + self.handler = handler + + # Map of stream to batched updates. See RdataCommand for info on how + # batching works. + self.pending_batches = {} + + def connectionMade(self): + self.send_command(NameCommand(self.client_name)) + BaseReplicationStreamProtocol.connectionMade(self) + + # Once we've connected subscribe to the necessary streams + for stream_name, token in self.handler.get_streams_to_replicate().iteritems(): + self.replicate(stream_name, token) + + # Tell the server if we have any users currently syncing (should only + # happen on synchrotrons) + currently_syncing = self.handler.get_currently_syncing_users() + for user_id in currently_syncing: + self.send_command(UserSyncCommand(user_id, True)) + + # We've now finished connecting to so inform the client handler + self.handler.update_connection(self) + + def on_SERVER(self, cmd): + if cmd.data != self.server_name: + logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) + self.transport.abortConnection() + + def on_RDATA(self, cmd): + try: + row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row) + except Exception: + logger.exception( + "[%s] Failed to parse RDATA: %r %r", + self.id(), cmd.stream_name, cmd.row + ) + raise + + if cmd.token is None: + # I.e. this is part of a batch of updates for this stream. Batch + # until we get an update for the stream with a non None token + self.pending_batches.setdefault(cmd.stream_name, []).append(row) + else: + # Check if this is the last of a batch of updates + rows = self.pending_batches.pop(cmd.stream_name, []) + rows.append(row) + + self.handler.on_rdata(cmd.stream_name, cmd.token, rows) + + def on_POSITION(self, cmd): + self.handler.on_position(cmd.stream_name, cmd.token) + + def on_SYNC(self, cmd): + self.handler.on_sync(cmd.data) + + def replicate(self, stream_name, token): + """Send the subscription request to the server + """ + if stream_name not in STREAMS_MAP: + raise Exception("Invalid stream name %r" % (stream_name,)) + + logger.info( + "[%s] Subscribing to replication stream: %r from %r", + self.id(), stream_name, token + ) + + self.send_command(ReplicateCommand(stream_name, token)) + + def on_connection_closed(self): + BaseReplicationStreamProtocol.on_connection_closed(self) + self.handler.update_connection(None) + + +# The following simply registers metrics for the replication connections + +metrics.register_callback( + "pending_commands", + lambda: { + (p.name, p.conn_id): len(p.pending_commands) + for p in connected_connections + }, + labels=["name", "conn_id"], +) + + +def transport_buffer_size(protocol): + if protocol.transport: + size = len(protocol.transport.dataBuffer) + protocol.transport._tempDataLen + return size + return 0 + + +metrics.register_callback( + "transport_send_buffer", + lambda: { + (p.name, p.conn_id): transport_buffer_size(p) + for p in connected_connections + }, + labels=["name", "conn_id"], +) + + +def transport_kernel_read_buffer_size(protocol, read=True): + SIOCINQ = 0x541B + SIOCOUTQ = 0x5411 + + if protocol.transport: + fileno = protocol.transport.getHandle().fileno() + if read: + op = SIOCINQ + else: + op = SIOCOUTQ + size = struct.unpack("I", fcntl.ioctl(fileno, op, '\0\0\0\0'))[0] + return size + return 0 + + +metrics.register_callback( + "transport_kernel_send_buffer", + lambda: { + (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False) + for p in connected_connections + }, + labels=["name", "conn_id"], +) + + +metrics.register_callback( + "transport_kernel_read_buffer", + lambda: { + (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True) + for p in connected_connections + }, + labels=["name", "conn_id"], +) -- cgit 1.4.1 From 31e0fe90315c46d62163cd5cb78cf9302e9dd0c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Mar 2017 13:54:15 +0100 Subject: Fix indentation in docs/ --- docs/tcp_replication.rst | 53 +++++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 25 deletions(-) (limited to 'docs') diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index 946add2849..be0aa6b28c 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -34,20 +34,21 @@ command, and usually the connection will be closed. Since the protocol is a simple line based, its possible to manually connect to the server using a tool like netcat. A few things should be noted when manually using the protocol: - * When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can - be used to get all future updates. The special stream name ``ALL`` can be used - with ``NOW`` to subscribe to all available streams. - * The federation stream is only available if federation sending has been - disabled on the main process. - * The server will only time connections out that have sent a ``PING`` command. - If a ping is sent then the connection will be closed if no further commands - are receieved within 15s. Both the client and server protocol implementations - will send an initial PING on connection and ensure at least one command every - 5s is sent (not necessarily ``PING``). - * ``RDATA`` commands *usually* include a numeric token, however if the stream - has multiple rows to replicate per token the server will send multiple - ``RDATA`` commands, with all but the last having a token of ``batch``. See - the documentation on ``commands.RdataCommand`` for further details. + +* When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can + be used to get all future updates. The special stream name ``ALL`` can be used + with ``NOW`` to subscribe to all available streams. +* The federation stream is only available if federation sending has been + disabled on the main process. +* The server will only time connections out that have sent a ``PING`` command. + If a ping is sent then the connection will be closed if no further commands + are receieved within 15s. Both the client and server protocol implementations + will send an initial PING on connection and ensure at least one command every + 5s is sent (not necessarily ``PING``). +* ``RDATA`` commands *usually* include a numeric token, however if the stream + has multiple rows to replicate per token the server will send multiple + ``RDATA`` commands, with all but the last having a token of ``batch``. See + the documentation on ``commands.RdataCommand`` for further details. Architecture @@ -84,19 +85,21 @@ Start up ~~~~~~~~ When a new connection is made, the server: - * Sends a ``SERVER`` command, which includes the identity of the server, allowing - the client to detect if its connected to the expected server - * Sends a ``PING`` command as above, to enable the client to time out connections - promptly. + +* Sends a ``SERVER`` command, which includes the identity of the server, allowing + the client to detect if its connected to the expected server +* Sends a ``PING`` command as above, to enable the client to time out connections + promptly. The client: - * Sends a ``NAME`` command, allowing the server to associate a human friendly - name with the connection. This is optional. - * Sends a ``PING`` as above - * For each stream the client wishes to subscribe to it sends a ``REPLICATE`` - with the stream_name and token it wants to subscribe from. - * On receipt of a ``SERVER`` command, checks that the server name matches the - expected server name. + +* Sends a ``NAME`` command, allowing the server to associate a human friendly + name with the connection. This is optional. +* Sends a ``PING`` as above +* For each stream the client wishes to subscribe to it sends a ``REPLICATE`` + with the stream_name and token it wants to subscribe from. +* On receipt of a ``SERVER`` command, checks that the server name matches the + expected server name. Error handling -- cgit 1.4.1 From bfcf016714575edb0ad2c19b2f00694d62ca08ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:19:24 +0100 Subject: Fix up docs --- docs/tcp_replication.rst | 16 ++++++++-------- synapse/replication/tcp/__init__.py | 18 +----------------- synapse/replication/tcp/streams.py | 4 ++-- synapse/storage/pusher.py | 2 +- 4 files changed, 12 insertions(+), 28 deletions(-) (limited to 'docs') diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index be0aa6b28c..7393527f6f 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -1,20 +1,20 @@ TCP Replication =============== -This describes the TCP replication protocol that replaces the HTTP protocol. - Motivation ---------- -The HTTP API used long poll from the workers to the master, this has the problem -of causing a lot of duplicate work on the server. This TCP protocol aims to -solve. +Previously the workers used an HTTP long poll mechanism to get updates from the +master, which had the problem of causing a lot of duplicate work on the server. +This TCP protocol replaces those APIs with the aim of increased efficiency. + + Overview -------- The protocol is based on fire and forget, line based commands. An example flow -would be (where '>' indicates master->worker and '<' worker->master flows):: +would be (where '>' indicates master to worker and '<' worker to master flows):: > SERVER example.com < REPLICATE events 53 @@ -24,7 +24,7 @@ would be (where '>' indicates master->worker and '<' worker->master flows):: The example shows the server accepting a new connection and sending its identity with the ``SERVER`` command, followed by the client asking to subscribe to the ``events`` stream from the token ``53``. The server then periodically sends ``RDATA`` -commands which have the format ``RDATA ```, where the +commands which have the format ``RDATA ``, where the format of ```` is defined by the individual streams. Error reporting happens by either the client or server sending an `ERROR` @@ -125,7 +125,7 @@ recovers it can reconnect to the server and ask for missed messages. Reliability ~~~~~~~~~~~ -In general the replication stream should be consisdered an unreliable transport +In general the replication stream should be considered an unreliable transport since e.g. commands are not resent if the connection disappears. The exception to that are the replication streams, i.e. RDATA commands, since diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py index 8b4e886d4e..81c2ea7ee9 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py @@ -16,22 +16,7 @@ """This module implements the TCP replication protocol used by synapse to communicate between the master process and its workers (when they're enabled). -The protocol is based on fire and forget, line based commands. An example flow -would be (where '>' indicates master->worker and '<' worker->master flows):: - - > SERVER example.com - < REPLICATE events 53 - > RDATA events 54 ["$foo1:bar.com", ...] - > RDATA events 55 ["$foo4:bar.com", ...] - -The example shows the server accepting a new connection and sending its identity -with the `SERVER` command, followed by the client asking to subscribe to the -`events` stream from the token `53`. The server then periodically sends `RDATA` -commands which have the format `RDATA `, where the -format of `` is defined by the individual streams. - -Error reporting happens by either the client or server sending an `ERROR` -command, and usually the connection will be closed. +Further details can be found in docs/tcp_replication.rst Structure of the module: @@ -42,5 +27,4 @@ Structure of the module: * resource.py - the server classes that accepts and handle client connections * streams.py - the definitons of all the valid streams -Further details can be found in docs/tcp_replication.rst """ diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 07adf9412e..fada40c6ef 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -154,8 +154,8 @@ class Stream(object): True then limit is provided, otherwise it's not. Returns: - list(tuple): the first entry in the tuple is the token for that - update, and the rest of the tuple gets used to construct + Deferred(list(tuple)): the first entry in the tuple is the token for + that update, and the rest of the tuple gets used to construct a ``ROW_TYPE`` instance """ raise NotImplementedError() diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 715c8bef24..34d2f82b7f 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -139,7 +139,7 @@ class PusherStore(SQLBaseStore): """Get all the pushers that have changed between the given tokens. Returns: - list(tuple): each tuple consists of: + Deferred(list(tuple)): each tuple consists of: stream_id (str) user_id (str) app_id (str) -- cgit 1.4.1 From b4276a3896b7fec702feac4a60391ec81e595322 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:34:40 +0100 Subject: Add a brief list of commands to docs --- docs/tcp_replication.rst | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) (limited to 'docs') diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index 7393527f6f..62225ba6f4 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -175,3 +175,49 @@ An example of a batched set of ``RDATA`` is:: In this case the client shouldn't advance their caches token until it sees the the last ``RDATA``. + + +List of commands +~~~~~~~~~~~~~~~~ + +The list of valid commands, with which side can send it: server (S) or client (C): + +SERVER (S) + Sent at the start to identify which server the client is talking to + +RDATA (S) + A single update in a stream + +POSITION (S) + The position of the stream has been updated + +ERROR (S, C) + There was an error + +PING (S, C) + Sent periodically to ensure the connection is still alive + +NAME (C) + Sent at the start by client to inform the server who they are + +REPLICATE (C) + Asks the server to replicate a given stream + +USER_SYNC (C) + A user has started or stopped syncing + +FEDERATION_ACK (C) + Acknowledge receipt of some federation data + +REMOVE_PUSHER (C) + Inform the server a pusher should be removed + +INVALIDATE_CACHE (C) + Inform the server a cache should be invalidated + +SYNC (S, C) + Used exclusively in tests + + +See ``synapse/replication/tcp/commands.py`` for a detailed description and the +format of each command. -- cgit 1.4.1 From 323634bf8b240051081c8dd31377cd48de8c7856 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 16:19:52 +0100 Subject: Update workers docs --- docs/workers.rst | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) (limited to 'docs') diff --git a/docs/workers.rst b/docs/workers.rst index 65b6e690f7..7f040c52d9 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -12,7 +12,7 @@ across multiple processes is a recipe for disaster, plus you should be using postgres anyway if you care about scalability). The workers communicate with the master synapse process via a synapse-specific -HTTP protocol called 'replication' - analogous to MySQL or Postgres style +TCP protocol called 'replication' - analogous to MySQL or Postgres style database replication; feeding a stream of relevant data to the workers so they can be kept in sync with the main synapse process and database state. @@ -21,16 +21,11 @@ To enable workers, you need to add a replication listener to the master synapse, listeners: - port: 9092 bind_address: '127.0.0.1' - type: http - tls: false - x_forwarded: false - resources: - - names: [replication] - compress: false + type: replication Under **no circumstances** should this replication API listener be exposed to the public internet; it currently implements no authentication whatsoever and is -unencrypted HTTP. +unencrypted. You then create a set of configs for the various worker processes. These should be worker configuration files should be stored in a dedicated subdirectory, to allow @@ -57,7 +52,8 @@ For instance:: worker_app: synapse.app.synchrotron # The replication listener on the synapse to talk to. - worker_replication_url: http://127.0.0.1:9092/_synapse/replication + worker_replication_host: 127.0.0.1 + worker_replication_port: 9092 worker_listeners: - type: http @@ -95,4 +91,3 @@ To manipulate a specific worker, you pass the -w option to synctl:: All of the above is highly experimental and subject to change as Synapse evolves, but documenting it here to help folks needing highly scalable Synapses similar to the one running matrix.org! - -- cgit 1.4.1 From 6f65e2f90c4bcd25ada583cfd91f7c77352fd26c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 16:21:12 +0100 Subject: Update replication docs --- docs/replication.rst | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) (limited to 'docs') diff --git a/docs/replication.rst b/docs/replication.rst index 7e37e71987..310abb3488 100644 --- a/docs/replication.rst +++ b/docs/replication.rst @@ -26,28 +26,10 @@ expose the append-only log to the readers should be fairly minimal. Architecture ------------ -The Replication API -~~~~~~~~~~~~~~~~~~~ - -Synapse will optionally expose a long poll HTTP API for extracting updates. The -API will have a similar shape to /sync in that clients provide tokens -indicating where in the log they have reached and a timeout. The synapse server -then either responds with updates immediately if it already has updates or it -waits until the timeout for more updates. If the timeout expires and nothing -happened then the server returns an empty response. - -However unlike the /sync API this replication API is returning synapse specific -data rather than trying to implement a matrix specification. The replication -results are returned as arrays of rows where the rows are mostly lifted -directly from the database. This avoids unnecessary JSON parsing on the server -and hopefully avoids an impedance mismatch between the data returned and the -required updates to the datastore. - -This does not replicate all the database tables as many of the database tables -are indexes that can be recovered from the contents of other tables. - -The format and parameters for the api are documented in -``synapse/replication/resource.py``. +The Replication Protocol +~~~~~~~~~~~~~~~~~~~~~~~~ + +See ``tcp_replication.rst`` The Slaved DataStore -- cgit 1.4.1 From 82301b6c293ddad500897ee2290e7ea38cfc3905 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Apr 2017 10:21:02 +0100 Subject: Remove last reference to worker_replication_url --- docs/workers.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'docs') diff --git a/docs/workers.rst b/docs/workers.rst index 7f040c52d9..2d3df91593 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -45,7 +45,8 @@ e.g. the HTTP listener that it provides (if any); logging configuration; etc. You should minimise the number of overrides though to maintain a usable config. You must specify the type of worker application (worker_app) and the replication -endpoint that it's talking to on the main synapse process (worker_replication_url). +endpoint that it's talking to on the main synapse process (worker_replication_host +and worker_replication_port). For instance:: -- cgit 1.4.1 From cea783991142696a3a901294ff8e361302106daa Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 21 Apr 2017 11:55:07 +0100 Subject: Document some of the admin APIs (#2143) I haven't (yet) documented all of the user-list APIs introduced in https://github.com/matrix-org/synapse/pull/1784 because the API shape seems very odd, given the functionality. --- docs/admin_api/user_admin_api.rst | 73 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 docs/admin_api/user_admin_api.rst (limited to 'docs') diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst new file mode 100644 index 0000000000..1c9c5a6bde --- /dev/null +++ b/docs/admin_api/user_admin_api.rst @@ -0,0 +1,73 @@ +Query Account +============= + +This API returns information about a specific user account. + +The api is:: + + GET /_matrix/client/r0/admin/whois/ + +including an ``access_token`` of a server admin. + +It returns a JSON body like the following: + +.. code:: json + + { + "user_id": "", + "devices": { + "": { + "sessions": [ + { + "connections": [ + { + "ip": "1.2.3.4", + "last_seen": 1417222374433, + "user_agent": "Mozilla/5.0 ..." + }, + { + "ip": "1.2.3.10", + "last_seen": 1417222374500, + "user_agent": "Dalvik/2.1.0 ..." + } + ] + } + ] + } + } + } + +``last_seen`` is measured in milliseconds since the Unix epoch. + +Deactivate Account +================== + +This API deactivates an account. It removes active access tokens, resets the +password, and deletes third-party IDs (to prevent the user requesting a +password reset). + +The api is:: + + POST /_matrix/client/r0/admin/deactivate/ + +including an ``access_token`` of a server admin, and an empty request body. + + +Reset password +============== + +Changes the password of another user. + +The api is:: + + POST /_matrix/client/r0/admin/reset_password/ + +with a body of: + +.. code:: json + + { + "new_password": "" + } + +including an ``access_token`` of a server admin. -- cgit 1.4.1 From 719aec406487c9eb7942ce8c6a12a046218e93e2 Mon Sep 17 00:00:00 2001 From: Sean Enck Date: Fri, 21 Apr 2017 11:03:32 -0400 Subject: clarify metric setup to use 'scrape_configs' section of yaml and use an array for target --- docs/metrics-howto.rst | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'docs') diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst index 7390ab85c9..143cd0f42f 100644 --- a/docs/metrics-howto.rst +++ b/docs/metrics-howto.rst @@ -21,13 +21,12 @@ How to monitor Synapse metrics using Prometheus 3. Add a prometheus target for synapse. - It needs to set the ``metrics_path`` to a non-default value:: + It needs to set the ``metrics_path`` to a non-default value (under ``scrape_configs``):: - job_name: "synapse" metrics_path: "/_synapse/metrics" static_configs: - - targets: - "my.server.here:9092" + - targets: ["my.server.here:9092"] If your prometheus is older than 1.5.2, you will need to replace ``static_configs`` in the above with ``target_groups``. -- cgit 1.4.1