diff options
-rw-r--r-- | synapse/handlers/sync.py | 5 | ||||
-rw-r--r-- | synapse/push/action_generator.py | 2 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 69 | ||||
-rw-r--r-- | synapse/push/clientformat.py | 30 | ||||
-rw-r--r-- | synapse/replication/slave/storage/appservice.py | 30 | ||||
-rw-r--r-- | synapse/replication/slave/storage/filtering.py | 25 | ||||
-rw-r--r-- | synapse/replication/slave/storage/presence.py | 59 | ||||
-rw-r--r-- | synapse/replication/slave/storage/push_rule.py | 67 | ||||
-rw-r--r-- | synapse/replication/slave/storage/registration.py | 30 | ||||
-rw-r--r-- | synapse/rest/client/v1/push_rule.py | 6 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 6 | ||||
-rw-r--r-- | synapse/storage/events.py | 3 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 38 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 38 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 28 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 3 |
16 files changed, 357 insertions, 82 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5307b62b85..be26a491ff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -198,9 +198,8 @@ class SyncHandler(object): @defer.inlineCallbacks def push_rules_for_user(self, user): user_id = user.to_string() - rawrules = yield self.store.get_push_rules_for_user(user_id) - enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) - rules = format_push_rules_for_user(user, rawrules, enabled_map) + rules = yield self.store.get_push_rules_for_user(user_id) + rules = format_push_rules_for_user(user, rules) defer.returnValue(rules) @defer.inlineCallbacks diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 9b208668b6..46e768e35c 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,7 +40,7 @@ class ActionGenerator: def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "handle_push_actions_for_event"): bulk_evaluator = yield evaluator_for_event( - event, self.hs, self.store + event, self.hs, self.store, context.current_state ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 25f2fb9da4..6e42121b1d 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -18,10 +18,9 @@ import ujson as json from twisted.internet import defer -from .baserules import list_with_base_rules from .push_rule_evaluator import PushRuleEvaluatorForEvent -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.visibility import filter_events_for_clients @@ -38,62 +37,41 @@ def decode_rule_json(rule): @defer.inlineCallbacks def _get_rules(room_id, user_ids, store): rules_by_user = yield store.bulk_get_push_rules(user_ids) - rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids) rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} - rules_by_user = { - uid: list_with_base_rules([ - decode_rule_json(rule_list) - for rule_list in rules_by_user.get(uid, []) - ]) - for uid in user_ids - } - - # We apply the rules-enabled map here: bulk_get_push_rules doesn't - # fetch disabled rules, but this won't account for any server default - # rules the user has disabled, so we need to do this too. - for uid in user_ids: - user_enabled_map = rules_enabled_by_user.get(uid) - if not user_enabled_map: - continue - - for i, rule in enumerate(rules_by_user[uid]): - rule_id = rule['rule_id'] - - if rule_id in user_enabled_map: - if rule.get('enabled', True) != bool(user_enabled_map[rule_id]): - # Rules are cached across users. - rule = dict(rule) - rule['enabled'] = bool(user_enabled_map[rule_id]) - rules_by_user[uid][i] = rule - defer.returnValue(rules_by_user) @defer.inlineCallbacks -def evaluator_for_event(event, hs, store): +def evaluator_for_event(event, hs, store, current_state): room_id = event.room_id - - # users in the room who have pushers need to get push rules run because - # that's how their pushers work - users_with_pushers = yield store.get_users_with_pushers_in_room(room_id) - # We also will want to generate notifs for other people in the room so # their unread countss are correct in the event stream, but to avoid # generating them for bot / AS users etc, we only do so for people who've # sent a read receipt into the room. - all_in_room = yield store.get_users_in_room(room_id) - all_in_room = set(all_in_room) + local_users_in_room = set( + e.state_key for e in current_state.values() + if e.type == EventTypes.Member and e.membership == Membership.JOIN + and hs.is_mine_id(e.state_key) + ) - receipts = yield store.get_receipts_for_room(room_id, "m.read") + # users in the room who have pushers need to get push rules run because + # that's how their pushers work + if_users_with_pushers = yield store.get_if_users_have_pushers( + local_users_in_room + ) + user_ids = set( + uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher + ) + + users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id) # any users with pushers must be ours: they have pushers - user_ids = set(users_with_pushers) - for r in receipts: - if hs.is_mine_id(r['user_id']) and r['user_id'] in all_in_room: - user_ids.add(r['user_id']) + for uid in users_with_receipts: + if uid in local_users_in_room: + user_ids.add(uid) # if this event is an invite event, we may need to run rules for the user # who's been invited, otherwise they won't get told they've been invited @@ -104,8 +82,6 @@ def evaluator_for_event(event, hs, store): if has_pusher: user_ids.add(invited_user) - user_ids = list(user_ids) - rules_by_user = yield _get_rules(room_id, user_ids, store) defer.returnValue(BulkPushRuleEvaluator( @@ -143,7 +119,10 @@ class BulkPushRuleEvaluator: self.store, user_tuples, [event], {event.event_id: current_state} ) - room_members = yield self.store.get_users_in_room(self.room_id) + room_members = set( + e.state_key for e in current_state.values() + if e.type == EventTypes.Member and e.membership == Membership.JOIN + ) evaluator = PushRuleEvaluatorForEvent(event, len(room_members)) diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py index ae9db9ec2f..b3983f7940 100644 --- a/synapse/push/clientformat.py +++ b/synapse/push/clientformat.py @@ -23,10 +23,7 @@ import copy import simplejson as json -def format_push_rules_for_user(user, rawrules, enabled_map): - """Converts a list of rawrules and a enabled map into nested dictionaries - to match the Matrix client-server format for push rules""" - +def load_rules_for_user(user, rawrules, enabled_map): ruleslist = [] for rawrule in rawrules: rule = dict(rawrule) @@ -35,7 +32,26 @@ def format_push_rules_for_user(user, rawrules, enabled_map): ruleslist.append(rule) # We're going to be mutating this a lot, so do a deep copy - ruleslist = copy.deepcopy(list_with_base_rules(ruleslist)) + rules = list(list_with_base_rules(ruleslist)) + + for i, rule in enumerate(rules): + rule_id = rule['rule_id'] + if rule_id in enabled_map: + if rule.get('enabled', True) != bool(enabled_map[rule_id]): + # Rules are cached across users. + rule = dict(rule) + rule['enabled'] = bool(enabled_map[rule_id]) + rules[i] = rule + + return rules + + +def format_push_rules_for_user(user, ruleslist): + """Converts a list of rawrules and a enabled map into nested dictionaries + to match the Matrix client-server format for push rules""" + + # We're going to be mutating this a lot, so do a deep copy + ruleslist = copy.deepcopy(ruleslist) rules = {'global': {}, 'device': {}} @@ -60,9 +76,7 @@ def format_push_rules_for_user(user, rawrules, enabled_map): template_rule = _rule_to_template(r) if template_rule: - if r['rule_id'] in enabled_map: - template_rule['enabled'] = enabled_map[r['rule_id']] - elif 'enabled' in r: + if 'enabled' in r: template_rule['enabled'] = r['enabled'] else: template_rule['enabled'] = True diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py new file mode 100644 index 0000000000..25792d9429 --- /dev/null +++ b/synapse/replication/slave/storage/appservice.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.config.appservice import load_appservices + + +class SlavedApplicationServiceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedApplicationServiceStore, self).__init__(db_conn, hs) + self.services_cache = load_appservices( + hs.config.server_name, + hs.config.app_service_config_files + ) + + get_app_service_by_token = DataStore.get_app_service_by_token.__func__ + get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py new file mode 100644 index 0000000000..819ed62881 --- /dev/null +++ b/synapse/replication/slave/storage/filtering.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.filtering import FilteringStore + + +class SlavedFilteringStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedFilteringStore, self).__init__(db_conn, hs) + + # Filters are immutable so this cache doesn't need to be expired + get_user_filter = FilteringStore.__dict__["get_user_filter"] diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py new file mode 100644 index 0000000000..703f4a49bf --- /dev/null +++ b/synapse/replication/slave/storage/presence.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.storage import DataStore + + +class SlavedPresenceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedPresenceStore, self).__init__(db_conn, hs) + self._presence_id_gen = SlavedIdTracker( + db_conn, "presence_stream", "stream_id", + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() + ) + + _get_active_presence = DataStore._get_active_presence.__func__ + take_presence_startup_info = DataStore.take_presence_startup_info.__func__ + get_presence_for_users = DataStore.get_presence_for_users.__func__ + + def get_current_presence_token(self): + return self._presence_id_gen.get_current_token() + + def stream_positions(self): + result = super(SlavedPresenceStore, self).stream_positions() + position = self._presence_id_gen.get_current_token() + result["presence"] = position + return result + + def process_replication(self, result): + stream = result.get("presence") + if stream: + self._presence_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.presence_stream_cache.entity_has_changed( + user_id, position + ) + + return super(SlavedPresenceStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py new file mode 100644 index 0000000000..21ceb0213a --- /dev/null +++ b/synapse/replication/slave/storage/push_rule.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .events import SlavedEventStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.storage.push_rule import PushRuleStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedPushRuleStore(SlavedEventStore): + def __init__(self, db_conn, hs): + super(SlavedPushRuleStore, self).__init__(db_conn, hs) + self._push_rules_stream_id_gen = SlavedIdTracker( + db_conn, "push_rules_stream", "stream_id", + ) + self.push_rules_stream_cache = StreamChangeCache( + "PushRulesStreamChangeCache", + self._push_rules_stream_id_gen.get_current_token(), + ) + + get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"] + get_push_rules_enabled_for_user = ( + PushRuleStore.__dict__["get_push_rules_enabled_for_user"] + ) + have_push_rules_changed_for_user = ( + DataStore.have_push_rules_changed_for_user.__func__ + ) + + def get_push_rules_stream_token(self): + return ( + self._push_rules_stream_id_gen.get_current_token(), + self._stream_id_gen.get_current_token(), + ) + + def stream_positions(self): + result = super(SlavedPushRuleStore, self).stream_positions() + result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("push_rules") + if stream: + for row in stream["rows"]: + position = row[0] + user_id = row[2] + self.get_push_rules_for_user.invalidate((user_id,)) + self.get_push_rules_enabled_for_user.invalidate((user_id,)) + self.push_rules_stream_cache.entity_has_changed( + user_id, position + ) + + self._push_rules_stream_id_gen.advance(int(stream["position"])) + + return super(SlavedPushRuleStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py new file mode 100644 index 0000000000..307833f9e1 --- /dev/null +++ b/synapse/replication/slave/storage/registration.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.registration import RegistrationStore + + +class SlavedRegistrationStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedRegistrationStore, self).__init__(db_conn, hs) + + # TODO: use the cached version and invalidate deleted tokens + get_user_by_access_token = RegistrationStore.__dict__[ + "get_user_by_access_token" + ].orig + + _query_for_auth = DataStore._query_for_auth.__func__ diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 02d837ee6a..6bb4821ec6 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -128,11 +128,9 @@ class PushRuleRestServlet(ClientV1RestServlet): # we build up the full structure and then decide which bits of it # to send which means doing unnecessary work sometimes but is # is probably not going to make a whole lot of difference - rawrules = yield self.store.get_push_rules_for_user(user_id) + rules = yield self.store.get_push_rules_for_user(user_id) - enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) - - rules = format_push_rules_for_user(requester.user, rawrules, enabled_map) + rules = format_push_rules_for_user(requester.user, rules) path = request.postpath[1:] diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8581796b7e..6928a213e8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -149,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore, "AccountDataAndTagsChangeCache", account_max, ) - self.__presence_on_startup = self._get_active_presence(db_conn) + self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( db_conn, "presence_stream", @@ -190,8 +190,8 @@ class DataStore(RoomMemberStore, RoomStore, super(DataStore, self).__init__(hs) def take_presence_startup_info(self): - active_on_startup = self.__presence_on_startup - self.__presence_on_startup = None + active_on_startup = self._presence_on_startup + self._presence_on_startup = None return active_on_startup def _get_active_presence(self, db_conn): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4655669ba0..2b3f79577b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -342,9 +342,6 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after( - self.get_users_with_pushers_in_room.invalidate, (event.room_id,) - ) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index ebb97c8474..786d6f6d67 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.push.baserules import list_with_base_rules from twisted.internet import defer import logging @@ -23,6 +24,29 @@ import simplejson as json logger = logging.getLogger(__name__) +def _load_rules(rawrules, enabled_map): + ruleslist = [] + for rawrule in rawrules: + rule = dict(rawrule) + rule["conditions"] = json.loads(rawrule["conditions"]) + rule["actions"] = json.loads(rawrule["actions"]) + ruleslist.append(rule) + + # We're going to be mutating this a lot, so do a deep copy + rules = list(list_with_base_rules(ruleslist)) + + for i, rule in enumerate(rules): + rule_id = rule['rule_id'] + if rule_id in enabled_map: + if rule.get('enabled', True) != bool(enabled_map[rule_id]): + # Rules are cached across users. + rule = dict(rule) + rule['enabled'] = bool(enabled_map[rule_id]) + rules[i] = rule + + return rules + + class PushRuleStore(SQLBaseStore): @cachedInlineCallbacks(lru=True) def get_push_rules_for_user(self, user_id): @@ -42,7 +66,11 @@ class PushRuleStore(SQLBaseStore): key=lambda row: (-int(row["priority_class"]), -int(row["priority"])) ) - defer.returnValue(rows) + enabled_map = yield self.get_push_rules_enabled_for_user(user_id) + + rules = _load_rules(rows, enabled_map) + + defer.returnValue(rules) @cachedInlineCallbacks(lru=True) def get_push_rules_enabled_for_user(self, user_id): @@ -85,6 +113,14 @@ class PushRuleStore(SQLBaseStore): for row in rows: results.setdefault(row['user_name'], []).append(row) + + enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids) + + for user_id, rules in results.items(): + results[user_id] = _load_rules( + rules, enabled_map_by_user.get(user_id, {}) + ) + defer.returnValue(results) @cachedList(cached_method_name="get_push_rules_enabled_for_user", diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 9e8e2e2964..a7d7c54d7e 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -18,7 +18,7 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList import logging import simplejson as json @@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) - @cachedInlineCallbacks(num_args=1) - def get_users_with_pushers_in_room(self, room_id): - users = yield self.get_users_in_room(room_id) - + @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000) + def get_if_user_has_pusher(self, user_id): result = yield self._simple_select_many_batch( table='pushers', + keyvalues={ + 'user_name': 'user_id', + }, + retcol='user_name', + desc='get_if_user_has_pusher', + allow_none=True, + ) + + defer.returnValue(bool(result)) + + @cachedList(cached_method_name="get_if_user_has_pusher", + list_name="user_ids", num_args=1, inlineCallbacks=True) + def get_if_users_have_pushers(self, user_ids): + rows = yield self._simple_select_many_batch( + table='pushers', column='user_name', - iterable=users, + iterable=user_ids, retcols=['user_name'], - desc='get_users_with_pushers_in_room' + desc='get_if_users_have_pushers' ) - defer.returnValue([r['user_name'] for r in result]) + result = {user_id: False for user_id in user_ids} + result.update({r['user_name']: True for r in rows}) + + defer.returnValue(result) @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, @@ -178,16 +194,16 @@ class PusherStore(SQLBaseStore): }, ) if newly_inserted: - # get_users_with_pushers_in_room only cares if the user has + # get_if_user_has_pusher only cares if the user has # at least *one* pusher. - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) yield self.runInteraction("add_pusher", f) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) self._simple_delete_one_txn( txn, diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index f1774f0e44..8c26f39fbb 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -34,6 +34,26 @@ class ReceiptsStore(SQLBaseStore): "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() ) + @cachedInlineCallbacks() + def get_users_with_read_receipts_in_room(self, room_id): + receipts = yield self.get_receipts_for_room(room_id, "m.read") + defer.returnValue(set(r['user_id'] for r in receipts)) + + def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type, + user_id): + if receipt_type != "m.read": + return + + # Returns an ObservableDeferred + res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None) + + if res and res.called and user_id in res.result: + # We'd only be adding to the set, so no point invalidating if the + # user is already there + return + + self.get_users_with_read_receipts_in_room.invalidate((room_id,)) + @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): return self._simple_select_list( @@ -229,6 +249,10 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) txn.call_after( + self._invalidate_get_users_with_receipts_in_room, + room_id, receipt_type, user_id, + ) + txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache @@ -374,6 +398,10 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) txn.call_after( + self._invalidate_get_users_with_receipts_in_room, + room_id, receipt_type, user_id, + ) + txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index face685ed2..41b395e07c 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -59,9 +59,6 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after( - self.get_users_with_pushers_in_room.invalidate, (event.room_id,) - ) - txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering ) |