From a19d01c3d95f5dbd3a4bb181cb70dacd44135a8b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 9 Nov 2021 08:10:58 -0500 Subject: Support filtering by relations per MSC3440 (#11236) Adds experimental support for `relation_types` and `relation_senders` fields for filters. --- synapse/storage/databases/main/relations.py | 58 ++++++++++++++++++- synapse/storage/databases/main/stream.py | 86 +++++++++++++++++++++-------- 2 files changed, 121 insertions(+), 23 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 53576ad52f..907af10995 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -20,7 +20,7 @@ import attr from synapse.api.constants import RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import LoggingTransaction +from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.relations import ( AggregationPaginationToken, @@ -334,6 +334,62 @@ class RelationsWorkerStore(SQLBaseStore): return count, latest_event + async def events_have_relations( + self, + parent_ids: List[str], + relation_senders: Optional[List[str]], + relation_types: Optional[List[str]], + ) -> List[str]: + """Check which events have a relationship from the given senders of the + given types. + + Args: + parent_ids: The events being annotated + relation_senders: The relation senders to check. + relation_types: The relation types to check. + + Returns: + True if the event has at least one relationship from one of the given senders of the given type. + """ + # If no restrictions are given then the event has the required relations. + if not relation_senders and not relation_types: + return parent_ids + + sql = """ + SELECT relates_to_id FROM event_relations + INNER JOIN events USING (event_id) + WHERE + %s; + """ + + def _get_if_event_has_relations(txn) -> List[str]: + clauses: List[str] = [] + clause, args = make_in_list_sql_clause( + txn.database_engine, "relates_to_id", parent_ids + ) + clauses.append(clause) + + if relation_senders: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "sender", relation_senders + ) + clauses.append(clause) + args.extend(temp_args) + if relation_types: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "relation_type", relation_types + ) + clauses.append(clause) + args.extend(temp_args) + + txn.execute(sql % " AND ".join(clauses), args) + + return [row[0] for row in txn] + + return await self.db_pool.runInteraction( + "get_if_event_has_relations", _get_if_event_has_relations + ) + async def has_user_annotated_event( self, parent_id: str, event_type: str, aggregation_key: str, sender: str ) -> bool: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index dc7884b1c0..42dc807d17 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -272,31 +272,37 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: args = [] if event_filter.types: - clauses.append("(%s)" % " OR ".join("type = ?" for _ in event_filter.types)) + clauses.append( + "(%s)" % " OR ".join("event.type = ?" for _ in event_filter.types) + ) args.extend(event_filter.types) for typ in event_filter.not_types: - clauses.append("type != ?") + clauses.append("event.type != ?") args.append(typ) if event_filter.senders: - clauses.append("(%s)" % " OR ".join("sender = ?" for _ in event_filter.senders)) + clauses.append( + "(%s)" % " OR ".join("event.sender = ?" for _ in event_filter.senders) + ) args.extend(event_filter.senders) for sender in event_filter.not_senders: - clauses.append("sender != ?") + clauses.append("event.sender != ?") args.append(sender) if event_filter.rooms: - clauses.append("(%s)" % " OR ".join("room_id = ?" for _ in event_filter.rooms)) + clauses.append( + "(%s)" % " OR ".join("event.room_id = ?" for _ in event_filter.rooms) + ) args.extend(event_filter.rooms) for room_id in event_filter.not_rooms: - clauses.append("room_id != ?") + clauses.append("event.room_id != ?") args.append(room_id) if event_filter.contains_url: - clauses.append("contains_url = ?") + clauses.append("event.contains_url = ?") args.append(event_filter.contains_url) # We're only applying the "labels" filter on the database query, because applying the @@ -307,6 +313,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: clauses.append("(%s)" % " OR ".join("label = ?" for _ in event_filter.labels)) args.extend(event_filter.labels) + # Filter on relation_senders / relation types from the joined tables. + if event_filter.relation_senders: + clauses.append( + "(%s)" + % " OR ".join( + "related_event.sender = ?" for _ in event_filter.relation_senders + ) + ) + args.extend(event_filter.relation_senders) + + if event_filter.relation_types: + clauses.append( + "(%s)" + % " OR ".join("relation_type = ?" for _ in event_filter.relation_types) + ) + args.extend(event_filter.relation_types) + return " AND ".join(clauses), args @@ -1116,7 +1139,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): bounds = generate_pagination_where_clause( direction=direction, - column_names=("topological_ordering", "stream_ordering"), + column_names=("event.topological_ordering", "event.stream_ordering"), from_token=from_bound, to_token=to_bound, engine=self.database_engine, @@ -1133,32 +1156,51 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): select_keywords = "SELECT" join_clause = "" + # Using DISTINCT in this SELECT query is quite expensive, because it + # requires the engine to sort on the entire (not limited) result set, + # i.e. the entire events table. Only use it in scenarios that could result + # in the same event ID occurring multiple times in the results. + needs_distinct = False if event_filter and event_filter.labels: # If we're not filtering on a label, then joining on event_labels will # return as many row for a single event as the number of labels it has. To # avoid this, only join if we're filtering on at least one label. - join_clause = """ + join_clause += """ LEFT JOIN event_labels USING (event_id, room_id, topological_ordering) """ if len(event_filter.labels) > 1: - # Using DISTINCT in this SELECT query is quite expensive, because it - # requires the engine to sort on the entire (not limited) result set, - # i.e. the entire events table. We only need to use it when we're - # filtering on more than two labels, because that's the only scenario - # in which we can possibly to get multiple times the same event ID in - # the results. - select_keywords += "DISTINCT" + # Multiple labels could cause the same event to appear multiple times. + needs_distinct = True + + # If there is a filter on relation_senders and relation_types join to the + # relations table. + if event_filter and ( + event_filter.relation_senders or event_filter.relation_types + ): + # Filtering by relations could cause the same event to appear multiple + # times (since there's no limit on the number of relations to an event). + needs_distinct = True + join_clause += """ + LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id) + """ + if event_filter.relation_senders: + join_clause += """ + LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id) + """ + + if needs_distinct: + select_keywords += " DISTINCT" sql = """ %(select_keywords)s - event_id, instance_name, - topological_ordering, stream_ordering - FROM events + event.event_id, event.instance_name, + event.topological_ordering, event.stream_ordering + FROM events AS event %(join_clause)s - WHERE outlier = ? AND room_id = ? AND %(bounds)s - ORDER BY topological_ordering %(order)s, - stream_ordering %(order)s LIMIT ? + WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s + ORDER BY event.topological_ordering %(order)s, + event.stream_ordering %(order)s LIMIT ? """ % { "select_keywords": select_keywords, "join_clause": join_clause, -- cgit 1.5.1 From b6f4d122efb86e3fc44e358cf573dc2caa6ff634 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 9 Nov 2021 13:11:47 +0000 Subject: Allow admins to proactively block rooms (#11228) Co-authored-by: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/11228.feature | 1 + docs/admin_api/rooms.md | 16 +++++++---- synapse/handlers/room.py | 51 ++++++++++++++++++++++++++-------- synapse/rest/admin/rooms.py | 21 +++++++++++--- synapse/storage/databases/main/room.py | 7 ++++- tests/rest/admin/test_room.py | 28 +++++++++++++++++++ 6 files changed, 103 insertions(+), 21 deletions(-) create mode 100644 changelog.d/11228.feature (limited to 'synapse/storage') diff --git a/changelog.d/11228.feature b/changelog.d/11228.feature new file mode 100644 index 0000000000..33c1756b50 --- /dev/null +++ b/changelog.d/11228.feature @@ -0,0 +1 @@ +Allow the admin [Delete Room API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#delete-room-api) to block a room without the need to join it. diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index ab6b82a082..41a4961d00 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -396,13 +396,17 @@ The new room will be created with the user specified by the `new_room_user_id` p as room administrator and will contain a message explaining what happened. Users invited to the new room will have power level `-10` by default, and thus be unable to speak. -If `block` is `True` it prevents new joins to the old room. +If `block` is `true`, users will be prevented from joining the old room. +This option can also be used to pre-emptively block a room, even if it's unknown +to this homeserver. In this case, the room will be blocked, and no further action +will be taken. If `block` is `false`, attempting to delete an unknown room is +invalid and will be rejected as a bad request. This API will remove all trace of the old room from your database after removing all local users. If `purge` is `true` (the default), all traces of the old room will be removed from your database after removing all local users. If you do not want this to happen, set `purge` to `false`. -Depending on the amount of history being purged a call to the API may take +Depending on the amount of history being purged, a call to the API may take several minutes or longer. The local server will only have the power to move local user and room aliases to @@ -464,8 +468,9 @@ The following JSON body parameters are available: `new_room_user_id` in the new room. Ideally this will clearly convey why the original room was shut down. Defaults to `Sharing illegal content on this server is not permitted and rooms in violation will be blocked.` -* `block` - Optional. If set to `true`, this room will be added to a blocking list, preventing - future attempts to join the room. Defaults to `false`. +* `block` - Optional. If set to `true`, this room will be added to a blocking list, + preventing future attempts to join the room. Rooms can be blocked + even if they're not yet known to the homeserver. Defaults to `false`. * `purge` - Optional. If set to `true`, it will remove all traces of the room from your database. Defaults to `true`. * `force_purge` - Optional, and ignored unless `purge` is `true`. If set to `true`, it @@ -483,7 +488,8 @@ The following fields are returned in the JSON response body: * `failed_to_kick_users` - An array of users (`user_id`) that that were not kicked. * `local_aliases` - An array of strings representing the local aliases that were migrated from the old room to the new. -* `new_room_id` - A string representing the room ID of the new room. +* `new_room_id` - A string representing the room ID of the new room, or `null` if + no such room was created. ## Undoing room deletions diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7a95e31cbe..11af30eee7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -12,8 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Contains functions for performing events on rooms.""" - +"""Contains functions for performing actions on rooms.""" import itertools import logging import math @@ -31,6 +30,8 @@ from typing import ( Tuple, ) +from typing_extensions import TypedDict + from synapse.api.constants import ( EventContentFields, EventTypes, @@ -1277,6 +1278,13 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): return self.store.get_room_events_max_id(room_id) +class ShutdownRoomResponse(TypedDict): + kicked_users: List[str] + failed_to_kick_users: List[str] + local_aliases: List[str] + new_room_id: Optional[str] + + class RoomShutdownHandler: DEFAULT_MESSAGE = ( @@ -1302,7 +1310,7 @@ class RoomShutdownHandler: new_room_name: Optional[str] = None, message: Optional[str] = None, block: bool = False, - ) -> dict: + ) -> ShutdownRoomResponse: """ Shuts down a room. Moves all local users and room aliases automatically to a new room if `new_room_user_id` is set. Otherwise local users only @@ -1336,8 +1344,13 @@ class RoomShutdownHandler: Defaults to `Sharing illegal content on this server is not permitted and rooms in violation will be blocked.` block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. + If set to `True`, users will be prevented from joining the old + room. This option can also be used to pre-emptively block a room, + even if it's unknown to this homeserver. In this case, the room + will be blocked, and no further action will be taken. If `False`, + attempting to delete an unknown room is invalid. + + Defaults to `False`. Returns: a dict containing the following keys: kicked_users: An array of users (`user_id`) that were kicked. @@ -1346,7 +1359,9 @@ class RoomShutdownHandler: local_aliases: An array of strings representing the local aliases that were migrated from the old room to the new. - new_room_id: A string representing the room ID of the new room. + new_room_id: + A string representing the room ID of the new room, or None if + no such room was created. """ if not new_room_name: @@ -1357,14 +1372,28 @@ class RoomShutdownHandler: if not RoomID.is_valid(room_id): raise SynapseError(400, "%s is not a legal room ID" % (room_id,)) - if not await self.store.get_room(room_id): - raise NotFoundError("Unknown room id %s" % (room_id,)) - - # This will work even if the room is already blocked, but that is - # desirable in case the first attempt at blocking the room failed below. + # Action the block first (even if the room doesn't exist yet) if block: + # This will work even if the room is already blocked, but that is + # desirable in case the first attempt at blocking the room failed below. await self.store.block_room(room_id, requester_user_id) + if not await self.store.get_room(room_id): + if block: + # We allow you to block an unknown room. + return { + "kicked_users": [], + "failed_to_kick_users": [], + "local_aliases": [], + "new_room_id": None, + } + else: + # But if you don't want to preventatively block another room, + # this function can't do anything useful. + raise NotFoundError( + "Cannot shut down room: unknown room id %s" % (room_id,) + ) + if new_room_user_id is not None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 05c823d9ce..a2f4edebb8 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -13,7 +13,7 @@ # limitations under the License. import logging from http import HTTPStatus -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple, cast from urllib import parse as urlparse from synapse.api.constants import EventTypes, JoinRules, Membership @@ -239,9 +239,22 @@ class RoomRestServlet(RestServlet): # Purge room if purge: - await pagination_handler.purge_room(room_id, force=force_purge) - - return 200, ret + try: + await pagination_handler.purge_room(room_id, force=force_purge) + except NotFoundError: + if block: + # We can block unknown rooms with this endpoint, in which case + # a failed purge is expected. + pass + else: + # But otherwise, we expect this purge to have succeeded. + raise + + # Cast safety: cast away the knowledge that this is a TypedDict. + # See https://github.com/python/mypy/issues/4976#issuecomment-579883622 + # for some discussion on why this is necessary. Either way, + # `ret` is an opaque dictionary blob as far as the rest of the app cares. + return 200, cast(JsonDict, ret) class RoomMembersRestServlet(RestServlet): diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index cefc77fa0f..17b398bb69 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1751,7 +1751,12 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): ) async def block_room(self, room_id: str, user_id: str) -> None: - """Marks the room as blocked. Can be called multiple times. + """Marks the room as blocked. + + Can be called multiple times (though we'll only track the last user to + block this room). + + Can be called on a room unknown to this homeserver. Args: room_id: Room to block diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 46116644ce..11ec54c82e 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -14,9 +14,12 @@ import json import urllib.parse +from http import HTTPStatus from typing import List, Optional from unittest.mock import Mock +from parameterized import parameterized + import synapse.rest.admin from synapse.api.constants import EventTypes, Membership from synapse.api.errors import Codes @@ -281,6 +284,31 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): self._is_blocked(self.room_id, expect=True) self._has_no_members(self.room_id) + @parameterized.expand([(True,), (False,)]) + def test_block_unknown_room(self, purge: bool) -> None: + """ + We can block an unknown room. In this case, the `purge` argument + should be ignored. + """ + room_id = "!unknown:test" + + # The room isn't already in the blocked rooms table + self._is_blocked(room_id, expect=False) + + # Request the room be blocked. + channel = self.make_request( + "DELETE", + f"/_synapse/admin/v1/rooms/{room_id}", + {"block": True, "purge": purge}, + access_token=self.admin_user_tok, + ) + + # The room is now blocked. + self.assertEqual( + HTTPStatus.OK, int(channel.result["code"]), msg=channel.result["body"] + ) + self._is_blocked(room_id) + def test_shutdown_room_consent(self): """Test that we can shutdown rooms with local users who have not yet accepted the privacy policy. This used to fail when we tried to -- cgit 1.5.1 From a026695083d66f9fc67a892b59eee66f039e2038 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 9 Nov 2021 14:31:15 +0000 Subject: Clarifications and small fixes to to-device related code (#11247) Co-authored-by: Patrick Cloke --- changelog.d/11247.misc | 1 + synapse/handlers/appservice.py | 24 +++++++++++++++++---- synapse/handlers/devicemessage.py | 31 +++++++++++++++++++++++---- synapse/storage/databases/main/appservice.py | 8 +++---- synapse/storage/databases/main/deviceinbox.py | 23 +++++++++++++++++--- tests/handlers/test_appservice.py | 8 +++++-- 6 files changed, 78 insertions(+), 17 deletions(-) create mode 100644 changelog.d/11247.misc (limited to 'synapse/storage') diff --git a/changelog.d/11247.misc b/changelog.d/11247.misc new file mode 100644 index 0000000000..5ce701560e --- /dev/null +++ b/changelog.d/11247.misc @@ -0,0 +1 @@ +Clean up code relating to to-device messages and sending ephemeral events to application services. \ No newline at end of file diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ddc9105ee9..9abdad262b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -188,7 +188,7 @@ class ApplicationServicesHandler: self, stream_key: str, new_token: Union[int, RoomStreamToken], - users: Optional[Collection[Union[str, UserID]]] = None, + users: Collection[Union[str, UserID]], ) -> None: """ This is called by the notifier in the background when an ephemeral event is handled @@ -203,7 +203,9 @@ class ApplicationServicesHandler: value for `stream_key` will cause this function to return early. Ephemeral events will only be pushed to appservices that have opted into - them. + receiving them by setting `push_ephemeral` to true in their registration + file. Note that while MSC2409 is experimental, this option is called + `de.sorunome.msc2409.push_ephemeral`. Appservices will only receive ephemeral events that fall within their registered user and room namespaces. @@ -214,6 +216,7 @@ class ApplicationServicesHandler: if not self.notify_appservices: return + # Ignore any unsupported streams if stream_key not in ("typing_key", "receipt_key", "presence_key"): return @@ -230,18 +233,25 @@ class ApplicationServicesHandler: # Additional context: https://github.com/matrix-org/synapse/pull/11137 assert isinstance(new_token, int) + # Check whether there are any appservices which have registered to receive + # ephemeral events. + # + # Note that whether these events are actually relevant to these appservices + # is decided later on. services = [ service for service in self.store.get_app_services() if service.supports_ephemeral ] if not services: + # Bail out early if none of the target appservices have explicitly registered + # to receive these ephemeral events. return # We only start a new background process if necessary rather than # optimistically (to cut down on overhead). self._notify_interested_services_ephemeral( - services, stream_key, new_token, users or [] + services, stream_key, new_token, users ) @wrap_as_background_process("notify_interested_services_ephemeral") @@ -252,7 +262,7 @@ class ApplicationServicesHandler: new_token: int, users: Collection[Union[str, UserID]], ) -> None: - logger.debug("Checking interested services for %s" % (stream_key)) + logger.debug("Checking interested services for %s", stream_key) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: if stream_key == "typing_key": @@ -345,6 +355,9 @@ class ApplicationServicesHandler: Args: service: The application service to check for which events it should receive. + new_token: A receipts event stream token. Purely used to double-check that the + from_token we pull from the database isn't greater than or equal to this + token. Prevents accidentally duplicating work. Returns: A list of JSON dictionaries containing data derived from the read receipts that @@ -382,6 +395,9 @@ class ApplicationServicesHandler: Args: service: The application service that ephemeral events are being sent to. users: The users that should receive the presence update. + new_token: A presence update stream token. Purely used to double-check that the + from_token we pull from the database isn't greater than or equal to this + token. Prevents accidentally duplicating work. Returns: A list of json dictionaries containing data derived from the presence events diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index b6a2a34ab7..b582266af9 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -89,6 +89,13 @@ class DeviceMessageHandler: ) async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None: + """ + Handle receiving to-device messages from remote homeservers. + + Args: + origin: The remote homeserver. + content: The JSON dictionary containing the to-device messages. + """ local_messages = {} sender_user_id = content["sender"] if origin != get_domain_from_id(sender_user_id): @@ -135,12 +142,16 @@ class DeviceMessageHandler: message_type, sender_user_id, by_device ) - stream_id = await self.store.add_messages_from_remote_to_device_inbox( + # Add messages to the database. + # Retrieve the stream id of the last-processed to-device message. + last_stream_id = await self.store.add_messages_from_remote_to_device_inbox( origin, message_id, local_messages ) + # Notify listeners that there are new to-device messages to process, + # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", stream_id, users=local_messages.keys() + "to_device_key", last_stream_id, users=local_messages.keys() ) async def _check_for_unknown_devices( @@ -195,6 +206,14 @@ class DeviceMessageHandler: message_type: str, messages: Dict[str, Dict[str, JsonDict]], ) -> None: + """ + Handle a request from a user to send to-device message(s). + + Args: + requester: The user that is sending the to-device messages. + message_type: The type of to-device messages that are being sent. + messages: A dictionary containing recipients mapped to messages intended for them. + """ sender_user_id = requester.user.to_string() message_id = random_string(16) @@ -257,12 +276,16 @@ class DeviceMessageHandler: "org.matrix.opentracing_context": json_encoder.encode(context), } - stream_id = await self.store.add_messages_to_device_inbox( + # Add messages to the database. + # Retrieve the stream id of the last-processed to-device message. + last_stream_id = await self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) + # Notify listeners that there are new to-device messages to process, + # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", stream_id, users=local_messages.keys() + "to_device_key", last_stream_id, users=local_messages.keys() ) if self.federation_sender: diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 2da2659f41..baec35ee27 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -412,16 +412,16 @@ class ApplicationServiceTransactionWorkerStore( ) async def set_type_stream_id_for_appservice( - self, service: ApplicationService, type: str, pos: Optional[int] + self, service: ApplicationService, stream_type: str, pos: Optional[int] ) -> None: - if type not in ("read_receipt", "presence"): + if stream_type not in ("read_receipt", "presence"): raise ValueError( "Expected type to be a valid application stream id type, got %s" - % (type,) + % (stream_type,) ) def set_type_stream_id_for_appservice_txn(txn): - stream_id_type = "%s_stream_id" % type + stream_id_type = "%s_stream_id" % stream_type txn.execute( "UPDATE application_services_state SET %s = ? WHERE as_id=?" % stream_id_type, diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 264e625bd7..ae3afdd5d2 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -134,7 +134,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): limit: The maximum number of messages to retrieve. Returns: - A list of messages for the device and where in the stream the messages got to. + A tuple containing: + * A list of messages for the device. + * The max stream token of these messages. There may be more to retrieve + if the given limit was reached. """ has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_stream_id @@ -153,12 +156,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): txn.execute( sql, (user_id, device_id, last_stream_id, current_stream_id, limit) ) + messages = [] + stream_pos = current_stream_id + for row in txn: stream_pos = row[0] messages.append(db_to_json(row[1])) + + # If the limit was not reached we know that there's no more data for this + # user/device pair up to current_stream_id. if len(messages) < limit: stream_pos = current_stream_id + return messages, stream_pos return await self.db_pool.runInteraction( @@ -260,13 +270,20 @@ class DeviceInboxWorkerStore(SQLBaseStore): " LIMIT ?" ) txn.execute(sql, (destination, last_stream_id, current_stream_id, limit)) + messages = [] + stream_pos = current_stream_id + for row in txn: stream_pos = row[0] messages.append(db_to_json(row[1])) + + # If the limit was not reached we know that there's no more data for this + # user/device pair up to current_stream_id. if len(messages) < limit: log_kv({"message": "Set stream position to current position"}) stream_pos = current_stream_id + return messages, stream_pos return await self.db_pool.runInteraction( @@ -372,8 +389,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): """Used to send messages from this server. Args: - local_messages_by_user_and_device: - Dictionary of user_id to device_id to message. + local_messages_by_user_then_device: + Dictionary of recipient user_id to recipient device_id to message. remote_messages_by_destination: Dictionary of destination server_name to the EDU JSON to send. diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 1f6a924452..d6f14e2dba 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -272,7 +272,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): make_awaitable(([event], None)) ) - self.handler.notify_interested_services_ephemeral("receipt_key", 580) + self.handler.notify_interested_services_ephemeral( + "receipt_key", 580, ["@fakerecipient:example.com"] + ) self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( interested_service, [event] ) @@ -300,7 +302,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): make_awaitable(([event], None)) ) - self.handler.notify_interested_services_ephemeral("receipt_key", 579) + self.handler.notify_interested_services_ephemeral( + "receipt_key", 580, ["@fakerecipient:example.com"] + ) self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() def _mkservice(self, is_interested, protocols=None): -- cgit 1.5.1 From 64ef25391d22795463ebf3c48604f7aee1690fe4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 11 Nov 2021 08:47:31 -0500 Subject: Add type hints to some storage classes (#11307) --- changelog.d/11307.misc | 1 + mypy.ini | 7 ---- synapse/storage/databases/main/censor_events.py | 30 +++++++------- synapse/storage/databases/main/deviceinbox.py | 52 +++++++++++++++++-------- synapse/storage/databases/main/filtering.py | 6 ++- synapse/storage/databases/main/lock.py | 6 ++- synapse/storage/databases/main/openid.py | 17 +++++++- synapse/storage/databases/main/tags.py | 27 ++++++++----- synapse/storage/util/id_generators.py | 24 +++++++++++- 9 files changed, 116 insertions(+), 54 deletions(-) create mode 100644 changelog.d/11307.misc (limited to 'synapse/storage') diff --git a/changelog.d/11307.misc b/changelog.d/11307.misc new file mode 100644 index 0000000000..86594a332d --- /dev/null +++ b/changelog.d/11307.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index 989116a1a8..a06c7fc66b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -27,8 +27,6 @@ exclude = (?x) |synapse/storage/databases/main/__init__.py |synapse/storage/databases/main/account_data.py |synapse/storage/databases/main/cache.py - |synapse/storage/databases/main/censor_events.py - |synapse/storage/databases/main/deviceinbox.py |synapse/storage/databases/main/devices.py |synapse/storage/databases/main/directory.py |synapse/storage/databases/main/e2e_room_keys.py @@ -38,19 +36,15 @@ exclude = (?x) |synapse/storage/databases/main/events_bg_updates.py |synapse/storage/databases/main/events_forward_extremities.py |synapse/storage/databases/main/events_worker.py - |synapse/storage/databases/main/filtering.py |synapse/storage/databases/main/group_server.py - |synapse/storage/databases/main/lock.py |synapse/storage/databases/main/media_repository.py |synapse/storage/databases/main/metrics.py |synapse/storage/databases/main/monthly_active_users.py - |synapse/storage/databases/main/openid.py |synapse/storage/databases/main/presence.py |synapse/storage/databases/main/profile.py |synapse/storage/databases/main/purge_events.py |synapse/storage/databases/main/push_rule.py |synapse/storage/databases/main/receipts.py - |synapse/storage/databases/main/rejections.py |synapse/storage/databases/main/room.py |synapse/storage/databases/main/room_batch.py |synapse/storage/databases/main/roommember.py @@ -59,7 +53,6 @@ exclude = (?x) |synapse/storage/databases/main/state.py |synapse/storage/databases/main/state_deltas.py |synapse/storage/databases/main/stats.py - |synapse/storage/databases/main/tags.py |synapse/storage/databases/main/transactions.py |synapse/storage/databases/main/user_directory.py |synapse/storage/databases/main/user_erasure_store.py diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index eee07227ef..0f56e10220 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -13,12 +13,12 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from synapse.events.utils import prune_event_dict from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.util import json_encoder @@ -41,7 +41,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase hs.get_clock().looping_call(self._censor_redactions, 5 * 60 * 1000) @wrap_as_background_process("_censor_redactions") - async def _censor_redactions(self): + async def _censor_redactions(self) -> None: """Censors all redactions older than the configured period that haven't been censored yet. @@ -105,7 +105,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase and original_event.internal_metadata.is_redacted() ): # Redaction was allowed - pruned_json = json_encoder.encode( + pruned_json: Optional[str] = json_encoder.encode( prune_event_dict( original_event.room_version, original_event.get_dict() ) @@ -116,7 +116,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase updates.append((redaction_id, event_id, pruned_json)) - def _update_censor_txn(txn): + def _update_censor_txn(txn: LoggingTransaction) -> None: for redaction_id, event_id, pruned_json in updates: if pruned_json: self._censor_event_txn(txn, event_id, pruned_json) @@ -130,14 +130,16 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase await self.db_pool.runInteraction("_update_censor_txn", _update_censor_txn) - def _censor_event_txn(self, txn, event_id, pruned_json): + def _censor_event_txn( + self, txn: LoggingTransaction, event_id: str, pruned_json: str + ) -> None: """Censor an event by replacing its JSON in the event_json table with the provided pruned JSON. Args: - txn (LoggingTransaction): The database transaction. - event_id (str): The ID of the event to censor. - pruned_json (str): The pruned JSON + txn: The database transaction. + event_id: The ID of the event to censor. + pruned_json: The pruned JSON """ self.db_pool.simple_update_one_txn( txn, @@ -157,7 +159,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase # Try to retrieve the event's content from the database or the event cache. event = await self.get_event(event_id) - def delete_expired_event_txn(txn): + def delete_expired_event_txn(txn: LoggingTransaction) -> None: # Delete the expiry timestamp associated with this event from the database. self._delete_event_expiry_txn(txn, event_id) @@ -194,14 +196,14 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase "delete_expired_event", delete_expired_event_txn ) - def _delete_event_expiry_txn(self, txn, event_id): + def _delete_event_expiry_txn(self, txn: LoggingTransaction, event_id: str) -> None: """Delete the expiry timestamp associated with an event ID without deleting the actual event. Args: - txn (LoggingTransaction): The transaction to use to perform the deletion. - event_id (str): The event ID to delete the associated expiry timestamp of. + txn: The transaction to use to perform the deletion. + event_id: The event ID to delete the associated expiry timestamp of. """ - return self.db_pool.simple_delete_txn( + self.db_pool.simple_delete_txn( txn=txn, table="event_expiry", keyvalues={"event_id": event_id} ) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index ae3afdd5d2..7c0f953365 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -1,4 +1,5 @@ # Copyright 2016 OpenMarket Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,9 +20,17 @@ from synapse.logging import issue9533_logger from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.storage.database import DatabasePool, LoggingTransaction +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) from synapse.storage.engines import PostgresEngine -from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator +from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, + MultiWriterIdGenerator, + StreamIdGenerator, +) from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache @@ -34,14 +43,21 @@ logger = logging.getLogger(__name__) class DeviceInboxWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): super().__init__(database, db_conn, hs) self._instance_name = hs.get_instance_name() # Map of (user_id, device_id) to the last stream_id that has been # deleted up to. This is so that we can no op deletions. - self._last_device_delete_cache = ExpiringCache( + self._last_device_delete_cache: ExpiringCache[ + Tuple[str, Optional[str]], int + ] = ExpiringCache( cache_name="last_device_delete_cache", clock=self._clock, max_len=10000, @@ -53,14 +69,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._instance_name in hs.config.worker.writers.to_device ) - self._device_inbox_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - stream_name="to_device", - instance_name=self._instance_name, - tables=[("device_inbox", "instance_name", "stream_id")], - sequence_name="device_inbox_sequence", - writers=hs.config.worker.writers.to_device, + self._device_inbox_id_gen: AbstractStreamIdGenerator = ( + MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="to_device", + instance_name=self._instance_name, + tables=[("device_inbox", "instance_name", "stream_id")], + sequence_name="device_inbox_sequence", + writers=hs.config.worker.writers.to_device, + ) ) else: self._can_write_to_device = True @@ -101,6 +119,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): def process_replication_rows(self, stream_name, instance_name, token, rows): if stream_name == ToDeviceStream.NAME: + # If replication is happening than postgres must be being used. + assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator) self._device_inbox_id_gen.advance(instance_name, token) for row in rows: if row.entity.startswith("@"): @@ -220,11 +240,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": f"deleted {count} messages for device", "count": count}) # Update the cache, ensuring that we only ever increase the value - last_deleted_stream_id = self._last_device_delete_cache.get( + updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 ) self._last_device_delete_cache[(user_id, device_id)] = max( - last_deleted_stream_id, up_to_stream_id + updated_last_deleted_stream_id, up_to_stream_id ) return count @@ -432,7 +452,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) async with self._device_inbox_id_gen.get_next() as stream_id: - now_ms = self.clock.time_msec() + now_ms = self._clock.time_msec() await self.db_pool.runInteraction( "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id ) @@ -483,7 +503,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) async with self._device_inbox_id_gen.get_next() as stream_id: - now_ms = self.clock.time_msec() + now_ms = self._clock.time_msec() await self.db_pool.runInteraction( "add_messages_from_remote_to_device_inbox", add_messages_txn, diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py index 434986fa64..cf842803bc 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py @@ -1,4 +1,5 @@ # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +19,7 @@ from canonicaljson import encode_canonical_json from synapse.api.errors import Codes, SynapseError from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import LoggingTransaction from synapse.types import JsonDict from synapse.util.caches.descriptors import cached @@ -49,7 +51,7 @@ class FilteringStore(SQLBaseStore): # Need an atomic transaction to SELECT the maximal ID so far then # INSERT a new one - def _do_txn(txn): + def _do_txn(txn: LoggingTransaction) -> int: sql = ( "SELECT filter_id FROM user_filters " "WHERE user_id = ? AND filter_json = ?" @@ -61,7 +63,7 @@ class FilteringStore(SQLBaseStore): sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?" txn.execute(sql, (user_localpart,)) - max_id = txn.fetchone()[0] + max_id = txn.fetchone()[0] # type: ignore[index] if max_id is None: filter_id = 0 else: diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 3d0df0cbd4..a540f7fb26 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -13,7 +13,7 @@ # limitations under the License. import logging from types import TracebackType -from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type +from typing import TYPE_CHECKING, Optional, Tuple, Type from weakref import WeakValueDictionary from twisted.internet.interfaces import IReactorCore @@ -62,7 +62,9 @@ class LockStore(SQLBaseStore): # A map from `(lock_name, lock_key)` to the token of any locks that we # think we currently hold. - self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary() + self._live_tokens: WeakValueDictionary[ + Tuple[str, str], Lock + ] = WeakValueDictionary() # When we shut down we want to remove the locks. Technically this can # lead to a race, as we may drop the lock while we are still processing. diff --git a/synapse/storage/databases/main/openid.py b/synapse/storage/databases/main/openid.py index 2aac64901b..a46685219f 100644 --- a/synapse/storage/databases/main/openid.py +++ b/synapse/storage/databases/main/openid.py @@ -1,6 +1,21 @@ +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from typing import Optional from synapse.storage._base import SQLBaseStore +from synapse.storage.database import LoggingTransaction class OpenIdStore(SQLBaseStore): @@ -20,7 +35,7 @@ class OpenIdStore(SQLBaseStore): async def get_user_id_for_open_id_token( self, token: str, ts_now_ms: int ) -> Optional[str]: - def get_user_id_for_token_txn(txn): + def get_user_id_for_token_txn(txn: LoggingTransaction) -> Optional[str]: sql = ( "SELECT user_id FROM open_id_tokens" " WHERE token = ? AND ? <= ts_valid_until_ms" diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index f93ff0a545..8f510de53d 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -1,5 +1,6 @@ # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,9 +15,10 @@ # limitations under the License. import logging -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, cast from synapse.storage._base import db_to_json +from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main.account_data import AccountDataWorkerStore from synapse.types import JsonDict from synapse.util import json_encoder @@ -50,7 +52,7 @@ class TagsWorkerStore(AccountDataWorkerStore): async def get_all_updated_tags( self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + ) -> Tuple[List[Tuple[int, Tuple[str, str, str]]], int, bool]: """Get updates for tags replication stream. Args: @@ -75,7 +77,9 @@ class TagsWorkerStore(AccountDataWorkerStore): if last_id == current_id: return [], current_id, False - def get_all_updated_tags_txn(txn): + def get_all_updated_tags_txn( + txn: LoggingTransaction, + ) -> List[Tuple[int, str, str]]: sql = ( "SELECT stream_id, user_id, room_id" " FROM room_tags_revisions as r" @@ -83,13 +87,16 @@ class TagsWorkerStore(AccountDataWorkerStore): " ORDER BY stream_id ASC LIMIT ?" ) txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() + # mypy doesn't understand what the query is selecting. + return cast(List[Tuple[int, str, str]], txn.fetchall()) tag_ids = await self.db_pool.runInteraction( "get_all_updated_tags", get_all_updated_tags_txn ) - def get_tag_content(txn, tag_ids): + def get_tag_content( + txn: LoggingTransaction, tag_ids + ) -> List[Tuple[int, Tuple[str, str, str]]]: sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?" results = [] for stream_id, user_id, room_id in tag_ids: @@ -127,15 +134,15 @@ class TagsWorkerStore(AccountDataWorkerStore): given version Args: - user_id(str): The user to get the tags for. - stream_id(int): The earliest update to get for the user. + user_id: The user to get the tags for. + stream_id: The earliest update to get for the user. Returns: A mapping from room_id strings to lists of tag strings for all the rooms that changed since the stream_id token. """ - def get_updated_tags_txn(txn): + def get_updated_tags_txn(txn: LoggingTransaction) -> List[str]: sql = ( "SELECT room_id from room_tags_revisions" " WHERE user_id = ? AND stream_id > ?" @@ -200,7 +207,7 @@ class TagsWorkerStore(AccountDataWorkerStore): content_json = json_encoder.encode(content) - def add_tag_txn(txn, next_id): + def add_tag_txn(txn: LoggingTransaction, next_id: int) -> None: self.db_pool.simple_upsert_txn( txn, table="room_tags", @@ -224,7 +231,7 @@ class TagsWorkerStore(AccountDataWorkerStore): """ assert self._can_write_to_account_data - def remove_tag_txn(txn, next_id): + def remove_tag_txn(txn: LoggingTransaction, next_id: int) -> None: sql = ( "DELETE FROM room_tags " " WHERE user_id = ? AND room_id = ? AND tag = ?" diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 670811611f..ac56bc9a05 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -1,4 +1,5 @@ # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import abc import heapq import logging import threading @@ -87,7 +89,25 @@ def _load_current_id( return (max if step > 0 else min)(current_id, step) -class StreamIdGenerator: +class AbstractStreamIdGenerator(metaclass=abc.ABCMeta): + @abc.abstractmethod + def get_next(self) -> AsyncContextManager[int]: + raise NotImplementedError() + + @abc.abstractmethod + def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]: + raise NotImplementedError() + + @abc.abstractmethod + def get_current_token(self) -> int: + raise NotImplementedError() + + @abc.abstractmethod + def get_current_token_for_writer(self, instance_name: str) -> int: + raise NotImplementedError() + + +class StreamIdGenerator(AbstractStreamIdGenerator): """Used to generate new stream ids when persisting events while keeping track of which transactions have been completed. @@ -209,7 +229,7 @@ class StreamIdGenerator: return self.get_current_token() -class MultiWriterIdGenerator: +class MultiWriterIdGenerator(AbstractStreamIdGenerator): """An ID generator that tracks a stream that can have multiple writers. Uses a Postgres sequence to coordinate ID assignment, but positions of other -- cgit 1.5.1 From 48278a0d09c19f008910b10ec5922327c09533f2 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Thu, 11 Nov 2021 16:01:13 +0100 Subject: Move sql file for `remove_deleted_devices_from_device_inbox` into v65 (#11303) --- changelog.d/11303.misc | 1 + .../02remove_deleted_devices_from_device_inbox.sql | 22 ---------------------- .../05remove_deleted_devices_from_device_inbox.sql | 22 ++++++++++++++++++++++ 3 files changed, 23 insertions(+), 22 deletions(-) create mode 100644 changelog.d/11303.misc delete mode 100644 synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql create mode 100644 synapse/storage/schema/main/delta/65/05remove_deleted_devices_from_device_inbox.sql (limited to 'synapse/storage') diff --git a/changelog.d/11303.misc b/changelog.d/11303.misc new file mode 100644 index 0000000000..50af92bfa5 --- /dev/null +++ b/changelog.d/11303.misc @@ -0,0 +1 @@ +Fix an issue which prevented the 'remove deleted devices from device_inbox column' background process from running when updating from a recent Synapse version. diff --git a/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql deleted file mode 100644 index fca7290741..0000000000 --- a/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql +++ /dev/null @@ -1,22 +0,0 @@ -/* Copyright 2021 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - --- Remove messages from the device_inbox table which were orphaned --- when a device was deleted using Synapse earlier than 1.47.0. --- This runs as background task, but may take a bit to finish. - -INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (6402, 'remove_deleted_devices_from_device_inbox', '{}'); diff --git a/synapse/storage/schema/main/delta/65/05remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/65/05remove_deleted_devices_from_device_inbox.sql new file mode 100644 index 0000000000..076179123d --- /dev/null +++ b/synapse/storage/schema/main/delta/65/05remove_deleted_devices_from_device_inbox.sql @@ -0,0 +1,22 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Remove messages from the device_inbox table which were orphaned +-- when a device was deleted using Synapse earlier than 1.47.0. +-- This runs as background task, but may take a bit to finish. + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6505, 'remove_deleted_devices_from_device_inbox', '{}'); -- cgit 1.5.1 From 8dc666f785c653e958728a74758bdb8afb494de4 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 11 Nov 2021 16:49:28 +0000 Subject: Correct type hint for room_batch.py (#11310) Co-authored-by: Patrick Cloke --- changelog.d/11310.misc | 1 + mypy.ini | 4 +++- synapse/storage/databases/main/room_batch.py | 18 ++++++++---------- 3 files changed, 12 insertions(+), 11 deletions(-) create mode 100644 changelog.d/11310.misc (limited to 'synapse/storage') diff --git a/changelog.d/11310.misc b/changelog.d/11310.misc new file mode 100644 index 0000000000..e5c12445d5 --- /dev/null +++ b/changelog.d/11310.misc @@ -0,0 +1 @@ +Add type hints to storage classes. \ No newline at end of file diff --git a/mypy.ini b/mypy.ini index a06c7fc66b..48dfdfa0e0 100644 --- a/mypy.ini +++ b/mypy.ini @@ -46,7 +46,6 @@ exclude = (?x) |synapse/storage/databases/main/push_rule.py |synapse/storage/databases/main/receipts.py |synapse/storage/databases/main/room.py - |synapse/storage/databases/main/room_batch.py |synapse/storage/databases/main/roommember.py |synapse/storage/databases/main/search.py |synapse/storage/databases/main/signatures.py @@ -183,6 +182,9 @@ disallow_untyped_defs = True [mypy-synapse.storage.databases.main.client_ips] disallow_untyped_defs = True +[mypy-synapse.storage.databases.main.room_batch] +disallow_untyped_defs = True + [mypy-synapse.storage.util.*] disallow_untyped_defs = True diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py index 97b2618437..39e80f6f5b 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py @@ -39,13 +39,11 @@ class RoomBatchStore(SQLBaseStore): async def store_state_group_id_for_event_id( self, event_id: str, state_group_id: int - ) -> Optional[str]: - { - await self.db_pool.simple_upsert( - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - values={"state_group": state_group_id, "event_id": event_id}, - # Unique constraint on event_id so we don't have to lock - lock=False, - ) - } + ) -> None: + await self.db_pool.simple_upsert( + table="event_to_state_groups", + keyvalues={"event_id": event_id}, + values={"state_group": state_group_id, "event_id": event_id}, + # Unique constraint on event_id so we don't have to lock + lock=False, + ) -- cgit 1.5.1 From 6a605f4a77bebbbffa2ac812fbff4fe5f252d88e Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 11 Nov 2021 17:04:44 +0000 Subject: Get db signatures file to pass mypy (#11312) --- changelog.d/11312.misc | 1 + mypy.ini | 1 - synapse/events/builder.py | 12 +++++------- synapse/storage/databases/main/signatures.py | 4 ++-- 4 files changed, 8 insertions(+), 10 deletions(-) create mode 100644 changelog.d/11312.misc (limited to 'synapse/storage') diff --git a/changelog.d/11312.misc b/changelog.d/11312.misc new file mode 100644 index 0000000000..86594a332d --- /dev/null +++ b/changelog.d/11312.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index 48dfdfa0e0..3b7e1eb708 100644 --- a/mypy.ini +++ b/mypy.ini @@ -48,7 +48,6 @@ exclude = (?x) |synapse/storage/databases/main/room.py |synapse/storage/databases/main/roommember.py |synapse/storage/databases/main/search.py - |synapse/storage/databases/main/signatures.py |synapse/storage/databases/main/state.py |synapse/storage/databases/main/state_deltas.py |synapse/storage/databases/main/stats.py diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 4f409f31e1..eb39e0ae32 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -128,14 +128,12 @@ class EventBuilder: ) format_version = self.room_version.event_format + # The types of auth/prev events changes between event versions. + prev_events: Union[List[str], List[Tuple[str, Dict[str, str]]]] + auth_events: Union[List[str], List[Tuple[str, Dict[str, str]]]] if format_version == EventFormatVersions.V1: - # The types of auth/prev events changes between event versions. - auth_events: Union[ - List[str], List[Tuple[str, Dict[str, str]]] - ] = await self._store.add_event_hashes(auth_event_ids) - prev_events: Union[ - List[str], List[Tuple[str, Dict[str, str]]] - ] = await self._store.add_event_hashes(prev_event_ids) + auth_events = await self._store.add_event_hashes(auth_event_ids) + prev_events = await self._store.add_event_hashes(prev_event_ids) else: auth_events = auth_event_ids prev_events = prev_event_ids diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py index ab2159c2d3..3201623fe4 100644 --- a/synapse/storage/databases/main/signatures.py +++ b/synapse/storage/databases/main/signatures.py @@ -63,12 +63,12 @@ class SignatureWorkerStore(SQLBaseStore): A list of tuples of event ID and a mapping of algorithm to base-64 encoded hash. """ hashes = await self.get_event_reference_hashes(event_ids) - hashes = { + encoded_hashes = { e_id: {k: encode_base64(v) for k, v in h.items() if k == "sha256"} for e_id, h in hashes.items() } - return list(hashes.items()) + return list(encoded_hashes.items()) def _get_event_reference_hashes_txn( self, txn: Cursor, event_id: str -- cgit 1.5.1 From c99da2d0794de7240e791eba1a2f2d703fdecb60 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 11 Nov 2021 19:22:19 +0000 Subject: Annotations for user_erasure_store (#11313) I'm not sure why this was excluded---it seemed to be passing for me. But it's easy enough to fixup. --- changelog.d/11313.misc | 1 + mypy.ini | 4 +++- synapse/storage/databases/main/user_erasure_store.py | 9 +++++---- 3 files changed, 9 insertions(+), 5 deletions(-) create mode 100644 changelog.d/11313.misc (limited to 'synapse/storage') diff --git a/changelog.d/11313.misc b/changelog.d/11313.misc new file mode 100644 index 0000000000..86594a332d --- /dev/null +++ b/changelog.d/11313.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index 3b7e1eb708..f0af4ab289 100644 --- a/mypy.ini +++ b/mypy.ini @@ -53,7 +53,6 @@ exclude = (?x) |synapse/storage/databases/main/stats.py |synapse/storage/databases/main/transactions.py |synapse/storage/databases/main/user_directory.py - |synapse/storage/databases/main/user_erasure_store.py |synapse/storage/schema/ |tests/api/test_auth.py @@ -184,6 +183,9 @@ disallow_untyped_defs = True [mypy-synapse.storage.databases.main.room_batch] disallow_untyped_defs = True +[mypy-synapse.storage.databases.main.user_erasure_store] +disallow_untyped_defs = True + [mypy-synapse.storage.util.*] disallow_untyped_defs = True diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py index 1ecdd40c38..f79006533f 100644 --- a/synapse/storage/databases/main/user_erasure_store.py +++ b/synapse/storage/databases/main/user_erasure_store.py @@ -14,11 +14,12 @@ from typing import Dict, Iterable -from synapse.storage._base import SQLBaseStore +from synapse.storage.database import LoggingTransaction +from synapse.storage.databases.main import CacheInvalidationWorkerStore from synapse.util.caches.descriptors import cached, cachedList -class UserErasureWorkerStore(SQLBaseStore): +class UserErasureWorkerStore(CacheInvalidationWorkerStore): @cached() async def is_user_erased(self, user_id: str) -> bool: """ @@ -69,7 +70,7 @@ class UserErasureStore(UserErasureWorkerStore): user_id: full user_id to be erased """ - def f(txn): + def f(txn: LoggingTransaction) -> None: # first check if they are already in the list txn.execute("SELECT 1 FROM erased_users WHERE user_id = ?", (user_id,)) if txn.fetchone(): @@ -89,7 +90,7 @@ class UserErasureStore(UserErasureWorkerStore): user_id: full user_id to be un-erased """ - def f(txn): + def f(txn: LoggingTransaction) -> None: # first check if they are already in the list txn.execute("SELECT 1 FROM erased_users WHERE user_id = ?", (user_id,)) if not txn.fetchone(): -- cgit 1.5.1 From 6f8f3d4bc5283abdbc52f7376975d7f3bacfe159 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 12 Nov 2021 15:58:17 +0000 Subject: Attempt to annotate events_forward_extremities (#11314) * Make DataStore inherit from EventForwardExtremitiesStore before CacheInvalidationWorkerStore the former implicitly inherits from the latter, so they should be ordered like this when used. --- changelog.d/11314.misc | 1 + mypy.ini | 1 - synapse/storage/databases/main/__init__.py | 2 +- .../databases/main/events_forward_extremities.py | 21 +++++++++++++++------ 4 files changed, 17 insertions(+), 8 deletions(-) create mode 100644 changelog.d/11314.misc (limited to 'synapse/storage') diff --git a/changelog.d/11314.misc b/changelog.d/11314.misc new file mode 100644 index 0000000000..86594a332d --- /dev/null +++ b/changelog.d/11314.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index 60672ea5c8..d81e964dc7 100644 --- a/mypy.ini +++ b/mypy.ini @@ -34,7 +34,6 @@ exclude = (?x) |synapse/storage/databases/main/event_federation.py |synapse/storage/databases/main/event_push_actions.py |synapse/storage/databases/main/events_bg_updates.py - |synapse/storage/databases/main/events_forward_extremities.py |synapse/storage/databases/main/events_worker.py |synapse/storage/databases/main/group_server.py |synapse/storage/databases/main/media_repository.py diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 259cae5b37..e22aa0b9bc 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -123,9 +123,9 @@ class DataStore( RelationsStore, CensorEventsStore, UIAuthStore, + EventForwardExtremitiesStore, CacheInvalidationWorkerStore, ServerMetricsStore, - EventForwardExtremitiesStore, LockStore, SessionStore, ): diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 6d2688d711..68901b4335 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -13,15 +13,20 @@ # limitations under the License. import logging -from typing import Dict, List +from typing import Any, Dict, List from synapse.api.errors import SynapseError -from synapse.storage._base import SQLBaseStore +from synapse.storage.database import LoggingTransaction +from synapse.storage.databases.main import CacheInvalidationWorkerStore +from synapse.storage.databases.main.event_federation import EventFederationWorkerStore logger = logging.getLogger(__name__) -class EventForwardExtremitiesStore(SQLBaseStore): +class EventForwardExtremitiesStore( + EventFederationWorkerStore, + CacheInvalidationWorkerStore, +): async def delete_forward_extremities_for_room(self, room_id: str) -> int: """Delete any extra forward extremities for a room. @@ -31,7 +36,7 @@ class EventForwardExtremitiesStore(SQLBaseStore): Returns count deleted. """ - def delete_forward_extremities_for_room_txn(txn): + def delete_forward_extremities_for_room_txn(txn: LoggingTransaction) -> int: # First we need to get the event_id to not delete sql = """ SELECT event_id FROM event_forward_extremities @@ -82,10 +87,14 @@ class EventForwardExtremitiesStore(SQLBaseStore): delete_forward_extremities_for_room_txn, ) - async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]: + async def get_forward_extremities_for_room( + self, room_id: str + ) -> List[Dict[str, Any]]: """Get list of forward extremities for a room.""" - def get_forward_extremities_for_room_txn(txn): + def get_forward_extremities_for_room_txn( + txn: LoggingTransaction, + ) -> List[Dict[str, Any]]: sql = """ SELECT event_id, state_group, depth, received_ts FROM event_forward_extremities -- cgit 1.5.1 From 9b90b9454b8855e0575785560662d8e47378094d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 12 Nov 2021 11:05:26 -0500 Subject: Add type hints to media repository storage module (#11311) --- changelog.d/11311.misc | 1 + mypy.ini | 1 - synapse/rest/media/v1/preview_url_resource.py | 8 +- synapse/storage/databases/main/media_repository.py | 141 ++++++++++++--------- 4 files changed, 89 insertions(+), 62 deletions(-) create mode 100644 changelog.d/11311.misc (limited to 'synapse/storage') diff --git a/changelog.d/11311.misc b/changelog.d/11311.misc new file mode 100644 index 0000000000..86594a332d --- /dev/null +++ b/changelog.d/11311.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index d81e964dc7..56a62bb9b7 100644 --- a/mypy.ini +++ b/mypy.ini @@ -36,7 +36,6 @@ exclude = (?x) |synapse/storage/databases/main/events_bg_updates.py |synapse/storage/databases/main/events_worker.py |synapse/storage/databases/main/group_server.py - |synapse/storage/databases/main/media_repository.py |synapse/storage/databases/main/metrics.py |synapse/storage/databases/main/monthly_active_users.py |synapse/storage/databases/main/presence.py diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 8ca97b5b18..054f3c296d 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -45,7 +45,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.media.v1._base import get_filename_from_headers from synapse.rest.media.v1.media_storage import MediaStorage from synapse.rest.media.v1.oembed import OEmbedProvider -from synapse.types import JsonDict +from synapse.types import JsonDict, UserID from synapse.util import json_encoder from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.expiringcache import ExpiringCache @@ -231,7 +231,7 @@ class PreviewUrlResource(DirectServeJsonResource): og = await make_deferred_yieldable(observable.observe()) respond_with_json_bytes(request, 200, og, send_cors=True) - async def _do_preview(self, url: str, user: str, ts: int) -> bytes: + async def _do_preview(self, url: str, user: UserID, ts: int) -> bytes: """Check the db, and download the URL and build a preview Args: @@ -360,7 +360,7 @@ class PreviewUrlResource(DirectServeJsonResource): return jsonog.encode("utf8") - async def _download_url(self, url: str, user: str) -> MediaInfo: + async def _download_url(self, url: str, user: UserID) -> MediaInfo: # TODO: we should probably honour robots.txt... except in practice # we're most likely being explicitly triggered by a human rather than a # bot, so are we really a robot? @@ -450,7 +450,7 @@ class PreviewUrlResource(DirectServeJsonResource): ) async def _precache_image_url( - self, user: str, media_info: MediaInfo, og: JsonDict + self, user: UserID, media_info: MediaInfo, og: JsonDict ) -> None: """ Pre-cache the image (if one exists) for posterity diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 717487be28..1b076683f7 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -13,10 +13,25 @@ # See the License for the specific language governing permissions and # limitations under the License. from enum import Enum -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, +) from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) +from synapse.types import JsonDict, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -46,7 +61,12 @@ class MediaSortOrder(Enum): class MediaRepositoryBackgroundUpdateStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_index_update( @@ -102,13 +122,15 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore): self._drop_media_index_without_method, ) - async def _drop_media_index_without_method(self, progress, batch_size): + async def _drop_media_index_without_method( + self, progress: JsonDict, batch_size: int + ) -> int: """background update handler which removes the old constraints. Note that this is only run on postgres. """ - def f(txn): + def f(txn: LoggingTransaction) -> None: txn.execute( "ALTER TABLE local_media_repository_thumbnails DROP CONSTRAINT IF EXISTS local_media_repository_thumbn_media_id_thumbnail_width_thum_key" ) @@ -126,7 +148,12 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore): class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): """Persistence for attachments and avatars""" - def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): super().__init__(database, db_conn, hs) self.server_name = hs.hostname @@ -174,7 +201,9 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): plus the total count of all the user's media """ - def get_local_media_by_user_paginate_txn(txn): + def get_local_media_by_user_paginate_txn( + txn: LoggingTransaction, + ) -> Tuple[List[Dict[str, Any]], int]: # Set ordering order_by_column = MediaSortOrder(order_by).value @@ -184,14 +213,14 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): else: order = "ASC" - args = [user_id] + args: List[Union[str, int]] = [user_id] sql = """ SELECT COUNT(*) as total_media FROM local_media_repository WHERE user_id = ? """ txn.execute(sql, args) - count = txn.fetchone()[0] + count = txn.fetchone()[0] # type: ignore[index] sql = """ SELECT @@ -268,7 +297,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): ) sql += sql_keep - def _get_local_media_before_txn(txn): + def _get_local_media_before_txn(txn: LoggingTransaction) -> List[str]: txn.execute(sql, (before_ts, before_ts, size_gt)) return [row[0] for row in txn] @@ -278,13 +307,13 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def store_local_media( self, - media_id, - media_type, - time_now_ms, - upload_name, - media_length, - user_id, - url_cache=None, + media_id: str, + media_type: str, + time_now_ms: int, + upload_name: Optional[str], + media_length: int, + user_id: UserID, + url_cache: Optional[str] = None, ) -> None: await self.db_pool.simple_insert( "local_media_repository", @@ -315,7 +344,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): None if the URL isn't cached. """ - def get_url_cache_txn(txn): + def get_url_cache_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]: # get the most recently cached result (relative to the given ts) sql = ( "SELECT response_code, etag, expires_ts, og, media_id, download_ts" @@ -359,7 +388,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def store_url_cache( self, url, response_code, etag, expires_ts, og, media_id, download_ts - ): + ) -> None: await self.db_pool.simple_insert( "local_media_repository_url_cache", { @@ -390,13 +419,13 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def store_local_thumbnail( self, - media_id, - thumbnail_width, - thumbnail_height, - thumbnail_type, - thumbnail_method, - thumbnail_length, - ): + media_id: str, + thumbnail_width: int, + thumbnail_height: int, + thumbnail_type: str, + thumbnail_method: str, + thumbnail_length: int, + ) -> None: await self.db_pool.simple_upsert( table="local_media_repository_thumbnails", keyvalues={ @@ -430,14 +459,14 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def store_cached_remote_media( self, - origin, - media_id, - media_type, - media_length, - time_now_ms, - upload_name, - filesystem_id, - ): + origin: str, + media_id: str, + media_type: str, + media_length: int, + time_now_ms: int, + upload_name: Optional[str], + filesystem_id: str, + ) -> None: await self.db_pool.simple_insert( "remote_media_cache", { @@ -458,7 +487,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): local_media: Iterable[str], remote_media: Iterable[Tuple[str, str]], time_ms: int, - ): + ) -> None: """Updates the last access time of the given media Args: @@ -467,7 +496,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): time_ms: Current time in milliseconds """ - def update_cache_txn(txn): + def update_cache_txn(txn: LoggingTransaction) -> None: sql = ( "UPDATE remote_media_cache SET last_access_ts = ?" " WHERE media_origin = ? AND media_id = ?" @@ -488,7 +517,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media)) - return await self.db_pool.runInteraction( + await self.db_pool.runInteraction( "update_cached_last_access_time", update_cache_txn ) @@ -542,15 +571,15 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def store_remote_media_thumbnail( self, - origin, - media_id, - filesystem_id, - thumbnail_width, - thumbnail_height, - thumbnail_type, - thumbnail_method, - thumbnail_length, - ): + origin: str, + media_id: str, + filesystem_id: str, + thumbnail_width: int, + thumbnail_height: int, + thumbnail_type: str, + thumbnail_method: str, + thumbnail_length: int, + ) -> None: await self.db_pool.simple_upsert( table="remote_media_cache_thumbnails", keyvalues={ @@ -566,7 +595,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_remote_media_thumbnail", ) - async def get_remote_media_before(self, before_ts): + async def get_remote_media_before(self, before_ts: int) -> List[Dict[str, str]]: sql = ( "SELECT media_origin, media_id, filesystem_id" " FROM remote_media_cache" @@ -602,7 +631,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): " LIMIT 500" ) - def _get_expired_url_cache_txn(txn): + def _get_expired_url_cache_txn(txn: LoggingTransaction) -> List[str]: txn.execute(sql, (now_ts,)) return [row[0] for row in txn] @@ -610,18 +639,16 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "get_expired_url_cache", _get_expired_url_cache_txn ) - async def delete_url_cache(self, media_ids): + async def delete_url_cache(self, media_ids: Collection[str]) -> None: if len(media_ids) == 0: return sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?" - def _delete_url_cache_txn(txn): + def _delete_url_cache_txn(txn: LoggingTransaction) -> None: txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) - return await self.db_pool.runInteraction( - "delete_url_cache", _delete_url_cache_txn - ) + await self.db_pool.runInteraction("delete_url_cache", _delete_url_cache_txn) async def get_url_cache_media_before(self, before_ts: int) -> List[str]: sql = ( @@ -631,7 +658,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): " LIMIT 500" ) - def _get_url_cache_media_before_txn(txn): + def _get_url_cache_media_before_txn(txn: LoggingTransaction) -> List[str]: txn.execute(sql, (before_ts,)) return [row[0] for row in txn] @@ -639,11 +666,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "get_url_cache_media_before", _get_url_cache_media_before_txn ) - async def delete_url_cache_media(self, media_ids): + async def delete_url_cache_media(self, media_ids: Collection[str]) -> None: if len(media_ids) == 0: return - def _delete_url_cache_media_txn(txn): + def _delete_url_cache_media_txn(txn: LoggingTransaction) -> None: sql = "DELETE FROM local_media_repository WHERE media_id = ?" txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) @@ -652,6 +679,6 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) - return await self.db_pool.runInteraction( + await self.db_pool.runInteraction( "delete_url_cache_media", _delete_url_cache_media_txn ) -- cgit 1.5.1 From 0bcae8ad56a64da72f278b4ec425d89c068b5df0 Mon Sep 17 00:00:00 2001 From: Shay Date: Fri, 12 Nov 2021 10:38:24 -0800 Subject: Change display names/avatar URLs to None if they contain null bytes before storing in DB (#11230) * change display names/avatar URLS to None if they contain null bytes * add changelog * add POC test, requested changes * add a saner test and remove old one * update test to verify that display name has been changed to None * make test less fragile --- changelog.d/11230.bugfix | 2 ++ synapse/storage/databases/main/events.py | 10 ++++--- tests/storage/test_roommember.py | 48 ++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 changelog.d/11230.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/11230.bugfix b/changelog.d/11230.bugfix new file mode 100644 index 0000000000..b2d6d4d024 --- /dev/null +++ b/changelog.d/11230.bugfix @@ -0,0 +1,2 @@ +Fix a long-standing bug wherein display names or avatar URLs containing null bytes cause an internal server error +when stored in the DB. \ No newline at end of file diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 596275c23c..120e4807d1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1641,8 +1641,8 @@ class PersistEventsStore: def _store_room_members_txn(self, txn, events, backfilled): """Store a room member in the database.""" - def str_or_none(val: Any) -> Optional[str]: - return val if isinstance(val, str) else None + def non_null_str_or_none(val: Any) -> Optional[str]: + return val if isinstance(val, str) and "\u0000" not in val else None self.db_pool.simple_insert_many_txn( txn, @@ -1654,8 +1654,10 @@ class PersistEventsStore: "sender": event.user_id, "room_id": event.room_id, "membership": event.membership, - "display_name": str_or_none(event.content.get("displayname")), - "avatar_url": str_or_none(event.content.get("avatar_url")), + "display_name": non_null_str_or_none( + event.content.get("displayname") + ), + "avatar_url": non_null_str_or_none(event.content.get("avatar_url")), } for event in events ], diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index 2873e22ccf..fccab733c0 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -161,6 +161,54 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase): ) self.assertEqual(users.keys(), {self.u_alice, self.u_bob}) + def test__null_byte_in_display_name_properly_handled(self): + room = self.helper.create_room_as(self.u_alice, tok=self.t_alice) + + res = self.get_success( + self.store.db_pool.simple_select_list( + "room_memberships", + {"user_id": "@alice:test"}, + ["display_name", "event_id"], + ) + ) + # Check that we only got one result back + self.assertEqual(len(res), 1) + + # Check that alice's display name is "alice" + self.assertEqual(res[0]["display_name"], "alice") + + # Grab the event_id to use later + event_id = res[0]["event_id"] + + # Create a profile with the offending null byte in the display name + new_profile = {"displayname": "ali\u0000ce"} + + # Ensure that the change goes smoothly and does not fail due to the null byte + self.helper.change_membership( + room, + self.u_alice, + self.u_alice, + "join", + extra_data=new_profile, + tok=self.t_alice, + ) + + res2 = self.get_success( + self.store.db_pool.simple_select_list( + "room_memberships", + {"user_id": "@alice:test"}, + ["display_name", "event_id"], + ) + ) + # Check that we only have two results + self.assertEqual(len(res2), 2) + + # Filter out the previous event using the event_id we grabbed above + row = [row for row in res2 if row["event_id"] != event_id] + + # Check that alice's display name is now None + self.assertEqual(row[0]["display_name"], None) + class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase): def prepare(self, reactor, clock, homeserver): -- cgit 1.5.1 From bea815cec82096efc15e75875d3b8372fcb4f28b Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 12 Nov 2021 19:56:00 +0000 Subject: Test room alias deletion (#11327) * Prefer `HTTPStatus` over plain `int` This is an Opinion that no-one has seemed to object to yet. * `--disallow-untyped-defs` for `tests.rest.client.test_directory` * Improve synapse's annotations for deleting aliases * Test case for deleting a room alias * Changelog --- changelog.d/11327.misc | 1 + mypy.ini | 3 + synapse/handlers/directory.py | 6 +- synapse/storage/databases/main/directory.py | 7 +- tests/rest/client/test_directory.py | 105 ++++++++++++++++++++-------- 5 files changed, 91 insertions(+), 31 deletions(-) create mode 100644 changelog.d/11327.misc (limited to 'synapse/storage') diff --git a/changelog.d/11327.misc b/changelog.d/11327.misc new file mode 100644 index 0000000000..389e360457 --- /dev/null +++ b/changelog.d/11327.misc @@ -0,0 +1 @@ +Test that room alias deletion works as intended. \ No newline at end of file diff --git a/mypy.ini b/mypy.ini index 56a62bb9b7..67dde5991b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -271,6 +271,9 @@ disallow_untyped_defs = True [mypy-tests] disallow_untyped_defs = True +[mypy-tests.rest.client.test_directory] +disallow_untyped_defs = True + ;; Dependencies without annotations ;; Before ignoring a module, check to see if type stubs are available. ;; The `typeshed` project maintains stubs here: diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8ca5f60b1c..7ee5c47fd9 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -204,6 +204,10 @@ class DirectoryHandler: ) room_id = await self._delete_association(room_alias) + if room_id is None: + # It's possible someone else deleted the association after the + # checks above, but before we did the deletion. + raise NotFoundError("Unknown room alias") try: await self._update_canonical_alias(requester, user_id, room_id, room_alias) @@ -225,7 +229,7 @@ class DirectoryHandler: ) await self._delete_association(room_alias) - async def _delete_association(self, room_alias: RoomAlias) -> str: + async def _delete_association(self, room_alias: RoomAlias) -> Optional[str]: if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py index 6daf8b8ffb..25131b1ea9 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py @@ -17,6 +17,7 @@ from typing import Iterable, List, Optional from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore +from synapse.storage.database import LoggingTransaction from synapse.types import RoomAlias from synapse.util.caches.descriptors import cached @@ -126,14 +127,16 @@ class DirectoryWorkerStore(SQLBaseStore): class DirectoryStore(DirectoryWorkerStore): - async def delete_room_alias(self, room_alias: RoomAlias) -> str: + async def delete_room_alias(self, room_alias: RoomAlias) -> Optional[str]: room_id = await self.db_pool.runInteraction( "delete_room_alias", self._delete_room_alias_txn, room_alias ) return room_id - def _delete_room_alias_txn(self, txn, room_alias: RoomAlias) -> str: + def _delete_room_alias_txn( + self, txn: LoggingTransaction, room_alias: RoomAlias + ) -> Optional[str]: txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", (room_alias.to_string(),), diff --git a/tests/rest/client/test_directory.py b/tests/rest/client/test_directory.py index d2181ea907..aca03afd0e 100644 --- a/tests/rest/client/test_directory.py +++ b/tests/rest/client/test_directory.py @@ -11,12 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import json +from http import HTTPStatus + +from twisted.test.proto_helpers import MemoryReactor from synapse.rest import admin from synapse.rest.client import directory, login, room +from synapse.server import HomeServer from synapse.types import RoomAlias +from synapse.util import Clock from synapse.util.stringutils import random_string from tests import unittest @@ -32,7 +36,7 @@ class DirectoryTestCase(unittest.HomeserverTestCase): room.register_servlets, ] - def make_homeserver(self, reactor, clock): + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: config = self.default_config() config["require_membership_for_aliases"] = True @@ -40,7 +44,11 @@ class DirectoryTestCase(unittest.HomeserverTestCase): return self.hs - def prepare(self, reactor, clock, homeserver): + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + """Create two local users and access tokens for them. + One of them creates a room.""" self.room_owner = self.register_user("room_owner", "test") self.room_owner_tok = self.login("room_owner", "test") @@ -51,39 +59,39 @@ class DirectoryTestCase(unittest.HomeserverTestCase): self.user = self.register_user("user", "test") self.user_tok = self.login("user", "test") - def test_state_event_not_in_room(self): + def test_state_event_not_in_room(self) -> None: self.ensure_user_left_room() - self.set_alias_via_state_event(403) + self.set_alias_via_state_event(HTTPStatus.FORBIDDEN) - def test_directory_endpoint_not_in_room(self): + def test_directory_endpoint_not_in_room(self) -> None: self.ensure_user_left_room() - self.set_alias_via_directory(403) + self.set_alias_via_directory(HTTPStatus.FORBIDDEN) - def test_state_event_in_room_too_long(self): + def test_state_event_in_room_too_long(self) -> None: self.ensure_user_joined_room() - self.set_alias_via_state_event(400, alias_length=256) + self.set_alias_via_state_event(HTTPStatus.BAD_REQUEST, alias_length=256) - def test_directory_in_room_too_long(self): + def test_directory_in_room_too_long(self) -> None: self.ensure_user_joined_room() - self.set_alias_via_directory(400, alias_length=256) + self.set_alias_via_directory(HTTPStatus.BAD_REQUEST, alias_length=256) @override_config({"default_room_version": 5}) - def test_state_event_user_in_v5_room(self): + def test_state_event_user_in_v5_room(self) -> None: """Test that a regular user can add alias events before room v6""" self.ensure_user_joined_room() - self.set_alias_via_state_event(200) + self.set_alias_via_state_event(HTTPStatus.OK) @override_config({"default_room_version": 6}) - def test_state_event_v6_room(self): + def test_state_event_v6_room(self) -> None: """Test that a regular user can *not* add alias events from room v6""" self.ensure_user_joined_room() - self.set_alias_via_state_event(403) + self.set_alias_via_state_event(HTTPStatus.FORBIDDEN) - def test_directory_in_room(self): + def test_directory_in_room(self) -> None: self.ensure_user_joined_room() - self.set_alias_via_directory(200) + self.set_alias_via_directory(HTTPStatus.OK) - def test_room_creation_too_long(self): + def test_room_creation_too_long(self) -> None: url = "/_matrix/client/r0/createRoom" # We use deliberately a localpart under the length threshold so @@ -93,9 +101,9 @@ class DirectoryTestCase(unittest.HomeserverTestCase): channel = self.make_request( "POST", url, request_data, access_token=self.user_tok ) - self.assertEqual(channel.code, 400, channel.result) + self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result) - def test_room_creation(self): + def test_room_creation(self) -> None: url = "/_matrix/client/r0/createRoom" # Check with an alias of allowed length. There should already be @@ -106,9 +114,46 @@ class DirectoryTestCase(unittest.HomeserverTestCase): channel = self.make_request( "POST", url, request_data, access_token=self.user_tok ) - self.assertEqual(channel.code, 200, channel.result) + self.assertEqual(channel.code, HTTPStatus.OK, channel.result) + + def test_deleting_alias_via_directory(self) -> None: + # Add an alias for the room. We must be joined to do so. + self.ensure_user_joined_room() + alias = self.set_alias_via_directory(HTTPStatus.OK) + + # Then try to remove the alias + channel = self.make_request( + "DELETE", + f"/_matrix/client/r0/directory/room/{alias}", + access_token=self.user_tok, + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.result) + + def test_deleting_nonexistant_alias(self) -> None: + # Check that no alias exists + alias = "#potato:test" + channel = self.make_request( + "GET", + f"/_matrix/client/r0/directory/room/{alias}", + access_token=self.user_tok, + ) + self.assertEqual(channel.code, HTTPStatus.NOT_FOUND, channel.result) + self.assertIn("error", channel.json_body, channel.json_body) + self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND", channel.json_body) + + # Then try to remove the alias + channel = self.make_request( + "DELETE", + f"/_matrix/client/r0/directory/room/{alias}", + access_token=self.user_tok, + ) + self.assertEqual(channel.code, HTTPStatus.NOT_FOUND, channel.result) + self.assertIn("error", channel.json_body, channel.json_body) + self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND", channel.json_body) - def set_alias_via_state_event(self, expected_code, alias_length=5): + def set_alias_via_state_event( + self, expected_code: HTTPStatus, alias_length: int = 5 + ) -> None: url = "/_matrix/client/r0/rooms/%s/state/m.room.aliases/%s" % ( self.room_id, self.hs.hostname, @@ -122,8 +167,11 @@ class DirectoryTestCase(unittest.HomeserverTestCase): ) self.assertEqual(channel.code, expected_code, channel.result) - def set_alias_via_directory(self, expected_code, alias_length=5): - url = "/_matrix/client/r0/directory/room/%s" % self.random_alias(alias_length) + def set_alias_via_directory( + self, expected_code: HTTPStatus, alias_length: int = 5 + ) -> str: + alias = self.random_alias(alias_length) + url = "/_matrix/client/r0/directory/room/%s" % alias data = {"room_id": self.room_id} request_data = json.dumps(data) @@ -131,17 +179,18 @@ class DirectoryTestCase(unittest.HomeserverTestCase): "PUT", url, request_data, access_token=self.user_tok ) self.assertEqual(channel.code, expected_code, channel.result) + return alias - def random_alias(self, length): + def random_alias(self, length: int) -> str: return RoomAlias(random_string(length), self.hs.hostname).to_string() - def ensure_user_left_room(self): + def ensure_user_left_room(self) -> None: self.ensure_membership("leave") - def ensure_user_joined_room(self): + def ensure_user_joined_room(self) -> None: self.ensure_membership("join") - def ensure_membership(self, membership): + def ensure_membership(self, membership: str) -> None: try: if membership == "leave": self.helper.leave(room=self.room_id, user=self.user, tok=self.user_tok) -- cgit 1.5.1 From fe58672546ce6e43e3cb4f9924efee51c905d801 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 12 Nov 2021 20:24:12 +0000 Subject: Annotations for state_deltas.py (#11316) I was sad that I couldn't do better for `_curr_state_delta_stream_cache`. At least it's explicitly called out in a comment with #TODO. --- changelog.d/11316.misc | 1 + mypy.ini | 4 +++- synapse/storage/databases/main/state_deltas.py | 16 +++++++++++++--- 3 files changed, 17 insertions(+), 4 deletions(-) create mode 100644 changelog.d/11316.misc (limited to 'synapse/storage') diff --git a/changelog.d/11316.misc b/changelog.d/11316.misc new file mode 100644 index 0000000000..86594a332d --- /dev/null +++ b/changelog.d/11316.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index 108e1e895c..d07273e63c 100644 --- a/mypy.ini +++ b/mypy.ini @@ -47,7 +47,6 @@ exclude = (?x) |synapse/storage/databases/main/roommember.py |synapse/storage/databases/main/search.py |synapse/storage/databases/main/state.py - |synapse/storage/databases/main/state_deltas.py |synapse/storage/databases/main/stats.py |synapse/storage/databases/main/transactions.py |synapse/storage/databases/main/user_directory.py @@ -181,6 +180,9 @@ disallow_untyped_defs = True [mypy-synapse.storage.databases.main.room_batch] disallow_untyped_defs = True +[mypy-synapse.storage.databases.main.state_deltas] +disallow_untyped_defs = True + [mypy-synapse.storage.databases.main.user_erasure_store] disallow_untyped_defs = True diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index a89747d741..7f3624b128 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -16,11 +16,17 @@ import logging from typing import Any, Dict, List, Tuple from synapse.storage._base import SQLBaseStore +from synapse.storage.database import LoggingTransaction +from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) class StateDeltasStore(SQLBaseStore): + # This class must be mixed in with a child class which provides the following + # attribute. TODO: can we get static analysis to enforce this? + _curr_state_delta_stream_cache: StreamChangeCache + async def get_current_state_deltas( self, prev_stream_id: int, max_stream_id: int ) -> Tuple[int, List[Dict[str, Any]]]: @@ -60,7 +66,9 @@ class StateDeltasStore(SQLBaseStore): # max_stream_id. return max_stream_id, [] - def get_current_state_deltas_txn(txn): + def get_current_state_deltas_txn( + txn: LoggingTransaction, + ) -> Tuple[int, List[Dict[str, Any]]]: # First we calculate the max stream id that will give us less than # N results. # We arbitrarily limit to 100 stream_id entries to ensure we don't @@ -106,7 +114,9 @@ class StateDeltasStore(SQLBaseStore): "get_current_state_deltas", get_current_state_deltas_txn ) - def _get_max_stream_id_in_current_state_deltas_txn(self, txn): + def _get_max_stream_id_in_current_state_deltas_txn( + self, txn: LoggingTransaction + ) -> int: return self.db_pool.simple_select_one_onecol_txn( txn, table="current_state_delta_stream", @@ -114,7 +124,7 @@ class StateDeltasStore(SQLBaseStore): retcol="COALESCE(MAX(stream_id), -1)", ) - async def get_max_stream_id_in_current_state_deltas(self): + async def get_max_stream_id_in_current_state_deltas(self) -> int: return await self.db_pool.runInteraction( "get_max_stream_id_in_current_state_deltas", self._get_max_stream_id_in_current_state_deltas_txn, -- cgit 1.5.1 From 605921bc6b3d3f48142e3c8b5ddcc6c2a0016062 Mon Sep 17 00:00:00 2001 From: Shay Date: Fri, 12 Nov 2021 16:47:56 -0800 Subject: Remove unused tables `room_stats_historical` and `user_stats_historical` (#11280) * remove unused tables room_stats_historical and user_stats_historical * update changelog number * Bump schema compat version comment * make linter happy * Update comment to give more info Co-authored-by: reivilibre Co-authored-by: reivilibre --- changelog.d/11280.misc | 1 + synapse/storage/schema/__init__.py | 5 ++++- ...oom_stats_historical_and_user_stats_historical.sql | 19 +++++++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11280.misc create mode 100644 synapse/storage/schema/main/delta/65/05_remove_room_stats_historical_and_user_stats_historical.sql (limited to 'synapse/storage') diff --git a/changelog.d/11280.misc b/changelog.d/11280.misc new file mode 100644 index 0000000000..8417c8ffef --- /dev/null +++ b/changelog.d/11280.misc @@ -0,0 +1 @@ +Drop unused db tables `room_stats_historical` and `user_stats_historical`. \ No newline at end of file diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index a1d2332326..3a00ed6835 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -45,10 +45,13 @@ Changes in SCHEMA_VERSION = 64: Changes in SCHEMA_VERSION = 65: - MSC2716: Remove unique event_id constraint from insertion_event_edges because an insertion event can have multiple edges. + - Remove unused tables `user_stats_historical` and `room_stats_historical`. """ -SCHEMA_COMPAT_VERSION = 60 # 60: "outlier" not in internal_metadata. +SCHEMA_COMPAT_VERSION = ( + 61 # 61: Remove unused tables `user_stats_historical` and `room_stats_historical` +) """Limit on how far the synapse codebase can be rolled back without breaking db compat This value is stored in the database, and checked on startup. If the value in the diff --git a/synapse/storage/schema/main/delta/65/05_remove_room_stats_historical_and_user_stats_historical.sql b/synapse/storage/schema/main/delta/65/05_remove_room_stats_historical_and_user_stats_historical.sql new file mode 100644 index 0000000000..a145180e7a --- /dev/null +++ b/synapse/storage/schema/main/delta/65/05_remove_room_stats_historical_and_user_stats_historical.sql @@ -0,0 +1,19 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + -- Remove unused tables room_stats_historical and user_stats_historical + -- which have not been read or written since schema version 61. + DROP TABLE IF EXISTS room_stats_historical; + DROP TABLE IF EXISTS user_stats_historical; \ No newline at end of file -- cgit 1.5.1 From 5562ce6a534db61777ad81338aa5dd0a9a54032f Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 12:59:05 +0000 Subject: Get directory db file to pass mypy (#11339) --- changelog.d/11339.misc | 1 + mypy.ini | 4 +++- synapse/storage/databases/main/__init__.py | 1 + synapse/storage/databases/main/directory.py | 12 ++++++------ 4 files changed, 11 insertions(+), 7 deletions(-) create mode 100644 changelog.d/11339.misc (limited to 'synapse/storage') diff --git a/changelog.d/11339.misc b/changelog.d/11339.misc new file mode 100644 index 0000000000..86594a332d --- /dev/null +++ b/changelog.d/11339.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index d07273e63c..710b1f3a4b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -28,7 +28,6 @@ exclude = (?x) |synapse/storage/databases/main/account_data.py |synapse/storage/databases/main/cache.py |synapse/storage/databases/main/devices.py - |synapse/storage/databases/main/directory.py |synapse/storage/databases/main/e2e_room_keys.py |synapse/storage/databases/main/end_to_end_keys.py |synapse/storage/databases/main/event_federation.py @@ -177,6 +176,9 @@ disallow_untyped_defs = True [mypy-synapse.storage.databases.main.client_ips] disallow_untyped_defs = True +[mypy-synapse.storage.databases.main.directory] +disallow_untyped_defs = True + [mypy-synapse.storage.databases.main.room_batch] disallow_untyped_defs = True diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index e22aa0b9bc..9ff2d8d8c3 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -154,6 +154,7 @@ class DataStore( db_conn, "local_group_updates", "stream_id" ) + self._cache_id_gen: Optional[MultiWriterIdGenerator] if isinstance(self.database_engine, PostgresEngine): # We set the `writers` to an empty list here as we don't care about # missing updates over restarts, as we'll not have anything in our diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py index 25131b1ea9..a3442814d7 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py @@ -13,18 +13,18 @@ # limitations under the License. from collections import namedtuple -from typing import Iterable, List, Optional +from typing import Iterable, List, Optional, Tuple from synapse.api.errors import SynapseError -from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction +from synapse.storage.databases.main import CacheInvalidationWorkerStore from synapse.types import RoomAlias from synapse.util.caches.descriptors import cached RoomAliasMapping = namedtuple("RoomAliasMapping", ("room_id", "room_alias", "servers")) -class DirectoryWorkerStore(SQLBaseStore): +class DirectoryWorkerStore(CacheInvalidationWorkerStore): async def get_association_from_room_alias( self, room_alias: RoomAlias ) -> Optional[RoomAliasMapping]: @@ -92,7 +92,7 @@ class DirectoryWorkerStore(SQLBaseStore): creator: Optional user_id of creator. """ - def alias_txn(txn): + def alias_txn(txn: LoggingTransaction) -> None: self.db_pool.simple_insert_txn( txn, "room_aliases", @@ -176,9 +176,9 @@ class DirectoryStore(DirectoryWorkerStore): If None, the creator will be left unchanged. """ - def _update_aliases_for_room_txn(txn): + def _update_aliases_for_room_txn(txn: LoggingTransaction) -> None: update_creator_sql = "" - sql_params = (new_room_id, old_room_id) + sql_params: Tuple[str, ...] = (new_room_id, old_room_id) if creator: update_creator_sql = ", creator = ?" sql_params = (new_room_id, creator, old_room_id) -- cgit 1.5.1 From e605e4b8f2447f0b6afa9acc104ae1882a732090 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 12:59:33 +0000 Subject: Database storage profile passes mypy (#11342) It already seems to pass mypy. I wonder what changed, given that it was on the exclusion list. So this commit consists of me ensuring `--disallow-untyped-defs` passes and a minor fixup to a function that returned either `True` or `None`. --- changelog.d/11342.misc | 1 + mypy.ini | 7 ++++++- synapse/storage/databases/main/profile.py | 12 ++++++++---- tests/storage/test_profile.py | 9 ++++++--- 4 files changed, 21 insertions(+), 8 deletions(-) create mode 100644 changelog.d/11342.misc (limited to 'synapse/storage') diff --git a/changelog.d/11342.misc b/changelog.d/11342.misc new file mode 100644 index 0000000000..86594a332d --- /dev/null +++ b/changelog.d/11342.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index 710b1f3a4b..b2953974ea 100644 --- a/mypy.ini +++ b/mypy.ini @@ -38,7 +38,6 @@ exclude = (?x) |synapse/storage/databases/main/metrics.py |synapse/storage/databases/main/monthly_active_users.py |synapse/storage/databases/main/presence.py - |synapse/storage/databases/main/profile.py |synapse/storage/databases/main/purge_events.py |synapse/storage/databases/main/push_rule.py |synapse/storage/databases/main/receipts.py @@ -182,6 +181,9 @@ disallow_untyped_defs = True [mypy-synapse.storage.databases.main.room_batch] disallow_untyped_defs = True +[mypy-synapse.storage.databases.main.profile] +disallow_untyped_defs = True + [mypy-synapse.storage.databases.main.state_deltas] disallow_untyped_defs = True @@ -284,6 +286,9 @@ disallow_untyped_defs = True [mypy-tests.handlers.test_user_directory] disallow_untyped_defs = True +[mypy-tests.storage.test_profile] +disallow_untyped_defs = True + [mypy-tests.storage.test_user_directory] disallow_untyped_defs = True diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index dd8e27e226..e197b7203e 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -15,6 +15,7 @@ from typing import Any, Dict, List, Optional from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore +from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main.roommember import ProfileInfo @@ -104,7 +105,7 @@ class ProfileWorkerStore(SQLBaseStore): desc="update_remote_profile_cache", ) - async def maybe_delete_remote_profile_cache(self, user_id): + async def maybe_delete_remote_profile_cache(self, user_id: str) -> None: """Check if we still care about the remote user's profile, and if we don't then remove their profile from the cache """ @@ -116,9 +117,9 @@ class ProfileWorkerStore(SQLBaseStore): desc="delete_remote_profile_cache", ) - async def is_subscribed_remote_profile_for_user(self, user_id): + async def is_subscribed_remote_profile_for_user(self, user_id: str) -> bool: """Check whether we are interested in a remote user's profile.""" - res = await self.db_pool.simple_select_one_onecol( + res: Optional[str] = await self.db_pool.simple_select_one_onecol( table="group_users", keyvalues={"user_id": user_id}, retcol="user_id", @@ -139,13 +140,16 @@ class ProfileWorkerStore(SQLBaseStore): if res: return True + return False async def get_remote_profile_cache_entries_that_expire( self, last_checked: int ) -> List[Dict[str, str]]: """Get all users who haven't been checked since `last_checked`""" - def _get_remote_profile_cache_entries_that_expire_txn(txn): + def _get_remote_profile_cache_entries_that_expire_txn( + txn: LoggingTransaction, + ) -> List[Dict[str, str]]: sql = """ SELECT user_id, displayname, avatar_url FROM remote_profile_cache diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py index a1ba99ff14..d37736edf8 100644 --- a/tests/storage/test_profile.py +++ b/tests/storage/test_profile.py @@ -11,19 +11,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from twisted.test.proto_helpers import MemoryReactor +from synapse.server import HomeServer from synapse.types import UserID +from synapse.util import Clock from tests import unittest class ProfileStoreTestCase(unittest.HomeserverTestCase): - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.u_frank = UserID.from_string("@frank:test") - def test_displayname(self): + def test_displayname(self) -> None: self.get_success(self.store.create_profile(self.u_frank.localpart)) self.get_success( @@ -48,7 +51,7 @@ class ProfileStoreTestCase(unittest.HomeserverTestCase): self.get_success(self.store.get_profile_displayname(self.u_frank.localpart)) ) - def test_avatar_url(self): + def test_avatar_url(self) -> None: self.get_success(self.store.create_profile(self.u_frank.localpart)) self.get_success( -- cgit 1.5.1 From 24b61f379ac1fc740e1b569b85363e2a0411883a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 16 Nov 2021 07:43:53 -0500 Subject: Add ability to un-shadow-ban via the admin API. (#11347) --- changelog.d/11347.feature | 1 + docs/admin_api/user_admin_api.md | 12 +++++++++--- synapse/rest/admin/users.py | 24 ++++++++++++++++++++++-- synapse/storage/databases/main/registration.py | 2 +- tests/rest/admin/test_user.py | 26 ++++++++++++++++++++------ 5 files changed, 53 insertions(+), 12 deletions(-) create mode 100644 changelog.d/11347.feature (limited to 'synapse/storage') diff --git a/changelog.d/11347.feature b/changelog.d/11347.feature new file mode 100644 index 0000000000..b0cb5345a0 --- /dev/null +++ b/changelog.d/11347.feature @@ -0,0 +1 @@ +Add admin API to un-shadow-ban a user. diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md index 16ec33b3c1..ba574d795f 100644 --- a/docs/admin_api/user_admin_api.md +++ b/docs/admin_api/user_admin_api.md @@ -948,7 +948,7 @@ The following fields are returned in the JSON response body: See also the [Client-Server API Spec on pushers](https://matrix.org/docs/spec/client_server/latest#get-matrix-client-r0-pushers). -## Shadow-banning users +## Controlling whether a user is shadow-banned Shadow-banning is a useful tool for moderating malicious or egregiously abusive users. A shadow-banned users receives successful responses to their client-server API requests, @@ -961,16 +961,22 @@ or broken behaviour for the client. A shadow-banned user will not receive any notification and it is generally more appropriate to ban or kick abusive users. A shadow-banned user will be unable to contact anyone on the server. -The API is: +To shadow-ban a user the API is: ``` POST /_synapse/admin/v1/users//shadow_ban ``` +To un-shadow-ban a user the API is: + +``` +DELETE /_synapse/admin/v1/users//shadow_ban +``` + To use it, you will need to authenticate by providing an `access_token` for a server admin: [Admin API](../usage/administration/admin_api) -An empty JSON dict is returned. +An empty JSON dict is returned in both cases. **Parameters** diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index d14fafbbc9..23a8bf1fdb 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -909,7 +909,7 @@ class UserTokenRestServlet(RestServlet): class ShadowBanRestServlet(RestServlet): - """An admin API for shadow-banning a user. + """An admin API for controlling whether a user is shadow-banned. A shadow-banned users receives successful responses to their client-server API requests, but the events are not propagated into rooms. @@ -917,11 +917,19 @@ class ShadowBanRestServlet(RestServlet): Shadow-banning a user should be used as a tool of last resort and may lead to confusing or broken behaviour for the client. - Example: + Example of shadow-banning a user: POST /_synapse/admin/v1/users/@test:example.com/shadow_ban {} + 200 OK + {} + + Example of removing a user from being shadow-banned: + + DELETE /_synapse/admin/v1/users/@test:example.com/shadow_ban + {} + 200 OK {} """ @@ -945,6 +953,18 @@ class ShadowBanRestServlet(RestServlet): return 200, {} + async def on_DELETE( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: + await assert_requester_is_admin(self.auth, request) + + if not self.hs.is_mine_id(user_id): + raise SynapseError(400, "Only local users can be shadow-banned") + + await self.store.set_shadow_banned(UserID.from_string(user_id), False) + + return 200, {} + class RateLimitRestServlet(RestServlet): """An admin API to override ratelimiting for an user. diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 6c7d6ba508..5e55440570 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -476,7 +476,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): shadow_banned: true iff the user is to be shadow-banned, false otherwise. """ - def set_shadow_banned_txn(txn): + def set_shadow_banned_txn(txn: LoggingTransaction) -> None: user_id = user.to_string() self.db_pool.simple_update_one_txn( txn, diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 25e8d6cf27..c9fe0f06c2 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -3592,31 +3592,34 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase): self.other_user ) - def test_no_auth(self): + @parameterized.expand(["POST", "DELETE"]) + def test_no_auth(self, method: str): """ Try to get information of an user without authentication. """ - channel = self.make_request("POST", self.url) + channel = self.make_request(method, self.url) self.assertEqual(401, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) - def test_requester_is_not_admin(self): + @parameterized.expand(["POST", "DELETE"]) + def test_requester_is_not_admin(self, method: str): """ If the user is not a server admin, an error is returned. """ other_user_token = self.login("user", "pass") - channel = self.make_request("POST", self.url, access_token=other_user_token) + channel = self.make_request(method, self.url, access_token=other_user_token) self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) - def test_user_is_not_local(self): + @parameterized.expand(["POST", "DELETE"]) + def test_user_is_not_local(self, method: str): """ Tests that shadow-banning for a user that is not a local returns a 400 """ url = "/_synapse/admin/v1/whois/@unknown_person:unknown_domain" - channel = self.make_request("POST", url, access_token=self.admin_user_tok) + channel = self.make_request(method, url, access_token=self.admin_user_tok) self.assertEqual(400, channel.code, msg=channel.json_body) def test_success(self): @@ -3636,6 +3639,17 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase): result = self.get_success(self.store.get_user_by_access_token(other_user_token)) self.assertTrue(result.shadow_banned) + # Un-shadow-ban the user. + channel = self.make_request( + "DELETE", self.url, access_token=self.admin_user_tok + ) + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual({}, channel.json_body) + + # Ensure the user is no longer shadow-banned (and the cache was cleared). + result = self.get_success(self.store.get_user_by_access_token(other_user_token)) + self.assertFalse(result.shadow_banned) + class RateLimitTestCase(unittest.HomeserverTestCase): -- cgit 1.5.1 From 84fac0f814f69645ff1ad564ef8294b31203dc95 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Wed, 17 Nov 2021 19:07:02 +0000 Subject: Add type annotations to `synapse.metrics` (#10847) --- changelog.d/10847.misc | 1 + mypy.ini | 3 + synapse/app/_base.py | 2 +- synapse/groups/attestations.py | 4 +- synapse/handlers/typing.py | 2 +- synapse/metrics/__init__.py | 101 ++++++++++++++++++-------- synapse/metrics/_exposition.py | 34 ++++----- synapse/metrics/background_process_metrics.py | 78 +++++++++++++++----- synapse/metrics/jemalloc.py | 10 ++- synapse/storage/database.py | 6 +- synapse/util/caches/expiringcache.py | 2 +- synapse/util/metrics.py | 15 ++-- 12 files changed, 173 insertions(+), 85 deletions(-) create mode 100644 changelog.d/10847.misc (limited to 'synapse/storage') diff --git a/changelog.d/10847.misc b/changelog.d/10847.misc new file mode 100644 index 0000000000..7933a38dca --- /dev/null +++ b/changelog.d/10847.misc @@ -0,0 +1 @@ +Add type annotations to `synapse.metrics`. diff --git a/mypy.ini b/mypy.ini index f32c6c41a3..308cfd95d8 100644 --- a/mypy.ini +++ b/mypy.ini @@ -160,6 +160,9 @@ disallow_untyped_defs = True [mypy-synapse.handlers.*] disallow_untyped_defs = True +[mypy-synapse.metrics.*] +disallow_untyped_defs = True + [mypy-synapse.push.*] disallow_untyped_defs = True diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 573bb487b2..807ee3d46e 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -402,7 +402,7 @@ async def start(hs: "HomeServer") -> None: if hasattr(signal, "SIGHUP"): @wrap_as_background_process("sighup") - def handle_sighup(*args: Any, **kwargs: Any) -> None: + async def handle_sighup(*args: Any, **kwargs: Any) -> None: # Tell systemd our state, if we're using it. This will silently fail if # we're not using systemd. sdnotify(b"RELOADING=1") diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 53f99031b1..a87896e538 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -40,6 +40,8 @@ from typing import TYPE_CHECKING, Optional, Tuple from signedjson.sign import sign_json +from twisted.internet.defer import Deferred + from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import JsonDict, get_domain_from_id @@ -166,7 +168,7 @@ class GroupAttestionRenewer: return {} - def _start_renew_attestations(self) -> None: + def _start_renew_attestations(self) -> "Deferred[None]": return run_as_background_process("renew_attestations", self._renew_attestations) async def _renew_attestations(self) -> None: diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 22c6174821..1676ebd057 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -90,7 +90,7 @@ class FollowerTypingHandler: self.wheel_timer = WheelTimer(bucket_size=5000) @wrap_as_background_process("typing._handle_timeouts") - def _handle_timeouts(self) -> None: + async def _handle_timeouts(self) -> None: logger.debug("Checking for typing timeouts") now = self.clock.time_msec() diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 91ee5c8193..ceef57ad88 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -20,10 +20,25 @@ import os import platform import threading import time -from typing import Callable, Dict, Iterable, Mapping, Optional, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + Generic, + Iterable, + Mapping, + Optional, + Sequence, + Set, + Tuple, + Type, + TypeVar, + Union, + cast, +) import attr -from prometheus_client import Counter, Gauge, Histogram +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric from prometheus_client.core import ( REGISTRY, CounterMetricFamily, @@ -32,6 +47,7 @@ from prometheus_client.core import ( ) from twisted.internet import reactor +from twisted.internet.base import ReactorBase from twisted.python.threadpool import ThreadPool import synapse @@ -54,7 +70,7 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") class RegistryProxy: @staticmethod - def collect(): + def collect() -> Iterable[Metric]: for metric in REGISTRY.collect(): if not metric.name.startswith("__"): yield metric @@ -74,7 +90,7 @@ class LaterGauge: ] ) - def collect(self): + def collect(self) -> Iterable[Metric]: g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) @@ -93,10 +109,10 @@ class LaterGauge: yield g - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: self._register() - def _register(self): + def _register(self) -> None: if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) @@ -105,7 +121,12 @@ class LaterGauge: all_gauges[self.name] = self -class InFlightGauge: +# `MetricsEntry` only makes sense when it is a `Protocol`, +# but `Protocol` can't be used as a `TypeVar` bound. +MetricsEntry = TypeVar("MetricsEntry") + + +class InFlightGauge(Generic[MetricsEntry]): """Tracks number of things (e.g. requests, Measure blocks, etc) in flight at any given time. @@ -115,14 +136,19 @@ class InFlightGauge: callbacks. Args: - name (str) - desc (str) - labels (list[str]) - sub_metrics (list[str]): A list of sub metrics that the callbacks - will update. + name + desc + labels + sub_metrics: A list of sub metrics that the callbacks will update. """ - def __init__(self, name, desc, labels, sub_metrics): + def __init__( + self, + name: str, + desc: str, + labels: Sequence[str], + sub_metrics: Sequence[str], + ): self.name = name self.desc = desc self.labels = labels @@ -130,19 +156,25 @@ class InFlightGauge: # Create a class which have the sub_metrics values as attributes, which # default to 0 on initialization. Used to pass to registered callbacks. - self._metrics_class = attr.make_class( + self._metrics_class: Type[MetricsEntry] = attr.make_class( "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True ) # Counts number of in flight blocks for a given set of label values - self._registrations: Dict = {} + self._registrations: Dict[ + Tuple[str, ...], Set[Callable[[MetricsEntry], None]] + ] = {} # Protects access to _registrations self._lock = threading.Lock() self._register_with_collector() - def register(self, key, callback): + def register( + self, + key: Tuple[str, ...], + callback: Callable[[MetricsEntry], None], + ) -> None: """Registers that we've entered a new block with labels `key`. `callback` gets called each time the metrics are collected. The same @@ -158,13 +190,17 @@ class InFlightGauge: with self._lock: self._registrations.setdefault(key, set()).add(callback) - def unregister(self, key, callback): + def unregister( + self, + key: Tuple[str, ...], + callback: Callable[[MetricsEntry], None], + ) -> None: """Registers that we've exited a block with labels `key`.""" with self._lock: self._registrations.setdefault(key, set()).discard(callback) - def collect(self): + def collect(self) -> Iterable[Metric]: """Called by prometheus client when it reads metrics. Note: may be called by a separate thread. @@ -200,7 +236,7 @@ class InFlightGauge: gauge.add_metric(key, getattr(metrics, name)) yield gauge - def _register_with_collector(self): + def _register_with_collector(self) -> None: if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) @@ -230,7 +266,7 @@ class GaugeBucketCollector: name: str, documentation: str, buckets: Iterable[float], - registry=REGISTRY, + registry: CollectorRegistry = REGISTRY, ): """ Args: @@ -257,12 +293,12 @@ class GaugeBucketCollector: registry.register(self) - def collect(self): + def collect(self) -> Iterable[Metric]: # Don't report metrics unless we've already collected some data if self._metric is not None: yield self._metric - def update_data(self, values: Iterable[float]): + def update_data(self, values: Iterable[float]) -> None: """Update the data to be reported by the metric The existing data is cleared, and each measurement in the input is assigned @@ -304,7 +340,7 @@ class GaugeBucketCollector: class CPUMetrics: - def __init__(self): + def __init__(self) -> None: ticks_per_sec = 100 try: # Try and get the system config @@ -314,7 +350,7 @@ class CPUMetrics: self.ticks_per_sec = ticks_per_sec - def collect(self): + def collect(self) -> Iterable[Metric]: if not HAVE_PROC_SELF_STAT: return @@ -364,7 +400,7 @@ gc_time = Histogram( class GCCounts: - def collect(self): + def collect(self) -> Iterable[Metric]: cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): cm.add_metric([str(n)], m) @@ -382,7 +418,7 @@ if not running_on_pypy: class PyPyGCStats: - def collect(self): + def collect(self) -> Iterable[Metric]: # @stats is a pretty-printer object with __str__() returning a nice table, # plus some fields that contain data from that table. @@ -565,7 +601,7 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None: class ReactorLastSeenMetric: - def collect(self): + def collect(self) -> Iterable[Metric]: cm = GaugeMetricFamily( "python_twisted_reactor_last_seen", "Seconds since the Twisted reactor was last seen", @@ -584,9 +620,12 @@ MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0) _last_gc = [0.0, 0.0, 0.0] -def runUntilCurrentTimer(reactor, func): +F = TypeVar("F", bound=Callable[..., Any]) + + +def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F: @functools.wraps(func) - def f(*args, **kwargs): + def f(*args: Any, **kwargs: Any) -> Any: now = reactor.seconds() num_pending = 0 @@ -649,7 +688,7 @@ def runUntilCurrentTimer(reactor, func): return ret - return f + return cast(F, f) try: @@ -677,5 +716,5 @@ __all__ = [ "start_http_server", "LaterGauge", "InFlightGauge", - "BucketCollector", + "GaugeBucketCollector", ] diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index bb9bcb5592..353d0a63b6 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -25,27 +25,25 @@ import math import threading from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn -from typing import Dict, List +from typing import Any, Dict, List, Type, Union from urllib.parse import parse_qs, urlparse -from prometheus_client import REGISTRY +from prometheus_client import REGISTRY, CollectorRegistry +from prometheus_client.core import Sample from twisted.web.resource import Resource +from twisted.web.server import Request from synapse.util import caches CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8" -INF = float("inf") -MINUS_INF = float("-inf") - - -def floatToGoString(d): +def floatToGoString(d: Union[int, float]) -> str: d = float(d) - if d == INF: + if d == math.inf: return "+Inf" - elif d == MINUS_INF: + elif d == -math.inf: return "-Inf" elif math.isnan(d): return "NaN" @@ -60,7 +58,7 @@ def floatToGoString(d): return s -def sample_line(line, name): +def sample_line(line: Sample, name: str) -> str: if line.labels: labelstr = "{{{0}}}".format( ",".join( @@ -82,7 +80,7 @@ def sample_line(line, name): return "{}{} {}{}\n".format(name, labelstr, floatToGoString(line.value), timestamp) -def generate_latest(registry, emit_help=False): +def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> bytes: # Trigger the cache metrics to be rescraped, which updates the common # metrics but do not produce metrics themselves @@ -187,7 +185,7 @@ class MetricsHandler(BaseHTTPRequestHandler): registry = REGISTRY - def do_GET(self): + def do_GET(self) -> None: registry = self.registry params = parse_qs(urlparse(self.path).query) @@ -207,11 +205,11 @@ class MetricsHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(output) - def log_message(self, format, *args): + def log_message(self, format: str, *args: Any) -> None: """Log nothing.""" @classmethod - def factory(cls, registry): + def factory(cls, registry: CollectorRegistry) -> Type: """Returns a dynamic MetricsHandler class tied to the passed registry. """ @@ -236,7 +234,9 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer): daemon_threads = True -def start_http_server(port, addr="", registry=REGISTRY): +def start_http_server( + port: int, addr: str = "", registry: CollectorRegistry = REGISTRY +) -> None: """Starts an HTTP server for prometheus metrics as a daemon thread""" CustomMetricsHandler = MetricsHandler.factory(registry) httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler) @@ -252,10 +252,10 @@ class MetricsResource(Resource): isLeaf = True - def __init__(self, registry=REGISTRY): + def __init__(self, registry: CollectorRegistry = REGISTRY): self.registry = registry - def render_GET(self, request): + def render_GET(self, request: Request) -> bytes: request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii")) response = generate_latest(self.registry) request.setHeader(b"Content-Length", str(len(response))) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 2ab599a334..53c508af91 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -15,19 +15,37 @@ import logging import threading from functools import wraps -from typing import TYPE_CHECKING, Dict, Optional, Set, Union +from types import TracebackType +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + Iterable, + Optional, + Set, + Type, + TypeVar, + Union, + cast, +) +from prometheus_client import Metric from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer -from synapse.logging.context import LoggingContext, PreserveLoggingContext +from synapse.logging.context import ( + ContextResourceUsage, + LoggingContext, + PreserveLoggingContext, +) from synapse.logging.opentracing import ( SynapseTags, noop_context_manager, start_active_span, ) -from synapse.util.async_helpers import maybe_awaitable if TYPE_CHECKING: import resource @@ -116,7 +134,7 @@ class _Collector: before they are returned. """ - def collect(self): + def collect(self) -> Iterable[Metric]: global _background_processes_active_since_last_scrape # We swap out the _background_processes set with an empty one so that @@ -144,12 +162,12 @@ REGISTRY.register(_Collector()) class _BackgroundProcess: - def __init__(self, desc, ctx): + def __init__(self, desc: str, ctx: LoggingContext): self.desc = desc self._context = ctx - self._reported_stats = None + self._reported_stats: Optional[ContextResourceUsage] = None - def update_metrics(self): + def update_metrics(self) -> None: """Updates the metrics with values from this process.""" new_stats = self._context.get_resource_usage() if self._reported_stats is None: @@ -169,7 +187,16 @@ class _BackgroundProcess: ) -def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs): +R = TypeVar("R") + + +def run_as_background_process( + desc: str, + func: Callable[..., Awaitable[Optional[R]]], + *args: Any, + bg_start_span: bool = True, + **kwargs: Any, +) -> "defer.Deferred[Optional[R]]": """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -189,11 +216,13 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar args: positional args for func kwargs: keyword args for func - Returns: Deferred which returns the result of func, but note that it does not - follow the synapse logcontext rules. + Returns: + Deferred which returns the result of func, or `None` if func raises. + Note that the returned Deferred does not follow the synapse logcontext + rules. """ - async def run(): + async def run() -> Optional[R]: with _bg_metrics_lock: count = _background_process_counts.get(desc, 0) _background_process_counts[desc] = count + 1 @@ -210,12 +239,13 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar else: ctx = noop_context_manager() with ctx: - return await maybe_awaitable(func(*args, **kwargs)) + return await func(*args, **kwargs) except Exception: logger.exception( "Background process '%s' threw an exception", desc, ) + return None finally: _background_process_in_flight_count.labels(desc).dec() @@ -225,19 +255,24 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar return defer.ensureDeferred(run()) -def wrap_as_background_process(desc): +F = TypeVar("F", bound=Callable[..., Awaitable[Optional[Any]]]) + + +def wrap_as_background_process(desc: str) -> Callable[[F], F]: """Decorator that wraps a function that gets called as a background process. - Equivalent of calling the function with `run_as_background_process` + Equivalent to calling the function with `run_as_background_process` """ - def wrap_as_background_process_inner(func): + def wrap_as_background_process_inner(func: F) -> F: @wraps(func) - def wrap_as_background_process_inner_2(*args, **kwargs): + def wrap_as_background_process_inner_2( + *args: Any, **kwargs: Any + ) -> "defer.Deferred[Optional[R]]": return run_as_background_process(desc, func, *args, **kwargs) - return wrap_as_background_process_inner_2 + return cast(F, wrap_as_background_process_inner_2) return wrap_as_background_process_inner @@ -265,7 +300,7 @@ class BackgroundProcessLoggingContext(LoggingContext): super().__init__("%s-%s" % (name, instance_id)) self._proc = _BackgroundProcess(name, self) - def start(self, rusage: "Optional[resource.struct_rusage]"): + def start(self, rusage: "Optional[resource.struct_rusage]") -> None: """Log context has started running (again).""" super().start(rusage) @@ -276,7 +311,12 @@ class BackgroundProcessLoggingContext(LoggingContext): with _bg_metrics_lock: _background_processes_active_since_last_scrape.add(self._proc) - def __exit__(self, type, value, traceback) -> None: + def __exit__( + self, + type: Optional[Type[BaseException]], + value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: """Log context has finished.""" super().__exit__(type, value, traceback) diff --git a/synapse/metrics/jemalloc.py b/synapse/metrics/jemalloc.py index 29ab6c0229..98ed9c0829 100644 --- a/synapse/metrics/jemalloc.py +++ b/synapse/metrics/jemalloc.py @@ -16,14 +16,16 @@ import ctypes import logging import os import re -from typing import Optional +from typing import Iterable, Optional + +from prometheus_client import Metric from synapse.metrics import REGISTRY, GaugeMetricFamily logger = logging.getLogger(__name__) -def _setup_jemalloc_stats(): +def _setup_jemalloc_stats() -> None: """Checks to see if jemalloc is loaded, and hooks up a collector to record statistics exposed by jemalloc. """ @@ -135,7 +137,7 @@ def _setup_jemalloc_stats(): class JemallocCollector: """Metrics for internal jemalloc stats.""" - def collect(self): + def collect(self) -> Iterable[Metric]: _jemalloc_refresh_stats() g = GaugeMetricFamily( @@ -185,7 +187,7 @@ def _setup_jemalloc_stats(): logger.debug("Added jemalloc stats") -def setup_jemalloc_stats(): +def setup_jemalloc_stats() -> None: """Try to setup jemalloc stats, if jemalloc is loaded.""" try: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index d4cab69ebf..0693d39006 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -188,7 +188,7 @@ class LoggingDatabaseConnection: # The type of entry which goes on our after_callbacks and exception_callbacks lists. -_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]] +_CallbackListEntry = Tuple[Callable[..., object], Iterable[Any], Dict[str, Any]] R = TypeVar("R") @@ -235,7 +235,7 @@ class LoggingTransaction: self.after_callbacks = after_callbacks self.exception_callbacks = exception_callbacks - def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any): + def call_after(self, callback: Callable[..., object], *args: Any, **kwargs: Any): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. @@ -247,7 +247,7 @@ class LoggingTransaction: self.after_callbacks.append((callback, args, kwargs)) def call_on_exception( - self, callback: Callable[..., None], *args: Any, **kwargs: Any + self, callback: Callable[..., object], *args: Any, **kwargs: Any ): # if self.exception_callbacks is None, that means that whatever constructed the # LoggingTransaction isn't expecting there to be any callbacks; assert that diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 6a7e534576..67ee4c693b 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -159,7 +159,7 @@ class ExpiringCache(Generic[KT, VT]): self[key] = value return value - def _prune_cache(self) -> None: + async def _prune_cache(self) -> None: if not self._expiry_ms: # zero expiry time means don't expire. This should never get called # since we have this check in start too. diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index ad775dfc7d..98ee49af6e 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -56,8 +56,15 @@ block_db_sched_duration = Counter( "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"] ) + +# This is dynamically created in InFlightGauge.__init__. +class _InFlightMetric(Protocol): + real_time_max: float + real_time_sum: float + + # Tracks the number of blocks currently active -in_flight = InFlightGauge( +in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge( "synapse_util_metrics_block_in_flight", "", labels=["block_name"], @@ -65,12 +72,6 @@ in_flight = InFlightGauge( ) -# This is dynamically created in InFlightGauge.__init__. -class _InFlightMetric(Protocol): - real_time_max: float - real_time_sum: float - - T = TypeVar("T", bound=Callable[..., Any]) -- cgit 1.5.1 From 4bd54b263ef7e2ac29acdc85e0c6392684c44281 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 18 Nov 2021 08:43:09 -0500 Subject: Do not allow MSC3440 threads to fork threads (#11161) Adds validation to the Client-Server API to ensure that the potential thread head does not relate to another event already. This results in not allowing a thread to "fork" into other threads. If the target event is unknown for some reason (maybe it isn't visible to your homeserver), but is the target of other events it is assumed that the thread can be created from it. Otherwise, it is rejected as an unknown event. --- changelog.d/11161.feature | 1 + synapse/handlers/message.py | 54 ++++++++++++++++++++--- synapse/storage/databases/main/relations.py | 67 ++++++++++++++++++++++++++++- tests/rest/client/test_relations.py | 62 ++++++++++++++++++++++++++ 4 files changed, 176 insertions(+), 8 deletions(-) create mode 100644 changelog.d/11161.feature (limited to 'synapse/storage') diff --git a/changelog.d/11161.feature b/changelog.d/11161.feature new file mode 100644 index 0000000000..76b0d28084 --- /dev/null +++ b/changelog.d/11161.feature @@ -0,0 +1 @@ +Experimental support for the thread relation defined in [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d4c2a6ab7a..22dd4cf5fd 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1001,13 +1001,52 @@ class EventCreationHandler: ) self.validator.validate_new(event, self.config) + await self._validate_event_relation(event) + logger.debug("Created event %s", event.event_id) + + return event, context + + async def _validate_event_relation(self, event: EventBase) -> None: + """ + Ensure the relation data on a new event is not bogus. + + Args: + event: The event being created. + + Raises: + SynapseError if the event is invalid. + """ + + relation = event.content.get("m.relates_to") + if not relation: + return + + relation_type = relation.get("rel_type") + if not relation_type: + return + + # Ensure the parent is real. + relates_to = relation.get("event_id") + if not relates_to: + return + + parent_event = await self.store.get_event(relates_to, allow_none=True) + if parent_event: + # And in the same room. + if parent_event.room_id != event.room_id: + raise SynapseError(400, "Relations must be in the same room") + + else: + # There must be some reason that the client knows the event exists, + # see if there are existing relations. If so, assume everything is fine. + if not await self.store.event_is_target_of_relation(relates_to): + # Otherwise, the client can't know about the parent event! + raise SynapseError(400, "Can't send relation to unknown event") # If this event is an annotation then we check that that the sender # can't annotate the same way twice (e.g. stops users from liking an # event multiple times). - relation = event.content.get("m.relates_to", {}) - if relation.get("rel_type") == RelationTypes.ANNOTATION: - relates_to = relation["event_id"] + if relation_type == RelationTypes.ANNOTATION: aggregation_key = relation["key"] already_exists = await self.store.has_user_annotated_event( @@ -1016,9 +1055,12 @@ class EventCreationHandler: if already_exists: raise SynapseError(400, "Can't send same reaction twice") - logger.debug("Created event %s", event.event_id) - - return event, context + # Don't attempt to start a thread if the parent event is a relation. + elif relation_type == RelationTypes.THREAD: + if await self.store.event_includes_relation(relates_to): + raise SynapseError( + 400, "Cannot start threads from an event with a relation" + ) @measure_func("handle_new_client_event") async def handle_new_client_event( diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 907af10995..0a43acda07 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -132,6 +132,69 @@ class RelationsWorkerStore(SQLBaseStore): "get_recent_references_for_event", _get_recent_references_for_event_txn ) + async def event_includes_relation(self, event_id: str) -> bool: + """Check if the given event relates to another event. + + An event has a relation if it has a valid m.relates_to with a rel_type + and event_id in the content: + + { + "content": { + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$other_event_id" + } + } + } + + Args: + event_id: The event to check. + + Returns: + True if the event includes a valid relation. + """ + + result = await self.db_pool.simple_select_one_onecol( + table="event_relations", + keyvalues={"event_id": event_id}, + retcol="event_id", + allow_none=True, + desc="event_includes_relation", + ) + return result is not None + + async def event_is_target_of_relation(self, parent_id: str) -> bool: + """Check if the given event is the target of another event's relation. + + An event is the target of an event relation if it has a valid + m.relates_to with a rel_type and event_id pointing to parent_id in the + content: + + { + "content": { + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$parent_id" + } + } + } + + Args: + parent_id: The event to check. + + Returns: + True if the event is the target of another event's relation. + """ + + result = await self.db_pool.simple_select_one_onecol( + table="event_relations", + keyvalues={"relates_to_id": parent_id}, + retcol="event_id", + allow_none=True, + desc="event_is_target_of_relation", + ) + return result is not None + @cached(tree=True) async def get_aggregation_groups_for_event( self, @@ -362,7 +425,7 @@ class RelationsWorkerStore(SQLBaseStore): %s; """ - def _get_if_event_has_relations(txn) -> List[str]: + def _get_if_events_have_relations(txn) -> List[str]: clauses: List[str] = [] clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", parent_ids @@ -387,7 +450,7 @@ class RelationsWorkerStore(SQLBaseStore): return [row[0] for row in txn] return await self.db_pool.runInteraction( - "get_if_event_has_relations", _get_if_event_has_relations + "get_if_events_have_relations", _get_if_events_have_relations ) async def has_user_annotated_event( diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 78c2fb86b9..b8a1b92a89 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -91,6 +91,49 @@ class RelationsTestCase(unittest.HomeserverTestCase): channel = self._send_relation(RelationTypes.ANNOTATION, EventTypes.Member) self.assertEquals(400, channel.code, channel.json_body) + def test_deny_invalid_event(self): + """Test that we deny relations on non-existant events""" + channel = self._send_relation( + RelationTypes.ANNOTATION, + EventTypes.Message, + parent_id="foo", + content={"body": "foo", "msgtype": "m.text"}, + ) + self.assertEquals(400, channel.code, channel.json_body) + + # Unless that event is referenced from another event! + self.get_success( + self.hs.get_datastore().db_pool.simple_insert( + table="event_relations", + values={ + "event_id": "bar", + "relates_to_id": "foo", + "relation_type": RelationTypes.THREAD, + }, + desc="test_deny_invalid_event", + ) + ) + channel = self._send_relation( + RelationTypes.THREAD, + EventTypes.Message, + parent_id="foo", + content={"body": "foo", "msgtype": "m.text"}, + ) + self.assertEquals(200, channel.code, channel.json_body) + + def test_deny_invalid_room(self): + """Test that we deny relations on non-existant events""" + # Create another room and send a message in it. + room2 = self.helper.create_room_as(self.user_id, tok=self.user_token) + res = self.helper.send(room2, body="Hi!", tok=self.user_token) + parent_id = res["event_id"] + + # Attempt to send an annotation to that event. + channel = self._send_relation( + RelationTypes.ANNOTATION, "m.reaction", parent_id=parent_id, key="A" + ) + self.assertEquals(400, channel.code, channel.json_body) + def test_deny_double_react(self): """Test that we deny relations on membership events""" channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="a") @@ -99,6 +142,25 @@ class RelationsTestCase(unittest.HomeserverTestCase): channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a") self.assertEquals(400, channel.code, channel.json_body) + def test_deny_forked_thread(self): + """It is invalid to start a thread off a thread.""" + channel = self._send_relation( + RelationTypes.THREAD, + "m.room.message", + content={"msgtype": "m.text", "body": "foo"}, + parent_id=self.parent_id, + ) + self.assertEquals(200, channel.code, channel.json_body) + parent_id = channel.json_body["event_id"] + + channel = self._send_relation( + RelationTypes.THREAD, + "m.room.message", + content={"msgtype": "m.text", "body": "foo"}, + parent_id=parent_id, + ) + self.assertEquals(400, channel.code, channel.json_body) + def test_basic_paginate_relations(self): """Tests that calling pagination API correctly the latest relations.""" channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a") -- cgit 1.5.1 From 539e44139911dc95c34784f3df2b3706c00b7db9 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Thu, 18 Nov 2021 14:40:26 +0000 Subject: Use auto_attribs for RefreshTokenLookupResult (#11386) --- changelog.d/11386.misc | 1 + synapse/storage/databases/main/registration.py | 14 +++++++------- 2 files changed, 8 insertions(+), 7 deletions(-) create mode 100644 changelog.d/11386.misc (limited to 'synapse/storage') diff --git a/changelog.d/11386.misc b/changelog.d/11386.misc new file mode 100644 index 0000000000..3178d53f05 --- /dev/null +++ b/changelog.d/11386.misc @@ -0,0 +1 @@ +Use `auto_attribs` on the `attrs` class `RefreshTokenLookupResult`. diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 5e55440570..7ee699f981 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -84,26 +84,26 @@ class TokenLookupResult: return self.user_id -@attr.s(frozen=True, slots=True) +@attr.s(auto_attribs=True, frozen=True, slots=True) class RefreshTokenLookupResult: """Result of looking up a refresh token.""" - user_id = attr.ib(type=str) + user_id: str """The user this token belongs to.""" - device_id = attr.ib(type=str) + device_id: str """The device associated with this refresh token.""" - token_id = attr.ib(type=int) + token_id: int """The ID of this refresh token.""" - next_token_id = attr.ib(type=Optional[int]) + next_token_id: Optional[int] """The ID of the refresh token which replaced this one.""" - has_next_refresh_token_been_refreshed = attr.ib(type=bool) + has_next_refresh_token_been_refreshed: bool """True if the next refresh token was used for another refresh.""" - has_next_access_token_been_used = attr.ib(type=bool) + has_next_access_token_been_used: bool """True if the next access token was already used at least once.""" -- cgit 1.5.1 From 81b18fe5c060a0532ab64b9575d54b84ddbad278 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Thu, 18 Nov 2021 18:43:49 +0100 Subject: Add dedicated admin API for blocking a room (#11324) --- changelog.d/11324.feature | 1 + docs/admin_api/rooms.md | 78 +++++++++++ synapse/rest/admin/__init__.py | 2 + synapse/rest/admin/rooms.py | 63 +++++++++ synapse/storage/databases/main/room.py | 32 +++++ tests/rest/admin/test_room.py | 228 +++++++++++++++++++++++++++++++++ 6 files changed, 404 insertions(+) create mode 100644 changelog.d/11324.feature (limited to 'synapse/storage') diff --git a/changelog.d/11324.feature b/changelog.d/11324.feature new file mode 100644 index 0000000000..55494358bb --- /dev/null +++ b/changelog.d/11324.feature @@ -0,0 +1 @@ +Add dedicated admin API for blocking a room. \ No newline at end of file diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 6a6ae92d66..0f1a74134f 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -3,6 +3,7 @@ - [Room Details API](#room-details-api) - [Room Members API](#room-members-api) - [Room State API](#room-state-api) +- [Block Room API](#block-room-api) - [Delete Room API](#delete-room-api) * [Version 1 (old version)](#version-1-old-version) * [Version 2 (new version)](#version-2-new-version) @@ -386,6 +387,83 @@ A response body like the following is returned: } ``` +# Block Room API +The Block Room admin API allows server admins to block and unblock rooms, +and query to see if a given room is blocked. +This API can be used to pre-emptively block a room, even if it's unknown to this +homeserver. Users will be prevented from joining a blocked room. + +## Block or unblock a room + +The API is: + +``` +PUT /_synapse/admin/v1/rooms//block +``` + +with a body of: + +```json +{ + "block": true +} +``` + +A response body like the following is returned: + +```json +{ + "block": true +} +``` + +**Parameters** + +The following parameters should be set in the URL: + +- `room_id` - The ID of the room. + +The following JSON body parameters are available: + +- `block` - If `true` the room will be blocked and if `false` the room will be unblocked. + +**Response** + +The following fields are possible in the JSON response body: + +- `block` - A boolean. `true` if the room is blocked, otherwise `false` + +## Get block status + +The API is: + +``` +GET /_synapse/admin/v1/rooms//block +``` + +A response body like the following is returned: + +```json +{ + "block": true, + "user_id": "" +} +``` + +**Parameters** + +The following parameters should be set in the URL: + +- `room_id` - The ID of the room. + +**Response** + +The following fields are possible in the JSON response body: + +- `block` - A boolean. `true` if the room is blocked, otherwise `false` +- `user_id` - An optional string. If the room is blocked (`block` is `true`) shows + the user who has add the room to blocking list. Otherwise it is not displayed. + # Delete Room API The Delete Room admin API allows server admins to remove rooms from the server diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index d78fe406c4..65b76fa10c 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -46,6 +46,7 @@ from synapse.rest.admin.registration_tokens import ( RegistrationTokenRestServlet, ) from synapse.rest.admin.rooms import ( + BlockRoomRestServlet, DeleteRoomStatusByDeleteIdRestServlet, DeleteRoomStatusByRoomIdRestServlet, ForwardExtremitiesRestServlet, @@ -223,6 +224,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: Register all the admin servlets. """ register_servlets_for_client_rest_resource(hs, http_server) + BlockRoomRestServlet(hs).register(http_server) ListRoomRestServlet(hs).register(http_server) RoomStateRestServlet(hs).register(http_server) RoomRestServlet(hs).register(http_server) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 37cb4d0796..5b8ec1e5ca 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -782,3 +782,66 @@ class RoomEventContextServlet(RestServlet): ) return 200, results + + +class BlockRoomRestServlet(RestServlet): + """ + Manage blocking of rooms. + On PUT: Add or remove a room from blocking list. + On GET: Get blocking status of room and user who has blocked this room. + """ + + PATTERNS = admin_patterns("/rooms/(?P[^/]+)/block$") + + def __init__(self, hs: "HomeServer"): + self._auth = hs.get_auth() + self._store = hs.get_datastore() + + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + await assert_requester_is_admin(self._auth, request) + + if not RoomID.is_valid(room_id): + raise SynapseError( + HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,) + ) + + blocked_by = await self._store.room_is_blocked_by(room_id) + # Test `not None` if `user_id` is an empty string + # if someone add manually an entry in database + if blocked_by is not None: + response = {"block": True, "user_id": blocked_by} + else: + response = {"block": False} + + return HTTPStatus.OK, response + + async def on_PUT( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) + + content = parse_json_object_from_request(request) + + if not RoomID.is_valid(room_id): + raise SynapseError( + HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,) + ) + + assert_params_in_dict(content, ["block"]) + block = content.get("block") + if not isinstance(block, bool): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Param 'block' must be a boolean.", + Codes.BAD_JSON, + ) + + if block: + await self._store.block_room(room_id, requester.user.to_string()) + else: + await self._store.unblock_room(room_id) + + return HTTPStatus.OK, {"block": block} diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 17b398bb69..7d694d852d 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -397,6 +397,20 @@ class RoomWorkerStore(SQLBaseStore): desc="is_room_blocked", ) + async def room_is_blocked_by(self, room_id: str) -> Optional[str]: + """ + Function to retrieve user who has blocked the room. + user_id is non-nullable + It returns None if the room is not blocked. + """ + return await self.db_pool.simple_select_one_onecol( + table="blocked_rooms", + keyvalues={"room_id": room_id}, + retcol="user_id", + allow_none=True, + desc="room_is_blocked_by", + ) + async def get_rooms_paginate( self, start: int, @@ -1775,3 +1789,21 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): self.is_room_blocked, (room_id,), ) + + async def unblock_room(self, room_id: str) -> None: + """Remove the room from blocking list. + + Args: + room_id: Room to unblock + """ + await self.db_pool.simple_delete( + table="blocked_rooms", + keyvalues={"room_id": room_id}, + desc="unblock_room", + ) + await self.db_pool.runInteraction( + "block_room_invalidation", + self._invalidate_cache_and_stream, + self.is_room_blocked, + (room_id,), + ) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index b48fc12e5f..07077aff78 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -2226,6 +2226,234 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase): ) +class BlockRoomTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self._store = hs.get_datastore() + + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + self.other_user = self.register_user("user", "pass") + self.other_user_tok = self.login("user", "pass") + + self.room_id = self.helper.create_room_as( + self.other_user, tok=self.other_user_tok + ) + self.url = "/_synapse/admin/v1/rooms/%s/block" + + @parameterized.expand([("PUT",), ("GET",)]) + def test_requester_is_no_admin(self, method: str): + """If the user is not a server admin, an error 403 is returned.""" + + channel = self.make_request( + method, + self.url % self.room_id, + content={}, + access_token=self.other_user_tok, + ) + + self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) + self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + + @parameterized.expand([("PUT",), ("GET",)]) + def test_room_is_not_valid(self, method: str): + """Check that invalid room names, return an error 400.""" + + channel = self.make_request( + method, + self.url % "invalidroom", + content={}, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual( + "invalidroom is not a legal room ID", + channel.json_body["error"], + ) + + def test_block_is_not_valid(self): + """If parameter `block` is not valid, return an error.""" + + # `block` is not valid + channel = self.make_request( + "PUT", + self.url % self.room_id, + content={"block": "NotBool"}, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"]) + + # `block` is not set + channel = self.make_request( + "PUT", + self.url % self.room_id, + content={}, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"]) + + # no content is send + channel = self.make_request( + "PUT", + self.url % self.room_id, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual(Codes.NOT_JSON, channel.json_body["errcode"]) + + def test_block_room(self): + """Test that block a room is successful.""" + + def _request_and_test_block_room(room_id: str) -> None: + self._is_blocked(room_id, expect=False) + channel = self.make_request( + "PUT", + self.url % room_id, + content={"block": True}, + access_token=self.admin_user_tok, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertTrue(channel.json_body["block"]) + self._is_blocked(room_id, expect=True) + + # known internal room + _request_and_test_block_room(self.room_id) + + # unknown internal room + _request_and_test_block_room("!unknown:test") + + # unknown remote room + _request_and_test_block_room("!unknown:remote") + + def test_block_room_twice(self): + """Test that block a room that is already blocked is successful.""" + + self._is_blocked(self.room_id, expect=False) + for _ in range(2): + channel = self.make_request( + "PUT", + self.url % self.room_id, + content={"block": True}, + access_token=self.admin_user_tok, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertTrue(channel.json_body["block"]) + self._is_blocked(self.room_id, expect=True) + + def test_unblock_room(self): + """Test that unblock a room is successful.""" + + def _request_and_test_unblock_room(room_id: str) -> None: + self._block_room(room_id) + + channel = self.make_request( + "PUT", + self.url % room_id, + content={"block": False}, + access_token=self.admin_user_tok, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertFalse(channel.json_body["block"]) + self._is_blocked(room_id, expect=False) + + # known internal room + _request_and_test_unblock_room(self.room_id) + + # unknown internal room + _request_and_test_unblock_room("!unknown:test") + + # unknown remote room + _request_and_test_unblock_room("!unknown:remote") + + def test_unblock_room_twice(self): + """Test that unblock a room that is not blocked is successful.""" + + self._block_room(self.room_id) + for _ in range(2): + channel = self.make_request( + "PUT", + self.url % self.room_id, + content={"block": False}, + access_token=self.admin_user_tok, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertFalse(channel.json_body["block"]) + self._is_blocked(self.room_id, expect=False) + + def test_get_blocked_room(self): + """Test get status of a blocked room""" + + def _request_blocked_room(room_id: str) -> None: + self._block_room(room_id) + + channel = self.make_request( + "GET", + self.url % room_id, + access_token=self.admin_user_tok, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertTrue(channel.json_body["block"]) + self.assertEqual(self.other_user, channel.json_body["user_id"]) + + # known internal room + _request_blocked_room(self.room_id) + + # unknown internal room + _request_blocked_room("!unknown:test") + + # unknown remote room + _request_blocked_room("!unknown:remote") + + def test_get_unblocked_room(self): + """Test get status of a unblocked room""" + + def _request_unblocked_room(room_id: str) -> None: + self._is_blocked(room_id, expect=False) + + channel = self.make_request( + "GET", + self.url % room_id, + access_token=self.admin_user_tok, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertFalse(channel.json_body["block"]) + self.assertNotIn("user_id", channel.json_body) + + # known internal room + _request_unblocked_room(self.room_id) + + # unknown internal room + _request_unblocked_room("!unknown:test") + + # unknown remote room + _request_unblocked_room("!unknown:remote") + + def _is_blocked(self, room_id: str, expect: bool = True) -> None: + """Assert that the room is blocked or not""" + d = self._store.is_room_blocked(room_id) + if expect: + self.assertTrue(self.get_success(d)) + else: + self.assertIsNone(self.get_success(d)) + + def _block_room(self, room_id: str) -> None: + """Block a room in database""" + self.get_success(self._store.block_room(room_id, self.other_user)) + self._is_blocked(room_id, expect=True) + + PURGE_TABLES = [ "current_state_events", "event_backward_extremities", -- cgit 1.5.1 From eca7cffb73fce77d025a0d7a08badb855b6df133 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Fri, 19 Nov 2021 06:40:12 -0500 Subject: Keep fallback key marked as used if it's re-uploaded (#11382) --- changelog.d/11382.misc | 1 + synapse/storage/databases/main/end_to_end_keys.py | 51 ++++++++++++++++++----- tests/handlers/test_e2e_keys.py | 32 +++++++++++++- 3 files changed, 72 insertions(+), 12 deletions(-) create mode 100644 changelog.d/11382.misc (limited to 'synapse/storage') diff --git a/changelog.d/11382.misc b/changelog.d/11382.misc new file mode 100644 index 0000000000..d812ef309e --- /dev/null +++ b/changelog.d/11382.misc @@ -0,0 +1 @@ +Keep fallback key marked as used if it's re-uploaded. diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index a95ac34f09..b06c1dc45b 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -408,29 +408,58 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore): fallback_keys: the keys to set. This is a map from key ID (which is of the form "algorithm:id") to key data. """ + await self.db_pool.runInteraction( + "set_e2e_fallback_keys_txn", + self._set_e2e_fallback_keys_txn, + user_id, + device_id, + fallback_keys, + ) + + await self.invalidate_cache_and_stream( + "get_e2e_unused_fallback_key_types", (user_id, device_id) + ) + + def _set_e2e_fallback_keys_txn( + self, txn: Connection, user_id: str, device_id: str, fallback_keys: JsonDict + ) -> None: # fallback_keys will usually only have one item in it, so using a for # loop (as opposed to calling simple_upsert_many_txn) won't be too bad # FIXME: make sure that only one key per algorithm is uploaded for key_id, fallback_key in fallback_keys.items(): algorithm, key_id = key_id.split(":", 1) - await self.db_pool.simple_upsert( - "e2e_fallback_keys_json", + old_key_json = self.db_pool.simple_select_one_onecol_txn( + txn, + table="e2e_fallback_keys_json", keyvalues={ "user_id": user_id, "device_id": device_id, "algorithm": algorithm, }, - values={ - "key_id": key_id, - "key_json": json_encoder.encode(fallback_key), - "used": False, - }, - desc="set_e2e_fallback_key", + retcol="key_json", + allow_none=True, ) - await self.invalidate_cache_and_stream( - "get_e2e_unused_fallback_key_types", (user_id, device_id) - ) + new_key_json = encode_canonical_json(fallback_key).decode("utf-8") + + # If the uploaded key is the same as the current fallback key, + # don't do anything. This prevents marking the key as unused if it + # was already used. + if old_key_json != new_key_json: + self.db_pool.simple_upsert_txn( + txn, + table="e2e_fallback_keys_json", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + "algorithm": algorithm, + }, + values={ + "key_id": key_id, + "key_json": json_encoder.encode(fallback_key), + "used": False, + }, + ) @cached(max_entries=10000) async def get_e2e_unused_fallback_key_types( diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 0c3b86fda9..f0723892e4 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -162,6 +162,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): local_user = "@boris:" + self.hs.hostname device_id = "xyz" fallback_key = {"alg1:k1": "key1"} + fallback_key2 = {"alg1:k2": "key2"} otk = {"alg1:k2": "key2"} # we shouldn't have any unused fallback keys yet @@ -213,6 +214,35 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): {"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key}}}, ) + # re-uploading the same fallback key should still result in no unused fallback + # keys + self.get_success( + self.handler.upload_keys_for_user( + local_user, + device_id, + {"org.matrix.msc2732.fallback_keys": fallback_key}, + ) + ) + + res = self.get_success( + self.store.get_e2e_unused_fallback_key_types(local_user, device_id) + ) + self.assertEqual(res, []) + + # uploading a new fallback key should result in an unused fallback key + self.get_success( + self.handler.upload_keys_for_user( + local_user, + device_id, + {"org.matrix.msc2732.fallback_keys": fallback_key2}, + ) + ) + + res = self.get_success( + self.store.get_e2e_unused_fallback_key_types(local_user, device_id) + ) + self.assertEqual(res, ["alg1"]) + # if the user uploads a one-time key, the next claim should fetch the # one-time key, and then go back to the fallback self.get_success( @@ -238,7 +268,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): ) self.assertEqual( res, - {"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key}}}, + {"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key2}}}, ) def test_replace_master_key(self): -- cgit 1.5.1 From ea20937084903864865f76e22f67d27729f2d6dc Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 19 Nov 2021 20:39:46 +0100 Subject: Add an admin API to run background jobs. (#11352) Instead of having admins poke into the database directly. Can currently run jobs to populate stats and to populate the user directory. --- changelog.d/11352.feature | 1 + docs/sample_config.yaml | 4 +- .../administration/admin_api/background_updates.md | 27 +++- docs/user_directory.md | 6 +- synapse/config/user_directory.py | 4 +- synapse/rest/admin/__init__.py | 2 + synapse/rest/admin/background_updates.py | 123 ++++++++++++---- synapse/storage/background_updates.py | 2 + tests/rest/admin/test_background_updates.py | 154 +++++++++++++++++++-- 9 files changed, 280 insertions(+), 43 deletions(-) create mode 100644 changelog.d/11352.feature (limited to 'synapse/storage') diff --git a/changelog.d/11352.feature b/changelog.d/11352.feature new file mode 100644 index 0000000000..a4d01b3549 --- /dev/null +++ b/changelog.d/11352.feature @@ -0,0 +1 @@ +Add admin API to run background jobs. \ No newline at end of file diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 3c931468aa..aee300013f 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -2360,8 +2360,8 @@ user_directory: # indexes were (re)built was before Synapse 1.44, you'll have to # rebuild the indexes in order to search through all known users. # These indexes are built the first time Synapse starts; admins can - # manually trigger a rebuild following the instructions at - # https://matrix-org.github.io/synapse/latest/user_directory.html + # manually trigger a rebuild via API following the instructions at + # https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run # # Uncomment to return search results containing all known users, even if that # user does not share a room with the requester. diff --git a/docs/usage/administration/admin_api/background_updates.md b/docs/usage/administration/admin_api/background_updates.md index b36d7fe398..9f6ac7d567 100644 --- a/docs/usage/administration/admin_api/background_updates.md +++ b/docs/usage/administration/admin_api/background_updates.md @@ -42,7 +42,6 @@ For each update: `average_items_per_ms` how many items are processed per millisecond based on an exponential average. - ## Enabled This API allow pausing background updates. @@ -82,3 +81,29 @@ The API returns the `enabled` param. ``` There is also a `GET` version which returns the `enabled` state. + + +## Run + +This API schedules a specific background update to run. The job starts immediately after calling the API. + + +The API is: + +``` +POST /_synapse/admin/v1/background_updates/start_job +``` + +with the following body: + +```json +{ + "job_name": "populate_stats_process_rooms" +} +``` + +The following JSON body parameters are available: + +- `job_name` - A string which job to run. Valid values are: + - `populate_stats_process_rooms` - Recalculate the stats for all rooms. + - `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync. diff --git a/docs/user_directory.md b/docs/user_directory.md index 07fe954891..c4794b04cf 100644 --- a/docs/user_directory.md +++ b/docs/user_directory.md @@ -6,9 +6,9 @@ on this particular server - i.e. ones which your account shares a room with, or who are present in a publicly viewable room present on the server. The directory info is stored in various tables, which can (typically after -DB corruption) get stale or out of sync. If this happens, for now the -solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql) -and then restart synapse. This should then start a background task to +DB corruption) get stale or out of sync. If this happens, for now the +solution to fix it is to use the [admin API](usage/administration/admin_api/background_updates.md#run) +and execute the job `regenerate_directory`. This should then start a background task to flush the current tables and regenerate the directory. Data model diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py index 2552f688d0..6d6678c7e4 100644 --- a/synapse/config/user_directory.py +++ b/synapse/config/user_directory.py @@ -53,8 +53,8 @@ class UserDirectoryConfig(Config): # indexes were (re)built was before Synapse 1.44, you'll have to # rebuild the indexes in order to search through all known users. # These indexes are built the first time Synapse starts; admins can - # manually trigger a rebuild following the instructions at - # https://matrix-org.github.io/synapse/latest/user_directory.html + # manually trigger a rebuild via API following the instructions at + # https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run # # Uncomment to return search results containing all known users, even if that # user does not share a room with the requester. diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 65b76fa10c..ee4a5e481b 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -28,6 +28,7 @@ from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin from synapse.rest.admin.background_updates import ( BackgroundUpdateEnabledRestServlet, BackgroundUpdateRestServlet, + BackgroundUpdateStartJobRestServlet, ) from synapse.rest.admin.devices import ( DeleteDevicesRestServlet, @@ -261,6 +262,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: SendServerNoticeServlet(hs).register(http_server) BackgroundUpdateEnabledRestServlet(hs).register(http_server) BackgroundUpdateRestServlet(hs).register(http_server) + BackgroundUpdateStartJobRestServlet(hs).register(http_server) def register_servlets_for_client_rest_resource( diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py index 0d0183bf20..479672d4d5 100644 --- a/synapse/rest/admin/background_updates.py +++ b/synapse/rest/admin/background_updates.py @@ -12,10 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from http import HTTPStatus from typing import TYPE_CHECKING, Tuple from synapse.api.errors import SynapseError -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import ( + RestServlet, + assert_params_in_dict, + parse_json_object_from_request, +) from synapse.http.site import SynapseRequest from synapse.rest.admin._base import admin_patterns, assert_user_is_admin from synapse.types import JsonDict @@ -29,37 +34,36 @@ logger = logging.getLogger(__name__) class BackgroundUpdateEnabledRestServlet(RestServlet): """Allows temporarily disabling background updates""" - PATTERNS = admin_patterns("/background_updates/enabled") + PATTERNS = admin_patterns("/background_updates/enabled$") def __init__(self, hs: "HomeServer"): - self.group_server = hs.get_groups_server_handler() - self.is_mine_id = hs.is_mine_id - self.auth = hs.get_auth() - - self.data_stores = hs.get_datastores() + self._auth = hs.get_auth() + self._data_stores = hs.get_datastores() async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) - await assert_user_is_admin(self.auth, requester.user) + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) # We need to check that all configured databases have updates enabled. # (They *should* all be in sync.) - enabled = all(db.updates.enabled for db in self.data_stores.databases) + enabled = all(db.updates.enabled for db in self._data_stores.databases) - return 200, {"enabled": enabled} + return HTTPStatus.OK, {"enabled": enabled} async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) - await assert_user_is_admin(self.auth, requester.user) + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) body = parse_json_object_from_request(request) enabled = body.get("enabled", True) if not isinstance(enabled, bool): - raise SynapseError(400, "'enabled' parameter must be a boolean") + raise SynapseError( + HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean" + ) - for db in self.data_stores.databases: + for db in self._data_stores.databases: db.updates.enabled = enabled # If we're re-enabling them ensure that we start the background @@ -67,32 +71,29 @@ class BackgroundUpdateEnabledRestServlet(RestServlet): if enabled: db.updates.start_doing_background_updates() - return 200, {"enabled": enabled} + return HTTPStatus.OK, {"enabled": enabled} class BackgroundUpdateRestServlet(RestServlet): """Fetch information about background updates""" - PATTERNS = admin_patterns("/background_updates/status") + PATTERNS = admin_patterns("/background_updates/status$") def __init__(self, hs: "HomeServer"): - self.group_server = hs.get_groups_server_handler() - self.is_mine_id = hs.is_mine_id - self.auth = hs.get_auth() - - self.data_stores = hs.get_datastores() + self._auth = hs.get_auth() + self._data_stores = hs.get_datastores() async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) - await assert_user_is_admin(self.auth, requester.user) + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) # We need to check that all configured databases have updates enabled. # (They *should* all be in sync.) - enabled = all(db.updates.enabled for db in self.data_stores.databases) + enabled = all(db.updates.enabled for db in self._data_stores.databases) current_updates = {} - for db in self.data_stores.databases: + for db in self._data_stores.databases: update = db.updates.get_current_update() if not update: continue @@ -104,4 +105,72 @@ class BackgroundUpdateRestServlet(RestServlet): "average_items_per_ms": update.average_items_per_ms(), } - return 200, {"enabled": enabled, "current_updates": current_updates} + return HTTPStatus.OK, {"enabled": enabled, "current_updates": current_updates} + + +class BackgroundUpdateStartJobRestServlet(RestServlet): + """Allows to start specific background updates""" + + PATTERNS = admin_patterns("/background_updates/start_job") + + def __init__(self, hs: "HomeServer"): + self._auth = hs.get_auth() + self._store = hs.get_datastore() + + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) + + body = parse_json_object_from_request(request) + assert_params_in_dict(body, ["job_name"]) + + job_name = body["job_name"] + + if job_name == "populate_stats_process_rooms": + jobs = [ + { + "update_name": "populate_stats_process_rooms", + "progress_json": "{}", + }, + ] + elif job_name == "regenerate_directory": + jobs = [ + { + "update_name": "populate_user_directory_createtables", + "progress_json": "{}", + "depends_on": "", + }, + { + "update_name": "populate_user_directory_process_rooms", + "progress_json": "{}", + "depends_on": "populate_user_directory_createtables", + }, + { + "update_name": "populate_user_directory_process_users", + "progress_json": "{}", + "depends_on": "populate_user_directory_process_rooms", + }, + { + "update_name": "populate_user_directory_cleanup", + "progress_json": "{}", + "depends_on": "populate_user_directory_process_users", + }, + ] + else: + raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name") + + try: + await self._store.db_pool.simple_insert_many( + table="background_updates", + values=jobs, + desc=f"admin_api_run_{job_name}", + ) + except self._store.db_pool.engine.module.IntegrityError: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Job %s is already in queue of background updates." % (job_name,), + ) + + self._store.db_pool.updates.start_doing_background_updates() + + return HTTPStatus.OK, {} diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index b9a8ca997e..b104f9032c 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -122,6 +122,8 @@ class BackgroundUpdater: def start_doing_background_updates(self) -> None: if self.enabled: + # if we start a new background update, not all updates are done. + self._all_done = False run_as_background_process("background_updates", self.run_background_updates) async def run_background_updates(self, sleep: bool = True) -> None: diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py index 78c48db552..1786316763 100644 --- a/tests/rest/admin/test_background_updates.py +++ b/tests/rest/admin/test_background_updates.py @@ -11,8 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from http import HTTPStatus +from typing import Collection + +from parameterized import parameterized import synapse.rest.admin +from synapse.api.errors import Codes from synapse.rest.client import login from synapse.server import HomeServer @@ -30,6 +35,60 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") + @parameterized.expand( + [ + ("GET", "/_synapse/admin/v1/background_updates/enabled"), + ("POST", "/_synapse/admin/v1/background_updates/enabled"), + ("GET", "/_synapse/admin/v1/background_updates/status"), + ("POST", "/_synapse/admin/v1/background_updates/start_job"), + ] + ) + def test_requester_is_no_admin(self, method: str, url: str): + """ + If the user is not a server admin, an error 403 is returned. + """ + + self.register_user("user", "pass", admin=False) + other_user_tok = self.login("user", "pass") + + channel = self.make_request( + method, + url, + content={}, + access_token=other_user_tok, + ) + + self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) + self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + + def test_invalid_parameter(self): + """ + If parameters are invalid, an error is returned. + """ + url = "/_synapse/admin/v1/background_updates/start_job" + + # empty content + channel = self.make_request( + "POST", + url, + content={}, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"]) + + # job_name invalid + channel = self.make_request( + "POST", + url, + content={"job_name": "unknown"}, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"]) + def _register_bg_update(self): "Adds a bg update but doesn't start it" @@ -60,7 +119,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) # Background updates should be enabled, but none should be running. self.assertDictEqual( @@ -82,7 +141,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) # Background updates should be enabled, and one should be running. self.assertDictEqual( @@ -114,7 +173,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "/_synapse/admin/v1/background_updates/enabled", access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertDictEqual(channel.json_body, {"enabled": True}) # Disable the BG updates @@ -124,7 +183,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): content={"enabled": False}, access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertDictEqual(channel.json_body, {"enabled": False}) # Advance a bit and get the current status, note this will finish the in @@ -137,7 +196,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertDictEqual( channel.json_body, { @@ -162,7 +221,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) # There should be no change from the previous /status response. self.assertDictEqual( @@ -188,7 +247,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): content={"enabled": True}, access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertDictEqual(channel.json_body, {"enabled": True}) @@ -199,7 +258,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) # Background updates should be enabled and making progress. self.assertDictEqual( @@ -216,3 +275,82 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "enabled": True, }, ) + + @parameterized.expand( + [ + ("populate_stats_process_rooms", ["populate_stats_process_rooms"]), + ( + "regenerate_directory", + [ + "populate_user_directory_createtables", + "populate_user_directory_process_rooms", + "populate_user_directory_process_users", + "populate_user_directory_cleanup", + ], + ), + ] + ) + def test_start_backround_job(self, job_name: str, updates: Collection[str]): + """ + Test that background updates add to database and be processed. + + Args: + job_name: name of the job to call with API + updates: collection of background updates to be started + """ + + # no background update is waiting + self.assertTrue( + self.get_success( + self.store.db_pool.updates.has_completed_background_updates() + ) + ) + + channel = self.make_request( + "POST", + "/_synapse/admin/v1/background_updates/start_job", + content={"job_name": job_name}, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + + # test that each background update is waiting now + for update in updates: + self.assertFalse( + self.get_success( + self.store.db_pool.updates.has_completed_background_update(update) + ) + ) + + self.wait_for_background_updates() + + # background updates are done + self.assertTrue( + self.get_success( + self.store.db_pool.updates.has_completed_background_updates() + ) + ) + + def test_start_backround_job_twice(self): + """Test that add a background update twice return an error.""" + + # add job to database + self.get_success( + self.store.db_pool.simple_insert( + table="background_updates", + values={ + "update_name": "populate_stats_process_rooms", + "progress_json": "{}", + }, + ) + ) + + channel = self.make_request( + "POST", + "/_synapse/admin/v1/background_updates/start_job", + content={"job_name": "populate_stats_process_rooms"}, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) -- cgit 1.5.1 From 3d893b8cf2358f947678dfb995b73f426200b099 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 22 Nov 2021 12:01:47 -0500 Subject: Store arbitrary relations from events. (#11391) Instead of only known relation types. This also reworks the background update for thread relations to crawl events and search for any relation type, not just threaded relations. --- changelog.d/11391.feature | 1 + synapse/storage/databases/main/events.py | 29 +++--- .../storage/databases/main/events_bg_updates.py | 88 ++++++++++------ .../schema/main/delta/65/02_thread_relations.sql | 18 ---- .../main/delta/65/07_arbitrary_relations.sql | 18 ++++ tests/rest/client/test_relations.py | 111 +++++++++++++++++++++ tests/unittest.py | 7 +- 7 files changed, 210 insertions(+), 62 deletions(-) create mode 100644 changelog.d/11391.feature delete mode 100644 synapse/storage/schema/main/delta/65/02_thread_relations.sql create mode 100644 synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql (limited to 'synapse/storage') diff --git a/changelog.d/11391.feature b/changelog.d/11391.feature new file mode 100644 index 0000000000..4f696285a7 --- /dev/null +++ b/changelog.d/11391.feature @@ -0,0 +1 @@ +Store and allow querying of arbitrary event relations. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 120e4807d1..06832221ad 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1,6 +1,6 @@ # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -1696,34 +1696,33 @@ class PersistEventsStore: }, ) - def _handle_event_relations(self, txn, event): - """Handles inserting relation data during peristence of events + def _handle_event_relations( + self, txn: LoggingTransaction, event: EventBase + ) -> None: + """Handles inserting relation data during persistence of events Args: - txn - event (EventBase) + txn: The current database transaction. + event: The event which might have relations. """ relation = event.content.get("m.relates_to") if not relation: # No relations return + # Relations must have a type and parent event ID. rel_type = relation.get("rel_type") - if rel_type not in ( - RelationTypes.ANNOTATION, - RelationTypes.REFERENCE, - RelationTypes.REPLACE, - RelationTypes.THREAD, - ): - # Unknown relation type + if not isinstance(rel_type, str): return parent_id = relation.get("event_id") - if not parent_id: - # Invalid relation + if not isinstance(parent_id, str): return - aggregation_key = relation.get("key") + # Annotations have a key field. + aggregation_key = None + if rel_type == RelationTypes.ANNOTATION: + aggregation_key = relation.get("key") self.db_pool.simple_insert_txn( txn, diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index ae3a8a63e4..c88fd35e7f 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1,4 +1,4 @@ -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -171,8 +171,14 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + # The event_thread_relation background update was replaced with the + # event_arbitrary_relations one, which handles any relation to avoid + # needed to potentially crawl the entire events table in the future. + self.db_pool.updates.register_noop_background_update("event_thread_relation") + self.db_pool.updates.register_background_update_handler( - "event_thread_relation", self._event_thread_relation + "event_arbitrary_relations", + self._event_arbitrary_relations, ) ################################################################################ @@ -1099,23 +1105,27 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): return result - async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int: - """Background update handler which will store thread relations for existing events.""" + async def _event_arbitrary_relations( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update handler which will store previously unknown relations for existing events.""" last_event_id = progress.get("last_event_id", "") - def _event_thread_relation_txn(txn: LoggingTransaction) -> int: + def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int: + # Fetch events and then filter based on whether the event has a + # relation or not. txn.execute( """ SELECT event_id, json FROM event_json - LEFT JOIN event_relations USING (event_id) - WHERE event_id > ? AND event_relations.event_id IS NULL + WHERE event_id > ? ORDER BY event_id LIMIT ? """, (last_event_id, batch_size), ) results = list(txn) - missing_thread_relations = [] + # (event_id, parent_id, rel_type) for each relation + relations_to_insert: List[Tuple[str, str, str]] = [] for (event_id, event_json_raw) in results: try: event_json = db_to_json(event_json_raw) @@ -1127,48 +1137,70 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) continue - # If there's no relation (or it is not a thread), skip! + # If there's no relation, skip! relates_to = event_json["content"].get("m.relates_to") if not relates_to or not isinstance(relates_to, dict): continue - if relates_to.get("rel_type") != RelationTypes.THREAD: + + # If the relation type or parent event ID is not a string, skip it. + # + # Do not consider relation types that have existed for a long time, + # since they will already be listed in the `event_relations` table. + rel_type = relates_to.get("rel_type") + if not isinstance(rel_type, str) or rel_type in ( + RelationTypes.ANNOTATION, + RelationTypes.REFERENCE, + RelationTypes.REPLACE, + ): continue - # Get the parent ID. parent_id = relates_to.get("event_id") if not isinstance(parent_id, str): continue - missing_thread_relations.append((event_id, parent_id)) + relations_to_insert.append((event_id, parent_id, rel_type)) + + # Insert the missing data, note that we upsert here in case the event + # has already been processed. + if relations_to_insert: + self.db_pool.simple_upsert_many_txn( + txn=txn, + table="event_relations", + key_names=("event_id",), + key_values=[(r[0],) for r in relations_to_insert], + value_names=("relates_to_id", "relation_type"), + value_values=[r[1:] for r in relations_to_insert], + ) - # Insert the missing data. - self.db_pool.simple_insert_many_txn( - txn=txn, - table="event_relations", - values=[ - { - "event_id": event_id, - "relates_to_Id": parent_id, - "relation_type": RelationTypes.THREAD, - } - for event_id, parent_id in missing_thread_relations - ], - ) + # Iterate the parent IDs and invalidate caches. + for parent_id in {r[1] for r in relations_to_insert}: + cache_tuple = (parent_id,) + self._invalidate_cache_and_stream( + txn, self.get_relations_for_event, cache_tuple + ) + self._invalidate_cache_and_stream( + txn, self.get_aggregation_groups_for_event, cache_tuple + ) + self._invalidate_cache_and_stream( + txn, self.get_thread_summary, cache_tuple + ) if results: latest_event_id = results[-1][0] self.db_pool.updates._background_update_progress_txn( - txn, "event_thread_relation", {"last_event_id": latest_event_id} + txn, "event_arbitrary_relations", {"last_event_id": latest_event_id} ) return len(results) num_rows = await self.db_pool.runInteraction( - desc="event_thread_relation", func=_event_thread_relation_txn + desc="event_arbitrary_relations", func=_event_arbitrary_relations_txn ) if not num_rows: - await self.db_pool.updates._end_background_update("event_thread_relation") + await self.db_pool.updates._end_background_update( + "event_arbitrary_relations" + ) return num_rows diff --git a/synapse/storage/schema/main/delta/65/02_thread_relations.sql b/synapse/storage/schema/main/delta/65/02_thread_relations.sql deleted file mode 100644 index d60517f7b4..0000000000 --- a/synapse/storage/schema/main/delta/65/02_thread_relations.sql +++ /dev/null @@ -1,18 +0,0 @@ -/* Copyright 2021 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- Check old events for thread relations. -INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (6502, 'event_thread_relation', '{}'); diff --git a/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql b/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql new file mode 100644 index 0000000000..267b2cb539 --- /dev/null +++ b/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Check old events for thread relations. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6507, 'event_arbitrary_relations', '{}'); diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index b8a1b92a89..eb10d43217 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1,4 +1,5 @@ # Copyright 2019 New Vector Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -46,6 +47,8 @@ class RelationsTestCase(unittest.HomeserverTestCase): return config def prepare(self, reactor, clock, hs): + self.store = hs.get_datastore() + self.user_id, self.user_token = self._create_user("alice") self.user2_id, self.user2_token = self._create_user("bob") @@ -765,6 +768,52 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertIn("chunk", channel.json_body) self.assertEquals(channel.json_body["chunk"], []) + def test_unknown_relations(self): + """Unknown relations should be accepted.""" + channel = self._send_relation("m.relation.test", "m.room.test") + self.assertEquals(200, channel.code, channel.json_body) + event_id = channel.json_body["event_id"] + + channel = self.make_request( + "GET", + "/_matrix/client/unstable/rooms/%s/relations/%s?limit=1" + % (self.room, self.parent_id), + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + + # We expect to get back a single pagination result, which is the full + # relation event we sent above. + self.assertEquals(len(channel.json_body["chunk"]), 1, channel.json_body) + self.assert_dict( + {"event_id": event_id, "sender": self.user_id, "type": "m.room.test"}, + channel.json_body["chunk"][0], + ) + + # We also expect to get the original event (the id of which is self.parent_id) + self.assertEquals( + channel.json_body["original_event"]["event_id"], self.parent_id + ) + + # When bundling the unknown relation is not included. + channel = self.make_request( + "GET", + "/rooms/%s/event/%s" % (self.room, self.parent_id), + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + self.assertNotIn("m.relations", channel.json_body["unsigned"]) + + # But unknown relations can be directly queried. + channel = self.make_request( + "GET", + "/_matrix/client/unstable/rooms/%s/aggregations/%s?limit=1" + % (self.room, self.parent_id), + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + self.assertEquals(channel.json_body["chunk"], []) + def _send_relation( self, relation_type: str, @@ -811,3 +860,65 @@ class RelationsTestCase(unittest.HomeserverTestCase): access_token = self.login(localpart, "abc123") return user_id, access_token + + def test_background_update(self): + """Test the event_arbitrary_relations background update.""" + channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍") + self.assertEquals(200, channel.code, channel.json_body) + annotation_event_id_good = channel.json_body["event_id"] + + channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="A") + self.assertEquals(200, channel.code, channel.json_body) + annotation_event_id_bad = channel.json_body["event_id"] + + channel = self._send_relation(RelationTypes.THREAD, "m.room.test") + self.assertEquals(200, channel.code, channel.json_body) + thread_event_id = channel.json_body["event_id"] + + # Clean-up the table as if the inserts did not happen during event creation. + self.get_success( + self.store.db_pool.simple_delete_many( + table="event_relations", + column="event_id", + iterable=(annotation_event_id_bad, thread_event_id), + keyvalues={}, + desc="RelationsTestCase.test_background_update", + ) + ) + + # Only the "good" annotation should be found. + channel = self.make_request( + "GET", + f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + self.assertEquals( + [ev["event_id"] for ev in channel.json_body["chunk"]], + [annotation_event_id_good], + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + {"update_name": "event_arbitrary_relations", "progress_json": "{}"}, + ) + ) + + # Ugh, have to reset this flag + self.store.db_pool.updates._all_done = False + self.wait_for_background_updates() + + # The "good" annotation and the thread should be found, but not the "bad" + # annotation. + channel = self.make_request( + "GET", + f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + self.assertCountEqual( + [ev["event_id"] for ev in channel.json_body["chunk"]], + [annotation_event_id_good, thread_event_id], + ) diff --git a/tests/unittest.py b/tests/unittest.py index c9a08a3420..165aafc574 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -331,7 +331,12 @@ class HomeserverTestCase(TestCase): time.sleep(0.01) def wait_for_background_updates(self) -> None: - """Block until all background database updates have completed.""" + """ + Block until all background database updates have completed. + + Note that callers must ensure that's a store property created on the + testcase. + """ while not self.get_success( self.store.db_pool.updates.has_completed_background_updates() ): -- cgit 1.5.1 From 7cebaf96447a8ff50c4525ba7667f58127876c5e Mon Sep 17 00:00:00 2001 From: Shay Date: Tue, 23 Nov 2021 06:46:40 -0800 Subject: Remove code invalidated by deprecated config flag 'trust_identity_servers_for_password_resets' (#11395) * remove background update code related to deprecated config flag * changelog entry * update changelog * Delete 11394.removal Duplicate, wrong number * add no-op background update and change newfragment so it will be consolidated with associated work * remove unused code * Remove code associated with deprecated flag from legacy docker dynamic config file Co-authored-by: reivilibre --- changelog.d/11395.removal | 1 + docker/conf/homeserver.yaml | 8 ------ synapse/storage/databases/main/registration.py | 35 +++----------------------- tests/utils.py | 1 - 4 files changed, 4 insertions(+), 41 deletions(-) create mode 100644 changelog.d/11395.removal (limited to 'synapse/storage') diff --git a/changelog.d/11395.removal b/changelog.d/11395.removal new file mode 100644 index 0000000000..6c1fd560ad --- /dev/null +++ b/changelog.d/11395.removal @@ -0,0 +1 @@ +Remove deprecated `trust_identity_server_for_password_resets` configuration flag. \ No newline at end of file diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml index 3cba594d02..f10f78a48c 100644 --- a/docker/conf/homeserver.yaml +++ b/docker/conf/homeserver.yaml @@ -148,14 +148,6 @@ bcrypt_rounds: 12 allow_guest_access: {{ "True" if SYNAPSE_ALLOW_GUEST else "False" }} enable_group_creation: true -# The list of identity servers trusted to verify third party -# identifiers by this server. -# -# Also defines the ID server which will be called when an account is -# deactivated (one will be picked arbitrarily). -trusted_third_party_id_servers: - - matrix.org - - vector.im ## Metrics ### diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 7ee699f981..8478463a2a 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1728,11 +1728,11 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): ) self.db_pool.updates.register_background_update_handler( - "user_threepids_grandfather", self._bg_user_threepids_grandfather + "users_set_deactivated_flag", self._background_update_set_deactivated_flag ) - self.db_pool.updates.register_background_update_handler( - "users_set_deactivated_flag", self._background_update_set_deactivated_flag + self.db_pool.updates.register_noop_background_update( + "user_threepids_grandfather" ) self.db_pool.updates.register_background_index_update( @@ -1805,35 +1805,6 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): return nb_processed - async def _bg_user_threepids_grandfather(self, progress, batch_size): - """We now track which identity servers a user binds their 3PID to, so - we need to handle the case of existing bindings where we didn't track - this. - - We do this by grandfathering in existing user threepids assuming that - they used one of the server configured trusted identity servers. - """ - id_servers = set(self.config.registration.trusted_third_party_id_servers) - - def _bg_user_threepids_grandfather_txn(txn): - sql = """ - INSERT INTO user_threepid_id_server - (user_id, medium, address, id_server) - SELECT user_id, medium, address, ? - FROM user_threepids - """ - - txn.execute_batch(sql, [(id_server,) for id_server in id_servers]) - - if id_servers: - await self.db_pool.runInteraction( - "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn - ) - - await self.db_pool.updates._end_background_update("user_threepids_grandfather") - - return 1 - async def set_user_deactivated_status( self, user_id: str, deactivated: bool ) -> None: diff --git a/tests/utils.py b/tests/utils.py index cf8ba5c5db..983859120f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -119,7 +119,6 @@ def default_config(name, parse=False): "enable_registration": True, "enable_registration_captcha": False, "macaroon_secret_key": "not even a little secret", - "trusted_third_party_id_servers": [], "password_providers": [], "worker_replication_url": "", "worker_app": None, -- cgit 1.5.1 From 55669bd3de7137553085e9f1c16a686ff657108c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 23 Nov 2021 10:21:19 -0500 Subject: Add missing type hints to config base classes (#11377) --- changelog.d/11377.bugfix | 1 + changelog.d/11377.misc | 1 + mypy.ini | 3 + synapse/config/_base.py | 157 +++++++++++++++---------- synapse/config/_base.pyi | 87 +++++++++----- synapse/config/cache.py | 4 +- synapse/config/key.py | 3 +- synapse/config/logger.py | 4 +- synapse/config/server.py | 4 +- synapse/config/tls.py | 2 +- synapse/module_api/__init__.py | 2 +- synapse/storage/databases/main/registration.py | 3 +- tests/config/test_load.py | 22 ++-- 13 files changed, 184 insertions(+), 109 deletions(-) create mode 100644 changelog.d/11377.bugfix create mode 100644 changelog.d/11377.misc (limited to 'synapse/storage') diff --git a/changelog.d/11377.bugfix b/changelog.d/11377.bugfix new file mode 100644 index 0000000000..9831fb7bbe --- /dev/null +++ b/changelog.d/11377.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.45.0 where the `read_templates` method of the module API would error. diff --git a/changelog.d/11377.misc b/changelog.d/11377.misc new file mode 100644 index 0000000000..3dac625576 --- /dev/null +++ b/changelog.d/11377.misc @@ -0,0 +1 @@ +Add type hints to configuration classes. diff --git a/mypy.ini b/mypy.ini index 308cfd95d8..bc4f59154d 100644 --- a/mypy.ini +++ b/mypy.ini @@ -151,6 +151,9 @@ disallow_untyped_defs = True [mypy-synapse.app.*] disallow_untyped_defs = True +[mypy-synapse.config._base] +disallow_untyped_defs = True + [mypy-synapse.crypto.*] disallow_untyped_defs = True diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 7c4428a138..1265738dc1 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -20,7 +20,18 @@ import os from collections import OrderedDict from hashlib import sha256 from textwrap import dedent -from typing import Any, Iterable, List, MutableMapping, Optional, Union +from typing import ( + Any, + Dict, + Iterable, + List, + MutableMapping, + Optional, + Tuple, + Type, + TypeVar, + Union, +) import attr import jinja2 @@ -78,7 +89,7 @@ CONFIG_FILE_HEADER = """\ """ -def path_exists(file_path): +def path_exists(file_path: str) -> bool: """Check if a file exists Unlike os.path.exists, this throws an exception if there is an error @@ -86,7 +97,7 @@ def path_exists(file_path): the parent dir). Returns: - bool: True if the file exists; False if not. + True if the file exists; False if not. """ try: os.stat(file_path) @@ -102,15 +113,15 @@ class Config: A configuration section, containing configuration keys and values. Attributes: - section (str): The section title of this config object, such as + section: The section title of this config object, such as "tls" or "logger". This is used to refer to it on the root logger (for example, `config.tls.some_option`). Must be defined in subclasses. """ - section = None + section: str - def __init__(self, root_config=None): + def __init__(self, root_config: "RootConfig" = None): self.root = root_config # Get the path to the default Synapse template directory @@ -119,7 +130,7 @@ class Config: ) @staticmethod - def parse_size(value): + def parse_size(value: Union[str, int]) -> int: if isinstance(value, int): return value sizes = {"K": 1024, "M": 1024 * 1024} @@ -162,15 +173,15 @@ class Config: return int(value) * size @staticmethod - def abspath(file_path): + def abspath(file_path: str) -> str: return os.path.abspath(file_path) if file_path else file_path @classmethod - def path_exists(cls, file_path): + def path_exists(cls, file_path: str) -> bool: return path_exists(file_path) @classmethod - def check_file(cls, file_path, config_name): + def check_file(cls, file_path: Optional[str], config_name: str) -> str: if file_path is None: raise ConfigError("Missing config for %s." % (config_name,)) try: @@ -183,7 +194,7 @@ class Config: return cls.abspath(file_path) @classmethod - def ensure_directory(cls, dir_path): + def ensure_directory(cls, dir_path: str) -> str: dir_path = cls.abspath(dir_path) os.makedirs(dir_path, exist_ok=True) if not os.path.isdir(dir_path): @@ -191,7 +202,7 @@ class Config: return dir_path @classmethod - def read_file(cls, file_path, config_name): + def read_file(cls, file_path: Any, config_name: str) -> str: """Deprecated: call read_file directly""" return read_file(file_path, (config_name,)) @@ -284,6 +295,9 @@ class Config: return [env.get_template(filename) for filename in filenames] +TRootConfig = TypeVar("TRootConfig", bound="RootConfig") + + class RootConfig: """ Holder of an application's configuration. @@ -308,7 +322,9 @@ class RootConfig: raise Exception("Failed making %s: %r" % (config_class.section, e)) setattr(self, config_class.section, conf) - def invoke_all(self, func_name: str, *args, **kwargs) -> MutableMapping[str, Any]: + def invoke_all( + self, func_name: str, *args: Any, **kwargs: Any + ) -> MutableMapping[str, Any]: """ Invoke a function on all instantiated config objects this RootConfig is configured to use. @@ -317,6 +333,7 @@ class RootConfig: func_name: Name of function to invoke *args **kwargs + Returns: ordered dictionary of config section name and the result of the function from it. @@ -332,7 +349,7 @@ class RootConfig: return res @classmethod - def invoke_all_static(cls, func_name: str, *args, **kwargs): + def invoke_all_static(cls, func_name: str, *args: Any, **kwargs: any) -> None: """ Invoke a static function on config objects this RootConfig is configured to use. @@ -341,6 +358,7 @@ class RootConfig: func_name: Name of function to invoke *args **kwargs + Returns: ordered dictionary of config section name and the result of the function from it. @@ -351,16 +369,16 @@ class RootConfig: def generate_config( self, - config_dir_path, - data_dir_path, - server_name, - generate_secrets=False, - report_stats=None, - open_private_ports=False, - listeners=None, - tls_certificate_path=None, - tls_private_key_path=None, - ): + config_dir_path: str, + data_dir_path: str, + server_name: str, + generate_secrets: bool = False, + report_stats: Optional[bool] = None, + open_private_ports: bool = False, + listeners: Optional[List[dict]] = None, + tls_certificate_path: Optional[str] = None, + tls_private_key_path: Optional[str] = None, + ) -> str: """ Build a default configuration file @@ -368,27 +386,27 @@ class RootConfig: (eg with --generate_config). Args: - config_dir_path (str): The path where the config files are kept. Used to + config_dir_path: The path where the config files are kept. Used to create filenames for things like the log config and the signing key. - data_dir_path (str): The path where the data files are kept. Used to create + data_dir_path: The path where the data files are kept. Used to create filenames for things like the database and media store. - server_name (str): The server name. Used to initialise the server_name + server_name: The server name. Used to initialise the server_name config param, but also used in the names of some of the config files. - generate_secrets (bool): True if we should generate new secrets for things + generate_secrets: True if we should generate new secrets for things like the macaroon_secret_key. If False, these parameters will be left unset. - report_stats (bool|None): Initial setting for the report_stats setting. + report_stats: Initial setting for the report_stats setting. If None, report_stats will be left unset. - open_private_ports (bool): True to leave private ports (such as the non-TLS + open_private_ports: True to leave private ports (such as the non-TLS HTTP listener) open to the internet. - listeners (list(dict)|None): A list of descriptions of the listeners - synapse should start with each of which specifies a port (str), a list of + listeners: A list of descriptions of the listeners synapse should + start with each of which specifies a port (int), a list of resources (list(str)), tls (bool) and type (str). For example: [{ "port": 8448, @@ -403,16 +421,12 @@ class RootConfig: "type": "http", }], + tls_certificate_path: The path to the tls certificate. - database (str|None): The database type to configure, either `psycog2` - or `sqlite3`. - - tls_certificate_path (str|None): The path to the tls certificate. - - tls_private_key_path (str|None): The path to the tls private key. + tls_private_key_path: The path to the tls private key. Returns: - str: the yaml config file + The yaml config file """ return CONFIG_FILE_HEADER + "\n\n".join( @@ -432,12 +446,15 @@ class RootConfig: ) @classmethod - def load_config(cls, description, argv): + def load_config( + cls: Type[TRootConfig], description: str, argv: List[str] + ) -> TRootConfig: """Parse the commandline and config files Doesn't support config-file-generation: used by the worker apps. - Returns: Config object. + Returns: + Config object. """ config_parser = argparse.ArgumentParser(description=description) cls.add_arguments_to_parser(config_parser) @@ -446,7 +463,7 @@ class RootConfig: return obj @classmethod - def add_arguments_to_parser(cls, config_parser): + def add_arguments_to_parser(cls, config_parser: argparse.ArgumentParser) -> None: """Adds all the config flags to an ArgumentParser. Doesn't support config-file-generation: used by the worker apps. @@ -454,7 +471,7 @@ class RootConfig: Used for workers where we want to add extra flags/subcommands. Args: - config_parser (ArgumentParser): App description + config_parser: App description """ config_parser.add_argument( @@ -477,7 +494,9 @@ class RootConfig: cls.invoke_all_static("add_arguments", config_parser) @classmethod - def load_config_with_parser(cls, parser, argv): + def load_config_with_parser( + cls: Type[TRootConfig], parser: argparse.ArgumentParser, argv: List[str] + ) -> Tuple[TRootConfig, argparse.Namespace]: """Parse the commandline and config files with the given parser Doesn't support config-file-generation: used by the worker apps. @@ -485,13 +504,12 @@ class RootConfig: Used for workers where we want to add extra flags/subcommands. Args: - parser (ArgumentParser) - argv (list[str]) + parser + argv Returns: - tuple[HomeServerConfig, argparse.Namespace]: Returns the parsed - config object and the parsed argparse.Namespace object from - `parser.parse_args(..)` + Returns the parsed config object and the parsed argparse.Namespace + object from parser.parse_args(..)` """ obj = cls() @@ -520,12 +538,15 @@ class RootConfig: return obj, config_args @classmethod - def load_or_generate_config(cls, description, argv): + def load_or_generate_config( + cls: Type[TRootConfig], description: str, argv: List[str] + ) -> Optional[TRootConfig]: """Parse the commandline and config files Supports generation of config files, so is used for the main homeserver app. - Returns: Config object, or None if --generate-config or --generate-keys was set + Returns: + Config object, or None if --generate-config or --generate-keys was set """ parser = argparse.ArgumentParser(description=description) parser.add_argument( @@ -680,16 +701,21 @@ class RootConfig: return obj - def parse_config_dict(self, config_dict, config_dir_path=None, data_dir_path=None): + def parse_config_dict( + self, + config_dict: Dict[str, Any], + config_dir_path: Optional[str] = None, + data_dir_path: Optional[str] = None, + ) -> None: """Read the information from the config dict into this Config object. Args: - config_dict (dict): Configuration data, as read from the yaml + config_dict: Configuration data, as read from the yaml - config_dir_path (str): The path where the config files are kept. Used to + config_dir_path: The path where the config files are kept. Used to create filenames for things like the log config and the signing key. - data_dir_path (str): The path where the data files are kept. Used to create + data_dir_path: The path where the data files are kept. Used to create filenames for things like the database and media store. """ self.invoke_all( @@ -699,17 +725,20 @@ class RootConfig: data_dir_path=data_dir_path, ) - def generate_missing_files(self, config_dict, config_dir_path): + def generate_missing_files( + self, config_dict: Dict[str, Any], config_dir_path: str + ) -> None: self.invoke_all("generate_files", config_dict, config_dir_path) -def read_config_files(config_files): +def read_config_files(config_files: Iterable[str]) -> Dict[str, Any]: """Read the config files into a dict Args: - config_files (iterable[str]): A list of the config files to read + config_files: A list of the config files to read - Returns: dict + Returns: + The configuration dictionary. """ specified_config = {} for config_file in config_files: @@ -733,17 +762,17 @@ def read_config_files(config_files): return specified_config -def find_config_files(search_paths): +def find_config_files(search_paths: List[str]) -> List[str]: """Finds config files using a list of search paths. If a path is a file then that file path is added to the list. If a search path is a directory then all the "*.yaml" files in that directory are added to the list in sorted order. Args: - search_paths(list(str)): A list of paths to search. + search_paths: A list of paths to search. Returns: - list(str): A list of file paths. + A list of file paths. """ config_files = [] @@ -777,7 +806,7 @@ def find_config_files(search_paths): return config_files -@attr.s +@attr.s(auto_attribs=True) class ShardedWorkerHandlingConfig: """Algorithm for choosing which instance is responsible for handling some sharded work. @@ -787,7 +816,7 @@ class ShardedWorkerHandlingConfig: below). """ - instances = attr.ib(type=List[str]) + instances: List[str] def should_handle(self, instance_name: str, key: str) -> bool: """Whether this instance is responsible for handling the given key.""" diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index c1d9069798..1eb5f5a68c 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -1,4 +1,18 @@ -from typing import Any, Iterable, List, Optional +import argparse +from typing import ( + Any, + Dict, + Iterable, + List, + MutableMapping, + Optional, + Tuple, + Type, + TypeVar, + Union, +) + +import jinja2 from synapse.config import ( account_validity, @@ -19,6 +33,7 @@ from synapse.config import ( logger, metrics, modules, + oembed, oidc, password_auth_providers, push, @@ -27,6 +42,7 @@ from synapse.config import ( registration, repository, retention, + room, room_directory, saml2, server, @@ -51,7 +67,9 @@ MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS: str MISSING_REPORT_STATS_SPIEL: str MISSING_SERVER_NAME: str -def path_exists(file_path: str): ... +def path_exists(file_path: str) -> bool: ... + +TRootConfig = TypeVar("TRootConfig", bound="RootConfig") class RootConfig: server: server.ServerConfig @@ -61,6 +79,7 @@ class RootConfig: logging: logger.LoggingConfig ratelimiting: ratelimiting.RatelimitConfig media: repository.ContentRepositoryConfig + oembed: oembed.OembedConfig captcha: captcha.CaptchaConfig voip: voip.VoipConfig registration: registration.RegistrationConfig @@ -80,6 +99,7 @@ class RootConfig: authproviders: password_auth_providers.PasswordAuthProviderConfig push: push.PushConfig spamchecker: spam_checker.SpamCheckerConfig + room: room.RoomConfig groups: groups.GroupsConfig userdirectory: user_directory.UserDirectoryConfig consent: consent.ConsentConfig @@ -87,72 +107,85 @@ class RootConfig: servernotices: server_notices.ServerNoticesConfig roomdirectory: room_directory.RoomDirectoryConfig thirdpartyrules: third_party_event_rules.ThirdPartyRulesConfig - tracer: tracer.TracerConfig + tracing: tracer.TracerConfig redis: redis.RedisConfig modules: modules.ModulesConfig caches: cache.CacheConfig federation: federation.FederationConfig retention: retention.RetentionConfig - config_classes: List = ... + config_classes: List[Type["Config"]] = ... def __init__(self) -> None: ... - def invoke_all(self, func_name: str, *args: Any, **kwargs: Any): ... + def invoke_all( + self, func_name: str, *args: Any, **kwargs: Any + ) -> MutableMapping[str, Any]: ... @classmethod def invoke_all_static(cls, func_name: str, *args: Any, **kwargs: Any) -> None: ... - def __getattr__(self, item: str): ... def parse_config_dict( self, - config_dict: Any, - config_dir_path: Optional[Any] = ..., - data_dir_path: Optional[Any] = ..., + config_dict: Dict[str, Any], + config_dir_path: Optional[str] = ..., + data_dir_path: Optional[str] = ..., ) -> None: ... - read_config: Any = ... def generate_config( self, config_dir_path: str, data_dir_path: str, server_name: str, generate_secrets: bool = ..., - report_stats: Optional[str] = ..., + report_stats: Optional[bool] = ..., open_private_ports: bool = ..., listeners: Optional[Any] = ..., - database_conf: Optional[Any] = ..., tls_certificate_path: Optional[str] = ..., tls_private_key_path: Optional[str] = ..., - ): ... + ) -> str: ... @classmethod - def load_or_generate_config(cls, description: Any, argv: Any): ... + def load_or_generate_config( + cls: Type[TRootConfig], description: str, argv: List[str] + ) -> Optional[TRootConfig]: ... @classmethod - def load_config(cls, description: Any, argv: Any): ... + def load_config( + cls: Type[TRootConfig], description: str, argv: List[str] + ) -> TRootConfig: ... @classmethod - def add_arguments_to_parser(cls, config_parser: Any) -> None: ... + def add_arguments_to_parser( + cls, config_parser: argparse.ArgumentParser + ) -> None: ... @classmethod - def load_config_with_parser(cls, parser: Any, argv: Any): ... + def load_config_with_parser( + cls: Type[TRootConfig], parser: argparse.ArgumentParser, argv: List[str] + ) -> Tuple[TRootConfig, argparse.Namespace]: ... def generate_missing_files( self, config_dict: dict, config_dir_path: str ) -> None: ... class Config: root: RootConfig + default_template_dir: str def __init__(self, root_config: Optional[RootConfig] = ...) -> None: ... - def __getattr__(self, item: str, from_root: bool = ...): ... @staticmethod - def parse_size(value: Any): ... + def parse_size(value: Union[str, int]) -> int: ... @staticmethod - def parse_duration(value: Any): ... + def parse_duration(value: Union[str, int]) -> int: ... @staticmethod - def abspath(file_path: Optional[str]): ... + def abspath(file_path: Optional[str]) -> str: ... @classmethod - def path_exists(cls, file_path: str): ... + def path_exists(cls, file_path: str) -> bool: ... @classmethod - def check_file(cls, file_path: str, config_name: str): ... + def check_file(cls, file_path: str, config_name: str) -> str: ... @classmethod - def ensure_directory(cls, dir_path: str): ... + def ensure_directory(cls, dir_path: str) -> str: ... @classmethod - def read_file(cls, file_path: str, config_name: str): ... + def read_file(cls, file_path: str, config_name: str) -> str: ... + def read_template(self, filenames: str) -> jinja2.Template: ... + def read_templates( + self, + filenames: List[str], + custom_template_directories: Optional[Iterable[str]] = None, + ) -> List[jinja2.Template]: ... -def read_config_files(config_files: List[str]): ... -def find_config_files(search_paths: List[str]): ... +def read_config_files(config_files: Iterable[str]) -> Dict[str, Any]: ... +def find_config_files(search_paths: List[str]) -> List[str]: ... class ShardedWorkerHandlingConfig: instances: List[str] diff --git a/synapse/config/cache.py b/synapse/config/cache.py index d119427ad8..f054455534 100644 --- a/synapse/config/cache.py +++ b/synapse/config/cache.py @@ -15,7 +15,7 @@ import os import re import threading -from typing import Callable, Dict +from typing import Callable, Dict, Optional from synapse.python_dependencies import DependencyException, check_requirements @@ -217,7 +217,7 @@ class CacheConfig(Config): expiry_time = cache_config.get("expiry_time") if expiry_time: - self.expiry_time_msec = self.parse_duration(expiry_time) + self.expiry_time_msec: Optional[int] = self.parse_duration(expiry_time) else: self.expiry_time_msec = None diff --git a/synapse/config/key.py b/synapse/config/key.py index 015dbb8a67..035ee2416b 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -16,6 +16,7 @@ import hashlib import logging import os +from typing import Any, Dict import attr import jsonschema @@ -312,7 +313,7 @@ class KeyConfig(Config): ) return keys - def generate_files(self, config, config_dir_path): + def generate_files(self, config: Dict[str, Any], config_dir_path: str) -> None: if "signing_key" in config: return diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 5252e61a99..63aab0babe 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -18,7 +18,7 @@ import os import sys import threading from string import Template -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Dict import yaml from zope.interface import implementer @@ -185,7 +185,7 @@ class LoggingConfig(Config): help=argparse.SUPPRESS, ) - def generate_files(self, config, config_dir_path): + def generate_files(self, config: Dict[str, Any], config_dir_path: str) -> None: log_config = config.get("log_config") if log_config and not os.path.exists(log_config): log_file = self.abspath("homeserver.log") diff --git a/synapse/config/server.py b/synapse/config/server.py index 7bc0030a9e..8445e9dd05 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -421,7 +421,7 @@ class ServerConfig(Config): # before redacting them. redaction_retention_period = config.get("redaction_retention_period", "7d") if redaction_retention_period is not None: - self.redaction_retention_period = self.parse_duration( + self.redaction_retention_period: Optional[int] = self.parse_duration( redaction_retention_period ) else: @@ -430,7 +430,7 @@ class ServerConfig(Config): # How long to keep entries in the `users_ips` table. user_ips_max_age = config.get("user_ips_max_age", "28d") if user_ips_max_age is not None: - self.user_ips_max_age = self.parse_duration(user_ips_max_age) + self.user_ips_max_age: Optional[int] = self.parse_duration(user_ips_max_age) else: self.user_ips_max_age = None diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 21e5ddd15f..4ca111618f 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -245,7 +245,7 @@ class TlsConfig(Config): cert_path = self.tls_certificate_file logger.info("Loading TLS certificate from %s", cert_path) cert_pem = self.read_file(cert_path, "tls_certificate_path") - cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem.encode()) return cert diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index ac8e8142f1..96d7a8f2a9 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -1014,7 +1014,7 @@ class ModuleApi: A list containing the loaded templates, with the orders matching the one of the filenames parameter. """ - return self._hs.config.read_templates( + return self._hs.config.server.read_templates( filenames, (td for td in (self.custom_template_dir, custom_template_directory) if td), ) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 8478463a2a..0e8c168667 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1198,8 +1198,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): expiration_ts = now_ms + self._account_validity_period if use_delta: + assert self._account_validity_startup_job_max_delta is not None expiration_ts = random.randrange( - expiration_ts - self._account_validity_startup_job_max_delta, + int(expiration_ts - self._account_validity_startup_job_max_delta), expiration_ts, ) diff --git a/tests/config/test_load.py b/tests/config/test_load.py index d8668d56b2..69a4e9413b 100644 --- a/tests/config/test_load.py +++ b/tests/config/test_load.py @@ -46,15 +46,16 @@ class ConfigLoadingFileTestCase(ConfigFileTestCase): "was: %r" % (config.key.macaroon_secret_key,) ) - config = HomeServerConfig.load_or_generate_config("", ["-c", self.config_file]) + config2 = HomeServerConfig.load_or_generate_config("", ["-c", self.config_file]) + assert config2 is not None self.assertTrue( - hasattr(config.key, "macaroon_secret_key"), + hasattr(config2.key, "macaroon_secret_key"), "Want config to have attr macaroon_secret_key", ) - if len(config.key.macaroon_secret_key) < 5: + if len(config2.key.macaroon_secret_key) < 5: self.fail( "Want macaroon secret key to be string of at least length 5," - "was: %r" % (config.key.macaroon_secret_key,) + "was: %r" % (config2.key.macaroon_secret_key,) ) def test_load_succeeds_if_macaroon_secret_key_missing(self): @@ -62,6 +63,9 @@ class ConfigLoadingFileTestCase(ConfigFileTestCase): config1 = HomeServerConfig.load_config("", ["-c", self.config_file]) config2 = HomeServerConfig.load_config("", ["-c", self.config_file]) config3 = HomeServerConfig.load_or_generate_config("", ["-c", self.config_file]) + assert config1 is not None + assert config2 is not None + assert config3 is not None self.assertEqual( config1.key.macaroon_secret_key, config2.key.macaroon_secret_key ) @@ -78,14 +82,16 @@ class ConfigLoadingFileTestCase(ConfigFileTestCase): config = HomeServerConfig.load_config("", ["-c", self.config_file]) self.assertFalse(config.registration.enable_registration) - config = HomeServerConfig.load_or_generate_config("", ["-c", self.config_file]) - self.assertFalse(config.registration.enable_registration) + config2 = HomeServerConfig.load_or_generate_config("", ["-c", self.config_file]) + assert config2 is not None + self.assertFalse(config2.registration.enable_registration) # Check that either config value is clobbered by the command line. - config = HomeServerConfig.load_or_generate_config( + config3 = HomeServerConfig.load_or_generate_config( "", ["-c", self.config_file, "--enable-registration"] ) - self.assertTrue(config.registration.enable_registration) + assert config3 is not None + self.assertTrue(config3.registration.enable_registration) def test_stats_enabled(self): self.generate_config_and_remove_lines_containing("enable_metrics") -- cgit 1.5.1 From 7f9841bdec349075eb424c943db5569439acb83c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Nov 2021 20:21:44 +0100 Subject: Lower minumum batch size to 1 for background updates (#11422) Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/11422.bugfix | 1 + synapse/storage/background_updates.py | 2 +- tests/rest/admin/test_background_updates.py | 25 +++++++++++++++++-------- tests/storage/test_background_update.py | 8 ++++---- 4 files changed, 23 insertions(+), 13 deletions(-) create mode 100644 changelog.d/11422.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/11422.bugfix b/changelog.d/11422.bugfix new file mode 100644 index 0000000000..28ac65ea7c --- /dev/null +++ b/changelog.d/11422.bugfix @@ -0,0 +1 @@ +Improve performance of various background database schema updates. diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index b104f9032c..bc8364400d 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -82,7 +82,7 @@ class BackgroundUpdater: process and autotuning the batch size. """ - MINIMUM_BACKGROUND_BATCH_SIZE = 100 + MINIMUM_BACKGROUND_BATCH_SIZE = 1 DEFAULT_BACKGROUND_BATCH_SIZE = 100 BACKGROUND_UPDATE_INTERVAL_MS = 1000 BACKGROUND_UPDATE_DURATION_MS = 100 diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py index 1786316763..cd5c60b65c 100644 --- a/tests/rest/admin/test_background_updates.py +++ b/tests/rest/admin/test_background_updates.py @@ -20,6 +20,7 @@ import synapse.rest.admin from synapse.api.errors import Codes from synapse.rest.client import login from synapse.server import HomeServer +from synapse.storage.background_updates import BackgroundUpdater from tests import unittest @@ -150,9 +151,11 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "current_updates": { "master": { "name": "test_update", - "average_items_per_ms": 0.1, + "average_items_per_ms": 0.001, "total_duration_ms": 1000.0, - "total_item_count": 100, + "total_item_count": ( + BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE + ), } }, "enabled": True, @@ -203,9 +206,11 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "current_updates": { "master": { "name": "test_update", - "average_items_per_ms": 0.1, + "average_items_per_ms": 0.001, "total_duration_ms": 1000.0, - "total_item_count": 100, + "total_item_count": ( + BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE + ), } }, "enabled": False, @@ -230,9 +235,11 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "current_updates": { "master": { "name": "test_update", - "average_items_per_ms": 0.1, + "average_items_per_ms": 0.001, "total_duration_ms": 1000.0, - "total_item_count": 100, + "total_item_count": ( + BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE + ), } }, "enabled": False, @@ -267,9 +274,11 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "current_updates": { "master": { "name": "test_update", - "average_items_per_ms": 0.1, + "average_items_per_ms": 0.001, "total_duration_ms": 2000.0, - "total_item_count": 200, + "total_item_count": ( + 2 * BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE + ), } }, "enabled": True, diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 0da42b5ac5..a5f5ebad41 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -19,11 +19,11 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): ) def test_do_background_update(self): - # the time we claim each update takes - duration_ms = 42 + # the time we claim it takes to update one item when running the update + duration_ms = 4200 # the target runtime for each bg update - target_background_update_duration_ms = 50000 + target_background_update_duration_ms = 5000000 store = self.hs.get_datastore() self.get_success( @@ -57,7 +57,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): # on the first call, we should get run with the default background update size self.update_handler.assert_called_once_with( - {"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE + {"my_key": 1}, self.updates.MINIMUM_BACKGROUND_BATCH_SIZE ) # second step: complete the update -- cgit 1.5.1 From 0d88c4f9030c50bb9e016d9a28f6db27f7913d0b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 16:14:54 +0100 Subject: Improve performance of `remove_{hidden,deleted}_devices_from_device_inbox` (#11421) Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/11421.bugfix | 1 + synapse/storage/databases/main/deviceinbox.py | 209 +++++++-------------- .../65/08_device_inbox_background_updates.sql | 18 ++ tests/storage/databases/main/test_deviceinbox.py | 4 +- 4 files changed, 84 insertions(+), 148 deletions(-) create mode 100644 changelog.d/11421.bugfix create mode 100644 synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql (limited to 'synapse/storage') diff --git a/changelog.d/11421.bugfix b/changelog.d/11421.bugfix new file mode 100644 index 0000000000..28ac65ea7c --- /dev/null +++ b/changelog.d/11421.bugfix @@ -0,0 +1 @@ +Improve performance of various background database schema updates. diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 7c0f953365..ab8766c75b 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -599,6 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox" + REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -614,14 +615,18 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) - self.db_pool.updates.register_background_update_handler( - self.REMOVE_DELETED_DEVICES, - self._remove_deleted_devices_from_device_inbox, + # Used to be a background update that deletes all device_inboxes for deleted + # devices. + self.db_pool.updates.register_noop_background_update( + self.REMOVE_DELETED_DEVICES ) + # Used to be a background update that deletes all device_inboxes for hidden + # devices. + self.db_pool.updates.register_noop_background_update(self.REMOVE_HIDDEN_DEVICES) self.db_pool.updates.register_background_update_handler( - self.REMOVE_HIDDEN_DEVICES, - self._remove_hidden_devices_from_device_inbox, + self.REMOVE_DEAD_DEVICES_FROM_INBOX, + self._remove_dead_devices_from_device_inbox, ) async def _background_drop_index_device_inbox(self, progress, batch_size): @@ -636,171 +641,83 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): return 1 - async def _remove_deleted_devices_from_device_inbox( - self, progress: JsonDict, batch_size: int + async def _remove_dead_devices_from_device_inbox( + self, + progress: JsonDict, + batch_size: int, ) -> int: - """A background update that deletes all device_inboxes for deleted devices. - - This should only need to be run once (when users upgrade to v1.47.0) + """A background update to remove devices that were either deleted or hidden from + the device_inbox table. Args: - progress: JsonDict used to store progress of this background update - batch_size: the maximum number of rows to retrieve in a single select query + progress: The update's progress dict. + batch_size: The batch size for this update. Returns: - The number of deleted rows + The number of rows deleted. """ - def _remove_deleted_devices_from_device_inbox_txn( + def _remove_dead_devices_from_device_inbox_txn( txn: LoggingTransaction, - ) -> int: - """stream_id is not unique - we need to use an inclusive `stream_id >= ?` clause, - since we might not have deleted all dead device messages for the stream_id - returned from the previous query + ) -> Tuple[int, bool]: - Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, - to avoid problems of deleting a large number of rows all at once - due to a single device having lots of device messages. - """ + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] + else: + txn.execute("SELECT max(stream_id) FROM device_inbox") + # There's a type mismatch here between how we want to type the row and + # what fetchone says it returns, but we silence it because we know that + # res can't be None. + res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment] + if res[0] is None: + # this can only happen if the `device_inbox` table is empty, in which + # case we have no work to do. + return 0, True + else: + max_stream_id = res[0] - last_stream_id = progress.get("stream_id", 0) + start = progress.get("stream_id", 0) + stop = start + batch_size + # delete rows in `device_inbox` which do *not* correspond to a known, + # unhidden device. sql = """ - SELECT device_id, user_id, stream_id - FROM device_inbox + DELETE FROM device_inbox WHERE - stream_id >= ? - AND (device_id, user_id) NOT IN ( - SELECT device_id, user_id FROM devices + stream_id >= ? AND stream_id < ? + AND NOT EXISTS ( + SELECT * FROM devices d + WHERE + d.device_id=device_inbox.device_id + AND d.user_id=device_inbox.user_id + AND NOT hidden ) - ORDER BY stream_id - LIMIT ? - """ - - txn.execute(sql, (last_stream_id, batch_size)) - rows = txn.fetchall() + """ - num_deleted = 0 - for row in rows: - num_deleted += self.db_pool.simple_delete_txn( - txn, - "device_inbox", - {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, - ) + txn.execute(sql, (start, stop)) - if rows: - # send more than stream_id to progress - # otherwise it can happen in large deployments that - # no change of status is visible in the log file - # it may be that the stream_id does not change in several runs - self.db_pool.updates._background_update_progress_txn( - txn, - self.REMOVE_DELETED_DEVICES, - { - "device_id": rows[-1][0], - "user_id": rows[-1][1], - "stream_id": rows[-1][2], - }, - ) - - return num_deleted - - number_deleted = await self.db_pool.runInteraction( - "_remove_deleted_devices_from_device_inbox", - _remove_deleted_devices_from_device_inbox_txn, - ) - - # The task is finished when no more lines are deleted. - if not number_deleted: - await self.db_pool.updates._end_background_update( - self.REMOVE_DELETED_DEVICES + self.db_pool.updates._background_update_progress_txn( + txn, + self.REMOVE_DEAD_DEVICES_FROM_INBOX, + { + "stream_id": stop, + "max_stream_id": max_stream_id, + }, ) - return number_deleted - - async def _remove_hidden_devices_from_device_inbox( - self, progress: JsonDict, batch_size: int - ) -> int: - """A background update that deletes all device_inboxes for hidden devices. - - This should only need to be run once (when users upgrade to v1.47.0) - - Args: - progress: JsonDict used to store progress of this background update - batch_size: the maximum number of rows to retrieve in a single select query - - Returns: - The number of deleted rows - """ - - def _remove_hidden_devices_from_device_inbox_txn( - txn: LoggingTransaction, - ) -> int: - """stream_id is not unique - we need to use an inclusive `stream_id >= ?` clause, - since we might not have deleted all hidden device messages for the stream_id - returned from the previous query - - Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, - to avoid problems of deleting a large number of rows all at once - due to a single device having lots of device messages. - """ - - last_stream_id = progress.get("stream_id", 0) - - sql = """ - SELECT device_id, user_id, stream_id - FROM device_inbox - WHERE - stream_id >= ? - AND (device_id, user_id) IN ( - SELECT device_id, user_id FROM devices WHERE hidden = ? - ) - ORDER BY stream_id - LIMIT ? - """ - - txn.execute(sql, (last_stream_id, True, batch_size)) - rows = txn.fetchall() - - num_deleted = 0 - for row in rows: - num_deleted += self.db_pool.simple_delete_txn( - txn, - "device_inbox", - {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, - ) - - if rows: - # We don't just save the `stream_id` in progress as - # otherwise it can happen in large deployments that - # no change of status is visible in the log file, as - # it may be that the stream_id does not change in several runs - self.db_pool.updates._background_update_progress_txn( - txn, - self.REMOVE_HIDDEN_DEVICES, - { - "device_id": rows[-1][0], - "user_id": rows[-1][1], - "stream_id": rows[-1][2], - }, - ) - - return num_deleted + return stop > max_stream_id - number_deleted = await self.db_pool.runInteraction( - "_remove_hidden_devices_from_device_inbox", - _remove_hidden_devices_from_device_inbox_txn, + finished = await self.db_pool.runInteraction( + "_remove_devices_from_device_inbox_txn", + _remove_dead_devices_from_device_inbox_txn, ) - # The task is finished when no more lines are deleted. - if not number_deleted: + if finished: await self.db_pool.updates._end_background_update( - self.REMOVE_HIDDEN_DEVICES + self.REMOVE_DEAD_DEVICES_FROM_INBOX, ) - return number_deleted + return batch_size class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): diff --git a/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql b/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql new file mode 100644 index 0000000000..d79455c2ce --- /dev/null +++ b/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Background update to clear the inboxes of hidden and deleted devices. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6508, 'remove_dead_devices_from_device_inbox', '{}'); diff --git a/tests/storage/databases/main/test_deviceinbox.py b/tests/storage/databases/main/test_deviceinbox.py index 4b67bd15b7..36c933b9e9 100644 --- a/tests/storage/databases/main/test_deviceinbox.py +++ b/tests/storage/databases/main/test_deviceinbox.py @@ -66,7 +66,7 @@ class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase): self.store.db_pool.simple_insert( "background_updates", { - "update_name": "remove_deleted_devices_from_device_inbox", + "update_name": "remove_dead_devices_from_device_inbox", "progress_json": "{}", }, ) @@ -140,7 +140,7 @@ class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase): self.store.db_pool.simple_insert( "background_updates", { - "update_name": "remove_hidden_devices_from_device_inbox", + "update_name": "remove_dead_devices_from_device_inbox", "progress_json": "{}", }, ) -- cgit 1.5.1