From 0eae0757232169b833224f48208aed9fdc9c6fe6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 10:58:03 +0100 Subject: Add slaved stores for filters, tokens, and push rules --- synapse/replication/slave/storage/appservice.py | 30 ++++++++++ synapse/replication/slave/storage/filtering.py | 24 ++++++++ synapse/replication/slave/storage/push_rule.py | 67 +++++++++++++++++++++++ synapse/replication/slave/storage/registration.py | 30 ++++++++++ 4 files changed, 151 insertions(+) create mode 100644 synapse/replication/slave/storage/appservice.py create mode 100644 synapse/replication/slave/storage/filtering.py create mode 100644 synapse/replication/slave/storage/push_rule.py create mode 100644 synapse/replication/slave/storage/registration.py (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py new file mode 100644 index 0000000000..25792d9429 --- /dev/null +++ b/synapse/replication/slave/storage/appservice.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.config.appservice import load_appservices + + +class SlavedApplicationServiceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedApplicationServiceStore, self).__init__(db_conn, hs) + self.services_cache = load_appservices( + hs.config.server_name, + hs.config.app_service_config_files + ) + + get_app_service_by_token = DataStore.get_app_service_by_token.__func__ + get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py new file mode 100644 index 0000000000..5037f395b9 --- /dev/null +++ b/synapse/replication/slave/storage/filtering.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.filtering import FilteringStore + + +class SlavedFilteringStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedFilteringStore, self).__init__(db_conn, hs) + + get_user_filter = FilteringStore.__dict__["get_user_filter"] diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py new file mode 100644 index 0000000000..21ceb0213a --- /dev/null +++ b/synapse/replication/slave/storage/push_rule.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .events import SlavedEventStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.storage.push_rule import PushRuleStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedPushRuleStore(SlavedEventStore): + def __init__(self, db_conn, hs): + super(SlavedPushRuleStore, self).__init__(db_conn, hs) + self._push_rules_stream_id_gen = SlavedIdTracker( + db_conn, "push_rules_stream", "stream_id", + ) + self.push_rules_stream_cache = StreamChangeCache( + "PushRulesStreamChangeCache", + self._push_rules_stream_id_gen.get_current_token(), + ) + + get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"] + get_push_rules_enabled_for_user = ( + PushRuleStore.__dict__["get_push_rules_enabled_for_user"] + ) + have_push_rules_changed_for_user = ( + DataStore.have_push_rules_changed_for_user.__func__ + ) + + def get_push_rules_stream_token(self): + return ( + self._push_rules_stream_id_gen.get_current_token(), + self._stream_id_gen.get_current_token(), + ) + + def stream_positions(self): + result = super(SlavedPushRuleStore, self).stream_positions() + result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("push_rules") + if stream: + for row in stream["rows"]: + position = row[0] + user_id = row[2] + self.get_push_rules_for_user.invalidate((user_id,)) + self.get_push_rules_enabled_for_user.invalidate((user_id,)) + self.push_rules_stream_cache.entity_has_changed( + user_id, position + ) + + self._push_rules_stream_id_gen.advance(int(stream["position"])) + + return super(SlavedPushRuleStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py new file mode 100644 index 0000000000..307833f9e1 --- /dev/null +++ b/synapse/replication/slave/storage/registration.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.registration import RegistrationStore + + +class SlavedRegistrationStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedRegistrationStore, self).__init__(db_conn, hs) + + # TODO: use the cached version and invalidate deleted tokens + get_user_by_access_token = RegistrationStore.__dict__[ + "get_user_by_access_token" + ].orig + + _query_for_auth = DataStore._query_for_auth.__func__ -- cgit 1.4.1 From f88d747f7959808884451245aeba65edf7c490bf Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 11:03:10 +0100 Subject: Add a comment explaining why the filter cache doesn't need exipiring --- synapse/replication/slave/storage/filtering.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py index 5037f395b9..819ed62881 100644 --- a/synapse/replication/slave/storage/filtering.py +++ b/synapse/replication/slave/storage/filtering.py @@ -21,4 +21,5 @@ class SlavedFilteringStore(BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedFilteringStore, self).__init__(db_conn, hs) + # Filters are immutable so this cache doesn't need to be expired get_user_filter = FilteringStore.__dict__["get_user_filter"] -- cgit 1.4.1 From 3ae915b27e4531031ee325931b3c62bc200ce798 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 11:05:53 +0100 Subject: Add a slaved store for presence --- synapse/replication/slave/storage/presence.py | 59 +++++++++++++++++++++++++++ synapse/storage/__init__.py | 6 +-- 2 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 synapse/replication/slave/storage/presence.py (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py new file mode 100644 index 0000000000..703f4a49bf --- /dev/null +++ b/synapse/replication/slave/storage/presence.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.storage import DataStore + + +class SlavedPresenceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedPresenceStore, self).__init__(db_conn, hs) + self._presence_id_gen = SlavedIdTracker( + db_conn, "presence_stream", "stream_id", + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() + ) + + _get_active_presence = DataStore._get_active_presence.__func__ + take_presence_startup_info = DataStore.take_presence_startup_info.__func__ + get_presence_for_users = DataStore.get_presence_for_users.__func__ + + def get_current_presence_token(self): + return self._presence_id_gen.get_current_token() + + def stream_positions(self): + result = super(SlavedPresenceStore, self).stream_positions() + position = self._presence_id_gen.get_current_token() + result["presence"] = position + return result + + def process_replication(self, result): + stream = result.get("presence") + if stream: + self._presence_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.presence_stream_cache.entity_has_changed( + user_id, position + ) + + return super(SlavedPresenceStore, self).process_replication(result) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8581796b7e..6928a213e8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -149,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore, "AccountDataAndTagsChangeCache", account_max, ) - self.__presence_on_startup = self._get_active_presence(db_conn) + self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( db_conn, "presence_stream", @@ -190,8 +190,8 @@ class DataStore(RoomMemberStore, RoomStore, super(DataStore, self).__init__(hs) def take_presence_startup_info(self): - active_on_startup = self.__presence_on_startup - self.__presence_on_startup = None + active_on_startup = self._presence_on_startup + self._presence_on_startup = None return active_on_startup def _get_active_presence(self, db_conn): -- cgit 1.4.1 From 81cf449daa8b310899014f5564f5fdf10289e79c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 11:19:27 +0100 Subject: Add methods to events, account data and receipt slaves Adds the methods needed by /sync to the slaved events, account data and receipt stores. --- synapse/replication/slave/storage/account_data.py | 41 ++++++++++++++++++++++- synapse/replication/slave/storage/events.py | 21 +++++++++--- synapse/replication/slave/storage/receipts.py | 25 +++++++++++++- 3 files changed, 81 insertions(+), 6 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index f59b0eabbc..735c03c7eb 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -15,7 +15,10 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore from synapse.storage.account_data import AccountDataStore +from synapse.storage.tags import TagsStore +from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedAccountDataStore(BaseSlavedStore): @@ -25,6 +28,14 @@ class SlavedAccountDataStore(BaseSlavedStore): self._account_data_id_gen = SlavedIdTracker( db_conn, "account_data_max_stream_id", "stream_id", ) + self._account_data_stream_cache = StreamChangeCache( + "AccountDataAndTagsChangeCache", + self._account_data_id_gen.get_current_token(), + ) + + get_account_data_for_user = ( + AccountDataStore.__dict__["get_account_data_for_user"] + ) get_global_account_data_by_type_for_users = ( AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] @@ -34,6 +45,16 @@ class SlavedAccountDataStore(BaseSlavedStore): AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] ) + get_tags_for_user = TagsStore.__dict__["get_tags_for_user"] + + get_updated_tags = DataStore.get_updated_tags.__func__ + get_updated_account_data_for_user = ( + DataStore.get_updated_account_data_for_user.__func__ + ) + + def get_max_account_data_stream_id(self): + return self._account_data_id_gen.get_current_token() + def stream_positions(self): result = super(SlavedAccountDataStore, self).stream_positions() position = self._account_data_id_gen.get_current_token() @@ -47,15 +68,33 @@ class SlavedAccountDataStore(BaseSlavedStore): if stream: self._account_data_id_gen.advance(int(stream["position"])) for row in stream["rows"]: - user_id, data_type = row[1:3] + position, user_id, data_type = row[:3] self.get_global_account_data_by_type_for_user.invalidate( (data_type, user_id,) ) + self.get_account_data_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) stream = result.get("room_account_data") if stream: self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.get_account_data_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) stream = result.get("tag_account_data") if stream: self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.get_tags_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) + + return super(SlavedAccountDataStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index c0d741452d..cbc1ae4190 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.state import StateStore +from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache import ujson as json @@ -57,6 +58,9 @@ class SlavedEventStore(BaseSlavedStore): "EventsRoomStreamChangeCache", min_event_val, prefilled_cache=event_cache_prefill, ) + self._membership_stream_cache = StreamChangeCache( + "MembershipStreamChangeCache", events_max, + ) # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. @@ -87,6 +91,9 @@ class SlavedEventStore(BaseSlavedStore): _get_state_group_from_group = ( StateStore.__dict__["_get_state_group_from_group"] ) + get_recent_event_ids_for_room = ( + StreamStore.__dict__["get_recent_event_ids_for_room"] + ) get_unread_push_actions_for_user_in_range = ( DataStore.get_unread_push_actions_for_user_in_range.__func__ @@ -109,10 +116,16 @@ class SlavedEventStore(BaseSlavedStore): DataStore.get_room_events_stream_for_room.__func__ ) get_events_around = DataStore.get_events_around.__func__ + get_state_for_event = DataStore.get_state_for_event.__func__ get_state_for_events = DataStore.get_state_for_events.__func__ get_state_groups = DataStore.get_state_groups.__func__ + get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__ + get_room_events_stream_for_rooms = ( + DataStore.get_room_events_stream_for_rooms.__func__ + ) + get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ - _set_before_and_after = DataStore._set_before_and_after + _set_before_and_after = staticmethod(DataStore._set_before_and_after) _get_events = DataStore._get_events.__func__ _get_events_from_cache = DataStore._get_events_from_cache.__func__ @@ -220,9 +233,9 @@ class SlavedEventStore(BaseSlavedStore): self.get_rooms_for_user.invalidate((event.state_key,)) # self.get_joined_hosts_for_room.invalidate((event.room_id,)) self.get_users_in_room.invalidate((event.room_id,)) - # self._membership_stream_cache.entity_has_changed( - # event.state_key, event.internal_metadata.stream_ordering - # ) + self._membership_stream_cache.entity_has_changed( + event.state_key, event.internal_metadata.stream_ordering + ) self.get_invited_rooms_for_user.invalidate((event.state_key,)) if not event.is_state(): diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index ec007516d0..ac9662d399 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore from synapse.storage.receipts import ReceiptsStore +from synapse.util.caches.stream_change_cache import StreamChangeCache # So, um, we want to borrow a load of functions intended for reading from # a DataStore, but we don't want to take functions that either write to the @@ -37,11 +38,28 @@ class SlavedReceiptsStore(BaseSlavedStore): db_conn, "receipts_linearized", "stream_id" ) + self._receipts_stream_cache = StreamChangeCache( + "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() + ) + get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] + get_linearized_receipts_for_room = ( + ReceiptsStore.__dict__["get_linearized_receipts_for_room"] + ) + _get_linearized_receipts_for_rooms = ( + ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"] + ) + get_last_receipt_event_id_for_user = ( + ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"] + ) get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ + get_linearized_receipts_for_rooms = ( + DataStore.get_linearized_receipts_for_rooms.__func__ + ) + def stream_positions(self): result = super(SlavedReceiptsStore, self).stream_positions() result["receipts"] = self._receipts_id_gen.get_current_token() @@ -52,10 +70,15 @@ class SlavedReceiptsStore(BaseSlavedStore): if stream: self._receipts_id_gen.advance(int(stream["position"])) for row in stream["rows"]: - room_id, receipt_type, user_id = row[1:4] + position, room_id, receipt_type, user_id = row[:4] self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) + self._receipts_stream_cache.entity_has_changed(room_id, position) return super(SlavedReceiptsStore, self).process_replication(result) def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) + self.get_linearized_receipts_for_room.invalidate_many((room_id,)) + self.get_last_receipt_event_id_for_user.invalidate( + (user_id, room_id, receipt_type) + ) -- cgit 1.4.1