From 6bf81a7a61d8d5248be5def955104c44fcb78dae Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 7 Jan 2022 09:10:46 -0500 Subject: Bundle aggregations outside of the serialization method. (#11612) This makes the serialization of events synchronous (and it no longer access the database), but we must manually calculate and provide the bundled aggregations. Overall this should cause no change in behavior, but is prep work for other improvements. --- synapse/events/utils.py | 126 ++++++++++++++---------------------------------- 1 file changed, 37 insertions(+), 89 deletions(-) (limited to 'synapse/events/utils.py') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 2038e72924..de0e0c1731 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -14,17 +14,7 @@ # limitations under the License. import collections.abc import re -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Iterable, - List, - Mapping, - Optional, - Union, -) +from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union from frozendict import frozendict @@ -32,14 +22,10 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersion from synapse.types import JsonDict -from synapse.util.async_helpers import yieldable_gather_results from synapse.util.frozenutils import unfreeze from . import EventBase -if TYPE_CHECKING: - from synapse.server import HomeServer - # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\' # (? JsonDict: """Serializes a single event. @@ -418,66 +399,41 @@ class EventClientSerializer: serialized_event = serialize_event(event, time_now, **kwargs) # Check if there are any bundled aggregations to include with the event. - # - # Do not bundle aggregations if any of the following at true: - # - # * Support is disabled via the configuration or the caller. - # * The event is a state event. - # * The event has been redacted. - if ( - self._msc1849_enabled - and bundle_aggregations - and not event.is_state() - and not event.internal_metadata.is_redacted() - ): - await self._injected_bundled_aggregations(event, time_now, serialized_event) + if bundle_aggregations: + event_aggregations = bundle_aggregations.get(event.event_id) + if event_aggregations: + self._injected_bundled_aggregations( + event, + time_now, + bundle_aggregations[event.event_id], + serialized_event, + ) return serialized_event - async def _injected_bundled_aggregations( - self, event: EventBase, time_now: int, serialized_event: JsonDict + def _injected_bundled_aggregations( + self, + event: EventBase, + time_now: int, + aggregations: JsonDict, + serialized_event: JsonDict, ) -> None: """Potentially injects bundled aggregations into the unsigned portion of the serialized event. Args: event: The event being serialized. time_now: The current time in milliseconds + aggregations: The bundled aggregation to serialize. serialized_event: The serialized event which may be modified. """ - # Do not bundle aggregations for an event which represents an edit or an - # annotation. It does not make sense for them to have related events. - relates_to = event.content.get("m.relates_to") - if isinstance(relates_to, (dict, frozendict)): - relation_type = relates_to.get("rel_type") - if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): - return - - event_id = event.event_id - room_id = event.room_id - - # The bundled aggregations to include. - aggregations = {} - - annotations = await self.store.get_aggregation_groups_for_event( - event_id, room_id - ) - if annotations.chunk: - aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() + # Make a copy in-case the object is cached. + aggregations = aggregations.copy() - references = await self.store.get_relations_for_event( - event_id, room_id, RelationTypes.REFERENCE, direction="f" - ) - if references.chunk: - aggregations[RelationTypes.REFERENCE] = references.to_dict() - - edit = None - if event.type == EventTypes.Message: - edit = await self.store.get_applicable_edit(event_id, room_id) - - if edit: + if RelationTypes.REPLACE in aggregations: # If there is an edit replace the content, preserving existing # relations. + edit = aggregations[RelationTypes.REPLACE] # Ensure we take copies of the edit content, otherwise we risk modifying # the original event. @@ -502,27 +458,19 @@ class EventClientSerializer: } # If this event is the start of a thread, include a summary of the replies. - if self._msc3440_enabled: - ( - thread_count, - latest_thread_event, - ) = await self.store.get_thread_summary(event_id, room_id) - if latest_thread_event: - aggregations[RelationTypes.THREAD] = { - # Don't bundle aggregations as this could recurse forever. - "latest_event": await self.serialize_event( - latest_thread_event, time_now, bundle_aggregations=False - ), - "count": thread_count, - } - - # If any bundled aggregations were found, include them. - if aggregations: - serialized_event["unsigned"].setdefault("m.relations", {}).update( - aggregations + if RelationTypes.THREAD in aggregations: + # Serialize the latest thread event. + latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"] + + # Don't bundle aggregations as this could recurse forever. + aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event( + latest_thread_event, time_now, bundle_aggregations=None ) - async def serialize_events( + # Include the bundled aggregations in the event. + serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations) + + def serialize_events( self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any ) -> List[JsonDict]: """Serializes multiple events. @@ -535,9 +483,9 @@ class EventClientSerializer: Returns: The list of serialized events """ - return await yieldable_gather_results( - self.serialize_event, events, time_now=time_now, **kwargs - ) + return [ + self.serialize_event(event, time_now=time_now, **kwargs) for event in events + ] def copy_power_levels_contents( -- cgit 1.5.1 From 7a11509d17f56b8711a3bda11491b3a5a42a2f67 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 21 Jan 2022 05:31:31 -0500 Subject: Do not try to serialize raw aggregations dict. (#11791) --- changelog.d/11612.bugfix | 1 + changelog.d/11612.misc | 1 - changelog.d/11791.bugfix | 1 + synapse/events/utils.py | 4 +- synapse/rest/admin/rooms.py | 13 ++--- synapse/rest/client/room.py | 11 ++-- tests/rest/client/test_relations.py | 108 ++++++++++++++++++++++++------------ 7 files changed, 85 insertions(+), 54 deletions(-) create mode 100644 changelog.d/11612.bugfix delete mode 100644 changelog.d/11612.misc create mode 100644 changelog.d/11791.bugfix (limited to 'synapse/events/utils.py') diff --git a/changelog.d/11612.bugfix b/changelog.d/11612.bugfix new file mode 100644 index 0000000000..842f6892fd --- /dev/null +++ b/changelog.d/11612.bugfix @@ -0,0 +1 @@ +Include the bundled aggregations in the `/sync` response, per [MSC2675](https://github.com/matrix-org/matrix-doc/pull/2675). diff --git a/changelog.d/11612.misc b/changelog.d/11612.misc deleted file mode 100644 index 2d886169c5..0000000000 --- a/changelog.d/11612.misc +++ /dev/null @@ -1 +0,0 @@ -Avoid database access in the JSON serialization process. diff --git a/changelog.d/11791.bugfix b/changelog.d/11791.bugfix new file mode 100644 index 0000000000..842f6892fd --- /dev/null +++ b/changelog.d/11791.bugfix @@ -0,0 +1 @@ +Include the bundled aggregations in the `/sync` response, per [MSC2675](https://github.com/matrix-org/matrix-doc/pull/2675). diff --git a/synapse/events/utils.py b/synapse/events/utils.py index de0e0c1731..918adeecf8 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -402,7 +402,7 @@ class EventClientSerializer: if bundle_aggregations: event_aggregations = bundle_aggregations.get(event.event_id) if event_aggregations: - self._injected_bundled_aggregations( + self._inject_bundled_aggregations( event, time_now, bundle_aggregations[event.event_id], @@ -411,7 +411,7 @@ class EventClientSerializer: return serialized_event - def _injected_bundled_aggregations( + def _inject_bundled_aggregations( self, event: EventBase, time_now: int, diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 2e714ac87b..efe25fe7eb 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -744,20 +744,15 @@ class RoomEventContextServlet(RestServlet): ) time_now = self.clock.time_msec() + aggregations = results.pop("aggregations", None) results["events_before"] = self._event_serializer.serialize_events( - results["events_before"], - time_now, - bundle_aggregations=results["aggregations"], + results["events_before"], time_now, bundle_aggregations=aggregations ) results["event"] = self._event_serializer.serialize_event( - results["event"], - time_now, - bundle_aggregations=results["aggregations"], + results["event"], time_now, bundle_aggregations=aggregations ) results["events_after"] = self._event_serializer.serialize_events( - results["events_after"], - time_now, - bundle_aggregations=results["aggregations"], + results["events_after"], time_now, bundle_aggregations=aggregations ) results["state"] = self._event_serializer.serialize_events( results["state"], time_now diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 31fd329a38..90bb9142a0 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -714,18 +714,15 @@ class RoomEventContextServlet(RestServlet): raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) time_now = self.clock.time_msec() + aggregations = results.pop("aggregations", None) results["events_before"] = self._event_serializer.serialize_events( - results["events_before"], - time_now, - bundle_aggregations=results["aggregations"], + results["events_before"], time_now, bundle_aggregations=aggregations ) results["event"] = self._event_serializer.serialize_event( - results["event"], time_now, bundle_aggregations=results["aggregations"] + results["event"], time_now, bundle_aggregations=aggregations ) results["events_after"] = self._event_serializer.serialize_events( - results["events_after"], - time_now, - bundle_aggregations=results["aggregations"], + results["events_after"], time_now, bundle_aggregations=aggregations ) results["state"] = self._event_serializer.serialize_events( results["state"], time_now diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 4b20ab0e3e..c9b220e73d 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -21,6 +21,7 @@ from unittest.mock import patch from synapse.api.constants import EventTypes, RelationTypes from synapse.rest import admin from synapse.rest.client import login, register, relations, room, sync +from synapse.types import JsonDict from tests import unittest from tests.server import FakeChannel @@ -454,7 +455,14 @@ class RelationsTestCase(unittest.HomeserverTestCase): @unittest.override_config({"experimental_features": {"msc3440_enabled": True}}) def test_bundled_aggregations(self): - """Test that annotations, references, and threads get correctly bundled.""" + """ + Test that annotations, references, and threads get correctly bundled. + + Note that this doesn't test against /relations since only thread relations + get bundled via that API. See test_aggregation_get_event_for_thread. + + See test_edit for a similar test for edits. + """ # Setup by sending a variety of relations. channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a") self.assertEquals(200, channel.code, channel.json_body) @@ -482,12 +490,13 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertEquals(200, channel.code, channel.json_body) thread_2 = channel.json_body["event_id"] - def assert_bundle(actual): + def assert_bundle(event_json: JsonDict) -> None: """Assert the expected values of the bundled aggregations.""" + relations_dict = event_json["unsigned"].get("m.relations") # Ensure the fields are as expected. self.assertCountEqual( - actual.keys(), + relations_dict.keys(), ( RelationTypes.ANNOTATION, RelationTypes.REFERENCE, @@ -503,20 +512,20 @@ class RelationsTestCase(unittest.HomeserverTestCase): {"type": "m.reaction", "key": "b", "count": 1}, ] }, - actual[RelationTypes.ANNOTATION], + relations_dict[RelationTypes.ANNOTATION], ) self.assertEquals( {"chunk": [{"event_id": reply_1}, {"event_id": reply_2}]}, - actual[RelationTypes.REFERENCE], + relations_dict[RelationTypes.REFERENCE], ) self.assertEquals( 2, - actual[RelationTypes.THREAD].get("count"), + relations_dict[RelationTypes.THREAD].get("count"), ) self.assertTrue( - actual[RelationTypes.THREAD].get("current_user_participated") + relations_dict[RelationTypes.THREAD].get("current_user_participated") ) # The latest thread event has some fields that don't matter. self.assert_dict( @@ -533,20 +542,9 @@ class RelationsTestCase(unittest.HomeserverTestCase): "type": "m.room.test", "user_id": self.user_id, }, - actual[RelationTypes.THREAD].get("latest_event"), + relations_dict[RelationTypes.THREAD].get("latest_event"), ) - def _find_and_assert_event(events): - """ - Find the parent event in a chunk of events and assert that it has the proper bundled aggregations. - """ - for event in events: - if event["event_id"] == self.parent_id: - break - else: - raise AssertionError(f"Event {self.parent_id} not found in chunk") - assert_bundle(event["unsigned"].get("m.relations")) - # Request the event directly. channel = self.make_request( "GET", @@ -554,7 +552,7 @@ class RelationsTestCase(unittest.HomeserverTestCase): access_token=self.user_token, ) self.assertEquals(200, channel.code, channel.json_body) - assert_bundle(channel.json_body["unsigned"].get("m.relations")) + assert_bundle(channel.json_body) # Request the room messages. channel = self.make_request( @@ -563,7 +561,7 @@ class RelationsTestCase(unittest.HomeserverTestCase): access_token=self.user_token, ) self.assertEquals(200, channel.code, channel.json_body) - _find_and_assert_event(channel.json_body["chunk"]) + assert_bundle(self._find_event_in_chunk(channel.json_body["chunk"])) # Request the room context. channel = self.make_request( @@ -572,17 +570,14 @@ class RelationsTestCase(unittest.HomeserverTestCase): access_token=self.user_token, ) self.assertEquals(200, channel.code, channel.json_body) - assert_bundle(channel.json_body["event"]["unsigned"].get("m.relations")) + assert_bundle(channel.json_body["event"]) # Request sync. channel = self.make_request("GET", "/sync", access_token=self.user_token) self.assertEquals(200, channel.code, channel.json_body) room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"] self.assertTrue(room_timeline["limited"]) - _find_and_assert_event(room_timeline["events"]) - - # Note that /relations is tested separately in test_aggregation_get_event_for_thread - # since it needs different data configured. + self._find_event_in_chunk(room_timeline["events"]) def test_aggregation_get_event_for_annotation(self): """Test that annotations do not get bundled aggregations included @@ -777,25 +772,58 @@ class RelationsTestCase(unittest.HomeserverTestCase): edit_event_id = channel.json_body["event_id"] + def assert_bundle(event_json: JsonDict) -> None: + """Assert the expected values of the bundled aggregations.""" + relations_dict = event_json["unsigned"].get("m.relations") + self.assertIn(RelationTypes.REPLACE, relations_dict) + + m_replace_dict = relations_dict[RelationTypes.REPLACE] + for key in ["event_id", "sender", "origin_server_ts"]: + self.assertIn(key, m_replace_dict) + + self.assert_dict( + {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict + ) + channel = self.make_request( "GET", - "/rooms/%s/event/%s" % (self.room, self.parent_id), + f"/rooms/{self.room}/event/{self.parent_id}", access_token=self.user_token, ) self.assertEquals(200, channel.code, channel.json_body) - self.assertEquals(channel.json_body["content"], new_body) + assert_bundle(channel.json_body) - relations_dict = channel.json_body["unsigned"].get("m.relations") - self.assertIn(RelationTypes.REPLACE, relations_dict) + # Request the room messages. + channel = self.make_request( + "GET", + f"/rooms/{self.room}/messages?dir=b", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + assert_bundle(self._find_event_in_chunk(channel.json_body["chunk"])) - m_replace_dict = relations_dict[RelationTypes.REPLACE] - for key in ["event_id", "sender", "origin_server_ts"]: - self.assertIn(key, m_replace_dict) + # Request the room context. + channel = self.make_request( + "GET", + f"/rooms/{self.room}/context/{self.parent_id}", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + assert_bundle(channel.json_body["event"]) - self.assert_dict( - {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict + # Request sync, but limit the timeline so it becomes limited (and includes + # bundled aggregations). + filter = urllib.parse.quote_plus( + '{"room": {"timeline": {"limit": 2}}}'.encode() + ) + channel = self.make_request( + "GET", f"/sync?filter={filter}", access_token=self.user_token ) + self.assertEquals(200, channel.code, channel.json_body) + room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"] + self.assertTrue(room_timeline["limited"]) + assert_bundle(self._find_event_in_chunk(room_timeline["events"])) def test_multi_edit(self): """Test that multiple edits, including attempts by people who @@ -1102,6 +1130,16 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertEquals(200, channel.code, channel.json_body) self.assertEquals(channel.json_body["chunk"], []) + def _find_event_in_chunk(self, events: List[JsonDict]) -> JsonDict: + """ + Find the parent event in a chunk of events and assert that it has the proper bundled aggregations. + """ + for event in events: + if event["event_id"] == self.parent_id: + return event + + raise AssertionError(f"Event {self.parent_id} not found in chunk") + def _send_relation( self, relation_type: str, -- cgit 1.5.1