From acaa18f7dd0d12dec5742e64705f9bf3c45abffe Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 27 Mar 2019 21:12:36 +0000 Subject: Fix/improve some docstrings in the replication code. (#4949) --- synapse/replication/tcp/client.py | 14 +++++++++++--- synapse/replication/tcp/streams.py | 12 ++++++++---- 2 files changed, 19 insertions(+), 7 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e558f90e1a..150975608f 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -103,10 +103,18 @@ class ReplicationClientHandler(object): hs.get_reactor().connectTCP(host, port, self.factory) def on_rdata(self, stream_name, token, rows): - """Called when we get new replication data. By default this just pokes - the slave store. + """Called to handle a batch of replication data with a given stream token. - Can be overriden in subclasses to handle more. + By default this just pokes the slave store. Can be overriden in subclasses to + handle more. + + Args: + stream_name (str): name of the replication stream for this batch of rows + token (int): stream token for this batch of rows + rows (list): a list of Stream.ROW_TYPE objects. + + Returns: + Deferred|None """ logger.debug("Received rdata %s -> %s", stream_name, token) return self.store.process_replication_rows(stream_name, token, rows) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index e23084baae..42b8a25bd3 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -162,8 +162,10 @@ class Stream(object): until the `upto_token` Returns: - (list(ROW_TYPE), int): list of updates plus the token used as an - upper bound of the updates (i.e. the "current token") + Deferred[Tuple[List[Tuple[int, Any]], int]: + Resolves to a pair ``(updates, current_token)``, where ``updates`` is a + list of ``(token, row)`` entries. ``row`` will be json-serialised and + sent over the replication steam. """ updates, current_token = yield self.get_updates_since(self.last_token) self.last_token = current_token @@ -176,8 +178,10 @@ class Stream(object): stream updates Returns: - (list(ROW_TYPE), int): list of updates plus the token used as an - upper bound of the updates (i.e. the "current token") + Deferred[Tuple[List[Tuple[int, Any]], int]: + Resolves to a pair ``(updates, current_token)``, where ``updates`` is a + list of ``(token, row)`` entries. ``row`` will be json-serialised and + sent over the replication steam. """ if from_token in ("NOW", "now"): defer.returnValue(([], self.upto_token)) -- cgit 1.5.1 From a5798de06784470d941ddc8084655e9d0e038eb7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 09:58:42 +0000 Subject: Move replication.tcp.streams into a package --- synapse/app/federation_sender.py | 2 +- synapse/replication/tcp/streams.py | 514 ------------------------- synapse/replication/tcp/streams/__init__.py | 50 +++ synapse/replication/tcp/streams/_base.py | 482 +++++++++++++++++++++++ tests/replication/tcp/streams/test_receipts.py | 2 +- 5 files changed, 534 insertions(+), 516 deletions(-) delete mode 100644 synapse/replication/tcp/streams.py create mode 100644 synapse/replication/tcp/streams/__init__.py create mode 100644 synapse/replication/tcp/streams/_base.py (limited to 'synapse/replication') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 9711a7147c..1d43f2b075 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams import ReceiptsStream +from synapse.replication.tcp.streams._base import ReceiptsStream from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.types import ReadReceipt diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py deleted file mode 100644 index 42b8a25bd3..0000000000 --- a/synapse/replication/tcp/streams.py +++ /dev/null @@ -1,514 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2017 Vector Creations Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Defines all the valid streams that clients can subscribe to, and the format -of the rows returned by each stream. - -Each stream is defined by the following information: - - stream name: The name of the stream - row type: The type that is used to serialise/deserialse the row - current_token: The function that returns the current token for the stream - update_function: The function that returns a list of updates between two tokens -""" -import itertools -import logging -from collections import namedtuple - -from twisted.internet import defer - -logger = logging.getLogger(__name__) - - -MAX_EVENTS_BEHIND = 10000 - - -EventStreamRow = namedtuple("EventStreamRow", ( - "event_id", # str - "room_id", # str - "type", # str - "state_key", # str, optional - "redacts", # str, optional -)) -BackfillStreamRow = namedtuple("BackfillStreamRow", ( - "event_id", # str - "room_id", # str - "type", # str - "state_key", # str, optional - "redacts", # str, optional -)) -PresenceStreamRow = namedtuple("PresenceStreamRow", ( - "user_id", # str - "state", # str - "last_active_ts", # int - "last_federation_update_ts", # int - "last_user_sync_ts", # int - "status_msg", # str - "currently_active", # bool -)) -TypingStreamRow = namedtuple("TypingStreamRow", ( - "room_id", # str - "user_ids", # list(str) -)) -ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", ( - "room_id", # str - "receipt_type", # str - "user_id", # str - "event_id", # str - "data", # dict -)) -PushRulesStreamRow = namedtuple("PushRulesStreamRow", ( - "user_id", # str -)) -PushersStreamRow = namedtuple("PushersStreamRow", ( - "user_id", # str - "app_id", # str - "pushkey", # str - "deleted", # bool -)) -CachesStreamRow = namedtuple("CachesStreamRow", ( - "cache_func", # str - "keys", # list(str) - "invalidation_ts", # int -)) -PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", ( - "room_id", # str - "visibility", # str - "appservice_id", # str, optional - "network_id", # str, optional -)) -DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ( - "user_id", # str - "destination", # str -)) -ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( - "entity", # str -)) -FederationStreamRow = namedtuple("FederationStreamRow", ( - "type", # str, the type of data as defined in the BaseFederationRows - "data", # dict, serialization of a federation.send_queue.BaseFederationRow -)) -TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( - "user_id", # str - "room_id", # str - "data", # dict -)) -AccountDataStreamRow = namedtuple("AccountDataStream", ( - "user_id", # str - "room_id", # str - "data_type", # str - "data", # dict -)) -CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( - "room_id", # str - "type", # str - "state_key", # str - "event_id", # str, optional -)) -GroupsStreamRow = namedtuple("GroupsStreamRow", ( - "group_id", # str - "user_id", # str - "type", # str - "content", # dict -)) - - -class Stream(object): - """Base class for the streams. - - Provides a `get_updates()` function that returns new updates since the last - time it was called up until the point `advance_current_token` was called. - """ - NAME = None # The name of the stream - ROW_TYPE = None # The type of the row - _LIMITED = True # Whether the update function takes a limit - - def __init__(self, hs): - # The token from which we last asked for updates - self.last_token = self.current_token() - - # The token that we will get updates up to - self.upto_token = self.current_token() - - def advance_current_token(self): - """Updates `upto_token` to "now", which updates up until which point - get_updates[_since] will fetch rows till. - """ - self.upto_token = self.current_token() - - def discard_updates_and_advance(self): - """Called when the stream should advance but the updates would be discarded, - e.g. when there are no currently connected workers. - """ - self.upto_token = self.current_token() - self.last_token = self.upto_token - - @defer.inlineCallbacks - def get_updates(self): - """Gets all updates since the last time this function was called (or - since the stream was constructed if it hadn't been called before), - until the `upto_token` - - Returns: - Deferred[Tuple[List[Tuple[int, Any]], int]: - Resolves to a pair ``(updates, current_token)``, where ``updates`` is a - list of ``(token, row)`` entries. ``row`` will be json-serialised and - sent over the replication steam. - """ - updates, current_token = yield self.get_updates_since(self.last_token) - self.last_token = current_token - - defer.returnValue((updates, current_token)) - - @defer.inlineCallbacks - def get_updates_since(self, from_token): - """Like get_updates except allows specifying from when we should - stream updates - - Returns: - Deferred[Tuple[List[Tuple[int, Any]], int]: - Resolves to a pair ``(updates, current_token)``, where ``updates`` is a - list of ``(token, row)`` entries. ``row`` will be json-serialised and - sent over the replication steam. - """ - if from_token in ("NOW", "now"): - defer.returnValue(([], self.upto_token)) - - current_token = self.upto_token - - from_token = int(from_token) - - if from_token == current_token: - defer.returnValue(([], current_token)) - - if self._LIMITED: - rows = yield self.update_function( - from_token, current_token, - limit=MAX_EVENTS_BEHIND + 1, - ) - - # never turn more than MAX_EVENTS_BEHIND + 1 into updates. - rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1) - else: - rows = yield self.update_function( - from_token, current_token, - ) - - updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows] - - # check we didn't get more rows than the limit. - # doing it like this allows the update_function to be a generator. - if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND: - raise Exception("stream %s has fallen behind" % (self.NAME)) - - defer.returnValue((updates, current_token)) - - def current_token(self): - """Gets the current token of the underlying streams. Should be provided - by the sub classes - - Returns: - int - """ - raise NotImplementedError() - - def update_function(self, from_token, current_token, limit=None): - """Get updates between from_token and to_token. If Stream._LIMITED is - True then limit is provided, otherwise it's not. - - Returns: - Deferred(list(tuple)): the first entry in the tuple is the token for - that update, and the rest of the tuple gets used to construct - a ``ROW_TYPE`` instance - """ - raise NotImplementedError() - - -class EventsStream(Stream): - """We received a new event, or an event went from being an outlier to not - """ - NAME = "events" - ROW_TYPE = EventStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - self.current_token = store.get_current_events_token - self.update_function = store.get_all_new_forward_event_rows - - super(EventsStream, self).__init__(hs) - - -class BackfillStream(Stream): - """We fetched some old events and either we had never seen that event before - or it went from being an outlier to not. - """ - NAME = "backfill" - ROW_TYPE = BackfillStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - self.current_token = store.get_current_backfill_token - self.update_function = store.get_all_new_backfill_event_rows - - super(BackfillStream, self).__init__(hs) - - -class PresenceStream(Stream): - NAME = "presence" - _LIMITED = False - ROW_TYPE = PresenceStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - presence_handler = hs.get_presence_handler() - - self.current_token = store.get_current_presence_token - self.update_function = presence_handler.get_all_presence_updates - - super(PresenceStream, self).__init__(hs) - - -class TypingStream(Stream): - NAME = "typing" - _LIMITED = False - ROW_TYPE = TypingStreamRow - - def __init__(self, hs): - typing_handler = hs.get_typing_handler() - - self.current_token = typing_handler.get_current_token - self.update_function = typing_handler.get_all_typing_updates - - super(TypingStream, self).__init__(hs) - - -class ReceiptsStream(Stream): - NAME = "receipts" - ROW_TYPE = ReceiptsStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_max_receipt_stream_id - self.update_function = store.get_all_updated_receipts - - super(ReceiptsStream, self).__init__(hs) - - -class PushRulesStream(Stream): - """A user has changed their push rules - """ - NAME = "push_rules" - ROW_TYPE = PushRulesStreamRow - - def __init__(self, hs): - self.store = hs.get_datastore() - super(PushRulesStream, self).__init__(hs) - - def current_token(self): - push_rules_token, _ = self.store.get_push_rules_stream_token() - return push_rules_token - - @defer.inlineCallbacks - def update_function(self, from_token, to_token, limit): - rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit) - defer.returnValue([(row[0], row[2]) for row in rows]) - - -class PushersStream(Stream): - """A user has added/changed/removed a pusher - """ - NAME = "pushers" - ROW_TYPE = PushersStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_pushers_stream_token - self.update_function = store.get_all_updated_pushers_rows - - super(PushersStream, self).__init__(hs) - - -class CachesStream(Stream): - """A cache was invalidated on the master and no other stream would invalidate - the cache on the workers - """ - NAME = "caches" - ROW_TYPE = CachesStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_cache_stream_token - self.update_function = store.get_all_updated_caches - - super(CachesStream, self).__init__(hs) - - -class PublicRoomsStream(Stream): - """The public rooms list changed - """ - NAME = "public_rooms" - ROW_TYPE = PublicRoomsStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_current_public_room_stream_id - self.update_function = store.get_all_new_public_rooms - - super(PublicRoomsStream, self).__init__(hs) - - -class DeviceListsStream(Stream): - """Someone added/changed/removed a device - """ - NAME = "device_lists" - _LIMITED = False - ROW_TYPE = DeviceListsStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_device_stream_token - self.update_function = store.get_all_device_list_changes_for_remotes - - super(DeviceListsStream, self).__init__(hs) - - -class ToDeviceStream(Stream): - """New to_device messages for a client - """ - NAME = "to_device" - ROW_TYPE = ToDeviceStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_to_device_stream_token - self.update_function = store.get_all_new_device_messages - - super(ToDeviceStream, self).__init__(hs) - - -class FederationStream(Stream): - """Data to be sent over federation. Only available when master has federation - sending disabled. - """ - NAME = "federation" - ROW_TYPE = FederationStreamRow - - def __init__(self, hs): - federation_sender = hs.get_federation_sender() - - self.current_token = federation_sender.get_current_token - self.update_function = federation_sender.get_replication_rows - - super(FederationStream, self).__init__(hs) - - -class TagAccountDataStream(Stream): - """Someone added/removed a tag for a room - """ - NAME = "tag_account_data" - ROW_TYPE = TagAccountDataStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_max_account_data_stream_id - self.update_function = store.get_all_updated_tags - - super(TagAccountDataStream, self).__init__(hs) - - -class AccountDataStream(Stream): - """Global or per room account data was changed - """ - NAME = "account_data" - ROW_TYPE = AccountDataStreamRow - - def __init__(self, hs): - self.store = hs.get_datastore() - - self.current_token = self.store.get_max_account_data_stream_id - - super(AccountDataStream, self).__init__(hs) - - @defer.inlineCallbacks - def update_function(self, from_token, to_token, limit): - global_results, room_results = yield self.store.get_all_updated_account_data( - from_token, from_token, to_token, limit - ) - - results = list(room_results) - results.extend( - (stream_id, user_id, None, account_data_type, content,) - for stream_id, user_id, account_data_type, content in global_results - ) - - defer.returnValue(results) - - -class CurrentStateDeltaStream(Stream): - """Current state for a room was changed - """ - NAME = "current_state_deltas" - ROW_TYPE = CurrentStateDeltaStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_max_current_state_delta_stream_id - self.update_function = store.get_all_updated_current_state_deltas - - super(CurrentStateDeltaStream, self).__init__(hs) - - -class GroupServerStream(Stream): - NAME = "groups" - ROW_TYPE = GroupsStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_group_stream_token - self.update_function = store.get_all_groups_changes - - super(GroupServerStream, self).__init__(hs) - - -STREAMS_MAP = { - stream.NAME: stream - for stream in ( - EventsStream, - BackfillStream, - PresenceStream, - TypingStream, - ReceiptsStream, - PushRulesStream, - PushersStream, - CachesStream, - PublicRoomsStream, - DeviceListsStream, - ToDeviceStream, - FederationStream, - TagAccountDataStream, - AccountDataStream, - CurrentStateDeltaStream, - GroupServerStream, - ) -} diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py new file mode 100644 index 0000000000..1d5227971e --- /dev/null +++ b/synapse/replication/tcp/streams/__init__.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines all the valid streams that clients can subscribe to, and the format +of the rows returned by each stream. + +Each stream is defined by the following information: + + stream name: The name of the stream + row type: The type that is used to serialise/deserialse the row + current_token: The function that returns the current token for the stream + update_function: The function that returns a list of updates between two tokens +""" + +from . import _base + +STREAMS_MAP = { + stream.NAME: stream + for stream in ( + _base.EventsStream, + _base.BackfillStream, + _base.PresenceStream, + _base.TypingStream, + _base.ReceiptsStream, + _base.PushRulesStream, + _base.PushersStream, + _base.CachesStream, + _base.PublicRoomsStream, + _base.DeviceListsStream, + _base.ToDeviceStream, + _base.FederationStream, + _base.TagAccountDataStream, + _base.AccountDataStream, + _base.CurrentStateDeltaStream, + _base.GroupServerStream, + ) +} diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py new file mode 100644 index 0000000000..344c8ab916 --- /dev/null +++ b/synapse/replication/tcp/streams/_base.py @@ -0,0 +1,482 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import itertools +import logging +from collections import namedtuple + +from twisted.internet import defer + +logger = logging.getLogger(__name__) + + +MAX_EVENTS_BEHIND = 10000 + +EventStreamRow = namedtuple("EventStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) +BackfillStreamRow = namedtuple("BackfillStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) +PresenceStreamRow = namedtuple("PresenceStreamRow", ( + "user_id", # str + "state", # str + "last_active_ts", # int + "last_federation_update_ts", # int + "last_user_sync_ts", # int + "status_msg", # str + "currently_active", # bool +)) +TypingStreamRow = namedtuple("TypingStreamRow", ( + "room_id", # str + "user_ids", # list(str) +)) +ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", ( + "room_id", # str + "receipt_type", # str + "user_id", # str + "event_id", # str + "data", # dict +)) +PushRulesStreamRow = namedtuple("PushRulesStreamRow", ( + "user_id", # str +)) +PushersStreamRow = namedtuple("PushersStreamRow", ( + "user_id", # str + "app_id", # str + "pushkey", # str + "deleted", # bool +)) +CachesStreamRow = namedtuple("CachesStreamRow", ( + "cache_func", # str + "keys", # list(str) + "invalidation_ts", # int +)) +PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", ( + "room_id", # str + "visibility", # str + "appservice_id", # str, optional + "network_id", # str, optional +)) +DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ( + "user_id", # str + "destination", # str +)) +ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( + "entity", # str +)) +FederationStreamRow = namedtuple("FederationStreamRow", ( + "type", # str, the type of data as defined in the BaseFederationRows + "data", # dict, serialization of a federation.send_queue.BaseFederationRow +)) +TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( + "user_id", # str + "room_id", # str + "data", # dict +)) +AccountDataStreamRow = namedtuple("AccountDataStream", ( + "user_id", # str + "room_id", # str + "data_type", # str + "data", # dict +)) +CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( + "room_id", # str + "type", # str + "state_key", # str + "event_id", # str, optional +)) +GroupsStreamRow = namedtuple("GroupsStreamRow", ( + "group_id", # str + "user_id", # str + "type", # str + "content", # dict +)) + + +class Stream(object): + """Base class for the streams. + + Provides a `get_updates()` function that returns new updates since the last + time it was called up until the point `advance_current_token` was called. + """ + NAME = None # The name of the stream + ROW_TYPE = None # The type of the row + _LIMITED = True # Whether the update function takes a limit + + def __init__(self, hs): + # The token from which we last asked for updates + self.last_token = self.current_token() + + # The token that we will get updates up to + self.upto_token = self.current_token() + + def advance_current_token(self): + """Updates `upto_token` to "now", which updates up until which point + get_updates[_since] will fetch rows till. + """ + self.upto_token = self.current_token() + + def discard_updates_and_advance(self): + """Called when the stream should advance but the updates would be discarded, + e.g. when there are no currently connected workers. + """ + self.upto_token = self.current_token() + self.last_token = self.upto_token + + @defer.inlineCallbacks + def get_updates(self): + """Gets all updates since the last time this function was called (or + since the stream was constructed if it hadn't been called before), + until the `upto_token` + + Returns: + Deferred[Tuple[List[Tuple[int, Any]], int]: + Resolves to a pair ``(updates, current_token)``, where ``updates`` is a + list of ``(token, row)`` entries. ``row`` will be json-serialised and + sent over the replication steam. + """ + updates, current_token = yield self.get_updates_since(self.last_token) + self.last_token = current_token + + defer.returnValue((updates, current_token)) + + @defer.inlineCallbacks + def get_updates_since(self, from_token): + """Like get_updates except allows specifying from when we should + stream updates + + Returns: + Deferred[Tuple[List[Tuple[int, Any]], int]: + Resolves to a pair ``(updates, current_token)``, where ``updates`` is a + list of ``(token, row)`` entries. ``row`` will be json-serialised and + sent over the replication steam. + """ + if from_token in ("NOW", "now"): + defer.returnValue(([], self.upto_token)) + + current_token = self.upto_token + + from_token = int(from_token) + + if from_token == current_token: + defer.returnValue(([], current_token)) + + if self._LIMITED: + rows = yield self.update_function( + from_token, current_token, + limit=MAX_EVENTS_BEHIND + 1, + ) + + # never turn more than MAX_EVENTS_BEHIND + 1 into updates. + rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1) + else: + rows = yield self.update_function( + from_token, current_token, + ) + + updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows] + + # check we didn't get more rows than the limit. + # doing it like this allows the update_function to be a generator. + if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND: + raise Exception("stream %s has fallen behind" % (self.NAME)) + + defer.returnValue((updates, current_token)) + + def current_token(self): + """Gets the current token of the underlying streams. Should be provided + by the sub classes + + Returns: + int + """ + raise NotImplementedError() + + def update_function(self, from_token, current_token, limit=None): + """Get updates between from_token and to_token. If Stream._LIMITED is + True then limit is provided, otherwise it's not. + + Returns: + Deferred(list(tuple)): the first entry in the tuple is the token for + that update, and the rest of the tuple gets used to construct + a ``ROW_TYPE`` instance + """ + raise NotImplementedError() + + +class EventsStream(Stream): + """We received a new event, or an event went from being an outlier to not + """ + NAME = "events" + ROW_TYPE = EventStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + self.current_token = store.get_current_events_token + self.update_function = store.get_all_new_forward_event_rows + + super(EventsStream, self).__init__(hs) + + +class BackfillStream(Stream): + """We fetched some old events and either we had never seen that event before + or it went from being an outlier to not. + """ + NAME = "backfill" + ROW_TYPE = BackfillStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + self.current_token = store.get_current_backfill_token + self.update_function = store.get_all_new_backfill_event_rows + + super(BackfillStream, self).__init__(hs) + + +class PresenceStream(Stream): + NAME = "presence" + _LIMITED = False + ROW_TYPE = PresenceStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + presence_handler = hs.get_presence_handler() + + self.current_token = store.get_current_presence_token + self.update_function = presence_handler.get_all_presence_updates + + super(PresenceStream, self).__init__(hs) + + +class TypingStream(Stream): + NAME = "typing" + _LIMITED = False + ROW_TYPE = TypingStreamRow + + def __init__(self, hs): + typing_handler = hs.get_typing_handler() + + self.current_token = typing_handler.get_current_token + self.update_function = typing_handler.get_all_typing_updates + + super(TypingStream, self).__init__(hs) + + +class ReceiptsStream(Stream): + NAME = "receipts" + ROW_TYPE = ReceiptsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_receipt_stream_id + self.update_function = store.get_all_updated_receipts + + super(ReceiptsStream, self).__init__(hs) + + +class PushRulesStream(Stream): + """A user has changed their push rules + """ + NAME = "push_rules" + ROW_TYPE = PushRulesStreamRow + + def __init__(self, hs): + self.store = hs.get_datastore() + super(PushRulesStream, self).__init__(hs) + + def current_token(self): + push_rules_token, _ = self.store.get_push_rules_stream_token() + return push_rules_token + + @defer.inlineCallbacks + def update_function(self, from_token, to_token, limit): + rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit) + defer.returnValue([(row[0], row[2]) for row in rows]) + + +class PushersStream(Stream): + """A user has added/changed/removed a pusher + """ + NAME = "pushers" + ROW_TYPE = PushersStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_pushers_stream_token + self.update_function = store.get_all_updated_pushers_rows + + super(PushersStream, self).__init__(hs) + + +class CachesStream(Stream): + """A cache was invalidated on the master and no other stream would invalidate + the cache on the workers + """ + NAME = "caches" + ROW_TYPE = CachesStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_cache_stream_token + self.update_function = store.get_all_updated_caches + + super(CachesStream, self).__init__(hs) + + +class PublicRoomsStream(Stream): + """The public rooms list changed + """ + NAME = "public_rooms" + ROW_TYPE = PublicRoomsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_current_public_room_stream_id + self.update_function = store.get_all_new_public_rooms + + super(PublicRoomsStream, self).__init__(hs) + + +class DeviceListsStream(Stream): + """Someone added/changed/removed a device + """ + NAME = "device_lists" + _LIMITED = False + ROW_TYPE = DeviceListsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_device_stream_token + self.update_function = store.get_all_device_list_changes_for_remotes + + super(DeviceListsStream, self).__init__(hs) + + +class ToDeviceStream(Stream): + """New to_device messages for a client + """ + NAME = "to_device" + ROW_TYPE = ToDeviceStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_to_device_stream_token + self.update_function = store.get_all_new_device_messages + + super(ToDeviceStream, self).__init__(hs) + + +class FederationStream(Stream): + """Data to be sent over federation. Only available when master has federation + sending disabled. + """ + NAME = "federation" + ROW_TYPE = FederationStreamRow + + def __init__(self, hs): + federation_sender = hs.get_federation_sender() + + self.current_token = federation_sender.get_current_token + self.update_function = federation_sender.get_replication_rows + + super(FederationStream, self).__init__(hs) + + +class TagAccountDataStream(Stream): + """Someone added/removed a tag for a room + """ + NAME = "tag_account_data" + ROW_TYPE = TagAccountDataStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_account_data_stream_id + self.update_function = store.get_all_updated_tags + + super(TagAccountDataStream, self).__init__(hs) + + +class AccountDataStream(Stream): + """Global or per room account data was changed + """ + NAME = "account_data" + ROW_TYPE = AccountDataStreamRow + + def __init__(self, hs): + self.store = hs.get_datastore() + + self.current_token = self.store.get_max_account_data_stream_id + + super(AccountDataStream, self).__init__(hs) + + @defer.inlineCallbacks + def update_function(self, from_token, to_token, limit): + global_results, room_results = yield self.store.get_all_updated_account_data( + from_token, from_token, to_token, limit + ) + + results = list(room_results) + results.extend( + (stream_id, user_id, None, account_data_type, content,) + for stream_id, user_id, account_data_type, content in global_results + ) + + defer.returnValue(results) + + +class CurrentStateDeltaStream(Stream): + """Current state for a room was changed + """ + NAME = "current_state_deltas" + ROW_TYPE = CurrentStateDeltaStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_current_state_delta_stream_id + self.update_function = store.get_all_updated_current_state_deltas + + super(CurrentStateDeltaStream, self).__init__(hs) + + +class GroupServerStream(Stream): + NAME = "groups" + ROW_TYPE = GroupsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_group_stream_token + self.update_function = store.get_all_groups_changes + + super(GroupServerStream, self).__init__(hs) diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py index 9aa9dfe82e..d5a99f6caa 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.replication.tcp.streams import ReceiptsStreamRow +from synapse.replication.tcp.streams._base import ReceiptsStreamRow from tests.replication.tcp.streams._base import BaseStreamTestCase -- cgit 1.5.1 From aa1e0178641c5dfbc039cb547bcafe990222bf90 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 10:06:21 +0000 Subject: move EventsStream out to its own file --- synapse/replication/tcp/streams/__init__.py | 4 +-- synapse/replication/tcp/streams/_base.py | 21 --------------- synapse/replication/tcp/streams/events.py | 40 +++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 23 deletions(-) create mode 100644 synapse/replication/tcp/streams/events.py (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 1d5227971e..edad37aef8 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -25,12 +25,12 @@ Each stream is defined by the following information: update_function: The function that returns a list of updates between two tokens """ -from . import _base +from . import _base, events STREAMS_MAP = { stream.NAME: stream for stream in ( - _base.EventsStream, + events.EventsStream, _base.BackfillStream, _base.PresenceStream, _base.TypingStream, diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 344c8ab916..04e585f8f2 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -26,13 +26,6 @@ logger = logging.getLogger(__name__) MAX_EVENTS_BEHIND = 10000 -EventStreamRow = namedtuple("EventStreamRow", ( - "event_id", # str - "room_id", # str - "type", # str - "state_key", # str, optional - "redacts", # str, optional -)) BackfillStreamRow = namedtuple("BackfillStreamRow", ( "event_id", # str "room_id", # str @@ -227,20 +220,6 @@ class Stream(object): raise NotImplementedError() -class EventsStream(Stream): - """We received a new event, or an event went from being an outlier to not - """ - NAME = "events" - ROW_TYPE = EventStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - self.current_token = store.get_current_events_token - self.update_function = store.get_all_new_forward_event_rows - - super(EventsStream, self).__init__(hs) - - class BackfillStream(Stream): """We fetched some old events and either we had never seen that event before or it went from being an outlier to not. diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py new file mode 100644 index 0000000000..511dd6bcc7 --- /dev/null +++ b/synapse/replication/tcp/streams/events.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from collections import namedtuple + +from ._base import Stream + +EventStreamRow = namedtuple("EventStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) + + +class EventsStream(Stream): + """We received a new event, or an event went from being an outlier to not + """ + NAME = "events" + ROW_TYPE = EventStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + self.current_token = store.get_current_events_token + self.update_function = store.get_all_new_forward_event_rows + + super(EventsStream, self).__init__(hs) -- cgit 1.5.1 From 71dcb275f1d65f2251b77684550b4d9e2a19aadc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 11:45:42 +0000 Subject: move FederationStream out to its own file --- synapse/replication/tcp/resource.py | 3 ++- synapse/replication/tcp/streams/__init__.py | 4 +-- synapse/replication/tcp/streams/_base.py | 20 -------------- synapse/replication/tcp/streams/federation.py | 39 +++++++++++++++++++++++++++ 4 files changed, 43 insertions(+), 23 deletions(-) create mode 100644 synapse/replication/tcp/streams/federation.py (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 7fc346c7b6..f6a38f5140 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.metrics import Measure, measure_func from .protocol import ServerReplicationStreamProtocol -from .streams import STREAMS_MAP, FederationStream +from .streams import STREAMS_MAP +from .streams.federation import FederationStream stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index edad37aef8..5c715e3bfa 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -25,7 +25,7 @@ Each stream is defined by the following information: update_function: The function that returns a list of updates between two tokens """ -from . import _base, events +from . import _base, events, federation STREAMS_MAP = { stream.NAME: stream @@ -41,7 +41,7 @@ STREAMS_MAP = { _base.PublicRoomsStream, _base.DeviceListsStream, _base.ToDeviceStream, - _base.FederationStream, + federation.FederationStream, _base.TagAccountDataStream, _base.AccountDataStream, _base.CurrentStateDeltaStream, diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 04e585f8f2..18df89deed 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -80,10 +80,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ( ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( "entity", # str )) -FederationStreamRow = namedtuple("FederationStreamRow", ( - "type", # str, the type of data as defined in the BaseFederationRows - "data", # dict, serialization of a federation.send_queue.BaseFederationRow -)) TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( "user_id", # str "room_id", # str @@ -374,22 +370,6 @@ class ToDeviceStream(Stream): super(ToDeviceStream, self).__init__(hs) -class FederationStream(Stream): - """Data to be sent over federation. Only available when master has federation - sending disabled. - """ - NAME = "federation" - ROW_TYPE = FederationStreamRow - - def __init__(self, hs): - federation_sender = hs.get_federation_sender() - - self.current_token = federation_sender.get_current_token - self.update_function = federation_sender.get_replication_rows - - super(FederationStream, self).__init__(hs) - - class TagAccountDataStream(Stream): """Someone added/removed a tag for a room """ diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py new file mode 100644 index 0000000000..9aa43aa8d2 --- /dev/null +++ b/synapse/replication/tcp/streams/federation.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from collections import namedtuple + +from ._base import Stream + +FederationStreamRow = namedtuple("FederationStreamRow", ( + "type", # str, the type of data as defined in the BaseFederationRows + "data", # dict, serialization of a federation.send_queue.BaseFederationRow +)) + + +class FederationStream(Stream): + """Data to be sent over federation. Only available when master has federation + sending disabled. + """ + NAME = "federation" + ROW_TYPE = FederationStreamRow + + def __init__(self, hs): + federation_sender = hs.get_federation_sender() + + self.current_token = federation_sender.get_current_token + self.update_function = federation_sender.get_replication_rows + + super(FederationStream, self).__init__(hs) -- cgit 1.5.1 From f570916a3eb4088500e966182dea82647a5acac2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 07:40:32 +0000 Subject: Add parse_row method to replication stream class This will allow individual stream classes to override how a row is parsed. --- synapse/replication/tcp/client.py | 5 +++-- synapse/replication/tcp/protocol.py | 2 +- synapse/replication/tcp/streams/_base.py | 15 +++++++++++++++ 3 files changed, 19 insertions(+), 3 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 150975608f..206dc3b397 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -105,13 +105,14 @@ class ReplicationClientHandler(object): def on_rdata(self, stream_name, token, rows): """Called to handle a batch of replication data with a given stream token. - By default this just pokes the slave store. Can be overriden in subclasses to + By default this just pokes the slave store. Can be overridden in subclasses to handle more. Args: stream_name (str): name of the replication stream for this batch of rows token (int): stream token for this batch of rows - rows (list): a list of Stream.ROW_TYPE objects. + rows (list): a list of Stream.ROW_TYPE objects as returned by + Stream.parse_row. Returns: Deferred|None diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 02e5bf6cc8..9daec2c995 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -605,7 +605,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): inbound_rdata_count.labels(stream_name).inc() try: - row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row) + row = STREAMS_MAP[stream_name].parse_row(cmd.row) except Exception: logger.exception( "[%s] Failed to parse RDATA: %r %r", diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 18df89deed..25c3a23664 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -115,6 +115,21 @@ class Stream(object): ROW_TYPE = None # The type of the row _LIMITED = True # Whether the update function takes a limit + @classmethod + def parse_row(cls, row): + """Parse a row received over replication + + By default, assumes that the row data is an array object and passes its contents + to the constructor of the ROW_TYPE for this stream. + + Args: + row: row data from the incoming RDATA command, after json decoding + + Returns: + ROW_TYPE object for this stream + """ + return cls.ROW_TYPE(*row) + def __init__(self, hs): # The token from which we last asked for updates self.last_token = self.current_token() -- cgit 1.5.1 From 015b3622ebbea118baebc457227e355913a5702f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 21:58:03 +0000 Subject: Skip building a ROW_TYPE when building updates We're about to turn it straight into a JSON object anyway so building a ROW_TYPE is a bit pointless, and reduces flexibility in the update_function. --- synapse/replication/tcp/streams/_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 25c3a23664..13ab1bee05 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -112,7 +112,7 @@ class Stream(object): time it was called up until the point `advance_current_token` was called. """ NAME = None # The name of the stream - ROW_TYPE = None # The type of the row + ROW_TYPE = None # The type of the row. Used by the default impl of parse_row. _LIMITED = True # Whether the update function takes a limit @classmethod @@ -201,7 +201,7 @@ class Stream(object): from_token, current_token, ) - updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows] + updates = [(row[0], row[1:]) for row in rows] # check we didn't get more rows than the limit. # doing it like this allows the update_function to be a generator. -- cgit 1.5.1 From 1f6d6f918a57183083c2dd1bba6179373102b918 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 15:18:28 +0000 Subject: Make EventStream rows have a type ... as a precursor to combining it with the CurrentStateDelta stream. --- synapse/app/synchrotron.py | 5 +- synapse/replication/slave/storage/events.py | 8 ++- synapse/replication/tcp/protocol.py | 4 +- synapse/replication/tcp/streams/events.py | 98 +++++++++++++++++++++++++---- 4 files changed, 98 insertions(+), 17 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 9163b56d86..5388def28a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams.events import EventsStreamEventRow from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.room import RoomInitialSyncRestServlet @@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler): # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. for row in rows: - event = yield self.store.get_event(row.event_id) + if row.type != EventsStreamEventRow.TypeId: + continue + event = yield self.store.get_event(row.data.event_id) extra_users = () if event.type == EventTypes.Member: extra_users = (event.state_key,) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 4830c68f35..c57385d92f 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,6 +16,7 @@ import logging from synapse.api.constants import EventTypes +from synapse.replication.tcp.streams.events import EventsStreamEventRow from synapse.storage.event_federation import EventFederationWorkerStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore @@ -79,9 +80,12 @@ class SlavedEventStore(EventFederationWorkerStore, if stream_name == "events": self._stream_id_gen.advance(token) for row in rows: + if row.type != EventsStreamEventRow.TypeId: + continue + data = row.data self.invalidate_caches_for_event( - token, row.event_id, row.room_id, row.type, row.state_key, - row.redacts, + token, data.event_id, data.room_id, data.type, data.state_key, + data.redacts, backfilled=False, ) elif stream_name == "backfill": diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 9daec2c995..b51590cf8f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire:: > POSITION backfill 1 > POSITION caches 1 > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] - > RDATA events 14 ["$149019767112vOHxz:localhost:8823", - "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null] + > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823", + "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]] < PING 1490197675618 > ERROR server stopping * connection closed by server * diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 511dd6bcc7..928028e893 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -13,28 +13,102 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from collections import namedtuple + +import attr + +from twisted.internet import defer from ._base import Stream -EventStreamRow = namedtuple("EventStreamRow", ( - "event_id", # str - "room_id", # str - "type", # str - "state_key", # str, optional - "redacts", # str, optional -)) + +"""Handling of the 'events' replication stream + +This stream contains rows of various types. Each row therefore contains a 'type' +identifier before the real data. For example:: + + RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]] + +An "ev" row is sent for each new event. The fields in the data part are: + + * The new event id + * The room id for the event + * The type of the new event + * The state key of the event, for state events + * The event id of an event which is redacted by this event. + +""" + + +@attr.s(slots=True, frozen=True) +class EventsStreamRow(object): + """A parsed row from the events replication stream""" + type = attr.ib() # str: the TypeId of one of the *EventsStreamRows + data = attr.ib() # BaseEventsStreamRow + + +class BaseEventsStreamRow(object): + """Base class for rows to be sent in the events stream. + + Specifies how to identify, serialize and deserialize the different types. + """ + + TypeId = None # Unique string that ids the type. Must be overriden in sub classes. + + @classmethod + def from_data(cls, data): + """Parse the data from the replication stream into a row. + + By default we just call the constructor with the data list as arguments + + Args: + data: The value of the data object from the replication stream + """ + return cls(*data) + + +@attr.s(slots=True, frozen=True) +class EventsStreamEventRow(BaseEventsStreamRow): + TypeId = "ev" + + event_id = attr.ib() # str + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str, optional + redacts = attr.ib() # str, optional + + +TypeToRow = { + Row.TypeId: Row + for Row in ( + EventsStreamEventRow, + ) +} class EventsStream(Stream): """We received a new event, or an event went from being an outlier to not """ NAME = "events" - ROW_TYPE = EventStreamRow def __init__(self, hs): - store = hs.get_datastore() - self.current_token = store.get_current_events_token - self.update_function = store.get_all_new_forward_event_rows + self._store = hs.get_datastore() + self.current_token = self._store.get_current_events_token super(EventsStream, self).__init__(hs) + + @defer.inlineCallbacks + def update_function(self, from_token, current_token, limit=None): + event_rows = yield self._store.get_all_new_forward_event_rows( + from_token, current_token, limit, + ) + event_updates = ( + (row[0], EventsStreamEventRow.TypeId, row[1:]) + for row in event_rows + ) + defer.returnValue(event_updates) + + @classmethod + def parse_row(cls, row): + (typ, data) = row + data = TypeToRow[typ].from_data(data) + return EventsStreamRow(typ, data) -- cgit 1.5.1 From 4b91c313a94a4a89998e097e79a96a4423cf1b9f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 16:15:59 +0000 Subject: Combine the CurrentStateDeltaStream into the EventStream --- synapse/app/user_dir.py | 17 +++++++++------ synapse/replication/tcp/streams/__init__.py | 1 - synapse/replication/tcp/streams/_base.py | 21 ------------------ synapse/replication/tcp/streams/events.py | 34 ++++++++++++++++++++++++++++- synapse/storage/events.py | 3 --- 5 files changed, 43 insertions(+), 33 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index d1ab9512cd..355f5aa71d 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams.events import ( + EventsStream, + EventsStreamCurrentStateRow, +) from synapse.rest.client.v2_alpha import user_directory from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -73,19 +77,18 @@ class UserDirectorySlaveStore( prefilled_cache=curr_state_delta_prefill, ) - self._current_state_delta_pos = events_max - def stream_positions(self): result = super(UserDirectorySlaveStore, self).stream_positions() - result["current_state_deltas"] = self._current_state_delta_pos return result def process_replication_rows(self, stream_name, token, rows): - if stream_name == "current_state_deltas": - self._current_state_delta_pos = token + if stream_name == EventsStream.NAME: + self._stream_id_gen.advance(token) for row in rows: + if row.type != EventsStreamCurrentStateRow.TypeId: + continue self._curr_state_delta_stream_cache.entity_has_changed( - row.room_id, token + row.data.room_id, token ) return super(UserDirectorySlaveStore, self).process_replication_rows( stream_name, token, rows @@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): yield super(UserDirectoryReplicationHandler, self).on_rdata( stream_name, token, rows ) - if stream_name == "current_state_deltas": + if stream_name == EventsStream.NAME: run_in_background(self._notify_directory) @defer.inlineCallbacks diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 5c715e3bfa..634f636dc9 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -44,7 +44,6 @@ STREAMS_MAP = { federation.FederationStream, _base.TagAccountDataStream, _base.AccountDataStream, - _base.CurrentStateDeltaStream, _base.GroupServerStream, ) } diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 13ab1bee05..8971a6a22e 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -91,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) -CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( - "room_id", # str - "type", # str - "state_key", # str - "event_id", # str, optional -)) GroupsStreamRow = namedtuple("GroupsStreamRow", ( "group_id", # str "user_id", # str @@ -428,21 +422,6 @@ class AccountDataStream(Stream): defer.returnValue(results) -class CurrentStateDeltaStream(Stream): - """Current state for a room was changed - """ - NAME = "current_state_deltas" - ROW_TYPE = CurrentStateDeltaStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_max_current_state_delta_stream_id - self.update_function = store.get_all_updated_current_state_deltas - - super(CurrentStateDeltaStream, self).__init__(hs) - - class GroupServerStream(Stream): NAME = "groups" ROW_TYPE = GroupsStreamRow diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 928028e893..e0f6e29248 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import heapq import attr @@ -26,6 +27,7 @@ from ._base import Stream This stream contains rows of various types. Each row therefore contains a 'type' identifier before the real data. For example:: + RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]] RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]] An "ev" row is sent for each new event. The fields in the data part are: @@ -36,6 +38,14 @@ An "ev" row is sent for each new event. The fields in the data part are: * The state key of the event, for state events * The event id of an event which is redacted by this event. +A "state" row is sent whenever the "current state" in a room changes. The fields in the +data part are: + + * The room id for the state change + * The event type of the state which has changed + * The state_key of the state which has changed + * The event id of the new state + """ @@ -77,10 +87,21 @@ class EventsStreamEventRow(BaseEventsStreamRow): redacts = attr.ib() # str, optional +@attr.s(slots=True, frozen=True) +class EventsStreamCurrentStateRow(BaseEventsStreamRow): + TypeId = "state" + + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str + event_id = attr.ib() # str, optional + + TypeToRow = { Row.TypeId: Row for Row in ( EventsStreamEventRow, + EventsStreamCurrentStateRow, ) } @@ -105,7 +126,18 @@ class EventsStream(Stream): (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows ) - defer.returnValue(event_updates) + + state_rows = yield self._store.get_all_updated_current_state_deltas( + from_token, current_token, limit + ) + state_updates = ( + (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) + for row in state_rows + ) + + all_updates = heapq.merge(event_updates, state_updates) + + defer.returnValue(all_updates) @classmethod def parse_row(cls, row): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 428300ea0a..0b0a4dcdd3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2287,9 +2287,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) - def get_max_current_state_delta_stream_id(self): - return self._stream_id_gen.get_current_token() - def get_all_updated_current_state_deltas(self, from_token, to_token, limit): def get_all_updated_current_state_deltas_txn(txn): sql = """ -- cgit 1.5.1 From 297bf2547e6fdfbbed276d439b8b04da06f5551c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 2 Apr 2019 12:42:39 +0100 Subject: Fix sync bug when accepting invites (#4956) Hopefully this time we really will fix #4422. We need to make sure that the cache on `get_rooms_for_user_with_stream_ordering` is invalidated *before* the SyncHandler is notified for the new events, and we can now do so reliably via the `events` stream. --- changelog.d/4956.bugfix | 1 + synapse/replication/slave/storage/events.py | 31 +++-- synapse/storage/_base.py | 5 - synapse/storage/events.py | 24 +++- tests/replication/slave/storage/_base.py | 28 ++++- tests/replication/slave/storage/test_events.py | 161 +++++++++++++++++++++---- tests/server.py | 56 +++++---- 7 files changed, 237 insertions(+), 69 deletions(-) create mode 100644 changelog.d/4956.bugfix (limited to 'synapse/replication') diff --git a/changelog.d/4956.bugfix b/changelog.d/4956.bugfix new file mode 100644 index 0000000000..e50e67383d --- /dev/null +++ b/changelog.d/4956.bugfix @@ -0,0 +1 @@ +Fix sync bug which made accepting invites unreliable in worker-mode synapses. diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index c57385d92f..b457c5563f 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,7 +16,10 @@ import logging from synapse.api.constants import EventTypes -from synapse.replication.tcp.streams.events import EventsStreamEventRow +from synapse.replication.tcp.streams.events import ( + EventsStreamCurrentStateRow, + EventsStreamEventRow, +) from synapse.storage.event_federation import EventFederationWorkerStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore @@ -80,14 +83,7 @@ class SlavedEventStore(EventFederationWorkerStore, if stream_name == "events": self._stream_id_gen.advance(token) for row in rows: - if row.type != EventsStreamEventRow.TypeId: - continue - data = row.data - self.invalidate_caches_for_event( - token, data.event_id, data.room_id, data.type, data.state_key, - data.redacts, - backfilled=False, - ) + self._process_event_stream_row(token, row) elif stream_name == "backfill": self._backfill_id_gen.advance(-token) for row in rows: @@ -100,6 +96,23 @@ class SlavedEventStore(EventFederationWorkerStore, stream_name, token, rows ) + def _process_event_stream_row(self, token, row): + data = row.data + + if row.type == EventsStreamEventRow.TypeId: + self.invalidate_caches_for_event( + token, data.event_id, data.room_id, data.type, data.state_key, + data.redacts, + backfilled=False, + ) + elif row.type == EventsStreamCurrentStateRow.TypeId: + if data.type == EventTypes.Member: + self.get_rooms_for_user_with_stream_ordering.invalidate( + (data.state_key, ), + ) + else: + raise Exception("Unknown events stream row type %s" % (row.type, )) + def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, etype, state_key, redacts, backfilled): self._invalidate_get_event_cache(event_id) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7e3903859b..653b32fbf5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1355,11 +1355,6 @@ class SQLBaseStore(object): members_changed (iterable[str]): The user_ids of members that have changed """ - for member in members_changed: - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (member,), - ) - for host in set(get_domain_from_id(u) for u in members_changed): self._attempt_to_invalidate_cache( "is_host_joined", (room_id, host,), diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d0668e39c4..dfda39bbe0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -79,7 +79,7 @@ def encode_json(json_object): """ out = frozendict_json_encoder.encode(json_object) if isinstance(out, bytes): - out = out.decode('utf8') + out = out.decode("utf8") return out @@ -813,9 +813,10 @@ class EventsStore( """ all_events_and_contexts = events_and_contexts + min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) + self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) self._update_forward_extremities_txn( txn, @@ -890,7 +891,7 @@ class EventsStore( backfilled=backfilled, ) - def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): + def _update_current_state_txn(self, txn, state_delta_by_room, stream_id): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple @@ -899,6 +900,12 @@ class EventsStore( # that we can use it to calculate the `prev_event_id`. (This # allows us to not have to pull out the existing state # unnecessarily). + # + # The stream_id for the update is chosen to be the minimum of the stream_ids + # for the batch of the events that we are persisting; that means we do not + # end up in a situation where workers see events before the + # current_state_delta updates. + # sql = """ INSERT INTO current_state_delta_stream (stream_id, room_id, type, state_key, event_id, prev_event_id) @@ -911,7 +918,7 @@ class EventsStore( sql, ( ( - max_stream_order, + stream_id, room_id, etype, state_key, @@ -929,7 +936,7 @@ class EventsStore( sql, ( ( - max_stream_order, + stream_id, room_id, etype, state_key, @@ -970,7 +977,7 @@ class EventsStore( txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, room_id, - max_stream_order, + stream_id, ) # Invalidate the various caches @@ -986,6 +993,11 @@ class EventsStore( if ev_type == EventTypes.Member ) + for member in members_changed: + txn.call_after( + self.get_rooms_for_user_with_stream_ordering.invalidate, (member,) + ) + self._invalidate_state_caches_and_stream(txn, room_id, members_changed) def _update_forward_extremities_txn( diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 524af4f8d1..1f72a2a04f 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -56,7 +56,9 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): client = client_factory.buildProtocol(None) client.makeConnection(FakeTransport(server, reactor)) - server.makeConnection(FakeTransport(client, reactor)) + + self.server_to_client_transport = FakeTransport(client, reactor) + server.makeConnection(self.server_to_client_transport) def replicate(self): """Tell the master side of replication that something has happened, and then @@ -69,6 +71,24 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): master_result = self.get_success(getattr(self.master_store, method)(*args)) slaved_result = self.get_success(getattr(self.slaved_store, method)(*args)) if expected_result is not None: - self.assertEqual(master_result, expected_result) - self.assertEqual(slaved_result, expected_result) - self.assertEqual(master_result, slaved_result) + self.assertEqual( + master_result, + expected_result, + "Expected master result to be %r but was %r" % ( + expected_result, master_result + ), + ) + self.assertEqual( + slaved_result, + expected_result, + "Expected slave result to be %r but was %r" % ( + expected_result, slaved_result + ), + ) + self.assertEqual( + master_result, + slaved_result, + "Slave result %r does not match master result %r" % ( + slaved_result, master_result + ), + ) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 1688a741d1..65ecff3bd6 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -11,11 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging from canonicaljson import encode_canonical_json from synapse.events import FrozenEvent, _EventInternalMetadata from synapse.events.snapshot import EventContext +from synapse.handlers.room import RoomEventSource from synapse.replication.slave.storage.events import SlavedEventStore from synapse.storage.roommember import RoomsForUser @@ -26,6 +28,8 @@ USER_ID_2 = "@bright:blue" OUTLIER = {"outlier": True} ROOM_ID = "!room:blue" +logger = logging.getLogger(__name__) + def dict_equals(self, other): me = encode_canonical_json(self.get_pdu_json()) @@ -172,18 +176,142 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): {"highlight_count": 1, "notify_count": 2}, ) + def test_get_rooms_for_user_with_stream_ordering(self): + """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated + by rows in the events stream + """ + self.persist(type="m.room.create", key="", creator=USER_ID) + self.persist(type="m.room.member", key=USER_ID, membership="join") + self.replicate() + self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) + + j2 = self.persist( + type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" + ) + self.replicate() + self.check( + "get_rooms_for_user_with_stream_ordering", + (USER_ID_2,), + {(ROOM_ID, j2.internal_metadata.stream_ordering)}, + ) + + def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self): + """Check that current_state invalidation happens correctly with multiple events + in the persistence batch. + + This test attempts to reproduce a race condition between the event persistence + loop and a worker-based Sync handler. + + The problem occurred when the master persisted several events in one batch. It + only updates the current_state at the end of each batch, so the obvious thing + to do is then to issue a current_state_delta stream update corresponding to the + last stream_id in the batch. + + However, that raises the possibility that a worker will see the replication + notification for a join event before the current_state caches are invalidated. + + The test involves: + * creating a join and a message event for a user, and persisting them in the + same batch + + * controlling the replication stream so that updates are sent gradually + + * between each bunch of replication updates, check that we see a consistent + snapshot of the state. + """ + self.persist(type="m.room.create", key="", creator=USER_ID) + self.persist(type="m.room.member", key=USER_ID, membership="join") + self.replicate() + self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) + + # limit the replication rate + repl_transport = self.server_to_client_transport + repl_transport.autoflush = False + + # build the join and message events and persist them in the same batch. + logger.info("----- build test events ------") + j2, j2ctx = self.build_event( + type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" + ) + msg, msgctx = self.build_event() + self.get_success(self.master_store.persist_events([ + (j2, j2ctx), + (msg, msgctx), + ])) + self.replicate() + + event_source = RoomEventSource(self.hs) + event_source.store = self.slaved_store + current_token = self.get_success(event_source.get_current_key()) + + # gradually stream out the replication + while repl_transport.buffer: + logger.info("------ flush ------") + repl_transport.flush(30) + self.pump(0) + + prev_token = current_token + current_token = self.get_success(event_source.get_current_key()) + + # attempt to replicate the behaviour of the sync handler. + # + # First, we get a list of the rooms we are joined to + joined_rooms = self.get_success( + self.slaved_store.get_rooms_for_user_with_stream_ordering( + USER_ID_2, + ), + ) + + # Then, we get a list of the events since the last sync + membership_changes = self.get_success( + self.slaved_store.get_membership_changes_for_user( + USER_ID_2, prev_token, current_token, + ) + ) + + logger.info( + "%s->%s: joined_rooms=%r membership_changes=%r", + prev_token, + current_token, + joined_rooms, + membership_changes, + ) + + # the membership change is only any use to us if the room is in the + # joined_rooms list. + if membership_changes: + self.assertEqual( + joined_rooms, {(ROOM_ID, j2.internal_metadata.stream_ordering)} + ) + event_id = 0 - def persist( + def persist(self, backfill=False, **kwargs): + """ + Returns: + synapse.events.FrozenEvent: The event that was persisted. + """ + event, context = self.build_event(**kwargs) + + if backfill: + self.get_success( + self.master_store.persist_events([(event, context)], backfilled=True) + ) + else: + self.get_success( + self.master_store.persist_event(event, context) + ) + + return event + + def build_event( self, sender=USER_ID, room_id=ROOM_ID, - type={}, + type="m.room.message", key=None, internal={}, state=None, - reset_state=False, - backfill=False, depth=None, prev_events=[], auth_events=[], @@ -192,10 +320,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): push_actions=[], **content ): - """ - Returns: - synapse.events.FrozenEvent: The event that was persisted. - """ + if depth is None: depth = self.event_id @@ -234,23 +359,11 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): ) else: state_handler = self.hs.get_state_handler() - context = self.get_success(state_handler.compute_event_context(event)) + context = self.get_success(state_handler.compute_event_context( + event + )) self.master_store.add_push_actions_to_staging( event.event_id, {user_id: actions for user_id, actions in push_actions} ) - - ordering = None - if backfill: - self.get_success( - self.master_store.persist_events([(event, context)], backfilled=True) - ) - else: - ordering, _ = self.get_success( - self.master_store.persist_event(event, context) - ) - - if ordering: - event.internal_metadata.stream_ordering = ordering - - return event + return event, context diff --git a/tests/server.py b/tests/server.py index ea26dea623..8f89f4a83d 100644 --- a/tests/server.py +++ b/tests/server.py @@ -365,6 +365,7 @@ class FakeTransport(object): disconnected = False buffer = attr.ib(default=b'') producer = attr.ib(default=None) + autoflush = attr.ib(default=True) def getPeer(self): return None @@ -415,31 +416,44 @@ class FakeTransport(object): def write(self, byt): self.buffer = self.buffer + byt - def _write(): - if not self.buffer: - # nothing to do. Don't write empty buffers: it upsets the - # TLSMemoryBIOProtocol - return - - if self.disconnected: - return - logger.info("%s->%s: %s", self._protocol, self.other, self.buffer) - - if getattr(self.other, "transport") is not None: - try: - self.other.dataReceived(self.buffer) - self.buffer = b"" - except Exception as e: - logger.warning("Exception writing to protocol: %s", e) - return - - self._reactor.callLater(0.0, _write) - # always actually do the write asynchronously. Some protocols (notably the # TLSMemoryBIOProtocol) get very confused if a read comes back while they are # still doing a write. Doing a callLater here breaks the cycle. - self._reactor.callLater(0.0, _write) + if self.autoflush: + self._reactor.callLater(0.0, self.flush) def writeSequence(self, seq): for x in seq: self.write(x) + + def flush(self, maxbytes=None): + if not self.buffer: + # nothing to do. Don't write empty buffers: it upsets the + # TLSMemoryBIOProtocol + return + + if self.disconnected: + return + + if getattr(self.other, "transport") is None: + # the other has no transport yet; reschedule + if self.autoflush: + self._reactor.callLater(0.0, self.flush) + return + + if maxbytes is not None: + to_write = self.buffer[:maxbytes] + else: + to_write = self.buffer + + logger.info("%s->%s: %s", self._protocol, self.other, to_write) + + try: + self.other.dataReceived(to_write) + except Exception as e: + logger.warning("Exception writing to protocol: %s", e) + return + + self.buffer = self.buffer[len(to_write):] + if self.buffer and self.autoflush: + self._reactor.callLater(0.0, self.flush) -- cgit 1.5.1 From e8419554ffcb5ec41a1f5c22ebe89163f601170b Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Wed, 3 Apr 2019 11:11:15 +0100 Subject: Remove presence lists (#4989) Remove presence list support as per MSC 1819 --- changelog.d/4989.feature | 1 + synapse/handlers/presence.py | 167 +-------------------- synapse/replication/slave/storage/presence.py | 10 -- synapse/rest/client/v1/presence.py | 67 --------- synapse/storage/prepare_database.py | 2 +- synapse/storage/presence.py | 86 +---------- .../storage/schema/delta/54/drop_presence_list.sql | 16 ++ .../storage/schema/full_schemas/16/presence.sql | 12 +- tests/storage/test_presence.py | 118 --------------- 9 files changed, 23 insertions(+), 456 deletions(-) create mode 100644 changelog.d/4989.feature create mode 100644 synapse/storage/schema/delta/54/drop_presence_list.sql delete mode 100644 tests/storage/test_presence.py (limited to 'synapse/replication') diff --git a/changelog.d/4989.feature b/changelog.d/4989.feature new file mode 100644 index 0000000000..a5138f5612 --- /dev/null +++ b/changelog.d/4989.feature @@ -0,0 +1 @@ +Remove presence list support as per MSC 1819. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e85c49742d..3b22a22a19 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -113,27 +113,6 @@ class PresenceHandler(object): federation_registry.register_edu_handler( "m.presence", self.incoming_presence ) - federation_registry.register_edu_handler( - "m.presence_invite", - lambda origin, content: self.invite_presence( - observed_user=UserID.from_string(content["observed_user"]), - observer_user=UserID.from_string(content["observer_user"]), - ) - ) - federation_registry.register_edu_handler( - "m.presence_accept", - lambda origin, content: self.accept_presence( - observed_user=UserID.from_string(content["observed_user"]), - observer_user=UserID.from_string(content["observer_user"]), - ) - ) - federation_registry.register_edu_handler( - "m.presence_deny", - lambda origin, content: self.deny_presence( - observed_user=UserID.from_string(content["observed_user"]), - observer_user=UserID.from_string(content["observer_user"]), - ) - ) active_presence = self.store.take_presence_startup_info() @@ -759,137 +738,6 @@ class PresenceHandler(object): yield self._update_states([prev_state.copy_and_replace(**new_fields)]) - @defer.inlineCallbacks - def get_presence_list(self, observer_user, accepted=None): - """Returns the presence for all users in their presence list. - """ - if not self.is_mine(observer_user): - raise SynapseError(400, "User is not hosted on this Home Server") - - presence_list = yield self.store.get_presence_list( - observer_user.localpart, accepted=accepted - ) - - results = yield self.get_states( - target_user_ids=[row["observed_user_id"] for row in presence_list], - as_event=False, - ) - - now = self.clock.time_msec() - results[:] = [format_user_presence_state(r, now) for r in results] - - is_accepted = { - row["observed_user_id"]: row["accepted"] for row in presence_list - } - - for result in results: - result.update({ - "accepted": is_accepted, - }) - - defer.returnValue(results) - - @defer.inlineCallbacks - def send_presence_invite(self, observer_user, observed_user): - """Sends a presence invite. - """ - yield self.store.add_presence_list_pending( - observer_user.localpart, observed_user.to_string() - ) - - if self.is_mine(observed_user): - yield self.invite_presence(observed_user, observer_user) - else: - yield self.federation.build_and_send_edu( - destination=observed_user.domain, - edu_type="m.presence_invite", - content={ - "observed_user": observed_user.to_string(), - "observer_user": observer_user.to_string(), - } - ) - - @defer.inlineCallbacks - def invite_presence(self, observed_user, observer_user): - """Handles new presence invites. - """ - if not self.is_mine(observed_user): - raise SynapseError(400, "User is not hosted on this Home Server") - - # TODO: Don't auto accept - if self.is_mine(observer_user): - yield self.accept_presence(observed_user, observer_user) - else: - self.federation.build_and_send_edu( - destination=observer_user.domain, - edu_type="m.presence_accept", - content={ - "observed_user": observed_user.to_string(), - "observer_user": observer_user.to_string(), - } - ) - - state_dict = yield self.get_state(observed_user, as_event=False) - state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) - - self.federation.build_and_send_edu( - destination=observer_user.domain, - edu_type="m.presence", - content={ - "push": [state_dict] - } - ) - - @defer.inlineCallbacks - def accept_presence(self, observed_user, observer_user): - """Handles a m.presence_accept EDU. Mark a presence invite from a - local or remote user as accepted in a local user's presence list. - Starts polling for presence updates from the local or remote user. - Args: - observed_user(UserID): The user to update in the presence list. - observer_user(UserID): The owner of the presence list to update. - """ - yield self.store.set_presence_list_accepted( - observer_user.localpart, observed_user.to_string() - ) - - @defer.inlineCallbacks - def deny_presence(self, observed_user, observer_user): - """Handle a m.presence_deny EDU. Removes a local or remote user from a - local user's presence list. - Args: - observed_user(UserID): The local or remote user to remove from the - list. - observer_user(UserID): The local owner of the presence list. - Returns: - A Deferred. - """ - yield self.store.del_presence_list( - observer_user.localpart, observed_user.to_string() - ) - - # TODO(paul): Inform the user somehow? - - @defer.inlineCallbacks - def drop(self, observed_user, observer_user): - """Remove a local or remote user from a local user's presence list and - unsubscribe the local user from updates that user. - Args: - observed_user(UserId): The local or remote user to remove from the - list. - observer_user(UserId): The local owner of the presence list. - Returns: - A Deferred. - """ - if not self.is_mine(observer_user): - raise SynapseError(400, "User is not hosted on this Home Server") - - yield self.store.del_presence_list( - observer_user.localpart, observed_user.to_string() - ) - - # TODO: Inform the remote that we've dropped the presence list. - @defer.inlineCallbacks def is_visible(self, observed_user, observer_user): """Returns whether a user can see another user's presence. @@ -904,11 +752,7 @@ class PresenceHandler(object): if observer_room_ids & observed_room_ids: defer.returnValue(True) - accepted_observers = yield self.store.get_presence_list_observers_accepted( - observed_user.to_string() - ) - - defer.returnValue(observer_user.to_string() in accepted_observers) + defer.returnValue(False) @defer.inlineCallbacks def get_all_presence_updates(self, last_id, current_id): @@ -1204,10 +1048,7 @@ class PresenceEventSource(object): updates for """ user_id = user.to_string() - plist = yield self.store.get_presence_list_accepted( - user.localpart, on_invalidate=cache_context.invalidate, - ) - users_interested_in = set(row["observed_user_id"] for row in plist) + users_interested_in = set() users_interested_in.add(user_id) # So that we receive our own presence users_who_share_room = yield self.store.get_users_who_share_room_with_user( @@ -1412,10 +1253,6 @@ def get_interested_parties(store, states): for room_id in room_ids: room_ids_to_states.setdefault(room_id, []).append(state) - plist = yield store.get_presence_list_observers_accepted(state.user_id) - for u in plist: - users_to_states.setdefault(u, []).append(state) - # Always notify self users_to_states.setdefault(state.user_id, []).append(state) diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 9e530defe0..0ec1db25ce 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -39,16 +39,6 @@ class SlavedPresenceStore(BaseSlavedStore): _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] - # XXX: This is a bit broken because we don't persist the accepted list in a - # way that can be replicated. This means that we don't have a way to - # invalidate the cache correctly. - get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - def get_current_presence_token(self): return self._presence_id_gen.get_current_token() diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index b5a6d6aebf..045d5a20ac 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -93,72 +93,5 @@ class PresenceStatusRestServlet(ClientV1RestServlet): return (200, {}) -class PresenceListRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/presence/list/(?P[^/]*)") - - def __init__(self, hs): - super(PresenceListRestServlet, self).__init__(hs) - self.presence_handler = hs.get_presence_handler() - - @defer.inlineCallbacks - def on_GET(self, request, user_id): - requester = yield self.auth.get_user_by_req(request) - user = UserID.from_string(user_id) - - if not self.hs.is_mine(user): - raise SynapseError(400, "User not hosted on this Home Server") - - if requester.user != user: - raise SynapseError(400, "Cannot get another user's presence list") - - presence = yield self.presence_handler.get_presence_list( - observer_user=user, accepted=True - ) - - defer.returnValue((200, presence)) - - @defer.inlineCallbacks - def on_POST(self, request, user_id): - requester = yield self.auth.get_user_by_req(request) - user = UserID.from_string(user_id) - - if not self.hs.is_mine(user): - raise SynapseError(400, "User not hosted on this Home Server") - - if requester.user != user: - raise SynapseError( - 400, "Cannot modify another user's presence list") - - content = parse_json_object_from_request(request) - - if "invite" in content: - for u in content["invite"]: - if not isinstance(u, string_types): - raise SynapseError(400, "Bad invite value.") - if len(u) == 0: - continue - invited_user = UserID.from_string(u) - yield self.presence_handler.send_presence_invite( - observer_user=user, observed_user=invited_user - ) - - if "drop" in content: - for u in content["drop"]: - if not isinstance(u, string_types): - raise SynapseError(400, "Bad drop value.") - if len(u) == 0: - continue - dropped_user = UserID.from_string(u) - yield self.presence_handler.drop( - observer_user=user, observed_user=dropped_user - ) - - defer.returnValue((200, {})) - - def on_OPTIONS(self, request): - return (200, {}) - - def register_servlets(hs, http_server): PresenceStatusRestServlet(hs).register(http_server) - PresenceListRestServlet(hs).register(http_server) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 81b4c57ad4..c1711bc8bd 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 53 +SCHEMA_VERSION = 54 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 089ea8c048..42ec8c6bb8 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -19,7 +19,7 @@ from twisted.internet import defer from synapse.api.constants import PresenceState from synapse.util import batch_iter -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedList from ._base import SQLBaseStore @@ -205,87 +205,3 @@ class PresenceStore(SQLBaseStore): }, desc="disallow_presence_visible", ) - - def add_presence_list_pending(self, observer_localpart, observed_userid): - return self._simple_insert( - table="presence_list", - values={ - "user_id": observer_localpart, - "observed_user_id": observed_userid, - "accepted": False, - }, - desc="add_presence_list_pending", - ) - - def set_presence_list_accepted(self, observer_localpart, observed_userid): - def update_presence_list_txn(txn): - result = self._simple_update_one_txn( - txn, - table="presence_list", - keyvalues={ - "user_id": observer_localpart, - "observed_user_id": observed_userid, - }, - updatevalues={"accepted": True}, - ) - - self._invalidate_cache_and_stream( - txn, self.get_presence_list_accepted, (observer_localpart,) - ) - self._invalidate_cache_and_stream( - txn, self.get_presence_list_observers_accepted, (observed_userid,) - ) - - return result - - return self.runInteraction( - "set_presence_list_accepted", update_presence_list_txn - ) - - def get_presence_list(self, observer_localpart, accepted=None): - if accepted: - return self.get_presence_list_accepted(observer_localpart) - else: - keyvalues = {"user_id": observer_localpart} - if accepted is not None: - keyvalues["accepted"] = accepted - - return self._simple_select_list( - table="presence_list", - keyvalues=keyvalues, - retcols=["observed_user_id", "accepted"], - desc="get_presence_list", - ) - - @cached() - def get_presence_list_accepted(self, observer_localpart): - return self._simple_select_list( - table="presence_list", - keyvalues={"user_id": observer_localpart, "accepted": True}, - retcols=["observed_user_id", "accepted"], - desc="get_presence_list_accepted", - ) - - @cachedInlineCallbacks() - def get_presence_list_observers_accepted(self, observed_userid): - user_localparts = yield self._simple_select_onecol( - table="presence_list", - keyvalues={"observed_user_id": observed_userid, "accepted": True}, - retcol="user_id", - desc="get_presence_list_accepted", - ) - - defer.returnValue(["@%s:%s" % (u, self.hs.hostname) for u in user_localparts]) - - @defer.inlineCallbacks - def del_presence_list(self, observer_localpart, observed_userid): - yield self._simple_delete_one( - table="presence_list", - keyvalues={ - "user_id": observer_localpart, - "observed_user_id": observed_userid, - }, - desc="del_presence_list", - ) - self.get_presence_list_accepted.invalidate((observer_localpart,)) - self.get_presence_list_observers_accepted.invalidate((observed_userid,)) diff --git a/synapse/storage/schema/delta/54/drop_presence_list.sql b/synapse/storage/schema/delta/54/drop_presence_list.sql new file mode 100644 index 0000000000..e6ee70c623 --- /dev/null +++ b/synapse/storage/schema/delta/54/drop_presence_list.sql @@ -0,0 +1,16 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +DROP TABLE IF EXISTS presence_list; diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql index 283136df20..0892c4cf96 100644 --- a/synapse/storage/schema/full_schemas/16/presence.sql +++ b/synapse/storage/schema/full_schemas/16/presence.sql @@ -28,13 +28,5 @@ CREATE TABLE IF NOT EXISTS presence_allow_inbound( UNIQUE (observed_user_id, observer_user_id) ); --- For each of /my/ users (watcher), which possibly-remote users are they --- watching? -CREATE TABLE IF NOT EXISTS presence_list( - user_id TEXT NOT NULL, - observed_user_id TEXT NOT NULL, -- a UserID, - accepted BOOLEAN NOT NULL, - UNIQUE (user_id, observed_user_id) -); - -CREATE INDEX presence_list_user_id ON presence_list (user_id); +-- We used to create a table called presence_list, but this is no longer used +-- and is removed in delta 54. \ No newline at end of file diff --git a/tests/storage/test_presence.py b/tests/storage/test_presence.py deleted file mode 100644 index c7a63f39b9..0000000000 --- a/tests/storage/test_presence.py +++ /dev/null @@ -1,118 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -from synapse.types import UserID - -from tests import unittest -from tests.utils import setup_test_homeserver - - -class PresenceStoreTestCase(unittest.TestCase): - @defer.inlineCallbacks - def setUp(self): - hs = yield setup_test_homeserver(self.addCleanup) - - self.store = hs.get_datastore() - - self.u_apple = UserID.from_string("@apple:test") - self.u_banana = UserID.from_string("@banana:test") - - @defer.inlineCallbacks - def test_presence_list(self): - self.assertEquals( - [], - ( - yield self.store.get_presence_list( - observer_localpart=self.u_apple.localpart - ) - ), - ) - self.assertEquals( - [], - ( - yield self.store.get_presence_list( - observer_localpart=self.u_apple.localpart, accepted=True - ) - ), - ) - - yield self.store.add_presence_list_pending( - observer_localpart=self.u_apple.localpart, - observed_userid=self.u_banana.to_string(), - ) - - self.assertEquals( - [{"observed_user_id": "@banana:test", "accepted": 0}], - ( - yield self.store.get_presence_list( - observer_localpart=self.u_apple.localpart - ) - ), - ) - self.assertEquals( - [], - ( - yield self.store.get_presence_list( - observer_localpart=self.u_apple.localpart, accepted=True - ) - ), - ) - - yield self.store.set_presence_list_accepted( - observer_localpart=self.u_apple.localpart, - observed_userid=self.u_banana.to_string(), - ) - - self.assertEquals( - [{"observed_user_id": "@banana:test", "accepted": 1}], - ( - yield self.store.get_presence_list( - observer_localpart=self.u_apple.localpart - ) - ), - ) - self.assertEquals( - [{"observed_user_id": "@banana:test", "accepted": 1}], - ( - yield self.store.get_presence_list( - observer_localpart=self.u_apple.localpart, accepted=True - ) - ), - ) - - yield self.store.del_presence_list( - observer_localpart=self.u_apple.localpart, - observed_userid=self.u_banana.to_string(), - ) - - self.assertEquals( - [], - ( - yield self.store.get_presence_list( - observer_localpart=self.u_apple.localpart - ) - ), - ) - self.assertEquals( - [], - ( - yield self.store.get_presence_list( - observer_localpart=self.u_apple.localpart, accepted=True - ) - ), - ) -- cgit 1.5.1 From 3352baac4b03f3414e0a006b9413b65454d1fe91 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 8 Apr 2019 21:50:18 +0100 Subject: Remove unused server_tls_certificates functions (#5028) These have been unused since #4120, and with the demise of perspectives, it is unlikely that they will ever be used again. --- changelog.d/4992.misc | 2 +- changelog.d/5028.misc | 1 + synapse/replication/slave/storage/keys.py | 3 -- synapse/storage/keys.py | 49 +--------------------- .../storage/schema/delta/54/drop_legacy_tables.sql | 4 +- synapse/storage/schema/full_schemas/16/keys.sql | 11 ++--- 6 files changed, 7 insertions(+), 63 deletions(-) create mode 100644 changelog.d/5028.misc (limited to 'synapse/replication') diff --git a/changelog.d/4992.misc b/changelog.d/4992.misc index 8a9eaea4cf..3ee4228c09 100644 --- a/changelog.d/4992.misc +++ b/changelog.d/4992.misc @@ -1 +1 @@ -Remove legacy tables detailed in #1830. +Remove a number of unused tables from the database schema. diff --git a/changelog.d/5028.misc b/changelog.d/5028.misc new file mode 100644 index 0000000000..3ee4228c09 --- /dev/null +++ b/changelog.d/5028.misc @@ -0,0 +1 @@ +Remove a number of unused tables from the database schema. diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py index 8032f53fec..de00660c0e 100644 --- a/synapse/replication/slave/storage/keys.py +++ b/synapse/replication/slave/storage/keys.py @@ -27,8 +27,5 @@ class SlavedKeyStore(BaseSlavedStore): get_server_verify_keys = __func__(DataStore.get_server_verify_keys) store_server_verify_key = __func__(DataStore.store_server_verify_key) - get_server_certificate = __func__(DataStore.get_server_certificate) - store_server_certificate = __func__(DataStore.store_server_certificate) - get_server_keys_json = __func__(DataStore.get_server_keys_json) store_server_keys_json = __func__(DataStore.store_server_keys_json) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index f24ab3eedd..47a9aa784b 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -13,14 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import hashlib import logging import six from signedjson.key import decode_verify_key_bytes -import OpenSSL from twisted.internet import defer from synapse.util.caches.descriptors import cachedInlineCallbacks @@ -38,53 +36,8 @@ else: class KeyStore(SQLBaseStore): - """Persistence for signature verification keys and tls X.509 certificates + """Persistence for signature verification keys """ - - @defer.inlineCallbacks - def get_server_certificate(self, server_name): - """Retrieve the TLS X.509 certificate for the given server - Args: - server_name (bytes): The name of the server. - Returns: - (OpenSSL.crypto.X509): The tls certificate. - """ - tls_certificate_bytes, = yield self._simple_select_one( - table="server_tls_certificates", - keyvalues={"server_name": server_name}, - retcols=("tls_certificate",), - desc="get_server_certificate", - ) - tls_certificate = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_ASN1, tls_certificate_bytes - ) - defer.returnValue(tls_certificate) - - def store_server_certificate( - self, server_name, from_server, time_now_ms, tls_certificate - ): - """Stores the TLS X.509 certificate for the given server - Args: - server_name (str): The name of the server. - from_server (str): Where the certificate was looked up - time_now_ms (int): The time now in milliseconds - tls_certificate (OpenSSL.crypto.X509): The X.509 certificate. - """ - tls_certificate_bytes = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_ASN1, tls_certificate - ) - fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest() - return self._simple_upsert( - table="server_tls_certificates", - keyvalues={"server_name": server_name, "fingerprint": fingerprint}, - values={ - "from_server": from_server, - "ts_added_ms": time_now_ms, - "tls_certificate": db_binary_type(tls_certificate_bytes), - }, - desc="store_server_certificate", - ) - @cachedInlineCallbacks() def _get_server_verify_key(self, server_name, key_id): verify_key_bytes = yield self._simple_select_one_onecol( diff --git a/synapse/storage/schema/delta/54/drop_legacy_tables.sql b/synapse/storage/schema/delta/54/drop_legacy_tables.sql index 77b39dc2d2..ecca005d9b 100644 --- a/synapse/storage/schema/delta/54/drop_legacy_tables.sql +++ b/synapse/storage/schema/delta/54/drop_legacy_tables.sql @@ -24,7 +24,5 @@ DROP TABLE IF EXISTS event_edge_hashes; DROP TABLE IF EXISTS event_signatures; DROP TABLE IF EXISTS feedback; DROP TABLE IF EXISTS room_hosts; +DROP TABLE IF EXISTS server_tls_certificates; DROP TABLE IF EXISTS state_forward_extremities; - - - diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql index ca0ca1b694..11cdffdbb3 100644 --- a/synapse/storage/schema/full_schemas/16/keys.sql +++ b/synapse/storage/schema/full_schemas/16/keys.sql @@ -12,14 +12,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -CREATE TABLE IF NOT EXISTS server_tls_certificates( - server_name TEXT, -- Server name. - fingerprint TEXT, -- Certificate fingerprint. - from_server TEXT, -- Which key server the certificate was fetched from. - ts_added_ms BIGINT, -- When the certifcate was added. - tls_certificate bytea, -- DER encoded x509 certificate. - UNIQUE (server_name, fingerprint) -); + +-- we used to create a table called server_tls_certificates, but this is no +-- longer used, and is removed in delta 54. CREATE TABLE IF NOT EXISTS server_signature_keys( server_name TEXT, -- Server name. -- cgit 1.5.1 From f50efcb65d794985185f5cc82c697673f50e4c47 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 8 Apr 2019 23:51:52 +0100 Subject: Replace SlavedKeyStore with a shim since we're pulling everything out of KeyStore anyway, we may as well simplify it. --- synapse/replication/slave/storage/keys.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py index de00660c0e..cc6f7f009f 100644 --- a/synapse/replication/slave/storage/keys.py +++ b/synapse/replication/slave/storage/keys.py @@ -13,19 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore -from synapse.storage.keys import KeyStore +from synapse.storage import KeyStore -from ._base import BaseSlavedStore, __func__ +# KeyStore isn't really safe to use from a worker, but for now we do so and hope that +# the races it creates aren't too bad. - -class SlavedKeyStore(BaseSlavedStore): - _get_server_verify_key = KeyStore.__dict__[ - "_get_server_verify_key" - ] - - get_server_verify_keys = __func__(DataStore.get_server_verify_keys) - store_server_verify_key = __func__(DataStore.store_server_verify_key) - - get_server_keys_json = __func__(DataStore.get_server_keys_json) - store_server_keys_json = __func__(DataStore.store_server_keys_json) +SlavedKeyStore = KeyStore -- cgit 1.5.1