diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py
new file mode 100644
index 0000000000..fc18130ab4
--- /dev/null
+++ b/synapse/replication/presence_resource.py
@@ -0,0 +1,59 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import defer
+
+
+class PresenceResource(Resource):
+ """
+ HTTP endpoint for marking users as syncing.
+
+ POST /_synapse/replication/presence HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "process_id": "<process_id>",
+ "syncing_users": ["<user_id>"]
+ }
+ """
+
+ def __init__(self, hs):
+ Resource.__init__(self) # Resource is old-style, so no super()
+
+ self.version_string = hs.version_string
+ self.clock = hs.get_clock()
+ self.presence_handler = hs.get_presence_handler()
+
+ def render_POST(self, request):
+ self._async_render_POST(request)
+ return NOT_DONE_YET
+
+ @request_handler()
+ @defer.inlineCallbacks
+ def _async_render_POST(self, request):
+ content = parse_json_object_from_request(request)
+
+ process_id = content["process_id"]
+ syncing_user_ids = content["syncing_users"]
+
+ yield self.presence_handler.update_external_syncs(
+ process_id, set(syncing_user_ids)
+ )
+
+ respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 847f212a3d..8c2d487ff4 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -16,6 +16,7 @@
from synapse.http.servlet import parse_integer, parse_string
from synapse.http.server import request_handler, finish_request
from synapse.replication.pusher_resource import PusherResource
+from synapse.replication.presence_resource import PresenceResource
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
@@ -115,6 +116,7 @@ class ReplicationResource(Resource):
self.clock = hs.get_clock()
self.putChild("remove_pushers", PusherResource(hs))
+ self.putChild("syncing_users", PresenceResource(hs))
def render_GET(self, request):
self._async_render_GET(request)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index f59b0eabbc..735c03c7eb 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -15,7 +15,10 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
from synapse.storage.account_data import AccountDataStore
+from synapse.storage.tags import TagsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
class SlavedAccountDataStore(BaseSlavedStore):
@@ -25,6 +28,14 @@ class SlavedAccountDataStore(BaseSlavedStore):
self._account_data_id_gen = SlavedIdTracker(
db_conn, "account_data_max_stream_id", "stream_id",
)
+ self._account_data_stream_cache = StreamChangeCache(
+ "AccountDataAndTagsChangeCache",
+ self._account_data_id_gen.get_current_token(),
+ )
+
+ get_account_data_for_user = (
+ AccountDataStore.__dict__["get_account_data_for_user"]
+ )
get_global_account_data_by_type_for_users = (
AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
@@ -34,6 +45,16 @@ class SlavedAccountDataStore(BaseSlavedStore):
AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
)
+ get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+
+ get_updated_tags = DataStore.get_updated_tags.__func__
+ get_updated_account_data_for_user = (
+ DataStore.get_updated_account_data_for_user.__func__
+ )
+
+ def get_max_account_data_stream_id(self):
+ return self._account_data_id_gen.get_current_token()
+
def stream_positions(self):
result = super(SlavedAccountDataStore, self).stream_positions()
position = self._account_data_id_gen.get_current_token()
@@ -47,15 +68,33 @@ class SlavedAccountDataStore(BaseSlavedStore):
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
- user_id, data_type = row[1:3]
+ position, user_id, data_type = row[:3]
self.get_global_account_data_by_type_for_user.invalidate(
(data_type, user_id,)
)
+ self.get_account_data_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
stream = result.get("room_account_data")
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.get_account_data_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
stream = result.get("tag_account_data")
if stream:
self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.get_tags_for_user.invalidate((user_id,))
+ self._account_data_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ return super(SlavedAccountDataStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
new file mode 100644
index 0000000000..25792d9429
--- /dev/null
+++ b/synapse/replication/slave/storage/appservice.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.config.appservice import load_appservices
+
+
+class SlavedApplicationServiceStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
+ self.services_cache = load_appservices(
+ hs.config.server_name,
+ hs.config.app_service_config_files
+ )
+
+ get_app_service_by_token = DataStore.get_app_service_by_token.__func__
+ get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
new file mode 100644
index 0000000000..5fbe3a303a
--- /dev/null
+++ b/synapse/replication/slave/storage/directory.py
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.directory import DirectoryStore
+
+
+class DirectoryStore(BaseSlavedStore):
+ get_aliases_for_room = DirectoryStore.__dict__[
+ "get_aliases_for_room"
+ ].orig
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index c0d741452d..f4f31f2d27 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -18,11 +18,11 @@ from ._slaved_id_tracker import SlavedIdTracker
from synapse.api.constants import EventTypes
from synapse.events import FrozenEvent
from synapse.storage import DataStore
-from synapse.storage.room import RoomStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.event_push_actions import EventPushActionsStore
from synapse.storage.state import StateStore
+from synapse.storage.stream import StreamStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
import ujson as json
@@ -57,10 +57,12 @@ class SlavedEventStore(BaseSlavedStore):
"EventsRoomStreamChangeCache", min_event_val,
prefilled_cache=event_cache_prefill,
)
+ self._membership_stream_cache = StreamChangeCache(
+ "MembershipStreamChangeCache", events_max,
+ )
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
- get_room_name_and_aliases = RoomStore.__dict__["get_room_name_and_aliases"]
get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
get_latest_event_ids_in_room = EventFederationStore.__dict__[
@@ -87,9 +89,15 @@ class SlavedEventStore(BaseSlavedStore):
_get_state_group_from_group = (
StateStore.__dict__["_get_state_group_from_group"]
)
+ get_recent_event_ids_for_room = (
+ StreamStore.__dict__["get_recent_event_ids_for_room"]
+ )
- get_unread_push_actions_for_user_in_range = (
- DataStore.get_unread_push_actions_for_user_in_range.__func__
+ get_unread_push_actions_for_user_in_range_for_http = (
+ DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
+ )
+ get_unread_push_actions_for_user_in_range_for_email = (
+ DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
)
get_push_action_users_in_range = (
DataStore.get_push_action_users_in_range.__func__
@@ -109,24 +117,25 @@ class SlavedEventStore(BaseSlavedStore):
DataStore.get_room_events_stream_for_room.__func__
)
get_events_around = DataStore.get_events_around.__func__
+ get_state_for_event = DataStore.get_state_for_event.__func__
get_state_for_events = DataStore.get_state_for_events.__func__
get_state_groups = DataStore.get_state_groups.__func__
+ get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
+ get_room_events_stream_for_rooms = (
+ DataStore.get_room_events_stream_for_rooms.__func__
+ )
+ get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
- _set_before_and_after = DataStore._set_before_and_after
+ _set_before_and_after = staticmethod(DataStore._set_before_and_after)
_get_events = DataStore._get_events.__func__
_get_events_from_cache = DataStore._get_events_from_cache.__func__
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
- _parse_events_txn = DataStore._parse_events_txn.__func__
- _get_events_txn = DataStore._get_events_txn.__func__
- _get_event_txn = DataStore._get_event_txn.__func__
_enqueue_events = DataStore._enqueue_events.__func__
_do_fetch = DataStore._do_fetch.__func__
- _fetch_events_txn = DataStore._fetch_events_txn.__func__
_fetch_event_rows = DataStore._fetch_event_rows.__func__
_get_event_from_row = DataStore._get_event_from_row.__func__
- _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
@@ -136,6 +145,15 @@ class SlavedEventStore(BaseSlavedStore):
_get_events_around_txn = DataStore._get_events_around_txn.__func__
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
+ get_backfill_events = DataStore.get_backfill_events.__func__
+ _get_backfill_events = DataStore._get_backfill_events.__func__
+ get_missing_events = DataStore.get_missing_events.__func__
+ _get_missing_events = DataStore._get_missing_events.__func__
+
+ get_auth_chain = DataStore.get_auth_chain.__func__
+ get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
+ _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
+
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()
@@ -194,7 +212,6 @@ class SlavedEventStore(BaseSlavedStore):
self.get_rooms_for_user.invalidate_all()
self.get_users_in_room.invalidate((event.room_id,))
# self.get_joined_hosts_for_room.invalidate((event.room_id,))
- self.get_room_name_and_aliases.invalidate((event.room_id,))
self._invalidate_get_event_cache(event.event_id)
@@ -220,9 +237,9 @@ class SlavedEventStore(BaseSlavedStore):
self.get_rooms_for_user.invalidate((event.state_key,))
# self.get_joined_hosts_for_room.invalidate((event.room_id,))
self.get_users_in_room.invalidate((event.room_id,))
- # self._membership_stream_cache.entity_has_changed(
- # event.state_key, event.internal_metadata.stream_ordering
- # )
+ self._membership_stream_cache.entity_has_changed(
+ event.state_key, event.internal_metadata.stream_ordering
+ )
self.get_invited_rooms_for_user.invalidate((event.state_key,))
if not event.is_state():
@@ -238,9 +255,3 @@ class SlavedEventStore(BaseSlavedStore):
self._get_current_state_for_key.invalidate((
event.room_id, event.type, event.state_key
))
-
- if event.type in [EventTypes.Name, EventTypes.Aliases]:
- self.get_room_name_and_aliases.invalidate(
- (event.room_id,)
- )
- pass
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
new file mode 100644
index 0000000000..819ed62881
--- /dev/null
+++ b/synapse/replication/slave/storage/filtering.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.filtering import FilteringStore
+
+
+class SlavedFilteringStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedFilteringStore, self).__init__(db_conn, hs)
+
+ # Filters are immutable so this cache doesn't need to be expired
+ get_user_filter = FilteringStore.__dict__["get_user_filter"]
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
new file mode 100644
index 0000000000..dd2ae49e48
--- /dev/null
+++ b/synapse/replication/slave/storage/keys.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.keys import KeyStore
+
+
+class SlavedKeyStore(BaseSlavedStore):
+ _get_server_verify_key = KeyStore.__dict__[
+ "_get_server_verify_key"
+ ]
+
+ get_server_verify_keys = DataStore.get_server_verify_keys.__func__
+ store_server_verify_key = DataStore.store_server_verify_key.__func__
+
+ get_server_certificate = DataStore.get_server_certificate.__func__
+ store_server_certificate = DataStore.store_server_certificate.__func__
+
+ get_server_keys_json = DataStore.get_server_keys_json.__func__
+ store_server_keys_json = DataStore.store_server_keys_json.__func__
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
new file mode 100644
index 0000000000..703f4a49bf
--- /dev/null
+++ b/synapse/replication/slave/storage/presence.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage import DataStore
+
+
+class SlavedPresenceStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedPresenceStore, self).__init__(db_conn, hs)
+ self._presence_id_gen = SlavedIdTracker(
+ db_conn, "presence_stream", "stream_id",
+ )
+
+ self._presence_on_startup = self._get_active_presence(db_conn)
+
+ self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
+ "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
+ )
+
+ _get_active_presence = DataStore._get_active_presence.__func__
+ take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+ get_presence_for_users = DataStore.get_presence_for_users.__func__
+
+ def get_current_presence_token(self):
+ return self._presence_id_gen.get_current_token()
+
+ def stream_positions(self):
+ result = super(SlavedPresenceStore, self).stream_positions()
+ position = self._presence_id_gen.get_current_token()
+ result["presence"] = position
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("presence")
+ if stream:
+ self._presence_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ position, user_id = row[:2]
+ self.presence_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ return super(SlavedPresenceStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
new file mode 100644
index 0000000000..21ceb0213a
--- /dev/null
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .events import SlavedEventStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
+from synapse.storage.push_rule import PushRuleStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class SlavedPushRuleStore(SlavedEventStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedPushRuleStore, self).__init__(db_conn, hs)
+ self._push_rules_stream_id_gen = SlavedIdTracker(
+ db_conn, "push_rules_stream", "stream_id",
+ )
+ self.push_rules_stream_cache = StreamChangeCache(
+ "PushRulesStreamChangeCache",
+ self._push_rules_stream_id_gen.get_current_token(),
+ )
+
+ get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
+ get_push_rules_enabled_for_user = (
+ PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
+ )
+ have_push_rules_changed_for_user = (
+ DataStore.have_push_rules_changed_for_user.__func__
+ )
+
+ def get_push_rules_stream_token(self):
+ return (
+ self._push_rules_stream_id_gen.get_current_token(),
+ self._stream_id_gen.get_current_token(),
+ )
+
+ def stream_positions(self):
+ result = super(SlavedPushRuleStore, self).stream_positions()
+ result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("push_rules")
+ if stream:
+ for row in stream["rows"]:
+ position = row[0]
+ user_id = row[2]
+ self.get_push_rules_for_user.invalidate((user_id,))
+ self.get_push_rules_enabled_for_user.invalidate((user_id,))
+ self.push_rules_stream_cache.entity_has_changed(
+ user_id, position
+ )
+
+ self._push_rules_stream_id_gen.advance(int(stream["position"]))
+
+ return super(SlavedPushRuleStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index ec007516d0..ac9662d399 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.storage.receipts import ReceiptsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
# So, um, we want to borrow a load of functions intended for reading from
# a DataStore, but we don't want to take functions that either write to the
@@ -37,11 +38,28 @@ class SlavedReceiptsStore(BaseSlavedStore):
db_conn, "receipts_linearized", "stream_id"
)
+ self._receipts_stream_cache = StreamChangeCache(
+ "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
+ )
+
get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+ get_linearized_receipts_for_room = (
+ ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
+ )
+ _get_linearized_receipts_for_rooms = (
+ ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
+ )
+ get_last_receipt_event_id_for_user = (
+ ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
+ )
get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+ get_linearized_receipts_for_rooms = (
+ DataStore.get_linearized_receipts_for_rooms.__func__
+ )
+
def stream_positions(self):
result = super(SlavedReceiptsStore, self).stream_positions()
result["receipts"] = self._receipts_id_gen.get_current_token()
@@ -52,10 +70,15 @@ class SlavedReceiptsStore(BaseSlavedStore):
if stream:
self._receipts_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
- room_id, receipt_type, user_id = row[1:4]
+ position, room_id, receipt_type, user_id = row[:4]
self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
+ self._receipts_stream_cache.entity_has_changed(room_id, position)
return super(SlavedReceiptsStore, self).process_replication(result)
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
+ self.get_linearized_receipts_for_room.invalidate_many((room_id,))
+ self.get_last_receipt_event_id_for_user.invalidate(
+ (user_id, room_id, receipt_type)
+ )
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
new file mode 100644
index 0000000000..307833f9e1
--- /dev/null
+++ b/synapse/replication/slave/storage/registration.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.registration import RegistrationStore
+
+
+class SlavedRegistrationStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedRegistrationStore, self).__init__(db_conn, hs)
+
+ # TODO: use the cached version and invalidate deleted tokens
+ get_user_by_access_token = RegistrationStore.__dict__[
+ "get_user_by_access_token"
+ ].orig
+
+ _query_for_auth = DataStore._query_for_auth.__func__
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
new file mode 100644
index 0000000000..d5bb0f98ea
--- /dev/null
+++ b/synapse/replication/slave/storage/room.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+
+
+class RoomStore(BaseSlavedStore):
+ get_public_room_ids = DataStore.get_public_room_ids.__func__
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
new file mode 100644
index 0000000000..6f2ba98af5
--- /dev/null
+++ b/synapse/replication/slave/storage/transactions.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.transactions import TransactionStore
+
+
+class TransactionStore(BaseSlavedStore):
+ get_destination_retry_timings = TransactionStore.__dict__[
+ "get_destination_retry_timings"
+ ].orig
+ _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
+
+ # For now, don't record the destination rety timings
+ def set_destination_retry_timings(*args, **kwargs):
+ return defer.succeed(None)
|