diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst
new file mode 100644
index 0000000000..62225ba6f4
--- /dev/null
+++ b/docs/tcp_replication.rst
@@ -0,0 +1,223 @@
+TCP Replication
+Previously the workers used an HTTP long poll mechanism to get updates from the
+master, which had the problem of causing a lot of duplicate work on the server.
+This TCP protocol replaces those APIs with the aim of increased efficiency.
+The protocol is based on fire and forget, line based commands. An example flow
+would be (where '>' indicates master to worker and '<' worker to master flows)::
+ > SERVER example.com
+ < REPLICATE events 53
+ > RDATA events 54 ["$foo1:bar.com", ...]
+ > RDATA events 55 ["$foo4:bar.com", ...]
+The example shows the server accepting a new connection and sending its identity
+with the ``SERVER`` command, followed by the client asking to subscribe to the
+``events`` stream from the token ``53``. The server then periodically sends ``RDATA``
+commands which have the format ``RDATA <stream_name> <token> <row>``, where the
+format of ``<row>`` is defined by the individual streams.
+Error reporting happens by either the client or server sending an `ERROR`
+command, and usually the connection will be closed.
+Since the protocol is a simple line based, its possible to manually connect to
+the server using a tool like netcat. A few things should be noted when manually
+using the protocol:
+* When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can
+ be used to get all future updates. The special stream name ``ALL`` can be used
+ with ``NOW`` to subscribe to all available streams.
+* The federation stream is only available if federation sending has been
+ disabled on the main process.
+* The server will only time connections out that have sent a ``PING`` command.
+ If a ping is sent then the connection will be closed if no further commands
+ are receieved within 15s. Both the client and server protocol implementations
+ will send an initial PING on connection and ensure at least one command every
+ 5s is sent (not necessarily ``PING``).
+* ``RDATA`` commands *usually* include a numeric token, however if the stream
+ has multiple rows to replicate per token the server will send multiple
+ ``RDATA`` commands, with all but the last having a token of ``batch``. See
+ the documentation on ``commands.RdataCommand`` for further details.
+The basic structure of the protocol is line based, where the initial word of
+each line specifies the command. The rest of the line is parsed based on the
+command. For example, the `RDATA` command is defined as::
+ RDATA <stream_name> <token> <row_json>
+(Note that `<row_json>` may contains spaces, but cannot contain newlines.)
+Blank lines are ignored.
+Keep alives
+Both sides are expected to send at least one command every 5s or so, and
+should send a ``PING`` command if necessary. If either side do not receive a
+command within e.g. 15s then the connection should be closed.
+Because the server may be connected to manually using e.g. netcat, the timeouts
+aren't enabled until an initial ``PING`` command is seen. Both the client and
+server implementations below send a ``PING`` command immediately on connection to
+ensure the timeouts are enabled.
+This ensures that both sides can quickly realize if the tcp connection has gone
+and handle the situation appropriately.
+Start up
+When a new connection is made, the server:
+* Sends a ``SERVER`` command, which includes the identity of the server, allowing
+ the client to detect if its connected to the expected server
+* Sends a ``PING`` command as above, to enable the client to time out connections
+ promptly.
+The client:
+* Sends a ``NAME`` command, allowing the server to associate a human friendly
+ name with the connection. This is optional.
+* Sends a ``PING`` as above
+* For each stream the client wishes to subscribe to it sends a ``REPLICATE``
+ with the stream_name and token it wants to subscribe from.
+* On receipt of a ``SERVER`` command, checks that the server name matches the
+ expected server name.
+Error handling
+If either side detects an error it can send an ``ERROR`` command and close the
+If the client side loses the connection to the server it should reconnect,
+following the steps above.
+If the server sends messages faster than the client can consume them the server
+will first buffer a (fairly large) number of commands and then disconnect the
+client. This ensures that we don't queue up an unbounded number of commands in
+memory and gives us a potential oppurtunity to squawk loudly. When/if the client
+recovers it can reconnect to the server and ask for missed messages.
+In general the replication stream should be considered an unreliable transport
+since e.g. commands are not resent if the connection disappears.
+The exception to that are the replication streams, i.e. RDATA commands, since
+these include tokens which can be used to restart the stream on connection
+The client should keep track of the token in the last RDATA command received
+for each stream so that on reconneciton it can start streaming from the correct
+place. Note: not all RDATA have valid tokens due to batching. See
+``RdataCommand`` for more details.
+An example iteraction is shown below. Each line is prefixed with '>' or '<' to
+indicate which side is sending, these are *not* included on the wire::
+ * connection established *
+ > SERVER localhost:8823
+ > PING 1490197665618
+ < NAME synapse.app.appservice
+ < PING 1490197665618
+ < REPLICATE events 1
+ < REPLICATE backfill 1
+ < REPLICATE caches 1
+ > POSITION events 1
+ > POSITION backfill 1
+ > POSITION caches 1
+ > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
+ > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
+ "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
+ < PING 1490197675618
+ > ERROR server stopping
+ * connection closed by server *
+The ``POSITION`` command sent by the server is used to set the clients position
+without needing to send data with the ``RDATA`` command.
+An example of a batched set of ``RDATA`` is::
+ > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
+ > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
+ > RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
+ > RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
+In this case the client shouldn't advance their caches token until it sees the
+the last ``RDATA``.
+List of commands
+The list of valid commands, with which side can send it: server (S) or client (C):
+ Sent at the start to identify which server the client is talking to
+ A single update in a stream
+ The position of the stream has been updated
+ There was an error
+PING (S, C)
+ Sent periodically to ensure the connection is still alive
+ Sent at the start by client to inform the server who they are
+ Asks the server to replicate a given stream
+ A user has started or stopped syncing
+ Acknowledge receipt of some federation data
+ Inform the server a pusher should be removed
+ Inform the server a cache should be invalidated
+SYNC (S, C)
+ Used exclusively in tests
+See ``synapse/replication/tcp/commands.py`` for a detailed description and the
+format of each command.
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2cdd2d39ff..990eb477e5 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -56,6 +56,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.metrics import register_memory_metrics, get_metrics_for
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.federation.transport.server import TransportLayerServer
from synapse.util.rlimit import change_resource_limit
@@ -222,6 +223,16 @@ class SynapseHomeServer(HomeServer):
+ elif listener["type"] == "replication":
+ bind_addresses = listener["bind_addresses"]
+ for address in bind_addresses:
+ factory = ReplicationStreamProtocolFactory(self)
+ server_listener = reactor.listenTCP(
+ listener["port"], factory, interface=address
+ )
+ reactor.addSystemEventTrigger(
+ "before", "shutdown", server_listener.stopListening,
+ )
logger.warn("Unrecognized listener type: %s", listener["type"])
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index bbb0195228..4bde66fbf8 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -220,10 +220,15 @@ class FederationRemoteSendQueue(object):
def get_current_token(self):
return self.pos - 1
- def get_replication_rows(self, token, limit, federation_ack=None):
- """
+ def federation_ack(self, token):
+ self._clear_queue_before_pos(token)
+ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
+ """Get rows to be sent over federation between the two tokens
- token (int)
+ from_token (int)
+ to_token(int)
limit (int)
federation_ack (int): Optional. The position where the worker is
explicitly acknowledged it has handled. Allows us to drop
@@ -232,8 +237,8 @@ class FederationRemoteSendQueue(object):
# TODO: Handle limit.
# To handle restarts where we wrap around
- if token > self.pos:
- token = -1
+ if from_token > self.pos:
+ from_token = -1
rows = []
@@ -244,10 +249,11 @@ class FederationRemoteSendQueue(object):
# Fetch changed presence
keys = self.presence_changed.keys()
- i = keys.bisect_right(token)
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
dest_user_ids = set(
(pos, dest_user_id)
- for pos in keys[i:]
+ for pos in keys[i:j]
for dest_user_id in self.presence_changed[pos]
@@ -259,8 +265,9 @@ class FederationRemoteSendQueue(object):
# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
- i = keys.bisect_right(token)
- keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
for (pos, (destination, edu_key)) in keyed_edus:
@@ -272,16 +279,18 @@ class FederationRemoteSendQueue(object):
# Fetch changed edus
keys = self.edus.keys()
- i = keys.bisect_right(token)
- edus = set((k, self.edus[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ edus = set((k, self.edus[k]) for k in keys[i:j])
for (pos, edu) in edus:
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
# Fetch changed failures
keys = self.failures.keys()
- i = keys.bisect_right(token)
- failures = set((k, self.failures[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ failures = set((k, self.failures[k]) for k in keys[i:j])
for (pos, (destination, failure)) in failures:
rows.append((pos, FAILURE_TYPE, ujson.dumps({
@@ -291,8 +300,9 @@ class FederationRemoteSendQueue(object):
# Fetch changed device messages
keys = self.device_messages.keys()
- i = keys.bisect_right(token)
- device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
for (pos, destination) in device_messages:
rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1ede117c79..53baf3e79a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.async import Linearizer
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -187,6 +188,7 @@ class PresenceHandler(object):
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = {}
+ self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
@@ -509,6 +511,73 @@ class PresenceHandler(object):
self.external_process_to_current_syncs[process_id] = syncing_user_ids
+ def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+ """Update the syncing users for an external process as a delta.
+ Args:
+ process_id (str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+ as user start and stop syncing against a given process.
+ user_id (str): The user who has started or stopped syncing
+ is_syncing (bool): Whether or not the user is now syncing
+ sync_time_msec(int): Time in ms when the user was last syncing
+ """
+ with (yield self.external_sync_linearizer.queue(process_id)):
+ prev_state = yield self.current_state_for_user(user_id)
+ process_presence = self.external_process_to_current_syncs.setdefault(
+ process_id, set()
+ )
+ updates = []
+ if is_syncing and user_id not in process_presence:
+ if prev_state.state == PresenceState.OFFLINE:
+ updates.append(prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=sync_time_msec,
+ last_user_sync_ts=sync_time_msec,
+ ))
+ else:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec,
+ ))
+ process_presence.add(user_id)
+ elif user_id in process_presence:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec,
+ ))
+ if not is_syncing:
+ process_presence.discard(user_id)
+ if updates:
+ yield self._update_states(updates)
+ self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+ @defer.inlineCallbacks
+ def update_external_syncs_clear(self, process_id):
+ """Marks all users that had been marked as syncing by a given process
+ as offline.
+ Used when the process has stopped/disappeared.
+ """
+ with (yield self.external_sync_linearizer.queue(process_id)):
+ process_presence = self.external_process_to_current_syncs.pop(
+ process_id, set()
+ )
+ prev_states = yield self.current_state_for_users(process_presence)
+ time_now_ms = self.clock.time_msec()
+ yield self._update_states([
+ prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ )
+ for prev_state in prev_states.itervalues()
+ ])
+ self.external_process_last_updated_ms.pop(process_id, None)
+ @defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0eea7f8f9c..d6809862e0 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -293,6 +293,9 @@ class TypingHandler(object):
return rows
+ def get_current_token(self):
+ return self._latest_room_serial
class TypingNotificationEventSource(object):
def __init__(self, hs):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 57d6a8cfe3..48566187ab 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -163,6 +163,8 @@ class Notifier(object):
self.store = hs.get_datastore()
self.pending_new_room_events = []
+ self.replication_callbacks = []
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
@@ -202,6 +204,12 @@ class Notifier(object):
lambda: len(self.user_to_user_stream),
+ def add_replication_callback(self, cb):
+ """Add a callback that will be called when some new data is available.
+ Callback is not given any arguments.
+ """
+ self.replication_callbacks.append(cb)
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
""" Used by handlers to inform the notifier something has happened
@@ -507,6 +515,9 @@ class Notifier(object):
self.replication_deferred = ObservableDeferred(defer.Deferred())
+ for cb in self.replication_callbacks:
+ preserve_fn(cb)()
def wait_for_replication(self, callback, timeout):
"""Wait for an event to happen.
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 03930fe958..abd3fe7665 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -489,7 +489,7 @@ class ReplicationResource(Resource):
if federation is not None and federation != current_position:
federation_rows = self.federation_sender.get_replication_rows(
- federation, limit, federation_ack=federation_ack,
+ federation, current_position, limit, federation_ack=federation_ack,
upto_token = _position_from_rows(federation_rows, current_position)
writer.write_header_and_rows("federation", federation_rows, (
@@ -504,7 +504,7 @@ class ReplicationResource(Resource):
if device_lists is not None and device_lists != current_position:
changes = yield self.store.get_all_device_list_changes_for_remotes(
- device_lists,
+ device_lists, current_position,
writer.write_header_and_rows("device_lists", changes, (
"position", "user_id", "destination",
diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py
new file mode 100644
index 0000000000..81c2ea7ee9
--- /dev/null
+++ b/synapse/replication/tcp/__init__.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""This module implements the TCP replication protocol used by synapse to
+communicate between the master process and its workers (when they're enabled).
+Further details can be found in docs/tcp_replication.rst
+Structure of the module:
+ * client.py - the client classes used for workers to connect to master
+ * command.py - the definitions of all the valid commands
+ * protocol.py - contains bot the client and server protocol implementations,
+ these should not be used directly
+ * resource.py - the server classes that accepts and handle client connections
+ * streams.py - the definitons of all the valid streams
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
new file mode 100644
index 0000000000..84d2a2272a
--- /dev/null
+++ b/synapse/replication/tcp/commands.py
@@ -0,0 +1,346 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Defines the various valid commands
+The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
+allowed to be sent by which side.
+import logging
+import ujson as json
+logger = logging.getLogger(__name__)
+class Command(object):
+ """The base command class.
+ All subclasses must set the NAME variable which equates to the name of the
+ command on the wire.
+ A full command line on the wire is constructed from `NAME + " " + to_line()`
+ The default implementation creates a command of form `<NAME> <data>`
+ """
+ NAME = None
+ def __init__(self, data):
+ self.data = data
+ @classmethod
+ def from_line(cls, line):
+ """Deserialises a line from the wire into this command. `line` does not
+ include the command.
+ """
+ return cls(line)
+ def to_line(self):
+ """Serialises the comamnd for the wire. Does not include the command
+ prefix.
+ """
+ return self.data
+class ServerCommand(Command):
+ """Sent by the server on new connection and includes the server_name.
+ Format::
+ SERVER <server_name>
+ """
+class RdataCommand(Command):
+ """Sent by server when a subscribed stream has an update.
+ Format::
+ RDATA <stream_name> <token> <row_json>
+ The `<token>` may either be a numeric stream id OR "batch". The latter case
+ is used to support sending multiple updates with the same stream ID. This
+ is done by sending an RDATA for each row, with all but the last RDATA having
+ a token of "batch" and the last having the final stream ID.
+ The client should batch all incoming RDATA with a token of "batch" (per
+ stream_name) until it sees an RDATA with a numeric stream ID.
+ `<token>` of "batch" maps to the instance variable `token` being None.
+ An example of a batched series of RDATA::
+ RDATA presence batch ["@foo:example.com", "online", ...]
+ RDATA presence batch ["@bar:example.com", "online", ...]
+ RDATA presence 59 ["@baz:example.com", "online", ...]
+ """
+ def __init__(self, stream_name, token, row):
+ self.stream_name = stream_name
+ self.token = token
+ self.row = row
+ @classmethod
+ def from_line(cls, line):
+ stream_name, token, row_json = line.split(" ", 2)
+ return cls(
+ stream_name,
+ None if token == "batch" else int(token),
+ json.loads(row_json)
+ )
+ def to_line(self):
+ return " ".join((
+ self.stream_name,
+ str(self.token) if self.token is not None else "batch",
+ json.dumps(self.row),
+ ))
+class PositionCommand(Command):
+ """Sent by the client to tell the client the stream postition without
+ needing to send an RDATA.
+ """
+ def __init__(self, stream_name, token):
+ self.stream_name = stream_name
+ self.token = token
+ @classmethod
+ def from_line(cls, line):
+ stream_name, token = line.split(" ", 1)
+ return cls(stream_name, int(token))
+ def to_line(self):
+ return " ".join((self.stream_name, str(self.token),))
+class ErrorCommand(Command):
+ """Sent by either side if there was an ERROR. The data is a string describing
+ the error.
+ """
+class PingCommand(Command):
+ """Sent by either side as a keep alive. The data is arbitary (often timestamp)
+ """
+class NameCommand(Command):
+ """Sent by client to inform the server of the client's identity. The data
+ is the name
+ """
+class ReplicateCommand(Command):
+ """Sent by the client to subscribe to the stream.
+ Format::
+ REPLICATE <stream_name> <token>
+ Where <token> may be either:
+ * a numeric stream_id to stream updates from
+ * "NOW" to stream all subsequent updates.
+ The <stream_name> can be "ALL" to subscribe to all known streams, in which
+ case the <token> must be set to "NOW", i.e.::
+ """
+ def __init__(self, stream_name, token):
+ self.stream_name = stream_name
+ self.token = token
+ @classmethod
+ def from_line(cls, line):
+ stream_name, token = line.split(" ", 1)
+ if token in ("NOW", "now"):
+ token = "NOW"
+ else:
+ token = int(token)
+ return cls(stream_name, token)
+ def to_line(self):
+ return " ".join((self.stream_name, str(self.token),))
+class UserSyncCommand(Command):
+ """Sent by the client to inform the server that a user has started or
+ stopped syncing. Used to calculate presence on the master.
+ Includes a timestamp of when the last user sync was.
+ Format::
+ USER_SYNC <user_id> <state> <last_sync_ms>
+ Where <state> is either "start" or "stop"
+ """
+ def __init__(self, user_id, is_syncing, last_sync_ms):
+ self.user_id = user_id
+ self.is_syncing = is_syncing
+ self.last_sync_ms = last_sync_ms
+ @classmethod
+ def from_line(cls, line):
+ user_id, state, last_sync_ms = line.split(" ", 2)
+ if state not in ("start", "end"):
+ raise Exception("Invalid USER_SYNC state %r" % (state,))
+ return cls(user_id, state == "start", int(last_sync_ms))
+ def to_line(self):
+ return " ".join((
+ self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
+ ))
+class FederationAckCommand(Command):
+ """Sent by the client when it has processed up to a given point in the
+ federation stream. This allows the master to drop in-memory caches of the
+ federation stream.
+ This must only be sent from one worker (i.e. the one sending federation)
+ Format::
+ """
+ def __init__(self, token):
+ self.token = token
+ @classmethod
+ def from_line(cls, line):
+ return cls(int(line))
+ def to_line(self):
+ return str(self.token)
+class SyncCommand(Command):
+ """Used for testing. The client protocol implementation allows waiting
+ on a SYNC command with a specified data.
+ """
+class RemovePusherCommand(Command):
+ """Sent by the client to request the master remove the given pusher.
+ Format::
+ REMOVE_PUSHER <app_id> <push_key> <user_id>
+ """
+ def __init__(self, app_id, push_key, user_id):
+ self.user_id = user_id
+ self.app_id = app_id
+ self.push_key = push_key
+ @classmethod
+ def from_line(cls, line):
+ app_id, push_key, user_id = line.split(" ", 2)
+ return cls(app_id, push_key, user_id)
+ def to_line(self):
+ return " ".join((self.app_id, self.push_key, self.user_id))
+class InvalidateCacheCommand(Command):
+ """Sent by the client to invalidate an upstream cache.
+ Mainly used to invalidate destination retry timing caches.
+ Format::
+ INVALIDATE_CACHE <cache_func> <keys_json>
+ Where <keys_json> is a json list.
+ """
+ def __init__(self, cache_func, keys):
+ self.cache_func = cache_func
+ self.keys = keys
+ @classmethod
+ def from_line(cls, line):
+ cache_func, keys_json = line.split(" ", 1)
+ return cls(cache_func, json.loads(keys_json))
+ def to_line(self):
+ return " ".join((self.cache_func, json.dumps(self.keys)))
+# Map of command name to command type.
+ cmd.NAME: cmd
+ for cmd in (
+ ServerCommand,
+ RdataCommand,
+ PositionCommand,
+ ErrorCommand,
+ PingCommand,
+ NameCommand,
+ ReplicateCommand,
+ UserSyncCommand,
+ FederationAckCommand,
+ SyncCommand,
+ RemovePusherCommand,
+ InvalidateCacheCommand,
+ )
+# The commands the server is allowed to send
+ ServerCommand.NAME,
+ RdataCommand.NAME,
+ PositionCommand.NAME,
+ ErrorCommand.NAME,
+ PingCommand.NAME,
+ SyncCommand.NAME,
+# The commands the client is allowed to send
+ NameCommand.NAME,
+ ReplicateCommand.NAME,
+ PingCommand.NAME,
+ UserSyncCommand.NAME,
+ FederationAckCommand.NAME,
+ RemovePusherCommand.NAME,
+ InvalidateCacheCommand.NAME,
+ ErrorCommand.NAME,
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
new file mode 100644
index 0000000000..80f732b455
--- /dev/null
+++ b/synapse/replication/tcp/protocol.py
@@ -0,0 +1,604 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""This module contains the implementation of both the client and server
+The basic structure of the protocol is line based, where the initial word of
+each line specifies the command. The rest of the line is parsed based on the
+command. For example, the `RDATA` command is defined as::
+ RDATA <stream_name> <token> <row_json>
+(Note that `<row_json>` may contains spaces, but cannot contain newlines.)
+Blank lines are ignored.
+# Example
+An example iteraction is shown below. Each line is prefixed with '>' or '<' to
+indicate which side is sending, these are *not* included on the wire::
+ * connection established *
+ > SERVER localhost:8823
+ > PING 1490197665618
+ < NAME synapse.app.appservice
+ < PING 1490197665618
+ < REPLICATE events 1
+ < REPLICATE backfill 1
+ < REPLICATE caches 1
+ > POSITION events 1
+ > POSITION backfill 1
+ > POSITION caches 1
+ > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
+ > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
+ "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
+ < PING 1490197675618
+ > ERROR server stopping
+ * connection closed by server *
+from twisted.internet import defer
+from twisted.protocols.basic import LineOnlyReceiver
+from commands import (
+ ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
+ NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
+from streams import STREAMS_MAP
+from synapse.util.stringutils import random_string
+import logging
+import synapse.metrics
+import struct
+import fcntl
+metrics = synapse.metrics.get_metrics_for(__name__)
+inbound_commands_counter = metrics.register_counter(
+ "inbound_commands", labels=["command", "name", "conn_id"],
+outbound_commands_counter = metrics.register_counter(
+ "outbound_commands", labels=["command", "name", "conn_id"],
+# A list of all connected protocols. This allows us to send metrics about the
+# connections.
+connected_connections = []
+logger = logging.getLogger(__name__)
+PING_TIME = 5000
+class ConnectionStates(object):
+ CONNECTING = "connecting"
+ ESTABLISHED = "established"
+ PAUSED = "paused"
+ CLOSED = "closed"
+class BaseReplicationStreamProtocol(LineOnlyReceiver):
+ """Base replication protocol shared between client and server.
+ Reads lines (ignoring blank ones) and parses them into command classes,
+ asserting that they are valid for the given direction, i.e. server commands
+ are only sent by the server.
+ On receiving a new command it calls `on_<COMMAND_NAME>` with the parsed
+ command.
+ It also sends `PING` periodically, and correctly times out remote connections
+ (if they send a `PING` command)
+ """
+ delimiter = b'\n'
+ VALID_INBOUND_COMMANDS = [] # Valid commands we expect to receive
+ VALID_OUTBOUND_COMMANDS = [] # Valid commans we can send
+ max_line_buffer = 10000
+ def __init__(self, clock):
+ self.clock = clock
+ self.last_received_command = self.clock.time_msec()
+ self.last_sent_command = 0
+ self.time_we_closed = None # When we requested the connection be closed
+ self.received_ping = False # Have we reecived a ping from the other side
+ self.state = ConnectionStates.CONNECTING
+ self.name = "anon" # The name sent by a client.
+ self.conn_id = random_string(5) # To dedupe in case of name clashes.
+ # List of pending commands to send once we've established the connection
+ self.pending_commands = []
+ # The LoopingCall for sending pings.
+ self._send_ping_loop = None
+ def connectionMade(self):
+ logger.info("[%s] Connection established", self.id())
+ self.state = ConnectionStates.ESTABLISHED
+ connected_connections.append(self) # Register connection for metrics
+ self.transport.registerProducer(self, True) # For the *Producing callbacks
+ self._send_pending_commands()
+ # Starts sending pings
+ self._send_ping_loop = self.clock.looping_call(self.send_ping, 5000)
+ # Always send the initial PING so that the other side knows that they
+ # can time us out.
+ self.send_command(PingCommand(self.clock.time_msec()))
+ def send_ping(self):
+ """Periodically sends a ping and checks if we should close the connection
+ due to the other side timing out.
+ """
+ now = self.clock.time_msec()
+ if self.time_we_closed:
+ if now - self.time_we_closed > PING_TIME * 3:
+ logger.info(
+ "[%s] Failed to close connection gracefully, aborting", self.id()
+ )
+ self.transport.abortConnection()
+ else:
+ if now - self.last_sent_command >= PING_TIME:
+ self.send_command(PingCommand(now))
+ if self.received_ping and now - self.last_received_command > PING_TIME * 3:
+ logger.info(
+ "[%s] Connection hasn't received command in %r ms. Closing.",
+ self.id(), now - self.last_received_command
+ )
+ self.send_error("ping timeout")
+ def lineReceived(self, line):
+ """Called when we've received a line
+ """
+ if line.strip() == "":
+ # Ignore blank lines
+ return
+ line = line.decode("utf-8")
+ cmd_name, rest_of_line = line.split(" ", 1)
+ if cmd_name not in self.VALID_INBOUND_COMMANDS:
+ logger.error("[%s] invalid command %s", self.id(), cmd_name)
+ self.send_error("invalid command: %s", cmd_name)
+ return
+ self.last_received_command = self.clock.time_msec()
+ inbound_commands_counter.inc(cmd_name, self.name, self.conn_id)
+ cmd_cls = COMMAND_MAP[cmd_name]
+ try:
+ cmd = cmd_cls.from_line(rest_of_line)
+ except Exception as e:
+ logger.exception(
+ "[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
+ )
+ self.send_error(
+ "failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
+ )
+ return
+ # Now lets try and call on_<CMD_NAME> function
+ try:
+ getattr(self, "on_%s" % (cmd_name,))(cmd)
+ except Exception:
+ logger.exception("[%s] Failed to handle line: %r", self.id(), line)
+ def close(self):
+ self.time_we_closed = self.clock.time_msec()
+ self.transport.loseConnection()
+ self.on_connection_closed()
+ def send_error(self, error_string, *args):
+ """Send an error to remote and close the connection.
+ """
+ self.send_command(ErrorCommand(error_string % args))
+ self.close()
+ def send_command(self, cmd, do_buffer=True):
+ """Send a command if connection has been established.
+ Args:
+ cmd (Command)
+ do_buffer (bool): Whether to buffer the message or always attempt
+ to send the command. This is mostly used to send an error
+ message if we're about to close the connection due our buffers
+ becoming full.
+ """
+ if self.state == ConnectionStates.CLOSED:
+ logger.info("[%s] Not sending, connection closed", self.id())
+ return
+ if do_buffer and self.state != ConnectionStates.ESTABLISHED:
+ self._queue_command(cmd)
+ return
+ outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id)
+ string = "%s %s" % (cmd.NAME, cmd.to_line(),)
+ if "\n" in string:
+ raise Exception("Unexpected newline in command: %r", string)
+ self.sendLine(string.encode("utf-8"))
+ self.last_sent_command = self.clock.time_msec()
+ def _queue_command(self, cmd):
+ """Queue the command until the connection is ready to write to again.
+ """
+ logger.info("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd)
+ self.pending_commands.append(cmd)
+ if len(self.pending_commands) > self.max_line_buffer:
+ # The other side is failing to keep up and out buffers are becoming
+ # full, so lets close the connection.
+ # XXX: should we squawk more loudly?
+ logger.error("[%s] Remote failed to keep up", self.id())
+ self.send_command(ErrorCommand("Failed to keep up"), do_buffer=False)
+ self.close()
+ def _send_pending_commands(self):
+ """Send any queued commandes
+ """
+ pending = self.pending_commands
+ self.pending_commands = []
+ for cmd in pending:
+ self.send_command(cmd)
+ def on_PING(self, line):
+ self.received_ping = True
+ def on_ERROR(self, cmd):
+ logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)
+ def pauseProducing(self):
+ """This is called when both the kernel send buffer and the twisted
+ tcp connection send buffers have become full.
+ We don't actually have any control over those sizes, so we buffer some
+ commands ourselves before knifing the connection due to the remote
+ failing to keep up.
+ """
+ logger.info("[%s] Pause producing", self.id())
+ self.state = ConnectionStates.PAUSED
+ def resumeProducing(self):
+ """The remote has caught up after we started buffering!
+ """
+ logger.info("[%s] Resume producing", self.id())
+ self.state = ConnectionStates.ESTABLISHED
+ self._send_pending_commands()
+ def stopProducing(self):
+ """We're never going to send any more data (normally because either
+ we or the remote has closed the connection)
+ """
+ logger.info("[%s] Stop producing", self.id())
+ self.on_connection_closed()
+ def connectionLost(self, reason):
+ logger.info("[%s] Replication connection closed: %r", self.id(), reason)
+ try:
+ # Remove us from list of connections to be monitored
+ connected_connections.remove(self)
+ except ValueError:
+ pass
+ # Stop the looping call sending pings.
+ if self._send_ping_loop and self._send_ping_loop.running:
+ self._send_ping_loop.stop()
+ self.on_connection_closed()
+ def on_connection_closed(self):
+ logger.info("[%s] Connection was closed", self.id())
+ self.state = ConnectionStates.CLOSED
+ self.pending_commands = []
+ if self.transport:
+ self.transport.unregisterProducer()
+ def __str__(self):
+ return "ReplicationConnection<name=%s,conn_id=%s,addr=%s>" % (
+ self.name, self.conn_id, self.addr,
+ )
+ def id(self):
+ return "%s-%s" % (self.name, self.conn_id)
+class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
+ def __init__(self, server_name, clock, streamer, addr):
+ BaseReplicationStreamProtocol.__init__(self, clock) # Old style class
+ self.server_name = server_name
+ self.streamer = streamer
+ self.addr = addr
+ # The streams the client has subscribed to and is up to date with
+ self.replication_streams = set()
+ # The streams the client is currently subscribing to.
+ self.connecting_streams = set()
+ # Map from stream name to list of updates to send once we've finished
+ # subscribing the client to the stream.
+ self.pending_rdata = {}
+ def connectionMade(self):
+ self.send_command(ServerCommand(self.server_name))
+ BaseReplicationStreamProtocol.connectionMade(self)
+ self.streamer.new_connection(self)
+ def on_NAME(self, cmd):
+ self.name = cmd.data
+ def on_USER_SYNC(self, cmd):
+ self.streamer.on_user_sync(
+ self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
+ )
+ def on_REPLICATE(self, cmd):
+ stream_name = cmd.stream_name
+ token = cmd.token
+ if stream_name == "ALL":
+ # Subscribe to all streams we're publishing to.
+ for stream in self.streamer.streams_by_name.iterkeys():
+ self.subscribe_to_stream(stream, token)
+ else:
+ self.subscribe_to_stream(stream_name, token)
+ def on_FEDERATION_ACK(self, cmd):
+ self.streamer.federation_ack(cmd.token)
+ def on_REMOVE_PUSHER(self, cmd):
+ self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
+ def onINVALIDATE_CACHE(self, cmd):
+ self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
+ @defer.inlineCallbacks
+ def subscribe_to_stream(self, stream_name, token):
+ """Subscribe the remote to a streams.
+ This invloves checking if they've missed anything and sending those
+ updates down if they have. During that time new updates for the stream
+ are queued and sent once we've sent down any missed updates.
+ """
+ self.replication_streams.discard(stream_name)
+ self.connecting_streams.add(stream_name)
+ try:
+ # Get missing updates
+ updates, current_token = yield self.streamer.get_stream_updates(
+ stream_name, token,
+ )
+ # Send all the missing updates
+ for update in updates:
+ token, row = update[0], update[1]
+ self.send_command(RdataCommand(stream_name, token, row))
+ # Now we can send any updates that came in while we were subscribing
+ pending_rdata = self.pending_rdata.pop(stream_name, [])
+ for token, update in pending_rdata:
+ self.send_command(RdataCommand(stream_name, token, update))
+ # We send a POSITION command to ensure that they have an up to
+ # date token (especially useful if we didn't send any updates
+ # above)
+ self.send_command(PositionCommand(stream_name, current_token))
+ # They're now fully subscribed
+ self.replication_streams.add(stream_name)
+ except Exception as e:
+ logger.exception("[%s] Failed to handle REPLICATE command", self.id())
+ self.send_error("failed to handle replicate: %r", e)
+ finally:
+ self.connecting_streams.discard(stream_name)
+ def stream_update(self, stream_name, token, data):
+ """Called when a new update is available to stream to clients.
+ We need to check if the client is interested in the stream or not
+ """
+ if stream_name in self.replication_streams:
+ # The client is subscribed to the stream
+ self.send_command(RdataCommand(stream_name, token, data))
+ elif stream_name in self.connecting_streams:
+ # The client is being subscribed to the stream
+ logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
+ self.pending_rdata.setdefault(stream_name, []).append((token, data))
+ else:
+ # The client isn't subscribed
+ logger.debug("[%s] Dropping RDATA %r %r", self.id(), stream_name, token)
+ def send_sync(self, data):
+ self.send_command(SyncCommand(data))
+ def on_connection_closed(self):
+ BaseReplicationStreamProtocol.on_connection_closed(self)
+ logger.info("[%s] Replication connection closed", self.id())
+ self.streamer.lost_connection(self)
+class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
+ def __init__(self, client_name, server_name, clock, handler):
+ BaseReplicationStreamProtocol.__init__(self, clock)
+ self.client_name = client_name
+ self.server_name = server_name
+ self.handler = handler
+ # Map of stream to batched updates. See RdataCommand for info on how
+ # batching works.
+ self.pending_batches = {}
+ def connectionMade(self):
+ self.send_command(NameCommand(self.client_name))
+ BaseReplicationStreamProtocol.connectionMade(self)
+ # Once we've connected subscribe to the necessary streams
+ for stream_name, token in self.handler.get_streams_to_replicate().iteritems():
+ self.replicate(stream_name, token)
+ # Tell the server if we have any users currently syncing (should only
+ # happen on synchrotrons)
+ currently_syncing = self.handler.get_currently_syncing_users()
+ now = self.clock.time_msec()
+ for user_id in currently_syncing:
+ self.send_command(UserSyncCommand(user_id, True, now))
+ # We've now finished connecting to so inform the client handler
+ self.handler.update_connection(self)
+ def on_SERVER(self, cmd):
+ if cmd.data != self.server_name:
+ logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
+ self.transport.abortConnection()
+ def on_RDATA(self, cmd):
+ try:
+ row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row)
+ except Exception:
+ logger.exception(
+ "[%s] Failed to parse RDATA: %r %r",
+ self.id(), cmd.stream_name, cmd.row
+ )
+ raise
+ if cmd.token is None:
+ # I.e. this is part of a batch of updates for this stream. Batch
+ # until we get an update for the stream with a non None token
+ self.pending_batches.setdefault(cmd.stream_name, []).append(row)
+ else:
+ # Check if this is the last of a batch of updates
+ rows = self.pending_batches.pop(cmd.stream_name, [])
+ rows.append(row)
+ self.handler.on_rdata(cmd.stream_name, cmd.token, rows)
+ def on_POSITION(self, cmd):
+ self.handler.on_position(cmd.stream_name, cmd.token)
+ def on_SYNC(self, cmd):
+ self.handler.on_sync(cmd.data)
+ def replicate(self, stream_name, token):
+ """Send the subscription request to the server
+ """
+ if stream_name not in STREAMS_MAP:
+ raise Exception("Invalid stream name %r" % (stream_name,))
+ logger.info(
+ "[%s] Subscribing to replication stream: %r from %r",
+ self.id(), stream_name, token
+ )
+ self.send_command(ReplicateCommand(stream_name, token))
+ def on_connection_closed(self):
+ BaseReplicationStreamProtocol.on_connection_closed(self)
+ self.handler.update_connection(None)
+# The following simply registers metrics for the replication connections
+ "pending_commands",
+ lambda: {
+ (p.name, p.conn_id): len(p.pending_commands)
+ for p in connected_connections
+ },
+ labels=["name", "conn_id"],
+def transport_buffer_size(protocol):
+ if protocol.transport:
+ size = len(protocol.transport.dataBuffer) + protocol.transport._tempDataLen
+ return size
+ return 0
+ "transport_send_buffer",
+ lambda: {
+ (p.name, p.conn_id): transport_buffer_size(p)
+ for p in connected_connections
+ },
+ labels=["name", "conn_id"],
+def transport_kernel_read_buffer_size(protocol, read=True):
+ SIOCINQ = 0x541B
+ SIOCOUTQ = 0x5411
+ if protocol.transport:
+ fileno = protocol.transport.getHandle().fileno()
+ if read:
+ op = SIOCINQ
+ else:
+ size = struct.unpack("I", fcntl.ioctl(fileno, op, '\0\0\0\0'))[0]
+ return size
+ return 0
+ "transport_kernel_send_buffer",
+ lambda: {
+ (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
+ for p in connected_connections
+ },
+ labels=["name", "conn_id"],
+ "transport_kernel_read_buffer",
+ lambda: {
+ (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
+ for p in connected_connections
+ },
+ labels=["name", "conn_id"],
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
new file mode 100644
index 0000000000..0d7ea57318
--- /dev/null
+++ b/synapse/replication/tcp/resource.py
@@ -0,0 +1,290 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""The server side of the replication stream.
+from twisted.internet import defer, reactor
+from twisted.internet.protocol import Factory
+from streams import STREAMS_MAP, FederationStream
+from protocol import ServerReplicationStreamProtocol
+from synapse.util.metrics import Measure, measure_func
+import logging
+import synapse.metrics
+metrics = synapse.metrics.get_metrics_for(__name__)
+stream_updates_counter = metrics.register_counter(
+ "stream_updates", labels=["stream_name"]
+user_sync_counter = metrics.register_counter("user_sync")
+federation_ack_counter = metrics.register_counter("federation_ack")
+remove_pusher_counter = metrics.register_counter("remove_pusher")
+invalidate_cache_counter = metrics.register_counter("invalidate_cache")
+logger = logging.getLogger(__name__)
+class ReplicationStreamProtocolFactory(Factory):
+ """Factory for new replication connections.
+ """
+ def __init__(self, hs):
+ self.streamer = ReplicationStreamer(hs)
+ self.clock = hs.get_clock()
+ self.server_name = hs.config.server_name
+ def buildProtocol(self, addr):
+ return ServerReplicationStreamProtocol(
+ self.server_name,
+ self.clock,
+ self.streamer,
+ addr
+ )
+class ReplicationStreamer(object):
+ """Handles replication connections.
+ This needs to be poked when new replication data may be available. When new
+ data is available it will propagate to all connected clients.
+ """
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.presence_handler = hs.get_presence_handler()
+ self.clock = hs.get_clock()
+ # Current connections.
+ self.connections = []
+ metrics.register_callback("total_connections", lambda: len(self.connections))
+ # List of streams that clients can subscribe to.
+ # We only support federation stream if federation sending hase been
+ # disabled on the master.
+ self.streams = [
+ stream(hs) for stream in STREAMS_MAP.itervalues()
+ if stream != FederationStream or not hs.config.send_federation
+ ]
+ self.streams_by_name = {stream.NAME: stream for stream in self.streams}
+ metrics.register_callback(
+ "connections_per_stream",
+ lambda: {
+ (stream_name,): len([
+ conn for conn in self.connections
+ if stream_name in conn.replication_streams
+ ])
+ for stream_name in self.streams_by_name
+ },
+ labels=["stream_name"],
+ )
+ self.federation_sender = None
+ if not hs.config.send_federation:
+ self.federation_sender = hs.get_federation_sender()
+ hs.get_notifier().add_replication_callback(self.on_notifier_poke)
+ # Keeps track of whether we are currently checking for updates
+ self.is_looping = False
+ self.pending_updates = False
+ reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
+ def on_shutdown(self):
+ # close all connections on shutdown
+ for conn in self.connections:
+ conn.send_error("server shutting down")
+ @defer.inlineCallbacks
+ def on_notifier_poke(self):
+ """Checks if there is actually any new data and sends it to the
+ connections if there are.
+ This should get called each time new data is available, even if it
+ is currently being executed, so that nothing gets missed
+ """
+ if not self.connections:
+ # Don't bother if nothing is listening. We still need to advance
+ # the stream tokens otherwise they'll fall beihind forever
+ for stream in self.streams:
+ stream.advance_current_token()
+ return
+ # If we're in the process of checking for new updates, mark that fact
+ # and return
+ if self.is_looping:
+ logger.debug("Noitifier poke loop already running")
+ self.pending_updates = True
+ return
+ self.pending_updates = True
+ self.is_looping = True
+ try:
+ # Keep looping while there have been pokes about potential updates.
+ # This protects against the race where a stream we already checked
+ # gets an update while we're handling other streams.
+ while self.pending_updates:
+ self.pending_updates = False
+ with Measure(self.clock, "repl.stream.get_updates"):
+ # First we tell the streams that they should update their
+ # current tokens.
+ for stream in self.streams:
+ stream.advance_current_token()
+ for stream in self.streams:
+ if stream.last_token == stream.upto_token:
+ continue
+ logger.debug(
+ "Getting stream: %s: %s -> %s",
+ stream.NAME, stream.last_token, stream.upto_token
+ )
+ updates, current_token = yield stream.get_updates()
+ logger.debug(
+ "Sending %d updates to %d connections",
+ len(updates), len(self.connections),
+ )
+ if updates:
+ logger.info(
+ "Streaming: %s -> %s", stream.NAME, updates[-1][0]
+ )
+ stream_updates_counter.inc_by(len(updates), stream.NAME)
+ # Some streams return multiple rows with the same stream IDs,
+ # we need to make sure they get sent out in batches. We do
+ # this by setting the current token to all but the last of
+ # a series of updates with the same token to have a None
+ # token. See RdataCommand for more details.
+ batched_updates = _batch_updates(updates)
+ for conn in self.connections:
+ for token, row in batched_updates:
+ try:
+ conn.stream_update(stream.NAME, token, row)
+ except Exception:
+ logger.exception("Failed to replicate")
+ logger.debug("No more pending updates, breaking poke loop")
+ finally:
+ self.pending_updates = False
+ self.is_looping = False
+ @measure_func("repl.get_stream_updates")
+ def get_stream_updates(self, stream_name, token):
+ """For a given stream get all updates since token. This is called when
+ a client first subscribes to a stream.
+ """
+ stream = self.streams_by_name.get(stream_name, None)
+ if not stream:
+ raise Exception("unknown stream %s", stream_name)
+ return stream.get_updates_since(token)
+ @measure_func("repl.federation_ack")
+ def federation_ack(self, token):
+ """We've received an ack for federation stream from a client.
+ """
+ federation_ack_counter.inc()
+ if self.federation_sender:
+ self.federation_sender.federation_ack(token)
+ @measure_func("repl.on_user_sync")
+ def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
+ """A client has started/stopped syncing on a worker.
+ """
+ user_sync_counter.inc()
+ self.presence_handler.update_external_syncs_row(
+ conn_id, user_id, is_syncing, last_sync_ms,
+ )
+ @measure_func("repl.on_remove_pusher")
+ @defer.inlineCallbacks
+ def on_remove_pusher(self, app_id, push_key, user_id):
+ """A client has asked us to remove a pusher
+ """
+ remove_pusher_counter.inc()
+ yield self.store.delete_pusher_by_app_id_pushkey_user_id(
+ app_id=app_id, pushkey=push_key, user_id=user_id
+ )
+ self.notifier.on_new_replication_data()
+ @measure_func("repl.on_invalidate_cache")
+ def on_invalidate_cache(self, cache_func, keys):
+ """The client has asked us to invalidate a cache
+ """
+ invalidate_cache_counter.inc()
+ getattr(self.store, cache_func).invalidate(tuple(keys))
+ def send_sync_to_all_connections(self, data):
+ """Sends a SYNC command to all clients.
+ Used in tests.
+ """
+ for conn in self.connections:
+ conn.send_sync(data)
+ def new_connection(self, connection):
+ """A new client connection has been established
+ """
+ self.connections.append(connection)
+ def lost_connection(self, connection):
+ """A client connection has been lost
+ """
+ try:
+ self.connections.remove(connection)
+ except ValueError:
+ pass
+ # We need to tell the presence handler that the connection has been
+ # lost so that it can handle any ongoing syncs on that connection.
+ self.presence_handler.update_external_syncs_clear(connection.conn_id)
+def _batch_updates(updates):
+ """Takes a list of updates of form [(token, row)] and sets the token to
+ None for all rows where the next row has the same token. This is used to
+ implement batching.
+ For example:
+ [(1, _), (1, _), (2, _), (3, _), (3, _)]
+ becomes:
+ [(None, _), (1, _), (2, _), (None, _), (3, _)]
+ """
+ if not updates:
+ return []
+ new_updates = []
+ for i, update in enumerate(updates[:-1]):
+ if update[0] == updates[i + 1][0]:
+ new_updates.append((None, update[1]))
+ else:
+ new_updates.append(update)
+ new_updates.append(updates[-1])
+ return new_updates
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
new file mode 100644
index 0000000000..fada40c6ef
--- /dev/null
+++ b/synapse/replication/tcp/streams.py
@@ -0,0 +1,409 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Defines all the valid streams that clients can subscribe to, and the format
+of the rows returned by each stream.
+Each stream is defined by the following information:
+ stream name: The name of the stream
+ row type: The type that is used to serialise/deserialse the row
+ current_token: The function that returns the current token for the stream
+ update_function: The function that returns a list of updates between two tokens
+from twisted.internet import defer
+from collections import namedtuple
+import logging
+logger = logging.getLogger(__name__)
+EventStreamRow = namedtuple("EventStreamRow",
+ ("event_id", "room_id", "type", "state_key", "redacts"))
+BackfillStreamRow = namedtuple("BackfillStreamRow",
+ ("event_id", "room_id", "type", "state_key", "redacts"))
+PresenceStreamRow = namedtuple("PresenceStreamRow",
+ ("user_id", "state", "last_active_ts",
+ "last_federation_update_ts", "last_user_sync_ts",
+ "status_msg", "currently_active"))
+TypingStreamRow = namedtuple("TypingStreamRow",
+ ("room_id", "user_ids"))
+ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
+ ("room_id", "receipt_type", "user_id", "event_id",
+ "data"))
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
+PushersStreamRow = namedtuple("PushersStreamRow",
+ ("user_id", "app_id", "pushkey", "deleted",))
+CachesStreamRow = namedtuple("CachesStreamRow",
+ ("cache_func", "keys", "invalidation_ts",))
+PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
+ ("room_id", "visibility", "appservice_id",
+ "network_id",))
+DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
+FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
+TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
+ ("user_id", "room_id", "data"))
+AccountDataStreamRow = namedtuple("AccountDataStream",
+ ("user_id", "room_id", "data_type", "data"))
+class Stream(object):
+ """Base class for the streams.
+ Provides a `get_updates()` function that returns new updates since the last
+ time it was called up until the point `advance_current_token` was called.
+ """
+ NAME = None # The name of the stream
+ ROW_TYPE = None # The type of the row
+ _LIMITED = True # Whether the update function takes a limit
+ def __init__(self, hs):
+ # The token from which we last asked for updates
+ self.last_token = self.current_token()
+ # The token that we will get updates up to
+ self.upto_token = self.current_token()
+ def advance_current_token(self):
+ """Updates `upto_token` to "now", which updates up until which point
+ get_updates[_since] will fetch rows till.
+ """
+ self.upto_token = self.current_token()
+ @defer.inlineCallbacks
+ def get_updates(self):
+ """Gets all updates since the last time this function was called (or
+ since the stream was constructed if it hadn't been called before),
+ until the `upto_token`
+ Returns:
+ (list(ROW_TYPE), int): list of updates plus the token used as an
+ upper bound of the updates (i.e. the "current token")
+ """
+ updates, current_token = yield self.get_updates_since(self.last_token)
+ self.last_token = current_token
+ defer.returnValue((updates, current_token))
+ @defer.inlineCallbacks
+ def get_updates_since(self, from_token):
+ """Like get_updates except allows specifying from when we should
+ stream updates
+ Returns:
+ (list(ROW_TYPE), int): list of updates plus the token used as an
+ upper bound of the updates (i.e. the "current token")
+ """
+ if from_token in ("NOW", "now"):
+ defer.returnValue(([], self.upto_token))
+ current_token = self.upto_token
+ from_token = int(from_token)
+ if from_token == current_token:
+ defer.returnValue(([], current_token))
+ if self._LIMITED:
+ rows = yield self.update_function(
+ from_token, current_token,
+ limit=MAX_EVENTS_BEHIND + 1,
+ )
+ if len(rows) >= MAX_EVENTS_BEHIND:
+ raise Exception("stream %s has fallen behined" % (self.NAME))
+ else:
+ rows = yield self.update_function(
+ from_token, current_token,
+ )
+ updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
+ defer.returnValue((updates, current_token))
+ def current_token(self):
+ """Gets the current token of the underlying streams. Should be provided
+ by the sub classes
+ Returns:
+ int
+ """
+ raise NotImplementedError()
+ def update_function(self, from_token, current_token, limit=None):
+ """Get updates between from_token and to_token. If Stream._LIMITED is
+ True then limit is provided, otherwise it's not.
+ Returns:
+ Deferred(list(tuple)): the first entry in the tuple is the token for
+ that update, and the rest of the tuple gets used to construct
+ a ``ROW_TYPE`` instance
+ """
+ raise NotImplementedError()
+class EventsStream(Stream):
+ """We received a new event, or an event went from being an outlier to not
+ """
+ NAME = "events"
+ ROW_TYPE = EventStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_current_events_token
+ self.update_function = store.get_all_new_forward_event_rows
+ super(EventsStream, self).__init__(hs)
+class BackfillStream(Stream):
+ """We fetched some old events and either we had never seen that event before
+ or it went from being an outlier to not.
+ """
+ NAME = "backfill"
+ ROW_TYPE = BackfillStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_current_backfill_token
+ self.update_function = store.get_all_new_backfill_event_rows
+ super(BackfillStream, self).__init__(hs)
+class PresenceStream(Stream):
+ NAME = "presence"
+ _LIMITED = False
+ ROW_TYPE = PresenceStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ presence_handler = hs.get_presence_handler()
+ self.current_token = store.get_current_presence_token
+ self.update_function = presence_handler.get_all_presence_updates
+ super(PresenceStream, self).__init__(hs)
+class TypingStream(Stream):
+ NAME = "typing"
+ _LIMITED = False
+ ROW_TYPE = TypingStreamRow
+ def __init__(self, hs):
+ typing_handler = hs.get_typing_handler()
+ self.current_token = typing_handler.get_current_token
+ self.update_function = typing_handler.get_all_typing_updates
+ super(TypingStream, self).__init__(hs)
+class ReceiptsStream(Stream):
+ NAME = "receipts"
+ ROW_TYPE = ReceiptsStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_max_receipt_stream_id
+ self.update_function = store.get_all_updated_receipts
+ super(ReceiptsStream, self).__init__(hs)
+class PushRulesStream(Stream):
+ """A user has changed their push rules
+ """
+ NAME = "push_rules"
+ ROW_TYPE = PushRulesStreamRow
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ super(PushRulesStream, self).__init__(hs)
+ def current_token(self):
+ push_rules_token, _ = self.store.get_push_rules_stream_token()
+ return push_rules_token
+ @defer.inlineCallbacks
+ def update_function(self, from_token, to_token, limit):
+ rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
+ defer.returnValue([(row[0], row[2]) for row in rows])
+class PushersStream(Stream):
+ """A user has added/changed/removed a pusher
+ """
+ NAME = "pushers"
+ ROW_TYPE = PushersStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_pushers_stream_token
+ self.update_function = store.get_all_updated_pushers_rows
+ super(PushersStream, self).__init__(hs)
+class CachesStream(Stream):
+ """A cache was invalidated on the master and no other stream would invalidate
+ the cache on the workers
+ """
+ NAME = "caches"
+ ROW_TYPE = CachesStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_cache_stream_token
+ self.update_function = store.get_all_updated_caches
+ super(CachesStream, self).__init__(hs)
+class PublicRoomsStream(Stream):
+ """The public rooms list changed
+ """
+ NAME = "public_rooms"
+ ROW_TYPE = PublicRoomsStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_current_public_room_stream_id
+ self.update_function = store.get_all_new_public_rooms
+ super(PublicRoomsStream, self).__init__(hs)
+class DeviceListsStream(Stream):
+ """Someone added/changed/removed a device
+ """
+ NAME = "device_lists"
+ _LIMITED = False
+ ROW_TYPE = DeviceListsStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_device_stream_token
+ self.update_function = store.get_all_device_list_changes_for_remotes
+ super(DeviceListsStream, self).__init__(hs)
+class ToDeviceStream(Stream):
+ """New to_device messages for a client
+ """
+ NAME = "to_device"
+ ROW_TYPE = ToDeviceStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_to_device_stream_token
+ self.update_function = store.get_all_new_device_messages
+ super(ToDeviceStream, self).__init__(hs)
+class FederationStream(Stream):
+ """Data to be sent over federation. Only available when master has federation
+ sending disabled.
+ """
+ NAME = "federation"
+ ROW_TYPE = FederationStreamRow
+ def __init__(self, hs):
+ federation_sender = hs.get_federation_sender()
+ self.current_token = federation_sender.get_current_token
+ self.update_function = federation_sender.get_replication_rows
+ super(FederationStream, self).__init__(hs)
+class TagAccountDataStream(Stream):
+ """Someone added/removed a tag for a room
+ """
+ NAME = "tag_account_data"
+ ROW_TYPE = TagAccountDataStreamRow
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_max_account_data_stream_id
+ self.update_function = store.get_all_updated_tags
+ super(TagAccountDataStream, self).__init__(hs)
+class AccountDataStream(Stream):
+ """Global or per room account data was changed
+ """
+ NAME = "account_data"
+ ROW_TYPE = AccountDataStreamRow
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.current_token = self.store.get_max_account_data_stream_id
+ super(AccountDataStream, self).__init__(hs)
+ @defer.inlineCallbacks
+ def update_function(self, from_token, to_token, limit):
+ global_results, room_results = yield self.store.get_all_updated_account_data(
+ from_token, from_token, to_token, limit
+ )
+ results = list(room_results)
+ results.extend(
+ (stream_id, user_id, None, account_data_type, content,)
+ for stream_id, user_id, account_data_type, content in global_results
+ )
+ defer.returnValue(results)
+ stream.NAME: stream
+ for stream in (
+ EventsStream,
+ BackfillStream,
+ PresenceStream,
+ TypingStream,
+ ReceiptsStream,
+ PushRulesStream,
+ PushersStream,
+ CachesStream,
+ PublicRoomsStream,
+ DeviceListsStream,
+ ToDeviceStream,
+ FederationStream,
+ TagAccountDataStream,
+ AccountDataStream,
+ )
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 53e36791d5..c8d5f5ba8b 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -533,7 +533,7 @@ class DeviceStore(SQLBaseStore):
rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
defer.returnValue(set(row[0] for row in rows))
- def get_all_device_list_changes_for_remotes(self, from_key):
+ def get_all_device_list_changes_for_remotes(self, from_key, to_key):
"""Return a list of `(stream_id, user_id, destination)` which is the
combined list of changes to devices, and which destinations need to be
poked. `destination` may be None if no destinations need to be poked.
@@ -541,11 +541,11 @@ class DeviceStore(SQLBaseStore):
sql = """
SELECT stream_id, user_id, destination FROM device_lists_stream
LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
- WHERE stream_id > ?
+ WHERE ? < stream_id AND stream_id <= ?
return self._execute(
"get_all_device_list_changes_for_remotes", None,
- sql, from_key,
+ sql, from_key, to_key
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3f6833fad2..64fe937bdc 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1771,6 +1771,94 @@ class EventsStore(SQLBaseStore):
"""The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token()
+ def get_current_events_token(self):
+ """The current maximum token that events have reached"""
+ return self._stream_id_gen.get_current_token()
+ def get_all_new_forward_event_rows(self, last_id, current_id, limit):
+ if last_id == current_id:
+ return defer.succeed([])
+ def get_all_new_forward_event_rows(txn):
+ sql = (
+ "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+ " state_key, redacts"
+ " FROM events AS e"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events USING (event_id)"
+ " WHERE ? < stream_ordering AND stream_ordering <= ?"
+ " ORDER BY stream_ordering ASC"
+ " LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+ new_event_updates = txn.fetchall()
+ if len(new_event_updates) == limit:
+ upper_bound = new_event_updates[-1][0]
+ else:
+ upper_bound = current_id
+ sql = (
+ "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
+ " state_key, redacts"
+ " FROM events AS e"
+ " INNER JOIN ex_outlier_stream USING (event_id)"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events USING (event_id)"
+ " WHERE ? < event_stream_ordering"
+ " AND event_stream_ordering <= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (last_id, upper_bound))
+ new_event_updates.extend(txn)
+ return new_event_updates
+ return self.runInteraction(
+ "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+ )
+ def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+ if last_id == current_id:
+ return defer.succeed([])
+ def get_all_new_backfill_event_rows(txn):
+ sql = (
+ "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+ " state_key, redacts"
+ " FROM events AS e"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events USING (event_id)"
+ " WHERE ? > stream_ordering AND stream_ordering >= ?"
+ " ORDER BY stream_ordering ASC"
+ " LIMIT ?"
+ )
+ txn.execute(sql, (-last_id, -current_id, limit))
+ new_event_updates = txn.fetchall()
+ if len(new_event_updates) == limit:
+ upper_bound = new_event_updates[-1][0]
+ else:
+ upper_bound = current_id
+ sql = (
+ "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
+ " state_key, redacts"
+ " FROM events AS e"
+ " INNER JOIN ex_outlier_stream USING (event_id)"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events USING (event_id)"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (-last_id, -upper_bound))
+ new_event_updates.extend(txn.fetchall())
+ return new_event_updates
+ return self.runInteraction(
+ "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
+ )
@cached(num_args=5, max_entries=10)
def get_all_new_events(self, last_backfill_id, last_forward_id,
current_backfill_id, current_forward_id, limit):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 8cc9f0353b..34d2f82b7f 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
+ def get_all_updated_pushers_rows(self, last_id, current_id, limit):
+ """Get all the pushers that have changed between the given tokens.
+ Returns:
+ Deferred(list(tuple)): each tuple consists of:
+ stream_id (str)
+ user_id (str)
+ app_id (str)
+ pushkey (str)
+ was_deleted (bool): whether the pusher was added/updated (False)
+ or deleted (True)
+ """
+ if last_id == current_id:
+ return defer.succeed([])
+ def get_all_updated_pushers_rows_txn(txn):
+ sql = (
+ "SELECT id, user_name, app_id, pushkey"
+ " FROM pushers"
+ " WHERE ? < id AND id <= ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+ results = [list(row) + [False] for row in txn]
+ sql = (
+ "SELECT stream_id, user_id, app_id, pushkey"
+ " FROM deleted_pushers"
+ " WHERE ? < stream_id AND stream_id <= ?"
+ " ORDER BY stream_id ASC LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+ results.extend(list(row) + [True] for row in txn)
+ results.sort() # Sort so that they're ordered by stream id
+ return results
+ return self.runInteraction(
+ "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
+ )
@cachedInlineCallbacks(num_args=1, max_entries=15000)
def get_if_user_has_pusher(self, user_id):
# This only exists for the cachedList decorator