summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/events/utils.py126
-rw-r--r--synapse/handlers/events.py2
-rw-r--r--synapse/handlers/initial_sync.py16
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/pagination.py8
-rw-r--r--synapse/handlers/room.py10
-rw-r--r--synapse/handlers/search.py10
-rw-r--r--synapse/rest/admin/rooms.py16
-rw-r--r--synapse/rest/client/events.py2
-rw-r--r--synapse/rest/client/notifications.py2
-rw-r--r--synapse/rest/client/relations.py11
-rw-r--r--synapse/rest/client/room.py28
-rw-r--r--synapse/rest/client/sync.py39
-rw-r--r--synapse/server.py2
-rw-r--r--synapse/storage/databases/main/relations.py128
15 files changed, 247 insertions, 155 deletions
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 2038e72924..de0e0c1731 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -14,17 +14,7 @@
 # limitations under the License.
 import collections.abc
 import re
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Callable,
-    Dict,
-    Iterable,
-    List,
-    Mapping,
-    Optional,
-    Union,
-)
+from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union
 
 from frozendict import frozendict
 
@@ -32,14 +22,10 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
 from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersion
 from synapse.types import JsonDict
-from synapse.util.async_helpers import yieldable_gather_results
 from synapse.util.frozenutils import unfreeze
 
 from . import EventBase
 
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
 # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\'
 # (?<!stuff) matches if the current position in the string is not preceded
 # by a match for 'stuff'.
@@ -385,17 +371,12 @@ class EventClientSerializer:
     clients.
     """
 
-    def __init__(self, hs: "HomeServer"):
-        self.store = hs.get_datastore()
-        self._msc1849_enabled = hs.config.experimental.msc1849_enabled
-        self._msc3440_enabled = hs.config.experimental.msc3440_enabled
-
-    async def serialize_event(
+    def serialize_event(
         self,
         event: Union[JsonDict, EventBase],
         time_now: int,
         *,
-        bundle_aggregations: bool = False,
+        bundle_aggregations: Optional[Dict[str, JsonDict]] = None,
         **kwargs: Any,
     ) -> JsonDict:
         """Serializes a single event.
@@ -418,66 +399,41 @@ class EventClientSerializer:
         serialized_event = serialize_event(event, time_now, **kwargs)
 
         # Check if there are any bundled aggregations to include with the event.
-        #
-        # Do not bundle aggregations if any of the following at true:
-        #
-        # * Support is disabled via the configuration or the caller.
-        # * The event is a state event.
-        # * The event has been redacted.
-        if (
-            self._msc1849_enabled
-            and bundle_aggregations
-            and not event.is_state()
-            and not event.internal_metadata.is_redacted()
-        ):
-            await self._injected_bundled_aggregations(event, time_now, serialized_event)
+        if bundle_aggregations:
+            event_aggregations = bundle_aggregations.get(event.event_id)
+            if event_aggregations:
+                self._injected_bundled_aggregations(
+                    event,
+                    time_now,
+                    bundle_aggregations[event.event_id],
+                    serialized_event,
+                )
 
         return serialized_event
 
-    async def _injected_bundled_aggregations(
-        self, event: EventBase, time_now: int, serialized_event: JsonDict
+    def _injected_bundled_aggregations(
+        self,
+        event: EventBase,
+        time_now: int,
+        aggregations: JsonDict,
+        serialized_event: JsonDict,
     ) -> None:
         """Potentially injects bundled aggregations into the unsigned portion of the serialized event.
 
         Args:
             event: The event being serialized.
             time_now: The current time in milliseconds
+            aggregations: The bundled aggregation to serialize.
             serialized_event: The serialized event which may be modified.
 
         """
-        # Do not bundle aggregations for an event which represents an edit or an
-        # annotation. It does not make sense for them to have related events.
-        relates_to = event.content.get("m.relates_to")
-        if isinstance(relates_to, (dict, frozendict)):
-            relation_type = relates_to.get("rel_type")
-            if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
-                return
-
-        event_id = event.event_id
-        room_id = event.room_id
-
-        # The bundled aggregations to include.
-        aggregations = {}
-
-        annotations = await self.store.get_aggregation_groups_for_event(
-            event_id, room_id
-        )
-        if annotations.chunk:
-            aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()
+        # Make a copy in-case the object is cached.
+        aggregations = aggregations.copy()
 
-        references = await self.store.get_relations_for_event(
-            event_id, room_id, RelationTypes.REFERENCE, direction="f"
-        )
-        if references.chunk:
-            aggregations[RelationTypes.REFERENCE] = references.to_dict()
-
-        edit = None
-        if event.type == EventTypes.Message:
-            edit = await self.store.get_applicable_edit(event_id, room_id)
-
-        if edit:
+        if RelationTypes.REPLACE in aggregations:
             # If there is an edit replace the content, preserving existing
             # relations.
+            edit = aggregations[RelationTypes.REPLACE]
 
             # Ensure we take copies of the edit content, otherwise we risk modifying
             # the original event.
@@ -502,27 +458,19 @@ class EventClientSerializer:
             }
 
         # If this event is the start of a thread, include a summary of the replies.
-        if self._msc3440_enabled:
-            (
-                thread_count,
-                latest_thread_event,
-            ) = await self.store.get_thread_summary(event_id, room_id)
-            if latest_thread_event:
-                aggregations[RelationTypes.THREAD] = {
-                    # Don't bundle aggregations as this could recurse forever.
-                    "latest_event": await self.serialize_event(
-                        latest_thread_event, time_now, bundle_aggregations=False
-                    ),
-                    "count": thread_count,
-                }
-
-        # If any bundled aggregations were found, include them.
-        if aggregations:
-            serialized_event["unsigned"].setdefault("m.relations", {}).update(
-                aggregations
+        if RelationTypes.THREAD in aggregations:
+            # Serialize the latest thread event.
+            latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"]
+
+            # Don't bundle aggregations as this could recurse forever.
+            aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event(
+                latest_thread_event, time_now, bundle_aggregations=None
             )
 
-    async def serialize_events(
+        # Include the bundled aggregations in the event.
+        serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations)
+
+    def serialize_events(
         self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
     ) -> List[JsonDict]:
         """Serializes multiple events.
@@ -535,9 +483,9 @@ class EventClientSerializer:
         Returns:
             The list of serialized events
         """
-        return await yieldable_gather_results(
-            self.serialize_event, events, time_now=time_now, **kwargs
-        )
+        return [
+            self.serialize_event(event, time_now=time_now, **kwargs) for event in events
+        ]
 
 
 def copy_power_levels_contents(
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1b996c420d..a3add8a586 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -119,7 +119,7 @@ class EventStreamHandler:
 
             events.extend(to_add)
 
-            chunks = await self._event_serializer.serialize_events(
+            chunks = self._event_serializer.serialize_events(
                 events,
                 time_now,
                 as_client_event=as_client_event,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 601bab67f9..346a06ff49 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -170,7 +170,7 @@ class InitialSyncHandler:
                 d["inviter"] = event.sender
 
                 invite_event = await self.store.get_event(event.event_id)
-                d["invite"] = await self._event_serializer.serialize_event(
+                d["invite"] = self._event_serializer.serialize_event(
                     invite_event,
                     time_now,
                     as_client_event=as_client_event,
@@ -222,7 +222,7 @@ class InitialSyncHandler:
 
                 d["messages"] = {
                     "chunk": (
-                        await self._event_serializer.serialize_events(
+                        self._event_serializer.serialize_events(
                             messages,
                             time_now=time_now,
                             as_client_event=as_client_event,
@@ -232,7 +232,7 @@ class InitialSyncHandler:
                     "end": await end_token.to_string(self.store),
                 }
 
-                d["state"] = await self._event_serializer.serialize_events(
+                d["state"] = self._event_serializer.serialize_events(
                     current_state.values(),
                     time_now=time_now,
                     as_client_event=as_client_event,
@@ -376,16 +376,14 @@ class InitialSyncHandler:
             "messages": {
                 "chunk": (
                     # Don't bundle aggregations as this is a deprecated API.
-                    await self._event_serializer.serialize_events(messages, time_now)
+                    self._event_serializer.serialize_events(messages, time_now)
                 ),
                 "start": await start_token.to_string(self.store),
                 "end": await end_token.to_string(self.store),
             },
             "state": (
                 # Don't bundle aggregations as this is a deprecated API.
-                await self._event_serializer.serialize_events(
-                    room_state.values(), time_now
-                )
+                self._event_serializer.serialize_events(room_state.values(), time_now)
             ),
             "presence": [],
             "receipts": [],
@@ -404,7 +402,7 @@ class InitialSyncHandler:
         # TODO: These concurrently
         time_now = self.clock.time_msec()
         # Don't bundle aggregations as this is a deprecated API.
-        state = await self._event_serializer.serialize_events(
+        state = self._event_serializer.serialize_events(
             current_state.values(), time_now
         )
 
@@ -480,7 +478,7 @@ class InitialSyncHandler:
             "messages": {
                 "chunk": (
                     # Don't bundle aggregations as this is a deprecated API.
-                    await self._event_serializer.serialize_events(messages, time_now)
+                    self._event_serializer.serialize_events(messages, time_now)
                 ),
                 "start": await start_token.to_string(self.store),
                 "end": await end_token.to_string(self.store),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5e3d3886eb..b37250aa38 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -246,7 +246,7 @@ class MessageHandler:
                 room_state = room_state_events[membership_event_id]
 
         now = self.clock.time_msec()
-        events = await self._event_serializer.serialize_events(room_state.values(), now)
+        events = self._event_serializer.serialize_events(room_state.values(), now)
         return events
 
     async def get_joined_members(self, requester: Requester, room_id: str) -> dict:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 7469cc55a2..472688f045 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -537,14 +537,16 @@ class PaginationHandler:
                 state_dict = await self.store.get_events(list(state_ids.values()))
                 state = state_dict.values()
 
+        aggregations = await self.store.get_bundled_aggregations(events)
+
         time_now = self.clock.time_msec()
 
         chunk = {
             "chunk": (
-                await self._event_serializer.serialize_events(
+                self._event_serializer.serialize_events(
                     events,
                     time_now,
-                    bundle_aggregations=True,
+                    bundle_aggregations=aggregations,
                     as_client_event=as_client_event,
                 )
             ),
@@ -553,7 +555,7 @@ class PaginationHandler:
         }
 
         if state:
-            chunk["state"] = await self._event_serializer.serialize_events(
+            chunk["state"] = self._event_serializer.serialize_events(
                 state, time_now, as_client_event=as_client_event
             )
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3d3a0f6ac3..3d47163f25 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1181,6 +1181,16 @@ class RoomContextHandler:
         # `filtered` rather than the event we retrieved from the datastore.
         results["event"] = filtered[0]
 
+        # Fetch the aggregations.
+        aggregations = await self.store.get_bundled_aggregations([results["event"]])
+        aggregations.update(
+            await self.store.get_bundled_aggregations(results["events_before"])
+        )
+        aggregations.update(
+            await self.store.get_bundled_aggregations(results["events_after"])
+        )
+        results["aggregations"] = aggregations
+
         if results["events_after"]:
             last_event_id = results["events_after"][-1].event_id
         else:
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index ab7eaab2fb..0b153a6822 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -420,10 +420,10 @@ class SearchHandler:
         time_now = self.clock.time_msec()
 
         for context in contexts.values():
-            context["events_before"] = await self._event_serializer.serialize_events(
+            context["events_before"] = self._event_serializer.serialize_events(
                 context["events_before"], time_now
             )
-            context["events_after"] = await self._event_serializer.serialize_events(
+            context["events_after"] = self._event_serializer.serialize_events(
                 context["events_after"], time_now
             )
 
@@ -441,9 +441,7 @@ class SearchHandler:
             results.append(
                 {
                     "rank": rank_map[e.event_id],
-                    "result": (
-                        await self._event_serializer.serialize_event(e, time_now)
-                    ),
+                    "result": self._event_serializer.serialize_event(e, time_now),
                     "context": contexts.get(e.event_id, {}),
                 }
             )
@@ -457,7 +455,7 @@ class SearchHandler:
         if state_results:
             s = {}
             for room_id, state_events in state_results.items():
-                s[room_id] = await self._event_serializer.serialize_events(
+                s[room_id] = self._event_serializer.serialize_events(
                     state_events, time_now
                 )
 
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 6030373ebc..2e714ac87b 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -424,7 +424,7 @@ class RoomStateRestServlet(RestServlet):
         event_ids = await self.store.get_current_state_ids(room_id)
         events = await self.store.get_events(event_ids.values())
         now = self.clock.time_msec()
-        room_state = await self._event_serializer.serialize_events(events.values(), now)
+        room_state = self._event_serializer.serialize_events(events.values(), now)
         ret = {"state": room_state}
 
         return HTTPStatus.OK, ret
@@ -744,22 +744,22 @@ class RoomEventContextServlet(RestServlet):
             )
 
         time_now = self.clock.time_msec()
-        results["events_before"] = await self._event_serializer.serialize_events(
+        results["events_before"] = self._event_serializer.serialize_events(
             results["events_before"],
             time_now,
-            bundle_aggregations=True,
+            bundle_aggregations=results["aggregations"],
         )
-        results["event"] = await self._event_serializer.serialize_event(
+        results["event"] = self._event_serializer.serialize_event(
             results["event"],
             time_now,
-            bundle_aggregations=True,
+            bundle_aggregations=results["aggregations"],
         )
-        results["events_after"] = await self._event_serializer.serialize_events(
+        results["events_after"] = self._event_serializer.serialize_events(
             results["events_after"],
             time_now,
-            bundle_aggregations=True,
+            bundle_aggregations=results["aggregations"],
         )
-        results["state"] = await self._event_serializer.serialize_events(
+        results["state"] = self._event_serializer.serialize_events(
             results["state"], time_now
         )
 
diff --git a/synapse/rest/client/events.py b/synapse/rest/client/events.py
index 13b72a045a..672c821061 100644
--- a/synapse/rest/client/events.py
+++ b/synapse/rest/client/events.py
@@ -91,7 +91,7 @@ class EventRestServlet(RestServlet):
 
         time_now = self.clock.time_msec()
         if event:
-            result = await self._event_serializer.serialize_event(event, time_now)
+            result = self._event_serializer.serialize_event(event, time_now)
             return 200, result
         else:
             return 404, "Event not found."
diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py
index acd0c9e135..8e427a96a3 100644
--- a/synapse/rest/client/notifications.py
+++ b/synapse/rest/client/notifications.py
@@ -72,7 +72,7 @@ class NotificationsServlet(RestServlet):
                 "actions": pa.actions,
                 "ts": pa.received_ts,
                 "event": (
-                    await self._event_serializer.serialize_event(
+                    self._event_serializer.serialize_event(
                         notif_events[pa.event_id],
                         self.clock.time_msec(),
                         event_format=format_event_for_client_v2_without_room_id,
diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py
index 3823498012..37d949a71e 100644
--- a/synapse/rest/client/relations.py
+++ b/synapse/rest/client/relations.py
@@ -113,13 +113,14 @@ class RelationPaginationServlet(RestServlet):
         now = self.clock.time_msec()
         # Do not bundle aggregations when retrieving the original event because
         # we want the content before relations are applied to it.
-        original_event = await self._event_serializer.serialize_event(
-            event, now, bundle_aggregations=False
+        original_event = self._event_serializer.serialize_event(
+            event, now, bundle_aggregations=None
         )
         # The relations returned for the requested event do include their
         # bundled aggregations.
-        serialized_events = await self._event_serializer.serialize_events(
-            events, now, bundle_aggregations=True
+        aggregations = await self.store.get_bundled_aggregations(events)
+        serialized_events = self._event_serializer.serialize_events(
+            events, now, bundle_aggregations=aggregations
         )
 
         return_value = pagination_chunk.to_dict()
@@ -308,7 +309,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet):
         )
 
         now = self.clock.time_msec()
-        serialized_events = await self._event_serializer.serialize_events(events, now)
+        serialized_events = self._event_serializer.serialize_events(events, now)
 
         return_value = result.to_dict()
         return_value["chunk"] = serialized_events
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 40330749e5..da6014900a 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -642,6 +642,7 @@ class RoomEventServlet(RestServlet):
     def __init__(self, hs: "HomeServer"):
         super().__init__()
         self.clock = hs.get_clock()
+        self._store = hs.get_datastore()
         self.event_handler = hs.get_event_handler()
         self._event_serializer = hs.get_event_client_serializer()
         self.auth = hs.get_auth()
@@ -660,10 +661,13 @@ class RoomEventServlet(RestServlet):
             # https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid
             raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
 
-        time_now = self.clock.time_msec()
         if event:
-            event_dict = await self._event_serializer.serialize_event(
-                event, time_now, bundle_aggregations=True
+            # Ensure there are bundled aggregations available.
+            aggregations = await self._store.get_bundled_aggregations([event])
+
+            time_now = self.clock.time_msec()
+            event_dict = self._event_serializer.serialize_event(
+                event, time_now, bundle_aggregations=aggregations
             )
             return 200, event_dict
 
@@ -708,16 +712,20 @@ class RoomEventContextServlet(RestServlet):
             raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
 
         time_now = self.clock.time_msec()
-        results["events_before"] = await self._event_serializer.serialize_events(
-            results["events_before"], time_now, bundle_aggregations=True
+        results["events_before"] = self._event_serializer.serialize_events(
+            results["events_before"],
+            time_now,
+            bundle_aggregations=results["aggregations"],
         )
-        results["event"] = await self._event_serializer.serialize_event(
-            results["event"], time_now, bundle_aggregations=True
+        results["event"] = self._event_serializer.serialize_event(
+            results["event"], time_now, bundle_aggregations=results["aggregations"]
         )
-        results["events_after"] = await self._event_serializer.serialize_events(
-            results["events_after"], time_now, bundle_aggregations=True
+        results["events_after"] = self._event_serializer.serialize_events(
+            results["events_after"],
+            time_now,
+            bundle_aggregations=results["aggregations"],
         )
-        results["state"] = await self._event_serializer.serialize_events(
+        results["state"] = self._event_serializer.serialize_events(
             results["state"], time_now
         )
 
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index e99a943d0d..a3e57e4b20 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -17,7 +17,6 @@ from collections import defaultdict
 from typing import (
     TYPE_CHECKING,
     Any,
-    Awaitable,
     Callable,
     Dict,
     Iterable,
@@ -395,7 +394,7 @@ class SyncRestServlet(RestServlet):
         """
         invited = {}
         for room in rooms:
-            invite = await self._event_serializer.serialize_event(
+            invite = self._event_serializer.serialize_event(
                 room.invite,
                 time_now,
                 token_id=token_id,
@@ -432,7 +431,7 @@ class SyncRestServlet(RestServlet):
         """
         knocked = {}
         for room in rooms:
-            knock = await self._event_serializer.serialize_event(
+            knock = self._event_serializer.serialize_event(
                 room.knock,
                 time_now,
                 token_id=token_id,
@@ -525,21 +524,14 @@ class SyncRestServlet(RestServlet):
             The room, encoded in our response format
         """
 
-        def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]:
+        def serialize(
+            events: Iterable[EventBase],
+            aggregations: Optional[Dict[str, Dict[str, Any]]] = None,
+        ) -> List[JsonDict]:
             return self._event_serializer.serialize_events(
                 events,
                 time_now=time_now,
-                # Don't bother to bundle aggregations if the timeline is unlimited,
-                # as clients will have all the necessary information.
-                # bundle_aggregations=room.timeline.limited,
-                #
-                # richvdh 2021-12-15: disable this temporarily as it has too high an
-                # overhead for initialsyncs. We need to figure out a way that the
-                # bundling can be done *before* the events are stored in the
-                # SyncResponseCache so that this part can be synchronous.
-                #
-                # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations.
-                bundle_aggregations=False,
+                bundle_aggregations=aggregations,
                 token_id=token_id,
                 event_format=event_formatter,
                 only_event_fields=only_fields,
@@ -561,8 +553,21 @@ class SyncRestServlet(RestServlet):
                     event.room_id,
                 )
 
-        serialized_state = await serialize(state_events)
-        serialized_timeline = await serialize(timeline_events)
+        serialized_state = serialize(state_events)
+        # Don't bother to bundle aggregations if the timeline is unlimited,
+        # as clients will have all the necessary information.
+        # bundle_aggregations=room.timeline.limited,
+        #
+        # richvdh 2021-12-15: disable this temporarily as it has too high an
+        # overhead for initialsyncs. We need to figure out a way that the
+        # bundling can be done *before* the events are stored in the
+        # SyncResponseCache so that this part can be synchronous.
+        #
+        # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations.
+        # if room.timeline.limited:
+        #    aggregations = await self.store.get_bundled_aggregations(timeline_events)
+        aggregations = None
+        serialized_timeline = serialize(timeline_events, aggregations)
 
         account_data = room.account_data
 
diff --git a/synapse/server.py b/synapse/server.py
index 185e40e4da..3032f0b738 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -759,7 +759,7 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_event_client_serializer(self) -> EventClientSerializer:
-        return EventClientSerializer(self)
+        return EventClientSerializer()
 
     @cache_in_self
     def get_password_policy_handler(self) -> PasswordPolicyHandler:
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 4ff6aed253..c6c4bd18da 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -13,14 +13,30 @@
 # limitations under the License.
 
 import logging
-from typing import List, Optional, Tuple, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+    Union,
+    cast,
+)
 
 import attr
+from frozendict import frozendict
 
-from synapse.api.constants import RelationTypes
+from synapse.api.constants import EventTypes, RelationTypes
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+    make_in_list_sql_clause,
+)
 from synapse.storage.databases.main.stream import generate_pagination_where_clause
 from synapse.storage.relations import (
     AggregationPaginationToken,
@@ -29,10 +45,24 @@ from synapse.storage.relations import (
 )
 from synapse.util.caches.descriptors import cached
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class RelationsWorkerStore(SQLBaseStore):
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self._msc1849_enabled = hs.config.experimental.msc1849_enabled
+        self._msc3440_enabled = hs.config.experimental.msc3440_enabled
+
     @cached(tree=True)
     async def get_relations_for_event(
         self,
@@ -515,6 +545,98 @@ class RelationsWorkerStore(SQLBaseStore):
             "get_if_user_has_annotated_event", _get_if_user_has_annotated_event
         )
 
+    async def _get_bundled_aggregation_for_event(
+        self, event: EventBase
+    ) -> Optional[Dict[str, Any]]:
+        """Generate bundled aggregations for an event.
+
+        Note that this does not use a cache, but depends on cached methods.
+
+        Args:
+            event: The event to calculate bundled aggregations for.
+
+        Returns:
+            The bundled aggregations for an event, if bundled aggregations are
+            enabled and the event can have bundled aggregations.
+        """
+        # State events and redacted events do not get bundled aggregations.
+        if event.is_state() or event.internal_metadata.is_redacted():
+            return None
+
+        # Do not bundle aggregations for an event which represents an edit or an
+        # annotation. It does not make sense for them to have related events.
+        relates_to = event.content.get("m.relates_to")
+        if isinstance(relates_to, (dict, frozendict)):
+            relation_type = relates_to.get("rel_type")
+            if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
+                return None
+
+        event_id = event.event_id
+        room_id = event.room_id
+
+        # The bundled aggregations to include, a mapping of relation type to a
+        # type-specific value. Some types include the direct return type here
+        # while others need more processing during serialization.
+        aggregations: Dict[str, Any] = {}
+
+        annotations = await self.get_aggregation_groups_for_event(event_id, room_id)
+        if annotations.chunk:
+            aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()
+
+        references = await self.get_relations_for_event(
+            event_id, room_id, RelationTypes.REFERENCE, direction="f"
+        )
+        if references.chunk:
+            aggregations[RelationTypes.REFERENCE] = references.to_dict()
+
+        edit = None
+        if event.type == EventTypes.Message:
+            edit = await self.get_applicable_edit(event_id, room_id)
+
+        if edit:
+            aggregations[RelationTypes.REPLACE] = edit
+
+        # If this event is the start of a thread, include a summary of the replies.
+        if self._msc3440_enabled:
+            (
+                thread_count,
+                latest_thread_event,
+            ) = await self.get_thread_summary(event_id, room_id)
+            if latest_thread_event:
+                aggregations[RelationTypes.THREAD] = {
+                    # Don't bundle aggregations as this could recurse forever.
+                    "latest_event": latest_thread_event,
+                    "count": thread_count,
+                }
+
+        # Store the bundled aggregations in the event metadata for later use.
+        return aggregations
+
+    async def get_bundled_aggregations(
+        self, events: Iterable[EventBase]
+    ) -> Dict[str, Dict[str, Any]]:
+        """Generate bundled aggregations for events.
+
+        Args:
+            events: The iterable of events to calculate bundled aggregations for.
+
+        Returns:
+            A map of event ID to the bundled aggregation for the event. Not all
+            events may have bundled aggregations in the results.
+        """
+        # If bundled aggregations are disabled, nothing to do.
+        if not self._msc1849_enabled:
+            return {}
+
+        # TODO Parallelize.
+        results = {}
+        for event in events:
+            event_result = await self._get_bundled_aggregation_for_event(event)
+            if event_result is not None:
+                results[event.event_id] = event_result
+
+        return results
+
 
 class RelationsStore(RelationsWorkerStore):
     pass