diff options
Diffstat (limited to 'synapse/replication/slave')
19 files changed, 184 insertions, 331 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index b962641166..3f7be74e02 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -13,19 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from synapse.storage._base import SQLBaseStore from synapse.storage.engines import PostgresEngine from ._slaved_id_tracker import SlavedIdTracker -import logging - logger = logging.getLogger(__name__) class BaseSlavedStore(SQLBaseStore): def __init__(self, db_conn, hs): - super(BaseSlavedStore, self).__init__(hs) + super(BaseSlavedStore, self).__init__(db_conn, hs) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = SlavedIdTracker( db_conn, "cache_invalidation_stream", "stream_id", diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index efbd87918e..d9ba6d69b1 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,50 +14,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker -from synapse.storage import DataStore -from synapse.storage.account_data import AccountDataStore -from synapse.storage.tags import TagsStore -from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.storage.account_data import AccountDataWorkerStore +from synapse.storage.tags import TagsWorkerStore -class SlavedAccountDataStore(BaseSlavedStore): +class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): - super(SlavedAccountDataStore, self).__init__(db_conn, hs) self._account_data_id_gen = SlavedIdTracker( db_conn, "account_data_max_stream_id", "stream_id", ) - self._account_data_stream_cache = StreamChangeCache( - "AccountDataAndTagsChangeCache", - self._account_data_id_gen.get_current_token(), - ) - - get_account_data_for_user = ( - AccountDataStore.__dict__["get_account_data_for_user"] - ) - - get_global_account_data_by_type_for_users = ( - AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] - ) - get_global_account_data_by_type_for_user = ( - AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] - ) - - get_tags_for_user = TagsStore.__dict__["get_tags_for_user"] - get_tags_for_room = ( - DataStore.get_tags_for_room.__func__ - ) - get_account_data_for_room = ( - DataStore.get_account_data_for_room.__func__ - ) - - get_updated_tags = DataStore.get_updated_tags.__func__ - get_updated_account_data_for_user = ( - DataStore.get_updated_account_data_for_user.__func__ - ) + super(SlavedAccountDataStore, self).__init__(db_conn, hs) def get_max_account_data_stream_id(self): return self._account_data_id_gen.get_current_token() @@ -85,6 +56,10 @@ class SlavedAccountDataStore(BaseSlavedStore): (row.data_type, row.user_id,) ) self.get_account_data_for_user.invalidate((row.user_id,)) + self.get_account_data_for_room.invalidate((row.user_id, row.room_id,)) + self.get_account_data_for_room_and_type.invalidate( + (row.user_id, row.room_id, row.data_type,), + ) self._account_data_stream_cache.entity_has_changed( row.user_id, token ) diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py index 0d3f31a50c..b53a4c6bd1 100644 --- a/synapse/replication/slave/storage/appservice.py +++ b/synapse/replication/slave/storage/appservice.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,33 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from synapse.storage import DataStore -from synapse.config.appservice import load_appservices -from synapse.storage.appservice import _make_exclusive_regex +from synapse.storage.appservice import ( + ApplicationServiceTransactionWorkerStore, + ApplicationServiceWorkerStore, +) -class SlavedApplicationServiceStore(BaseSlavedStore): - def __init__(self, db_conn, hs): - super(SlavedApplicationServiceStore, self).__init__(db_conn, hs) - self.services_cache = load_appservices( - hs.config.server_name, - hs.config.app_service_config_files - ) - self.exclusive_user_regex = _make_exclusive_regex(self.services_cache) - - get_app_service_by_token = DataStore.get_app_service_by_token.__func__ - get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ - get_app_services = DataStore.get_app_services.__func__ - get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__ - create_appservice_txn = DataStore.create_appservice_txn.__func__ - get_appservices_by_state = DataStore.get_appservices_by_state.__func__ - get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__ - _get_last_txn = DataStore._get_last_txn.__func__ - complete_appservice_txn = DataStore.complete_appservice_txn.__func__ - get_appservice_state = DataStore.get_appservice_state.__func__ - set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__ - set_appservice_state = DataStore.set_appservice_state.__func__ - get_if_app_services_interested_in_user = ( - DataStore.get_if_app_services_interested_in_user.__func__ - ) +class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore, + ApplicationServiceWorkerStore): + pass diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index 65250285e8..60641f1a49 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -13,11 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore from synapse.storage.client_ips import LAST_SEEN_GRANULARITY from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches.descriptors import Cache +from ._base import BaseSlavedStore + class SlavedClientIpStore(BaseSlavedStore): def __init__(self, db_conn, hs): @@ -29,9 +30,8 @@ class SlavedClientIpStore(BaseSlavedStore): max_entries=50000 * CACHE_SIZE_FACTOR, ) - def insert_client_ip(self, user, access_token, ip, user_agent, device_id): + def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) - user_id = user.to_string() key = (user_id, access_token, ip) try: diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 6f3fb64770..87eaa53004 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -13,11 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore -from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.stream_change_cache import StreamChangeCache + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker class SlavedDeviceInboxStore(BaseSlavedStore): diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 7687867aee..8206a988f7 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore from synapse.storage.end_to_end_keys import EndToEndKeyStore from synapse.util.caches.stream_change_cache import StreamChangeCache +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + class SlavedDeviceStore(BaseSlavedStore): def __init__(self, db_conn, hs): diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py index 7301d885f2..1d1d48709a 100644 --- a/synapse/replication/slave/storage/directory.py +++ b/synapse/replication/slave/storage/directory.py @@ -13,11 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.storage.directory import DirectoryWorkerStore + from ._base import BaseSlavedStore -from synapse.storage.directory import DirectoryStore -class DirectoryStore(BaseSlavedStore): - get_aliases_for_room = DirectoryStore.__dict__[ - "get_aliases_for_room" - ] +class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore): + pass diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 94ebbffc1b..bdb5eee4af 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,20 +13,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker +import logging from synapse.api.constants import EventTypes -from synapse.storage import DataStore -from synapse.storage.roommember import RoomMemberStore -from synapse.storage.event_federation import EventFederationStore -from synapse.storage.event_push_actions import EventPushActionsStore -from synapse.storage.state import StateStore -from synapse.storage.stream import StreamStore -from synapse.util.caches.stream_change_cache import StreamChangeCache - -import logging +from synapse.storage.event_federation import EventFederationWorkerStore +from synapse.storage.event_push_actions import EventPushActionsWorkerStore +from synapse.storage.events_worker import EventsWorkerStore +from synapse.storage.roommember import RoomMemberWorkerStore +from synapse.storage.signatures import SignatureWorkerStore +from synapse.storage.state import StateGroupWorkerStore +from synapse.storage.stream import StreamWorkerStore +from synapse.storage.user_erasure_store import UserErasureWorkerStore +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker logger = logging.getLogger(__name__) @@ -39,163 +40,34 @@ logger = logging.getLogger(__name__) # the method descriptor on the DataStore and chuck them into our class. -class SlavedEventStore(BaseSlavedStore): +class SlavedEventStore(EventFederationWorkerStore, + RoomMemberWorkerStore, + EventPushActionsWorkerStore, + StreamWorkerStore, + EventsWorkerStore, + StateGroupWorkerStore, + SignatureWorkerStore, + UserErasureWorkerStore, + BaseSlavedStore): def __init__(self, db_conn, hs): - super(SlavedEventStore, self).__init__(db_conn, hs) self._stream_id_gen = SlavedIdTracker( db_conn, "events", "stream_ordering", ) self._backfill_id_gen = SlavedIdTracker( db_conn, "events", "stream_ordering", step=-1 ) - events_max = self._stream_id_gen.get_current_token() - event_cache_prefill, min_event_val = self._get_cache_dict( - db_conn, "events", - entity_column="room_id", - stream_column="stream_ordering", - max_value=events_max, - ) - self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", min_event_val, - prefilled_cache=event_cache_prefill, - ) - self._membership_stream_cache = StreamChangeCache( - "MembershipStreamChangeCache", events_max, - ) - self.stream_ordering_month_ago = 0 - self._stream_order_on_start = self.get_room_max_stream_ordering() + super(SlavedEventStore, self).__init__(db_conn, hs) # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. - get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] - get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] - get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"] - get_users_who_share_room_with_user = ( - RoomMemberStore.__dict__["get_users_who_share_room_with_user"] - ) - get_latest_event_ids_in_room = EventFederationStore.__dict__[ - "get_latest_event_ids_in_room" - ] - get_invited_rooms_for_user = RoomMemberStore.__dict__[ - "get_invited_rooms_for_user" - ] - get_unread_event_push_actions_by_room_for_user = ( - EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] - ) - _get_unread_counts_by_receipt_txn = ( - DataStore._get_unread_counts_by_receipt_txn.__func__ - ) - _get_unread_counts_by_pos_txn = ( - DataStore._get_unread_counts_by_pos_txn.__func__ - ) - _get_state_group_for_events = ( - StateStore.__dict__["_get_state_group_for_events"] - ) - _get_state_group_for_event = ( - StateStore.__dict__["_get_state_group_for_event"] - ) - _get_state_groups_from_groups = ( - StateStore.__dict__["_get_state_groups_from_groups"] - ) - _get_state_groups_from_groups_txn = ( - DataStore._get_state_groups_from_groups_txn.__func__ - ) - get_recent_event_ids_for_room = ( - StreamStore.__dict__["get_recent_event_ids_for_room"] - ) - get_current_state_ids = ( - StateStore.__dict__["get_current_state_ids"] - ) - get_state_group_delta = StateStore.__dict__["get_state_group_delta"] - _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] - has_room_changed_since = DataStore.has_room_changed_since.__func__ - - get_unread_push_actions_for_user_in_range_for_http = ( - DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__ - ) - get_unread_push_actions_for_user_in_range_for_email = ( - DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__ - ) - get_push_action_users_in_range = ( - DataStore.get_push_action_users_in_range.__func__ - ) - get_event = DataStore.get_event.__func__ - get_events = DataStore.get_events.__func__ - get_rooms_for_user_where_membership_is = ( - DataStore.get_rooms_for_user_where_membership_is.__func__ - ) - get_membership_changes_for_user = ( - DataStore.get_membership_changes_for_user.__func__ - ) - get_room_events_max_id = DataStore.get_room_events_max_id.__func__ - get_room_events_stream_for_room = ( - DataStore.get_room_events_stream_for_room.__func__ - ) - get_events_around = DataStore.get_events_around.__func__ - get_state_for_event = DataStore.get_state_for_event.__func__ - get_state_for_events = DataStore.get_state_for_events.__func__ - get_state_groups = DataStore.get_state_groups.__func__ - get_state_groups_ids = DataStore.get_state_groups_ids.__func__ - get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__ - get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__ - get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__ - get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__ - _get_joined_users_from_context = ( - RoomMemberStore.__dict__["_get_joined_users_from_context"] - ) - - get_joined_hosts = DataStore.get_joined_hosts.__func__ - _get_joined_hosts = RoomMemberStore.__dict__["_get_joined_hosts"] - - get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__ - get_room_events_stream_for_rooms = ( - DataStore.get_room_events_stream_for_rooms.__func__ - ) - is_host_joined = RoomMemberStore.__dict__["is_host_joined"] - get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ - - _set_before_and_after = staticmethod(DataStore._set_before_and_after) - - _get_events = DataStore._get_events.__func__ - _get_events_from_cache = DataStore._get_events_from_cache.__func__ - - _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ - _enqueue_events = DataStore._enqueue_events.__func__ - _do_fetch = DataStore._do_fetch.__func__ - _fetch_event_rows = DataStore._fetch_event_rows.__func__ - _get_event_from_row = DataStore._get_event_from_row.__func__ - _get_rooms_for_user_where_membership_is_txn = ( - DataStore._get_rooms_for_user_where_membership_is_txn.__func__ - ) - _get_state_for_groups = DataStore._get_state_for_groups.__func__ - _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ - _get_events_around_txn = DataStore._get_events_around_txn.__func__ - _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ - - get_backfill_events = DataStore.get_backfill_events.__func__ - _get_backfill_events = DataStore._get_backfill_events.__func__ - get_missing_events = DataStore.get_missing_events.__func__ - _get_missing_events = DataStore._get_missing_events.__func__ - - get_auth_chain = DataStore.get_auth_chain.__func__ - get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__ - _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__ - - get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__ - - get_forward_extremeties_for_room = ( - DataStore.get_forward_extremeties_for_room.__func__ - ) - _get_forward_extremeties_for_room = ( - EventFederationStore.__dict__["_get_forward_extremeties_for_room"] - ) - - get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ - - get_federation_out_pos = DataStore.get_federation_out_pos.__func__ - update_federation_out_pos = DataStore.update_federation_out_pos.__func__ + + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + + def get_room_min_stream_ordering(self): + return self._backfill_id_gen.get_current_token() def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py index 819ed62881..456a14cd5c 100644 --- a/synapse/replication/slave/storage/filtering.py +++ b/synapse/replication/slave/storage/filtering.py @@ -13,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore from synapse.storage.filtering import FilteringStore +from ._base import BaseSlavedStore + class SlavedFilteringStore(BaseSlavedStore): def __init__(self, db_conn, hs): diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py new file mode 100644 index 0000000000..5777f07c8d --- /dev/null +++ b/synapse/replication/slave/storage/groups.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + + +class SlavedGroupServerStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedGroupServerStore, self).__init__(db_conn, hs) + + self.hs = hs + + self._group_updates_id_gen = SlavedIdTracker( + db_conn, "local_group_updates", "stream_id", + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(), + ) + + get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__ + get_group_stream_token = DataStore.get_group_stream_token.__func__ + get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__ + + def stream_positions(self): + result = super(SlavedGroupServerStore, self).stream_positions() + result["groups"] = self._group_updates_id_gen.get_current_token() + return result + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "groups": + self._group_updates_id_gen.advance(token) + for row in rows: + self._group_updates_stream_cache.entity_has_changed( + row.user_id, token + ) + + return super(SlavedGroupServerStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py index dd2ae49e48..05ed168463 100644 --- a/synapse/replication/slave/storage/keys.py +++ b/synapse/replication/slave/storage/keys.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.storage.keys import KeyStore +from ._base import BaseSlavedStore + class SlavedKeyStore(BaseSlavedStore): _get_server_verify_key = KeyStore.__dict__[ diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index cfb9280181..80b744082a 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker - -from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.storage import DataStore from synapse.storage.presence import PresenceStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker class SlavedPresenceStore(BaseSlavedStore): diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py new file mode 100644 index 0000000000..46c28d4171 --- /dev/null +++ b/synapse/replication/slave/storage/profile.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.storage.profile import ProfileWorkerStore + + +class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore): + pass diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 83e880fdd2..f0200c1e98 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,31 +14,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .events import SlavedEventStore +from synapse.storage.push_rule import PushRulesWorkerStore + from ._slaved_id_tracker import SlavedIdTracker -from synapse.storage import DataStore -from synapse.storage.push_rule import PushRuleStore -from synapse.util.caches.stream_change_cache import StreamChangeCache +from .events import SlavedEventStore -class SlavedPushRuleStore(SlavedEventStore): +class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore): def __init__(self, db_conn, hs): - super(SlavedPushRuleStore, self).__init__(db_conn, hs) self._push_rules_stream_id_gen = SlavedIdTracker( db_conn, "push_rules_stream", "stream_id", ) - self.push_rules_stream_cache = StreamChangeCache( - "PushRulesStreamChangeCache", - self._push_rules_stream_id_gen.get_current_token(), - ) - - get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"] - get_push_rules_enabled_for_user = ( - PushRuleStore.__dict__["get_push_rules_enabled_for_user"] - ) - have_push_rules_changed_for_user = ( - DataStore.have_push_rules_changed_for_user.__func__ - ) + super(SlavedPushRuleStore, self).__init__(db_conn, hs) def get_push_rules_stream_token(self): return ( @@ -45,6 +33,9 @@ class SlavedPushRuleStore(SlavedEventStore): self._stream_id_gen.get_current_token(), ) + def get_max_push_rules_stream_id(self): + return self._push_rules_stream_id_gen.get_current_token() + def stream_positions(self): result = super(SlavedPushRuleStore, self).stream_positions() result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index 4e8d68ece9..3b2213c0d4 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,13 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.storage.pusher import PusherWorkerStore + from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker -from synapse.storage import DataStore - -class SlavedPusherStore(BaseSlavedStore): +class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedPusherStore, self).__init__(db_conn, hs) @@ -28,13 +29,6 @@ class SlavedPusherStore(BaseSlavedStore): extra_tables=[("deleted_pushers", "stream_id")], ) - get_all_pushers = DataStore.get_all_pushers.__func__ - get_pushers_by = DataStore.get_pushers_by.__func__ - get_pushers_by_app_id_and_pushkey = ( - DataStore.get_pushers_by_app_id_and_pushkey.__func__ - ) - _decode_pushers_rows = DataStore._decode_pushers_rows.__func__ - def stream_positions(self): result = super(SlavedPusherStore, self).stream_positions() result["pushers"] = self._pushers_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index b371574ece..ed12342f40 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,13 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.storage.receipts import ReceiptsWorkerStore + from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker -from synapse.storage import DataStore -from synapse.storage.receipts import ReceiptsStore -from synapse.util.caches.stream_change_cache import StreamChangeCache - # So, um, we want to borrow a load of functions intended for reading from # a DataStore, but we don't want to take functions that either write to the # DataStore or are cached and don't have cache invalidation logic. @@ -29,36 +28,19 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache # the method descriptor on the DataStore and chuck them into our class. -class SlavedReceiptsStore(BaseSlavedStore): +class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): - super(SlavedReceiptsStore, self).__init__(db_conn, hs) - + # We instantiate this first as the ReceiptsWorkerStore constructor + # needs to be able to call get_max_receipt_stream_id self._receipts_id_gen = SlavedIdTracker( db_conn, "receipts_linearized", "stream_id" ) - self._receipts_stream_cache = StreamChangeCache( - "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() - ) - - get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] - get_linearized_receipts_for_room = ( - ReceiptsStore.__dict__["get_linearized_receipts_for_room"] - ) - _get_linearized_receipts_for_rooms = ( - ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"] - ) - get_last_receipt_event_id_for_user = ( - ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"] - ) - - get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ - get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ + super(SlavedReceiptsStore, self).__init__(db_conn, hs) - get_linearized_receipts_for_rooms = ( - DataStore.get_linearized_receipts_for_rooms.__func__ - ) + def get_max_receipt_stream_id(self): + return self._receipts_id_gen.get_current_token() def stream_positions(self): result = super(SlavedReceiptsStore, self).stream_positions() @@ -67,10 +49,12 @@ class SlavedReceiptsStore(BaseSlavedStore): def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) - self.get_linearized_receipts_for_room.invalidate_many((room_id,)) + self._get_linearized_receipts_for_room.invalidate_many((room_id,)) self.get_last_receipt_event_id_for_user.invalidate( (user_id, room_id, receipt_type) ) + self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id) + self.get_receipts_for_room.invalidate((room_id, receipt_type)) def process_replication_rows(self, stream_name, token, rows): if stream_name == "receipts": diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py index e27c7332d2..408d91df1c 100644 --- a/synapse/replication/slave/storage/registration.py +++ b/synapse/replication/slave/storage/registration.py @@ -13,21 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from synapse.storage import DataStore -from synapse.storage.registration import RegistrationStore - +from synapse.storage.registration import RegistrationWorkerStore -class SlavedRegistrationStore(BaseSlavedStore): - def __init__(self, db_conn, hs): - super(SlavedRegistrationStore, self).__init__(db_conn, hs) +from ._base import BaseSlavedStore - # TODO: use the cached version and invalidate deleted tokens - get_user_by_access_token = RegistrationStore.__dict__[ - "get_user_by_access_token" - ] - _query_for_auth = DataStore._query_for_auth.__func__ - get_user_by_id = RegistrationStore.__dict__[ - "get_user_by_id" - ] +class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore): + pass diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index f510384033..0cb474928c 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -13,33 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.storage.room import RoomWorkerStore + from ._base import BaseSlavedStore -from synapse.storage import DataStore -from synapse.storage.room import RoomStore from ._slaved_id_tracker import SlavedIdTracker -class RoomStore(BaseSlavedStore): +class RoomStore(RoomWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(RoomStore, self).__init__(db_conn, hs) self._public_room_id_gen = SlavedIdTracker( db_conn, "public_room_list_stream", "stream_id" ) - get_public_room_ids = DataStore.get_public_room_ids.__func__ - get_current_public_room_stream_id = ( - DataStore.get_current_public_room_stream_id.__func__ - ) - get_public_room_ids_at_stream_id = ( - RoomStore.__dict__["get_public_room_ids_at_stream_id"] - ) - get_public_room_ids_at_stream_id_txn = ( - DataStore.get_public_room_ids_at_stream_id_txn.__func__ - ) - get_published_at_stream_id_txn = ( - DataStore.get_published_at_stream_id_txn.__func__ - ) - get_public_room_changes = DataStore.get_public_room_changes.__func__ + def get_current_public_room_stream_id(self): + return self._public_room_id_gen.get_current_token() def stream_positions(self): result = super(RoomStore, self).stream_positions() diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index fbb58f35da..9c9a5eadd9 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.storage.transactions import TransactionStore +from ._base import BaseSlavedStore + class TransactionStore(BaseSlavedStore): get_destination_retry_timings = TransactionStore.__dict__[ |