summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2023-11-06 15:41:57 -0500
committerGitHub <noreply@github.com>2023-11-06 15:41:57 -0500
commit7e5d3b06fa8b6ce3676eb1178d7db0e252d48679 (patch)
tree868dd9c257a232886080e1a65a7588daa45c29aa /synapse/push
parentBump setuptools_rust to match pinned version. (#16605) (diff)
downloadsynapse-7e5d3b06fa8b6ce3676eb1178d7db0e252d48679.tar.xz
Collect information for PushRuleEvaluator in parallel. (#16590)
Fetch information needed for push rule evaluation in parallel.
Ideally this would use query pipelining, but this is not
available in psycopg2.

Due to the database thread pool this may result in little
to no parallelization.
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py56
1 files changed, 41 insertions, 15 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 14784312dc..5934b1ef34 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -25,10 +25,13 @@ from typing import (
     Sequence,
     Tuple,
     Union,
+    cast,
 )
 
 from prometheus_client import Counter
 
+from twisted.internet.defer import Deferred
+
 from synapse.api.constants import (
     MAIN_TIMELINE,
     EventContentFields,
@@ -40,11 +43,15 @@ from synapse.api.room_versions import PushRuleRoomFlag
 from synapse.event_auth import auth_types_for_event, get_user_power_level
 from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.state import POWER_KEY
 from synapse.storage.databases.main.roommember import EventIdMembership
+from synapse.storage.roommember import ProfileInfo
 from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator
 from synapse.types import JsonValue
 from synapse.types.state import StateFilter
+from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import gather_results
 from synapse.util.caches import register_cache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_event_for_clients_with_state
@@ -342,15 +349,41 @@ class BulkPushRuleEvaluator:
         rules_by_user = await self._get_rules_for_event(event)
         actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}
 
-        room_member_count = await self.store.get_number_joined_users_in_room(
-            event.room_id
-        )
-
+        # Gather a bunch of info in parallel.
+        #
+        # This has a lot of ignored types and casting due to the use of @cached
+        # decorated functions passed into run_in_background.
+        #
+        # See https://github.com/matrix-org/synapse/issues/16606
         (
-            power_levels,
-            sender_power_level,
-        ) = await self._get_power_levels_and_sender_level(
-            event, context, event_id_to_event
+            room_member_count,
+            (power_levels, sender_power_level),
+            related_events,
+            profiles,
+        ) = await make_deferred_yieldable(
+            cast(
+                "Deferred[Tuple[int, Tuple[dict, Optional[int]], Dict[str, Dict[str, JsonValue]], Mapping[str, ProfileInfo]]]",
+                gather_results(
+                    (
+                        run_in_background(  # type: ignore[call-arg]
+                            self.store.get_number_joined_users_in_room, event.room_id  # type: ignore[arg-type]
+                        ),
+                        run_in_background(
+                            self._get_power_levels_and_sender_level,
+                            event,
+                            context,
+                            event_id_to_event,
+                        ),
+                        run_in_background(self._related_events, event),
+                        run_in_background(  # type: ignore[call-arg]
+                            self.store.get_subset_users_in_room_with_profiles,
+                            event.room_id,  # type: ignore[arg-type]
+                            rules_by_user.keys(),  # type: ignore[arg-type]
+                        ),
+                    ),
+                    consumeErrors=True,
+                ).addErrback(unwrapFirstError),
+            )
         )
 
         # Find the event's thread ID.
@@ -366,8 +399,6 @@ class BulkPushRuleEvaluator:
                 # the parent is part of a thread.
                 thread_id = await self.store.get_thread_id(relation.parent_id)
 
-        related_events = await self._related_events(event)
-
         # It's possible that old room versions have non-integer power levels (floats or
         # strings; even the occasional `null`). For old rooms, we interpret these as if
         # they were integers. Do this here for the `@room` power level threshold.
@@ -400,11 +431,6 @@ class BulkPushRuleEvaluator:
             self.hs.config.experimental.msc1767_enabled,  # MSC3931 flag
         )
 
-        users = rules_by_user.keys()
-        profiles = await self.store.get_subset_users_in_room_with_profiles(
-            event.room_id, users
-        )
-
         for uid, rules in rules_by_user.items():
             if event.sender == uid:
                 continue