diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 9b208668b6..46e768e35c 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,7 +40,7 @@ class ActionGenerator:
def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "handle_push_actions_for_event"):
bulk_evaluator = yield evaluator_for_event(
- event, self.hs, self.store
+ event, self.hs, self.store, context.current_state
)
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 1e5c4b073c..8c59e59e03 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -21,7 +21,7 @@ from twisted.internet import defer
from .baserules import list_with_base_rules
from .push_rule_evaluator import PushRuleEvaluatorForEvent
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.visibility import filter_events_for_clients
@@ -72,20 +72,24 @@ def _get_rules(room_id, user_ids, store):
@defer.inlineCallbacks
-def evaluator_for_event(event, hs, store):
+def evaluator_for_event(event, hs, store, current_state):
room_id = event.room_id
-
- # users in the room who have pushers need to get push rules run because
- # that's how their pushers work
- users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
-
# We also will want to generate notifs for other people in the room so
# their unread countss are correct in the event stream, but to avoid
# generating them for bot / AS users etc, we only do so for people who've
# sent a read receipt into the room.
- all_in_room = yield store.get_users_in_room(room_id)
- all_in_room = set(all_in_room)
+ all_in_room = set(
+ e.state_key for e in current_state.values()
+ if e.type == EventTypes.Member and e.membership == Membership.JOIN
+ )
+
+ # users in the room who have pushers need to get push rules run because
+ # that's how their pushers work
+ if_users_with_pushers = yield store.get_if_users_have_pushers(all_in_room)
+ users_with_pushers = set(
+ uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
+ )
users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
@@ -143,7 +147,10 @@ class BulkPushRuleEvaluator:
self.store, user_tuples, [event], {event.event_id: current_state}
)
- room_members = yield self.store.get_users_in_room(self.room_id)
+ room_members = set(
+ e.state_key for e in current_state.values()
+ if e.type == EventTypes.Member and e.membership == Membership.JOIN
+ )
evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4655669ba0..2b3f79577b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -342,9 +342,6 @@ class EventsStore(SQLBaseStore):
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
- txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 9e8e2e2964..39d5349eaa 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
import logging
import simplejson as json
@@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
- @cachedInlineCallbacks(num_args=1)
- def get_users_with_pushers_in_room(self, room_id):
- users = yield self.get_users_in_room(room_id)
-
+ @cachedInlineCallbacks(lru=True, num_args=1)
+ def get_if_user_has_pusher(self, user_id):
result = yield self._simple_select_many_batch(
table='pushers',
+ keyvalues={
+ 'user_name': 'user_id',
+ },
+ retcol='user_name',
+ desc='get_if_user_has_pusher',
+ allow_none=True,
+ )
+
+ defer.returnValue(bool(result))
+
+ @cachedList(cached_method_name="get_if_user_has_pusher",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
+ def get_if_users_have_pushers(self, user_ids):
+ rows = yield self._simple_select_many_batch(
+ table='pushers',
column='user_name',
- iterable=users,
+ iterable=user_ids,
retcols=['user_name'],
- desc='get_users_with_pushers_in_room'
+ desc='get_if_users_have_pushers'
)
- defer.returnValue([r['user_name'] for r in result])
+ result = {user_id: False for user_id in user_ids}
+ result.update({r['user_name']: True for r in rows})
+
+ defer.returnValue(result)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@@ -178,16 +194,16 @@ class PusherStore(SQLBaseStore):
},
)
if newly_inserted:
- # get_users_with_pushers_in_room only cares if the user has
+ # get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
- txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
yield self.runInteraction("add_pusher", f)
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
- txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
self._simple_delete_one_txn(
txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index face685ed2..41b395e07c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -59,9 +59,6 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
- txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
)
|