summary refs log tree commit diff
path: root/synapse/replication/slave
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/slave')
-rw-r--r--synapse/replication/slave/storage/_base.py6
-rw-r--r--synapse/replication/slave/storage/account_data.py47
-rw-r--r--synapse/replication/slave/storage/appservice.py36
-rw-r--r--synapse/replication/slave/storage/client_ips.py6
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py7
-rw-r--r--synapse/replication/slave/storage/devices.py5
-rw-r--r--synapse/replication/slave/storage/directory.py9
-rw-r--r--synapse/replication/slave/storage/events.py184
-rw-r--r--synapse/replication/slave/storage/filtering.py3
-rw-r--r--synapse/replication/slave/storage/groups.py55
-rw-r--r--synapse/replication/slave/storage/keys.py3
-rw-r--r--synapse/replication/slave/storage/presence.py8
-rw-r--r--synapse/replication/slave/storage/profile.py21
-rw-r--r--synapse/replication/slave/storage/push_rule.py27
-rw-r--r--synapse/replication/slave/storage/pushers.py14
-rw-r--r--synapse/replication/slave/storage/receipts.py40
-rw-r--r--synapse/replication/slave/storage/registration.py19
-rw-r--r--synapse/replication/slave/storage/room.py22
-rw-r--r--synapse/replication/slave/storage/transactions.py3
19 files changed, 184 insertions, 331 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index b962641166..3f7be74e02 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -13,19 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.engines import PostgresEngine
 
 from ._slaved_id_tracker import SlavedIdTracker
 
-import logging
-
 logger = logging.getLogger(__name__)
 
 
 class BaseSlavedStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
-        super(BaseSlavedStore, self).__init__(hs)
+        super(BaseSlavedStore, self).__init__(db_conn, hs)
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = SlavedIdTracker(
                 db_conn, "cache_invalidation_stream", "stream_id",
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index efbd87918e..d9ba6d69b1 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,50 +14,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.account_data import AccountDataStore
-from synapse.storage.tags import TagsStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.storage.account_data import AccountDataWorkerStore
+from synapse.storage.tags import TagsWorkerStore
 
 
-class SlavedAccountDataStore(BaseSlavedStore):
+class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedAccountDataStore, self).__init__(db_conn, hs)
         self._account_data_id_gen = SlavedIdTracker(
             db_conn, "account_data_max_stream_id", "stream_id",
         )
-        self._account_data_stream_cache = StreamChangeCache(
-            "AccountDataAndTagsChangeCache",
-            self._account_data_id_gen.get_current_token(),
-        )
-
-    get_account_data_for_user = (
-        AccountDataStore.__dict__["get_account_data_for_user"]
-    )
-
-    get_global_account_data_by_type_for_users = (
-        AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
-    )
 
-    get_global_account_data_by_type_for_user = (
-        AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
-    )
-
-    get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
-    get_tags_for_room = (
-        DataStore.get_tags_for_room.__func__
-    )
-    get_account_data_for_room = (
-        DataStore.get_account_data_for_room.__func__
-    )
-
-    get_updated_tags = DataStore.get_updated_tags.__func__
-    get_updated_account_data_for_user = (
-        DataStore.get_updated_account_data_for_user.__func__
-    )
+        super(SlavedAccountDataStore, self).__init__(db_conn, hs)
 
     def get_max_account_data_stream_id(self):
         return self._account_data_id_gen.get_current_token()
@@ -85,6 +56,10 @@ class SlavedAccountDataStore(BaseSlavedStore):
                         (row.data_type, row.user_id,)
                     )
                 self.get_account_data_for_user.invalidate((row.user_id,))
+                self.get_account_data_for_room.invalidate((row.user_id, row.room_id,))
+                self.get_account_data_for_room_and_type.invalidate(
+                    (row.user_id, row.room_id, row.data_type,),
+                )
                 self._account_data_stream_cache.entity_has_changed(
                     row.user_id, token
                 )
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 0d3f31a50c..b53a4c6bd1 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,33 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.config.appservice import load_appservices
-from synapse.storage.appservice import _make_exclusive_regex
+from synapse.storage.appservice import (
+    ApplicationServiceTransactionWorkerStore,
+    ApplicationServiceWorkerStore,
+)
 
 
-class SlavedApplicationServiceStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
-        self.services_cache = load_appservices(
-            hs.config.server_name,
-            hs.config.app_service_config_files
-        )
-        self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
-
-    get_app_service_by_token = DataStore.get_app_service_by_token.__func__
-    get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
-    get_app_services = DataStore.get_app_services.__func__
-    get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__
-    create_appservice_txn = DataStore.create_appservice_txn.__func__
-    get_appservices_by_state = DataStore.get_appservices_by_state.__func__
-    get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__
-    _get_last_txn = DataStore._get_last_txn.__func__
-    complete_appservice_txn = DataStore.complete_appservice_txn.__func__
-    get_appservice_state = DataStore.get_appservice_state.__func__
-    set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
-    set_appservice_state = DataStore.set_appservice_state.__func__
-    get_if_app_services_interested_in_user = (
-        DataStore.get_if_app_services_interested_in_user.__func__
-    )
+class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
+                                    ApplicationServiceWorkerStore):
+    pass
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 65250285e8..60641f1a49 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.caches.descriptors import Cache
 
+from ._base import BaseSlavedStore
+
 
 class SlavedClientIpStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
@@ -29,9 +30,8 @@ class SlavedClientIpStore(BaseSlavedStore):
             max_entries=50000 * CACHE_SIZE_FACTOR,
         )
 
-    def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
+    def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
         now = int(self._clock.time_msec())
-        user_id = user.to_string()
         key = (user_id, access_token, ip)
 
         try:
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 6f3fb64770..87eaa53004 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage import DataStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
 
 
 class SlavedDeviceInboxStore(BaseSlavedStore):
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 7687867aee..8206a988f7 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage import DataStore
 from synapse.storage.end_to_end_keys import EndToEndKeyStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
 
 class SlavedDeviceStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 7301d885f2..1d1d48709a 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -13,11 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.storage.directory import DirectoryWorkerStore
+
 from ._base import BaseSlavedStore
-from synapse.storage.directory import DirectoryStore
 
 
-class DirectoryStore(BaseSlavedStore):
-    get_aliases_for_room = DirectoryStore.__dict__[
-        "get_aliases_for_room"
-    ]
+class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 94ebbffc1b..bdb5eee4af 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,20 +13,20 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
+import logging
 
 from synapse.api.constants import EventTypes
-from synapse.storage import DataStore
-from synapse.storage.roommember import RoomMemberStore
-from synapse.storage.event_federation import EventFederationStore
-from synapse.storage.event_push_actions import EventPushActionsStore
-from synapse.storage.state import StateStore
-from synapse.storage.stream import StreamStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-import logging
+from synapse.storage.event_federation import EventFederationWorkerStore
+from synapse.storage.event_push_actions import EventPushActionsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
+from synapse.storage.stream import StreamWorkerStore
+from synapse.storage.user_erasure_store import UserErasureWorkerStore
 
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
 
 logger = logging.getLogger(__name__)
 
@@ -39,163 +40,34 @@ logger = logging.getLogger(__name__)
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(BaseSlavedStore):
+class SlavedEventStore(EventFederationWorkerStore,
+                       RoomMemberWorkerStore,
+                       EventPushActionsWorkerStore,
+                       StreamWorkerStore,
+                       EventsWorkerStore,
+                       StateGroupWorkerStore,
+                       SignatureWorkerStore,
+                       UserErasureWorkerStore,
+                       BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedEventStore, self).__init__(db_conn, hs)
         self._stream_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering",
         )
         self._backfill_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering", step=-1
         )
-        events_max = self._stream_id_gen.get_current_token()
-        event_cache_prefill, min_event_val = self._get_cache_dict(
-            db_conn, "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache", min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max,
-        )
 
-        self.stream_ordering_month_ago = 0
-        self._stream_order_on_start = self.get_room_max_stream_ordering()
+        super(SlavedEventStore, self).__init__(db_conn, hs)
 
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
-    get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
-    get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
-    get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
-    get_users_who_share_room_with_user = (
-        RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
-    )
-    get_latest_event_ids_in_room = EventFederationStore.__dict__[
-        "get_latest_event_ids_in_room"
-    ]
-    get_invited_rooms_for_user = RoomMemberStore.__dict__[
-        "get_invited_rooms_for_user"
-    ]
-    get_unread_event_push_actions_by_room_for_user = (
-        EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
-    )
-    _get_unread_counts_by_receipt_txn = (
-        DataStore._get_unread_counts_by_receipt_txn.__func__
-    )
-    _get_unread_counts_by_pos_txn = (
-        DataStore._get_unread_counts_by_pos_txn.__func__
-    )
-    _get_state_group_for_events = (
-        StateStore.__dict__["_get_state_group_for_events"]
-    )
-    _get_state_group_for_event = (
-        StateStore.__dict__["_get_state_group_for_event"]
-    )
-    _get_state_groups_from_groups = (
-        StateStore.__dict__["_get_state_groups_from_groups"]
-    )
-    _get_state_groups_from_groups_txn = (
-        DataStore._get_state_groups_from_groups_txn.__func__
-    )
-    get_recent_event_ids_for_room = (
-        StreamStore.__dict__["get_recent_event_ids_for_room"]
-    )
-    get_current_state_ids = (
-        StateStore.__dict__["get_current_state_ids"]
-    )
-    get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
-    _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
-    has_room_changed_since = DataStore.has_room_changed_since.__func__
-
-    get_unread_push_actions_for_user_in_range_for_http = (
-        DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
-    )
-    get_unread_push_actions_for_user_in_range_for_email = (
-        DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
-    )
-    get_push_action_users_in_range = (
-        DataStore.get_push_action_users_in_range.__func__
-    )
-    get_event = DataStore.get_event.__func__
-    get_events = DataStore.get_events.__func__
-    get_rooms_for_user_where_membership_is = (
-        DataStore.get_rooms_for_user_where_membership_is.__func__
-    )
-    get_membership_changes_for_user = (
-        DataStore.get_membership_changes_for_user.__func__
-    )
-    get_room_events_max_id = DataStore.get_room_events_max_id.__func__
-    get_room_events_stream_for_room = (
-        DataStore.get_room_events_stream_for_room.__func__
-    )
-    get_events_around = DataStore.get_events_around.__func__
-    get_state_for_event = DataStore.get_state_for_event.__func__
-    get_state_for_events = DataStore.get_state_for_events.__func__
-    get_state_groups = DataStore.get_state_groups.__func__
-    get_state_groups_ids = DataStore.get_state_groups_ids.__func__
-    get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__
-    get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__
-    get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__
-    get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__
-    _get_joined_users_from_context = (
-        RoomMemberStore.__dict__["_get_joined_users_from_context"]
-    )
-
-    get_joined_hosts = DataStore.get_joined_hosts.__func__
-    _get_joined_hosts = RoomMemberStore.__dict__["_get_joined_hosts"]
-
-    get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
-    get_room_events_stream_for_rooms = (
-        DataStore.get_room_events_stream_for_rooms.__func__
-    )
-    is_host_joined = RoomMemberStore.__dict__["is_host_joined"]
-    get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
-
-    _set_before_and_after = staticmethod(DataStore._set_before_and_after)
-
-    _get_events = DataStore._get_events.__func__
-    _get_events_from_cache = DataStore._get_events_from_cache.__func__
-
-    _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
-    _enqueue_events = DataStore._enqueue_events.__func__
-    _do_fetch = DataStore._do_fetch.__func__
-    _fetch_event_rows = DataStore._fetch_event_rows.__func__
-    _get_event_from_row = DataStore._get_event_from_row.__func__
-    _get_rooms_for_user_where_membership_is_txn = (
-        DataStore._get_rooms_for_user_where_membership_is_txn.__func__
-    )
-    _get_state_for_groups = DataStore._get_state_for_groups.__func__
-    _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
-    _get_events_around_txn = DataStore._get_events_around_txn.__func__
-    _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
-
-    get_backfill_events = DataStore.get_backfill_events.__func__
-    _get_backfill_events = DataStore._get_backfill_events.__func__
-    get_missing_events = DataStore.get_missing_events.__func__
-    _get_missing_events = DataStore._get_missing_events.__func__
-
-    get_auth_chain = DataStore.get_auth_chain.__func__
-    get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
-    _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
-
-    get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
-
-    get_forward_extremeties_for_room = (
-        DataStore.get_forward_extremeties_for_room.__func__
-    )
-    _get_forward_extremeties_for_room = (
-        EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
-    )
-
-    get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
-
-    get_federation_out_pos = DataStore.get_federation_out_pos.__func__
-    update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 819ed62881..456a14cd5c 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage.filtering import FilteringStore
 
+from ._base import BaseSlavedStore
+
 
 class SlavedFilteringStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
new file mode 100644
index 0000000000..5777f07c8d
--- /dev/null
+++ b/synapse/replication/slave/storage/groups.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import DataStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+
+class SlavedGroupServerStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedGroupServerStore, self).__init__(db_conn, hs)
+
+        self.hs = hs
+
+        self._group_updates_id_gen = SlavedIdTracker(
+            db_conn, "local_group_updates", "stream_id",
+        )
+        self._group_updates_stream_cache = StreamChangeCache(
+            "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
+        )
+
+    get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
+    get_group_stream_token = DataStore.get_group_stream_token.__func__
+    get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
+
+    def stream_positions(self):
+        result = super(SlavedGroupServerStore, self).stream_positions()
+        result["groups"] = self._group_updates_id_gen.get_current_token()
+        return result
+
+    def process_replication_rows(self, stream_name, token, rows):
+        if stream_name == "groups":
+            self._group_updates_id_gen.advance(token)
+            for row in rows:
+                self._group_updates_stream_cache.entity_has_changed(
+                    row.user_id, token
+                )
+
+        return super(SlavedGroupServerStore, self).process_replication_rows(
+            stream_name, token, rows
+        )
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index dd2ae49e48..05ed168463 100644
--- a/synapse/replication/slave/storage/keys.py
+++ b/synapse/replication/slave/storage/keys.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage import DataStore
 from synapse.storage.keys import KeyStore
 
+from ._base import BaseSlavedStore
+
 
 class SlavedKeyStore(BaseSlavedStore):
     _get_server_verify_key = KeyStore.__dict__[
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index cfb9280181..80b744082a 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.storage import DataStore
 from synapse.storage.presence import PresenceStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
 
 
 class SlavedPresenceStore(BaseSlavedStore):
diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
new file mode 100644
index 0000000000..46c28d4171
--- /dev/null
+++ b/synapse/replication/slave/storage/profile.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.storage.profile import ProfileWorkerStore
+
+
+class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 83e880fdd2..f0200c1e98 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,31 +14,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .events import SlavedEventStore
+from synapse.storage.push_rule import PushRulesWorkerStore
+
 from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.push_rule import PushRuleStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from .events import SlavedEventStore
 
 
-class SlavedPushRuleStore(SlavedEventStore):
+class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore):
     def __init__(self, db_conn, hs):
-        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
         self._push_rules_stream_id_gen = SlavedIdTracker(
             db_conn, "push_rules_stream", "stream_id",
         )
-        self.push_rules_stream_cache = StreamChangeCache(
-            "PushRulesStreamChangeCache",
-            self._push_rules_stream_id_gen.get_current_token(),
-        )
-
-    get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
-    get_push_rules_enabled_for_user = (
-        PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
-    )
-    have_push_rules_changed_for_user = (
-        DataStore.have_push_rules_changed_for_user.__func__
-    )
+        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
 
     def get_push_rules_stream_token(self):
         return (
@@ -45,6 +33,9 @@ class SlavedPushRuleStore(SlavedEventStore):
             self._stream_id_gen.get_current_token(),
         )
 
+    def get_max_push_rules_stream_id(self):
+        return self._push_rules_stream_id_gen.get_current_token()
+
     def stream_positions(self):
         result = super(SlavedPushRuleStore, self).stream_positions()
         result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 4e8d68ece9..3b2213c0d4 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,13 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.storage.pusher import PusherWorkerStore
+
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-from synapse.storage import DataStore
-
 
-class SlavedPusherStore(BaseSlavedStore):
+class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
         super(SlavedPusherStore, self).__init__(db_conn, hs)
@@ -28,13 +29,6 @@ class SlavedPusherStore(BaseSlavedStore):
             extra_tables=[("deleted_pushers", "stream_id")],
         )
 
-    get_all_pushers = DataStore.get_all_pushers.__func__
-    get_pushers_by = DataStore.get_pushers_by.__func__
-    get_pushers_by_app_id_and_pushkey = (
-        DataStore.get_pushers_by_app_id_and_pushkey.__func__
-    )
-    _decode_pushers_rows = DataStore._decode_pushers_rows.__func__
-
     def stream_positions(self):
         result = super(SlavedPusherStore, self).stream_positions()
         result["pushers"] = self._pushers_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index b371574ece..ed12342f40 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,13 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.storage.receipts import ReceiptsWorkerStore
+
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-from synapse.storage import DataStore
-from synapse.storage.receipts import ReceiptsStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
 # So, um, we want to borrow a load of functions intended for reading from
 # a DataStore, but we don't want to take functions that either write to the
 # DataStore or are cached and don't have cache invalidation logic.
@@ -29,36 +28,19 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedReceiptsStore(BaseSlavedStore):
+class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
-
+        # We instantiate this first as the ReceiptsWorkerStore constructor
+        # needs to be able to call get_max_receipt_stream_id
         self._receipts_id_gen = SlavedIdTracker(
             db_conn, "receipts_linearized", "stream_id"
         )
 
-        self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
-        )
-
-    get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
-    get_linearized_receipts_for_room = (
-        ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
-    )
-    _get_linearized_receipts_for_rooms = (
-        ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
-    )
-    get_last_receipt_event_id_for_user = (
-        ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
-    )
-
-    get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
-    get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
 
-    get_linearized_receipts_for_rooms = (
-        DataStore.get_linearized_receipts_for_rooms.__func__
-    )
+    def get_max_receipt_stream_id(self):
+        return self._receipts_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(SlavedReceiptsStore, self).stream_positions()
@@ -67,10 +49,12 @@ class SlavedReceiptsStore(BaseSlavedStore):
 
     def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
         self.get_receipts_for_user.invalidate((user_id, receipt_type))
-        self.get_linearized_receipts_for_room.invalidate_many((room_id,))
+        self._get_linearized_receipts_for_room.invalidate_many((room_id,))
         self.get_last_receipt_event_id_for_user.invalidate(
             (user_id, room_id, receipt_type)
         )
+        self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id)
+        self.get_receipts_for_room.invalidate((room_id, receipt_type))
 
     def process_replication_rows(self, stream_name, token, rows):
         if stream_name == "receipts":
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index e27c7332d2..408d91df1c 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -13,21 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.registration import RegistrationStore
-
+from synapse.storage.registration import RegistrationWorkerStore
 
-class SlavedRegistrationStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedRegistrationStore, self).__init__(db_conn, hs)
+from ._base import BaseSlavedStore
 
-    # TODO: use the cached version and invalidate deleted tokens
-    get_user_by_access_token = RegistrationStore.__dict__[
-        "get_user_by_access_token"
-    ]
 
-    _query_for_auth = DataStore._query_for_auth.__func__
-    get_user_by_id = RegistrationStore.__dict__[
-        "get_user_by_id"
-    ]
+class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index f510384033..0cb474928c 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -13,33 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.storage.room import RoomWorkerStore
+
 from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.room import RoomStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 
-class RoomStore(BaseSlavedStore):
+class RoomStore(RoomWorkerStore, BaseSlavedStore):
     def __init__(self, db_conn, hs):
         super(RoomStore, self).__init__(db_conn, hs)
         self._public_room_id_gen = SlavedIdTracker(
             db_conn, "public_room_list_stream", "stream_id"
         )
 
-    get_public_room_ids = DataStore.get_public_room_ids.__func__
-    get_current_public_room_stream_id = (
-        DataStore.get_current_public_room_stream_id.__func__
-    )
-    get_public_room_ids_at_stream_id = (
-        RoomStore.__dict__["get_public_room_ids_at_stream_id"]
-    )
-    get_public_room_ids_at_stream_id_txn = (
-        DataStore.get_public_room_ids_at_stream_id_txn.__func__
-    )
-    get_published_at_stream_id_txn = (
-        DataStore.get_published_at_stream_id_txn.__func__
-    )
-    get_public_room_changes = DataStore.get_public_room_changes.__func__
+    def get_current_public_room_stream_id(self):
+        return self._public_room_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(RoomStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index fbb58f35da..9c9a5eadd9 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage import DataStore
 from synapse.storage.transactions import TransactionStore
 
+from ._base import BaseSlavedStore
+
 
 class TransactionStore(BaseSlavedStore):
     get_destination_retry_timings = TransactionStore.__dict__[