From 8da6f0be480b67e6b917dc0e90a6f95d2dbd2311 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:58:26 +0100 Subject: Define the various streams we will replicate --- synapse/replication/tcp/streams.py | 409 +++++++++++++++++++++++++++++++++++++ 1 file changed, 409 insertions(+) create mode 100644 synapse/replication/tcp/streams.py (limited to 'synapse/replication/tcp/streams.py') diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py new file mode 100644 index 0000000000..07adf9412e --- /dev/null +++ b/synapse/replication/tcp/streams.py @@ -0,0 +1,409 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines all the valid streams that clients can subscribe to, and the format +of the rows returned by each stream. + +Each stream is defined by the following information: + + stream name: The name of the stream + row type: The type that is used to serialise/deserialse the row + current_token: The function that returns the current token for the stream + update_function: The function that returns a list of updates between two tokens +""" + +from twisted.internet import defer +from collections import namedtuple + +import logging + + +logger = logging.getLogger(__name__) + + +MAX_EVENTS_BEHIND = 10000 + + +EventStreamRow = namedtuple("EventStreamRow", + ("event_id", "room_id", "type", "state_key", "redacts")) +BackfillStreamRow = namedtuple("BackfillStreamRow", + ("event_id", "room_id", "type", "state_key", "redacts")) +PresenceStreamRow = namedtuple("PresenceStreamRow", + ("user_id", "state", "last_active_ts", + "last_federation_update_ts", "last_user_sync_ts", + "status_msg", "currently_active")) +TypingStreamRow = namedtuple("TypingStreamRow", + ("room_id", "user_ids")) +ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", + ("room_id", "receipt_type", "user_id", "event_id", + "data")) +PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) +PushersStreamRow = namedtuple("PushersStreamRow", + ("user_id", "app_id", "pushkey", "deleted",)) +CachesStreamRow = namedtuple("CachesStreamRow", + ("cache_func", "keys", "invalidation_ts",)) +PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", + ("room_id", "visibility", "appservice_id", + "network_id",)) +DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",)) +ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) +FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",)) +TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", + ("user_id", "room_id", "data")) +AccountDataStreamRow = namedtuple("AccountDataStream", + ("user_id", "room_id", "data_type", "data")) + + +class Stream(object): + """Base class for the streams. + + Provides a `get_updates()` function that returns new updates since the last + time it was called up until the point `advance_current_token` was called. + """ + NAME = None # The name of the stream + ROW_TYPE = None # The type of the row + _LIMITED = True # Whether the update function takes a limit + + def __init__(self, hs): + # The token from which we last asked for updates + self.last_token = self.current_token() + + # The token that we will get updates up to + self.upto_token = self.current_token() + + def advance_current_token(self): + """Updates `upto_token` to "now", which updates up until which point + get_updates[_since] will fetch rows till. + """ + self.upto_token = self.current_token() + + @defer.inlineCallbacks + def get_updates(self): + """Gets all updates since the last time this function was called (or + since the stream was constructed if it hadn't been called before), + until the `upto_token` + + Returns: + (list(ROW_TYPE), int): list of updates plus the token used as an + upper bound of the updates (i.e. the "current token") + """ + updates, current_token = yield self.get_updates_since(self.last_token) + self.last_token = current_token + + defer.returnValue((updates, current_token)) + + @defer.inlineCallbacks + def get_updates_since(self, from_token): + """Like get_updates except allows specifying from when we should + stream updates + + Returns: + (list(ROW_TYPE), int): list of updates plus the token used as an + upper bound of the updates (i.e. the "current token") + """ + if from_token in ("NOW", "now"): + defer.returnValue(([], self.upto_token)) + + current_token = self.upto_token + + from_token = int(from_token) + + if from_token == current_token: + defer.returnValue(([], current_token)) + + if self._LIMITED: + rows = yield self.update_function( + from_token, current_token, + limit=MAX_EVENTS_BEHIND + 1, + ) + + if len(rows) >= MAX_EVENTS_BEHIND: + raise Exception("stream %s has fallen behined" % (self.NAME)) + else: + rows = yield self.update_function( + from_token, current_token, + ) + + updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows] + + defer.returnValue((updates, current_token)) + + def current_token(self): + """Gets the current token of the underlying streams. Should be provided + by the sub classes + + Returns: + int + """ + raise NotImplementedError() + + def update_function(self, from_token, current_token, limit=None): + """Get updates between from_token and to_token. If Stream._LIMITED is + True then limit is provided, otherwise it's not. + + Returns: + list(tuple): the first entry in the tuple is the token for that + update, and the rest of the tuple gets used to construct + a ``ROW_TYPE`` instance + """ + raise NotImplementedError() + + +class EventsStream(Stream): + """We received a new event, or an event went from being an outlier to not + """ + NAME = "events" + ROW_TYPE = EventStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + self.current_token = store.get_current_events_token + self.update_function = store.get_all_new_forward_event_rows + + super(EventsStream, self).__init__(hs) + + +class BackfillStream(Stream): + """We fetched some old events and either we had never seen that event before + or it went from being an outlier to not. + """ + NAME = "backfill" + ROW_TYPE = BackfillStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + self.current_token = store.get_current_backfill_token + self.update_function = store.get_all_new_backfill_event_rows + + super(BackfillStream, self).__init__(hs) + + +class PresenceStream(Stream): + NAME = "presence" + _LIMITED = False + ROW_TYPE = PresenceStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + presence_handler = hs.get_presence_handler() + + self.current_token = store.get_current_presence_token + self.update_function = presence_handler.get_all_presence_updates + + super(PresenceStream, self).__init__(hs) + + +class TypingStream(Stream): + NAME = "typing" + _LIMITED = False + ROW_TYPE = TypingStreamRow + + def __init__(self, hs): + typing_handler = hs.get_typing_handler() + + self.current_token = typing_handler.get_current_token + self.update_function = typing_handler.get_all_typing_updates + + super(TypingStream, self).__init__(hs) + + +class ReceiptsStream(Stream): + NAME = "receipts" + ROW_TYPE = ReceiptsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_receipt_stream_id + self.update_function = store.get_all_updated_receipts + + super(ReceiptsStream, self).__init__(hs) + + +class PushRulesStream(Stream): + """A user has changed their push rules + """ + NAME = "push_rules" + ROW_TYPE = PushRulesStreamRow + + def __init__(self, hs): + self.store = hs.get_datastore() + super(PushRulesStream, self).__init__(hs) + + def current_token(self): + push_rules_token, _ = self.store.get_push_rules_stream_token() + return push_rules_token + + @defer.inlineCallbacks + def update_function(self, from_token, to_token, limit): + rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit) + defer.returnValue([(row[0], row[2]) for row in rows]) + + +class PushersStream(Stream): + """A user has added/changed/removed a pusher + """ + NAME = "pushers" + ROW_TYPE = PushersStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_pushers_stream_token + self.update_function = store.get_all_updated_pushers_rows + + super(PushersStream, self).__init__(hs) + + +class CachesStream(Stream): + """A cache was invalidated on the master and no other stream would invalidate + the cache on the workers + """ + NAME = "caches" + ROW_TYPE = CachesStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_cache_stream_token + self.update_function = store.get_all_updated_caches + + super(CachesStream, self).__init__(hs) + + +class PublicRoomsStream(Stream): + """The public rooms list changed + """ + NAME = "public_rooms" + ROW_TYPE = PublicRoomsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_current_public_room_stream_id + self.update_function = store.get_all_new_public_rooms + + super(PublicRoomsStream, self).__init__(hs) + + +class DeviceListsStream(Stream): + """Someone added/changed/removed a device + """ + NAME = "device_lists" + _LIMITED = False + ROW_TYPE = DeviceListsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_device_stream_token + self.update_function = store.get_all_device_list_changes_for_remotes + + super(DeviceListsStream, self).__init__(hs) + + +class ToDeviceStream(Stream): + """New to_device messages for a client + """ + NAME = "to_device" + ROW_TYPE = ToDeviceStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_to_device_stream_token + self.update_function = store.get_all_new_device_messages + + super(ToDeviceStream, self).__init__(hs) + + +class FederationStream(Stream): + """Data to be sent over federation. Only available when master has federation + sending disabled. + """ + NAME = "federation" + ROW_TYPE = FederationStreamRow + + def __init__(self, hs): + federation_sender = hs.get_federation_sender() + + self.current_token = federation_sender.get_current_token + self.update_function = federation_sender.get_replication_rows + + super(FederationStream, self).__init__(hs) + + +class TagAccountDataStream(Stream): + """Someone added/removed a tag for a room + """ + NAME = "tag_account_data" + ROW_TYPE = TagAccountDataStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_account_data_stream_id + self.update_function = store.get_all_updated_tags + + super(TagAccountDataStream, self).__init__(hs) + + +class AccountDataStream(Stream): + """Global or per room account data was changed + """ + NAME = "account_data" + ROW_TYPE = AccountDataStreamRow + + def __init__(self, hs): + self.store = hs.get_datastore() + + self.current_token = self.store.get_max_account_data_stream_id + + super(AccountDataStream, self).__init__(hs) + + @defer.inlineCallbacks + def update_function(self, from_token, to_token, limit): + global_results, room_results = yield self.store.get_all_updated_account_data( + from_token, from_token, to_token, limit + ) + + results = list(room_results) + results.extend( + (stream_id, user_id, None, account_data_type, content,) + for stream_id, user_id, account_data_type, content in global_results + ) + + defer.returnValue(results) + + +STREAMS_MAP = { + stream.NAME: stream + for stream in ( + EventsStream, + BackfillStream, + PresenceStream, + TypingStream, + ReceiptsStream, + PushRulesStream, + PushersStream, + CachesStream, + PublicRoomsStream, + DeviceListsStream, + ToDeviceStream, + FederationStream, + TagAccountDataStream, + AccountDataStream, + ) +} -- cgit 1.4.1 From bfcf016714575edb0ad2c19b2f00694d62ca08ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:19:24 +0100 Subject: Fix up docs --- docs/tcp_replication.rst | 16 ++++++++-------- synapse/replication/tcp/__init__.py | 18 +----------------- synapse/replication/tcp/streams.py | 4 ++-- synapse/storage/pusher.py | 2 +- 4 files changed, 12 insertions(+), 28 deletions(-) (limited to 'synapse/replication/tcp/streams.py') diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index be0aa6b28c..7393527f6f 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -1,20 +1,20 @@ TCP Replication =============== -This describes the TCP replication protocol that replaces the HTTP protocol. - Motivation ---------- -The HTTP API used long poll from the workers to the master, this has the problem -of causing a lot of duplicate work on the server. This TCP protocol aims to -solve. +Previously the workers used an HTTP long poll mechanism to get updates from the +master, which had the problem of causing a lot of duplicate work on the server. +This TCP protocol replaces those APIs with the aim of increased efficiency. + + Overview -------- The protocol is based on fire and forget, line based commands. An example flow -would be (where '>' indicates master->worker and '<' worker->master flows):: +would be (where '>' indicates master to worker and '<' worker to master flows):: > SERVER example.com < REPLICATE events 53 @@ -24,7 +24,7 @@ would be (where '>' indicates master->worker and '<' worker->master flows):: The example shows the server accepting a new connection and sending its identity with the ``SERVER`` command, followed by the client asking to subscribe to the ``events`` stream from the token ``53``. The server then periodically sends ``RDATA`` -commands which have the format ``RDATA ```, where the +commands which have the format ``RDATA ``, where the format of ```` is defined by the individual streams. Error reporting happens by either the client or server sending an `ERROR` @@ -125,7 +125,7 @@ recovers it can reconnect to the server and ask for missed messages. Reliability ~~~~~~~~~~~ -In general the replication stream should be consisdered an unreliable transport +In general the replication stream should be considered an unreliable transport since e.g. commands are not resent if the connection disappears. The exception to that are the replication streams, i.e. RDATA commands, since diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py index 8b4e886d4e..81c2ea7ee9 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py @@ -16,22 +16,7 @@ """This module implements the TCP replication protocol used by synapse to communicate between the master process and its workers (when they're enabled). -The protocol is based on fire and forget, line based commands. An example flow -would be (where '>' indicates master->worker and '<' worker->master flows):: - - > SERVER example.com - < REPLICATE events 53 - > RDATA events 54 ["$foo1:bar.com", ...] - > RDATA events 55 ["$foo4:bar.com", ...] - -The example shows the server accepting a new connection and sending its identity -with the `SERVER` command, followed by the client asking to subscribe to the -`events` stream from the token `53`. The server then periodically sends `RDATA` -commands which have the format `RDATA `, where the -format of `` is defined by the individual streams. - -Error reporting happens by either the client or server sending an `ERROR` -command, and usually the connection will be closed. +Further details can be found in docs/tcp_replication.rst Structure of the module: @@ -42,5 +27,4 @@ Structure of the module: * resource.py - the server classes that accepts and handle client connections * streams.py - the definitons of all the valid streams -Further details can be found in docs/tcp_replication.rst """ diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 07adf9412e..fada40c6ef 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -154,8 +154,8 @@ class Stream(object): True then limit is provided, otherwise it's not. Returns: - list(tuple): the first entry in the tuple is the token for that - update, and the rest of the tuple gets used to construct + Deferred(list(tuple)): the first entry in the tuple is the token for + that update, and the rest of the tuple gets used to construct a ``ROW_TYPE`` instance """ raise NotImplementedError() diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 715c8bef24..34d2f82b7f 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -139,7 +139,7 @@ class PusherStore(SQLBaseStore): """Get all the pushers that have changed between the given tokens. Returns: - list(tuple): each tuple consists of: + Deferred(list(tuple)): each tuple consists of: stream_id (str) user_id (str) app_id (str) -- cgit 1.4.1