summary refs log tree commit diff
path: root/synapse/replication/slave
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/slave')
-rw-r--r--synapse/replication/slave/storage/account_data.py43
-rw-r--r--synapse/replication/slave/storage/appservice.py35
-rw-r--r--synapse/replication/slave/storage/events.py64
-rw-r--r--synapse/replication/slave/storage/push_rule.py24
-rw-r--r--synapse/replication/slave/storage/pushers.py12
5 files changed, 32 insertions, 146 deletions
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index efbd87918e..6c8d2954d7 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,50 +14,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.account_data import AccountDataStore
-from synapse.storage.tags import TagsStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.storage.account_data import AccountDataWorkerStore
+from synapse.storage.tags import TagsWorkerStore
 
 
-class SlavedAccountDataStore(BaseSlavedStore):
+class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedAccountDataStore, self).__init__(db_conn, hs)
         self._account_data_id_gen = SlavedIdTracker(
             db_conn, "account_data_max_stream_id", "stream_id",
         )
-        self._account_data_stream_cache = StreamChangeCache(
-            "AccountDataAndTagsChangeCache",
-            self._account_data_id_gen.get_current_token(),
-        )
-
-    get_account_data_for_user = (
-        AccountDataStore.__dict__["get_account_data_for_user"]
-    )
-
-    get_global_account_data_by_type_for_users = (
-        AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
-    )
 
-    get_global_account_data_by_type_for_user = (
-        AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
-    )
-
-    get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
-    get_tags_for_room = (
-        DataStore.get_tags_for_room.__func__
-    )
-    get_account_data_for_room = (
-        DataStore.get_account_data_for_room.__func__
-    )
-
-    get_updated_tags = DataStore.get_updated_tags.__func__
-    get_updated_account_data_for_user = (
-        DataStore.get_updated_account_data_for_user.__func__
-    )
+        super(SlavedAccountDataStore, self).__init__(db_conn, hs)
 
     def get_max_account_data_stream_id(self):
         return self._account_data_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 0d3f31a50c..8cae3076f4 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,33 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.config.appservice import load_appservices
-from synapse.storage.appservice import _make_exclusive_regex
+from synapse.storage.appservice import (
+    ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
+)
 
 
-class SlavedApplicationServiceStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
-        self.services_cache = load_appservices(
-            hs.config.server_name,
-            hs.config.app_service_config_files
-        )
-        self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
-
-    get_app_service_by_token = DataStore.get_app_service_by_token.__func__
-    get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
-    get_app_services = DataStore.get_app_services.__func__
-    get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__
-    create_appservice_txn = DataStore.create_appservice_txn.__func__
-    get_appservices_by_state = DataStore.get_appservices_by_state.__func__
-    get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__
-    _get_last_txn = DataStore._get_last_txn.__func__
-    complete_appservice_txn = DataStore.complete_appservice_txn.__func__
-    get_appservice_state = DataStore.get_appservice_state.__func__
-    set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
-    set_appservice_state = DataStore.set_appservice_state.__func__
-    get_if_app_services_interested_in_user = (
-        DataStore.get_if_app_services_interested_in_user.__func__
-    )
+class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
+                                    ApplicationServiceWorkerStore):
+    pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index f8c164b48b..de0b26f437 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,8 +18,9 @@ import logging
 from synapse.api.constants import EventTypes
 from synapse.storage import DataStore
 from synapse.storage.event_federation import EventFederationStore
-from synapse.storage.event_push_actions import EventPushActionsStore
-from synapse.storage.roommember import RoomMemberStore
+from synapse.storage.event_push_actions import EventPushActionsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
 from synapse.storage.stream import StreamStore
 from synapse.storage.signatures import SignatureStore
@@ -38,7 +40,9 @@ logger = logging.getLogger(__name__)
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
+class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
+                       EventsWorkerStore, StateGroupWorkerStore,
+                       BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
         super(SlavedEventStore, self).__init__(db_conn, hs)
@@ -68,47 +72,15 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
 
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
-    get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
-    get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
-    get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
-    get_users_who_share_room_with_user = (
-        RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
-    )
     get_latest_event_ids_in_room = EventFederationStore.__dict__[
         "get_latest_event_ids_in_room"
     ]
-    get_invited_rooms_for_user = RoomMemberStore.__dict__[
-        "get_invited_rooms_for_user"
-    ]
-    get_unread_event_push_actions_by_room_for_user = (
-        EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
-    )
-    _get_unread_counts_by_receipt_txn = (
-        DataStore._get_unread_counts_by_receipt_txn.__func__
-    )
-    _get_unread_counts_by_pos_txn = (
-        DataStore._get_unread_counts_by_pos_txn.__func__
-    )
+
     get_recent_event_ids_for_room = (
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
-    _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
     has_room_changed_since = DataStore.has_room_changed_since.__func__
 
-    get_unread_push_actions_for_user_in_range_for_http = (
-        DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
-    )
-    get_unread_push_actions_for_user_in_range_for_email = (
-        DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
-    )
-    get_push_action_users_in_range = (
-        DataStore.get_push_action_users_in_range.__func__
-    )
-    get_event = DataStore.get_event.__func__
-    get_events = DataStore.get_events.__func__
-    get_rooms_for_user_where_membership_is = (
-        DataStore.get_rooms_for_user_where_membership_is.__func__
-    )
     get_membership_changes_for_user = (
         DataStore.get_membership_changes_for_user.__func__
     )
@@ -117,35 +89,15 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
         DataStore.get_room_events_stream_for_room.__func__
     )
     get_events_around = DataStore.get_events_around.__func__
-    get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__
-    get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__
-    _get_joined_users_from_context = (
-        RoomMemberStore.__dict__["_get_joined_users_from_context"]
-    )
-
-    get_joined_hosts = DataStore.get_joined_hosts.__func__
-    _get_joined_hosts = RoomMemberStore.__dict__["_get_joined_hosts"]
 
     get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
     get_room_events_stream_for_rooms = (
         DataStore.get_room_events_stream_for_rooms.__func__
     )
-    is_host_joined = RoomMemberStore.__dict__["is_host_joined"]
     get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
 
     _set_before_and_after = staticmethod(DataStore._set_before_and_after)
 
-    _get_events = DataStore._get_events.__func__
-    _get_events_from_cache = DataStore._get_events_from_cache.__func__
-
-    _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
-    _enqueue_events = DataStore._enqueue_events.__func__
-    _do_fetch = DataStore._do_fetch.__func__
-    _fetch_event_rows = DataStore._fetch_event_rows.__func__
-    _get_event_from_row = DataStore._get_event_from_row.__func__
-    _get_rooms_for_user_where_membership_is_txn = (
-        DataStore._get_rooms_for_user_where_membership_is_txn.__func__
-    )
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
 
     get_backfill_events = DataStore.get_backfill_events.__func__
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 83e880fdd2..bb2c40b6e3 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,29 +16,15 @@
 
 from .events import SlavedEventStore
 from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.push_rule import PushRuleStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.push_rule import PushRulesWorkerStore
 
 
-class SlavedPushRuleStore(SlavedEventStore):
+class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore):
     def __init__(self, db_conn, hs):
-        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
         self._push_rules_stream_id_gen = SlavedIdTracker(
             db_conn, "push_rules_stream", "stream_id",
         )
-        self.push_rules_stream_cache = StreamChangeCache(
-            "PushRulesStreamChangeCache",
-            self._push_rules_stream_id_gen.get_current_token(),
-        )
-
-    get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
-    get_push_rules_enabled_for_user = (
-        PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
-    )
-    have_push_rules_changed_for_user = (
-        DataStore.have_push_rules_changed_for_user.__func__
-    )
+        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
 
     def get_push_rules_stream_token(self):
         return (
@@ -45,6 +32,9 @@ class SlavedPushRuleStore(SlavedEventStore):
             self._stream_id_gen.get_current_token(),
         )
 
+    def get_max_push_rules_stream_id(self):
+        return self._push_rules_stream_id_gen.get_current_token()
+
     def stream_positions(self):
         result = super(SlavedPushRuleStore, self).stream_positions()
         result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 4e8d68ece9..a7cd5a7291 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,10 +17,10 @@
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-from synapse.storage import DataStore
+from synapse.storage.pusher import PusherWorkerStore
 
 
-class SlavedPusherStore(BaseSlavedStore):
+class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
         super(SlavedPusherStore, self).__init__(db_conn, hs)
@@ -28,13 +29,6 @@ class SlavedPusherStore(BaseSlavedStore):
             extra_tables=[("deleted_pushers", "stream_id")],
         )
 
-    get_all_pushers = DataStore.get_all_pushers.__func__
-    get_pushers_by = DataStore.get_pushers_by.__func__
-    get_pushers_by_app_id_and_pushkey = (
-        DataStore.get_pushers_by_app_id_and_pushkey.__func__
-    )
-    _decode_pushers_rows = DataStore._decode_pushers_rows.__func__
-
     def stream_positions(self):
         result = super(SlavedPusherStore, self).stream_positions()
         result["pushers"] = self._pushers_id_gen.get_current_token()