From 88a78c6577086527e4569541b09e437a1ca0d1a9 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 5 Jan 2022 13:33:28 +0000 Subject: Cache empty responses from `/user/devices` (#11587) If we've never made a request to a remote homeserver, we should cache the response---even if the response is "this user has no devices". --- synapse/storage/databases/main/devices.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 273adb61fd..52fbf50db6 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -713,7 +713,7 @@ class DeviceWorkerStore(SQLBaseStore): @cached(max_entries=10000) async def get_device_list_last_stream_id_for_remote( self, user_id: str - ) -> Optional[Any]: + ) -> Optional[str]: """Get the last stream_id we got for a user. May be None if we haven't got any information for them. """ @@ -729,7 +729,9 @@ class DeviceWorkerStore(SQLBaseStore): cached_method_name="get_device_list_last_stream_id_for_remote", list_name="user_ids", ) - async def get_device_list_last_stream_id_for_remotes(self, user_ids: Iterable[str]): + async def get_device_list_last_stream_id_for_remotes( + self, user_ids: Iterable[str] + ) -> Dict[str, Optional[str]]: rows = await self.db_pool.simple_select_many_batch( table="device_lists_remote_extremeties", column="user_id", @@ -1316,6 +1318,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): content: JsonDict, stream_id: str, ) -> None: + """Delete, update or insert a cache entry for this (user, device) pair.""" if content.get("deleted"): self.db_pool.simple_delete_txn( txn, @@ -1375,6 +1378,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): def _update_remote_device_list_cache_txn( self, txn: LoggingTransaction, user_id: str, devices: List[dict], stream_id: int ) -> None: + """Replace the list of cached devices for this user with the given list.""" self.db_pool.simple_delete_txn( txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id} ) -- cgit 1.5.1 From d8f94eeec23eba6274896e1aa2f92aa97ff10bee Mon Sep 17 00:00:00 2001 From: Shay Date: Wed, 5 Jan 2022 09:53:05 -0800 Subject: Run `pyupgrade --py37-plus --keep-percent-format` on Synapse (#11685) * newsfragment * fix newsfragment number * update changelog * remove extra space --- changelog.d/11685.misc | 1 + synapse/rest/admin/media.py | 2 +- synapse/storage/databases/main/session.py | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 changelog.d/11685.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11685.misc b/changelog.d/11685.misc new file mode 100644 index 0000000000..c4566b2012 --- /dev/null +++ b/changelog.d/11685.misc @@ -0,0 +1 @@ +Run `pyupgrade --py37-plus --keep-percent-format` on Synapse. diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py index 7236e4027f..299f5c9eb0 100644 --- a/synapse/rest/admin/media.py +++ b/synapse/rest/admin/media.py @@ -466,7 +466,7 @@ class UserMediaRestServlet(RestServlet): ) deleted_media, total = await self.media_repository.delete_local_media_ids( - ([row["media_id"] for row in media]) + [row["media_id"] for row in media] ) return HTTPStatus.OK, {"deleted_media": deleted_media, "total": total} diff --git a/synapse/storage/databases/main/session.py b/synapse/storage/databases/main/session.py index 5a97120437..e8c776b97a 100644 --- a/synapse/storage/databases/main/session.py +++ b/synapse/storage/databases/main/session.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); -- cgit 1.5.1 From 3b51c763ba5601e155e3e27a46cddf0370da83eb Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Wed, 5 Jan 2022 20:46:50 +0100 Subject: Fix get federation status of destination if no error occured (#11593) --- changelog.d/11593.bugfix | 1 + synapse/rest/admin/federation.py | 26 ++++++--- synapse/storage/databases/main/transactions.py | 11 ++++ tests/rest/admin/test_federation.py | 75 +++++++++++++++++++------- 4 files changed, 88 insertions(+), 25 deletions(-) create mode 100644 changelog.d/11593.bugfix (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11593.bugfix b/changelog.d/11593.bugfix new file mode 100644 index 0000000000..963fd0e58e --- /dev/null +++ b/changelog.d/11593.bugfix @@ -0,0 +1 @@ +Fix an error in to get federation status of a destination server even if no error has occurred. This admin API was new introduced in Synapse 1.49.0. diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py index 50d88c9109..8cd3fa189e 100644 --- a/synapse/rest/admin/federation.py +++ b/synapse/rest/admin/federation.py @@ -111,25 +111,37 @@ class DestinationsRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self._auth, request) + if not await self._store.is_destination_known(destination): + raise NotFoundError("Unknown destination") + destination_retry_timings = await self._store.get_destination_retry_timings( destination ) - if not destination_retry_timings: - raise NotFoundError("Unknown destination") - last_successful_stream_ordering = ( await self._store.get_destination_last_successful_stream_ordering( destination ) ) - response = { + response: JsonDict = { "destination": destination, - "failure_ts": destination_retry_timings.failure_ts, - "retry_last_ts": destination_retry_timings.retry_last_ts, - "retry_interval": destination_retry_timings.retry_interval, "last_successful_stream_ordering": last_successful_stream_ordering, } + if destination_retry_timings: + response = { + **response, + "failure_ts": destination_retry_timings.failure_ts, + "retry_last_ts": destination_retry_timings.retry_last_ts, + "retry_interval": destination_retry_timings.retry_interval, + } + else: + response = { + **response, + "failure_ts": None, + "retry_last_ts": 0, + "retry_interval": 0, + } + return HTTPStatus.OK, response diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 6c299cafa5..4b78b4d098 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -560,3 +560,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): return await self.db_pool.runInteraction( "get_destinations_paginate_txn", get_destinations_paginate_txn ) + + async def is_destination_known(self, destination: str) -> bool: + """Check if a destination is known to the server.""" + result = await self.db_pool.simple_select_one_onecol( + table="destinations", + keyvalues={"destination": destination}, + retcol="1", + allow_none=True, + desc="is_destination_known", + ) + return bool(result) diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py index 742f194257..b70350b6f1 100644 --- a/tests/rest/admin/test_federation.py +++ b/tests/rest/admin/test_federation.py @@ -314,15 +314,12 @@ class FederationTestCase(unittest.HomeserverTestCase): retry_interval, last_successful_stream_ordering, ) in dest: - self.get_success( - self.store.set_destination_retry_timings( - destination, failure_ts, retry_last_ts, retry_interval - ) - ) - self.get_success( - self.store.set_destination_last_successful_stream_ordering( - destination, last_successful_stream_ordering - ) + self._create_destination( + destination, + failure_ts, + retry_last_ts, + retry_interval, + last_successful_stream_ordering, ) # order by default (destination) @@ -413,11 +410,9 @@ class FederationTestCase(unittest.HomeserverTestCase): _search_test(None, "foo") _search_test(None, "bar") - def test_get_single_destination(self) -> None: - """ - Get one specific destinations. - """ - self._create_destinations(5) + def test_get_single_destination_with_retry_timings(self) -> None: + """Get one specific destination which has retry timings.""" + self._create_destinations(1) channel = self.make_request( "GET", @@ -432,6 +427,53 @@ class FederationTestCase(unittest.HomeserverTestCase): # convert channel.json_body into a List self._check_fields([channel.json_body]) + def test_get_single_destination_no_retry_timings(self) -> None: + """Get one specific destination which has no retry timings.""" + self._create_destination("sub0.example.com") + + channel = self.make_request( + "GET", + self.url + "/sub0.example.com", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual("sub0.example.com", channel.json_body["destination"]) + self.assertEqual(0, channel.json_body["retry_last_ts"]) + self.assertEqual(0, channel.json_body["retry_interval"]) + self.assertIsNone(channel.json_body["failure_ts"]) + self.assertIsNone(channel.json_body["last_successful_stream_ordering"]) + + def _create_destination( + self, + destination: str, + failure_ts: Optional[int] = None, + retry_last_ts: int = 0, + retry_interval: int = 0, + last_successful_stream_ordering: Optional[int] = None, + ) -> None: + """Create one specific destination + + Args: + destination: the destination we have successfully sent to + failure_ts: when the server started failing (ms since epoch) + retry_last_ts: time of last retry attempt in unix epoch ms + retry_interval: how long until next retry in ms + last_successful_stream_ordering: the stream_ordering of the most + recent successfully-sent PDU + """ + self.get_success( + self.store.set_destination_retry_timings( + destination, failure_ts, retry_last_ts, retry_interval + ) + ) + if last_successful_stream_ordering is not None: + self.get_success( + self.store.set_destination_last_successful_stream_ordering( + destination, last_successful_stream_ordering + ) + ) + def _create_destinations(self, number_destinations: int) -> None: """Create a number of destinations @@ -440,10 +482,7 @@ class FederationTestCase(unittest.HomeserverTestCase): """ for i in range(0, number_destinations): dest = f"sub{i}.example.com" - self.get_success(self.store.set_destination_retry_timings(dest, 50, 50, 50)) - self.get_success( - self.store.set_destination_last_successful_stream_ordering(dest, 100) - ) + self._create_destination(dest, 50, 50, 50, 100) def _check_fields(self, content: List[JsonDict]) -> None: """Checks that the expected destination attributes are present in content -- cgit 1.5.1 From 6bf81a7a61d8d5248be5def955104c44fcb78dae Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 7 Jan 2022 09:10:46 -0500 Subject: Bundle aggregations outside of the serialization method. (#11612) This makes the serialization of events synchronous (and it no longer access the database), but we must manually calculate and provide the bundled aggregations. Overall this should cause no change in behavior, but is prep work for other improvements. --- changelog.d/11612.misc | 1 + synapse/events/utils.py | 126 ++++++++------------------- synapse/handlers/events.py | 2 +- synapse/handlers/initial_sync.py | 16 ++-- synapse/handlers/message.py | 2 +- synapse/handlers/pagination.py | 8 +- synapse/handlers/room.py | 10 +++ synapse/handlers/search.py | 10 +-- synapse/rest/admin/rooms.py | 16 ++-- synapse/rest/client/events.py | 2 +- synapse/rest/client/notifications.py | 2 +- synapse/rest/client/relations.py | 11 +-- synapse/rest/client/room.py | 28 +++--- synapse/rest/client/sync.py | 39 +++++---- synapse/server.py | 2 +- synapse/storage/databases/main/relations.py | 128 +++++++++++++++++++++++++++- tests/rest/client/test_retention.py | 2 +- 17 files changed, 249 insertions(+), 156 deletions(-) create mode 100644 changelog.d/11612.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11612.misc b/changelog.d/11612.misc new file mode 100644 index 0000000000..2d886169c5 --- /dev/null +++ b/changelog.d/11612.misc @@ -0,0 +1 @@ +Avoid database access in the JSON serialization process. diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 2038e72924..de0e0c1731 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -14,17 +14,7 @@ # limitations under the License. import collections.abc import re -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Iterable, - List, - Mapping, - Optional, - Union, -) +from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union from frozendict import frozendict @@ -32,14 +22,10 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersion from synapse.types import JsonDict -from synapse.util.async_helpers import yieldable_gather_results from synapse.util.frozenutils import unfreeze from . import EventBase -if TYPE_CHECKING: - from synapse.server import HomeServer - # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\' # (? JsonDict: """Serializes a single event. @@ -418,66 +399,41 @@ class EventClientSerializer: serialized_event = serialize_event(event, time_now, **kwargs) # Check if there are any bundled aggregations to include with the event. - # - # Do not bundle aggregations if any of the following at true: - # - # * Support is disabled via the configuration or the caller. - # * The event is a state event. - # * The event has been redacted. - if ( - self._msc1849_enabled - and bundle_aggregations - and not event.is_state() - and not event.internal_metadata.is_redacted() - ): - await self._injected_bundled_aggregations(event, time_now, serialized_event) + if bundle_aggregations: + event_aggregations = bundle_aggregations.get(event.event_id) + if event_aggregations: + self._injected_bundled_aggregations( + event, + time_now, + bundle_aggregations[event.event_id], + serialized_event, + ) return serialized_event - async def _injected_bundled_aggregations( - self, event: EventBase, time_now: int, serialized_event: JsonDict + def _injected_bundled_aggregations( + self, + event: EventBase, + time_now: int, + aggregations: JsonDict, + serialized_event: JsonDict, ) -> None: """Potentially injects bundled aggregations into the unsigned portion of the serialized event. Args: event: The event being serialized. time_now: The current time in milliseconds + aggregations: The bundled aggregation to serialize. serialized_event: The serialized event which may be modified. """ - # Do not bundle aggregations for an event which represents an edit or an - # annotation. It does not make sense for them to have related events. - relates_to = event.content.get("m.relates_to") - if isinstance(relates_to, (dict, frozendict)): - relation_type = relates_to.get("rel_type") - if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): - return - - event_id = event.event_id - room_id = event.room_id - - # The bundled aggregations to include. - aggregations = {} - - annotations = await self.store.get_aggregation_groups_for_event( - event_id, room_id - ) - if annotations.chunk: - aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() + # Make a copy in-case the object is cached. + aggregations = aggregations.copy() - references = await self.store.get_relations_for_event( - event_id, room_id, RelationTypes.REFERENCE, direction="f" - ) - if references.chunk: - aggregations[RelationTypes.REFERENCE] = references.to_dict() - - edit = None - if event.type == EventTypes.Message: - edit = await self.store.get_applicable_edit(event_id, room_id) - - if edit: + if RelationTypes.REPLACE in aggregations: # If there is an edit replace the content, preserving existing # relations. + edit = aggregations[RelationTypes.REPLACE] # Ensure we take copies of the edit content, otherwise we risk modifying # the original event. @@ -502,27 +458,19 @@ class EventClientSerializer: } # If this event is the start of a thread, include a summary of the replies. - if self._msc3440_enabled: - ( - thread_count, - latest_thread_event, - ) = await self.store.get_thread_summary(event_id, room_id) - if latest_thread_event: - aggregations[RelationTypes.THREAD] = { - # Don't bundle aggregations as this could recurse forever. - "latest_event": await self.serialize_event( - latest_thread_event, time_now, bundle_aggregations=False - ), - "count": thread_count, - } - - # If any bundled aggregations were found, include them. - if aggregations: - serialized_event["unsigned"].setdefault("m.relations", {}).update( - aggregations + if RelationTypes.THREAD in aggregations: + # Serialize the latest thread event. + latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"] + + # Don't bundle aggregations as this could recurse forever. + aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event( + latest_thread_event, time_now, bundle_aggregations=None ) - async def serialize_events( + # Include the bundled aggregations in the event. + serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations) + + def serialize_events( self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any ) -> List[JsonDict]: """Serializes multiple events. @@ -535,9 +483,9 @@ class EventClientSerializer: Returns: The list of serialized events """ - return await yieldable_gather_results( - self.serialize_event, events, time_now=time_now, **kwargs - ) + return [ + self.serialize_event(event, time_now=time_now, **kwargs) for event in events + ] def copy_power_levels_contents( diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1b996c420d..a3add8a586 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -119,7 +119,7 @@ class EventStreamHandler: events.extend(to_add) - chunks = await self._event_serializer.serialize_events( + chunks = self._event_serializer.serialize_events( events, time_now, as_client_event=as_client_event, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 601bab67f9..346a06ff49 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -170,7 +170,7 @@ class InitialSyncHandler: d["inviter"] = event.sender invite_event = await self.store.get_event(event.event_id) - d["invite"] = await self._event_serializer.serialize_event( + d["invite"] = self._event_serializer.serialize_event( invite_event, time_now, as_client_event=as_client_event, @@ -222,7 +222,7 @@ class InitialSyncHandler: d["messages"] = { "chunk": ( - await self._event_serializer.serialize_events( + self._event_serializer.serialize_events( messages, time_now=time_now, as_client_event=as_client_event, @@ -232,7 +232,7 @@ class InitialSyncHandler: "end": await end_token.to_string(self.store), } - d["state"] = await self._event_serializer.serialize_events( + d["state"] = self._event_serializer.serialize_events( current_state.values(), time_now=time_now, as_client_event=as_client_event, @@ -376,16 +376,14 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), }, "state": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events( - room_state.values(), time_now - ) + self._event_serializer.serialize_events(room_state.values(), time_now) ), "presence": [], "receipts": [], @@ -404,7 +402,7 @@ class InitialSyncHandler: # TODO: These concurrently time_now = self.clock.time_msec() # Don't bundle aggregations as this is a deprecated API. - state = await self._event_serializer.serialize_events( + state = self._event_serializer.serialize_events( current_state.values(), time_now ) @@ -480,7 +478,7 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5e3d3886eb..b37250aa38 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -246,7 +246,7 @@ class MessageHandler: room_state = room_state_events[membership_event_id] now = self.clock.time_msec() - events = await self._event_serializer.serialize_events(room_state.values(), now) + events = self._event_serializer.serialize_events(room_state.values(), now) return events async def get_joined_members(self, requester: Requester, room_id: str) -> dict: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 7469cc55a2..472688f045 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -537,14 +537,16 @@ class PaginationHandler: state_dict = await self.store.get_events(list(state_ids.values())) state = state_dict.values() + aggregations = await self.store.get_bundled_aggregations(events) + time_now = self.clock.time_msec() chunk = { "chunk": ( - await self._event_serializer.serialize_events( + self._event_serializer.serialize_events( events, time_now, - bundle_aggregations=True, + bundle_aggregations=aggregations, as_client_event=as_client_event, ) ), @@ -553,7 +555,7 @@ class PaginationHandler: } if state: - chunk["state"] = await self._event_serializer.serialize_events( + chunk["state"] = self._event_serializer.serialize_events( state, time_now, as_client_event=as_client_event ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3d3a0f6ac3..3d47163f25 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1181,6 +1181,16 @@ class RoomContextHandler: # `filtered` rather than the event we retrieved from the datastore. results["event"] = filtered[0] + # Fetch the aggregations. + aggregations = await self.store.get_bundled_aggregations([results["event"]]) + aggregations.update( + await self.store.get_bundled_aggregations(results["events_before"]) + ) + aggregations.update( + await self.store.get_bundled_aggregations(results["events_after"]) + ) + results["aggregations"] = aggregations + if results["events_after"]: last_event_id = results["events_after"][-1].event_id else: diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index ab7eaab2fb..0b153a6822 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -420,10 +420,10 @@ class SearchHandler: time_now = self.clock.time_msec() for context in contexts.values(): - context["events_before"] = await self._event_serializer.serialize_events( + context["events_before"] = self._event_serializer.serialize_events( context["events_before"], time_now ) - context["events_after"] = await self._event_serializer.serialize_events( + context["events_after"] = self._event_serializer.serialize_events( context["events_after"], time_now ) @@ -441,9 +441,7 @@ class SearchHandler: results.append( { "rank": rank_map[e.event_id], - "result": ( - await self._event_serializer.serialize_event(e, time_now) - ), + "result": self._event_serializer.serialize_event(e, time_now), "context": contexts.get(e.event_id, {}), } ) @@ -457,7 +455,7 @@ class SearchHandler: if state_results: s = {} for room_id, state_events in state_results.items(): - s[room_id] = await self._event_serializer.serialize_events( + s[room_id] = self._event_serializer.serialize_events( state_events, time_now ) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 6030373ebc..2e714ac87b 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -424,7 +424,7 @@ class RoomStateRestServlet(RestServlet): event_ids = await self.store.get_current_state_ids(room_id) events = await self.store.get_events(event_ids.values()) now = self.clock.time_msec() - room_state = await self._event_serializer.serialize_events(events.values(), now) + room_state = self._event_serializer.serialize_events(events.values(), now) ret = {"state": room_state} return HTTPStatus.OK, ret @@ -744,22 +744,22 @@ class RoomEventContextServlet(RestServlet): ) time_now = self.clock.time_msec() - results["events_before"] = await self._event_serializer.serialize_events( + results["events_before"] = self._event_serializer.serialize_events( results["events_before"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["event"] = await self._event_serializer.serialize_event( + results["event"] = self._event_serializer.serialize_event( results["event"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["events_after"] = await self._event_serializer.serialize_events( + results["events_after"] = self._event_serializer.serialize_events( results["events_after"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["state"] = await self._event_serializer.serialize_events( + results["state"] = self._event_serializer.serialize_events( results["state"], time_now ) diff --git a/synapse/rest/client/events.py b/synapse/rest/client/events.py index 13b72a045a..672c821061 100644 --- a/synapse/rest/client/events.py +++ b/synapse/rest/client/events.py @@ -91,7 +91,7 @@ class EventRestServlet(RestServlet): time_now = self.clock.time_msec() if event: - result = await self._event_serializer.serialize_event(event, time_now) + result = self._event_serializer.serialize_event(event, time_now) return 200, result else: return 404, "Event not found." diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py index acd0c9e135..8e427a96a3 100644 --- a/synapse/rest/client/notifications.py +++ b/synapse/rest/client/notifications.py @@ -72,7 +72,7 @@ class NotificationsServlet(RestServlet): "actions": pa.actions, "ts": pa.received_ts, "event": ( - await self._event_serializer.serialize_event( + self._event_serializer.serialize_event( notif_events[pa.event_id], self.clock.time_msec(), event_format=format_event_for_client_v2_without_room_id, diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 3823498012..37d949a71e 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -113,13 +113,14 @@ class RelationPaginationServlet(RestServlet): now = self.clock.time_msec() # Do not bundle aggregations when retrieving the original event because # we want the content before relations are applied to it. - original_event = await self._event_serializer.serialize_event( - event, now, bundle_aggregations=False + original_event = self._event_serializer.serialize_event( + event, now, bundle_aggregations=None ) # The relations returned for the requested event do include their # bundled aggregations. - serialized_events = await self._event_serializer.serialize_events( - events, now, bundle_aggregations=True + aggregations = await self.store.get_bundled_aggregations(events) + serialized_events = self._event_serializer.serialize_events( + events, now, bundle_aggregations=aggregations ) return_value = pagination_chunk.to_dict() @@ -308,7 +309,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet): ) now = self.clock.time_msec() - serialized_events = await self._event_serializer.serialize_events(events, now) + serialized_events = self._event_serializer.serialize_events(events, now) return_value = result.to_dict() return_value["chunk"] = serialized_events diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 40330749e5..da6014900a 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -642,6 +642,7 @@ class RoomEventServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.clock = hs.get_clock() + self._store = hs.get_datastore() self.event_handler = hs.get_event_handler() self._event_serializer = hs.get_event_client_serializer() self.auth = hs.get_auth() @@ -660,10 +661,13 @@ class RoomEventServlet(RestServlet): # https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) - time_now = self.clock.time_msec() if event: - event_dict = await self._event_serializer.serialize_event( - event, time_now, bundle_aggregations=True + # Ensure there are bundled aggregations available. + aggregations = await self._store.get_bundled_aggregations([event]) + + time_now = self.clock.time_msec() + event_dict = self._event_serializer.serialize_event( + event, time_now, bundle_aggregations=aggregations ) return 200, event_dict @@ -708,16 +712,20 @@ class RoomEventContextServlet(RestServlet): raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) time_now = self.clock.time_msec() - results["events_before"] = await self._event_serializer.serialize_events( - results["events_before"], time_now, bundle_aggregations=True + results["events_before"] = self._event_serializer.serialize_events( + results["events_before"], + time_now, + bundle_aggregations=results["aggregations"], ) - results["event"] = await self._event_serializer.serialize_event( - results["event"], time_now, bundle_aggregations=True + results["event"] = self._event_serializer.serialize_event( + results["event"], time_now, bundle_aggregations=results["aggregations"] ) - results["events_after"] = await self._event_serializer.serialize_events( - results["events_after"], time_now, bundle_aggregations=True + results["events_after"] = self._event_serializer.serialize_events( + results["events_after"], + time_now, + bundle_aggregations=results["aggregations"], ) - results["state"] = await self._event_serializer.serialize_events( + results["state"] = self._event_serializer.serialize_events( results["state"], time_now ) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index e99a943d0d..a3e57e4b20 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -17,7 +17,6 @@ from collections import defaultdict from typing import ( TYPE_CHECKING, Any, - Awaitable, Callable, Dict, Iterable, @@ -395,7 +394,7 @@ class SyncRestServlet(RestServlet): """ invited = {} for room in rooms: - invite = await self._event_serializer.serialize_event( + invite = self._event_serializer.serialize_event( room.invite, time_now, token_id=token_id, @@ -432,7 +431,7 @@ class SyncRestServlet(RestServlet): """ knocked = {} for room in rooms: - knock = await self._event_serializer.serialize_event( + knock = self._event_serializer.serialize_event( room.knock, time_now, token_id=token_id, @@ -525,21 +524,14 @@ class SyncRestServlet(RestServlet): The room, encoded in our response format """ - def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]: + def serialize( + events: Iterable[EventBase], + aggregations: Optional[Dict[str, Dict[str, Any]]] = None, + ) -> List[JsonDict]: return self._event_serializer.serialize_events( events, time_now=time_now, - # Don't bother to bundle aggregations if the timeline is unlimited, - # as clients will have all the necessary information. - # bundle_aggregations=room.timeline.limited, - # - # richvdh 2021-12-15: disable this temporarily as it has too high an - # overhead for initialsyncs. We need to figure out a way that the - # bundling can be done *before* the events are stored in the - # SyncResponseCache so that this part can be synchronous. - # - # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations. - bundle_aggregations=False, + bundle_aggregations=aggregations, token_id=token_id, event_format=event_formatter, only_event_fields=only_fields, @@ -561,8 +553,21 @@ class SyncRestServlet(RestServlet): event.room_id, ) - serialized_state = await serialize(state_events) - serialized_timeline = await serialize(timeline_events) + serialized_state = serialize(state_events) + # Don't bother to bundle aggregations if the timeline is unlimited, + # as clients will have all the necessary information. + # bundle_aggregations=room.timeline.limited, + # + # richvdh 2021-12-15: disable this temporarily as it has too high an + # overhead for initialsyncs. We need to figure out a way that the + # bundling can be done *before* the events are stored in the + # SyncResponseCache so that this part can be synchronous. + # + # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations. + # if room.timeline.limited: + # aggregations = await self.store.get_bundled_aggregations(timeline_events) + aggregations = None + serialized_timeline = serialize(timeline_events, aggregations) account_data = room.account_data diff --git a/synapse/server.py b/synapse/server.py index 185e40e4da..3032f0b738 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -759,7 +759,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer(self) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 4ff6aed253..c6c4bd18da 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,14 +13,30 @@ # limitations under the License. import logging -from typing import List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, + cast, +) import attr +from frozendict import frozendict -from synapse.api.constants import RelationTypes +from synapse.api.constants import EventTypes, RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.relations import ( AggregationPaginationToken, @@ -29,10 +45,24 @@ from synapse.storage.relations import ( ) from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class RelationsWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._msc1849_enabled = hs.config.experimental.msc1849_enabled + self._msc3440_enabled = hs.config.experimental.msc3440_enabled + @cached(tree=True) async def get_relations_for_event( self, @@ -515,6 +545,98 @@ class RelationsWorkerStore(SQLBaseStore): "get_if_user_has_annotated_event", _get_if_user_has_annotated_event ) + async def _get_bundled_aggregation_for_event( + self, event: EventBase + ) -> Optional[Dict[str, Any]]: + """Generate bundled aggregations for an event. + + Note that this does not use a cache, but depends on cached methods. + + Args: + event: The event to calculate bundled aggregations for. + + Returns: + The bundled aggregations for an event, if bundled aggregations are + enabled and the event can have bundled aggregations. + """ + # State events and redacted events do not get bundled aggregations. + if event.is_state() or event.internal_metadata.is_redacted(): + return None + + # Do not bundle aggregations for an event which represents an edit or an + # annotation. It does not make sense for them to have related events. + relates_to = event.content.get("m.relates_to") + if isinstance(relates_to, (dict, frozendict)): + relation_type = relates_to.get("rel_type") + if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): + return None + + event_id = event.event_id + room_id = event.room_id + + # The bundled aggregations to include, a mapping of relation type to a + # type-specific value. Some types include the direct return type here + # while others need more processing during serialization. + aggregations: Dict[str, Any] = {} + + annotations = await self.get_aggregation_groups_for_event(event_id, room_id) + if annotations.chunk: + aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() + + references = await self.get_relations_for_event( + event_id, room_id, RelationTypes.REFERENCE, direction="f" + ) + if references.chunk: + aggregations[RelationTypes.REFERENCE] = references.to_dict() + + edit = None + if event.type == EventTypes.Message: + edit = await self.get_applicable_edit(event_id, room_id) + + if edit: + aggregations[RelationTypes.REPLACE] = edit + + # If this event is the start of a thread, include a summary of the replies. + if self._msc3440_enabled: + ( + thread_count, + latest_thread_event, + ) = await self.get_thread_summary(event_id, room_id) + if latest_thread_event: + aggregations[RelationTypes.THREAD] = { + # Don't bundle aggregations as this could recurse forever. + "latest_event": latest_thread_event, + "count": thread_count, + } + + # Store the bundled aggregations in the event metadata for later use. + return aggregations + + async def get_bundled_aggregations( + self, events: Iterable[EventBase] + ) -> Dict[str, Dict[str, Any]]: + """Generate bundled aggregations for events. + + Args: + events: The iterable of events to calculate bundled aggregations for. + + Returns: + A map of event ID to the bundled aggregation for the event. Not all + events may have bundled aggregations in the results. + """ + # If bundled aggregations are disabled, nothing to do. + if not self._msc1849_enabled: + return {} + + # TODO Parallelize. + results = {} + for event in events: + event_result = await self._get_bundled_aggregation_for_event(event) + if event_result is not None: + results[event.event_id] = event_result + + return results + class RelationsStore(RelationsWorkerStore): pass diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index b58452195a..fe5b536d97 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -228,7 +228,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.assertIsNotNone(event) time_now = self.clock.time_msec() - serialized = self.get_success(self.serializer.serialize_event(event, time_now)) + serialized = self.serializer.serialize_event(event, time_now) return serialized -- cgit 1.5.1 From ffd227c3822b6e20b9dc203b3eae253adc0cf663 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Mon, 10 Jan 2022 15:38:22 +0000 Subject: Fix docstring on `add_account_data_for_user`. (#11716) --- changelog.d/11716.misc | 1 + synapse/handlers/account_data.py | 2 +- synapse/storage/databases/main/account_data.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changelog.d/11716.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11716.misc b/changelog.d/11716.misc new file mode 100644 index 0000000000..08f7310498 --- /dev/null +++ b/changelog.d/11716.misc @@ -0,0 +1 @@ +Fix docstring on `add_account_data_for_user`. \ No newline at end of file diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 96273e2f81..bad48713bc 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -77,7 +77,7 @@ class AccountDataHandler: async def add_account_data_for_user( self, user_id: str, account_data_type: str, content: JsonDict ) -> int: - """Add some account_data to a room for a user. + """Add some global account_data for a user. Args: user_id: The user to add a tag for. diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 32a553fdd7..93db71d1b4 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -450,7 +450,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): async def add_account_data_for_user( self, user_id: str, account_data_type: str, content: JsonDict ) -> int: - """Add some account_data to a room for a user. + """Add some global account_data for a user. Args: user_id: The user to add a tag for. -- cgit 1.5.1 From 10a88ba91cb16ccf757984f0a7d41ddf8b4dc07f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 13 Jan 2022 08:49:28 -0500 Subject: Use auto_attribs/native type hints for attrs classes. (#11692) --- changelog.d/11692.misc | 1 + synapse/api/room_versions.py | 30 ++++----- synapse/config/emailconfig.py | 24 +++---- synapse/config/server.py | 4 +- synapse/config/workers.py | 24 +++---- synapse/crypto/keyring.py | 18 +++--- synapse/events/snapshot.py | 18 +++--- synapse/federation/sender/per_destination_queue.py | 12 ++-- synapse/handlers/auth.py | 14 ++-- synapse/handlers/e2e_keys.py | 10 +-- synapse/handlers/sso.py | 32 ++++----- synapse/http/connectproxyclient.py | 4 +- synapse/http/matrixfederationclient.py | 18 +++--- synapse/http/site.py | 4 +- synapse/logging/_remote.py | 10 +-- synapse/logging/context.py | 20 +++--- synapse/logging/opentracing.py | 2 +- synapse/metrics/__init__.py | 20 +++--- synapse/notifier.py | 14 ++-- synapse/push/__init__.py | 38 +++++------ synapse/push/bulk_push_rule_evaluator.py | 18 +++--- synapse/replication/tcp/streams/events.py | 34 +++++----- synapse/rest/media/v1/media_storage.py | 6 +- synapse/state/__init__.py | 8 +-- synapse/storage/database.py | 8 +-- synapse/storage/databases/main/end_to_end_keys.py | 6 +- synapse/storage/databases/main/events.py | 14 ++-- .../storage/databases/main/events_bg_updates.py | 12 ++-- synapse/storage/databases/main/registration.py | 18 +++--- synapse/storage/databases/main/roommember.py | 6 +- synapse/storage/databases/main/ui_auth.py | 12 ++-- synapse/storage/keys.py | 6 +- synapse/storage/prepare_database.py | 6 +- synapse/storage/relations.py | 20 +++--- synapse/storage/state.py | 6 +- synapse/storage/util/id_generators.py | 8 +-- synapse/streams/config.py | 10 +-- synapse/types.py | 75 +++++++++++----------- synapse/util/async_helpers.py | 6 +- synapse/util/caches/dictionary_cache.py | 11 ++-- 40 files changed, 300 insertions(+), 307 deletions(-) create mode 100644 changelog.d/11692.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11692.misc b/changelog.d/11692.misc new file mode 100644 index 0000000000..0cdfca54e7 --- /dev/null +++ b/changelog.d/11692.misc @@ -0,0 +1 @@ +Use `auto_attribs` and native type hints for attrs classes. diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index 0a895bba48..a747a40814 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -46,41 +46,41 @@ class RoomDisposition: UNSTABLE = "unstable" -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class RoomVersion: """An object which describes the unique attributes of a room version.""" - identifier = attr.ib(type=str) # the identifier for this version - disposition = attr.ib(type=str) # one of the RoomDispositions - event_format = attr.ib(type=int) # one of the EventFormatVersions - state_res = attr.ib(type=int) # one of the StateResolutionVersions - enforce_key_validity = attr.ib(type=bool) + identifier: str # the identifier for this version + disposition: str # one of the RoomDispositions + event_format: int # one of the EventFormatVersions + state_res: int # one of the StateResolutionVersions + enforce_key_validity: bool # Before MSC2432, m.room.aliases had special auth rules and redaction rules - special_case_aliases_auth = attr.ib(type=bool) + special_case_aliases_auth: bool # Strictly enforce canonicaljson, do not allow: # * Integers outside the range of [-2 ^ 53 + 1, 2 ^ 53 - 1] # * Floats # * NaN, Infinity, -Infinity - strict_canonicaljson = attr.ib(type=bool) + strict_canonicaljson: bool # MSC2209: Check 'notifications' key while verifying # m.room.power_levels auth rules. - limit_notifications_power_levels = attr.ib(type=bool) + limit_notifications_power_levels: bool # MSC2174/MSC2176: Apply updated redaction rules algorithm. - msc2176_redaction_rules = attr.ib(type=bool) + msc2176_redaction_rules: bool # MSC3083: Support the 'restricted' join_rule. - msc3083_join_rules = attr.ib(type=bool) + msc3083_join_rules: bool # MSC3375: Support for the proper redaction rules for MSC3083. This mustn't # be enabled if MSC3083 is not. - msc3375_redaction_rules = attr.ib(type=bool) + msc3375_redaction_rules: bool # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending # m.room.membership event with membership 'knock'. - msc2403_knocking = attr.ib(type=bool) + msc2403_knocking: bool # MSC2716: Adds m.room.power_levels -> content.historical field to control # whether "insertion", "chunk", "marker" events can be sent - msc2716_historical = attr.ib(type=bool) + msc2716_historical: bool # MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events - msc2716_redactions = attr.ib(type=bool) + msc2716_redactions: bool class RoomVersions: diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 510b647c63..949d7dd5ac 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -55,19 +55,19 @@ https://matrix-org.github.io/synapse/latest/templates.html ---------------------------------------------------------------------------------------""" -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class EmailSubjectConfig: - message_from_person_in_room = attr.ib(type=str) - message_from_person = attr.ib(type=str) - messages_from_person = attr.ib(type=str) - messages_in_room = attr.ib(type=str) - messages_in_room_and_others = attr.ib(type=str) - messages_from_person_and_others = attr.ib(type=str) - invite_from_person = attr.ib(type=str) - invite_from_person_to_room = attr.ib(type=str) - invite_from_person_to_space = attr.ib(type=str) - password_reset = attr.ib(type=str) - email_validation = attr.ib(type=str) + message_from_person_in_room: str + message_from_person: str + messages_from_person: str + messages_in_room: str + messages_in_room_and_others: str + messages_from_person_and_others: str + invite_from_person: str + invite_from_person_to_room: str + invite_from_person_to_space: str + password_reset: str + email_validation: str class EmailConfig(Config): diff --git a/synapse/config/server.py b/synapse/config/server.py index 1de2dea9b0..2c2b461cac 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -200,8 +200,8 @@ class HttpListenerConfig: """Object describing the http-specific parts of the config of a listener""" x_forwarded: bool = False - resources: List[HttpResourceConfig] = attr.ib(factory=list) - additional_resources: Dict[str, dict] = attr.ib(factory=dict) + resources: List[HttpResourceConfig] = attr.Factory(list) + additional_resources: Dict[str, dict] = attr.Factory(dict) tag: Optional[str] = None diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 576f519188..bdaba6db37 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -51,12 +51,12 @@ def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]: return obj -@attr.s +@attr.s(auto_attribs=True) class InstanceLocationConfig: """The host and port to talk to an instance via HTTP replication.""" - host = attr.ib(type=str) - port = attr.ib(type=int) + host: str + port: int @attr.s @@ -77,34 +77,28 @@ class WriterLocations: can only be a single instance. """ - events = attr.ib( + events: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - typing = attr.ib( + typing: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - to_device = attr.ib( + to_device: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - account_data = attr.ib( + account_data: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - receipts = attr.ib( + receipts: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - presence = attr.ib( + presence: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 993b04099e..72d4a69aac 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -58,7 +58,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -@attr.s(slots=True, cmp=False) +@attr.s(slots=True, frozen=True, cmp=False, auto_attribs=True) class VerifyJsonRequest: """ A request to verify a JSON object. @@ -78,10 +78,10 @@ class VerifyJsonRequest: key_ids: The set of key_ids to that could be used to verify the JSON object """ - server_name = attr.ib(type=str) - get_json_object = attr.ib(type=Callable[[], JsonDict]) - minimum_valid_until_ts = attr.ib(type=int) - key_ids = attr.ib(type=List[str]) + server_name: str + get_json_object: Callable[[], JsonDict] + minimum_valid_until_ts: int + key_ids: List[str] @staticmethod def from_json_object( @@ -124,7 +124,7 @@ class KeyLookupError(ValueError): pass -@attr.s(slots=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class _FetchKeyRequest: """A request for keys for a given server. @@ -138,9 +138,9 @@ class _FetchKeyRequest: key_ids: The IDs of the keys to attempt to fetch """ - server_name = attr.ib(type=str) - minimum_valid_until_ts = attr.ib(type=int) - key_ids = attr.ib(type=List[str]) + server_name: str + minimum_valid_until_ts: int + key_ids: List[str] class Keyring: diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f251402ed8..0eab1aefd6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -28,7 +28,7 @@ if TYPE_CHECKING: from synapse.storage.databases.main import DataStore -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class EventContext: """ Holds information relevant to persisting an event @@ -103,15 +103,15 @@ class EventContext: accessed via get_prev_state_ids. """ - rejected = attr.ib(default=False, type=Union[bool, str]) - _state_group = attr.ib(default=None, type=Optional[int]) - state_group_before_event = attr.ib(default=None, type=Optional[int]) - prev_group = attr.ib(default=None, type=Optional[int]) - delta_ids = attr.ib(default=None, type=Optional[StateMap[str]]) - app_service = attr.ib(default=None, type=Optional[ApplicationService]) + rejected: Union[bool, str] = False + _state_group: Optional[int] = None + state_group_before_event: Optional[int] = None + prev_group: Optional[int] = None + delta_ids: Optional[StateMap[str]] = None + app_service: Optional[ApplicationService] = None - _current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) - _prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) + _current_state_ids: Optional[StateMap[str]] = None + _prev_state_ids: Optional[StateMap[str]] = None @staticmethod def with_state( diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 391b30fbb5..8152e80b88 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -607,18 +607,18 @@ class PerDestinationQueue: self._pending_pdus = [] -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _TransactionQueueManager: """A helper async context manager for pulling stuff off the queues and tracking what was last successfully sent, etc. """ - queue = attr.ib(type=PerDestinationQueue) + queue: PerDestinationQueue - _device_stream_id = attr.ib(type=Optional[int], default=None) - _device_list_id = attr.ib(type=Optional[int], default=None) - _last_stream_ordering = attr.ib(type=Optional[int], default=None) - _pdus = attr.ib(type=List[EventBase], factory=list) + _device_stream_id: Optional[int] = None + _device_list_id: Optional[int] = None + _last_stream_ordering: Optional[int] = None + _pdus: List[EventBase] = attr.Factory(list) async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # First we calculate the EDUs we want to send, if any. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 84724b207c..2389c9ac52 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -168,25 +168,25 @@ def login_id_phone_to_thirdparty(identifier: JsonDict) -> Dict[str, str]: } -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class SsoLoginExtraAttributes: """Data we track about SAML2 sessions""" # time the session was created, in milliseconds - creation_time = attr.ib(type=int) - extra_attributes = attr.ib(type=JsonDict) + creation_time: int + extra_attributes: JsonDict -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class LoginTokenAttributes: """Data we store in a short-term login token""" - user_id = attr.ib(type=str) + user_id: str - auth_provider_id = attr.ib(type=str) + auth_provider_id: str """The SSO Identity Provider that the user authenticated with, to get this token.""" - auth_provider_session_id = attr.ib(type=Optional[str]) + auth_provider_session_id: Optional[str] """The session ID advertised by the SSO Identity Provider.""" diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 14360b4e40..d4dfddf63f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1321,14 +1321,14 @@ def _one_time_keys_match(old_key_json: str, new_key: JsonDict) -> bool: return old_key == new_key_copy -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class SignatureListItem: """An item in the signature list as used by upload_signatures_for_device_keys.""" - signing_key_id = attr.ib(type=str) - target_user_id = attr.ib(type=str) - target_device_id = attr.ib(type=str) - signature = attr.ib(type=JsonDict) + signing_key_id: str + target_user_id: str + target_device_id: str + signature: JsonDict class SigningKeyEduUpdater: diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 65c27bc64a..0bb8b0929e 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -126,45 +126,45 @@ class SsoIdentityProvider(Protocol): raise NotImplementedError() -@attr.s +@attr.s(auto_attribs=True) class UserAttributes: # the localpart of the mxid that the mapper has assigned to the user. # if `None`, the mapper has not picked a userid, and the user should be prompted to # enter one. - localpart = attr.ib(type=Optional[str]) - display_name = attr.ib(type=Optional[str], default=None) - emails = attr.ib(type=Collection[str], default=attr.Factory(list)) + localpart: Optional[str] + display_name: Optional[str] = None + emails: Collection[str] = attr.Factory(list) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class UsernameMappingSession: """Data we track about SSO sessions""" # A unique identifier for this SSO provider, e.g. "oidc" or "saml". - auth_provider_id = attr.ib(type=str) + auth_provider_id: str # user ID on the IdP server - remote_user_id = attr.ib(type=str) + remote_user_id: str # attributes returned by the ID mapper - display_name = attr.ib(type=Optional[str]) - emails = attr.ib(type=Collection[str]) + display_name: Optional[str] + emails: Collection[str] # An optional dictionary of extra attributes to be provided to the client in the # login response. - extra_login_attributes = attr.ib(type=Optional[JsonDict]) + extra_login_attributes: Optional[JsonDict] # where to redirect the client back to - client_redirect_url = attr.ib(type=str) + client_redirect_url: str # expiry time for the session, in milliseconds - expiry_time_ms = attr.ib(type=int) + expiry_time_ms: int # choices made by the user - chosen_localpart = attr.ib(type=Optional[str], default=None) - use_display_name = attr.ib(type=bool, default=True) - emails_to_use = attr.ib(type=Collection[str], default=()) - terms_accepted_version = attr.ib(type=Optional[str], default=None) + chosen_localpart: Optional[str] = None + use_display_name: bool = True + emails_to_use: Collection[str] = () + terms_accepted_version: Optional[str] = None # the HTTP cookie used to track the mapping session id diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py index fbafffd69b..203e995bb7 100644 --- a/synapse/http/connectproxyclient.py +++ b/synapse/http/connectproxyclient.py @@ -32,9 +32,9 @@ class ProxyConnectError(ConnectError): pass -@attr.s +@attr.s(auto_attribs=True) class ProxyCredentials: - username_password = attr.ib(type=bytes) + username_password: bytes def as_proxy_authorization_value(self) -> bytes: """ diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index deedde0b5b..2e668363b2 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -123,37 +123,37 @@ class ByteParser(ByteWriteable, Generic[T], abc.ABC): pass -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class MatrixFederationRequest: - method = attr.ib(type=str) + method: str """HTTP method """ - path = attr.ib(type=str) + path: str """HTTP path """ - destination = attr.ib(type=str) + destination: str """The remote server to send the HTTP request to. """ - json = attr.ib(default=None, type=Optional[JsonDict]) + json: Optional[JsonDict] = None """JSON to send in the body. """ - json_callback = attr.ib(default=None, type=Optional[Callable[[], JsonDict]]) + json_callback: Optional[Callable[[], JsonDict]] = None """A callback to generate the JSON. """ - query = attr.ib(default=None, type=Optional[dict]) + query: Optional[dict] = None """Query arguments. """ - txn_id = attr.ib(default=None, type=Optional[str]) + txn_id: Optional[str] = None """Unique ID for this request (for logging) """ - uri = attr.ib(init=False, type=bytes) + uri: bytes = attr.ib(init=False) """The URI of this request """ diff --git a/synapse/http/site.py b/synapse/http/site.py index 80f7a2ff58..c180a1d323 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -534,9 +534,9 @@ class XForwardedForRequest(SynapseRequest): @implementer(IAddress) -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class _XForwardedForAddress: - host = attr.ib(type=str) + host: str class SynapseSite(Site): diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index 8202d0494d..475756f1db 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -39,7 +39,7 @@ from twisted.python.failure import Failure logger = logging.getLogger(__name__) -@attr.s +@attr.s(slots=True, auto_attribs=True) @implementer(IPushProducer) class LogProducer: """ @@ -54,10 +54,10 @@ class LogProducer: # This is essentially ITCPTransport, but that is missing certain fields # (connected and registerProducer) which are part of the implementation. - transport = attr.ib(type=Connection) - _format = attr.ib(type=Callable[[logging.LogRecord], str]) - _buffer = attr.ib(type=deque) - _paused = attr.ib(default=False, type=bool, init=False) + transport: Connection + _format: Callable[[logging.LogRecord], str] + _buffer: Deque[logging.LogRecord] + _paused: bool = attr.ib(default=False, init=False) def pauseProducing(self): self._paused = True diff --git a/synapse/logging/context.py b/synapse/logging/context.py index d4ee893376..c31c2960ad 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -193,7 +193,7 @@ class ContextResourceUsage: return res -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class ContextRequest: """ A bundle of attributes from the SynapseRequest object. @@ -205,15 +205,15 @@ class ContextRequest: their children. """ - request_id = attr.ib(type=str) - ip_address = attr.ib(type=str) - site_tag = attr.ib(type=str) - requester = attr.ib(type=Optional[str]) - authenticated_entity = attr.ib(type=Optional[str]) - method = attr.ib(type=str) - url = attr.ib(type=str) - protocol = attr.ib(type=str) - user_agent = attr.ib(type=str) + request_id: str + ip_address: str + site_tag: str + requester: Optional[str] + authenticated_entity: Optional[str] + method: str + url: str + protocol: str + user_agent: str LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"] diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 622445e9f4..5672d60de3 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -251,7 +251,7 @@ try: class _WrappedRustReporter(BaseReporter): """Wrap the reporter to ensure `report_span` never throws.""" - _reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter)) + _reporter: Reporter = attr.Factory(Reporter) def set_process(self, *args, **kwargs): return self._reporter.set_process(*args, **kwargs) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index ceef57ad88..269c2b989e 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -76,19 +76,17 @@ class RegistryProxy: yield metric -@attr.s(slots=True, hash=True) +@attr.s(slots=True, hash=True, auto_attribs=True) class LaterGauge: - name = attr.ib(type=str) - desc = attr.ib(type=str) - labels = attr.ib(hash=False, type=Optional[Iterable[str]]) + name: str + desc: str + labels: Optional[Iterable[str]] = attr.ib(hash=False) # callback: should either return a value (if there are no labels for this metric), # or dict mapping from a label tuple to a value - caller = attr.ib( - type=Callable[ - [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] - ] - ) + caller: Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ] def collect(self) -> Iterable[Metric]: @@ -157,7 +155,9 @@ class InFlightGauge(Generic[MetricsEntry]): # Create a class which have the sub_metrics values as attributes, which # default to 0 on initialization. Used to pass to registered callbacks. self._metrics_class: Type[MetricsEntry] = attr.make_class( - "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True + "_MetricsEntry", + attrs={x: attr.ib(default=0) for x in sub_metrics}, + slots=True, ) # Counts number of in flight blocks for a given set of label values diff --git a/synapse/notifier.py b/synapse/notifier.py index bbabdb0587..41fd94d772 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -193,15 +193,15 @@ class EventStreamResult: return bool(self.events) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class _PendingRoomEventEntry: - event_pos = attr.ib(type=PersistedEventPosition) - extra_users = attr.ib(type=Collection[UserID]) + event_pos: PersistedEventPosition + extra_users: Collection[UserID] - room_id = attr.ib(type=str) - type = attr.ib(type=str) - state_key = attr.ib(type=Optional[str]) - membership = attr.ib(type=Optional[str]) + room_id: str + type: str + state_key: Optional[str] + membership: Optional[str] class Notifier: diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 820f6f3f7e..5176a1c186 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -23,25 +23,25 @@ if TYPE_CHECKING: from synapse.server import HomeServer -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class PusherConfig: """Parameters necessary to configure a pusher.""" - id = attr.ib(type=Optional[str]) - user_name = attr.ib(type=str) - access_token = attr.ib(type=Optional[int]) - profile_tag = attr.ib(type=str) - kind = attr.ib(type=str) - app_id = attr.ib(type=str) - app_display_name = attr.ib(type=str) - device_display_name = attr.ib(type=str) - pushkey = attr.ib(type=str) - ts = attr.ib(type=int) - lang = attr.ib(type=Optional[str]) - data = attr.ib(type=Optional[JsonDict]) - last_stream_ordering = attr.ib(type=int) - last_success = attr.ib(type=Optional[int]) - failing_since = attr.ib(type=Optional[int]) + id: Optional[str] + user_name: str + access_token: Optional[int] + profile_tag: str + kind: str + app_id: str + app_display_name: str + device_display_name: str + pushkey: str + ts: int + lang: Optional[str] + data: Optional[JsonDict] + last_stream_ordering: int + last_success: Optional[int] + failing_since: Optional[int] def as_dict(self) -> Dict[str, Any]: """Information that can be retrieved about a pusher after creation.""" @@ -57,12 +57,12 @@ class PusherConfig: } -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class ThrottleParams: """Parameters for controlling the rate of sending pushes via email.""" - last_sent_ts = attr.ib(type=int) - throttle_ms = attr.ib(type=int) + last_sent_ts: int + throttle_ms: int class Pusher(metaclass=abc.ABCMeta): diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 009d8e77b0..bee660893b 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -298,7 +298,7 @@ RulesByUser = Dict[str, List[Rule]] StateGroup = Union[object, int] -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class RulesForRoomData: """The data stored in the cache by `RulesForRoom`. @@ -307,29 +307,29 @@ class RulesForRoomData: """ # event_id -> (user_id, state) - member_map = attr.ib(type=MemberMap, factory=dict) + member_map: MemberMap = attr.Factory(dict) # user_id -> rules - rules_by_user = attr.ib(type=RulesByUser, factory=dict) + rules_by_user: RulesByUser = attr.Factory(dict) # The last state group we updated the caches for. If the state_group of # a new event comes along, we know that we can just return the cached # result. # On invalidation of the rules themselves (if the user changes them), # we invalidate everything and set state_group to `object()` - state_group = attr.ib(type=StateGroup, factory=object) + state_group: StateGroup = attr.Factory(object) # A sequence number to keep track of when we're allowed to update the # cache. We bump the sequence number when we invalidate the cache. If # the sequence number changes while we're calculating stuff we should # not update the cache with it. - sequence = attr.ib(type=int, default=0) + sequence: int = 0 # A cache of user_ids that we *know* aren't interesting, e.g. user_ids # owned by AS's, or remote users, etc. (I.e. users we will never need to # calculate push for) # These never need to be invalidated as we will never set up push for # them. - uninteresting_user_set = attr.ib(type=Set[str], factory=set) + uninteresting_user_set: Set[str] = attr.Factory(set) class RulesForRoom: @@ -553,7 +553,7 @@ class RulesForRoom: self.data.state_group = state_group -@attr.attrs(slots=True, frozen=True) +@attr.attrs(slots=True, frozen=True, auto_attribs=True) class _Invalidation: # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules, # which means that it it is stored on the bulk_get_push_rules cache entry. In order @@ -564,8 +564,8 @@ class _Invalidation: # attrs provides suitable __hash__ and __eq__ methods, provided we remember to # set `frozen=True`. - cache = attr.ib(type=LruCache) - room_id = attr.ib(type=str) + cache: LruCache + room_id: str def __call__(self) -> None: rules_data = self.cache.get(self.room_id, None, update_metrics=False) diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index a390cfcb74..4f4f1ad453 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -50,12 +50,12 @@ data part are: """ -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class EventsStreamRow: """A parsed row from the events replication stream""" - type = attr.ib() # str: the TypeId of one of the *EventsStreamRows - data = attr.ib() # BaseEventsStreamRow + type: str # the TypeId of one of the *EventsStreamRows + data: "BaseEventsStreamRow" class BaseEventsStreamRow: @@ -79,28 +79,28 @@ class BaseEventsStreamRow: return cls(*data) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class EventsStreamEventRow(BaseEventsStreamRow): TypeId = "ev" - event_id = attr.ib(type=str) - room_id = attr.ib(type=str) - type = attr.ib(type=str) - state_key = attr.ib(type=Optional[str]) - redacts = attr.ib(type=Optional[str]) - relates_to = attr.ib(type=Optional[str]) - membership = attr.ib(type=Optional[str]) - rejected = attr.ib(type=bool) + event_id: str + room_id: str + type: str + state_key: Optional[str] + redacts: Optional[str] + relates_to: Optional[str] + membership: Optional[str] + rejected: bool -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class EventsStreamCurrentStateRow(BaseEventsStreamRow): TypeId = "state" - room_id = attr.ib() # str - type = attr.ib() # str - state_key = attr.ib() # str - event_id = attr.ib() # str, optional + room_id: str + type: str + state_key: str + event_id: Optional[str] _EventRows: Tuple[Type[BaseEventsStreamRow], ...] = ( diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index fca239d8c7..9f6c251caf 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -343,7 +343,7 @@ class SpamMediaException(NotFoundError): """ -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class ReadableFileWrapper: """Wrapper that allows reading a file in chunks, yielding to the reactor, and writing to a callback. @@ -354,8 +354,8 @@ class ReadableFileWrapper: CHUNK_SIZE = 2 ** 14 - clock = attr.ib(type=Clock) - path = attr.ib(type=str) + clock: Clock + path: str async def write_chunks_to(self, callback: Callable[[bytes], None]) -> None: """Reads the file in chunks and calls the callback with each chunk.""" diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 69ac8c3423..923e31587e 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -450,19 +450,19 @@ class StateHandler: return {key: state_map[ev_id] for key, ev_id in new_state.items()} -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _StateResMetrics: """Keeps track of some usage metrics about state res.""" # System and User CPU time, in seconds - cpu_time = attr.ib(type=float, default=0.0) + cpu_time: float = 0.0 # time spent on database transactions (excluding scheduling time). This roughly # corresponds to the amount of work done on the db server, excluding event fetches. - db_time = attr.ib(type=float, default=0.0) + db_time: float = 0.0 # number of events fetched from the db. - db_events = attr.ib(type=int, default=0) + db_events: int = 0 _biggest_room_by_cpu_counter = Counter( diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 2cacc7dd6c..a27cc3605c 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -143,7 +143,7 @@ def make_conn( return db_conn -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class LoggingDatabaseConnection: """A wrapper around a database connection that returns `LoggingTransaction` as its cursor class. @@ -151,9 +151,9 @@ class LoggingDatabaseConnection: This is mainly used on startup to ensure that queries get logged correctly """ - conn = attr.ib(type=Connection) - engine = attr.ib(type=BaseDatabaseEngine) - default_txn_name = attr.ib(type=str) + conn: Connection + engine: BaseDatabaseEngine + default_txn_name: str def cursor( self, *, txn_name=None, after_callbacks=None, exception_callbacks=None diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 57b5ffbad3..86cab97563 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -50,16 +50,16 @@ if TYPE_CHECKING: from synapse.server import HomeServer -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class DeviceKeyLookupResult: """The type returned by get_e2e_device_keys_and_signatures""" - display_name = attr.ib(type=Optional[str]) + display_name: Optional[str] # the key data from e2e_device_keys_json. Typically includes fields like # "algorithm", "keys" (including the curve25519 identity key and the ed25519 signing # key) and "signatures" (a map from (user id) to (key id/device_id) to signature.) - keys = attr.ib(type=Optional[JsonDict]) + keys: Optional[JsonDict] class EndToEndKeyBackgroundStore(SQLBaseStore): diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index dd255aefb9..cce2305597 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -69,7 +69,7 @@ event_counter = Counter( ) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class DeltaState: """Deltas to use to update the `current_state_events` table. @@ -80,9 +80,9 @@ class DeltaState: should e.g. be removed from `current_state_events` table. """ - to_delete = attr.ib(type=List[Tuple[str, str]]) - to_insert = attr.ib(type=StateMap[str]) - no_longer_in_room = attr.ib(type=bool, default=False) + to_delete: List[Tuple[str, str]] + to_insert: StateMap[str] + no_longer_in_room: bool = False class PersistEventsStore: @@ -2226,17 +2226,17 @@ class PersistEventsStore: ) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _LinkMap: """A helper type for tracking links between chains.""" # Stores the set of links as nested maps: source chain ID -> target chain ID # -> source sequence number -> target sequence number. - maps = attr.ib(type=Dict[int, Dict[int, Dict[int, int]]], factory=dict) + maps: Dict[int, Dict[int, Dict[int, int]]] = attr.Factory(dict) # Stores the links that have been added (with new set to true), as tuples of # `(source chain ID, source sequence no, target chain ID, target sequence no.)` - additions = attr.ib(type=Set[Tuple[int, int, int, int]], factory=set) + additions: Set[Tuple[int, int, int, int]] = attr.Factory(set) def add_link( self, diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index a68f14ba48..0a96664caf 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -65,22 +65,22 @@ class _BackgroundUpdates: REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: """Return value for _calculate_chain_cover_txn.""" # The last room_id/depth/stream processed. - room_id = attr.ib(type=str) - depth = attr.ib(type=int) - stream = attr.ib(type=int) + room_id: str + depth: int + stream: int # Number of rows processed - processed_count = attr.ib(type=int) + processed_count: int # Map from room_id to last depth/stream processed for each room that we have # processed all events for (i.e. the rooms we can flip the # `has_auth_chain_index` for) - finished_room_map = attr.ib(type=Dict[str, Tuple[int, int]]) + finished_room_map: Dict[str, Tuple[int, int]] class EventsBackgroundUpdatesStore(SQLBaseStore): diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 4175c82a25..aac94fa464 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -51,7 +51,7 @@ class ExternalIDReuseException(Exception): pass -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class TokenLookupResult: """Result of looking up an access token. @@ -69,14 +69,14 @@ class TokenLookupResult: cached. """ - user_id = attr.ib(type=str) - is_guest = attr.ib(type=bool, default=False) - shadow_banned = attr.ib(type=bool, default=False) - token_id = attr.ib(type=Optional[int], default=None) - device_id = attr.ib(type=Optional[str], default=None) - valid_until_ms = attr.ib(type=Optional[int], default=None) - token_owner = attr.ib(type=str) - token_used = attr.ib(type=bool, default=False) + user_id: str + is_guest: bool = False + shadow_banned: bool = False + token_id: Optional[int] = None + device_id: Optional[str] = None + valid_until_ms: Optional[int] = None + token_owner: str = attr.ib() + token_used: bool = False # Make the token owner default to the user ID, which is the common case. @token_owner.default diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index cda80d6511..4489732fda 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1177,18 +1177,18 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): await self.db_pool.runInteraction("forget_membership", f) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _JoinedHostsCache: """The cached data used by the `_get_joined_hosts_cache`.""" # Dict of host to the set of their users in the room at the state group. - hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict) + hosts_to_joined_users: Dict[str, Set[str]] = attr.Factory(dict) # The state group `hosts_to_joined_users` is derived from. Will be an object # if the instance is newly created or if the state is not based on a state # group. (An object is used as a sentinel value to ensure that it never is # equal to anything else). - state_group = attr.ib(type=Union[object, int], factory=object) + state_group: Union[object, int] = attr.Factory(object) def __len__(self): return sum(len(v) for v in self.hosts_to_joined_users.values()) diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py index a1a1a6a14a..2d339b6008 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py @@ -23,19 +23,19 @@ from synapse.types import JsonDict from synapse.util import json_encoder, stringutils -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class UIAuthSessionData: - session_id = attr.ib(type=str) + session_id: str # The dictionary from the client root level, not the 'auth' key. - clientdict = attr.ib(type=JsonDict) + clientdict: JsonDict # The URI and method the session was intiatied with. These are checked at # each stage of the authentication to ensure that the asked for operation # has not changed. - uri = attr.ib(type=str) - method = attr.ib(type=str) + uri: str + method: str # A string description of the operation that the current authentication is # authorising. - description = attr.ib(type=str) + description: str class UIAuthWorkerStore(SQLBaseStore): diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 540adb8781..71584f3f74 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -21,7 +21,7 @@ from signedjson.types import VerifyKey logger = logging.getLogger(__name__) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class FetchKeyResult: - verify_key = attr.ib(type=VerifyKey) # the key itself - valid_until_ts = attr.ib(type=int) # how long we can use this key for + verify_key: VerifyKey # the key itself + valid_until_ts: int # how long we can use this key for diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index e45adfcb55..1823e18720 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -696,7 +696,7 @@ def _get_or_create_schema_state( ) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _DirectoryListing: """Helper class to store schema file name and the absolute path to it. @@ -705,5 +705,5 @@ class _DirectoryListing: `file_name` attr is kept first. """ - file_name = attr.ib(type=str) - absolute_path = attr.ib(type=str) + file_name: str + absolute_path: str diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py index 10a46b5e82..b1536c1ca4 100644 --- a/synapse/storage/relations.py +++ b/synapse/storage/relations.py @@ -23,7 +23,7 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class PaginationChunk: """Returned by relation pagination APIs. @@ -35,9 +35,9 @@ class PaginationChunk: None then there are no previous results. """ - chunk = attr.ib(type=List[JsonDict]) - next_batch = attr.ib(type=Optional[Any], default=None) - prev_batch = attr.ib(type=Optional[Any], default=None) + chunk: List[JsonDict] + next_batch: Optional[Any] = None + prev_batch: Optional[Any] = None def to_dict(self) -> Dict[str, Any]: d = {"chunk": self.chunk} @@ -51,7 +51,7 @@ class PaginationChunk: return d -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class RelationPaginationToken: """Pagination token for relation pagination API. @@ -64,8 +64,8 @@ class RelationPaginationToken: stream: The stream ordering of the boundary event. """ - topological = attr.ib(type=int) - stream = attr.ib(type=int) + topological: int + stream: int @staticmethod def from_string(string: str) -> "RelationPaginationToken": @@ -82,7 +82,7 @@ class RelationPaginationToken: return attr.astuple(self) -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class AggregationPaginationToken: """Pagination token for relation aggregation pagination API. @@ -94,8 +94,8 @@ class AggregationPaginationToken: stream: The MAX stream ordering in the boundary group. """ - count = attr.ib(type=int) - stream = attr.ib(type=int) + count: int + stream: int @staticmethod def from_string(string: str) -> "AggregationPaginationToken": diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b5ba1560d1..df8b2f1088 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -45,7 +45,7 @@ logger = logging.getLogger(__name__) T = TypeVar("T") -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class StateFilter: """A filter used when querying for state. @@ -58,8 +58,8 @@ class StateFilter: appear in `types`. """ - types = attr.ib(type="frozendict[str, Optional[FrozenSet[str]]]") - include_others = attr.ib(default=False, type=bool) + types: "frozendict[str, Optional[FrozenSet[str]]]" + include_others: bool = False def __attrs_post_init__(self): # If `include_others` is set we canonicalise the filter by removing diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index b8112e1c05..3c13859faa 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -762,13 +762,13 @@ class _AsyncCtxManagerWrapper(Generic[T]): return self.inner.__exit__(exc_type, exc, tb) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _MultiWriterCtxManager: """Async context manager returned by MultiWriterIdGenerator""" - id_gen = attr.ib(type=MultiWriterIdGenerator) - multiple_ids = attr.ib(type=Optional[int], default=None) - stream_ids = attr.ib(type=List[int], factory=list) + id_gen: MultiWriterIdGenerator + multiple_ids: Optional[int] = None + stream_ids: List[int] = attr.Factory(list) async def __aenter__(self) -> Union[int, List[int]]: # It's safe to run this in autocommit mode as fetching values from a diff --git a/synapse/streams/config.py b/synapse/streams/config.py index c08d591f29..b52723e2b8 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -28,14 +28,14 @@ logger = logging.getLogger(__name__) MAX_LIMIT = 1000 -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class PaginationConfig: """A configuration object which stores pagination parameters.""" - from_token = attr.ib(type=Optional[StreamToken]) - to_token = attr.ib(type=Optional[StreamToken]) - direction = attr.ib(type=str) - limit = attr.ib(type=Optional[int]) + from_token: Optional[StreamToken] + to_token: Optional[StreamToken] + direction: str + limit: Optional[int] @classmethod async def from_request( diff --git a/synapse/types.py b/synapse/types.py index 74a2c51857..f89fb216a6 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -20,6 +20,7 @@ from typing import ( Any, ClassVar, Dict, + List, Mapping, Match, MutableMapping, @@ -80,7 +81,7 @@ class ISynapseReactor( """The interfaces necessary for Synapse to function.""" -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class Requester: """ Represents the user making a request @@ -98,13 +99,13 @@ class Requester: "puppeting" the user. """ - user = attr.ib(type="UserID") - access_token_id = attr.ib(type=Optional[int]) - is_guest = attr.ib(type=bool) - shadow_banned = attr.ib(type=bool) - device_id = attr.ib(type=Optional[str]) - app_service = attr.ib(type=Optional["ApplicationService"]) - authenticated_entity = attr.ib(type=str) + user: "UserID" + access_token_id: Optional[int] + is_guest: bool + shadow_banned: bool + device_id: Optional[str] + app_service: Optional["ApplicationService"] + authenticated_entity: str def serialize(self): """Converts self to a type that can be serialized as JSON, and then @@ -211,7 +212,7 @@ def get_localpart_from_id(string: str) -> str: DS = TypeVar("DS", bound="DomainSpecificString") -@attr.s(slots=True, frozen=True, repr=False) +@attr.s(slots=True, frozen=True, repr=False, auto_attribs=True) class DomainSpecificString(metaclass=abc.ABCMeta): """Common base class among ID/name strings that have a local part and a domain name, prefixed with a sigil. @@ -224,8 +225,8 @@ class DomainSpecificString(metaclass=abc.ABCMeta): SIGIL: ClassVar[str] = abc.abstractproperty() # type: ignore - localpart = attr.ib(type=str) - domain = attr.ib(type=str) + localpart: str + domain: str # Because this is a frozen class, it is deeply immutable. def __copy__(self): @@ -461,14 +462,12 @@ class RoomStreamToken: attributes, must be hashable. """ - topological = attr.ib( - type=Optional[int], + topological: Optional[int] = attr.ib( validator=attr.validators.optional(attr.validators.instance_of(int)), ) - stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) + stream: int = attr.ib(validator=attr.validators.instance_of(int)) - instance_map = attr.ib( - type="frozendict[str, int]", + instance_map: "frozendict[str, int]" = attr.ib( factory=frozendict, validator=attr.validators.deep_mapping( key_validator=attr.validators.instance_of(str), @@ -477,7 +476,7 @@ class RoomStreamToken: ), ) - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: """Validates that both `topological` and `instance_map` aren't set.""" if self.instance_map and self.topological: @@ -593,7 +592,7 @@ class RoomStreamToken: return "s%d" % (self.stream,) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class StreamToken: """A collection of positions within multiple streams. @@ -601,20 +600,20 @@ class StreamToken: must be hashable. """ - room_key = attr.ib( - type=RoomStreamToken, validator=attr.validators.instance_of(RoomStreamToken) + room_key: RoomStreamToken = attr.ib( + validator=attr.validators.instance_of(RoomStreamToken) ) - presence_key = attr.ib(type=int) - typing_key = attr.ib(type=int) - receipt_key = attr.ib(type=int) - account_data_key = attr.ib(type=int) - push_rules_key = attr.ib(type=int) - to_device_key = attr.ib(type=int) - device_list_key = attr.ib(type=int) - groups_key = attr.ib(type=int) + presence_key: int + typing_key: int + receipt_key: int + account_data_key: int + push_rules_key: int + to_device_key: int + device_list_key: int + groups_key: int _SEPARATOR = "_" - START: "StreamToken" + START: ClassVar["StreamToken"] @classmethod async def from_string(cls, store: "DataStore", string: str) -> "StreamToken": @@ -674,7 +673,7 @@ class StreamToken: StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class PersistedEventPosition: """Position of a newly persisted event with instance that persisted it. @@ -682,8 +681,8 @@ class PersistedEventPosition: RoomStreamToken. """ - instance_name = attr.ib(type=str) - stream = attr.ib(type=int) + instance_name: str + stream: int def persisted_after(self, token: RoomStreamToken) -> bool: return token.get_stream_pos_for_instance(self.instance_name) < self.stream @@ -733,15 +732,15 @@ class ThirdPartyInstanceID: __str__ = to_string -@attr.s(slots=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class ReadReceipt: """Information about a read-receipt""" - room_id = attr.ib() - receipt_type = attr.ib() - user_id = attr.ib() - event_ids = attr.ib() - data = attr.ib() + room_id: str + receipt_type: str + user_id: str + event_ids: List[str] + data: JsonDict def get_verify_key_from_cross_signing_key(key_info): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 150a04b53e..3f7299aff7 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -309,12 +309,12 @@ def gather_results( # type: ignore[misc] return deferred.addCallback(tuple) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _LinearizerEntry: # The number of things executing. - count = attr.ib(type=int) + count: int # Deferreds for the things blocked from executing. - deferreds = attr.ib(type=collections.OrderedDict) + deferreds: collections.OrderedDict class Linearizer: diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 485ddb1893..d267703df0 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -33,7 +33,7 @@ DV = TypeVar("DV") # This class can't be generic because it uses slots with attrs. # See: https://github.com/python-attrs/attrs/issues/313 -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class DictionaryEntry: # should be: Generic[DKT, DV]. """Returned when getting an entry from the cache @@ -41,14 +41,13 @@ class DictionaryEntry: # should be: Generic[DKT, DV]. full: Whether the cache has the full or dict or just some keys. If not full then not all requested keys will necessarily be present in `value` - known_absent: Keys that were looked up in the dict and were not - there. + known_absent: Keys that were looked up in the dict and were not there. value: The full or partial dict value """ - full = attr.ib(type=bool) - known_absent = attr.ib(type=Set[Any]) # should be: Set[DKT] - value = attr.ib(type=Dict[Any, Any]) # should be: Dict[DKT, DV] + full: bool + known_absent: Set[Any] # should be: Set[DKT] + value: Dict[Any, Any] # should be: Dict[DKT, DV] def __len__(self) -> int: return len(self.value) -- cgit 1.5.1 From 3e0536cd2afb5a640619bd872fc27b068ec3eb9b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 13 Jan 2022 19:44:18 -0500 Subject: Replace uses of simple_insert_many with simple_insert_many_values. (#11742) This should be (slightly) more efficient and it is simpler to have a single method for inserting multiple values. --- changelog.d/11742.misc | 1 + synapse/rest/admin/background_updates.py | 44 +++----- synapse/storage/database.py | 54 +-------- synapse/storage/databases/main/account_data.py | 4 +- synapse/storage/databases/main/deviceinbox.py | 30 ++--- synapse/storage/databases/main/devices.py | 37 ++++--- synapse/storage/databases/main/directory.py | 6 +- synapse/storage/databases/main/e2e_room_keys.py | 34 ++++-- synapse/storage/databases/main/end_to_end_keys.py | 42 ++++--- .../storage/databases/main/event_push_actions.py | 21 ++-- synapse/storage/databases/main/events.py | 122 ++++++++++----------- .../storage/databases/main/events_bg_updates.py | 30 ++--- synapse/storage/databases/main/presence.py | 33 ++++-- synapse/storage/databases/main/pusher.py | 8 +- synapse/storage/databases/main/user_directory.py | 12 +- synapse/storage/databases/state/bg_updates.py | 15 +-- synapse/storage/databases/state/store.py | 27 +---- tests/rest/admin/test_registration_tokens.py | 15 +-- tests/storage/test_event_federation.py | 26 +++-- 19 files changed, 263 insertions(+), 298 deletions(-) create mode 100644 changelog.d/11742.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11742.misc b/changelog.d/11742.misc new file mode 100644 index 0000000000..f65ccdf30a --- /dev/null +++ b/changelog.d/11742.misc @@ -0,0 +1 @@ +Minor efficiency improvements when inserting many values into the database. diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py index 6ec00ce0b9..e9bce22a34 100644 --- a/synapse/rest/admin/background_updates.py +++ b/synapse/rest/admin/background_updates.py @@ -123,34 +123,25 @@ class BackgroundUpdateStartJobRestServlet(RestServlet): job_name = body["job_name"] if job_name == "populate_stats_process_rooms": - jobs = [ - { - "update_name": "populate_stats_process_rooms", - "progress_json": "{}", - }, - ] + jobs = [("populate_stats_process_rooms", "{}", "")] elif job_name == "regenerate_directory": jobs = [ - { - "update_name": "populate_user_directory_createtables", - "progress_json": "{}", - "depends_on": "", - }, - { - "update_name": "populate_user_directory_process_rooms", - "progress_json": "{}", - "depends_on": "populate_user_directory_createtables", - }, - { - "update_name": "populate_user_directory_process_users", - "progress_json": "{}", - "depends_on": "populate_user_directory_process_rooms", - }, - { - "update_name": "populate_user_directory_cleanup", - "progress_json": "{}", - "depends_on": "populate_user_directory_process_users", - }, + ("populate_user_directory_createtables", "{}", ""), + ( + "populate_user_directory_process_rooms", + "{}", + "populate_user_directory_createtables", + ), + ( + "populate_user_directory_process_users", + "{}", + "populate_user_directory_process_rooms", + ), + ( + "populate_user_directory_cleanup", + "{}", + "populate_user_directory_process_users", + ), ] else: raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name") @@ -158,6 +149,7 @@ class BackgroundUpdateStartJobRestServlet(RestServlet): try: await self._store.db_pool.simple_insert_many( table="background_updates", + keys=("update_name", "progress_json", "depends_on"), values=jobs, desc=f"admin_api_run_{job_name}", ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index a27cc3605c..57cc1d76e0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -934,56 +934,6 @@ class DatabasePool: txn.execute(sql, vals) async def simple_insert_many( - self, table: str, values: List[Dict[str, Any]], desc: str - ) -> None: - """Executes an INSERT query on the named table. - - The input is given as a list of dicts, with one dict per row. - Generally simple_insert_many_values should be preferred for new code. - - Args: - table: string giving the table name - values: dict of new column names and values for them - desc: description of the transaction, for logging and metrics - """ - await self.runInteraction(desc, self.simple_insert_many_txn, table, values) - - @staticmethod - def simple_insert_many_txn( - txn: LoggingTransaction, table: str, values: List[Dict[str, Any]] - ) -> None: - """Executes an INSERT query on the named table. - - The input is given as a list of dicts, with one dict per row. - Generally simple_insert_many_values_txn should be preferred for new code. - - Args: - txn: The transaction to use. - table: string giving the table name - values: dict of new column names and values for them - """ - if not values: - return - - # This is a *slight* abomination to get a list of tuples of key names - # and a list of tuples of value names. - # - # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}] - # => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)] - # - # The sort is to ensure that we don't rely on dictionary iteration - # order. - keys, vals = zip( - *(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i) - ) - - for k in keys: - if k != keys[0]: - raise RuntimeError("All items must have the same keys") - - return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals) - - async def simple_insert_many_values( self, table: str, keys: Collection[str], @@ -1002,11 +952,11 @@ class DatabasePool: desc: description of the transaction, for logging and metrics """ await self.runInteraction( - desc, self.simple_insert_many_values_txn, table, keys, values + desc, self.simple_insert_many_txn, table, keys, values ) @staticmethod - def simple_insert_many_values_txn( + def simple_insert_many_txn( txn: LoggingTransaction, table: str, keys: Collection[str], diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 93db71d1b4..ef475e18c7 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -536,9 +536,9 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): self.db_pool.simple_insert_many_txn( txn, table="ignored_users", + keys=("ignorer_user_id", "ignored_user_id"), values=[ - {"ignorer_user_id": user_id, "ignored_user_id": u} - for u in currently_ignored_users - previously_ignored_users + (user_id, u) for u in currently_ignored_users - previously_ignored_users ], ) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3682cb6a81..4eca97189b 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -432,14 +432,21 @@ class DeviceInboxWorkerStore(SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="device_federation_outbox", + keys=( + "destination", + "stream_id", + "queued_ts", + "messages_json", + "instance_name", + ), values=[ - { - "destination": destination, - "stream_id": stream_id, - "queued_ts": now_ms, - "messages_json": json_encoder.encode(edu), - "instance_name": self._instance_name, - } + ( + destination, + stream_id, + now_ms, + json_encoder.encode(edu), + self._instance_name, + ) for destination, edu in remote_messages_by_destination.items() ], ) @@ -571,14 +578,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="device_inbox", + keys=("user_id", "device_id", "stream_id", "message_json", "instance_name"), values=[ - { - "user_id": user_id, - "device_id": device_id, - "stream_id": stream_id, - "message_json": message_json, - "instance_name": self._instance_name, - } + (user_id, device_id, stream_id, message_json, self._instance_name) for user_id, messages_by_device in local_by_user_then_device.items() for device_id, message_json in messages_by_device.items() ], diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 52fbf50db6..8748654b55 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1386,12 +1386,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="device_lists_remote_cache", + keys=("user_id", "device_id", "content"), values=[ - { - "user_id": user_id, - "device_id": content["device_id"], - "content": json_encoder.encode(content), - } + (user_id, content["device_id"], json_encoder.encode(content)) for content in devices ], ) @@ -1479,8 +1476,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="device_lists_stream", + keys=("stream_id", "user_id", "device_id"), values=[ - {"stream_id": stream_id, "user_id": user_id, "device_id": device_id} + (stream_id, user_id, device_id) for stream_id, device_id in zip(stream_ids, device_ids) ], ) @@ -1507,18 +1505,27 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="device_lists_outbound_pokes", + keys=( + "destination", + "stream_id", + "user_id", + "device_id", + "sent", + "ts", + "opentracing_context", + ), values=[ - { - "destination": destination, - "stream_id": next(next_stream_id), - "user_id": user_id, - "device_id": device_id, - "sent": False, - "ts": now, - "opentracing_context": json_encoder.encode(context) + ( + destination, + next(next_stream_id), + user_id, + device_id, + False, + now, + json_encoder.encode(context) if whitelisted_homeserver(destination) else "{}", - } + ) for destination in hosts for device_id in device_ids ], diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py index f76c6121e8..5903fdaf00 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py @@ -112,10 +112,8 @@ class DirectoryWorkerStore(CacheInvalidationWorkerStore): self.db_pool.simple_insert_many_txn( txn, table="room_alias_servers", - values=[ - {"room_alias": room_alias.to_string(), "server": server} - for server in servers - ], + keys=("room_alias", "server"), + values=[(room_alias.to_string(), server) for server in servers], ) self._invalidate_cache_and_stream( diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index 0cb48b9dd7..b789a588a5 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -110,16 +110,16 @@ class EndToEndRoomKeyStore(SQLBaseStore): values = [] for (room_id, session_id, room_key) in room_keys: values.append( - { - "user_id": user_id, - "version": version_int, - "room_id": room_id, - "session_id": session_id, - "first_message_index": room_key["first_message_index"], - "forwarded_count": room_key["forwarded_count"], - "is_verified": room_key["is_verified"], - "session_data": json_encoder.encode(room_key["session_data"]), - } + ( + user_id, + version_int, + room_id, + session_id, + room_key["first_message_index"], + room_key["forwarded_count"], + room_key["is_verified"], + json_encoder.encode(room_key["session_data"]), + ) ) log_kv( { @@ -131,7 +131,19 @@ class EndToEndRoomKeyStore(SQLBaseStore): ) await self.db_pool.simple_insert_many( - table="e2e_room_keys", values=values, desc="add_e2e_room_keys" + table="e2e_room_keys", + keys=( + "user_id", + "version", + "room_id", + "session_id", + "first_message_index", + "forwarded_count", + "is_verified", + "session_data", + ), + values=values, + desc="add_e2e_room_keys", ) @trace diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 86cab97563..1f8447b507 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -387,15 +387,16 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker self.db_pool.simple_insert_many_txn( txn, table="e2e_one_time_keys_json", + keys=( + "user_id", + "device_id", + "algorithm", + "key_id", + "ts_added_ms", + "key_json", + ), values=[ - { - "user_id": user_id, - "device_id": device_id, - "algorithm": algorithm, - "key_id": key_id, - "ts_added_ms": time_now, - "key_json": json_bytes, - } + (user_id, device_id, algorithm, key_id, time_now, json_bytes) for algorithm, key_id, json_bytes in new_keys ], ) @@ -1186,15 +1187,22 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): """ await self.db_pool.simple_insert_many( "e2e_cross_signing_signatures", - [ - { - "user_id": user_id, - "key_id": item.signing_key_id, - "target_user_id": item.target_user_id, - "target_device_id": item.target_device_id, - "signature": item.signature, - } + keys=( + "user_id", + "key_id", + "target_user_id", + "target_device_id", + "signature", + ), + values=[ + ( + user_id, + item.signing_key_id, + item.target_user_id, + item.target_device_id, + item.signature, + ) for item in signatures ], - "add_e2e_signing_key", + desc="add_e2e_signing_key", ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index a98e6b2593..b7c4c62222 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -875,14 +875,21 @@ class EventPushActionsWorkerStore(SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="event_push_summary", + keys=( + "user_id", + "room_id", + "notif_count", + "unread_count", + "stream_ordering", + ), values=[ - { - "user_id": user_id, - "room_id": room_id, - "notif_count": summary.notif_count, - "unread_count": summary.unread_count, - "stream_ordering": summary.stream_ordering, - } + ( + user_id, + room_id, + summary.notif_count, + summary.unread_count, + summary.stream_ordering, + ) for ((user_id, room_id), summary) in summaries.items() if summary.old_user_id is None ], diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index cce2305597..de3b48524b 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -442,12 +442,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_auth", + keys=("event_id", "room_id", "auth_id"), values=[ - { - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - } + (event.event_id, event.room_id, auth_id) for event in events for auth_id in event.auth_event_ids() if event.is_state() @@ -675,8 +672,9 @@ class PersistEventsStore: db_pool.simple_insert_many_txn( txn, table="event_auth_chains", + keys=("event_id", "chain_id", "sequence_number"), values=[ - {"event_id": event_id, "chain_id": c_id, "sequence_number": seq} + (event_id, c_id, seq) for event_id, (c_id, seq) in new_chain_tuples.items() ], ) @@ -782,13 +780,14 @@ class PersistEventsStore: db_pool.simple_insert_many_txn( txn, table="event_auth_chain_links", + keys=( + "origin_chain_id", + "origin_sequence_number", + "target_chain_id", + "target_sequence_number", + ), values=[ - { - "origin_chain_id": source_id, - "origin_sequence_number": source_seq, - "target_chain_id": target_id, - "target_sequence_number": target_seq, - } + (source_id, source_seq, target_id, target_seq) for ( source_id, source_seq, @@ -943,20 +942,28 @@ class PersistEventsStore: txn_id = getattr(event.internal_metadata, "txn_id", None) if token_id and txn_id: to_insert.append( - { - "event_id": event.event_id, - "room_id": event.room_id, - "user_id": event.sender, - "token_id": token_id, - "txn_id": txn_id, - "inserted_ts": self._clock.time_msec(), - } + ( + event.event_id, + event.room_id, + event.sender, + token_id, + txn_id, + self._clock.time_msec(), + ) ) if to_insert: self.db_pool.simple_insert_many_txn( txn, table="event_txn_id", + keys=( + "event_id", + "room_id", + "user_id", + "token_id", + "txn_id", + "inserted_ts", + ), values=to_insert, ) @@ -1161,8 +1168,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_forward_extremities", + keys=("event_id", "room_id"), values=[ - {"event_id": ev_id, "room_id": room_id} + (ev_id, room_id) for room_id, new_extrem in new_forward_extremities.items() for ev_id in new_extrem ], @@ -1174,12 +1182,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="stream_ordering_to_exterm", + keys=("room_id", "event_id", "stream_ordering"), values=[ - { - "room_id": room_id, - "event_id": event_id, - "stream_ordering": max_stream_order, - } + (room_id, event_id, max_stream_order) for room_id, new_extrem in new_forward_extremities.items() for event_id in new_extrem ], @@ -1342,7 +1347,7 @@ class PersistEventsStore: d.pop("redacted_because", None) return d - self.db_pool.simple_insert_many_values_txn( + self.db_pool.simple_insert_many_txn( txn, table="event_json", keys=("event_id", "room_id", "internal_metadata", "json", "format_version"), @@ -1358,7 +1363,7 @@ class PersistEventsStore: ), ) - self.db_pool.simple_insert_many_values_txn( + self.db_pool.simple_insert_many_txn( txn, table="events", keys=( @@ -1412,7 +1417,7 @@ class PersistEventsStore: ) txn.execute(sql + clause, [False] + args) - self.db_pool.simple_insert_many_values_txn( + self.db_pool.simple_insert_many_txn( txn, table="state_events", keys=("event_id", "room_id", "type", "state_key"), @@ -1622,14 +1627,9 @@ class PersistEventsStore: return self.db_pool.simple_insert_many_txn( txn=txn, table="event_labels", + keys=("event_id", "label", "room_id", "topological_ordering"), values=[ - { - "event_id": event_id, - "label": label, - "room_id": room_id, - "topological_ordering": topological_ordering, - } - for label in labels + (event_id, label, room_id, topological_ordering) for label in labels ], ) @@ -1657,16 +1657,13 @@ class PersistEventsStore: vals = [] for event in events: ref_alg, ref_hash_bytes = compute_event_reference_hash(event) - vals.append( - { - "event_id": event.event_id, - "algorithm": ref_alg, - "hash": memoryview(ref_hash_bytes), - } - ) + vals.append((event.event_id, ref_alg, memoryview(ref_hash_bytes))) self.db_pool.simple_insert_many_txn( - txn, table="event_reference_hashes", values=vals + txn, + table="event_reference_hashes", + keys=("event_id", "algorithm", "hash"), + values=vals, ) def _store_room_members_txn( @@ -1689,18 +1686,25 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="room_memberships", + keys=( + "event_id", + "user_id", + "sender", + "room_id", + "membership", + "display_name", + "avatar_url", + ), values=[ - { - "event_id": event.event_id, - "user_id": event.state_key, - "sender": event.user_id, - "room_id": event.room_id, - "membership": event.membership, - "display_name": non_null_str_or_none( - event.content.get("displayname") - ), - "avatar_url": non_null_str_or_none(event.content.get("avatar_url")), - } + ( + event.event_id, + event.state_key, + event.user_id, + event.room_id, + event.membership, + non_null_str_or_none(event.content.get("displayname")), + non_null_str_or_none(event.content.get("avatar_url")), + ) for event in events ], ) @@ -2163,13 +2167,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_edges", + keys=("event_id", "prev_event_id", "room_id", "is_state"), values=[ - { - "event_id": ev.event_id, - "prev_event_id": e_id, - "room_id": ev.room_id, - "is_state": False, - } + (ev.event_id, e_id, ev.room_id, False) for ev in events for e_id in ev.prev_event_ids() ], diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 0a96664caf..d5f0059665 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -684,13 +684,14 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self.db_pool.simple_insert_many_txn( txn=txn, table="event_labels", + keys=("event_id", "label", "room_id", "topological_ordering"), values=[ - { - "event_id": event_id, - "label": label, - "room_id": event_json["room_id"], - "topological_ordering": event_json["depth"], - } + ( + event_id, + label, + event_json["room_id"], + event_json["depth"], + ) for label in event_json["content"].get( EventContentFields.LABELS, [] ) @@ -803,29 +804,19 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if not has_state: state_events.append( - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } + (event.event_id, event.room_id, event.type, event.state_key) ) if not has_event_auth: # Old, dodgy, events may have duplicate auth events, which we # need to deduplicate as we have a unique constraint. for auth_id in set(event.auth_event_ids()): - auth_events.append( - { - "room_id": event.room_id, - "event_id": event.event_id, - "auth_id": auth_id, - } - ) + auth_events.append((event.event_id, event.room_id, auth_id)) if state_events: await self.db_pool.simple_insert_many( table="state_events", + keys=("event_id", "room_id", "type", "state_key"), values=state_events, desc="_rejected_events_metadata_state_events", ) @@ -833,6 +824,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if auth_events: await self.db_pool.simple_insert_many( table="event_auth", + keys=("event_id", "room_id", "auth_id"), values=auth_events, desc="_rejected_events_metadata_event_auth", ) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index cbf9ec38f7..4f05811a77 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -129,18 +129,29 @@ class PresenceStore(PresenceBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="presence_stream", + keys=( + "stream_id", + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + "instance_name", + ), values=[ - { - "stream_id": stream_id, - "user_id": state.user_id, - "state": state.state, - "last_active_ts": state.last_active_ts, - "last_federation_update_ts": state.last_federation_update_ts, - "last_user_sync_ts": state.last_user_sync_ts, - "status_msg": state.status_msg, - "currently_active": state.currently_active, - "instance_name": self._instance_name, - } + ( + stream_id, + state.user_id, + state.state, + state.last_active_ts, + state.last_federation_update_ts, + state.last_user_sync_ts, + state.status_msg, + state.currently_active, + self._instance_name, + ) for stream_id, state in zip(stream_orderings, presence_states) ], ) diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 747b4f31df..cf64cd63a4 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -561,13 +561,9 @@ class PusherStore(PusherWorkerStore): self.db_pool.simple_insert_many_txn( txn, table="deleted_pushers", + keys=("stream_id", "app_id", "pushkey", "user_id"), values=[ - { - "stream_id": stream_id, - "app_id": pusher.app_id, - "pushkey": pusher.pushkey, - "user_id": user_id, - } + (stream_id, pusher.app_id, pusher.pushkey, user_id) for stream_id, pusher in zip(stream_ids, pushers) ], ) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 0f9b8575d3..f7c778bdf2 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -105,8 +105,10 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): GROUP BY room_id """ txn.execute(sql) - rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()] - self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) + rooms = list(txn.fetchall()) + self.db_pool.simple_insert_many_txn( + txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms + ) del rooms sql = ( @@ -117,9 +119,11 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): txn.execute(sql) txn.execute("SELECT name FROM users") - users = [{"user_id": x[0]} for x in txn.fetchall()] + users = list(txn.fetchall()) - self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) + self.db_pool.simple_insert_many_txn( + txn, TEMP_TABLE + "_users", keys=("user_id",), values=users + ) new_pos = await self.get_max_stream_id_in_current_state_deltas() await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index eb1118d2cb..5de70f31d2 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -327,14 +327,15 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="state_groups_state", + keys=( + "state_group", + "room_id", + "type", + "state_key", + "event_id", + ), values=[ - { - "state_group": state_group, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } + (state_group, room_id, key[0], key[1], state_id) for key, state_id in delta_state.items() ], ) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index c4c8c0021b..7614d76ac6 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -460,14 +460,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="state_groups_state", + keys=("state_group", "room_id", "type", "state_key", "event_id"), values=[ - { - "state_group": state_group, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } + (state_group, room_id, key[0], key[1], state_id) for key, state_id in delta_ids.items() ], ) @@ -475,14 +470,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="state_groups_state", + keys=("state_group", "room_id", "type", "state_key", "event_id"), values=[ - { - "state_group": state_group, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } + (state_group, room_id, key[0], key[1], state_id) for key, state_id in current_state_ids.items() ], ) @@ -589,14 +579,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="state_groups_state", + keys=("state_group", "room_id", "type", "state_key", "event_id"), values=[ - { - "state_group": sg, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } + (sg, room_id, key[0], key[1], state_id) for key, state_id in curr_state.items() ], ) diff --git a/tests/rest/admin/test_registration_tokens.py b/tests/rest/admin/test_registration_tokens.py index 81f3ac7f04..8513b1d2df 100644 --- a/tests/rest/admin/test_registration_tokens.py +++ b/tests/rest/admin/test_registration_tokens.py @@ -223,20 +223,13 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase): # Create all possible single character tokens tokens = [] for c in string.ascii_letters + string.digits + "._~-": - tokens.append( - { - "token": c, - "uses_allowed": None, - "pending": 0, - "completed": 0, - "expiry_time": None, - } - ) + tokens.append((c, None, 0, 0, None)) self.get_success( self.store.db_pool.simple_insert_many( "registration_tokens", - tokens, - "create_all_registration_tokens", + keys=("token", "uses_allowed", "pending", "completed", "expiry_time"), + values=tokens, + desc="create_all_registration_tokens", ) ) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index ecfda7677e..632bbc9de7 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -515,17 +515,23 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): self.get_success( self.store.db_pool.simple_insert_many( table="federation_inbound_events_staging", + keys=( + "origin", + "room_id", + "received_ts", + "event_id", + "event_json", + "internal_metadata", + ), values=[ - { - "origin": "some_origin", - "room_id": room_id, - "received_ts": 0, - "event_id": f"$fake_event_id_{i + 1}", - "event_json": json_encoder.encode( - {"prev_events": [f"$fake_event_id_{i}"]} - ), - "internal_metadata": "{}", - } + ( + "some_origin", + room_id, + 0, + f"$fake_event_id_{i + 1}", + json_encoder.encode({"prev_events": [f"$fake_event_id_{i}"]}), + "{}", + ) for i in range(500) ], desc="test_prune_inbound_federation_queue", -- cgit 1.5.1 From 6b241f5286f05d4d8dc0569e90388713a956d038 Mon Sep 17 00:00:00 2001 From: Daniel Sonck Date: Mon, 17 Jan 2022 12:42:51 +0100 Subject: Make pagination of rooms in admin api stable (#11737) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Always add state.room_id after the configurable ORDER BY. Otherwise, for any sort, certain pages can contain results from other pages. (Especially when sorting by creator, since there may be many rooms by the same creator) * Document different order direction of numerical fields "joined_members", "joined_local_members", "version" and "state_events" are ordered in descending direction by default (dir=f). Added a note in tests to explain the differences in ordering. Signed-off-by: Daniël Sonck --- changelog.d/11737.bugfix | 1 + synapse/storage/databases/main/room.py | 18 ++++++------- tests/rest/admin/test_room.py | 47 ++++++++++++++++++++-------------- 3 files changed, 38 insertions(+), 28 deletions(-) create mode 100644 changelog.d/11737.bugfix (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11737.bugfix b/changelog.d/11737.bugfix new file mode 100644 index 0000000000..a293d1cfec --- /dev/null +++ b/changelog.d/11737.bugfix @@ -0,0 +1 @@ +Make the list rooms admin api sort stable. Contributed by Daniël Sonck. \ No newline at end of file diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index c0e837854a..95167116c9 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -551,24 +551,24 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): FROM room_stats_state state INNER JOIN room_stats_current curr USING (room_id) INNER JOIN rooms USING (room_id) - %s - ORDER BY %s %s + {where} + ORDER BY {order_by} {direction}, state.room_id {direction} LIMIT ? OFFSET ? - """ % ( - where_statement, - order_by_column, - "ASC" if order_by_asc else "DESC", + """.format( + where=where_statement, + order_by=order_by_column, + direction="ASC" if order_by_asc else "DESC", ) # Use a nested SELECT statement as SQL can't count(*) with an OFFSET count_sql = """ SELECT count(*) FROM ( SELECT room_id FROM room_stats_state state - %s + {where} ) AS get_room_ids - """ % ( - where_statement, + """.format( + where=where_statement, ) def _get_rooms_paginate_txn( diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index d2c8781cd4..3495a0366a 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1089,6 +1089,8 @@ class RoomTestCase(unittest.HomeserverTestCase): ) room_ids.append(room_id) + room_ids.sort() + # Request the list of rooms url = "/_synapse/admin/v1/rooms" channel = self.make_request( @@ -1360,6 +1362,12 @@ class RoomTestCase(unittest.HomeserverTestCase): room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok) room_id_3 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok) + # Also create a list sorted by IDs for properties that are equal (and thus sorted by room_id) + sorted_by_room_id_asc = [room_id_1, room_id_2, room_id_3] + sorted_by_room_id_asc.sort() + sorted_by_room_id_desc = sorted_by_room_id_asc.copy() + sorted_by_room_id_desc.reverse() + # Set room names in alphabetical order. room 1 -> A, 2 -> B, 3 -> C self.helper.send_state( room_id_1, @@ -1405,41 +1413,42 @@ class RoomTestCase(unittest.HomeserverTestCase): _order_test("canonical_alias", [room_id_1, room_id_2, room_id_3]) _order_test("canonical_alias", [room_id_3, room_id_2, room_id_1], reverse=True) + # Note: joined_member counts are sorted in descending order when dir=f _order_test("joined_members", [room_id_3, room_id_2, room_id_1]) _order_test("joined_members", [room_id_1, room_id_2, room_id_3], reverse=True) + # Note: joined_local_member counts are sorted in descending order when dir=f _order_test("joined_local_members", [room_id_3, room_id_2, room_id_1]) _order_test( "joined_local_members", [room_id_1, room_id_2, room_id_3], reverse=True ) - _order_test("version", [room_id_1, room_id_2, room_id_3]) - _order_test("version", [room_id_1, room_id_2, room_id_3], reverse=True) + # Note: versions are sorted in descending order when dir=f + _order_test("version", sorted_by_room_id_asc, reverse=True) + _order_test("version", sorted_by_room_id_desc) - _order_test("creator", [room_id_1, room_id_2, room_id_3]) - _order_test("creator", [room_id_1, room_id_2, room_id_3], reverse=True) + _order_test("creator", sorted_by_room_id_asc) + _order_test("creator", sorted_by_room_id_desc, reverse=True) - _order_test("encryption", [room_id_1, room_id_2, room_id_3]) - _order_test("encryption", [room_id_1, room_id_2, room_id_3], reverse=True) + _order_test("encryption", sorted_by_room_id_asc) + _order_test("encryption", sorted_by_room_id_desc, reverse=True) - _order_test("federatable", [room_id_1, room_id_2, room_id_3]) - _order_test("federatable", [room_id_1, room_id_2, room_id_3], reverse=True) + _order_test("federatable", sorted_by_room_id_asc) + _order_test("federatable", sorted_by_room_id_desc, reverse=True) - _order_test("public", [room_id_1, room_id_2, room_id_3]) - # Different sort order of SQlite and PostreSQL - # _order_test("public", [room_id_3, room_id_2, room_id_1], reverse=True) + _order_test("public", sorted_by_room_id_asc) + _order_test("public", sorted_by_room_id_desc, reverse=True) - _order_test("join_rules", [room_id_1, room_id_2, room_id_3]) - _order_test("join_rules", [room_id_1, room_id_2, room_id_3], reverse=True) + _order_test("join_rules", sorted_by_room_id_asc) + _order_test("join_rules", sorted_by_room_id_desc, reverse=True) - _order_test("guest_access", [room_id_1, room_id_2, room_id_3]) - _order_test("guest_access", [room_id_1, room_id_2, room_id_3], reverse=True) + _order_test("guest_access", sorted_by_room_id_asc) + _order_test("guest_access", sorted_by_room_id_desc, reverse=True) - _order_test("history_visibility", [room_id_1, room_id_2, room_id_3]) - _order_test( - "history_visibility", [room_id_1, room_id_2, room_id_3], reverse=True - ) + _order_test("history_visibility", sorted_by_room_id_asc) + _order_test("history_visibility", sorted_by_room_id_desc, reverse=True) + # Note: state_event counts are sorted in descending order when dir=f _order_test("state_events", [room_id_3, room_id_2, room_id_1]) _order_test("state_events", [room_id_1, room_id_2, room_id_3], reverse=True) -- cgit 1.5.1 From 251b5567ecc8fb3d6debaa3f77f6ec2620877d36 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 18 Jan 2022 13:06:04 +0000 Subject: Remove `log_function` and its uses (#11761) I've never found this terribly useful. I think it was added in the early days of Synapse, without much thought as to what would actually be useful to log, and has just been cargo-culted ever since. Rather, it tends to clutter up debug logs with useless information. --- changelog.d/11761.misc | 1 + synapse/federation/federation_client.py | 5 --- synapse/federation/federation_server.py | 3 -- synapse/federation/persistence.py | 3 -- synapse/federation/transport/client.py | 48 -------------------- synapse/handlers/events.py | 2 - synapse/handlers/federation.py | 6 --- synapse/handlers/federation_event.py | 3 -- synapse/handlers/presence.py | 2 - synapse/logging/utils.py | 76 -------------------------------- synapse/notifier.py | 3 -- synapse/state/__init__.py | 2 - synapse/storage/databases/main/events.py | 2 - 13 files changed, 1 insertion(+), 155 deletions(-) create mode 100644 changelog.d/11761.misc delete mode 100644 synapse/logging/utils.py (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11761.misc b/changelog.d/11761.misc new file mode 100644 index 0000000000..d4d997a7b9 --- /dev/null +++ b/changelog.d/11761.misc @@ -0,0 +1 @@ +Remove `log_function` utility function and its uses. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6ea4edfc71..57cf35bd92 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -56,7 +56,6 @@ from synapse.api.room_versions import ( from synapse.events import EventBase, builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.transport.client import SendJoinResponse -from synapse.logging.utils import log_function from synapse.types import JsonDict, get_domain_from_id from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -144,7 +143,6 @@ class FederationClient(FederationBase): if destination_dict: self.pdu_destination_tried[event_id] = destination_dict - @log_function async def make_query( self, destination: str, @@ -178,7 +176,6 @@ class FederationClient(FederationBase): ignore_backoff=ignore_backoff, ) - @log_function async def query_client_keys( self, destination: str, content: JsonDict, timeout: int ) -> JsonDict: @@ -196,7 +193,6 @@ class FederationClient(FederationBase): destination, content, timeout ) - @log_function async def query_user_devices( self, destination: str, user_id: str, timeout: int = 30000 ) -> JsonDict: @@ -208,7 +204,6 @@ class FederationClient(FederationBase): destination, user_id, timeout ) - @log_function async def claim_client_keys( self, destination: str, content: JsonDict, timeout: int ) -> JsonDict: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index ee71f289c8..af9cb98f67 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -58,7 +58,6 @@ from synapse.logging.context import ( run_in_background, ) from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace -from synapse.logging.utils import log_function from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, @@ -859,7 +858,6 @@ class FederationServer(FederationBase): res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]} return 200, res - @log_function async def on_query_client_keys( self, origin: str, content: Dict[str, str] ) -> Tuple[int, Dict[str, Any]]: @@ -940,7 +938,6 @@ class FederationServer(FederationBase): return {"events": [ev.get_pdu_json(time_now) for ev in missing_events]} - @log_function async def on_openid_userinfo(self, token: str) -> Optional[str]: ts_now_ms = self._clock.time_msec() return await self.store.get_user_id_for_open_id_token(token, ts_now_ms) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 523ab1c51e..60e2e6cf01 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,7 +23,6 @@ import logging from typing import Optional, Tuple from synapse.federation.units import Transaction -from synapse.logging.utils import log_function from synapse.storage.databases.main import DataStore from synapse.types import JsonDict @@ -36,7 +35,6 @@ class TransactionActions: def __init__(self, datastore: DataStore): self.store = datastore - @log_function async def have_responded( self, origin: str, transaction: Transaction ) -> Optional[Tuple[int, JsonDict]]: @@ -53,7 +51,6 @@ class TransactionActions: return await self.store.get_received_txn_response(transaction_id, origin) - @log_function async def set_response( self, origin: str, transaction: Transaction, code: int, response: JsonDict ) -> None: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 9fc4c31c93..8782586cd6 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -44,7 +44,6 @@ from synapse.api.urls import ( from synapse.events import EventBase, make_event_from_dict from synapse.federation.units import Transaction from synapse.http.matrixfederationclient import ByteParser -from synapse.logging.utils import log_function from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -62,7 +61,6 @@ class TransportLayerClient: self.server_name = hs.hostname self.client = hs.get_federation_http_client() - @log_function async def get_room_state_ids( self, destination: str, room_id: str, event_id: str ) -> JsonDict: @@ -88,7 +86,6 @@ class TransportLayerClient: try_trailing_slash_on_400=True, ) - @log_function async def get_event( self, destination: str, event_id: str, timeout: Optional[int] = None ) -> JsonDict: @@ -111,7 +108,6 @@ class TransportLayerClient: destination, path=path, timeout=timeout, try_trailing_slash_on_400=True ) - @log_function async def backfill( self, destination: str, room_id: str, event_tuples: Collection[str], limit: int ) -> Optional[JsonDict]: @@ -149,7 +145,6 @@ class TransportLayerClient: destination, path=path, args=args, try_trailing_slash_on_400=True ) - @log_function async def timestamp_to_event( self, destination: str, room_id: str, timestamp: int, direction: str ) -> Union[JsonDict, List]: @@ -185,7 +180,6 @@ class TransportLayerClient: return remote_response - @log_function async def send_transaction( self, transaction: Transaction, @@ -234,7 +228,6 @@ class TransportLayerClient: try_trailing_slash_on_400=True, ) - @log_function async def make_query( self, destination: str, @@ -254,7 +247,6 @@ class TransportLayerClient: ignore_backoff=ignore_backoff, ) - @log_function async def make_membership_event( self, destination: str, @@ -317,7 +309,6 @@ class TransportLayerClient: ignore_backoff=ignore_backoff, ) - @log_function async def send_join_v1( self, room_version: RoomVersion, @@ -336,7 +327,6 @@ class TransportLayerClient: max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN, ) - @log_function async def send_join_v2( self, room_version: RoomVersion, @@ -355,7 +345,6 @@ class TransportLayerClient: max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN, ) - @log_function async def send_leave_v1( self, destination: str, room_id: str, event_id: str, content: JsonDict ) -> Tuple[int, JsonDict]: @@ -372,7 +361,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def send_leave_v2( self, destination: str, room_id: str, event_id: str, content: JsonDict ) -> JsonDict: @@ -389,7 +377,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def send_knock_v1( self, destination: str, @@ -423,7 +410,6 @@ class TransportLayerClient: destination=destination, path=path, data=content ) - @log_function async def send_invite_v1( self, destination: str, room_id: str, event_id: str, content: JsonDict ) -> Tuple[int, JsonDict]: @@ -433,7 +419,6 @@ class TransportLayerClient: destination=destination, path=path, data=content, ignore_backoff=True ) - @log_function async def send_invite_v2( self, destination: str, room_id: str, event_id: str, content: JsonDict ) -> JsonDict: @@ -443,7 +428,6 @@ class TransportLayerClient: destination=destination, path=path, data=content, ignore_backoff=True ) - @log_function async def get_public_rooms( self, remote_server: str, @@ -516,7 +500,6 @@ class TransportLayerClient: return response - @log_function async def exchange_third_party_invite( self, destination: str, room_id: str, event_dict: JsonDict ) -> JsonDict: @@ -526,7 +509,6 @@ class TransportLayerClient: destination=destination, path=path, data=event_dict ) - @log_function async def get_event_auth( self, destination: str, room_id: str, event_id: str ) -> JsonDict: @@ -534,7 +516,6 @@ class TransportLayerClient: return await self.client.get_json(destination=destination, path=path) - @log_function async def query_client_keys( self, destination: str, query_content: JsonDict, timeout: int ) -> JsonDict: @@ -576,7 +557,6 @@ class TransportLayerClient: destination=destination, path=path, data=query_content, timeout=timeout ) - @log_function async def query_user_devices( self, destination: str, user_id: str, timeout: int ) -> JsonDict: @@ -616,7 +596,6 @@ class TransportLayerClient: destination=destination, path=path, timeout=timeout ) - @log_function async def claim_client_keys( self, destination: str, query_content: JsonDict, timeout: int ) -> JsonDict: @@ -655,7 +634,6 @@ class TransportLayerClient: destination=destination, path=path, data=query_content, timeout=timeout ) - @log_function async def get_missing_events( self, destination: str, @@ -680,7 +658,6 @@ class TransportLayerClient: timeout=timeout, ) - @log_function async def get_group_profile( self, destination: str, group_id: str, requester_user_id: str ) -> JsonDict: @@ -694,7 +671,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def update_group_profile( self, destination: str, group_id: str, requester_user_id: str, content: JsonDict ) -> JsonDict: @@ -716,7 +692,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def get_group_summary( self, destination: str, group_id: str, requester_user_id: str ) -> JsonDict: @@ -730,7 +705,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def get_rooms_in_group( self, destination: str, group_id: str, requester_user_id: str ) -> JsonDict: @@ -798,7 +772,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def get_users_in_group( self, destination: str, group_id: str, requester_user_id: str ) -> JsonDict: @@ -812,7 +785,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def get_invited_users_in_group( self, destination: str, group_id: str, requester_user_id: str ) -> JsonDict: @@ -826,7 +798,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def accept_group_invite( self, destination: str, group_id: str, user_id: str, content: JsonDict ) -> JsonDict: @@ -837,7 +808,6 @@ class TransportLayerClient: destination=destination, path=path, data=content, ignore_backoff=True ) - @log_function def join_group( self, destination: str, group_id: str, user_id: str, content: JsonDict ) -> Awaitable[JsonDict]: @@ -848,7 +818,6 @@ class TransportLayerClient: destination=destination, path=path, data=content, ignore_backoff=True ) - @log_function async def invite_to_group( self, destination: str, @@ -868,7 +837,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def invite_to_group_notification( self, destination: str, group_id: str, user_id: str, content: JsonDict ) -> JsonDict: @@ -882,7 +850,6 @@ class TransportLayerClient: destination=destination, path=path, data=content, ignore_backoff=True ) - @log_function async def remove_user_from_group( self, destination: str, @@ -902,7 +869,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def remove_user_from_group_notification( self, destination: str, group_id: str, user_id: str, content: JsonDict ) -> JsonDict: @@ -916,7 +882,6 @@ class TransportLayerClient: destination=destination, path=path, data=content, ignore_backoff=True ) - @log_function async def renew_group_attestation( self, destination: str, group_id: str, user_id: str, content: JsonDict ) -> JsonDict: @@ -930,7 +895,6 @@ class TransportLayerClient: destination=destination, path=path, data=content, ignore_backoff=True ) - @log_function async def update_group_summary_room( self, destination: str, @@ -959,7 +923,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def delete_group_summary_room( self, destination: str, @@ -986,7 +949,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def get_group_categories( self, destination: str, group_id: str, requester_user_id: str ) -> JsonDict: @@ -1000,7 +962,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def get_group_category( self, destination: str, group_id: str, requester_user_id: str, category_id: str ) -> JsonDict: @@ -1014,7 +975,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def update_group_category( self, destination: str, @@ -1034,7 +994,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def delete_group_category( self, destination: str, group_id: str, requester_user_id: str, category_id: str ) -> JsonDict: @@ -1048,7 +1007,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def get_group_roles( self, destination: str, group_id: str, requester_user_id: str ) -> JsonDict: @@ -1062,7 +1020,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def get_group_role( self, destination: str, group_id: str, requester_user_id: str, role_id: str ) -> JsonDict: @@ -1076,7 +1033,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def update_group_role( self, destination: str, @@ -1096,7 +1052,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def delete_group_role( self, destination: str, group_id: str, requester_user_id: str, role_id: str ) -> JsonDict: @@ -1110,7 +1065,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def update_group_summary_user( self, destination: str, @@ -1136,7 +1090,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def set_group_join_policy( self, destination: str, group_id: str, requester_user_id: str, content: JsonDict ) -> JsonDict: @@ -1151,7 +1104,6 @@ class TransportLayerClient: ignore_backoff=True, ) - @log_function async def delete_group_summary_user( self, destination: str, diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index a3add8a586..bac5de0526 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -20,7 +20,6 @@ from synapse.api.constants import EduTypes, EventTypes, Membership from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state -from synapse.logging.utils import log_function from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, UserID from synapse.visibility import filter_events_for_client @@ -43,7 +42,6 @@ class EventStreamHandler: self._server_notices_sender = hs.get_server_notices_sender() self._event_serializer = hs.get_event_client_serializer() - @log_function async def get_stream( self, auth_user_id: str, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 26b8e3f43c..a37ae0ca09 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -51,7 +51,6 @@ from synapse.logging.context import ( preserve_fn, run_in_background, ) -from synapse.logging.utils import log_function from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, ReplicationStoreRoomOnOutlierMembershipRestServlet, @@ -556,7 +555,6 @@ class FederationHandler: run_in_background(self._handle_queued_pdus, room_queue) - @log_function async def do_knock( self, target_hosts: List[str], @@ -928,7 +926,6 @@ class FederationHandler: return event - @log_function async def on_make_knock_request( self, origin: str, room_id: str, user_id: str ) -> EventBase: @@ -1039,7 +1036,6 @@ class FederationHandler: else: return [] - @log_function async def on_backfill_request( self, origin: str, room_id: str, pdu_list: List[str], limit: int ) -> List[EventBase]: @@ -1056,7 +1052,6 @@ class FederationHandler: return events - @log_function async def get_persisted_pdu( self, origin: str, event_id: str ) -> Optional[EventBase]: @@ -1118,7 +1113,6 @@ class FederationHandler: return missing_events - @log_function async def exchange_third_party_invite( self, sender_user_id: str, target_user_id: str, room_id: str, signed: JsonDict ) -> None: diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 11771f3c9c..3905f60b3a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -56,7 +56,6 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError from synapse.logging.context import nested_logging_context, run_in_background -from synapse.logging.utils import log_function from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( @@ -275,7 +274,6 @@ class FederationEventHandler: await self._process_received_pdu(origin, pdu, state=None) - @log_function async def on_send_membership_event( self, origin: str, event: EventBase ) -> Tuple[EventBase, EventContext]: @@ -472,7 +470,6 @@ class FederationEventHandler: return await self.persist_events_and_notify(room_id, [(event, context)]) - @log_function async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> None: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c781fefb1b..067c43ae47 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -55,7 +55,6 @@ from synapse.api.presence import UserPresenceState from synapse.appservice import ApplicationService from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background -from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.presence import ( @@ -1542,7 +1541,6 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): self.clock = hs.get_clock() self.store = hs.get_datastore() - @log_function async def get_new_events( self, user: UserID, diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py deleted file mode 100644 index 4a01b902c2..0000000000 --- a/synapse/logging/utils.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -from functools import wraps -from inspect import getcallargs -from typing import Callable, TypeVar, cast - -_TIME_FUNC_ID = 0 - - -def _log_debug_as_f(f, msg, msg_args): - name = f.__module__ - logger = logging.getLogger(name) - - if logger.isEnabledFor(logging.DEBUG): - lineno = f.__code__.co_firstlineno - pathname = f.__code__.co_filename - - record = logger.makeRecord( - name=name, - level=logging.DEBUG, - fn=pathname, - lno=lineno, - msg=msg, - args=msg_args, - exc_info=None, - ) - - logger.handle(record) - - -F = TypeVar("F", bound=Callable) - - -def log_function(f: F) -> F: - """Function decorator that logs every call to that function.""" - func_name = f.__name__ - - @wraps(f) - def wrapped(*args, **kwargs): - name = f.__module__ - logger = logging.getLogger(name) - level = logging.DEBUG - - if logger.isEnabledFor(level): - bound_args = getcallargs(f, *args, **kwargs) - - def format(value): - r = str(value) - if len(r) > 50: - r = r[:50] + "..." - return r - - func_args = ["%s=%s" % (k, format(v)) for k, v in bound_args.items()] - - msg_args = {"func_name": func_name, "args": ", ".join(func_args)} - - _log_debug_as_f(f, "Invoked '%(func_name)s' with args: %(args)s", msg_args) - - return f(*args, **kwargs) - - wrapped.__name__ = func_name - return cast(F, wrapped) diff --git a/synapse/notifier.py b/synapse/notifier.py index 41fd94d772..632b2245ef 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -40,7 +40,6 @@ from synapse.handlers.presence import format_user_presence_state from synapse.logging import issue9533_logger from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import log_kv, start_active_span -from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig from synapse.types import ( @@ -686,7 +685,6 @@ class Notifier: else: return False - @log_function def remove_expired_streams(self) -> None: time_now_ms = self.clock.time_msec() expired_streams = [] @@ -700,7 +698,6 @@ class Notifier: for expired_stream in expired_streams: expired_stream.remove(self) - @log_function def _register_with_keys(self, user_stream: _NotifierUserStream): self.user_to_user_stream[user_stream.user_id] = user_stream diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 923e31587e..67e8bc6ec2 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -45,7 +45,6 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersio from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.context import ContextResourceUsage -from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.roommember import ProfileInfo @@ -512,7 +511,6 @@ class StateResolutionHandler: self.clock.looping_call(self._report_metrics, 120 * 1000) - @log_function async def resolve_state_groups( self, room_id: str, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index de3b48524b..2be36a741a 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -39,7 +39,6 @@ from synapse.api.room_versions import RoomVersions from synapse.crypto.event_signing import compute_event_reference_hash from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 -from synapse.logging.utils import log_function from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -328,7 +327,6 @@ class PersistEventsStore: return existing_prevs - @log_function def _persist_events_txn( self, txn: LoggingTransaction, -- cgit 1.5.1 From 68acb0a29dcb03a0ecbcebdb95e09c5999598f42 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 18 Jan 2022 11:38:57 -0500 Subject: Include whether the requesting user has participated in a thread. (#11577) Per updates to MSC3440. This is implement as a separate method since it needs to be cached on a per-user basis, instead of a per-thread basis. --- changelog.d/11577.feature | 1 + synapse/handlers/pagination.py | 2 +- synapse/handlers/room.py | 12 ++++-- synapse/handlers/sync.py | 4 +- synapse/rest/client/relations.py | 4 +- synapse/rest/client/room.py | 4 +- synapse/storage/databases/main/events.py | 7 +++ synapse/storage/databases/main/relations.py | 66 ++++++++++++++++++++++++----- tests/rest/client/test_relations.py | 3 ++ 9 files changed, 85 insertions(+), 18 deletions(-) create mode 100644 changelog.d/11577.feature (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11577.feature b/changelog.d/11577.feature new file mode 100644 index 0000000000..f9c8a0d5f4 --- /dev/null +++ b/changelog.d/11577.feature @@ -0,0 +1 @@ +Include whether the requesting user has participated in a thread when generating a summary for [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 472688f045..973f262964 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -537,7 +537,7 @@ class PaginationHandler: state_dict = await self.store.get_events(list(state_ids.values())) state = state_dict.values() - aggregations = await self.store.get_bundled_aggregations(events) + aggregations = await self.store.get_bundled_aggregations(events, user_id) time_now = self.clock.time_msec() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3d47163f25..f963078e59 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1182,12 +1182,18 @@ class RoomContextHandler: results["event"] = filtered[0] # Fetch the aggregations. - aggregations = await self.store.get_bundled_aggregations([results["event"]]) + aggregations = await self.store.get_bundled_aggregations( + [results["event"]], user.to_string() + ) aggregations.update( - await self.store.get_bundled_aggregations(results["events_before"]) + await self.store.get_bundled_aggregations( + results["events_before"], user.to_string() + ) ) aggregations.update( - await self.store.get_bundled_aggregations(results["events_after"]) + await self.store.get_bundled_aggregations( + results["events_after"], user.to_string() + ) ) results["aggregations"] = aggregations diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e1df9b3106..ffc6b748e8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -637,7 +637,9 @@ class SyncHandler: # as clients will have all the necessary information. bundled_aggregations = None if limited or newly_joined_room: - bundled_aggregations = await self.store.get_bundled_aggregations(recents) + bundled_aggregations = await self.store.get_bundled_aggregations( + recents, sync_config.user.to_string() + ) return TimelineBatch( events=recents, diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 37d949a71e..8cf5ebaa07 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -118,7 +118,9 @@ class RelationPaginationServlet(RestServlet): ) # The relations returned for the requested event do include their # bundled aggregations. - aggregations = await self.store.get_bundled_aggregations(events) + aggregations = await self.store.get_bundled_aggregations( + events, requester.user.to_string() + ) serialized_events = self._event_serializer.serialize_events( events, now, bundle_aggregations=aggregations ) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index da6014900a..31fd329a38 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -663,7 +663,9 @@ class RoomEventServlet(RestServlet): if event: # Ensure there are bundled aggregations available. - aggregations = await self._store.get_bundled_aggregations([event]) + aggregations = await self._store.get_bundled_aggregations( + [event], requester.user.to_string() + ) time_now = self.clock.time_msec() event_dict = self._event_serializer.serialize_event( diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 2be36a741a..7278002322 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1793,6 +1793,13 @@ class PersistEventsStore: txn.call_after( self.store.get_thread_summary.invalidate, (parent_id, event.room_id) ) + # It should be safe to only invalidate the cache if the user has not + # previously participated in the thread, but that's difficult (and + # potentially error-prone) so it is always invalidated. + txn.call_after( + self.store.get_thread_participated.invalidate, + (parent_id, event.room_id, event.sender), + ) def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): """Handles keeping track of insertion events and edges/connections. diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index c6c4bd18da..2cb5d06c13 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -384,8 +384,7 @@ class RelationsWorkerStore(SQLBaseStore): async def get_thread_summary( self, event_id: str, room_id: str ) -> Tuple[int, Optional[EventBase]]: - """Get the number of threaded replies, the senders of those replies, and - the latest reply (if any) for the given event. + """Get the number of threaded replies and the latest reply (if any) for the given event. Args: event_id: Summarize the thread related to this event ID. @@ -398,7 +397,7 @@ class RelationsWorkerStore(SQLBaseStore): def _get_thread_summary_txn( txn: LoggingTransaction, ) -> Tuple[int, Optional[str]]: - # Fetch the count of threaded events and the latest event ID. + # Fetch the latest event ID in the thread. # TODO Should this only allow m.room.message events. sql = """ SELECT event_id @@ -419,6 +418,7 @@ class RelationsWorkerStore(SQLBaseStore): latest_event_id = row[0] + # Fetch the number of threaded replies. sql = """ SELECT COUNT(event_id) FROM event_relations @@ -443,6 +443,44 @@ class RelationsWorkerStore(SQLBaseStore): return count, latest_event + @cached() + async def get_thread_participated( + self, event_id: str, room_id: str, user_id: str + ) -> bool: + """Get whether the requesting user participated in a thread. + + This is separate from get_thread_summary since that can be cached across + all users while this value is specific to the requeser. + + Args: + event_id: The thread related to this event ID. + room_id: The room the event belongs to. + user_id: The user requesting the summary. + + Returns: + True if the requesting user participated in the thread, otherwise false. + """ + + def _get_thread_summary_txn(txn: LoggingTransaction) -> bool: + # Fetch whether the requester has participated or not. + sql = """ + SELECT 1 + FROM event_relations + INNER JOIN events USING (event_id) + WHERE + relates_to_id = ? + AND room_id = ? + AND relation_type = ? + AND sender = ? + """ + + txn.execute(sql, (event_id, room_id, RelationTypes.THREAD, user_id)) + return bool(txn.fetchone()) + + return await self.db_pool.runInteraction( + "get_thread_summary", _get_thread_summary_txn + ) + async def events_have_relations( self, parent_ids: List[str], @@ -546,7 +584,7 @@ class RelationsWorkerStore(SQLBaseStore): ) async def _get_bundled_aggregation_for_event( - self, event: EventBase + self, event: EventBase, user_id: str ) -> Optional[Dict[str, Any]]: """Generate bundled aggregations for an event. @@ -554,6 +592,7 @@ class RelationsWorkerStore(SQLBaseStore): Args: event: The event to calculate bundled aggregations for. + user_id: The user requesting the bundled aggregations. Returns: The bundled aggregations for an event, if bundled aggregations are @@ -598,27 +637,32 @@ class RelationsWorkerStore(SQLBaseStore): # If this event is the start of a thread, include a summary of the replies. if self._msc3440_enabled: - ( - thread_count, - latest_thread_event, - ) = await self.get_thread_summary(event_id, room_id) + thread_count, latest_thread_event = await self.get_thread_summary( + event_id, room_id + ) + participated = await self.get_thread_participated( + event_id, room_id, user_id + ) if latest_thread_event: aggregations[RelationTypes.THREAD] = { - # Don't bundle aggregations as this could recurse forever. "latest_event": latest_thread_event, "count": thread_count, + "current_user_participated": participated, } # Store the bundled aggregations in the event metadata for later use. return aggregations async def get_bundled_aggregations( - self, events: Iterable[EventBase] + self, + events: Iterable[EventBase], + user_id: str, ) -> Dict[str, Dict[str, Any]]: """Generate bundled aggregations for events. Args: events: The iterable of events to calculate bundled aggregations for. + user_id: The user requesting the bundled aggregations. Returns: A map of event ID to the bundled aggregation for the event. Not all @@ -631,7 +675,7 @@ class RelationsWorkerStore(SQLBaseStore): # TODO Parallelize. results = {} for event in events: - event_result = await self._get_bundled_aggregation_for_event(event) + event_result = await self._get_bundled_aggregation_for_event(event, user_id) if event_result is not None: results[event.event_id] = event_result diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index ee26751430..4b20ab0e3e 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -515,6 +515,9 @@ class RelationsTestCase(unittest.HomeserverTestCase): 2, actual[RelationTypes.THREAD].get("count"), ) + self.assertTrue( + actual[RelationTypes.THREAD].get("current_user_participated") + ) # The latest thread event has some fields that don't matter. self.assert_dict( { -- cgit 1.5.1 From 5572e6cc4b0f7f3a907773f29d82671202812a6d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 19 Jan 2022 19:45:36 +0000 Subject: Comments and typing for `_update_outliers_txn` (#11776) A couple of surprises for me here, so thought I'd document them --- changelog.d/11776.misc | 1 + synapse/storage/databases/main/events.py | 35 +++++++++++++++++++++----------- 2 files changed, 24 insertions(+), 12 deletions(-) create mode 100644 changelog.d/11776.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11776.misc b/changelog.d/11776.misc new file mode 100644 index 0000000000..572ccda847 --- /dev/null +++ b/changelog.d/11776.misc @@ -0,0 +1 @@ +Add some comments and type annotations for `_update_outliers_txn`. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 7278002322..1ae1ebe108 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1254,20 +1254,22 @@ class PersistEventsStore: for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) - def _update_outliers_txn(self, txn, events_and_contexts): + def _update_outliers_txn( + self, + txn: LoggingTransaction, + events_and_contexts: List[Tuple[EventBase, EventContext]], + ) -> List[Tuple[EventBase, EventContext]]: """Update any outliers with new event info. - This turns outliers into ex-outliers (unless the new event was - rejected). + This turns outliers into ex-outliers (unless the new event was rejected), and + also removes any other events we have already seen from the list. Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting + txn: db connection + events_and_contexts: events we are persisting Returns: - list[(EventBase, EventContext)] new list, without events which - are already in the events table. + new list, without events which are already in the events table. """ txn.execute( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" @@ -1275,7 +1277,9 @@ class PersistEventsStore: [event.event_id for event, _ in events_and_contexts], ) - have_persisted = {event_id: outlier for event_id, outlier in txn} + have_persisted: Dict[str, bool] = { + event_id: outlier for event_id, outlier in txn + } to_remove = set() for event, context in events_and_contexts: @@ -1285,15 +1289,22 @@ class PersistEventsStore: to_remove.add(event) if context.rejected: - # If the event is rejected then we don't care if the event - # was an outlier or not. + # If the incoming event is rejected then we don't care if the event + # was an outlier or not - what we have is at least as good. continue outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: # We received a copy of an event that we had already stored as - # an outlier in the database. We now have some state at that + # an outlier in the database. We now have some state at that event # so we need to update the state_groups table with that state. + # + # Note that we do not update the stream_ordering of the event in this + # scenario. XXX: does this cause bugs? It will mean we won't send such + # events down /sync. In general they will be historical events, so that + # doesn't matter too much, but that is not always the case. + + logger.info("Updating state for ex-outlier event %s", event.event_id) # insert into event_to_state_groups. try: -- cgit 1.5.1 From f160fe18e3cc5604375f491300d12dd5c7e9b9b2 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 20 Jan 2022 13:38:44 +0000 Subject: Debug for device lists updates (#11760) Debug for #8631. I'm having a hard time tracking down what's going wrong in that issue. In the reported example, I could see server A sending federation traffic to server B and all was well. Yet B reports out-of-sync device updates from A. I couldn't see what was _in_ the events being sent from A to B. So I have added some crude logging to track - when we have updates to send to a remote HS - the edus we actually accumulate to send - when a federation transaction includes a device list update edu - when such an EDU is received This is a bit of a sledgehammer. --- changelog.d/11760.misc | 1 + synapse/federation/sender/transaction_manager.py | 12 ++++++++++++ synapse/federation/transport/server/federation.py | 15 +++++++++++++++ synapse/storage/databases/main/devices.py | 18 ++++++++++++++++++ 4 files changed, 46 insertions(+) create mode 100644 changelog.d/11760.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11760.misc b/changelog.d/11760.misc new file mode 100644 index 0000000000..6cb1b5dd49 --- /dev/null +++ b/changelog.d/11760.misc @@ -0,0 +1 @@ +Add optional debugging to investigate [issue 8631](https://github.com/matrix-org/synapse/issues/8631). \ No newline at end of file diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index ab935e5a7e..742ee57255 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -35,6 +35,7 @@ if TYPE_CHECKING: import synapse.server logger = logging.getLogger(__name__) +issue_8631_logger = logging.getLogger("synapse.8631_debug") last_pdu_ts_metric = Gauge( "synapse_federation_last_sent_pdu_time", @@ -124,6 +125,17 @@ class TransactionManager: len(pdus), len(edus), ) + if issue_8631_logger.isEnabledFor(logging.DEBUG): + DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"} + device_list_updates = [ + edu.content for edu in edus if edu.edu_type in DEVICE_UPDATE_EDUS + ] + if device_list_updates: + issue_8631_logger.debug( + "about to send txn [%s] including device list updates: %s", + transaction.transaction_id, + device_list_updates, + ) # Actually send the transaction diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 77bfd88ad0..beadfa422b 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -36,6 +36,7 @@ from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string logger = logging.getLogger(__name__) +issue_8631_logger = logging.getLogger("synapse.8631_debug") class BaseFederationServerServlet(BaseFederationServlet): @@ -95,6 +96,20 @@ class FederationSendServlet(BaseFederationServerServlet): len(transaction_data.get("edus", [])), ) + if issue_8631_logger.isEnabledFor(logging.DEBUG): + DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"} + device_list_updates = [ + edu.content + for edu in transaction_data.get("edus", []) + if edu.edu_type in DEVICE_UPDATE_EDUS + ] + if device_list_updates: + issue_8631_logger.debug( + "received transaction [%s] including device list updates: %s", + transaction_id, + device_list_updates, + ) + except Exception as e: logger.exception(e) return 400, {"error": "Invalid transaction"} diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 8f0cd0695f..b2a5cd9a65 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -53,6 +53,7 @@ if TYPE_CHECKING: from synapse.server import HomeServer logger = logging.getLogger(__name__) +issue_8631_logger = logging.getLogger("synapse.8631_debug") DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( "drop_device_list_streams_non_unique_indexes" @@ -229,6 +230,12 @@ class DeviceWorkerStore(SQLBaseStore): if not updates: return now_stream_id, [] + if issue_8631_logger.isEnabledFor(logging.DEBUG): + data = {(user, device): stream_id for user, device, stream_id, _ in updates} + issue_8631_logger.debug( + "device updates need to be sent to %s: %s", destination, data + ) + # get the cross-signing keys of the users in the list, so that we can # determine which of the device changes were cross-signing keys users = {r[0] for r in updates} @@ -365,6 +372,17 @@ class DeviceWorkerStore(SQLBaseStore): # and remove the length budgeting above. results.append(("org.matrix.signing_key_update", result)) + if issue_8631_logger.isEnabledFor(logging.DEBUG): + for (user_id, edu) in results: + issue_8631_logger.debug( + "device update to %s for %s from %s to %s: %s", + destination, + user_id, + from_stream_id, + last_processed_stream_id, + edu, + ) + return last_processed_stream_id, results def _get_device_updates_by_remote_txn( -- cgit 1.5.1 From dc671d3ea7e869cf765c08c755d4045feee98fe7 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 24 Jan 2022 12:20:01 +0000 Subject: Fix logic for dropping old events in fed queue (#11806) Co-authored-by: Brendan Abolivier Co-authored-by: Richard van der Hoff --- changelog.d/11806.bugfix | 1 + synapse/storage/databases/main/event_federation.py | 5 +++- tests/storage/test_event_federation.py | 30 +++++++++++++++++----- 3 files changed, 29 insertions(+), 7 deletions(-) create mode 100644 changelog.d/11806.bugfix (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/11806.bugfix b/changelog.d/11806.bugfix new file mode 100644 index 0000000000..e4beaf103b --- /dev/null +++ b/changelog.d/11806.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room. \ No newline at end of file diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 270b30800b..a556f17dac 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1432,7 +1432,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas if room_version.event_format == EventFormatVersions.V1: for prev_event_tuple in prev_events: - if not isinstance(prev_event_tuple, list) or len(prev_events) != 2: + if ( + not isinstance(prev_event_tuple, list) + or len(prev_event_tuple) != 2 + ): logger.info("Invalid prev_events for %s", event_id) break diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 632bbc9de7..2bc89512f8 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -12,10 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Tuple, Union + import attr from parameterized import parameterized -from synapse.api.room_versions import RoomVersions +from synapse.api.room_versions import ( + KNOWN_ROOM_VERSIONS, + EventFormatVersions, + RoomVersion, +) from synapse.events import _EventInternalMetadata from synapse.util import json_encoder @@ -506,11 +512,21 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): ) self.assertSetEqual(difference, set()) - def test_prune_inbound_federation_queue(self): - "Test that pruning of inbound federation queues work" + @parameterized.expand( + [(room_version,) for room_version in KNOWN_ROOM_VERSIONS.values()] + ) + def test_prune_inbound_federation_queue(self, room_version: RoomVersion): + """Test that pruning of inbound federation queues work""" room_id = "some_room_id" + def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]: + """Account for differences in prev_events format across room versions""" + if room_version.event_format == EventFormatVersions.V1: + return prev_event_id, {} + + return prev_event_id + # Insert a bunch of events that all reference the previous one. self.get_success( self.store.db_pool.simple_insert_many( @@ -529,7 +545,9 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): room_id, 0, f"$fake_event_id_{i + 1}", - json_encoder.encode({"prev_events": [f"$fake_event_id_{i}"]}), + json_encoder.encode( + {"prev_events": [prev_event_format(f"$fake_event_id_{i}")]} + ), "{}", ) for i in range(500) @@ -541,12 +559,12 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): # Calling prune once should return True, i.e. a prune happen. The second # time it shouldn't. pruned = self.get_success( - self.store.prune_staged_events_in_room(room_id, RoomVersions.V6) + self.store.prune_staged_events_in_room(room_id, room_version) ) self.assertTrue(pruned) pruned = self.get_success( - self.store.prune_staged_events_in_room(room_id, RoomVersions.V6) + self.store.prune_staged_events_in_room(room_id, room_version) ) self.assertFalse(pruned) -- cgit 1.5.1