summary refs log tree commit diff
diff options
context:
space:
mode:
authorH. Shay <hillerys@element.io>2022-10-06 11:28:56 -0700
committerH. Shay <hillerys@element.io>2022-10-06 11:28:56 -0700
commitdcc096b4353f7ff6ff89d3583a9122daeb725b7a (patch)
treed82b4982fcb6c74fda808970ed52f13d26713684
parentchange _get_power_levels_and_sender_level to check for events in batch (diff)
downloadsynapse-dcc096b4353f7ff6ff89d3583a9122daeb725b7a.tar.xz
change action for event by user to take a list of events, context and pass batch of events to _get_power_levels_and_sender_level
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py238
1 files changed, 127 insertions, 111 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py

index 4729b3b37d..43e0518db0 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -261,125 +261,141 @@ class BulkPushRuleEvaluator: @measure_func("action_for_event_by_user") async def action_for_event_by_user( - self, event: EventBase, context: EventContext + self, events_and_context: List[Tuple[EventBase, EventContext]] ) -> None: - """Given an event and context, evaluate the push rules, check if the message - should increment the unread count, and insert the results into the - event_push_actions_staging table. + """Given a list of events and their associated contexts, evaluate the push rules + for each event, check if the message should increment the unread count, and + insert the results into the event_push_actions_staging table. """ - if not event.internal_metadata.is_notifiable(): - # Push rules for events that aren't notifiable can't be processed by this - return - - # Disable counting as unread unless the experimental configuration is - # enabled, as it can cause additional (unwanted) rows to be added to the - # event_push_actions table. - count_as_unread = False - if self.hs.config.experimental.msc2654_enabled: - count_as_unread = _should_count_as_unread(event, context) - - rules_by_user = await self._get_rules_for_event(event) - actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} - - room_member_count = await self.store.get_number_joined_users_in_room( - event.room_id - ) + for event, context in events_and_context: + if not event.internal_metadata.is_notifiable(): + # Push rules for events that aren't notifiable can't be processed by this + return + # Skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). + if event.internal_metadata.is_historical(): + return + + # Disable counting as unread unless the experimental configuration is + # enabled, as it can cause additional (unwanted) rows to be added to the + # event_push_actions table. + count_as_unread = False + if self.hs.config.experimental.msc2654_enabled: + count_as_unread = _should_count_as_unread(event, context) + + rules_by_user = await self._get_rules_for_event(event) + actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} + + room_member_count = await self.store.get_number_joined_users_in_room( + event.room_id + ) - ( - power_levels, - sender_power_level, - ) = await self._get_power_levels_and_sender_level(event, context) - - relation = relation_from_event(event) - # If the event does not have a relation, then cannot have any mutual - # relations or thread ID. - relations = {} - thread_id = MAIN_TIMELINE - if relation: - relations = await self._get_mutual_relations( - relation.parent_id, - itertools.chain(*(r.rules() for r in rules_by_user.values())), + # For batched events the power level events may not have been persisted yet, + # so we pass in the batched events. Thus if the event cannot be found in the + # database we can check in the batch. + event_id_to_event = {e.event_id: e for e, _ in events_and_context} + ( + power_levels, + sender_power_level, + ) = await self._get_power_levels_and_sender_level( + event, context, event_id_to_event ) - # Recursively attempt to find the thread this event relates to. - if relation.rel_type == RelationTypes.THREAD: - thread_id = relation.parent_id - else: - # Since the event has not yet been persisted we check whether - # the parent is part of a thread. - thread_id = await self.store.get_thread_id(relation.parent_id) or "main" - - evaluator = PushRuleEvaluator( - _flatten_dict(event), - room_member_count, - sender_power_level, - power_levels.get("notifications", {}), - relations, - self._relations_match_enabled, - ) - users = rules_by_user.keys() - profiles = await self.store.get_subset_users_in_room_with_profiles( - event.room_id, users - ) + relation = relation_from_event(event) + # If the event does not have a relation, then cannot have any mutual + # relations or thread ID. + relations = {} + thread_id = MAIN_TIMELINE + if relation: + relations = await self._get_mutual_relations( + relation.parent_id, + itertools.chain(*(r.rules() for r in rules_by_user.values())), + ) + # Recursively attempt to find the thread this event relates to. + if relation.rel_type == RelationTypes.THREAD: + thread_id = relation.parent_id + else: + # Since the event has not yet been persisted we check whether + # the parent is part of a thread. + thread_id = ( + await self.store.get_thread_id(relation.parent_id) or "main" + ) + + evaluator = PushRuleEvaluator( + _flatten_dict(event), + room_member_count, + sender_power_level, + power_levels.get("notifications", {}), + relations, + self._relations_match_enabled, + ) - for uid, rules in rules_by_user.items(): - if event.sender == uid: - continue + users = rules_by_user.keys() + profiles = await self.store.get_subset_users_in_room_with_profiles( + event.room_id, users + ) - display_name = None - profile = profiles.get(uid) - if profile: - display_name = profile.display_name - - if not display_name: - # Handle the case where we are pushing a membership event to - # that user, as they might not be already joined. - if event.type == EventTypes.Member and event.state_key == uid: - display_name = event.content.get("displayname", None) - if not isinstance(display_name, str): - display_name = None - - if count_as_unread: - # Add an element for the current user if the event needs to be marked as - # unread, so that add_push_actions_to_staging iterates over it. - # If the event shouldn't be marked as unread but should notify the - # current user, it'll be added to the dict later. - actions_by_user[uid] = [] - - actions = evaluator.run(rules, uid, display_name) - if "notify" in actions: - # Push rules say we should notify the user of this event - actions_by_user[uid] = actions - - # If there aren't any actions then we can skip the rest of the - # processing. - if not actions_by_user: - return - - # This is a check for the case where user joins a room without being - # allowed to see history, and then the server receives a delayed event - # from before the user joined, which they should not be pushed for - # - # We do this *after* calculating the push actions as a) its unlikely - # that we'll filter anyone out and b) for large rooms its likely that - # most users will have push disabled and so the set of users to check is - # much smaller. - uids_with_visibility = await filter_event_for_clients_with_state( - self.store, actions_by_user.keys(), event, context - ) + for uid, rules in rules_by_user.items(): + if event.sender == uid: + continue - for user_id in set(actions_by_user).difference(uids_with_visibility): - actions_by_user.pop(user_id, None) - - # Mark in the DB staging area the push actions for users who should be - # notified for this event. (This will then get handled when we persist - # the event) - await self.store.add_push_actions_to_staging( - event.event_id, - actions_by_user, - count_as_unread, - thread_id, - ) + display_name = None + profile = profiles.get(uid) + if profile: + display_name = profile.display_name + + if not display_name: + # Handle the case where we are pushing a membership event to + # that user, as they might not be already joined. + if event.type == EventTypes.Member and event.state_key == uid: + display_name = event.content.get("displayname", None) + if not isinstance(display_name, str): + display_name = None + + if count_as_unread: + # Add an element for the current user if the event needs to be marked as + # unread, so that add_push_actions_to_staging iterates over it. + # If the event shouldn't be marked as unread but should notify the + # current user, it'll be added to the dict later. + actions_by_user[uid] = [] + + actions = evaluator.run(rules, uid, display_name) + if "notify" in actions: + # Push rules say we should notify the user of this event + actions_by_user[uid] = actions + + # If there aren't any actions then we can skip the rest of the + # processing. + if not actions_by_user: + return + + # This is a check for the case where user joins a room without being + # allowed to see history, and then the server receives a delayed event + # from before the user joined, which they should not be pushed for + # + # We do this *after* calculating the push actions as a) its unlikely + # that we'll filter anyone out and b) for large rooms its likely that + # most users will have push disabled and so the set of users to check is + # much smaller. + uids_with_visibility = await filter_event_for_clients_with_state( + self.store, actions_by_user.keys(), event, context + ) + + for user_id in set(actions_by_user).difference(uids_with_visibility): + actions_by_user.pop(user_id, None) + + # Mark in the DB staging area the push actions for users who should be + # notified for this event. (This will then get handled when we persist + # the event) + await self.store.add_push_actions_to_staging( + event.event_id, + actions_by_user, + count_as_unread, + thread_id, + ) MemberMap = Dict[str, Optional[EventIdMembership]]