diff --git a/synapse/replication/slave/storage/__init__.py b/synapse/replication/slave/storage/__init__.py
new file mode 100644
index 0000000000..b7df13c9ee
--- /dev/null
+++ b/synapse/replication/slave/storage/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
new file mode 100644
index 0000000000..46e43ce1c7
--- /dev/null
+++ b/synapse/replication/slave/storage/_base.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage._base import SQLBaseStore
+from twisted.internet import defer
+
+
+class BaseSlavedStore(SQLBaseStore):
+ def __init__(self, db_conn, hs):
+ super(BaseSlavedStore, self).__init__(hs)
+
+ def stream_positions(self):
+ return {}
+
+ def process_replication(self, result):
+ return defer.succeed(None)
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
new file mode 100644
index 0000000000..24b5c79d4a
--- /dev/null
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.util.id_generators import _load_current_id
+
+
+class SlavedIdTracker(object):
+ def __init__(self, db_conn, table, column, extra_tables=[], step=1):
+ self.step = step
+ self._current = _load_current_id(db_conn, table, column, step)
+ for table, column in extra_tables:
+ self.advance(_load_current_id(db_conn, table, column))
+
+ def advance(self, new_id):
+ self._current = (max if self.step > 0 else min)(self._current, new_id)
+
+ def get_current_token(self):
+ return self._current
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
new file mode 100644
index 0000000000..735c03c7eb
--- /dev/null
+++ b/synapse/replication/slave/storage/account_data.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
+from synapse.storage.account_data import AccountDataStore
+from synapse.storage.tags import TagsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class SlavedAccountDataStore(BaseSlavedStore):
+
+ def __init__(self, db_conn, hs):
+ super(SlavedAccountDataStore, self).__init__(db_conn, hs)
+ self._account_data_id_gen = SlavedIdTracker(
+ db_conn, "account_data_max_stream_id", "stream_id",
+ )
+ self._account_data_stream_cache = StreamChangeCache(
+ "AccountDataAndTagsChangeCache",
+ self._account_data_id_gen.get_current_token(),
+ )
+
+ get_account_data_for_user = (
+ AccountDataStore.__dict__["get_account_data_for_user"]
+ )
+
+ get_global_account_data_by_type_for_users = (
+ AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
+ )
+
+ get_global_account_data_by_type_for_user = (
+ AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
+ )
+
+ get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+
+ get_updated_tags = DataStore.get_updated_tags.__func__
+ get_updated_account_data_for_user = (
+ DataStore.get_updated_account_data_for_user.__func__
+ )
+
+ def get_max_account_data_stream_id(self):
+ return self._account_data_id_gen.get_current_token()
+
+ def stream_positions(self):
+ result = super(SlavedAccountDataStore, self).stream_positions()
+ position = self._account_data_id_gen.get_current_token()
+ result["user_account_data"] = position
+ result["room_account_data"] = position
+ result["tag_account_data"] = position
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("user_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id, data_type = row[:3]
+ self.get_global_account_data_by_type_for_user.invalidate(
+ (data_type, user_id,)
+ )
+ self.get_account_data_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ stream = result.get("room_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.get_account_data_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ stream = result.get("tag_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.get_tags_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ return super(SlavedAccountDataStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
new file mode 100644
index 0000000000..25792d9429
--- /dev/null
+++ b/synapse/replication/slave/storage/appservice.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.config.appservice import load_appservices
+
+
+class SlavedApplicationServiceStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
+ self.services_cache = load_appservices(
+ hs.config.server_name,
+ hs.config.app_service_config_files
+ )
+
+ get_app_service_by_token = DataStore.get_app_service_by_token.__func__
+ get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
new file mode 100644
index 0000000000..877c68508c
--- /dev/null
+++ b/synapse/replication/slave/storage/events.py
@@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.api.constants import EventTypes
+from synapse.events import FrozenEvent
+from synapse.storage import DataStore
+from synapse.storage.room import RoomStore
+from synapse.storage.roommember import RoomMemberStore
+from synapse.storage.event_federation import EventFederationStore
+from synapse.storage.event_push_actions import EventPushActionsStore
+from synapse.storage.state import StateStore
+from synapse.storage.stream import StreamStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+import ujson as json
+
+# So, um, we want to borrow a load of functions intended for reading from
+# a DataStore, but we don't want to take functions that either write to the
+# DataStore or are cached and don't have cache invalidation logic.
+#
+# Rather than write duplicate versions of those functions, or lift them to
+# a common base class, we going to grab the underlying __func__ object from
+# the method descriptor on the DataStore and chuck them into our class.
+
+
+class SlavedEventStore(BaseSlavedStore):
+
+ def __init__(self, db_conn, hs):
+ super(SlavedEventStore, self).__init__(db_conn, hs)
+ self._stream_id_gen = SlavedIdTracker(
+ db_conn, "events", "stream_ordering",
+ )
+ self._backfill_id_gen = SlavedIdTracker(
+ db_conn, "events", "stream_ordering", step=-1
+ )
+ events_max = self._stream_id_gen.get_current_token()
+ event_cache_prefill, min_event_val = self._get_cache_dict(
+ db_conn, "events",
+ entity_column="room_id",
+ stream_column="stream_ordering",
+ max_value=events_max,
+ )
+ self._events_stream_cache = StreamChangeCache(
+ "EventsRoomStreamChangeCache", min_event_val,
+ prefilled_cache=event_cache_prefill,
+ )
+ self._membership_stream_cache = StreamChangeCache(
+ "MembershipStreamChangeCache", events_max,
+ )
+
+ # Cached functions can't be accessed through a class instance so we need
+ # to reach inside the __dict__ to extract them.
+ get_room_name_and_aliases = RoomStore.__dict__["get_room_name_and_aliases"]
+ get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
+ get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
+ get_latest_event_ids_in_room = EventFederationStore.__dict__[
+ "get_latest_event_ids_in_room"
+ ]
+ _get_current_state_for_key = StateStore.__dict__[
+ "_get_current_state_for_key"
+ ]
+ get_invited_rooms_for_user = RoomMemberStore.__dict__[
+ "get_invited_rooms_for_user"
+ ]
+ get_unread_event_push_actions_by_room_for_user = (
+ EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
+ )
+ _get_state_group_for_events = (
+ StateStore.__dict__["_get_state_group_for_events"]
+ )
+ _get_state_group_for_event = (
+ StateStore.__dict__["_get_state_group_for_event"]
+ )
+ _get_state_groups_from_groups = (
+ StateStore.__dict__["_get_state_groups_from_groups"]
+ )
+ _get_state_group_from_group = (
+ StateStore.__dict__["_get_state_group_from_group"]
+ )
+ get_recent_event_ids_for_room = (
+ StreamStore.__dict__["get_recent_event_ids_for_room"]
+ )
+
+ get_unread_push_actions_for_user_in_range = (
+ DataStore.get_unread_push_actions_for_user_in_range.__func__
+ )
+ get_push_action_users_in_range = (
+ DataStore.get_push_action_users_in_range.__func__
+ )
+ get_event = DataStore.get_event.__func__
+ get_events = DataStore.get_events.__func__
+ get_current_state = DataStore.get_current_state.__func__
+ get_current_state_for_key = DataStore.get_current_state_for_key.__func__
+ get_rooms_for_user_where_membership_is = (
+ DataStore.get_rooms_for_user_where_membership_is.__func__
+ )
+ get_membership_changes_for_user = (
+ DataStore.get_membership_changes_for_user.__func__
+ )
+ get_room_events_max_id = DataStore.get_room_events_max_id.__func__
+ get_room_events_stream_for_room = (
+ DataStore.get_room_events_stream_for_room.__func__
+ )
+ get_events_around = DataStore.get_events_around.__func__
+ get_state_for_event = DataStore.get_state_for_event.__func__
+ get_state_for_events = DataStore.get_state_for_events.__func__
+ get_state_groups = DataStore.get_state_groups.__func__
+ get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
+ get_room_events_stream_for_rooms = (
+ DataStore.get_room_events_stream_for_rooms.__func__
+ )
+ get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
+
+ _set_before_and_after = staticmethod(DataStore._set_before_and_after)
+
+ _get_events = DataStore._get_events.__func__
+ _get_events_from_cache = DataStore._get_events_from_cache.__func__
+
+ _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
+ _enqueue_events = DataStore._enqueue_events.__func__
+ _do_fetch = DataStore._do_fetch.__func__
+ _fetch_event_rows = DataStore._fetch_event_rows.__func__
+ _get_event_from_row = DataStore._get_event_from_row.__func__
+ _get_rooms_for_user_where_membership_is_txn = (
+ DataStore._get_rooms_for_user_where_membership_is_txn.__func__
+ )
+ _get_members_rows_txn = DataStore._get_members_rows_txn.__func__
+ _get_state_for_groups = DataStore._get_state_for_groups.__func__
+ _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
+ _get_events_around_txn = DataStore._get_events_around_txn.__func__
+ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
+
+ def stream_positions(self):
+ result = super(SlavedEventStore, self).stream_positions()
+ result["events"] = self._stream_id_gen.get_current_token()
+ result["backfill"] = -self._backfill_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ state_resets = set(
+ r[0] for r in result.get("state_resets", {"rows": []})["rows"]
+ )
+
+ stream = result.get("events")
+ if stream:
+ self._stream_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ self._process_replication_row(
+ row, backfilled=False, state_resets=state_resets
+ )
+
+ stream = result.get("backfill")
+ if stream:
+ self._backfill_id_gen.advance(-int(stream["position"]))
+ for row in stream["rows"]:
+ self._process_replication_row(
+ row, backfilled=True, state_resets=state_resets
+ )
+
+ stream = result.get("forward_ex_outliers")
+ if stream:
+ self._stream_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ event_id = row[1]
+ self._invalidate_get_event_cache(event_id)
+
+ stream = result.get("backward_ex_outliers")
+ if stream:
+ self._backfill_id_gen.advance(-int(stream["position"]))
+ for row in stream["rows"]:
+ event_id = row[1]
+ self._invalidate_get_event_cache(event_id)
+
+ return super(SlavedEventStore, self).process_replication(result)
+
+ def _process_replication_row(self, row, backfilled, state_resets):
+ position = row[0]
+ internal = json.loads(row[1])
+ event_json = json.loads(row[2])
+ event = FrozenEvent(event_json, internal_metadata_dict=internal)
+ self.invalidate_caches_for_event(
+ event, backfilled, reset_state=position in state_resets
+ )
+
+ def invalidate_caches_for_event(self, event, backfilled, reset_state):
+ if reset_state:
+ self._get_current_state_for_key.invalidate_all()
+ self.get_rooms_for_user.invalidate_all()
+ self.get_users_in_room.invalidate((event.room_id,))
+ # self.get_joined_hosts_for_room.invalidate((event.room_id,))
+ self.get_room_name_and_aliases.invalidate((event.room_id,))
+
+ self._invalidate_get_event_cache(event.event_id)
+
+ self.get_latest_event_ids_in_room.invalidate((event.room_id,))
+
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+ (event.room_id,)
+ )
+
+ if not backfilled:
+ self._events_stream_cache.entity_has_changed(
+ event.room_id, event.internal_metadata.stream_ordering
+ )
+
+ # self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+ # (event.room_id,)
+ # )
+
+ if event.type == EventTypes.Redaction:
+ self._invalidate_get_event_cache(event.redacts)
+
+ if event.type == EventTypes.Member:
+ self.get_rooms_for_user.invalidate((event.state_key,))
+ # self.get_joined_hosts_for_room.invalidate((event.room_id,))
+ self.get_users_in_room.invalidate((event.room_id,))
+ self._membership_stream_cache.entity_has_changed(
+ event.state_key, event.internal_metadata.stream_ordering
+ )
+ self.get_invited_rooms_for_user.invalidate((event.state_key,))
+
+ if not event.is_state():
+ return
+
+ if backfilled:
+ return
+
+ if (not event.internal_metadata.is_invite_from_remote()
+ and event.internal_metadata.is_outlier()):
+ return
+
+ self._get_current_state_for_key.invalidate((
+ event.room_id, event.type, event.state_key
+ ))
+
+ if event.type in [EventTypes.Name, EventTypes.Aliases]:
+ self.get_room_name_and_aliases.invalidate(
+ (event.room_id,)
+ )
+ pass
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
new file mode 100644
index 0000000000..819ed62881
--- /dev/null
+++ b/synapse/replication/slave/storage/filtering.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.filtering import FilteringStore
+
+
+class SlavedFilteringStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedFilteringStore, self).__init__(db_conn, hs)
+
+ # Filters are immutable so this cache doesn't need to be expired
+ get_user_filter = FilteringStore.__dict__["get_user_filter"]
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
new file mode 100644
index 0000000000..703f4a49bf
--- /dev/null
+++ b/synapse/replication/slave/storage/presence.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage import DataStore
+
+
+class SlavedPresenceStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedPresenceStore, self).__init__(db_conn, hs)
+ self._presence_id_gen = SlavedIdTracker(
+ db_conn, "presence_stream", "stream_id",
+ )
+
+ self._presence_on_startup = self._get_active_presence(db_conn)
+
+ self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
+ "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
+ )
+
+ _get_active_presence = DataStore._get_active_presence.__func__
+ take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+ get_presence_for_users = DataStore.get_presence_for_users.__func__
+
+ def get_current_presence_token(self):
+ return self._presence_id_gen.get_current_token()
+
+ def stream_positions(self):
+ result = super(SlavedPresenceStore, self).stream_positions()
+ position = self._presence_id_gen.get_current_token()
+ result["presence"] = position
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("presence")
+ if stream:
+ self._presence_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.presence_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ return super(SlavedPresenceStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
new file mode 100644
index 0000000000..21ceb0213a
--- /dev/null
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .events import SlavedEventStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
+from synapse.storage.push_rule import PushRuleStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class SlavedPushRuleStore(SlavedEventStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedPushRuleStore, self).__init__(db_conn, hs)
+ self._push_rules_stream_id_gen = SlavedIdTracker(
+ db_conn, "push_rules_stream", "stream_id",
+ )
+ self.push_rules_stream_cache = StreamChangeCache(
+ "PushRulesStreamChangeCache",
+ self._push_rules_stream_id_gen.get_current_token(),
+ )
+
+ get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
+ get_push_rules_enabled_for_user = (
+ PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
+ )
+ have_push_rules_changed_for_user = (
+ DataStore.have_push_rules_changed_for_user.__func__
+ )
+
+ def get_push_rules_stream_token(self):
+ return (
+ self._push_rules_stream_id_gen.get_current_token(),
+ self._stream_id_gen.get_current_token(),
+ )
+
+ def stream_positions(self):
+ result = super(SlavedPushRuleStore, self).stream_positions()
+ result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("push_rules")
+ if stream:
+ for row in stream["rows"]:
+ position = row[0]
+ user_id = row[2]
+ self.get_push_rules_for_user.invalidate((user_id,))
+ self.get_push_rules_enabled_for_user.invalidate((user_id,))
+ self.push_rules_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ self._push_rules_stream_id_gen.advance(int(stream["position"]))
+
+ return super(SlavedPushRuleStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
new file mode 100644
index 0000000000..d88206b3bb
--- /dev/null
+++ b/synapse/replication/slave/storage/pushers.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.storage import DataStore
+
+
+class SlavedPusherStore(BaseSlavedStore):
+
+ def __init__(self, db_conn, hs):
+ super(SlavedPusherStore, self).__init__(db_conn, hs)
+ self._pushers_id_gen = SlavedIdTracker(
+ db_conn, "pushers", "id",
+ extra_tables=[("deleted_pushers", "stream_id")],
+ )
+
+ get_all_pushers = DataStore.get_all_pushers.__func__
+ get_pushers_by = DataStore.get_pushers_by.__func__
+ get_pushers_by_app_id_and_pushkey = (
+ DataStore.get_pushers_by_app_id_and_pushkey.__func__
+ )
+ _decode_pushers_rows = DataStore._decode_pushers_rows.__func__
+
+ def stream_positions(self):
+ result = super(SlavedPusherStore, self).stream_positions()
+ result["pushers"] = self._pushers_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("pushers")
+ if stream:
+ self._pushers_id_gen.advance(int(stream["position"]))
+
+ stream = result.get("deleted_pushers")
+ if stream:
+ self._pushers_id_gen.advance(int(stream["position"]))
+
+ return super(SlavedPusherStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
new file mode 100644
index 0000000000..ac9662d399
--- /dev/null
+++ b/synapse/replication/slave/storage/receipts.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.storage import DataStore
+from synapse.storage.receipts import ReceiptsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+# So, um, we want to borrow a load of functions intended for reading from
+# a DataStore, but we don't want to take functions that either write to the
+# DataStore or are cached and don't have cache invalidation logic.
+#
+# Rather than write duplicate versions of those functions, or lift them to
+# a common base class, we going to grab the underlying __func__ object from
+# the method descriptor on the DataStore and chuck them into our class.
+
+
+class SlavedReceiptsStore(BaseSlavedStore):
+
+ def __init__(self, db_conn, hs):
+ super(SlavedReceiptsStore, self).__init__(db_conn, hs)
+
+ self._receipts_id_gen = SlavedIdTracker(
+ db_conn, "receipts_linearized", "stream_id"
+ )
+
+ self._receipts_stream_cache = StreamChangeCache(
+ "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
+ )
+
+ get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+ get_linearized_receipts_for_room = (
+ ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
+ )
+ _get_linearized_receipts_for_rooms = (
+ ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
+ )
+ get_last_receipt_event_id_for_user = (
+ ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
+ )
+
+ get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
+ get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+
+ get_linearized_receipts_for_rooms = (
+ DataStore.get_linearized_receipts_for_rooms.__func__
+ )
+
+ def stream_positions(self):
+ result = super(SlavedReceiptsStore, self).stream_positions()
+ result["receipts"] = self._receipts_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("receipts")
+ if stream:
+ self._receipts_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, room_id, receipt_type, user_id = row[:4]
+ self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
+ self._receipts_stream_cache.entity_has_changed(room_id, position)
+
+ return super(SlavedReceiptsStore, self).process_replication(result)
+
+ def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
+ self.get_receipts_for_user.invalidate((user_id, receipt_type))
+ self.get_linearized_receipts_for_room.invalidate_many((room_id,))
+ self.get_last_receipt_event_id_for_user.invalidate(
+ (user_id, room_id, receipt_type)
+ )
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
new file mode 100644
index 0000000000..307833f9e1
--- /dev/null
+++ b/synapse/replication/slave/storage/registration.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.registration import RegistrationStore
+
+
+class SlavedRegistrationStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedRegistrationStore, self).__init__(db_conn, hs)
+
+ # TODO: use the cached version and invalidate deleted tokens
+ get_user_by_access_token = RegistrationStore.__dict__[
+ "get_user_by_access_token"
+ ].orig
+
+ _query_for_auth = DataStore._query_for_auth.__func__
|