From 2b78981736f9004f99b1760e3e77b234f92755a7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 28 Feb 2023 18:49:28 +0000 Subject: Remove support for aggregating reactions (#15172) It turns out that no clients rely on server-side aggregation of `m.annotation` relationships: it's just not very useful as currently implemented. It's also non-trivial to calculate. I want to remove it from MSC2677, so to keep the implementation in line, let's remove it here. --- synapse/events/utils.py | 5 - synapse/handlers/relations.py | 76 +----------- synapse/storage/databases/main/cache.py | 3 - synapse/storage/databases/main/events.py | 4 - .../storage/databases/main/events_bg_updates.py | 3 - synapse/storage/databases/main/relations.py | 137 --------------------- 6 files changed, 1 insertion(+), 227 deletions(-) (limited to 'synapse') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index ebf8c7ed83..eaa6cad4af 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -516,11 +516,6 @@ class EventClientSerializer: # being serialized. serialized_aggregations = {} - if event_aggregations.annotations: - serialized_aggregations[ - RelationTypes.ANNOTATION - ] = event_aggregations.annotations - if event_aggregations.references: serialized_aggregations[ RelationTypes.REFERENCE diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 0fb15391e0..553053b694 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -60,13 +60,12 @@ class BundledAggregations: Some values require additional processing during serialization. """ - annotations: Optional[JsonDict] = None references: Optional[JsonDict] = None replace: Optional[EventBase] = None thread: Optional[_ThreadAggregation] = None def __bool__(self) -> bool: - return bool(self.annotations or self.references or self.replace or self.thread) + return bool(self.references or self.replace or self.thread) class RelationsHandler: @@ -227,67 +226,6 @@ class RelationsHandler: e.msg, ) - async def get_annotations_for_events( - self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset() - ) -> Dict[str, List[JsonDict]]: - """Get a list of annotations to the given events, grouped by event type and - aggregation key, sorted by count. - - This is used e.g. to get the what and how many reactions have happened - on an event. - - Args: - event_ids: Fetch events that relate to these event IDs. - ignored_users: The users ignored by the requesting user. - - Returns: - A map of event IDs to a list of groups of annotations that match. - Each entry is a dict with `type`, `key` and `count` fields. - """ - # Get the base results for all users. - full_results = await self._main_store.get_aggregation_groups_for_events( - event_ids - ) - - # Avoid additional logic if there are no ignored users. - if not ignored_users: - return { - event_id: results - for event_id, results in full_results.items() - if results - } - - # Then subtract off the results for any ignored users. - ignored_results = await self._main_store.get_aggregation_groups_for_users( - [event_id for event_id, results in full_results.items() if results], - ignored_users, - ) - - filtered_results = {} - for event_id, results in full_results.items(): - # If no annotations, skip. - if not results: - continue - - # If there are not ignored results for this event, copy verbatim. - if event_id not in ignored_results: - filtered_results[event_id] = results - continue - - # Otherwise, subtract out the ignored results. - event_ignored_results = ignored_results[event_id] - for result in results: - key = (result["type"], result["key"]) - if key in event_ignored_results: - # Ensure to not modify the cache. - result = result.copy() - result["count"] -= event_ignored_results[key] - if result["count"] <= 0: - continue - filtered_results.setdefault(event_id, []).append(result) - - return filtered_results - async def get_references_for_events( self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset() ) -> Dict[str, List[_RelatedEvent]]: @@ -531,17 +469,6 @@ class RelationsHandler: # (as that is what makes it part of the thread). relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD - async def _fetch_annotations() -> None: - """Fetch any annotations (ie, reactions) to bundle with this event.""" - annotations_by_event_id = await self.get_annotations_for_events( - events_by_id.keys(), ignored_users=ignored_users - ) - for event_id, annotations in annotations_by_event_id.items(): - if annotations: - results.setdefault(event_id, BundledAggregations()).annotations = { - "chunk": annotations - } - async def _fetch_references() -> None: """Fetch any references to bundle with this event.""" references_by_event_id = await self.get_references_for_events( @@ -575,7 +502,6 @@ class RelationsHandler: await make_deferred_yieldable( gather_results( ( - run_in_background(_fetch_annotations), run_in_background(_fetch_references), run_in_background(_fetch_edits), ) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 5b66431691..096dec7f87 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -266,9 +266,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore): if relates_to: self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,)) self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,)) - self._attempt_to_invalidate_cache( - "get_aggregation_groups_for_event", (relates_to,) - ) self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,)) self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,)) self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 73b8aea16c..a8a4ed4436 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2024,10 +2024,6 @@ class PersistEventsStore: self.store._invalidate_cache_and_stream( txn, self.store.get_relations_for_event, (redacted_relates_to,) ) - if rel_type == RelationTypes.ANNOTATION: - self.store._invalidate_cache_and_stream( - txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,) - ) if rel_type == RelationTypes.REFERENCE: self.store._invalidate_cache_and_stream( txn, self.store.get_references_for_event, (redacted_relates_to,) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 0a275e6ce6..daef3685b0 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1219,9 +1219,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._invalidate_cache_and_stream( # type: ignore[attr-defined] txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined] ) - self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_aggregation_groups_for_event, cache_tuple # type: ignore[attr-defined] - ) self._invalidate_cache_and_stream( # type: ignore[attr-defined] txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined] ) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index fa3266c081..bc3a83919c 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -397,143 +397,6 @@ class RelationsWorkerStore(SQLBaseStore): ) return result is not None - @cached() - async def get_aggregation_groups_for_event( - self, event_id: str - ) -> Sequence[JsonDict]: - raise NotImplementedError() - - @cachedList( - cached_method_name="get_aggregation_groups_for_event", list_name="event_ids" - ) - async def get_aggregation_groups_for_events( - self, event_ids: Collection[str] - ) -> Mapping[str, Optional[List[JsonDict]]]: - """Get a list of annotations on the given events, grouped by event type and - aggregation key, sorted by count. - - This is used e.g. to get the what and how many reactions have happend - on an event. - - Args: - event_ids: Fetch events that relate to these event IDs. - - Returns: - A map of event IDs to a list of groups of annotations that match. - Each entry is a dict with `type`, `key` and `count` fields. - """ - # The number of entries to return per event ID. - limit = 5 - - clause, args = make_in_list_sql_clause( - self.database_engine, "relates_to_id", event_ids - ) - args.append(RelationTypes.ANNOTATION) - - sql = f""" - SELECT - relates_to_id, - annotation.type, - aggregation_key, - COUNT(DISTINCT annotation.sender) - FROM events AS annotation - INNER JOIN event_relations USING (event_id) - INNER JOIN events AS parent ON - parent.event_id = relates_to_id - AND parent.room_id = annotation.room_id - WHERE - {clause} - AND relation_type = ? - GROUP BY relates_to_id, annotation.type, aggregation_key - ORDER BY relates_to_id, COUNT(*) DESC - """ - - def _get_aggregation_groups_for_events_txn( - txn: LoggingTransaction, - ) -> Mapping[str, List[JsonDict]]: - txn.execute(sql, args) - - result: Dict[str, List[JsonDict]] = {} - for event_id, type, key, count in cast( - List[Tuple[str, str, str, int]], txn - ): - event_results = result.setdefault(event_id, []) - - # Limit the number of results per event ID. - if len(event_results) == limit: - continue - - event_results.append({"type": type, "key": key, "count": count}) - - return result - - return await self.db_pool.runInteraction( - "get_aggregation_groups_for_events", _get_aggregation_groups_for_events_txn - ) - - async def get_aggregation_groups_for_users( - self, event_ids: Collection[str], users: FrozenSet[str] - ) -> Dict[str, Dict[Tuple[str, str], int]]: - """Fetch the partial aggregations for an event for specific users. - - This is used, in conjunction with get_aggregation_groups_for_event, to - remove information from the results for ignored users. - - Args: - event_ids: Fetch events that relate to these event IDs. - users: The users to fetch information for. - - Returns: - A map of event ID to a map of (event type, aggregation key) to a - count of users. - """ - - if not users: - return {} - - events_sql, args = make_in_list_sql_clause( - self.database_engine, "relates_to_id", event_ids - ) - - users_sql, users_args = make_in_list_sql_clause( - self.database_engine, "annotation.sender", users - ) - args.extend(users_args) - args.append(RelationTypes.ANNOTATION) - - sql = f""" - SELECT - relates_to_id, - annotation.type, - aggregation_key, - COUNT(DISTINCT annotation.sender) - FROM events AS annotation - INNER JOIN event_relations USING (event_id) - INNER JOIN events AS parent ON - parent.event_id = relates_to_id - AND parent.room_id = annotation.room_id - WHERE {events_sql} AND {users_sql} AND relation_type = ? - GROUP BY relates_to_id, annotation.type, aggregation_key - ORDER BY relates_to_id, COUNT(*) DESC - """ - - def _get_aggregation_groups_for_users_txn( - txn: LoggingTransaction, - ) -> Dict[str, Dict[Tuple[str, str], int]]: - txn.execute(sql, args) - - result: Dict[str, Dict[Tuple[str, str], int]] = {} - for event_id, type, key, count in cast( - List[Tuple[str, str, str, int]], txn - ): - result.setdefault(event_id, {})[(type, key)] = count - - return result - - return await self.db_pool.runInteraction( - "get_aggregation_groups_for_users", _get_aggregation_groups_for_users_txn - ) - @cached() async def get_references_for_event(self, event_id: str) -> List[JsonDict]: raise NotImplementedError() -- cgit 1.4.1