summary refs log tree commit diff
path: root/synapse/handlers/relations.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2022-11-22 07:26:11 -0500
committerGitHub <noreply@github.com>2022-11-22 07:26:11 -0500
commit1799a54a545618782840a60950ef4b64da9ee24d (patch)
tree2a05d26deba1b743f51021d1e6da2107fa7f8560 /synapse/handlers/relations.py
parentMerge branch 'master' into develop (diff)
downloadsynapse-1799a54a545618782840a60950ef4b64da9ee24d.tar.xz
Batch fetch bundled annotations (#14491)
Avoid an n+1 query problem and fetch the bundled aggregations for
m.annotation relations in a single query instead of a query per event.

This applies similar logic for as was previously done for edits in
8b309adb436c162510ed1402f33b8741d71fc058 (#11660) and threads
in b65acead428653b988351ae8d7b22127a22039cd (#11752).
Diffstat (limited to 'synapse/handlers/relations.py')
-rw-r--r--synapse/handlers/relations.py197
1 files changed, 113 insertions, 84 deletions
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 8e71dda970..ca94239f61 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -13,7 +13,16 @@
 # limitations under the License.
 import enum
 import logging
-from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    FrozenSet,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+)
 
 import attr
 
@@ -259,48 +268,64 @@ class RelationsHandler:
                     e.msg,
                 )
 
-    async def get_annotations_for_event(
-        self,
-        event_id: str,
-        room_id: str,
-        limit: int = 5,
-        ignored_users: FrozenSet[str] = frozenset(),
-    ) -> List[JsonDict]:
-        """Get a list of annotations on the event, grouped by event type and
+    async def get_annotations_for_events(
+        self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset()
+    ) -> Dict[str, List[JsonDict]]:
+        """Get a list of annotations to the given events, grouped by event type and
         aggregation key, sorted by count.
 
-        This is used e.g. to get the what and how many reactions have happend
+        This is used e.g. to get the what and how many reactions have happened
         on an event.
 
         Args:
-            event_id: Fetch events that relate to this event ID.
-            room_id: The room the event belongs to.
-            limit: Only fetch the `limit` groups.
+            event_ids: Fetch events that relate to these event IDs.
             ignored_users: The users ignored by the requesting user.
 
         Returns:
-            List of groups of annotations that match. Each row is a dict with
-            `type`, `key` and `count` fields.
+            A map of event IDs to a list of groups of annotations that match.
+            Each entry is a dict with `type`, `key` and `count` fields.
         """
         # Get the base results for all users.
-        full_results = await self._main_store.get_aggregation_groups_for_event(
-            event_id, room_id, limit
+        full_results = await self._main_store.get_aggregation_groups_for_events(
+            event_ids
         )
 
+        # Avoid additional logic if there are no ignored users.
+        if not ignored_users:
+            return {
+                event_id: results
+                for event_id, results in full_results.items()
+                if results
+            }
+
         # Then subtract off the results for any ignored users.
         ignored_results = await self._main_store.get_aggregation_groups_for_users(
-            event_id, room_id, limit, ignored_users
+            [event_id for event_id, results in full_results.items() if results],
+            ignored_users,
         )
 
-        filtered_results = []
-        for result in full_results:
-            key = (result["type"], result["key"])
-            if key in ignored_results:
-                result = result.copy()
-                result["count"] -= ignored_results[key]
-                if result["count"] <= 0:
-                    continue
-            filtered_results.append(result)
+        filtered_results = {}
+        for event_id, results in full_results.items():
+            # If no annotations, skip.
+            if not results:
+                continue
+
+            # If there are not ignored results for this event, copy verbatim.
+            if event_id not in ignored_results:
+                filtered_results[event_id] = results
+                continue
+
+            # Otherwise, subtract out the ignored results.
+            event_ignored_results = ignored_results[event_id]
+            for result in results:
+                key = (result["type"], result["key"])
+                if key in event_ignored_results:
+                    # Ensure to not modify the cache.
+                    result = result.copy()
+                    result["count"] -= event_ignored_results[key]
+                    if result["count"] <= 0:
+                        continue
+                filtered_results.setdefault(event_id, []).append(result)
 
         return filtered_results
 
@@ -366,59 +391,62 @@ class RelationsHandler:
         results = {}
 
         for event_id, summary in summaries.items():
-            if summary:
-                thread_count, latest_thread_event = summary
-
-                # Subtract off the count of any ignored users.
-                for ignored_user in ignored_users:
-                    thread_count -= ignored_results.get((event_id, ignored_user), 0)
-
-                # This is gnarly, but if the latest event is from an ignored user,
-                # attempt to find one that isn't from an ignored user.
-                if latest_thread_event.sender in ignored_users:
-                    room_id = latest_thread_event.room_id
-
-                    # If the root event is not found, something went wrong, do
-                    # not include a summary of the thread.
-                    event = await self._event_handler.get_event(user, room_id, event_id)
-                    if event is None:
-                        continue
+            # If no thread, skip.
+            if not summary:
+                continue
 
-                    potential_events, _ = await self.get_relations_for_event(
-                        event_id,
-                        event,
-                        room_id,
-                        RelationTypes.THREAD,
-                        ignored_users,
-                    )
+            thread_count, latest_thread_event = summary
 
-                    # If all found events are from ignored users, do not include
-                    # a summary of the thread.
-                    if not potential_events:
-                        continue
+            # Subtract off the count of any ignored users.
+            for ignored_user in ignored_users:
+                thread_count -= ignored_results.get((event_id, ignored_user), 0)
 
-                    # The *last* event returned is the one that is cared about.
-                    event = await self._event_handler.get_event(
-                        user, room_id, potential_events[-1].event_id
-                    )
-                    # It is unexpected that the event will not exist.
-                    if event is None:
-                        logger.warning(
-                            "Unable to fetch latest event in a thread with event ID: %s",
-                            potential_events[-1].event_id,
-                        )
-                        continue
-                    latest_thread_event = event
-
-                results[event_id] = _ThreadAggregation(
-                    latest_event=latest_thread_event,
-                    count=thread_count,
-                    # If there's a thread summary it must also exist in the
-                    # participated dictionary.
-                    current_user_participated=events_by_id[event_id].sender == user_id
-                    or participated[event_id],
+            # This is gnarly, but if the latest event is from an ignored user,
+            # attempt to find one that isn't from an ignored user.
+            if latest_thread_event.sender in ignored_users:
+                room_id = latest_thread_event.room_id
+
+                # If the root event is not found, something went wrong, do
+                # not include a summary of the thread.
+                event = await self._event_handler.get_event(user, room_id, event_id)
+                if event is None:
+                    continue
+
+                potential_events, _ = await self.get_relations_for_event(
+                    event_id,
+                    event,
+                    room_id,
+                    RelationTypes.THREAD,
+                    ignored_users,
                 )
 
+                # If all found events are from ignored users, do not include
+                # a summary of the thread.
+                if not potential_events:
+                    continue
+
+                # The *last* event returned is the one that is cared about.
+                event = await self._event_handler.get_event(
+                    user, room_id, potential_events[-1].event_id
+                )
+                # It is unexpected that the event will not exist.
+                if event is None:
+                    logger.warning(
+                        "Unable to fetch latest event in a thread with event ID: %s",
+                        potential_events[-1].event_id,
+                    )
+                    continue
+                latest_thread_event = event
+
+            results[event_id] = _ThreadAggregation(
+                latest_event=latest_thread_event,
+                count=thread_count,
+                # If there's a thread summary it must also exist in the
+                # participated dictionary.
+                current_user_participated=events_by_id[event_id].sender == user_id
+                or participated[event_id],
+            )
+
         return results
 
     @trace
@@ -496,17 +524,18 @@ class RelationsHandler:
                 # (as that is what makes it part of the thread).
                 relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD
 
-        # Fetch other relations per event.
-        for event in events_by_id.values():
-            # Fetch any annotations (ie, reactions) to bundle with this event.
-            annotations = await self.get_annotations_for_event(
-                event.event_id, event.room_id, ignored_users=ignored_users
-            )
+        # Fetch any annotations (ie, reactions) to bundle with this event.
+        annotations_by_event_id = await self.get_annotations_for_events(
+            events_by_id.keys(), ignored_users=ignored_users
+        )
+        for event_id, annotations in annotations_by_event_id.items():
             if annotations:
-                results.setdefault(
-                    event.event_id, BundledAggregations()
-                ).annotations = {"chunk": annotations}
+                results.setdefault(event_id, BundledAggregations()).annotations = {
+                    "chunk": annotations
+                }
 
+        # Fetch other relations per event.
+        for event in events_by_id.values():
             # Fetch any references to bundle with this event.
             references, next_token = await self.get_relations_for_event(
                 event.event_id,