diff options
author | Patrick Cloke <clokep@users.noreply.github.com> | 2022-11-22 09:47:32 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-22 09:47:32 -0500 |
commit | 7eb74600423e00c6982493eed18551d7f294140d (patch) | |
tree | 402003f3ce819f459c8605cc3ccf2d88e6fd420b /synapse/handlers/relations.py | |
parent | Batch fetch bundled references (#14508) (diff) | |
download | synapse-7eb74600423e00c6982493eed18551d7f294140d.tar.xz |
Parallelize calls to fetch bundled aggregations. (#14510)
The bundled aggregations for annotations, references, and edits can be parallelized.
Diffstat (limited to 'synapse/handlers/relations.py')
-rw-r--r-- | synapse/handlers/relations.py | 83 |
1 files changed, 51 insertions, 32 deletions
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 8414be5879..e96f9999a8 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -20,10 +20,12 @@ import attr from synapse.api.constants import EventTypes, RelationTypes from synapse.api.errors import SynapseError from synapse.events import EventBase, relation_from_event +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import trace from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, Requester, UserID +from synapse.util.async_helpers import gather_results from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -525,39 +527,56 @@ class RelationsHandler: # (as that is what makes it part of the thread). relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD - # Fetch any annotations (ie, reactions) to bundle with this event. - annotations_by_event_id = await self.get_annotations_for_events( - events_by_id.keys(), ignored_users=ignored_users - ) - for event_id, annotations in annotations_by_event_id.items(): - if annotations: - results.setdefault(event_id, BundledAggregations()).annotations = { - "chunk": annotations - } - - # Fetch any references to bundle with this event. - references_by_event_id = await self.get_references_for_events( - events_by_id.keys(), ignored_users=ignored_users - ) - for event_id, references in references_by_event_id.items(): - if references: - results.setdefault(event_id, BundledAggregations()).references = { - "chunk": [{"event_id": ev.event_id} for ev in references] - } - - # Fetch any edits (but not for redacted events). - # - # Note that there is no use in limiting edits by ignored users since the - # parent event should be ignored in the first place if the user is ignored. - edits = await self._main_store.get_applicable_edits( - [ - event_id - for event_id, event in events_by_id.items() - if not event.internal_metadata.is_redacted() - ] + async def _fetch_annotations() -> None: + """Fetch any annotations (ie, reactions) to bundle with this event.""" + annotations_by_event_id = await self.get_annotations_for_events( + events_by_id.keys(), ignored_users=ignored_users + ) + for event_id, annotations in annotations_by_event_id.items(): + if annotations: + results.setdefault(event_id, BundledAggregations()).annotations = { + "chunk": annotations + } + + async def _fetch_references() -> None: + """Fetch any references to bundle with this event.""" + references_by_event_id = await self.get_references_for_events( + events_by_id.keys(), ignored_users=ignored_users + ) + for event_id, references in references_by_event_id.items(): + if references: + results.setdefault(event_id, BundledAggregations()).references = { + "chunk": [{"event_id": ev.event_id} for ev in references] + } + + async def _fetch_edits() -> None: + """ + Fetch any edits (but not for redacted events). + + Note that there is no use in limiting edits by ignored users since the + parent event should be ignored in the first place if the user is ignored. + """ + edits = await self._main_store.get_applicable_edits( + [ + event_id + for event_id, event in events_by_id.items() + if not event.internal_metadata.is_redacted() + ] + ) + for event_id, edit in edits.items(): + results.setdefault(event_id, BundledAggregations()).replace = edit + + # Parallelize the calls for annotations, references, and edits since they + # are unrelated. + await make_deferred_yieldable( + gather_results( + ( + run_in_background(_fetch_annotations), + run_in_background(_fetch_references), + run_in_background(_fetch_edits), + ) + ) ) - for event_id, edit in edits.items(): - results.setdefault(event_id, BundledAggregations()).replace = edit return results |