summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/presence_resource.py59
-rw-r--r--synapse/replication/resource.py2
-rw-r--r--synapse/replication/slave/storage/account_data.py41
-rw-r--r--synapse/replication/slave/storage/appservice.py30
-rw-r--r--synapse/replication/slave/storage/directory.py23
-rw-r--r--synapse/replication/slave/storage/events.py51
-rw-r--r--synapse/replication/slave/storage/filtering.py25
-rw-r--r--synapse/replication/slave/storage/keys.py33
-rw-r--r--synapse/replication/slave/storage/presence.py59
-rw-r--r--synapse/replication/slave/storage/push_rule.py67
-rw-r--r--synapse/replication/slave/storage/receipts.py25
-rw-r--r--synapse/replication/slave/storage/registration.py30
-rw-r--r--synapse/replication/slave/storage/room.py21
-rw-r--r--synapse/replication/slave/storage/transactions.py30
14 files changed, 474 insertions, 22 deletions
diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py
new file mode 100644
index 0000000000..fc18130ab4
--- /dev/null
+++ b/synapse/replication/presence_resource.py
@@ -0,0 +1,59 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import defer
+
+
+class PresenceResource(Resource):
+    """
+    HTTP endpoint for marking users as syncing.
+
+    POST /_synapse/replication/presence HTTP/1.1
+    Content-Type: application/json
+
+    {
+        "process_id": "<process_id>",
+        "syncing_users": ["<user_id>"]
+    }
+    """
+
+    def __init__(self, hs):
+        Resource.__init__(self)  # Resource is old-style, so no super()
+
+        self.version_string = hs.version_string
+        self.clock = hs.get_clock()
+        self.presence_handler = hs.get_presence_handler()
+
+    def render_POST(self, request):
+        self._async_render_POST(request)
+        return NOT_DONE_YET
+
+    @request_handler()
+    @defer.inlineCallbacks
+    def _async_render_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        process_id = content["process_id"]
+        syncing_user_ids = content["syncing_users"]
+
+        yield self.presence_handler.update_external_syncs(
+            process_id, set(syncing_user_ids)
+        )
+
+        respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 847f212a3d..8c2d487ff4 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -16,6 +16,7 @@
 from synapse.http.servlet import parse_integer, parse_string
 from synapse.http.server import request_handler, finish_request
 from synapse.replication.pusher_resource import PusherResource
+from synapse.replication.presence_resource import PresenceResource
 
 from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
@@ -115,6 +116,7 @@ class ReplicationResource(Resource):
         self.clock = hs.get_clock()
 
         self.putChild("remove_pushers", PusherResource(hs))
+        self.putChild("syncing_users", PresenceResource(hs))
 
     def render_GET(self, request):
         self._async_render_GET(request)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index f59b0eabbc..735c03c7eb 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -15,7 +15,10 @@
 
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
 from synapse.storage.account_data import AccountDataStore
+from synapse.storage.tags import TagsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 
 class SlavedAccountDataStore(BaseSlavedStore):
@@ -25,6 +28,14 @@ class SlavedAccountDataStore(BaseSlavedStore):
         self._account_data_id_gen = SlavedIdTracker(
             db_conn, "account_data_max_stream_id", "stream_id",
         )
+        self._account_data_stream_cache = StreamChangeCache(
+            "AccountDataAndTagsChangeCache",
+            self._account_data_id_gen.get_current_token(),
+        )
+
+    get_account_data_for_user = (
+        AccountDataStore.__dict__["get_account_data_for_user"]
+    )
 
     get_global_account_data_by_type_for_users = (
         AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
@@ -34,6 +45,16 @@ class SlavedAccountDataStore(BaseSlavedStore):
         AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
     )
 
+    get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+
+    get_updated_tags = DataStore.get_updated_tags.__func__
+    get_updated_account_data_for_user = (
+        DataStore.get_updated_account_data_for_user.__func__
+    )
+
+    def get_max_account_data_stream_id(self):
+        return self._account_data_id_gen.get_current_token()
+
     def stream_positions(self):
         result = super(SlavedAccountDataStore, self).stream_positions()
         position = self._account_data_id_gen.get_current_token()
@@ -47,15 +68,33 @@ class SlavedAccountDataStore(BaseSlavedStore):
         if stream:
             self._account_data_id_gen.advance(int(stream["position"]))
             for row in stream["rows"]:
-                user_id, data_type = row[1:3]
+                position, user_id, data_type = row[:3]
                 self.get_global_account_data_by_type_for_user.invalidate(
                     (data_type, user_id,)
                 )
+                self.get_account_data_for_user.invalidate((user_id,))
+                self._account_data_stream_cache.entity_has_changed(
+                    user_id, position
+                )
 
         stream = result.get("room_account_data")
         if stream:
             self._account_data_id_gen.advance(int(stream["position"]))
+            for row in stream["rows"]:
+                position, user_id = row[:2]
+                self.get_account_data_for_user.invalidate((user_id,))
+                self._account_data_stream_cache.entity_has_changed(
+                    user_id, position
+                )
 
         stream = result.get("tag_account_data")
         if stream:
             self._account_data_id_gen.advance(int(stream["position"]))
+            for row in stream["rows"]:
+                position, user_id = row[:2]
+                self.get_tags_for_user.invalidate((user_id,))
+                self._account_data_stream_cache.entity_has_changed(
+                    user_id, position
+                )
+
+        return super(SlavedAccountDataStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
new file mode 100644
index 0000000000..25792d9429
--- /dev/null
+++ b/synapse/replication/slave/storage/appservice.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.config.appservice import load_appservices
+
+
+class SlavedApplicationServiceStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
+        self.services_cache = load_appservices(
+            hs.config.server_name,
+            hs.config.app_service_config_files
+        )
+
+    get_app_service_by_token = DataStore.get_app_service_by_token.__func__
+    get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
new file mode 100644
index 0000000000..5fbe3a303a
--- /dev/null
+++ b/synapse/replication/slave/storage/directory.py
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.directory import DirectoryStore
+
+
+class DirectoryStore(BaseSlavedStore):
+    get_aliases_for_room = DirectoryStore.__dict__[
+        "get_aliases_for_room"
+    ].orig
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index c0d741452d..f4f31f2d27 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -18,11 +18,11 @@ from ._slaved_id_tracker import SlavedIdTracker
 from synapse.api.constants import EventTypes
 from synapse.events import FrozenEvent
 from synapse.storage import DataStore
-from synapse.storage.room import RoomStore
 from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.event_push_actions import EventPushActionsStore
 from synapse.storage.state import StateStore
+from synapse.storage.stream import StreamStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 import ujson as json
@@ -57,10 +57,12 @@ class SlavedEventStore(BaseSlavedStore):
             "EventsRoomStreamChangeCache", min_event_val,
             prefilled_cache=event_cache_prefill,
         )
+        self._membership_stream_cache = StreamChangeCache(
+            "MembershipStreamChangeCache", events_max,
+        )
 
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
-    get_room_name_and_aliases = RoomStore.__dict__["get_room_name_and_aliases"]
     get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
     get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
     get_latest_event_ids_in_room = EventFederationStore.__dict__[
@@ -87,9 +89,15 @@ class SlavedEventStore(BaseSlavedStore):
     _get_state_group_from_group = (
         StateStore.__dict__["_get_state_group_from_group"]
     )
+    get_recent_event_ids_for_room = (
+        StreamStore.__dict__["get_recent_event_ids_for_room"]
+    )
 
-    get_unread_push_actions_for_user_in_range = (
-        DataStore.get_unread_push_actions_for_user_in_range.__func__
+    get_unread_push_actions_for_user_in_range_for_http = (
+        DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
+    )
+    get_unread_push_actions_for_user_in_range_for_email = (
+        DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
     )
     get_push_action_users_in_range = (
         DataStore.get_push_action_users_in_range.__func__
@@ -109,24 +117,25 @@ class SlavedEventStore(BaseSlavedStore):
         DataStore.get_room_events_stream_for_room.__func__
     )
     get_events_around = DataStore.get_events_around.__func__
+    get_state_for_event = DataStore.get_state_for_event.__func__
     get_state_for_events = DataStore.get_state_for_events.__func__
     get_state_groups = DataStore.get_state_groups.__func__
+    get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
+    get_room_events_stream_for_rooms = (
+        DataStore.get_room_events_stream_for_rooms.__func__
+    )
+    get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
 
-    _set_before_and_after = DataStore._set_before_and_after
+    _set_before_and_after = staticmethod(DataStore._set_before_and_after)
 
     _get_events = DataStore._get_events.__func__
     _get_events_from_cache = DataStore._get_events_from_cache.__func__
 
     _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
-    _parse_events_txn = DataStore._parse_events_txn.__func__
-    _get_events_txn = DataStore._get_events_txn.__func__
-    _get_event_txn = DataStore._get_event_txn.__func__
     _enqueue_events = DataStore._enqueue_events.__func__
     _do_fetch = DataStore._do_fetch.__func__
-    _fetch_events_txn = DataStore._fetch_events_txn.__func__
     _fetch_event_rows = DataStore._fetch_event_rows.__func__
     _get_event_from_row = DataStore._get_event_from_row.__func__
-    _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
     _get_rooms_for_user_where_membership_is_txn = (
         DataStore._get_rooms_for_user_where_membership_is_txn.__func__
     )
@@ -136,6 +145,15 @@ class SlavedEventStore(BaseSlavedStore):
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
     _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
 
+    get_backfill_events = DataStore.get_backfill_events.__func__
+    _get_backfill_events = DataStore._get_backfill_events.__func__
+    get_missing_events = DataStore.get_missing_events.__func__
+    _get_missing_events = DataStore._get_missing_events.__func__
+
+    get_auth_chain = DataStore.get_auth_chain.__func__
+    get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
+    _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
@@ -194,7 +212,6 @@ class SlavedEventStore(BaseSlavedStore):
             self.get_rooms_for_user.invalidate_all()
             self.get_users_in_room.invalidate((event.room_id,))
             # self.get_joined_hosts_for_room.invalidate((event.room_id,))
-            self.get_room_name_and_aliases.invalidate((event.room_id,))
 
         self._invalidate_get_event_cache(event.event_id)
 
@@ -220,9 +237,9 @@ class SlavedEventStore(BaseSlavedStore):
             self.get_rooms_for_user.invalidate((event.state_key,))
             # self.get_joined_hosts_for_room.invalidate((event.room_id,))
             self.get_users_in_room.invalidate((event.room_id,))
-            # self._membership_stream_cache.entity_has_changed(
-            #    event.state_key, event.internal_metadata.stream_ordering
-            # )
+            self._membership_stream_cache.entity_has_changed(
+                event.state_key, event.internal_metadata.stream_ordering
+            )
             self.get_invited_rooms_for_user.invalidate((event.state_key,))
 
         if not event.is_state():
@@ -238,9 +255,3 @@ class SlavedEventStore(BaseSlavedStore):
         self._get_current_state_for_key.invalidate((
             event.room_id, event.type, event.state_key
         ))
-
-        if event.type in [EventTypes.Name, EventTypes.Aliases]:
-            self.get_room_name_and_aliases.invalidate(
-                (event.room_id,)
-            )
-            pass
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
new file mode 100644
index 0000000000..819ed62881
--- /dev/null
+++ b/synapse/replication/slave/storage/filtering.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.filtering import FilteringStore
+
+
+class SlavedFilteringStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedFilteringStore, self).__init__(db_conn, hs)
+
+    # Filters are immutable so this cache doesn't need to be expired
+    get_user_filter = FilteringStore.__dict__["get_user_filter"]
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
new file mode 100644
index 0000000000..dd2ae49e48
--- /dev/null
+++ b/synapse/replication/slave/storage/keys.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.keys import KeyStore
+
+
+class SlavedKeyStore(BaseSlavedStore):
+    _get_server_verify_key = KeyStore.__dict__[
+        "_get_server_verify_key"
+    ]
+
+    get_server_verify_keys = DataStore.get_server_verify_keys.__func__
+    store_server_verify_key = DataStore.store_server_verify_key.__func__
+
+    get_server_certificate = DataStore.get_server_certificate.__func__
+    store_server_certificate = DataStore.store_server_certificate.__func__
+
+    get_server_keys_json = DataStore.get_server_keys_json.__func__
+    store_server_keys_json = DataStore.store_server_keys_json.__func__
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
new file mode 100644
index 0000000000..703f4a49bf
--- /dev/null
+++ b/synapse/replication/slave/storage/presence.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage import DataStore
+
+
+class SlavedPresenceStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedPresenceStore, self).__init__(db_conn, hs)
+        self._presence_id_gen = SlavedIdTracker(
+            db_conn, "presence_stream", "stream_id",
+        )
+
+        self._presence_on_startup = self._get_active_presence(db_conn)
+
+        self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
+            "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
+        )
+
+    _get_active_presence = DataStore._get_active_presence.__func__
+    take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+    get_presence_for_users = DataStore.get_presence_for_users.__func__
+
+    def get_current_presence_token(self):
+        return self._presence_id_gen.get_current_token()
+
+    def stream_positions(self):
+        result = super(SlavedPresenceStore, self).stream_positions()
+        position = self._presence_id_gen.get_current_token()
+        result["presence"] = position
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("presence")
+        if stream:
+            self._presence_id_gen.advance(int(stream["position"]))
+            for row in stream["rows"]:
+                position, user_id = row[:2]
+                self.presence_stream_cache.entity_has_changed(
+                    user_id, position
+                )
+
+        return super(SlavedPresenceStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
new file mode 100644
index 0000000000..21ceb0213a
--- /dev/null
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .events import SlavedEventStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
+from synapse.storage.push_rule import PushRuleStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class SlavedPushRuleStore(SlavedEventStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
+        self._push_rules_stream_id_gen = SlavedIdTracker(
+            db_conn, "push_rules_stream", "stream_id",
+        )
+        self.push_rules_stream_cache = StreamChangeCache(
+            "PushRulesStreamChangeCache",
+            self._push_rules_stream_id_gen.get_current_token(),
+        )
+
+    get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
+    get_push_rules_enabled_for_user = (
+        PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
+    )
+    have_push_rules_changed_for_user = (
+        DataStore.have_push_rules_changed_for_user.__func__
+    )
+
+    def get_push_rules_stream_token(self):
+        return (
+            self._push_rules_stream_id_gen.get_current_token(),
+            self._stream_id_gen.get_current_token(),
+        )
+
+    def stream_positions(self):
+        result = super(SlavedPushRuleStore, self).stream_positions()
+        result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("push_rules")
+        if stream:
+            for row in stream["rows"]:
+                position = row[0]
+                user_id = row[2]
+                self.get_push_rules_for_user.invalidate((user_id,))
+                self.get_push_rules_enabled_for_user.invalidate((user_id,))
+                self.push_rules_stream_cache.entity_has_changed(
+                    user_id, position
+                )
+
+            self._push_rules_stream_id_gen.advance(int(stream["position"]))
+
+        return super(SlavedPushRuleStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index ec007516d0..ac9662d399 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
 
 from synapse.storage import DataStore
 from synapse.storage.receipts import ReceiptsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 # So, um, we want to borrow a load of functions intended for reading from
 # a DataStore, but we don't want to take functions that either write to the
@@ -37,11 +38,28 @@ class SlavedReceiptsStore(BaseSlavedStore):
             db_conn, "receipts_linearized", "stream_id"
         )
 
+        self._receipts_stream_cache = StreamChangeCache(
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
+        )
+
     get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+    get_linearized_receipts_for_room = (
+        ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
+    )
+    _get_linearized_receipts_for_rooms = (
+        ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
+    )
+    get_last_receipt_event_id_for_user = (
+        ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
+    )
 
     get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
     get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
 
+    get_linearized_receipts_for_rooms = (
+        DataStore.get_linearized_receipts_for_rooms.__func__
+    )
+
     def stream_positions(self):
         result = super(SlavedReceiptsStore, self).stream_positions()
         result["receipts"] = self._receipts_id_gen.get_current_token()
@@ -52,10 +70,15 @@ class SlavedReceiptsStore(BaseSlavedStore):
         if stream:
             self._receipts_id_gen.advance(int(stream["position"]))
             for row in stream["rows"]:
-                room_id, receipt_type, user_id = row[1:4]
+                position, room_id, receipt_type, user_id = row[:4]
                 self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
+                self._receipts_stream_cache.entity_has_changed(room_id, position)
 
         return super(SlavedReceiptsStore, self).process_replication(result)
 
     def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
         self.get_receipts_for_user.invalidate((user_id, receipt_type))
+        self.get_linearized_receipts_for_room.invalidate_many((room_id,))
+        self.get_last_receipt_event_id_for_user.invalidate(
+            (user_id, room_id, receipt_type)
+        )
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
new file mode 100644
index 0000000000..307833f9e1
--- /dev/null
+++ b/synapse/replication/slave/storage/registration.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.registration import RegistrationStore
+
+
+class SlavedRegistrationStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedRegistrationStore, self).__init__(db_conn, hs)
+
+    # TODO: use the cached version and invalidate deleted tokens
+    get_user_by_access_token = RegistrationStore.__dict__[
+        "get_user_by_access_token"
+    ].orig
+
+    _query_for_auth = DataStore._query_for_auth.__func__
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
new file mode 100644
index 0000000000..d5bb0f98ea
--- /dev/null
+++ b/synapse/replication/slave/storage/room.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+
+
+class RoomStore(BaseSlavedStore):
+    get_public_room_ids = DataStore.get_public_room_ids.__func__
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
new file mode 100644
index 0000000000..6f2ba98af5
--- /dev/null
+++ b/synapse/replication/slave/storage/transactions.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.transactions import TransactionStore
+
+
+class TransactionStore(BaseSlavedStore):
+    get_destination_retry_timings = TransactionStore.__dict__[
+        "get_destination_retry_timings"
+    ].orig
+    _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
+
+    # For now, don't record the destination rety timings
+    def set_destination_retry_timings(*args, **kwargs):
+        return defer.succeed(None)