diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 4729b3b37d..43e0518db0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -261,125 +261,141 @@ class BulkPushRuleEvaluator:
@measure_func("action_for_event_by_user")
async def action_for_event_by_user(
- self, event: EventBase, context: EventContext
+ self, events_and_context: List[Tuple[EventBase, EventContext]]
) -> None:
- """Given an event and context, evaluate the push rules, check if the message
- should increment the unread count, and insert the results into the
- event_push_actions_staging table.
+ """Given a list of events and their associated contexts, evaluate the push rules
+ for each event, check if the message should increment the unread count, and
+ insert the results into the event_push_actions_staging table.
"""
- if not event.internal_metadata.is_notifiable():
- # Push rules for events that aren't notifiable can't be processed by this
- return
-
- # Disable counting as unread unless the experimental configuration is
- # enabled, as it can cause additional (unwanted) rows to be added to the
- # event_push_actions table.
- count_as_unread = False
- if self.hs.config.experimental.msc2654_enabled:
- count_as_unread = _should_count_as_unread(event, context)
-
- rules_by_user = await self._get_rules_for_event(event)
- actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}
-
- room_member_count = await self.store.get_number_joined_users_in_room(
- event.room_id
- )
+ for event, context in events_and_context:
+ if not event.internal_metadata.is_notifiable():
+ # Push rules for events that aren't notifiable can't be processed by this
+ return
+ # Skip push notification actions for historical messages
+ # because we don't want to notify people about old history back in time.
+ # The historical messages also do not have the proper `context.current_state_ids`
+ # and `state_groups` because they have `prev_events` that aren't persisted yet
+ # (historical messages persisted in reverse-chronological order).
+ if event.internal_metadata.is_historical():
+ return
+
+ # Disable counting as unread unless the experimental configuration is
+ # enabled, as it can cause additional (unwanted) rows to be added to the
+ # event_push_actions table.
+ count_as_unread = False
+ if self.hs.config.experimental.msc2654_enabled:
+ count_as_unread = _should_count_as_unread(event, context)
+
+ rules_by_user = await self._get_rules_for_event(event)
+ actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}
+
+ room_member_count = await self.store.get_number_joined_users_in_room(
+ event.room_id
+ )
- (
- power_levels,
- sender_power_level,
- ) = await self._get_power_levels_and_sender_level(event, context)
-
- relation = relation_from_event(event)
- # If the event does not have a relation, then cannot have any mutual
- # relations or thread ID.
- relations = {}
- thread_id = MAIN_TIMELINE
- if relation:
- relations = await self._get_mutual_relations(
- relation.parent_id,
- itertools.chain(*(r.rules() for r in rules_by_user.values())),
+ # For batched events the power level events may not have been persisted yet,
+ # so we pass in the batched events. Thus if the event cannot be found in the
+ # database we can check in the batch.
+ event_id_to_event = {e.event_id: e for e, _ in events_and_context}
+ (
+ power_levels,
+ sender_power_level,
+ ) = await self._get_power_levels_and_sender_level(
+ event, context, event_id_to_event
)
- # Recursively attempt to find the thread this event relates to.
- if relation.rel_type == RelationTypes.THREAD:
- thread_id = relation.parent_id
- else:
- # Since the event has not yet been persisted we check whether
- # the parent is part of a thread.
- thread_id = await self.store.get_thread_id(relation.parent_id) or "main"
-
- evaluator = PushRuleEvaluator(
- _flatten_dict(event),
- room_member_count,
- sender_power_level,
- power_levels.get("notifications", {}),
- relations,
- self._relations_match_enabled,
- )
- users = rules_by_user.keys()
- profiles = await self.store.get_subset_users_in_room_with_profiles(
- event.room_id, users
- )
+ relation = relation_from_event(event)
+ # If the event does not have a relation, then cannot have any mutual
+ # relations or thread ID.
+ relations = {}
+ thread_id = MAIN_TIMELINE
+ if relation:
+ relations = await self._get_mutual_relations(
+ relation.parent_id,
+ itertools.chain(*(r.rules() for r in rules_by_user.values())),
+ )
+ # Recursively attempt to find the thread this event relates to.
+ if relation.rel_type == RelationTypes.THREAD:
+ thread_id = relation.parent_id
+ else:
+ # Since the event has not yet been persisted we check whether
+ # the parent is part of a thread.
+ thread_id = (
+ await self.store.get_thread_id(relation.parent_id) or "main"
+ )
+
+ evaluator = PushRuleEvaluator(
+ _flatten_dict(event),
+ room_member_count,
+ sender_power_level,
+ power_levels.get("notifications", {}),
+ relations,
+ self._relations_match_enabled,
+ )
- for uid, rules in rules_by_user.items():
- if event.sender == uid:
- continue
+ users = rules_by_user.keys()
+ profiles = await self.store.get_subset_users_in_room_with_profiles(
+ event.room_id, users
+ )
- display_name = None
- profile = profiles.get(uid)
- if profile:
- display_name = profile.display_name
-
- if not display_name:
- # Handle the case where we are pushing a membership event to
- # that user, as they might not be already joined.
- if event.type == EventTypes.Member and event.state_key == uid:
- display_name = event.content.get("displayname", None)
- if not isinstance(display_name, str):
- display_name = None
-
- if count_as_unread:
- # Add an element for the current user if the event needs to be marked as
- # unread, so that add_push_actions_to_staging iterates over it.
- # If the event shouldn't be marked as unread but should notify the
- # current user, it'll be added to the dict later.
- actions_by_user[uid] = []
-
- actions = evaluator.run(rules, uid, display_name)
- if "notify" in actions:
- # Push rules say we should notify the user of this event
- actions_by_user[uid] = actions
-
- # If there aren't any actions then we can skip the rest of the
- # processing.
- if not actions_by_user:
- return
-
- # This is a check for the case where user joins a room without being
- # allowed to see history, and then the server receives a delayed event
- # from before the user joined, which they should not be pushed for
- #
- # We do this *after* calculating the push actions as a) its unlikely
- # that we'll filter anyone out and b) for large rooms its likely that
- # most users will have push disabled and so the set of users to check is
- # much smaller.
- uids_with_visibility = await filter_event_for_clients_with_state(
- self.store, actions_by_user.keys(), event, context
- )
+ for uid, rules in rules_by_user.items():
+ if event.sender == uid:
+ continue
- for user_id in set(actions_by_user).difference(uids_with_visibility):
- actions_by_user.pop(user_id, None)
-
- # Mark in the DB staging area the push actions for users who should be
- # notified for this event. (This will then get handled when we persist
- # the event)
- await self.store.add_push_actions_to_staging(
- event.event_id,
- actions_by_user,
- count_as_unread,
- thread_id,
- )
+ display_name = None
+ profile = profiles.get(uid)
+ if profile:
+ display_name = profile.display_name
+
+ if not display_name:
+ # Handle the case where we are pushing a membership event to
+ # that user, as they might not be already joined.
+ if event.type == EventTypes.Member and event.state_key == uid:
+ display_name = event.content.get("displayname", None)
+ if not isinstance(display_name, str):
+ display_name = None
+
+ if count_as_unread:
+ # Add an element for the current user if the event needs to be marked as
+ # unread, so that add_push_actions_to_staging iterates over it.
+ # If the event shouldn't be marked as unread but should notify the
+ # current user, it'll be added to the dict later.
+ actions_by_user[uid] = []
+
+ actions = evaluator.run(rules, uid, display_name)
+ if "notify" in actions:
+ # Push rules say we should notify the user of this event
+ actions_by_user[uid] = actions
+
+ # If there aren't any actions then we can skip the rest of the
+ # processing.
+ if not actions_by_user:
+ return
+
+ # This is a check for the case where user joins a room without being
+ # allowed to see history, and then the server receives a delayed event
+ # from before the user joined, which they should not be pushed for
+ #
+ # We do this *after* calculating the push actions as a) its unlikely
+ # that we'll filter anyone out and b) for large rooms its likely that
+ # most users will have push disabled and so the set of users to check is
+ # much smaller.
+ uids_with_visibility = await filter_event_for_clients_with_state(
+ self.store, actions_by_user.keys(), event, context
+ )
+
+ for user_id in set(actions_by_user).difference(uids_with_visibility):
+ actions_by_user.pop(user_id, None)
+
+ # Mark in the DB staging area the push actions for users who should be
+ # notified for this event. (This will then get handled when we persist
+ # the event)
+ await self.store.add_push_actions_to_staging(
+ event.event_id,
+ actions_by_user,
+ count_as_unread,
+ thread_id,
+ )
MemberMap = Dict[str, Optional[EventIdMembership]]
|