diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index a36936b520..0be2319577 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -12,7 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Dict, Iterable, Optional
+from typing import (
+ TYPE_CHECKING,
+ Collection,
+ Dict,
+ FrozenSet,
+ Iterable,
+ List,
+ Optional,
+ Tuple,
+)
import attr
from frozendict import frozendict
@@ -20,7 +29,8 @@ from frozendict import frozendict
from synapse.api.constants import RelationTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase
-from synapse.types import JsonDict, Requester, StreamToken
+from synapse.storage.databases.main.relations import _RelatedEvent
+from synapse.types import JsonDict, Requester, StreamToken, UserID
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -115,6 +125,9 @@ class RelationsHandler:
if event is None:
raise SynapseError(404, "Unknown parent event.")
+ # Note that ignored users are not passed into get_relations_for_event
+ # below. Ignored users are handled in filter_events_for_client (and by
+ # not passing them in here we should get a better cache hit rate).
related_events, next_token = await self._main_store.get_relations_for_event(
event_id=event_id,
event=event,
@@ -128,7 +141,9 @@ class RelationsHandler:
to_token=to_token,
)
- events = await self._main_store.get_events_as_list(related_events)
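+ # The related events are _RelatedEvent objects; extract their IDs before fetching the full events.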
+ events = await self._main_store.get_events_as_list(
+ [e.event_id for e in related_events]
+ )
events = await filter_events_for_client(
self._storage, user_id, events, is_peeking=(member_event_id is None)
@@ -162,8 +177,87 @@ class RelationsHandler:
return return_value
+ async def get_relations_for_event(
+ self,
+ event_id: str,
+ event: EventBase,
+ room_id: str,
+ relation_type: str,
+ ignored_users: FrozenSet[str] = frozenset(),
+ ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]:
+ """Get a list of events which relate to an event, ordered by topological ordering.
+
+ Args:
+ event_id: Fetch events that relate to this event ID.
+ event: The EventBase corresponding to event_id.
+ room_id: The room the event belongs to.
+ relation_type: The type of relation.
+ ignored_users: The users ignored by the requesting user.
+
+ Returns:
+ A tuple of the related events (excluding those sent by ignored
+ users) and a token to fetch the next page of results, if any.
+ """
+
+ # Call the underlying storage method, which is cached.
+ related_events, next_token = await self._main_store.get_relations_for_event(
+ event_id, event, room_id, relation_type, direction="f"
+ )
+
+ # Filter out events sent by ignored users.
+ related_events = [
+ event for event in related_events if event.sender not in ignored_users
+ ]
+
+ return related_events, next_token
+
+ async def get_annotations_for_event(
+ self,
+ event_id: str,
+ room_id: str,
+ limit: int = 5,
+ ignored_users: FrozenSet[str] = frozenset(),
+ ) -> List[JsonDict]:
+ """Get a list of annotations on the event, grouped by event type and
+ aggregation key, sorted by count.
+
+ This is used e.g. to get which reactions (and how many of each) have
+ happened on an event.
+
+ Args:
+ event_id: Fetch events that relate to this event ID.
+ room_id: The room the event belongs to.
+ limit: Only fetch the `limit` groups.
+ ignored_users: The users ignored by the requesting user.
+
+ Returns:
+ List of groups of annotations that match. Each row is a dict with
+ `type`, `key` and `count` fields.
+ """
+ # Get the base results for all users.
+ full_results = await self._main_store.get_aggregation_groups_for_event(
+ event_id, room_id, limit
+ )
+
+ # Then subtract off the results for any ignored users.
+ ignored_results = await self._main_store.get_aggregation_groups_for_users(
+ event_id, room_id, limit, ignored_users
+ )
+
+ filtered_results = []
+ for result in full_results:
+ key = (result["type"], result["key"])
+ if key in ignored_results:
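+ # Copy the result before modifying it so the original (which may be cached) is not mutated.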
+ result = result.copy()
+ result["count"] -= ignored_results[key]
+ if result["count"] <= 0:
+ continue
+ filtered_results.append(result)
+
+ return filtered_results
+
async def _get_bundled_aggregation_for_event(
- self, event: EventBase, user_id: str
+ self, event: EventBase, ignored_users: FrozenSet[str]
) -> Optional[BundledAggregations]:
"""Generate bundled aggregations for an event.
@@ -171,7 +265,7 @@ class RelationsHandler:
Args:
event: The event to calculate bundled aggregations for.
- user_id: The user requesting the bundled aggregations.
+ ignored_users: The users ignored by the requesting user.
Returns:
The bundled aggregations for an event, if bundled aggregations are
@@ -194,18 +288,22 @@ class RelationsHandler:
# while others need more processing during serialization.
aggregations = BundledAggregations()
- annotations = await self._main_store.get_aggregation_groups_for_event(
- event_id, room_id
+ annotations = await self.get_annotations_for_event(
+ event_id, room_id, ignored_users=ignored_users
)
if annotations:
aggregations.annotations = {"chunk": annotations}
- references, next_token = await self._main_store.get_relations_for_event(
- event_id, event, room_id, RelationTypes.REFERENCE, direction="f"
+ references, next_token = await self.get_relations_for_event(
+ event_id,
+ event,
+ room_id,
+ RelationTypes.REFERENCE,
+ ignored_users=ignored_users,
)
if references:
aggregations.references = {
- "chunk": [{"event_id": event_id} for event_id in references]
+ "chunk": [{"event_id": event.event_id} for event in references]
}
if next_token:
@@ -216,6 +314,99 @@ class RelationsHandler:
# Store the bundled aggregations in the event metadata for later use.
return aggregations
+ async def get_threads_for_events(
+ self, event_ids: Collection[str], user_id: str, ignored_users: FrozenSet[str]
+ ) -> Dict[str, _ThreadAggregation]:
+ """Get the bundled aggregations for threads for the requested events.
+
+ Args:
+ event_ids: The events to fetch thread aggregations for.
+ user_id: The user requesting the bundled aggregations.
+ ignored_users: The users ignored by the requesting user.
+
+ Returns:
+ A dictionary mapping event ID to the thread information.
+
+ May not contain a value for all requested event IDs.
+ """
+ user = UserID.from_string(user_id)
+
+ # Fetch thread summaries.
+ summaries = await self._main_store.get_thread_summaries(event_ids)
+
+ # Only fetch whether the user participated for the limited set of
+ # events which actually have thread summaries.
+ thread_event_ids = [
+ event_id for event_id, summary in summaries.items() if summary
+ ]
+ participated = await self._main_store.get_threads_participated(
+ thread_event_ids, user_id
+ )
+
+ # Then subtract off the results for any ignored users.
+ ignored_results = await self._main_store.get_threaded_messages_per_user(
+ thread_event_ids, ignored_users
+ )
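+ # The result maps (event ID, user ID) to the number of messages that user sent in the thread.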
+
+ # A map of event ID to the thread aggregation.
+ results = {}
+
+ for event_id, summary in summaries.items():
+ if summary:
+ thread_count, latest_thread_event, edit = summary
+
+ # Subtract off the count of any ignored users.
+ for ignored_user in ignored_users:
+ thread_count -= ignored_results.get((event_id, ignored_user), 0)
+
+ # This is gnarly, but if the latest event is from an ignored user,
+ # attempt to find one that isn't from an ignored user.
+ if latest_thread_event.sender in ignored_users:
+ room_id = latest_thread_event.room_id
+
+ # If the root event is not found, something went wrong; do
+ # not include a summary of the thread.
+ event = await self._event_handler.get_event(user, room_id, event_id)
+ if event is None:
+ continue
+
+ potential_events, _ = await self.get_relations_for_event(
+ event_id,
+ event,
+ room_id,
+ RelationTypes.THREAD,
+ ignored_users,
+ )
+
+ # If all found events are from ignored users, do not include
+ # a summary of the thread.
+ if not potential_events:
+ continue
+
+ # The *last* event returned is the one we care about.
+ event = await self._event_handler.get_event(
+ user, room_id, potential_events[-1].event_id
+ )
+ # It is unexpected that the event will not exist.
+ if event is None:
+ logger.warning(
+ "Unable to fetch latest event in a thread with event ID: %s",
+ potential_events[-1].event_id,
+ )
+ continue
+ latest_thread_event = event
+
+ results[event_id] = _ThreadAggregation(
+ latest_event=latest_thread_event,
+ latest_edit=edit,
+ count=thread_count,
+ # If there's a thread summary it must also exist in the
+ # participated dictionary.
+ current_user_participated=participated[event_id],
+ )
+
+ return results
+
async def get_bundled_aggregations(
self, events: Iterable[EventBase], user_id: str
) -> Dict[str, BundledAggregations]:
@@ -239,13 +430,21 @@ class RelationsHandler:
# event ID -> bundled aggregation in non-serialized form.
results: Dict[str, BundledAggregations] = {}
+ # Fetch any ignored users of the requesting user.
+ ignored_users = await self._main_store.ignored_users(user_id)
+
# Fetch other relations per event.
for event in events_by_id.values():
- event_result = await self._get_bundled_aggregation_for_event(event, user_id)
+ event_result = await self._get_bundled_aggregation_for_event(
+ event, ignored_users
+ )
if event_result:
results[event.event_id] = event_result
# Fetch any edits (but not for redacted events).
+ #
+ # Note that there is no use in limiting edits by ignored users since the
+ # parent event should be filtered out in the first place if its sender is ignored.
edits = await self._main_store.get_applicable_edits(
[
event_id
@@ -256,25 +455,10 @@ class RelationsHandler:
for event_id, edit in edits.items():
results.setdefault(event_id, BundledAggregations()).replace = edit
- # Fetch thread summaries.
- summaries = await self._main_store.get_thread_summaries(events_by_id.keys())
- # Only fetch participated for a limited selection based on what had
- # summaries.
- participated = await self._main_store.get_threads_participated(
- [event_id for event_id, summary in summaries.items() if summary], user_id
+ threads = await self.get_threads_for_events(
+ events_by_id.keys(), user_id, ignored_users
)
- for event_id, summary in summaries.items():
- if summary:
- thread_count, latest_thread_event, edit = summary
- results.setdefault(
- event_id, BundledAggregations()
- ).thread = _ThreadAggregation(
- latest_event=latest_thread_event,
- latest_edit=edit,
- count=thread_count,
- # If there's a thread summary it must also exist in the
- # participated dictionary.
- current_user_participated=participated[event_id],
- )
+ for event_id, thread in threads.items():
+ results.setdefault(event_id, BundledAggregations()).thread = thread
return results
|