diff options
-rw-r--r-- | synapse/replication/slave/storage/appservice.py | 30 | ||||
-rw-r--r-- | synapse/replication/slave/storage/filtering.py | 25 | ||||
-rw-r--r-- | synapse/replication/slave/storage/presence.py | 59 | ||||
-rw-r--r-- | synapse/replication/slave/storage/push_rule.py | 67 | ||||
-rw-r--r-- | synapse/replication/slave/storage/registration.py | 30 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 6 |
6 files changed, 214 insertions, 3 deletions
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py new file mode 100644 index 0000000000..25792d9429 --- /dev/null +++ b/synapse/replication/slave/storage/appservice.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.config.appservice import load_appservices + + +class SlavedApplicationServiceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedApplicationServiceStore, self).__init__(db_conn, hs) + self.services_cache = load_appservices( + hs.config.server_name, + hs.config.app_service_config_files + ) + + get_app_service_by_token = DataStore.get_app_service_by_token.__func__ + get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py new file mode 100644 index 0000000000..819ed62881 --- /dev/null +++ b/synapse/replication/slave/storage/filtering.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.filtering import FilteringStore + + +class SlavedFilteringStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedFilteringStore, self).__init__(db_conn, hs) + + # Filters are immutable so this cache doesn't need to be expired + get_user_filter = FilteringStore.__dict__["get_user_filter"] diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py new file mode 100644 index 0000000000..703f4a49bf --- /dev/null +++ b/synapse/replication/slave/storage/presence.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.storage import DataStore + + +class SlavedPresenceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedPresenceStore, self).__init__(db_conn, hs) + self._presence_id_gen = SlavedIdTracker( + db_conn, "presence_stream", "stream_id", + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() + ) + + _get_active_presence = DataStore._get_active_presence.__func__ + take_presence_startup_info = DataStore.take_presence_startup_info.__func__ + get_presence_for_users = DataStore.get_presence_for_users.__func__ + + def get_current_presence_token(self): + return self._presence_id_gen.get_current_token() + + def stream_positions(self): + result = super(SlavedPresenceStore, self).stream_positions() + position = self._presence_id_gen.get_current_token() + result["presence"] = position + return result + + def process_replication(self, result): + stream = result.get("presence") + if stream: + self._presence_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.presence_stream_cache.entity_has_changed( + user_id, position + ) + + return super(SlavedPresenceStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py new file mode 100644 index 0000000000..21ceb0213a --- /dev/null +++ b/synapse/replication/slave/storage/push_rule.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .events import SlavedEventStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.storage.push_rule import PushRuleStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedPushRuleStore(SlavedEventStore): + def __init__(self, db_conn, hs): + super(SlavedPushRuleStore, self).__init__(db_conn, hs) + self._push_rules_stream_id_gen = SlavedIdTracker( + db_conn, "push_rules_stream", "stream_id", + ) + self.push_rules_stream_cache = StreamChangeCache( + "PushRulesStreamChangeCache", + self._push_rules_stream_id_gen.get_current_token(), + ) + + get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"] + get_push_rules_enabled_for_user = ( + PushRuleStore.__dict__["get_push_rules_enabled_for_user"] + ) + have_push_rules_changed_for_user = ( + DataStore.have_push_rules_changed_for_user.__func__ + ) + + def get_push_rules_stream_token(self): + return ( + self._push_rules_stream_id_gen.get_current_token(), + self._stream_id_gen.get_current_token(), + ) + + def stream_positions(self): + result = super(SlavedPushRuleStore, self).stream_positions() + result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("push_rules") + if stream: + for row in stream["rows"]: + position = row[0] + user_id = row[2] + self.get_push_rules_for_user.invalidate((user_id,)) + self.get_push_rules_enabled_for_user.invalidate((user_id,)) + self.push_rules_stream_cache.entity_has_changed( + user_id, position + ) + + self._push_rules_stream_id_gen.advance(int(stream["position"])) + + return super(SlavedPushRuleStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py new file mode 100644 index 0000000000..307833f9e1 --- /dev/null +++ b/synapse/replication/slave/storage/registration.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.registration import RegistrationStore + + +class SlavedRegistrationStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedRegistrationStore, self).__init__(db_conn, hs) + + # TODO: use the cached version and invalidate deleted tokens + get_user_by_access_token = RegistrationStore.__dict__[ + "get_user_by_access_token" + ].orig + + _query_for_auth = DataStore._query_for_auth.__func__ diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8581796b7e..6928a213e8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -149,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore, "AccountDataAndTagsChangeCache", account_max, ) - self.__presence_on_startup = self._get_active_presence(db_conn) + self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( db_conn, "presence_stream", @@ -190,8 +190,8 @@ class DataStore(RoomMemberStore, RoomStore, super(DataStore, self).__init__(hs) def take_presence_startup_info(self): - active_on_startup = self.__presence_on_startup - self.__presence_on_startup = None + active_on_startup = self._presence_on_startup + self._presence_on_startup = None return active_on_startup def _get_active_presence(self, db_conn): |