diff options
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/presence_resource.py | 59 | ||||
-rw-r--r-- | synapse/replication/pusher_resource.py | 54 | ||||
-rw-r--r-- | synapse/replication/resource.py | 161 | ||||
-rw-r--r-- | synapse/replication/slave/__init__.py | 14 | ||||
-rw-r--r-- | synapse/replication/slave/storage/__init__.py | 14 | ||||
-rw-r--r-- | synapse/replication/slave/storage/_base.py | 28 | ||||
-rw-r--r-- | synapse/replication/slave/storage/_slaved_id_tracker.py | 30 | ||||
-rw-r--r-- | synapse/replication/slave/storage/account_data.py | 100 | ||||
-rw-r--r-- | synapse/replication/slave/storage/appservice.py | 30 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 254 | ||||
-rw-r--r-- | synapse/replication/slave/storage/filtering.py | 25 | ||||
-rw-r--r-- | synapse/replication/slave/storage/presence.py | 59 | ||||
-rw-r--r-- | synapse/replication/slave/storage/push_rule.py | 67 | ||||
-rw-r--r-- | synapse/replication/slave/storage/pushers.py | 52 | ||||
-rw-r--r-- | synapse/replication/slave/storage/receipts.py | 84 | ||||
-rw-r--r-- | synapse/replication/slave/storage/registration.py | 30 |
16 files changed, 1009 insertions, 52 deletions
diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py new file mode 100644 index 0000000000..fc18130ab4 --- /dev/null +++ b/synapse/replication/presence_resource.py @@ -0,0 +1,59 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + + +class PresenceResource(Resource): + """ + HTTP endpoint for marking users as syncing. + + POST /_synapse/replication/presence HTTP/1.1 + Content-Type: application/json + + { + "process_id": "<process_id>", + "syncing_users": ["<user_id>"] + } + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.version_string = hs.version_string + self.clock = hs.get_clock() + self.presence_handler = hs.get_presence_handler() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler() + @defer.inlineCallbacks + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + process_id = content["process_id"] + syncing_user_ids = content["syncing_users"] + + yield self.presence_handler.update_external_syncs( + process_id, set(syncing_user_ids) + ) + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py new file mode 100644 index 0000000000..9b01ab3c13 --- /dev/null +++ b/synapse/replication/pusher_resource.py @@ -0,0 +1,54 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + + +class PusherResource(Resource): + """ + HTTP endpoint for deleting rejected pushers + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.version_string = hs.version_string + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + self.clock = hs.get_clock() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler() + @defer.inlineCallbacks + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + for remove in content["remove"]: + yield self.store.delete_pusher_by_app_id_pushkey_user_id( + remove["app_id"], + remove["push_key"], + remove["user_id"], + ) + + self.notifier.on_new_replication_data() + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 8c1ae0fbc7..8c2d487ff4 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -15,6 +15,8 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request +from synapse.replication.pusher_resource import PusherResource +from synapse.replication.presence_resource import PresenceResource from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -38,6 +40,7 @@ STREAM_NAMES = ( ("backfill",), ("push_rules",), ("pushers",), + ("state",), ) @@ -76,7 +79,7 @@ class ReplicationResource(Resource): The response is a JSON object with keys for each stream with updates. Under each key is a JSON object with: - * "postion": The current position of the stream. + * "position": The current position of the stream. * "field_names": The names of the fields in each row. * "rows": The updates as an array of arrays. @@ -101,17 +104,19 @@ class ReplicationResource(Resource): long-polling this replication API for new data on those streams. """ - isLeaf = True - def __init__(self, hs): Resource.__init__(self) # Resource is old-style, so no super() self.version_string = hs.version_string self.store = hs.get_datastore() self.sources = hs.get_event_sources() - self.presence_handler = hs.get_handlers().presence_handler - self.typing_handler = hs.get_handlers().typing_notification_handler + self.presence_handler = hs.get_presence_handler() + self.typing_handler = hs.get_typing_handler() self.notifier = hs.notifier + self.clock = hs.get_clock() + + self.putChild("remove_pushers", PusherResource(hs)) + self.putChild("syncing_users", PresenceResource(hs)) def render_GET(self, request): self._async_render_GET(request) @@ -123,6 +128,7 @@ class ReplicationResource(Resource): backfill_token = yield self.store.get_current_backfill_token() push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() pushers_token = self.store.get_pushers_stream_token() + state_token = self.store.get_state_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -133,40 +139,62 @@ class ReplicationResource(Resource): backfill_token, push_rules_token, pushers_token, + state_token, )) - @request_handler + @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): limit = parse_integer(request, "limit", 100) timeout = parse_integer(request, "timeout", 10 * 1000) request.setHeader(b"Content-Type", b"application/json") - writer = _Writer(request) - @defer.inlineCallbacks - def replicate(): - current_token = yield self.current_replication_token() - logger.info("Replicating up to %r", current_token) + request_streams = { + name: parse_integer(request, name) + for names in STREAM_NAMES for name in names + } + request_streams["streams"] = parse_string(request, "streams") - yield self.account_data(writer, current_token, limit) - yield self.events(writer, current_token, limit) - yield self.presence(writer, current_token) # TODO: implement limit - yield self.typing(writer, current_token) # TODO: implement limit - yield self.receipts(writer, current_token, limit) - yield self.push_rules(writer, current_token, limit) - yield self.pushers(writer, current_token, limit) - self.streams(writer, current_token) + def replicate(): + return self.replicate(request_streams, limit) - logger.info("Replicated %d rows", writer.total) - defer.returnValue(writer.total) + result = yield self.notifier.wait_for_replication(replicate, timeout) - yield self.notifier.wait_for_replication(replicate, timeout) + for stream_name, stream_content in result.items(): + logger.info( + "Replicating %d rows of %s from %s -> %s", + len(stream_content["rows"]), + stream_name, + request_streams.get(stream_name), + stream_content["position"], + ) - writer.finish() + request.write(json.dumps(result, ensure_ascii=False)) + finish_request(request) - def streams(self, writer, current_token): - request_token = parse_string(writer.request, "streams") + @defer.inlineCallbacks + def replicate(self, request_streams, limit): + writer = _Writer() + current_token = yield self.current_replication_token() + logger.info("Replicating up to %r", current_token) + + yield self.account_data(writer, current_token, limit, request_streams) + yield self.events(writer, current_token, limit, request_streams) + # TODO: implement limit + yield self.presence(writer, current_token, request_streams) + yield self.typing(writer, current_token, request_streams) + yield self.receipts(writer, current_token, limit, request_streams) + yield self.push_rules(writer, current_token, limit, request_streams) + yield self.pushers(writer, current_token, limit, request_streams) + yield self.state(writer, current_token, limit, request_streams) + self.streams(writer, current_token, request_streams) + + logger.info("Replicated %d rows", writer.total) + defer.returnValue(writer.finish()) + + def streams(self, writer, current_token, request_streams): + request_token = request_streams.get("streams") streams = [] @@ -191,32 +219,43 @@ class ReplicationResource(Resource): ) @defer.inlineCallbacks - def events(self, writer, current_token, limit): - request_events = parse_integer(writer.request, "events") - request_backfill = parse_integer(writer.request, "backfill") + def events(self, writer, current_token, limit, request_streams): + request_events = request_streams.get("events") + request_backfill = request_streams.get("backfill") if request_events is not None or request_backfill is not None: if request_events is None: request_events = current_token.events if request_backfill is None: request_backfill = current_token.backfill - events_rows, backfill_rows = yield self.store.get_all_new_events( + res = yield self.store.get_all_new_events( request_backfill, request_events, current_token.backfill, current_token.events, limit ) + writer.write_header_and_rows("events", res.new_forward_events, ( + "position", "internal", "json", "state_group" + )) + writer.write_header_and_rows("backfill", res.new_backfill_events, ( + "position", "internal", "json", "state_group" + )) + writer.write_header_and_rows( + "forward_ex_outliers", res.forward_ex_outliers, + ("position", "event_id", "state_group") + ) writer.write_header_and_rows( - "events", events_rows, ("position", "internal", "json") + "backward_ex_outliers", res.backward_ex_outliers, + ("position", "event_id", "state_group") ) writer.write_header_and_rows( - "backfill", backfill_rows, ("position", "internal", "json") + "state_resets", res.state_resets, ("position",) ) @defer.inlineCallbacks - def presence(self, writer, current_token): + def presence(self, writer, current_token, request_streams): current_position = current_token.presence - request_presence = parse_integer(writer.request, "presence") + request_presence = request_streams.get("presence") if request_presence is not None: presence_rows = yield self.presence_handler.get_all_presence_updates( @@ -229,10 +268,10 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def typing(self, writer, current_token): + def typing(self, writer, current_token, request_streams): current_position = current_token.presence - request_typing = parse_integer(writer.request, "typing") + request_typing = request_streams.get("typing") if request_typing is not None: typing_rows = yield self.typing_handler.get_all_typing_updates( @@ -243,10 +282,10 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def receipts(self, writer, current_token, limit): + def receipts(self, writer, current_token, limit, request_streams): current_position = current_token.receipts - request_receipts = parse_integer(writer.request, "receipts") + request_receipts = request_streams.get("receipts") if request_receipts is not None: receipts_rows = yield self.store.get_all_updated_receipts( @@ -257,12 +296,12 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def account_data(self, writer, current_token, limit): + def account_data(self, writer, current_token, limit, request_streams): current_position = current_token.account_data - user_account_data = parse_integer(writer.request, "user_account_data") - room_account_data = parse_integer(writer.request, "room_account_data") - tag_account_data = parse_integer(writer.request, "tag_account_data") + user_account_data = request_streams.get("user_account_data") + room_account_data = request_streams.get("room_account_data") + tag_account_data = request_streams.get("tag_account_data") if user_account_data is not None or room_account_data is not None: if user_account_data is None: @@ -288,10 +327,10 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def push_rules(self, writer, current_token, limit): + def push_rules(self, writer, current_token, limit, request_streams): current_position = current_token.push_rules - push_rules = parse_integer(writer.request, "push_rules") + push_rules = request_streams.get("push_rules") if push_rules is not None: rows = yield self.store.get_all_push_rule_updates( @@ -303,10 +342,11 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def pushers(self, writer, current_token, limit): + def pushers(self, writer, current_token, limit, request_streams): current_position = current_token.pushers - pushers = parse_integer(writer.request, "pushers") + pushers = request_streams.get("pushers") + if pushers is not None: updated, deleted = yield self.store.get_all_updated_pushers( pushers, current_position, limit @@ -316,16 +356,34 @@ class ReplicationResource(Resource): "app_id", "app_display_name", "device_display_name", "pushkey", "ts", "lang", "data" )) - writer.write_header_and_rows("deleted", deleted, ( + writer.write_header_and_rows("deleted_pushers", deleted, ( "position", "user_id", "app_id", "pushkey" )) + @defer.inlineCallbacks + def state(self, writer, current_token, limit, request_streams): + current_position = current_token.state + + state = request_streams.get("state") + + if state is not None: + state_groups, state_group_state = ( + yield self.store.get_all_new_state_groups( + state, current_position, limit + ) + ) + writer.write_header_and_rows("state_groups", state_groups, ( + "position", "room_id", "event_id" + )) + writer.write_header_and_rows("state_group_state", state_group_state, ( + "position", "type", "state_key", "event_id" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" - def __init__(self, request): + def __init__(self): self.streams = {} - self.request = request self.total = 0 def write_header_and_rows(self, name, rows, fields, position=None): @@ -336,7 +394,7 @@ class _Writer(object): position = rows[-1][0] self.streams[name] = { - "position": str(position), + "position": position if type(position) is int else str(position), "field_names": fields, "rows": rows, } @@ -344,13 +402,12 @@ class _Writer(object): self.total += len(rows) def finish(self): - self.request.write(json.dumps(self.streams, ensure_ascii=False)) - finish_request(self.request) + return self.streams class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules", "pushers" + "push_rules", "pushers", "state" ))): __slots__ = [] diff --git a/synapse/replication/slave/__init__.py b/synapse/replication/slave/__init__.py new file mode 100644 index 0000000000..b7df13c9ee --- /dev/null +++ b/synapse/replication/slave/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/replication/slave/storage/__init__.py b/synapse/replication/slave/storage/__init__.py new file mode 100644 index 0000000000..b7df13c9ee --- /dev/null +++ b/synapse/replication/slave/storage/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py new file mode 100644 index 0000000000..46e43ce1c7 --- /dev/null +++ b/synapse/replication/slave/storage/_base.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage._base import SQLBaseStore +from twisted.internet import defer + + +class BaseSlavedStore(SQLBaseStore): + def __init__(self, db_conn, hs): + super(BaseSlavedStore, self).__init__(hs) + + def stream_positions(self): + return {} + + def process_replication(self, result): + return defer.succeed(None) diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py new file mode 100644 index 0000000000..24b5c79d4a --- /dev/null +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.util.id_generators import _load_current_id + + +class SlavedIdTracker(object): + def __init__(self, db_conn, table, column, extra_tables=[], step=1): + self.step = step + self._current = _load_current_id(db_conn, table, column, step) + for table, column in extra_tables: + self.advance(_load_current_id(db_conn, table, column)) + + def advance(self, new_id): + self._current = (max if self.step > 0 else min)(self._current, new_id) + + def get_current_token(self): + return self._current diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py new file mode 100644 index 0000000000..735c03c7eb --- /dev/null +++ b/synapse/replication/slave/storage/account_data.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.storage.account_data import AccountDataStore +from synapse.storage.tags import TagsStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedAccountDataStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedAccountDataStore, self).__init__(db_conn, hs) + self._account_data_id_gen = SlavedIdTracker( + db_conn, "account_data_max_stream_id", "stream_id", + ) + self._account_data_stream_cache = StreamChangeCache( + "AccountDataAndTagsChangeCache", + self._account_data_id_gen.get_current_token(), + ) + + get_account_data_for_user = ( + AccountDataStore.__dict__["get_account_data_for_user"] + ) + + get_global_account_data_by_type_for_users = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] + ) + + get_global_account_data_by_type_for_user = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] + ) + + get_tags_for_user = TagsStore.__dict__["get_tags_for_user"] + + get_updated_tags = DataStore.get_updated_tags.__func__ + get_updated_account_data_for_user = ( + DataStore.get_updated_account_data_for_user.__func__ + ) + + def get_max_account_data_stream_id(self): + return self._account_data_id_gen.get_current_token() + + def stream_positions(self): + result = super(SlavedAccountDataStore, self).stream_positions() + position = self._account_data_id_gen.get_current_token() + result["user_account_data"] = position + result["room_account_data"] = position + result["tag_account_data"] = position + return result + + def process_replication(self, result): + stream = result.get("user_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id, data_type = row[:3] + self.get_global_account_data_by_type_for_user.invalidate( + (data_type, user_id,) + ) + self.get_account_data_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) + + stream = result.get("room_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.get_account_data_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) + + stream = result.get("tag_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.get_tags_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) + + return super(SlavedAccountDataStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py new file mode 100644 index 0000000000..25792d9429 --- /dev/null +++ b/synapse/replication/slave/storage/appservice.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.config.appservice import load_appservices + + +class SlavedApplicationServiceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedApplicationServiceStore, self).__init__(db_conn, hs) + self.services_cache = load_appservices( + hs.config.server_name, + hs.config.app_service_config_files + ) + + get_app_service_by_token = DataStore.get_app_service_by_token.__func__ + get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py new file mode 100644 index 0000000000..877c68508c --- /dev/null +++ b/synapse/replication/slave/storage/events.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.api.constants import EventTypes +from synapse.events import FrozenEvent +from synapse.storage import DataStore +from synapse.storage.room import RoomStore +from synapse.storage.roommember import RoomMemberStore +from synapse.storage.event_federation import EventFederationStore +from synapse.storage.event_push_actions import EventPushActionsStore +from synapse.storage.state import StateStore +from synapse.storage.stream import StreamStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + +import ujson as json + +# So, um, we want to borrow a load of functions intended for reading from +# a DataStore, but we don't want to take functions that either write to the +# DataStore or are cached and don't have cache invalidation logic. +# +# Rather than write duplicate versions of those functions, or lift them to +# a common base class, we going to grab the underlying __func__ object from +# the method descriptor on the DataStore and chuck them into our class. + + +class SlavedEventStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedEventStore, self).__init__(db_conn, hs) + self._stream_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", + ) + self._backfill_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", step=-1 + ) + events_max = self._stream_id_gen.get_current_token() + event_cache_prefill, min_event_val = self._get_cache_dict( + db_conn, "events", + entity_column="room_id", + stream_column="stream_ordering", + max_value=events_max, + ) + self._events_stream_cache = StreamChangeCache( + "EventsRoomStreamChangeCache", min_event_val, + prefilled_cache=event_cache_prefill, + ) + self._membership_stream_cache = StreamChangeCache( + "MembershipStreamChangeCache", events_max, + ) + + # Cached functions can't be accessed through a class instance so we need + # to reach inside the __dict__ to extract them. + get_room_name_and_aliases = RoomStore.__dict__["get_room_name_and_aliases"] + get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] + get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_latest_event_ids_in_room = EventFederationStore.__dict__[ + "get_latest_event_ids_in_room" + ] + _get_current_state_for_key = StateStore.__dict__[ + "_get_current_state_for_key" + ] + get_invited_rooms_for_user = RoomMemberStore.__dict__[ + "get_invited_rooms_for_user" + ] + get_unread_event_push_actions_by_room_for_user = ( + EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] + ) + _get_state_group_for_events = ( + StateStore.__dict__["_get_state_group_for_events"] + ) + _get_state_group_for_event = ( + StateStore.__dict__["_get_state_group_for_event"] + ) + _get_state_groups_from_groups = ( + StateStore.__dict__["_get_state_groups_from_groups"] + ) + _get_state_group_from_group = ( + StateStore.__dict__["_get_state_group_from_group"] + ) + get_recent_event_ids_for_room = ( + StreamStore.__dict__["get_recent_event_ids_for_room"] + ) + + get_unread_push_actions_for_user_in_range = ( + DataStore.get_unread_push_actions_for_user_in_range.__func__ + ) + get_push_action_users_in_range = ( + DataStore.get_push_action_users_in_range.__func__ + ) + get_event = DataStore.get_event.__func__ + get_events = DataStore.get_events.__func__ + get_current_state = DataStore.get_current_state.__func__ + get_current_state_for_key = DataStore.get_current_state_for_key.__func__ + get_rooms_for_user_where_membership_is = ( + DataStore.get_rooms_for_user_where_membership_is.__func__ + ) + get_membership_changes_for_user = ( + DataStore.get_membership_changes_for_user.__func__ + ) + get_room_events_max_id = DataStore.get_room_events_max_id.__func__ + get_room_events_stream_for_room = ( + DataStore.get_room_events_stream_for_room.__func__ + ) + get_events_around = DataStore.get_events_around.__func__ + get_state_for_event = DataStore.get_state_for_event.__func__ + get_state_for_events = DataStore.get_state_for_events.__func__ + get_state_groups = DataStore.get_state_groups.__func__ + get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__ + get_room_events_stream_for_rooms = ( + DataStore.get_room_events_stream_for_rooms.__func__ + ) + get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ + + _set_before_and_after = staticmethod(DataStore._set_before_and_after) + + _get_events = DataStore._get_events.__func__ + _get_events_from_cache = DataStore._get_events_from_cache.__func__ + + _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ + _enqueue_events = DataStore._enqueue_events.__func__ + _do_fetch = DataStore._do_fetch.__func__ + _fetch_event_rows = DataStore._fetch_event_rows.__func__ + _get_event_from_row = DataStore._get_event_from_row.__func__ + _get_rooms_for_user_where_membership_is_txn = ( + DataStore._get_rooms_for_user_where_membership_is_txn.__func__ + ) + _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ + _get_state_for_groups = DataStore._get_state_for_groups.__func__ + _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ + _get_events_around_txn = DataStore._get_events_around_txn.__func__ + _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ + + def stream_positions(self): + result = super(SlavedEventStore, self).stream_positions() + result["events"] = self._stream_id_gen.get_current_token() + result["backfill"] = -self._backfill_id_gen.get_current_token() + return result + + def process_replication(self, result): + state_resets = set( + r[0] for r in result.get("state_resets", {"rows": []})["rows"] + ) + + stream = result.get("events") + if stream: + self._stream_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + self._process_replication_row( + row, backfilled=False, state_resets=state_resets + ) + + stream = result.get("backfill") + if stream: + self._backfill_id_gen.advance(-int(stream["position"])) + for row in stream["rows"]: + self._process_replication_row( + row, backfilled=True, state_resets=state_resets + ) + + stream = result.get("forward_ex_outliers") + if stream: + self._stream_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + event_id = row[1] + self._invalidate_get_event_cache(event_id) + + stream = result.get("backward_ex_outliers") + if stream: + self._backfill_id_gen.advance(-int(stream["position"])) + for row in stream["rows"]: + event_id = row[1] + self._invalidate_get_event_cache(event_id) + + return super(SlavedEventStore, self).process_replication(result) + + def _process_replication_row(self, row, backfilled, state_resets): + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + self.invalidate_caches_for_event( + event, backfilled, reset_state=position in state_resets + ) + + def invalidate_caches_for_event(self, event, backfilled, reset_state): + if reset_state: + self._get_current_state_for_key.invalidate_all() + self.get_rooms_for_user.invalidate_all() + self.get_users_in_room.invalidate((event.room_id,)) + # self.get_joined_hosts_for_room.invalidate((event.room_id,)) + self.get_room_name_and_aliases.invalidate((event.room_id,)) + + self._invalidate_get_event_cache(event.event_id) + + self.get_latest_event_ids_in_room.invalidate((event.room_id,)) + + self.get_unread_event_push_actions_by_room_for_user.invalidate_many( + (event.room_id,) + ) + + if not backfilled: + self._events_stream_cache.entity_has_changed( + event.room_id, event.internal_metadata.stream_ordering + ) + + # self.get_unread_event_push_actions_by_room_for_user.invalidate_many( + # (event.room_id,) + # ) + + if event.type == EventTypes.Redaction: + self._invalidate_get_event_cache(event.redacts) + + if event.type == EventTypes.Member: + self.get_rooms_for_user.invalidate((event.state_key,)) + # self.get_joined_hosts_for_room.invalidate((event.room_id,)) + self.get_users_in_room.invalidate((event.room_id,)) + self._membership_stream_cache.entity_has_changed( + event.state_key, event.internal_metadata.stream_ordering + ) + self.get_invited_rooms_for_user.invalidate((event.state_key,)) + + if not event.is_state(): + return + + if backfilled: + return + + if (not event.internal_metadata.is_invite_from_remote() + and event.internal_metadata.is_outlier()): + return + + self._get_current_state_for_key.invalidate(( + event.room_id, event.type, event.state_key + )) + + if event.type in [EventTypes.Name, EventTypes.Aliases]: + self.get_room_name_and_aliases.invalidate( + (event.room_id,) + ) + pass diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py new file mode 100644 index 0000000000..819ed62881 --- /dev/null +++ b/synapse/replication/slave/storage/filtering.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.filtering import FilteringStore + + +class SlavedFilteringStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedFilteringStore, self).__init__(db_conn, hs) + + # Filters are immutable so this cache doesn't need to be expired + get_user_filter = FilteringStore.__dict__["get_user_filter"] diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py new file mode 100644 index 0000000000..703f4a49bf --- /dev/null +++ b/synapse/replication/slave/storage/presence.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.storage import DataStore + + +class SlavedPresenceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedPresenceStore, self).__init__(db_conn, hs) + self._presence_id_gen = SlavedIdTracker( + db_conn, "presence_stream", "stream_id", + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() + ) + + _get_active_presence = DataStore._get_active_presence.__func__ + take_presence_startup_info = DataStore.take_presence_startup_info.__func__ + get_presence_for_users = DataStore.get_presence_for_users.__func__ + + def get_current_presence_token(self): + return self._presence_id_gen.get_current_token() + + def stream_positions(self): + result = super(SlavedPresenceStore, self).stream_positions() + position = self._presence_id_gen.get_current_token() + result["presence"] = position + return result + + def process_replication(self, result): + stream = result.get("presence") + if stream: + self._presence_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.presence_stream_cache.entity_has_changed( + user_id, position + ) + + return super(SlavedPresenceStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py new file mode 100644 index 0000000000..21ceb0213a --- /dev/null +++ b/synapse/replication/slave/storage/push_rule.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .events import SlavedEventStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.storage.push_rule import PushRuleStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedPushRuleStore(SlavedEventStore): + def __init__(self, db_conn, hs): + super(SlavedPushRuleStore, self).__init__(db_conn, hs) + self._push_rules_stream_id_gen = SlavedIdTracker( + db_conn, "push_rules_stream", "stream_id", + ) + self.push_rules_stream_cache = StreamChangeCache( + "PushRulesStreamChangeCache", + self._push_rules_stream_id_gen.get_current_token(), + ) + + get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"] + get_push_rules_enabled_for_user = ( + PushRuleStore.__dict__["get_push_rules_enabled_for_user"] + ) + have_push_rules_changed_for_user = ( + DataStore.have_push_rules_changed_for_user.__func__ + ) + + def get_push_rules_stream_token(self): + return ( + self._push_rules_stream_id_gen.get_current_token(), + self._stream_id_gen.get_current_token(), + ) + + def stream_positions(self): + result = super(SlavedPushRuleStore, self).stream_positions() + result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("push_rules") + if stream: + for row in stream["rows"]: + position = row[0] + user_id = row[2] + self.get_push_rules_for_user.invalidate((user_id,)) + self.get_push_rules_enabled_for_user.invalidate((user_id,)) + self.push_rules_stream_cache.entity_has_changed( + user_id, position + ) + + self._push_rules_stream_id_gen.advance(int(stream["position"])) + + return super(SlavedPushRuleStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py new file mode 100644 index 0000000000..d88206b3bb --- /dev/null +++ b/synapse/replication/slave/storage/pushers.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.storage import DataStore + + +class SlavedPusherStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedPusherStore, self).__init__(db_conn, hs) + self._pushers_id_gen = SlavedIdTracker( + db_conn, "pushers", "id", + extra_tables=[("deleted_pushers", "stream_id")], + ) + + get_all_pushers = DataStore.get_all_pushers.__func__ + get_pushers_by = DataStore.get_pushers_by.__func__ + get_pushers_by_app_id_and_pushkey = ( + DataStore.get_pushers_by_app_id_and_pushkey.__func__ + ) + _decode_pushers_rows = DataStore._decode_pushers_rows.__func__ + + def stream_positions(self): + result = super(SlavedPusherStore, self).stream_positions() + result["pushers"] = self._pushers_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("pushers") + if stream: + self._pushers_id_gen.advance(int(stream["position"])) + + stream = result.get("deleted_pushers") + if stream: + self._pushers_id_gen.advance(int(stream["position"])) + + return super(SlavedPusherStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py new file mode 100644 index 0000000000..ac9662d399 --- /dev/null +++ b/synapse/replication/slave/storage/receipts.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.storage import DataStore +from synapse.storage.receipts import ReceiptsStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + +# So, um, we want to borrow a load of functions intended for reading from +# a DataStore, but we don't want to take functions that either write to the +# DataStore or are cached and don't have cache invalidation logic. +# +# Rather than write duplicate versions of those functions, or lift them to +# a common base class, we going to grab the underlying __func__ object from +# the method descriptor on the DataStore and chuck them into our class. + + +class SlavedReceiptsStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedReceiptsStore, self).__init__(db_conn, hs) + + self._receipts_id_gen = SlavedIdTracker( + db_conn, "receipts_linearized", "stream_id" + ) + + self._receipts_stream_cache = StreamChangeCache( + "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() + ) + + get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] + get_linearized_receipts_for_room = ( + ReceiptsStore.__dict__["get_linearized_receipts_for_room"] + ) + _get_linearized_receipts_for_rooms = ( + ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"] + ) + get_last_receipt_event_id_for_user = ( + ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"] + ) + + get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ + get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ + + get_linearized_receipts_for_rooms = ( + DataStore.get_linearized_receipts_for_rooms.__func__ + ) + + def stream_positions(self): + result = super(SlavedReceiptsStore, self).stream_positions() + result["receipts"] = self._receipts_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("receipts") + if stream: + self._receipts_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, room_id, receipt_type, user_id = row[:4] + self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) + self._receipts_stream_cache.entity_has_changed(room_id, position) + + return super(SlavedReceiptsStore, self).process_replication(result) + + def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): + self.get_receipts_for_user.invalidate((user_id, receipt_type)) + self.get_linearized_receipts_for_room.invalidate_many((room_id,)) + self.get_last_receipt_event_id_for_user.invalidate( + (user_id, room_id, receipt_type) + ) diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py new file mode 100644 index 0000000000..307833f9e1 --- /dev/null +++ b/synapse/replication/slave/storage/registration.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.registration import RegistrationStore + + +class SlavedRegistrationStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedRegistrationStore, self).__init__(db_conn, hs) + + # TODO: use the cached version and invalidate deleted tokens + get_user_by_access_token = RegistrationStore.__dict__[ + "get_user_by_access_token" + ].orig + + _query_for_auth = DataStore._query_for_auth.__func__ |