diff options
Diffstat (limited to 'synapse/handlers/relations.py')
-rw-r--r-- | synapse/handlers/relations.py | 244 |
1 files changed, 214 insertions, 30 deletions
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index a36936b520..0be2319577 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -12,7 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, Iterable, Optional +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Tuple, +) import attr from frozendict import frozendict @@ -20,7 +29,8 @@ from frozendict import frozendict from synapse.api.constants import RelationTypes from synapse.api.errors import SynapseError from synapse.events import EventBase -from synapse.types import JsonDict, Requester, StreamToken +from synapse.storage.databases.main.relations import _RelatedEvent +from synapse.types import JsonDict, Requester, StreamToken, UserID from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -115,6 +125,9 @@ class RelationsHandler: if event is None: raise SynapseError(404, "Unknown parent event.") + # Note that ignored users are not passed into get_relations_for_event + # below. Ignored users are handled in filter_events_for_client (and by + # not passing them in here we should get a better cache hit rate). related_events, next_token = await self._main_store.get_relations_for_event( event_id=event_id, event=event, @@ -128,7 +141,9 @@ class RelationsHandler: to_token=to_token, ) - events = await self._main_store.get_events_as_list(related_events) + events = await self._main_store.get_events_as_list( + [e.event_id for e in related_events] + ) events = await filter_events_for_client( self._storage, user_id, events, is_peeking=(member_event_id is None) @@ -162,8 +177,87 @@ class RelationsHandler: return return_value + async def get_relations_for_event( + self, + event_id: str, + event: EventBase, + room_id: str, + relation_type: str, + ignored_users: FrozenSet[str] = frozenset(), + ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]: + """Get a list of events which relate to an event, ordered by topological ordering. + + Args: + event_id: Fetch events that relate to this event ID. + event: The matching EventBase to event_id. + room_id: The room the event belongs to. + relation_type: The type of relation. + ignored_users: The users ignored by the requesting user. + + Returns: + List of event IDs that match relations requested. The rows are of + the form `{"event_id": "..."}`. + """ + + # Call the underlying storage method, which is cached. + related_events, next_token = await self._main_store.get_relations_for_event( + event_id, event, room_id, relation_type, direction="f" + ) + + # Filter out ignored users and convert to the expected format. + related_events = [ + event for event in related_events if event.sender not in ignored_users + ] + + return related_events, next_token + + async def get_annotations_for_event( + self, + event_id: str, + room_id: str, + limit: int = 5, + ignored_users: FrozenSet[str] = frozenset(), + ) -> List[JsonDict]: + """Get a list of annotations on the event, grouped by event type and + aggregation key, sorted by count. + + This is used e.g. to get the what and how many reactions have happend + on an event. + + Args: + event_id: Fetch events that relate to this event ID. + room_id: The room the event belongs to. + limit: Only fetch the `limit` groups. + ignored_users: The users ignored by the requesting user. + + Returns: + List of groups of annotations that match. Each row is a dict with + `type`, `key` and `count` fields. + """ + # Get the base results for all users. + full_results = await self._main_store.get_aggregation_groups_for_event( + event_id, room_id, limit + ) + + # Then subtract off the results for any ignored users. + ignored_results = await self._main_store.get_aggregation_groups_for_users( + event_id, room_id, limit, ignored_users + ) + + filtered_results = [] + for result in full_results: + key = (result["type"], result["key"]) + if key in ignored_results: + result = result.copy() + result["count"] -= ignored_results[key] + if result["count"] <= 0: + continue + filtered_results.append(result) + + return filtered_results + async def _get_bundled_aggregation_for_event( - self, event: EventBase, user_id: str + self, event: EventBase, ignored_users: FrozenSet[str] ) -> Optional[BundledAggregations]: """Generate bundled aggregations for an event. @@ -171,7 +265,7 @@ class RelationsHandler: Args: event: The event to calculate bundled aggregations for. - user_id: The user requesting the bundled aggregations. + ignored_users: The users ignored by the requesting user. Returns: The bundled aggregations for an event, if bundled aggregations are @@ -194,18 +288,22 @@ class RelationsHandler: # while others need more processing during serialization. aggregations = BundledAggregations() - annotations = await self._main_store.get_aggregation_groups_for_event( - event_id, room_id + annotations = await self.get_annotations_for_event( + event_id, room_id, ignored_users=ignored_users ) if annotations: aggregations.annotations = {"chunk": annotations} - references, next_token = await self._main_store.get_relations_for_event( - event_id, event, room_id, RelationTypes.REFERENCE, direction="f" + references, next_token = await self.get_relations_for_event( + event_id, + event, + room_id, + RelationTypes.REFERENCE, + ignored_users=ignored_users, ) if references: aggregations.references = { - "chunk": [{"event_id": event_id} for event_id in references] + "chunk": [{"event_id": event.event_id} for event in references] } if next_token: @@ -216,6 +314,99 @@ class RelationsHandler: # Store the bundled aggregations in the event metadata for later use. return aggregations + async def get_threads_for_events( + self, event_ids: Collection[str], user_id: str, ignored_users: FrozenSet[str] + ) -> Dict[str, _ThreadAggregation]: + """Get the bundled aggregations for threads for the requested events. + + Args: + event_ids: Events to get aggregations for threads. + user_id: The user requesting the bundled aggregations. + ignored_users: The users ignored by the requesting user. + + Returns: + A dictionary mapping event ID to the thread information. + + May not contain a value for all requested event IDs. + """ + user = UserID.from_string(user_id) + + # Fetch thread summaries. + summaries = await self._main_store.get_thread_summaries(event_ids) + + # Only fetch participated for a limited selection based on what had + # summaries. + thread_event_ids = [ + event_id for event_id, summary in summaries.items() if summary + ] + participated = await self._main_store.get_threads_participated( + thread_event_ids, user_id + ) + + # Then subtract off the results for any ignored users. + ignored_results = await self._main_store.get_threaded_messages_per_user( + thread_event_ids, ignored_users + ) + + # A map of event ID to the thread aggregation. + results = {} + + for event_id, summary in summaries.items(): + if summary: + thread_count, latest_thread_event, edit = summary + + # Subtract off the count of any ignored users. + for ignored_user in ignored_users: + thread_count -= ignored_results.get((event_id, ignored_user), 0) + + # This is gnarly, but if the latest event is from an ignored user, + # attempt to find one that isn't from an ignored user. + if latest_thread_event.sender in ignored_users: + room_id = latest_thread_event.room_id + + # If the root event is not found, something went wrong, do + # not include a summary of the thread. + event = await self._event_handler.get_event(user, room_id, event_id) + if event is None: + continue + + potential_events, _ = await self.get_relations_for_event( + event_id, + event, + room_id, + RelationTypes.THREAD, + ignored_users, + ) + + # If all found events are from ignored users, do not include + # a summary of the thread. + if not potential_events: + continue + + # The *last* event returned is the one that is cared about. + event = await self._event_handler.get_event( + user, room_id, potential_events[-1].event_id + ) + # It is unexpected that the event will not exist. + if event is None: + logger.warning( + "Unable to fetch latest event in a thread with event ID: %s", + potential_events[-1].event_id, + ) + continue + latest_thread_event = event + + results[event_id] = _ThreadAggregation( + latest_event=latest_thread_event, + latest_edit=edit, + count=thread_count, + # If there's a thread summary it must also exist in the + # participated dictionary. + current_user_participated=participated[event_id], + ) + + return results + async def get_bundled_aggregations( self, events: Iterable[EventBase], user_id: str ) -> Dict[str, BundledAggregations]: @@ -239,13 +430,21 @@ class RelationsHandler: # event ID -> bundled aggregation in non-serialized form. results: Dict[str, BundledAggregations] = {} + # Fetch any ignored users of the requesting user. + ignored_users = await self._main_store.ignored_users(user_id) + # Fetch other relations per event. for event in events_by_id.values(): - event_result = await self._get_bundled_aggregation_for_event(event, user_id) + event_result = await self._get_bundled_aggregation_for_event( + event, ignored_users + ) if event_result: results[event.event_id] = event_result # Fetch any edits (but not for redacted events). + # + # Note that there is no use in limiting edits by ignored users since the + # parent event should be ignored in the first place if the user is ignored. edits = await self._main_store.get_applicable_edits( [ event_id @@ -256,25 +455,10 @@ class RelationsHandler: for event_id, edit in edits.items(): results.setdefault(event_id, BundledAggregations()).replace = edit - # Fetch thread summaries. - summaries = await self._main_store.get_thread_summaries(events_by_id.keys()) - # Only fetch participated for a limited selection based on what had - # summaries. - participated = await self._main_store.get_threads_participated( - [event_id for event_id, summary in summaries.items() if summary], user_id + threads = await self.get_threads_for_events( + events_by_id.keys(), user_id, ignored_users ) - for event_id, summary in summaries.items(): - if summary: - thread_count, latest_thread_event, edit = summary - results.setdefault( - event_id, BundledAggregations() - ).thread = _ThreadAggregation( - latest_event=latest_thread_event, - latest_edit=edit, - count=thread_count, - # If there's a thread summary it must also exist in the - # participated dictionary. - current_user_participated=participated[event_id], - ) + for event_id, thread in threads.items(): + results.setdefault(event_id, BundledAggregations()).thread = thread return results |