summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sync.py5
-rw-r--r--synapse/push/action_generator.py2
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py69
-rw-r--r--synapse/push/clientformat.py30
-rw-r--r--synapse/replication/slave/storage/appservice.py30
-rw-r--r--synapse/replication/slave/storage/filtering.py25
-rw-r--r--synapse/replication/slave/storage/presence.py59
-rw-r--r--synapse/replication/slave/storage/push_rule.py67
-rw-r--r--synapse/replication/slave/storage/registration.py30
-rw-r--r--synapse/rest/client/v1/push_rule.py6
-rw-r--r--synapse/storage/__init__.py6
-rw-r--r--synapse/storage/events.py3
-rw-r--r--synapse/storage/push_rule.py38
-rw-r--r--synapse/storage/pusher.py38
-rw-r--r--synapse/storage/receipts.py28
-rw-r--r--synapse/storage/roommember.py3
16 files changed, 357 insertions, 82 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5307b62b85..be26a491ff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -198,9 +198,8 @@ class SyncHandler(object):
     @defer.inlineCallbacks
     def push_rules_for_user(self, user):
         user_id = user.to_string()
-        rawrules = yield self.store.get_push_rules_for_user(user_id)
-        enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
-        rules = format_push_rules_for_user(user, rawrules, enabled_map)
+        rules = yield self.store.get_push_rules_for_user(user_id)
+        rules = format_push_rules_for_user(user, rules)
         defer.returnValue(rules)
 
     @defer.inlineCallbacks
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 9b208668b6..46e768e35c 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,7 +40,7 @@ class ActionGenerator:
     def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "handle_push_actions_for_event"):
             bulk_evaluator = yield evaluator_for_event(
-                event, self.hs, self.store
+                event, self.hs, self.store, context.current_state
             )
 
             actions_by_user = yield bulk_evaluator.action_for_event_by_user(
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 25f2fb9da4..6e42121b1d 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -18,10 +18,9 @@ import ujson as json
 
 from twisted.internet import defer
 
-from .baserules import list_with_base_rules
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.visibility import filter_events_for_clients
 
 
@@ -38,62 +37,41 @@ def decode_rule_json(rule):
 @defer.inlineCallbacks
 def _get_rules(room_id, user_ids, store):
     rules_by_user = yield store.bulk_get_push_rules(user_ids)
-    rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
 
     rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
 
-    rules_by_user = {
-        uid: list_with_base_rules([
-            decode_rule_json(rule_list)
-            for rule_list in rules_by_user.get(uid, [])
-        ])
-        for uid in user_ids
-    }
-
-    # We apply the rules-enabled map here: bulk_get_push_rules doesn't
-    # fetch disabled rules, but this won't account for any server default
-    # rules the user has disabled, so we need to do this too.
-    for uid in user_ids:
-        user_enabled_map = rules_enabled_by_user.get(uid)
-        if not user_enabled_map:
-            continue
-
-        for i, rule in enumerate(rules_by_user[uid]):
-            rule_id = rule['rule_id']
-
-            if rule_id in user_enabled_map:
-                if rule.get('enabled', True) != bool(user_enabled_map[rule_id]):
-                    # Rules are cached across users.
-                    rule = dict(rule)
-                    rule['enabled'] = bool(user_enabled_map[rule_id])
-                    rules_by_user[uid][i] = rule
-
     defer.returnValue(rules_by_user)
 
 
 @defer.inlineCallbacks
-def evaluator_for_event(event, hs, store):
+def evaluator_for_event(event, hs, store, current_state):
     room_id = event.room_id
-
-    # users in the room who have pushers need to get push rules run because
-    # that's how their pushers work
-    users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
-
     # We also will want to generate notifs for other people in the room so
     # their unread countss are correct in the event stream, but to avoid
     # generating them for bot / AS users etc, we only do so for people who've
     # sent a read receipt into the room.
 
-    all_in_room = yield store.get_users_in_room(room_id)
-    all_in_room = set(all_in_room)
+    local_users_in_room = set(
+        e.state_key for e in current_state.values()
+        if e.type == EventTypes.Member and e.membership == Membership.JOIN
+        and hs.is_mine_id(e.state_key)
+    )
 
-    receipts = yield store.get_receipts_for_room(room_id, "m.read")
+    # users in the room who have pushers need to get push rules run because
+    # that's how their pushers work
+    if_users_with_pushers = yield store.get_if_users_have_pushers(
+        local_users_in_room
+    )
+    user_ids = set(
+        uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
+    )
+
+    users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
 
     # any users with pushers must be ours: they have pushers
-    user_ids = set(users_with_pushers)
-    for r in receipts:
-        if hs.is_mine_id(r['user_id']) and r['user_id'] in all_in_room:
-            user_ids.add(r['user_id'])
+    for uid in users_with_receipts:
+        if uid in local_users_in_room:
+            user_ids.add(uid)
 
     # if this event is an invite event, we may need to run rules for the user
     # who's been invited, otherwise they won't get told they've been invited
@@ -104,8 +82,6 @@ def evaluator_for_event(event, hs, store):
             if has_pusher:
                 user_ids.add(invited_user)
 
-    user_ids = list(user_ids)
-
     rules_by_user = yield _get_rules(room_id, user_ids, store)
 
     defer.returnValue(BulkPushRuleEvaluator(
@@ -143,7 +119,10 @@ class BulkPushRuleEvaluator:
             self.store, user_tuples, [event], {event.event_id: current_state}
         )
 
-        room_members = yield self.store.get_users_in_room(self.room_id)
+        room_members = set(
+            e.state_key for e in current_state.values()
+            if e.type == EventTypes.Member and e.membership == Membership.JOIN
+        )
 
         evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
 
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index ae9db9ec2f..b3983f7940 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -23,10 +23,7 @@ import copy
 import simplejson as json
 
 
-def format_push_rules_for_user(user, rawrules, enabled_map):
-    """Converts a list of rawrules and a enabled map into nested dictionaries
-    to match the Matrix client-server format for push rules"""
-
+def load_rules_for_user(user, rawrules, enabled_map):
     ruleslist = []
     for rawrule in rawrules:
         rule = dict(rawrule)
@@ -35,7 +32,26 @@ def format_push_rules_for_user(user, rawrules, enabled_map):
         ruleslist.append(rule)
 
     # We're going to be mutating this a lot, so do a deep copy
-    ruleslist = copy.deepcopy(list_with_base_rules(ruleslist))
+    rules = list(list_with_base_rules(ruleslist))
+
+    for i, rule in enumerate(rules):
+        rule_id = rule['rule_id']
+        if rule_id in enabled_map:
+            if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+                # Rules are cached across users.
+                rule = dict(rule)
+                rule['enabled'] = bool(enabled_map[rule_id])
+                rules[i] = rule
+
+    return rules
+
+
+def format_push_rules_for_user(user, ruleslist):
+    """Converts a list of rawrules and a enabled map into nested dictionaries
+    to match the Matrix client-server format for push rules"""
+
+    # We're going to be mutating this a lot, so do a deep copy
+    ruleslist = copy.deepcopy(ruleslist)
 
     rules = {'global': {}, 'device': {}}
 
@@ -60,9 +76,7 @@ def format_push_rules_for_user(user, rawrules, enabled_map):
 
         template_rule = _rule_to_template(r)
         if template_rule:
-            if r['rule_id'] in enabled_map:
-                template_rule['enabled'] = enabled_map[r['rule_id']]
-            elif 'enabled' in r:
+            if 'enabled' in r:
                 template_rule['enabled'] = r['enabled']
             else:
                 template_rule['enabled'] = True
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
new file mode 100644
index 0000000000..25792d9429
--- /dev/null
+++ b/synapse/replication/slave/storage/appservice.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.config.appservice import load_appservices
+
+
+class SlavedApplicationServiceStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
+        self.services_cache = load_appservices(
+            hs.config.server_name,
+            hs.config.app_service_config_files
+        )
+
+    get_app_service_by_token = DataStore.get_app_service_by_token.__func__
+    get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
new file mode 100644
index 0000000000..819ed62881
--- /dev/null
+++ b/synapse/replication/slave/storage/filtering.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.filtering import FilteringStore
+
+
+class SlavedFilteringStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedFilteringStore, self).__init__(db_conn, hs)
+
+    # Filters are immutable so this cache doesn't need to be expired
+    get_user_filter = FilteringStore.__dict__["get_user_filter"]
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
new file mode 100644
index 0000000000..703f4a49bf
--- /dev/null
+++ b/synapse/replication/slave/storage/presence.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage import DataStore
+
+
+class SlavedPresenceStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedPresenceStore, self).__init__(db_conn, hs)
+        self._presence_id_gen = SlavedIdTracker(
+            db_conn, "presence_stream", "stream_id",
+        )
+
+        self._presence_on_startup = self._get_active_presence(db_conn)
+
+        self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
+            "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
+        )
+
+    _get_active_presence = DataStore._get_active_presence.__func__
+    take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+    get_presence_for_users = DataStore.get_presence_for_users.__func__
+
+    def get_current_presence_token(self):
+        return self._presence_id_gen.get_current_token()
+
+    def stream_positions(self):
+        result = super(SlavedPresenceStore, self).stream_positions()
+        position = self._presence_id_gen.get_current_token()
+        result["presence"] = position
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("presence")
+        if stream:
+            self._presence_id_gen.advance(int(stream["position"]))
+            for row in stream["rows"]:
+                position, user_id = row[:2]
+                self.presence_stream_cache.entity_has_changed(
+                    user_id, position
+                )
+
+        return super(SlavedPresenceStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
new file mode 100644
index 0000000000..21ceb0213a
--- /dev/null
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .events import SlavedEventStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
+from synapse.storage.push_rule import PushRuleStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class SlavedPushRuleStore(SlavedEventStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
+        self._push_rules_stream_id_gen = SlavedIdTracker(
+            db_conn, "push_rules_stream", "stream_id",
+        )
+        self.push_rules_stream_cache = StreamChangeCache(
+            "PushRulesStreamChangeCache",
+            self._push_rules_stream_id_gen.get_current_token(),
+        )
+
+    get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
+    get_push_rules_enabled_for_user = (
+        PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
+    )
+    have_push_rules_changed_for_user = (
+        DataStore.have_push_rules_changed_for_user.__func__
+    )
+
+    def get_push_rules_stream_token(self):
+        return (
+            self._push_rules_stream_id_gen.get_current_token(),
+            self._stream_id_gen.get_current_token(),
+        )
+
+    def stream_positions(self):
+        result = super(SlavedPushRuleStore, self).stream_positions()
+        result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("push_rules")
+        if stream:
+            for row in stream["rows"]:
+                position = row[0]
+                user_id = row[2]
+                self.get_push_rules_for_user.invalidate((user_id,))
+                self.get_push_rules_enabled_for_user.invalidate((user_id,))
+                self.push_rules_stream_cache.entity_has_changed(
+                    user_id, position
+                )
+
+            self._push_rules_stream_id_gen.advance(int(stream["position"]))
+
+        return super(SlavedPushRuleStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
new file mode 100644
index 0000000000..307833f9e1
--- /dev/null
+++ b/synapse/replication/slave/storage/registration.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.registration import RegistrationStore
+
+
+class SlavedRegistrationStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedRegistrationStore, self).__init__(db_conn, hs)
+
+    # TODO: use the cached version and invalidate deleted tokens
+    get_user_by_access_token = RegistrationStore.__dict__[
+        "get_user_by_access_token"
+    ].orig
+
+    _query_for_auth = DataStore._query_for_auth.__func__
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index 02d837ee6a..6bb4821ec6 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -128,11 +128,9 @@ class PushRuleRestServlet(ClientV1RestServlet):
         # we build up the full structure and then decide which bits of it
         # to send which means doing unnecessary work sometimes but is
         # is probably not going to make a whole lot of difference
-        rawrules = yield self.store.get_push_rules_for_user(user_id)
+        rules = yield self.store.get_push_rules_for_user(user_id)
 
-        enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
-
-        rules = format_push_rules_for_user(requester.user, rawrules, enabled_map)
+        rules = format_push_rules_for_user(requester.user, rules)
 
         path = request.postpath[1:]
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 8581796b7e..6928a213e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -149,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore,
             "AccountDataAndTagsChangeCache", account_max,
         )
 
-        self.__presence_on_startup = self._get_active_presence(db_conn)
+        self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self._get_cache_dict(
             db_conn, "presence_stream",
@@ -190,8 +190,8 @@ class DataStore(RoomMemberStore, RoomStore,
         super(DataStore, self).__init__(hs)
 
     def take_presence_startup_info(self):
-        active_on_startup = self.__presence_on_startup
-        self.__presence_on_startup = None
+        active_on_startup = self._presence_on_startup
+        self._presence_on_startup = None
         return active_on_startup
 
     def _get_active_presence(self, db_conn):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4655669ba0..2b3f79577b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -342,9 +342,6 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
-            txn.call_after(
-                self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
-            )
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
             txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index ebb97c8474..786d6f6d67 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -15,6 +15,7 @@
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
+from synapse.push.baserules import list_with_base_rules
 from twisted.internet import defer
 
 import logging
@@ -23,6 +24,29 @@ import simplejson as json
 logger = logging.getLogger(__name__)
 
 
+def _load_rules(rawrules, enabled_map):
+    ruleslist = []
+    for rawrule in rawrules:
+        rule = dict(rawrule)
+        rule["conditions"] = json.loads(rawrule["conditions"])
+        rule["actions"] = json.loads(rawrule["actions"])
+        ruleslist.append(rule)
+
+    # We're going to be mutating this a lot, so do a deep copy
+    rules = list(list_with_base_rules(ruleslist))
+
+    for i, rule in enumerate(rules):
+        rule_id = rule['rule_id']
+        if rule_id in enabled_map:
+            if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+                # Rules are cached across users.
+                rule = dict(rule)
+                rule['enabled'] = bool(enabled_map[rule_id])
+                rules[i] = rule
+
+    return rules
+
+
 class PushRuleStore(SQLBaseStore):
     @cachedInlineCallbacks(lru=True)
     def get_push_rules_for_user(self, user_id):
@@ -42,7 +66,11 @@ class PushRuleStore(SQLBaseStore):
             key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
         )
 
-        defer.returnValue(rows)
+        enabled_map = yield self.get_push_rules_enabled_for_user(user_id)
+
+        rules = _load_rules(rows, enabled_map)
+
+        defer.returnValue(rules)
 
     @cachedInlineCallbacks(lru=True)
     def get_push_rules_enabled_for_user(self, user_id):
@@ -85,6 +113,14 @@ class PushRuleStore(SQLBaseStore):
 
         for row in rows:
             results.setdefault(row['user_name'], []).append(row)
+
+        enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
+
+        for user_id, rules in results.items():
+            results[user_id] = _load_rules(
+                rules, enabled_map_by_user.get(user_id, {})
+            )
+
         defer.returnValue(results)
 
     @cachedList(cached_method_name="get_push_rules_enabled_for_user",
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 9e8e2e2964..a7d7c54d7e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 
 from canonicaljson import encode_canonical_json
 
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 
 import logging
 import simplejson as json
@@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
-    @cachedInlineCallbacks(num_args=1)
-    def get_users_with_pushers_in_room(self, room_id):
-        users = yield self.get_users_in_room(room_id)
-
+    @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
+    def get_if_user_has_pusher(self, user_id):
         result = yield self._simple_select_many_batch(
             table='pushers',
+            keyvalues={
+                'user_name': 'user_id',
+            },
+            retcol='user_name',
+            desc='get_if_user_has_pusher',
+            allow_none=True,
+        )
+
+        defer.returnValue(bool(result))
+
+    @cachedList(cached_method_name="get_if_user_has_pusher",
+                list_name="user_ids", num_args=1, inlineCallbacks=True)
+    def get_if_users_have_pushers(self, user_ids):
+        rows = yield self._simple_select_many_batch(
+            table='pushers',
             column='user_name',
-            iterable=users,
+            iterable=user_ids,
             retcols=['user_name'],
-            desc='get_users_with_pushers_in_room'
+            desc='get_if_users_have_pushers'
         )
 
-        defer.returnValue([r['user_name'] for r in result])
+        result = {user_id: False for user_id in user_ids}
+        result.update({r['user_name']: True for r in rows})
+
+        defer.returnValue(result)
 
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
@@ -178,16 +194,16 @@ class PusherStore(SQLBaseStore):
                     },
                 )
                 if newly_inserted:
-                    # get_users_with_pushers_in_room only cares if the user has
+                    # get_if_user_has_pusher only cares if the user has
                     # at least *one* pusher.
-                    txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+                    txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
 
             yield self.runInteraction("add_pusher", f)
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
-            txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+            txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
 
             self._simple_delete_one_txn(
                 txn,
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index f1774f0e44..8c26f39fbb 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -34,6 +34,26 @@ class ReceiptsStore(SQLBaseStore):
             "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
         )
 
+    @cachedInlineCallbacks()
+    def get_users_with_read_receipts_in_room(self, room_id):
+        receipts = yield self.get_receipts_for_room(room_id, "m.read")
+        defer.returnValue(set(r['user_id'] for r in receipts))
+
+    def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type,
+                                                    user_id):
+        if receipt_type != "m.read":
+            return
+
+        # Returns an ObservableDeferred
+        res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
+
+        if res and res.called and user_id in res.result:
+            # We'd only be adding to the set, so no point invalidating if the
+            # user is already there
+            return
+
+        self.get_users_with_read_receipts_in_room.invalidate((room_id,))
+
     @cached(num_args=2)
     def get_receipts_for_room(self, room_id, receipt_type):
         return self._simple_select_list(
@@ -229,6 +249,10 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
         txn.call_after(
+            self._invalidate_get_users_with_receipts_in_room,
+            room_id, receipt_type, user_id,
+        )
+        txn.call_after(
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
@@ -374,6 +398,10 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
         txn.call_after(
+            self._invalidate_get_users_with_receipts_in_room,
+            room_id, receipt_type, user_id,
+        )
+        txn.call_after(
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index face685ed2..41b395e07c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -59,9 +59,6 @@ class RoomMemberStore(SQLBaseStore):
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(
-                self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
-            )
-            txn.call_after(
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering
             )