summary refs log tree commit diff
path: root/synapse/handlers/relations.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/relations.py')
-rw-r--r--synapse/handlers/relations.py244
1 files changed, 214 insertions, 30 deletions
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index a36936b520..0be2319577 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -12,7 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Dict, Iterable, Optional
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    FrozenSet,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+)
 
 import attr
 from frozendict import frozendict
@@ -20,7 +29,8 @@ from frozendict import frozendict
 from synapse.api.constants import RelationTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase
-from synapse.types import JsonDict, Requester, StreamToken
+from synapse.storage.databases.main.relations import _RelatedEvent
+from synapse.types import JsonDict, Requester, StreamToken, UserID
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -115,6 +125,9 @@ class RelationsHandler:
         if event is None:
             raise SynapseError(404, "Unknown parent event.")
 
+        # Note that ignored users are not passed into get_relations_for_event
+        # below. Ignored users are handled in filter_events_for_client (and by
+        # not passing them in here we should get a better cache hit rate).
         related_events, next_token = await self._main_store.get_relations_for_event(
             event_id=event_id,
             event=event,
@@ -128,7 +141,9 @@ class RelationsHandler:
             to_token=to_token,
         )
 
-        events = await self._main_store.get_events_as_list(related_events)
+        events = await self._main_store.get_events_as_list(
+            [e.event_id for e in related_events]
+        )
 
         events = await filter_events_for_client(
             self._storage, user_id, events, is_peeking=(member_event_id is None)
@@ -162,8 +177,87 @@ class RelationsHandler:
 
         return return_value
 
+    async def get_relations_for_event(
+        self,
+        event_id: str,
+        event: EventBase,
+        room_id: str,
+        relation_type: str,
+        ignored_users: FrozenSet[str] = frozenset(),
+    ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]:
+        """Get a list of events which relate to an event, ordered by topological ordering.
+
+        Args:
+            event_id: Fetch events that relate to this event ID.
+            event: The matching EventBase to event_id.
+            room_id: The room the event belongs to.
+            relation_type: The type of relation.
+            ignored_users: The users ignored by the requesting user.
+
+        Returns:
+            List of event IDs that match relations requested. The rows are of
+            the form `{"event_id": "..."}`.
+        """
+
+        # Call the underlying storage method, which is cached.
+        related_events, next_token = await self._main_store.get_relations_for_event(
+            event_id, event, room_id, relation_type, direction="f"
+        )
+
+        # Filter out ignored users and convert to the expected format.
+        related_events = [
+            event for event in related_events if event.sender not in ignored_users
+        ]
+
+        return related_events, next_token
+
+    async def get_annotations_for_event(
+        self,
+        event_id: str,
+        room_id: str,
+        limit: int = 5,
+        ignored_users: FrozenSet[str] = frozenset(),
+    ) -> List[JsonDict]:
+        """Get a list of annotations on the event, grouped by event type and
+        aggregation key, sorted by count.
+
+        This is used e.g. to get the what and how many reactions have happend
+        on an event.
+
+        Args:
+            event_id: Fetch events that relate to this event ID.
+            room_id: The room the event belongs to.
+            limit: Only fetch the `limit` groups.
+            ignored_users: The users ignored by the requesting user.
+
+        Returns:
+            List of groups of annotations that match. Each row is a dict with
+            `type`, `key` and `count` fields.
+        """
+        # Get the base results for all users.
+        full_results = await self._main_store.get_aggregation_groups_for_event(
+            event_id, room_id, limit
+        )
+
+        # Then subtract off the results for any ignored users.
+        ignored_results = await self._main_store.get_aggregation_groups_for_users(
+            event_id, room_id, limit, ignored_users
+        )
+
+        filtered_results = []
+        for result in full_results:
+            key = (result["type"], result["key"])
+            if key in ignored_results:
+                result = result.copy()
+                result["count"] -= ignored_results[key]
+                if result["count"] <= 0:
+                    continue
+            filtered_results.append(result)
+
+        return filtered_results
+
     async def _get_bundled_aggregation_for_event(
-        self, event: EventBase, user_id: str
+        self, event: EventBase, ignored_users: FrozenSet[str]
     ) -> Optional[BundledAggregations]:
         """Generate bundled aggregations for an event.
 
@@ -171,7 +265,7 @@ class RelationsHandler:
 
         Args:
             event: The event to calculate bundled aggregations for.
-            user_id: The user requesting the bundled aggregations.
+            ignored_users: The users ignored by the requesting user.
 
         Returns:
             The bundled aggregations for an event, if bundled aggregations are
@@ -194,18 +288,22 @@ class RelationsHandler:
         # while others need more processing during serialization.
         aggregations = BundledAggregations()
 
-        annotations = await self._main_store.get_aggregation_groups_for_event(
-            event_id, room_id
+        annotations = await self.get_annotations_for_event(
+            event_id, room_id, ignored_users=ignored_users
         )
         if annotations:
             aggregations.annotations = {"chunk": annotations}
 
-        references, next_token = await self._main_store.get_relations_for_event(
-            event_id, event, room_id, RelationTypes.REFERENCE, direction="f"
+        references, next_token = await self.get_relations_for_event(
+            event_id,
+            event,
+            room_id,
+            RelationTypes.REFERENCE,
+            ignored_users=ignored_users,
         )
         if references:
             aggregations.references = {
-                "chunk": [{"event_id": event_id} for event_id in references]
+                "chunk": [{"event_id": event.event_id} for event in references]
             }
 
             if next_token:
@@ -216,6 +314,99 @@ class RelationsHandler:
         # Store the bundled aggregations in the event metadata for later use.
         return aggregations
 
+    async def get_threads_for_events(
+        self, event_ids: Collection[str], user_id: str, ignored_users: FrozenSet[str]
+    ) -> Dict[str, _ThreadAggregation]:
+        """Get the bundled aggregations for threads for the requested events.
+
+        Args:
+            event_ids: Events to get aggregations for threads.
+            user_id: The user requesting the bundled aggregations.
+            ignored_users: The users ignored by the requesting user.
+
+        Returns:
+            A dictionary mapping event ID to the thread information.
+
+            May not contain a value for all requested event IDs.
+        """
+        user = UserID.from_string(user_id)
+
+        # Fetch thread summaries.
+        summaries = await self._main_store.get_thread_summaries(event_ids)
+
+        # Only fetch participated for a limited selection based on what had
+        # summaries.
+        thread_event_ids = [
+            event_id for event_id, summary in summaries.items() if summary
+        ]
+        participated = await self._main_store.get_threads_participated(
+            thread_event_ids, user_id
+        )
+
+        # Then subtract off the results for any ignored users.
+        ignored_results = await self._main_store.get_threaded_messages_per_user(
+            thread_event_ids, ignored_users
+        )
+
+        # A map of event ID to the thread aggregation.
+        results = {}
+
+        for event_id, summary in summaries.items():
+            if summary:
+                thread_count, latest_thread_event, edit = summary
+
+                # Subtract off the count of any ignored users.
+                for ignored_user in ignored_users:
+                    thread_count -= ignored_results.get((event_id, ignored_user), 0)
+
+                # This is gnarly, but if the latest event is from an ignored user,
+                # attempt to find one that isn't from an ignored user.
+                if latest_thread_event.sender in ignored_users:
+                    room_id = latest_thread_event.room_id
+
+                    # If the root event is not found, something went wrong, do
+                    # not include a summary of the thread.
+                    event = await self._event_handler.get_event(user, room_id, event_id)
+                    if event is None:
+                        continue
+
+                    potential_events, _ = await self.get_relations_for_event(
+                        event_id,
+                        event,
+                        room_id,
+                        RelationTypes.THREAD,
+                        ignored_users,
+                    )
+
+                    # If all found events are from ignored users, do not include
+                    # a summary of the thread.
+                    if not potential_events:
+                        continue
+
+                    # The *last* event returned is the one that is cared about.
+                    event = await self._event_handler.get_event(
+                        user, room_id, potential_events[-1].event_id
+                    )
+                    # It is unexpected that the event will not exist.
+                    if event is None:
+                        logger.warning(
+                            "Unable to fetch latest event in a thread with event ID: %s",
+                            potential_events[-1].event_id,
+                        )
+                        continue
+                    latest_thread_event = event
+
+                results[event_id] = _ThreadAggregation(
+                    latest_event=latest_thread_event,
+                    latest_edit=edit,
+                    count=thread_count,
+                    # If there's a thread summary it must also exist in the
+                    # participated dictionary.
+                    current_user_participated=participated[event_id],
+                )
+
+        return results
+
     async def get_bundled_aggregations(
         self, events: Iterable[EventBase], user_id: str
     ) -> Dict[str, BundledAggregations]:
@@ -239,13 +430,21 @@ class RelationsHandler:
         # event ID -> bundled aggregation in non-serialized form.
         results: Dict[str, BundledAggregations] = {}
 
+        # Fetch any ignored users of the requesting user.
+        ignored_users = await self._main_store.ignored_users(user_id)
+
         # Fetch other relations per event.
         for event in events_by_id.values():
-            event_result = await self._get_bundled_aggregation_for_event(event, user_id)
+            event_result = await self._get_bundled_aggregation_for_event(
+                event, ignored_users
+            )
             if event_result:
                 results[event.event_id] = event_result
 
         # Fetch any edits (but not for redacted events).
+        #
+        # Note that there is no use in limiting edits by ignored users since the
+        # parent event should be ignored in the first place if the user is ignored.
         edits = await self._main_store.get_applicable_edits(
             [
                 event_id
@@ -256,25 +455,10 @@ class RelationsHandler:
         for event_id, edit in edits.items():
             results.setdefault(event_id, BundledAggregations()).replace = edit
 
-        # Fetch thread summaries.
-        summaries = await self._main_store.get_thread_summaries(events_by_id.keys())
-        # Only fetch participated for a limited selection based on what had
-        # summaries.
-        participated = await self._main_store.get_threads_participated(
-            [event_id for event_id, summary in summaries.items() if summary], user_id
+        threads = await self.get_threads_for_events(
+            events_by_id.keys(), user_id, ignored_users
         )
-        for event_id, summary in summaries.items():
-            if summary:
-                thread_count, latest_thread_event, edit = summary
-                results.setdefault(
-                    event_id, BundledAggregations()
-                ).thread = _ThreadAggregation(
-                    latest_event=latest_thread_event,
-                    latest_edit=edit,
-                    count=thread_count,
-                    # If there's a thread summary it must also exist in the
-                    # participated dictionary.
-                    current_user_participated=participated[event_id],
-                )
+        for event_id, thread in threads.items():
+            results.setdefault(event_id, BundledAggregations()).thread = thread
 
         return results