summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-06-01 11:08:45 +0100
committerErik Johnston <erik@matrix.org>2016-06-01 15:25:25 +0100
commitc8285564a3772db387fd5c94b1a82329dc320e36 (patch)
tree25785e1481ccd25d5931dea6a2655d65bee8cdb6 /synapse
parentAdd get_users_with_read_receipts_in_room cache (diff)
downloadsynapse-c8285564a3772db387fd5c94b1a82329dc320e36.tar.xz
Use state to calculate get_users_in_room
Diffstat (limited to 'synapse')
-rw-r--r--synapse/push/action_generator.py2
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py27
-rw-r--r--synapse/storage/events.py3
-rw-r--r--synapse/storage/pusher.py38
-rw-r--r--synapse/storage/roommember.py3
5 files changed, 45 insertions, 28 deletions
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 9b208668b6..46e768e35c 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,7 +40,7 @@ class ActionGenerator:
     def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "handle_push_actions_for_event"):
             bulk_evaluator = yield evaluator_for_event(
-                event, self.hs, self.store
+                event, self.hs, self.store, context.current_state
             )
 
             actions_by_user = yield bulk_evaluator.action_for_event_by_user(
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 1e5c4b073c..8c59e59e03 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -21,7 +21,7 @@ from twisted.internet import defer
 from .baserules import list_with_base_rules
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.visibility import filter_events_for_clients
 
 
@@ -72,20 +72,24 @@ def _get_rules(room_id, user_ids, store):
 
 
 @defer.inlineCallbacks
-def evaluator_for_event(event, hs, store):
+def evaluator_for_event(event, hs, store, current_state):
     room_id = event.room_id
-
-    # users in the room who have pushers need to get push rules run because
-    # that's how their pushers work
-    users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
-
     # We also will want to generate notifs for other people in the room so
     # their unread countss are correct in the event stream, but to avoid
     # generating them for bot / AS users etc, we only do so for people who've
     # sent a read receipt into the room.
 
-    all_in_room = yield store.get_users_in_room(room_id)
-    all_in_room = set(all_in_room)
+    all_in_room = set(
+        e.state_key for e in current_state.values()
+        if e.type == EventTypes.Member and e.membership == Membership.JOIN
+    )
+
+    # users in the room who have pushers need to get push rules run because
+    # that's how their pushers work
+    if_users_with_pushers = yield store.get_if_users_have_pushers(all_in_room)
+    users_with_pushers = set(
+        uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
+    )
 
     users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
 
@@ -143,7 +147,10 @@ class BulkPushRuleEvaluator:
             self.store, user_tuples, [event], {event.event_id: current_state}
         )
 
-        room_members = yield self.store.get_users_in_room(self.room_id)
+        room_members = set(
+            e.state_key for e in current_state.values()
+            if e.type == EventTypes.Member and e.membership == Membership.JOIN
+        )
 
         evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4655669ba0..2b3f79577b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -342,9 +342,6 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
-            txn.call_after(
-                self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
-            )
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
             txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 9e8e2e2964..39d5349eaa 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 
 from canonicaljson import encode_canonical_json
 
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 
 import logging
 import simplejson as json
@@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
-    @cachedInlineCallbacks(num_args=1)
-    def get_users_with_pushers_in_room(self, room_id):
-        users = yield self.get_users_in_room(room_id)
-
+    @cachedInlineCallbacks(lru=True, num_args=1)
+    def get_if_user_has_pusher(self, user_id):
         result = yield self._simple_select_many_batch(
             table='pushers',
+            keyvalues={
+                'user_name': 'user_id',
+            },
+            retcol='user_name',
+            desc='get_if_user_has_pusher',
+            allow_none=True,
+        )
+
+        defer.returnValue(bool(result))
+
+    @cachedList(cached_method_name="get_if_user_has_pusher",
+                list_name="user_ids", num_args=1, inlineCallbacks=True)
+    def get_if_users_have_pushers(self, user_ids):
+        rows = yield self._simple_select_many_batch(
+            table='pushers',
             column='user_name',
-            iterable=users,
+            iterable=user_ids,
             retcols=['user_name'],
-            desc='get_users_with_pushers_in_room'
+            desc='get_if_users_have_pushers'
         )
 
-        defer.returnValue([r['user_name'] for r in result])
+        result = {user_id: False for user_id in user_ids}
+        result.update({r['user_name']: True for r in rows})
+
+        defer.returnValue(result)
 
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
@@ -178,16 +194,16 @@ class PusherStore(SQLBaseStore):
                     },
                 )
                 if newly_inserted:
-                    # get_users_with_pushers_in_room only cares if the user has
+                    # get_if_user_has_pusher only cares if the user has
                     # at least *one* pusher.
-                    txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+                    txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
 
             yield self.runInteraction("add_pusher", f)
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
-            txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+            txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
 
             self._simple_delete_one_txn(
                 txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index face685ed2..41b395e07c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -59,9 +59,6 @@ class RoomMemberStore(SQLBaseStore):
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(
-                self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
-            )
-            txn.call_after(
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering
             )