From d46b18a00f0811489299b4835482af2a71dcaf55 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 May 2016 18:27:06 +0200 Subject: Pass through _get_event_txn --- synapse/replication/slave/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/replication/slave/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 86f00b6ff5..56e8b9d906 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -104,6 +104,7 @@ class SlavedEventStore(BaseSlavedStore): _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ _parse_events_txn = DataStore._parse_events_txn.__func__ _get_events_txn = DataStore._get_events_txn.__func__ + _get_event_txn = DataStore._get_event_txn.__func__ _enqueue_events = DataStore._enqueue_events.__func__ _do_fetch = DataStore._do_fetch.__func__ _fetch_events_txn = DataStore._fetch_events_txn.__func__ -- cgit 1.4.1 From 5f46be19a78f1811eee9a7f8fd9fde70563fdfe9 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 May 2016 18:43:40 +0200 Subject: Pass through get_events to pusher too --- synapse/replication/slave/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/replication/slave/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 56e8b9d906..7ba7a6f6e4 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -83,6 +83,7 @@ class SlavedEventStore(BaseSlavedStore): DataStore.get_push_action_users_in_range.__func__ ) get_event = DataStore.get_event.__func__ + get_events = DataStore.get_events.__func__ get_current_state = DataStore.get_current_state.__func__ get_current_state_for_key = DataStore.get_current_state_for_key.__func__ get_rooms_for_user_where_membership_is = ( -- cgit 1.4.1 From 3547e66bc684ce3f0fbc83297fbe319a683c2a15 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 11:53:00 +0100 Subject: Make sure we advance our stream position --- synapse/replication/slave/storage/events.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/replication/slave/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 7ba7a6f6e4..635febb174 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -146,12 +146,14 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("forward_ex_outliers") if stream: + self._stream_id_gen.advance(stream["position"]) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) stream = result.get("backward_ex_outliers") if stream: + self._backfill_id_gen.advance(-stream["position"]) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) -- cgit 1.4.1 From 3abab26458cc9fe8a77d5ccee664e87ce407ed58 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 15:34:06 +0100 Subject: Add a slaved datastore for account data --- synapse/replication/slave/storage/account_data.py | 61 ++++++++++++++++++++++ .../replication/slave/storage/test_account_data.py | 56 ++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 synapse/replication/slave/storage/account_data.py create mode 100644 tests/replication/slave/storage/test_account_data.py (limited to 'synapse/replication/slave/storage') diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py new file mode 100644 index 0000000000..f59b0eabbc --- /dev/null +++ b/synapse/replication/slave/storage/account_data.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage.account_data import AccountDataStore + + +class SlavedAccountDataStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedAccountDataStore, self).__init__(db_conn, hs) + self._account_data_id_gen = SlavedIdTracker( + db_conn, "account_data_max_stream_id", "stream_id", + ) + + get_global_account_data_by_type_for_users = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] + ) + + get_global_account_data_by_type_for_user = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] + ) + + def stream_positions(self): + result = super(SlavedAccountDataStore, self).stream_positions() + position = self._account_data_id_gen.get_current_token() + result["user_account_data"] = position + result["room_account_data"] = position + result["tag_account_data"] = position + return result + + def process_replication(self, result): + stream = result.get("user_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + user_id, data_type = row[1:3] + self.get_global_account_data_by_type_for_user.invalidate( + (data_type, user_id,) + ) + + stream = result.get("room_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + + stream = result.get("tag_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) diff --git a/tests/replication/slave/storage/test_account_data.py b/tests/replication/slave/storage/test_account_data.py new file mode 100644 index 0000000000..da54d478ce --- /dev/null +++ b/tests/replication/slave/storage/test_account_data.py @@ -0,0 +1,56 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ._base import BaseSlavedStoreTestCase + +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore + +from twisted.internet import defer + +USER_ID = "@feeling:blue" +TYPE = "my.type" + + +class SlavedAccountDataStoreTestCase(BaseSlavedStoreTestCase): + + STORE_TYPE = SlavedAccountDataStore + + @defer.inlineCallbacks + def test_user_account_data(self): + yield self.master_store.add_account_data_for_user( + USER_ID, TYPE, {"a": 1} + ) + yield self.replicate() + yield self.check( + "get_global_account_data_by_type_for_user", + [TYPE, USER_ID], {"a": 1} + ) + yield self.check( + "get_global_account_data_by_type_for_users", + [TYPE, [USER_ID]], {USER_ID: {"a": 1}} + ) + + yield self.master_store.add_account_data_for_user( + USER_ID, TYPE, {"a": 2} + ) + yield self.replicate() + yield self.check( + "get_global_account_data_by_type_for_user", + [TYPE, USER_ID], {"a": 2} + ) + yield self.check( + "get_global_account_data_by_type_for_users", + [TYPE, [USER_ID]], {USER_ID: {"a": 2}} + ) -- cgit 1.4.1 From 206eb9fd947ba86060340ba2154d1112570b76cd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 16:58:14 +0100 Subject: Shift some of the state_group methods into the SlavedEventStore --- synapse/app/pusher.py | 45 ----------------------------- synapse/replication/slave/storage/events.py | 19 ++++++++++++ 2 files changed, 19 insertions(+), 45 deletions(-) (limited to 'synapse/replication/slave/storage') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 662cd0dc6b..9d41b62db5 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -23,7 +23,6 @@ from synapse.config.logger import LoggingConfig from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.state import StateStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.account_data import AccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -131,50 +130,6 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) - get_state_groups = ( - DataStore.get_state_groups.__func__ - ) - - _get_state_for_groups = ( - DataStore._get_state_for_groups.__func__ - ) - - _get_all_state_from_cache = ( - DataStore._get_all_state_from_cache.__func__ - ) - - get_events_around = ( - DataStore.get_events_around.__func__ - ) - - _get_events_around_txn = ( - DataStore._get_events_around_txn.__func__ - ) - - get_state_for_events = ( - DataStore.get_state_for_events.__func__ - ) - - _get_some_state_from_cache = ( - DataStore._get_some_state_from_cache.__func__ - ) - - _get_state_group_for_events = ( - StateStore.__dict__["_get_state_group_for_events"] - ) - - _get_state_group_for_event = ( - StateStore.__dict__["_get_state_group_for_event"] - ) - - _get_state_groups_from_groups = ( - StateStore.__dict__["_get_state_groups_from_groups"] - ) - - _get_state_group_from_group = ( - StateStore.__dict__["_get_state_group_from_group"] - ) - get_global_account_data_by_type_for_users = ( AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 7ba7a6f6e4..0e29bd51d6 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore): get_unread_event_push_actions_by_room_for_user = ( EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] ) + _get_state_group_for_events = ( + StateStore.__dict__["_get_state_group_for_events"] + ) + _get_state_group_for_event = ( + StateStore.__dict__["_get_state_group_for_event"] + ) + _get_state_groups_from_groups = ( + StateStore.__dict__["_get_state_groups_from_groups"] + ) + _get_state_group_from_group = ( + StateStore.__dict__["_get_state_group_from_group"] + ) get_unread_push_actions_for_user_in_range = ( DataStore.get_unread_push_actions_for_user_in_range.__func__ @@ -96,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_room = ( DataStore.get_room_events_stream_for_room.__func__ ) + get_events_around = DataStore.get_events_around.__func__ + get_state_for_events = DataStore.get_state_for_events.__func__ + get_state_groups = DataStore.get_state_groups.__func__ _set_before_and_after = DataStore._set_before_and_after @@ -116,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore): DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ + _get_state_for_groups = DataStore._get_state_for_groups.__func__ + _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ + _get_events_around_txn = DataStore._get_events_around_txn.__func__ + _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() -- cgit 1.4.1 From 0466454b003860dba23363f882916eb4f7d27648 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 17:33:44 +0100 Subject: Assert that stream replicated stream positions are ints --- synapse/replication/slave/storage/events.py | 8 ++++---- synapse/replication/slave/storage/pushers.py | 4 ++-- synapse/replication/slave/storage/receipts.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/replication/slave/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 99cddf2518..c0d741452d 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -149,7 +149,7 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("events") if stream: - self._stream_id_gen.advance(stream["position"]) + self._stream_id_gen.advance(int(stream["position"])) for row in stream["rows"]: self._process_replication_row( row, backfilled=False, state_resets=state_resets @@ -157,7 +157,7 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("backfill") if stream: - self._backfill_id_gen.advance(-stream["position"]) + self._backfill_id_gen.advance(-int(stream["position"])) for row in stream["rows"]: self._process_replication_row( row, backfilled=True, state_resets=state_resets @@ -165,14 +165,14 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("forward_ex_outliers") if stream: - self._stream_id_gen.advance(stream["position"]) + self._stream_id_gen.advance(int(stream["position"])) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) stream = result.get("backward_ex_outliers") if stream: - self._backfill_id_gen.advance(-stream["position"]) + self._backfill_id_gen.advance(-int(stream["position"])) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index 8faddb2595..d88206b3bb 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -43,10 +43,10 @@ class SlavedPusherStore(BaseSlavedStore): def process_replication(self, result): stream = result.get("pushers") if stream: - self._pushers_id_gen.advance(stream["position"]) + self._pushers_id_gen.advance(int(stream["position"])) stream = result.get("deleted_pushers") if stream: - self._pushers_id_gen.advance(stream["position"]) + self._pushers_id_gen.advance(int(stream["position"])) return super(SlavedPusherStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index b55d5dfd08..ec007516d0 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -50,7 +50,7 @@ class SlavedReceiptsStore(BaseSlavedStore): def process_replication(self, result): stream = result.get("receipts") if stream: - self._receipts_id_gen.advance(stream["position"]) + self._receipts_id_gen.advance(int(stream["position"])) for row in stream["rows"]: room_id, receipt_type, user_id = row[1:4] self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) -- cgit 1.4.1