From 80ec81dcc54bdb823b95c2f870a919868de9a481 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Oct 2023 18:28:40 +0300 Subject: Some refactors around receipts stream (#16426) --- tests/handlers/test_appservice.py | 8 ++++---- tests/handlers/test_typing.py | 26 +++++++++++++++++++------- 2 files changed, 23 insertions(+), 11 deletions(-) (limited to 'tests') diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a7e6cdd66a..8ce6ccf529 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -31,7 +31,7 @@ from synapse.appservice import ( from synapse.handlers.appservice import ApplicationServicesHandler from synapse.rest.client import login, receipts, register, room, sendtodevice from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType from synapse.util import Clock from synapse.util.stringutils import random_string @@ -304,7 +304,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.handler.notify_interested_services_ephemeral( - "receipt_key", 580, ["@fakerecipient:example.com"] + StreamKeyType.RECEIPT, 580, ["@fakerecipient:example.com"] ) self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( interested_service, ephemeral=[event] @@ -332,7 +332,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.handler.notify_interested_services_ephemeral( - "receipt_key", 580, ["@fakerecipient:example.com"] + StreamKeyType.RECEIPT, 580, ["@fakerecipient:example.com"] ) # This method will be called, but with an empty list of events self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( @@ -634,7 +634,7 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): self.get_success( self.hs.get_application_service_handler()._notify_interested_services_ephemeral( services=[interested_appservice], - stream_key="receipt_key", + stream_key=StreamKeyType.RECEIPT, new_token=stream_token, users=[self.exclusive_as_user], ) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 95106ec8f3..3060bc9744 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -28,7 +28,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.handlers.typing import TypingWriterHandler from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.server import HomeServer -from synapse.types import JsonDict, Requester, UserID, create_requester +from synapse.types import JsonDict, Requester, StreamKeyType, UserID, create_requester from synapse.util import Clock from tests import unittest @@ -203,7 +203,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) ) - self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] + ) self.assertEqual(self.event_source.get_current_key(), 1) events = self.get_success( @@ -273,7 +275,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) self.assertEqual(channel.code, 200) - self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] + ) self.assertEqual(self.event_source.get_current_key(), 1) events = self.get_success( @@ -349,7 +353,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) ) 
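The hunks above replace the magic strings "receipt_key" and "typing_key" with members of StreamKeyType. As a rough illustration of what that buys, here is a simplified stand-in for it (the real synapse.types.StreamKeyType defines more members, such as ROOM and PRESENCE, and may be implemented as plain string constants rather than an Enum):

    from enum import Enum

    class StreamKeyType(str, Enum):
        # Simplified stand-in; the values mirror the old magic strings so
        # that string-based call sites keep working during the migration.
        ROOM = "room_key"
        TYPING = "typing_key"
        RECEIPT = "receipt_key"

    # Comparisons against the old strings still hold, because the enum
    # subclasses str...
    assert StreamKeyType.RECEIPT == "receipt_key"
    # ...but a typo is now an immediate AttributeError instead of a key
    # that silently never matches:
    #   StreamKeyType.RECIEPT  -> AttributeError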
- self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] + ) self.mock_federation_client.put_json.assert_called_once_with( "farm", @@ -399,7 +405,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) ) - self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] + ) self.on_new_event.reset_mock() self.assertEqual(self.event_source.get_current_key(), 1) @@ -425,7 +433,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.reactor.pump([16]) - self.on_new_event.assert_has_calls([call("typing_key", 2, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 2, rooms=[ROOM_ID])] + ) self.assertEqual(self.event_source.get_current_key(), 2) events = self.get_success( @@ -459,7 +469,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) ) - self.on_new_event.assert_has_calls([call("typing_key", 3, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 3, rooms=[ROOM_ID])] + ) self.on_new_event.reset_mock() self.assertEqual(self.event_source.get_current_key(), 3) -- cgit 1.5.1 From 009b47badfed7593cff5f8acbd61e8fddb3ca788 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Oct 2023 12:46:28 +0300 Subject: Factor out `MultiWriter` token from `RoomStreamToken` (#16427) --- changelog.d/16427.misc | 1 + synapse/handlers/admin.py | 4 +- synapse/handlers/initial_sync.py | 3 +- synapse/handlers/room.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/rest/admin/__init__.py | 2 +- synapse/storage/databases/main/stream.py | 22 +++--- synapse/types/__init__.py | 132 +++++++++++++++++++++---------- tests/handlers/test_appservice.py | 8 +- 9 files changed, 115 insertions(+), 61 deletions(-) create mode 100644 changelog.d/16427.misc (limited to 'tests') diff --git a/changelog.d/16427.misc b/changelog.d/16427.misc new file mode 100644 index 0000000000..44f0e0595e --- /dev/null +++ b/changelog.d/16427.misc @@ -0,0 +1 @@ +Factor out `MultiWriter` token from `RoomStreamToken`. diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index ba9704a065..97fd1fd427 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -171,8 +171,8 @@ class AdminHandler: else: stream_ordering = room.stream_ordering - from_key = RoomStreamToken(0, 0) - to_key = RoomStreamToken(None, stream_ordering) + from_key = RoomStreamToken(topological=0, stream=0) + to_key = RoomStreamToken(stream=stream_ordering) # Events that we've processed in this room written_events: Set[str] = set() diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 5737f8014d..c34bd7db95 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -192,8 +192,7 @@ class InitialSyncHandler: ) elif event.membership == Membership.LEAVE: room_end_token = RoomStreamToken( - None, - event.stream_ordering, + stream=event.stream_ordering, ) deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a0c3b16819..4cdf0a8502 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1708,7 +1708,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): if from_key.topological: logger.warning("Stream has topological part!!!! 
%r", from_key) - from_key = RoomStreamToken(None, from_key.stream) + from_key = RoomStreamToken(stream=from_key.stream) app_service = self.store.get_app_service_by_user_id(user.to_string()) if app_service: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7bd42f635f..744e080309 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2333,7 +2333,7 @@ class SyncHandler: continue leave_token = now_token.copy_and_replace( - StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering) ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index e42dade246..9bd0d764f8 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -146,7 +146,7 @@ class PurgeHistoryRestServlet(RestServlet): # RoomStreamToken expects [int] not Optional[int] assert event.internal_metadata.stream_ordering is not None room_token = RoomStreamToken( - event.depth, event.internal_metadata.stream_ordering + topological=event.depth, stream=event.internal_metadata.stream_ordering ) token = await room_token.to_string(self.store) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 5a3611c415..ea06e4eee0 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -266,7 +266,7 @@ def generate_next_token( # when we are going backwards so we subtract one from the # stream part. last_stream_ordering -= 1 - return RoomStreamToken(last_topo_ordering, last_stream_ordering) + return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering) def _make_generic_sql_bound( @@ -558,7 +558,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if p > min_pos } - return RoomStreamToken(None, min_pos, immutabledict(positions)) + return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions)) async def get_room_events_stream_for_rooms( self, @@ -708,7 +708,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ret.reverse() if rows: - key = RoomStreamToken(None, min(r.stream_ordering for r in rows)) + key = RoomStreamToken(stream=min(r.stream_ordering for r in rows)) else: # Assume we didn't get anything because there was nothing to # get. 
@@ -969,7 +969,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): topo = await self.db_pool.runInteraction( "_get_max_topological_txn", self._get_max_topological_txn, room_id ) - return RoomStreamToken(topo, stream_ordering) + return RoomStreamToken(topological=topo, stream=stream_ordering) @overload def get_stream_id_for_event_txn( @@ -1033,7 +1033,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcols=("stream_ordering", "topological_ordering"), desc="get_topological_token_for_event", ) - return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) + return RoomStreamToken( + topological=row["topological_ordering"], stream=row["stream_ordering"] + ) async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: """Gets the topological token in a room after or at the given stream @@ -1114,8 +1116,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): else: topo = None internal = event.internal_metadata - internal.before = RoomStreamToken(topo, stream - 1) - internal.after = RoomStreamToken(topo, stream) + internal.before = RoomStreamToken(topological=topo, stream=stream - 1) + internal.after = RoomStreamToken(topological=topo, stream=stream) internal.order = (int(topo) if topo else 0, int(stream)) async def get_events_around( @@ -1191,11 +1193,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Paginating backwards includes the event at the token, but paginating # forward doesn't. before_token = RoomStreamToken( - results["topological_ordering"] - 1, results["stream_ordering"] + topological=results["topological_ordering"] - 1, + stream=results["stream_ordering"], ) after_token = RoomStreamToken( - results["topological_ordering"], results["stream_ordering"] + topological=results["topological_ordering"], + stream=results["stream_ordering"], ) rows, start_token = self._paginate_room_events_txn( diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 406d5b1611..09a88c86a7 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -61,6 +61,8 @@ from synapse.util.cancellation import cancellable from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: + from typing_extensions import Self + from synapse.appservice.api import ApplicationService from synapse.storage.databases.main import DataStore, PurgeEventsStore from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore @@ -437,7 +439,78 @@ def map_username_to_mxid_localpart( @attr.s(frozen=True, slots=True, order=False) -class RoomStreamToken: +class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): + """An abstract stream token class for streams that supports multiple + writers. + + This works by keeping track of the stream position of each writer, + represented by a default `stream` attribute and a map of instance name to + stream position of any writers that are ahead of the default stream + position. + """ + + stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True) + + instance_map: "immutabledict[str, int]" = attr.ib( + factory=immutabledict, + validator=attr.validators.deep_mapping( + key_validator=attr.validators.instance_of(str), + value_validator=attr.validators.instance_of(int), + mapping_validator=attr.validators.instance_of(immutabledict), + ), + kw_only=True, + ) + + @classmethod + @abc.abstractmethod + async def parse(cls, store: "DataStore", string: str) -> "Self": + """Parse the string representation of the token.""" + ... 
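For orientation, the string forms these parse/to_string methods round-trip can be seen further down: live tokens look like "s1" and historical tokens like "t3-5" (multi-writer tokens carry an additional encoded instance map, which this sketch ignores). A toy parser for just the two simple forms; the real parse is async and consults the DataStore:

    import re
    from typing import Optional, Tuple

    def parse_simple(token: str) -> Tuple[Optional[int], int]:
        """Parse "s<stream>" or "t<topological>-<stream>" token strings."""
        if token.startswith("s"):
            return None, int(token[1:])
        match = re.match(r"t(\d+)-(\d+)$", token)
        if match:
            return int(match.group(1)), int(match.group(2))
        raise ValueError(token)

    assert parse_simple("s1") == (None, 1)
    assert parse_simple("t3-5") == (3, 5)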
+ + @abc.abstractmethod + async def to_string(self, store: "DataStore") -> str: + """Serialize the token into its string representation.""" + ... + + def copy_and_advance(self, other: "Self") -> "Self": + """Return a new token such that if an event is after both this token and + the other token, then its after the returned token too. + """ + + max_stream = max(self.stream, other.stream) + + instance_map = { + instance: max( + self.instance_map.get(instance, self.stream), + other.instance_map.get(instance, other.stream), + ) + for instance in set(self.instance_map).union(other.instance_map) + } + + return attr.evolve( + self, stream=max_stream, instance_map=immutabledict(instance_map) + ) + + def get_max_stream_pos(self) -> int: + """Get the maximum stream position referenced in this token. + + The corresponding "min" position is, by definition just `self.stream`. + + This is used to handle tokens that have non-empty `instance_map`, and so + reference stream positions after the `self.stream` position. + """ + return max(self.instance_map.values(), default=self.stream) + + def get_stream_pos_for_instance(self, instance_name: str) -> int: + """Get the stream position that the given writer was at at this token.""" + + # If we don't have an entry for the instance we can assume that it was + # at `self.stream`. + return self.instance_map.get(instance_name, self.stream) + + +@attr.s(frozen=True, slots=True, order=False) +class RoomStreamToken(AbstractMultiWriterStreamToken): """Tokens are positions between events. The token "s1" comes after event 1. s0 s1 @@ -514,16 +587,8 @@ class RoomStreamToken: topological: Optional[int] = attr.ib( validator=attr.validators.optional(attr.validators.instance_of(int)), - ) - stream: int = attr.ib(validator=attr.validators.instance_of(int)) - - instance_map: "immutabledict[str, int]" = attr.ib( - factory=immutabledict, - validator=attr.validators.deep_mapping( - key_validator=attr.validators.instance_of(str), - value_validator=attr.validators.instance_of(int), - mapping_validator=attr.validators.instance_of(immutabledict), - ), + kw_only=True, + default=None, ) def __attrs_post_init__(self) -> None: @@ -583,17 +648,7 @@ class RoomStreamToken: if self.topological or other.topological: raise Exception("Can't advance topological tokens") - max_stream = max(self.stream, other.stream) - - instance_map = { - instance: max( - self.instance_map.get(instance, self.stream), - other.instance_map.get(instance, other.stream), - ) - for instance in set(self.instance_map).union(other.instance_map) - } - - return RoomStreamToken(None, max_stream, immutabledict(instance_map)) + return super().copy_and_advance(other) def as_historical_tuple(self) -> Tuple[int, int]: """Returns a tuple of `(topological, stream)` for historical tokens. @@ -619,16 +674,6 @@ class RoomStreamToken: # at `self.stream`. return self.instance_map.get(instance_name, self.stream) - def get_max_stream_pos(self) -> int: - """Get the maximum stream position referenced in this token. - - The corresponding "min" position is, by definition just `self.stream`. - - This is used to handle tokens that have non-empty `instance_map`, and so - reference stream positions after the `self.stream` position. 
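The merge rule that copy_and_advance implements is easy to state concretely. A hedged sketch using plain dicts in place of immutabledict (the writer names are made up):

    from typing import Dict, Tuple

    def merge(
        stream_a: int, map_a: Dict[str, int],
        stream_b: int, map_b: Dict[str, int],
    ) -> Tuple[int, Dict[str, int]]:
        """Combine two multi-writer positions into one that is >= both."""
        # A writer absent from a token's instance_map is implicitly at
        # that token's default stream position.
        instance_map = {
            name: max(map_a.get(name, stream_a), map_b.get(name, stream_b))
            for name in set(map_a) | set(map_b)
        }
        return max(stream_a, stream_b), instance_map

    # "worker1" is ahead in the first token, "worker2" in the second:
    assert merge(5, {"worker1": 8}, 6, {"worker2": 9}) == (
        6,
        {"worker1": 8, "worker2": 9},
    )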
- """ - return max(self.instance_map.values(), default=self.stream) - async def to_string(self, store: "DataStore") -> str: if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) @@ -838,23 +883,28 @@ class StreamToken: return getattr(self, key.value) -StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0) +StreamToken.START = StreamToken(RoomStreamToken(stream=0), 0, 0, 0, 0, 0, 0, 0, 0, 0) @attr.s(slots=True, frozen=True, auto_attribs=True) -class PersistedEventPosition: - """Position of a newly persisted event with instance that persisted it. - - This can be used to test whether the event is persisted before or after a - RoomStreamToken. - """ +class PersistedPosition: + """Position of a newly persisted row with instance that persisted it.""" instance_name: str stream: int - def persisted_after(self, token: RoomStreamToken) -> bool: + def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool: return token.get_stream_pos_for_instance(self.instance_name) < self.stream + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class PersistedEventPosition(PersistedPosition): + """Position of a newly persisted event with instance that persisted it. + + This can be used to test whether the event is persisted before or after a + RoomStreamToken. + """ + def to_room_stream_token(self) -> RoomStreamToken: """Converts the position to a room stream token such that events persisted in the same room after this position will be after the @@ -865,7 +915,7 @@ class PersistedEventPosition: """ # Doing the naive thing satisfies the desired properties described in # the docstring. - return RoomStreamToken(None, self.stream) + return RoomStreamToken(stream=self.stream) @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 8ce6ccf529..867dbd6001 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -86,7 +86,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): [event], ] ) - self.handler.notify_interested_services(RoomStreamToken(None, 1)) + self.handler.notify_interested_services(RoomStreamToken(stream=1)) self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( interested_service, events=[event] @@ -107,7 +107,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): ] ) self.mock_store.get_events_as_list = AsyncMock(side_effect=[[event]]) - self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.handler.notify_interested_services(RoomStreamToken(stream=0)) self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) @@ -126,7 +126,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): ] ) - self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.handler.notify_interested_services(RoomStreamToken(stream=0)) self.assertFalse( self.mock_as_api.query_user.called, @@ -441,7 +441,7 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): self.get_success( self.hs.get_application_service_handler()._notify_interested_services( RoomStreamToken( - None, self.hs.get_application_service_handler().current_max + stream=self.hs.get_application_service_handler().current_max ) ) ) -- cgit 1.5.1 From 26b960b08ba0110ef3246e5749bb75b9b04a231c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 6 Oct 2023 07:22:55 -0400 Subject: Register media servlets via regex. 
(#16419) This converts the media servlet URLs in the same way as (most) of the rest of Synapse. This will give more flexibility in the versions each endpoint exists under. --- changelog.d/16419.misc | 1 + synapse/http/server.py | 2 +- synapse/media/_base.py | 48 +-------- synapse/media/media_repository.py | 10 +- synapse/rest/media/config_resource.py | 13 ++- synapse/rest/media/download_resource.py | 40 +++++--- synapse/rest/media/media_repository_resource.py | 33 ++++--- synapse/rest/media/preview_url_resource.py | 26 ++--- synapse/rest/media/thumbnail_resource.py | 35 ++++--- synapse/rest/media/upload_resource.py | 14 ++- tests/media/test_media_storage.py | 88 ++++++++--------- tests/media/test_url_previewer.py | 6 +- tests/replication/test_multi_media_repo.py | 19 +++- tests/rest/admin/test_admin.py | 58 ++++------- tests/rest/admin/test_media.py | 71 ++++---------- tests/rest/admin/test_statistics.py | 15 +-- tests/rest/admin/test_user.py | 21 ++-- tests/rest/client/utils.py | 6 +- tests/rest/media/test_url_preview.py | 124 ++++++++++++++---------- tests/unittest.py | 4 +- 20 files changed, 297 insertions(+), 337 deletions(-) create mode 100644 changelog.d/16419.misc (limited to 'tests') diff --git a/changelog.d/16419.misc b/changelog.d/16419.misc new file mode 100644 index 0000000000..591f371d00 --- /dev/null +++ b/changelog.d/16419.misc @@ -0,0 +1 @@ +Update registration of media repository URLs. diff --git a/synapse/http/server.py b/synapse/http/server.py index 3bbf91298e..1e4e56f36b 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -266,7 +266,7 @@ class HttpServer(Protocol): def register_paths( self, method: str, - path_patterns: Iterable[Pattern], + path_patterns: Iterable[Pattern[str]], callback: ServletCallback, servlet_classname: str, ) -> None: diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 80c448de2b..d103b43449 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -26,11 +26,11 @@ from twisted.internet.interfaces import IConsumer from twisted.protocols.basic import FileSender from twisted.web.server import Request -from synapse.api.errors import Codes, SynapseError, cs_error +from synapse.api.errors import Codes, cs_error from synapse.http.server import finish_request, respond_with_json from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable -from synapse.util.stringutils import is_ascii, parse_and_validate_server_name +from synapse.util.stringutils import is_ascii logger = logging.getLogger(__name__) @@ -84,52 +84,12 @@ INLINE_CONTENT_TYPES = [ ] -def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]: - """Parses the server name, media ID and optional file name from the request URI - - Also performs some rough validation on the server name. - - Args: - request: The `Request`. - - Returns: - A tuple containing the parsed server name, media ID and optional file name. - - Raises: - SynapseError(404): if parsing or validation fail for any reason - """ - try: - # The type on postpath seems incorrect in Twisted 21.2.0. - postpath: List[bytes] = request.postpath # type: ignore - assert postpath - - # This allows users to append e.g. /test.png to the URL. Useful for - # clients that parse the URL to see content type. 
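parse_media_id disappears because the path components it used to pull out of request.postpath are now captured by named groups in each servlet's regex (see the new DownloadResource PATTERNS below), with server-name validation moved into the handler. A stripped-down sketch of that dispatch style; this is not Synapse's actual JsonResource, whose real logic also handles HTTP methods, errors and unrecognised paths:

    import re
    from typing import Callable, List, Optional, Pattern, Tuple

    routes: List[Tuple[Pattern[str], Callable[..., str]]] = []

    def register(pattern: str, handler: Callable[..., str]) -> None:
        routes.append((re.compile(pattern), handler))

    def dispatch(path: str) -> str:
        for pattern, handler in routes:
            match = pattern.match(path)
            if match:
                # Named groups become keyword arguments to the handler.
                return handler(**match.groupdict())
        raise KeyError(path)

    def on_download(
        server_name: str, media_id: str, file_name: Optional[str] = None
    ) -> str:
        return f"download {media_id} from {server_name}"

    register(
        "/_matrix/media/(r0|v3|v1)/download/"
        "(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$",
        on_download,
    )
    assert dispatch("/_matrix/media/v3/download/example.com/abc") == (
        "download abc from example.com"
    )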
- server_name_bytes, media_id_bytes = postpath[:2] - server_name = server_name_bytes.decode("utf-8") - media_id = media_id_bytes.decode("utf8") - - # Validate the server name, raising if invalid - parse_and_validate_server_name(server_name) - - file_name = None - if len(postpath) > 2: - try: - file_name = urllib.parse.unquote(postpath[-1].decode("utf-8")) - except UnicodeDecodeError: - pass - return server_name, media_id, file_name - except Exception: - raise SynapseError( - 404, "Invalid media id token %r" % (request.postpath,), Codes.UNKNOWN - ) - - def respond_404(request: SynapseRequest) -> None: + assert request.path is not None respond_with_json( request, 404, - cs_error("Not found %r" % (request.postpath,), code=Codes.NOT_FOUND), + cs_error("Not found '%s'" % (request.path.decode(),), code=Codes.NOT_FOUND), send_cors=True, ) diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 1b7b014f9a..d11c2ff4ee 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -48,6 +48,7 @@ from synapse.media.filepath import MediaFilePaths from synapse.media.media_storage import MediaStorage from synapse.media.storage_provider import StorageProviderWrapper from synapse.media.thumbnailer import Thumbnailer, ThumbnailError +from synapse.media.url_previewer import UrlPreviewer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID from synapse.util.async_helpers import Linearizer @@ -114,7 +115,7 @@ class MediaRepository: ) storage_providers.append(provider) - self.media_storage = MediaStorage( + self.media_storage: MediaStorage = MediaStorage( self.hs, self.primary_base_path, self.filepaths, storage_providers ) @@ -142,6 +143,13 @@ class MediaRepository: MEDIA_RETENTION_CHECK_PERIOD_MS, ) + if hs.config.media.url_preview_enabled: + self.url_previewer: Optional[UrlPreviewer] = UrlPreviewer( + hs, self, self.media_storage + ) + else: + self.url_previewer = None + def _start_update_recently_accessed(self) -> Deferred: return run_as_background_process( "update_recently_accessed_media", self._update_recently_accessed diff --git a/synapse/rest/media/config_resource.py b/synapse/rest/media/config_resource.py index a95804d327..dbf5133c72 100644 --- a/synapse/rest/media/config_resource.py +++ b/synapse/rest/media/config_resource.py @@ -14,17 +14,19 @@ # limitations under the License. 
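config_resource.py just below is the smallest instance of the conversion pattern this commit applies throughout: a DirectServeJsonResource leaf with _async_render_* methods becomes a RestServlet with a PATTERNS regex and on_GET/on_POST methods, and the per-servlet OPTIONS handlers are dropped (presumably because the shared resource machinery answers preflight requests generically; that is inferred from the deletions rather than stated in the commit message). Skeletally, with stand-in base classes:

    import re

    class OldStyle:  # stand-in for DirectServeJsonResource, mounted via putChild
        isLeaf = True

        async def _async_render_GET(self, request) -> None:
            ...

    class NewStyle:  # stand-in for RestServlet, registered via its PATTERNS
        PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/config$")]

        async def on_GET(self, request) -> None:
            ...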
# +import re from typing import TYPE_CHECKING -from synapse.http.server import DirectServeJsonResource, respond_with_json +from synapse.http.server import respond_with_json +from synapse.http.servlet import RestServlet from synapse.http.site import SynapseRequest if TYPE_CHECKING: from synapse.server import HomeServer -class MediaConfigResource(DirectServeJsonResource): - isLeaf = True +class MediaConfigResource(RestServlet): + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/config$")] def __init__(self, hs: "HomeServer"): super().__init__() @@ -33,9 +35,6 @@ class MediaConfigResource(DirectServeJsonResource): self.auth = hs.get_auth() self.limits_dict = {"m.upload.size": config.media.max_upload_size} - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET(self, request: SynapseRequest) -> None: await self.auth.get_user_by_req(request) respond_with_json(request, 200, self.limits_dict, send_cors=True) - - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - respond_with_json(request, 200, {}, send_cors=True) diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py index 3c618ef60a..65b9ff52fa 100644 --- a/synapse/rest/media/download_resource.py +++ b/synapse/rest/media/download_resource.py @@ -13,16 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING +import re +from typing import TYPE_CHECKING, Optional -from synapse.http.server import ( - DirectServeJsonResource, - set_corp_headers, - set_cors_headers, -) -from synapse.http.servlet import parse_boolean +from synapse.http.server import set_corp_headers, set_cors_headers +from synapse.http.servlet import RestServlet, parse_boolean from synapse.http.site import SynapseRequest -from synapse.media._base import parse_media_id, respond_404 +from synapse.media._base import respond_404 +from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository @@ -31,15 +29,28 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class DownloadResource(DirectServeJsonResource): - isLeaf = True +class DownloadResource(RestServlet): + PATTERNS = [ + re.compile( + "/_matrix/media/(r0|v3|v1)/download/(?P[^/]*)/(?P[^/]*)(/(?P[^/]*))?$" + ) + ] def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): super().__init__() self.media_repo = media_repo self._is_mine_server_name = hs.is_mine_server_name - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET( + self, + request: SynapseRequest, + server_name: str, + media_id: str, + file_name: Optional[str] = None, + ) -> None: + # Validate the server name, raising if invalid + parse_and_validate_server_name(server_name) + set_cors_headers(request) set_corp_headers(request) request.setHeader( @@ -58,9 +69,8 @@ class DownloadResource(DirectServeJsonResource): b"Referrer-Policy", b"no-referrer", ) - server_name, media_id, name = parse_media_id(request) if self._is_mine_server_name(server_name): - await self.media_repo.get_local_media(request, media_id, name) + await self.media_repo.get_local_media(request, media_id, file_name) else: allow_remote = parse_boolean(request, "allow_remote", default=True) if not allow_remote: @@ -72,4 +82,6 @@ class DownloadResource(DirectServeJsonResource): respond_404(request) return - await self.media_repo.get_remote_media(request, server_name, media_id, name) + 
await self.media_repo.get_remote_media( + request, server_name, media_id, file_name + ) diff --git a/synapse/rest/media/media_repository_resource.py b/synapse/rest/media/media_repository_resource.py index 5ebaa3b032..2089bb1029 100644 --- a/synapse/rest/media/media_repository_resource.py +++ b/synapse/rest/media/media_repository_resource.py @@ -15,7 +15,7 @@ from typing import TYPE_CHECKING from synapse.config._base import ConfigError -from synapse.http.server import UnrecognizedRequestResource +from synapse.http.server import HttpServer, JsonResource from .config_resource import MediaConfigResource from .download_resource import DownloadResource @@ -27,7 +27,7 @@ if TYPE_CHECKING: from synapse.server import HomeServer -class MediaRepositoryResource(UnrecognizedRequestResource): +class MediaRepositoryResource(JsonResource): """File uploading and downloading. Uploads are POSTed to a resource which returns a token which is used to GET @@ -70,6 +70,11 @@ class MediaRepositoryResource(UnrecognizedRequestResource): width and height are close to the requested size and the aspect matches the requested size. The client should scale the image if it needs to fit within a given rectangle. + + This gets mounted at various points under /_matrix/media, including: + * /_matrix/media/r0 + * /_matrix/media/v1 + * /_matrix/media/v3 """ def __init__(self, hs: "HomeServer"): @@ -77,17 +82,23 @@ class MediaRepositoryResource(UnrecognizedRequestResource): if not hs.config.media.can_load_media_repo: raise ConfigError("Synapse is not configured to use a media repo.") - super().__init__() + JsonResource.__init__(self, hs, canonical_json=False) + self.register_servlets(self, hs) + + @staticmethod + def register_servlets(http_server: HttpServer, hs: "HomeServer") -> None: media_repo = hs.get_media_repository() - self.putChild(b"upload", UploadResource(hs, media_repo)) - self.putChild(b"download", DownloadResource(hs, media_repo)) - self.putChild( - b"thumbnail", ThumbnailResource(hs, media_repo, media_repo.media_storage) + # Note that many of these should not exist as v1 endpoints, but empirically + # a lot of traffic still goes to them. + + UploadResource(hs, media_repo).register(http_server) + DownloadResource(hs, media_repo).register(http_server) + ThumbnailResource(hs, media_repo, media_repo.media_storage).register( + http_server ) if hs.config.media.url_preview_enabled: - self.putChild( - b"preview_url", - PreviewUrlResource(hs, media_repo, media_repo.media_storage), + PreviewUrlResource(hs, media_repo, media_repo.media_storage).register( + http_server ) - self.putChild(b"config", MediaConfigResource(hs)) + MediaConfigResource(hs).register(http_server) diff --git a/synapse/rest/media/preview_url_resource.py b/synapse/rest/media/preview_url_resource.py index 58513c4be4..c8acb65dca 100644 --- a/synapse/rest/media/preview_url_resource.py +++ b/synapse/rest/media/preview_url_resource.py @@ -13,24 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import re from typing import TYPE_CHECKING -from synapse.http.server import ( - DirectServeJsonResource, - respond_with_json, - respond_with_json_bytes, -) -from synapse.http.servlet import parse_integer, parse_string +from synapse.http.server import respond_with_json_bytes +from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.media.media_storage import MediaStorage -from synapse.media.url_previewer import UrlPreviewer if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository from synapse.server import HomeServer -class PreviewUrlResource(DirectServeJsonResource): +class PreviewUrlResource(RestServlet): """ The `GET /_matrix/media/r0/preview_url` endpoint provides a generic preview API for URLs which outputs Open Graph (https://ogp.me/) responses (with some Matrix @@ -48,7 +44,7 @@ class PreviewUrlResource(DirectServeJsonResource): * Matrix cannot be used to distribute the metadata between homeservers. """ - isLeaf = True + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/preview_url$")] def __init__( self, @@ -62,14 +58,10 @@ class PreviewUrlResource(DirectServeJsonResource): self.clock = hs.get_clock() self.media_repo = media_repo self.media_storage = media_storage + assert self.media_repo.url_previewer is not None + self.url_previewer = self.media_repo.url_previewer - self._url_previewer = UrlPreviewer(hs, media_repo, media_storage) - - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - request.setHeader(b"Allow", b"OPTIONS, GET") - respond_with_json(request, 200, {}, send_cors=True) - - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET(self, request: SynapseRequest) -> None: # XXX: if get_user_by_req fails, what should we do in an async render? requester = await self.auth.get_user_by_req(request) url = parse_string(request, "url", required=True) @@ -77,5 +69,5 @@ class PreviewUrlResource(DirectServeJsonResource): if ts is None: ts = self.clock.time_msec() - og = await self._url_previewer.preview(url, requester.user, ts) + og = await self.url_previewer.preview(url, requester.user, ts) respond_with_json_bytes(request, 200, og, send_cors=True) diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py index 661e604b85..f9cd773f77 100644 --- a/synapse/rest/media/thumbnail_resource.py +++ b/synapse/rest/media/thumbnail_resource.py @@ -13,29 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- import logging +import re from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from synapse.api.errors import Codes, SynapseError, cs_error from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP -from synapse.http.server import ( - DirectServeJsonResource, - respond_with_json, - set_corp_headers, - set_cors_headers, -) -from synapse.http.servlet import parse_integer, parse_string +from synapse.http.server import respond_with_json, set_corp_headers, set_cors_headers +from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.media._base import ( FileInfo, ThumbnailInfo, - parse_media_id, respond_404, respond_with_file, respond_with_responder, ) from synapse.media.media_storage import MediaStorage +from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository @@ -44,8 +39,12 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class ThumbnailResource(DirectServeJsonResource): - isLeaf = True +class ThumbnailResource(RestServlet): + PATTERNS = [ + re.compile( + "/_matrix/media/(r0|v3|v1)/thumbnail/(?P[^/]*)/(?P[^/]*)$" + ) + ] def __init__( self, @@ -60,12 +59,17 @@ class ThumbnailResource(DirectServeJsonResource): self.media_storage = media_storage self.dynamic_thumbnails = hs.config.media.dynamic_thumbnails self._is_mine_server_name = hs.is_mine_server_name + self._server_name = hs.hostname self.prevent_media_downloads_from = hs.config.media.prevent_media_downloads_from - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET( + self, request: SynapseRequest, server_name: str, media_id: str + ) -> None: + # Validate the server name, raising if invalid + parse_and_validate_server_name(server_name) + set_cors_headers(request) set_corp_headers(request) - server_name, media_id, _ = parse_media_id(request) width = parse_integer(request, "width", required=True) height = parse_integer(request, "height", required=True) method = parse_string(request, "method", "scale") @@ -418,13 +422,14 @@ class ThumbnailResource(DirectServeJsonResource): # `dynamic_thumbnails` is disabled. logger.info("Failed to find any generated thumbnails") + assert request.path is not None respond_with_json( request, 400, cs_error( - "Cannot find any thumbnails for the requested media (%r). This might mean the media is not a supported_media_format=(%s) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)" + "Cannot find any thumbnails for the requested media ('%s'). This might mean the media is not a supported_media_format=(%s) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)" % ( - request.postpath, + request.path.decode(), ", ".join(THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP.keys()), ), code=Codes.UNKNOWN, diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py index 043e8d6077..949326d85d 100644 --- a/synapse/rest/media/upload_resource.py +++ b/synapse/rest/media/upload_resource.py @@ -14,11 +14,12 @@ # limitations under the License. 
import logging +import re from typing import IO, TYPE_CHECKING, Dict, List, Optional from synapse.api.errors import Codes, SynapseError -from synapse.http.server import DirectServeJsonResource, respond_with_json -from synapse.http.servlet import parse_bytes_from_args +from synapse.http.server import respond_with_json +from synapse.http.servlet import RestServlet, parse_bytes_from_args from synapse.http.site import SynapseRequest from synapse.media.media_storage import SpamMediaException @@ -29,8 +30,8 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class UploadResource(DirectServeJsonResource): - isLeaf = True +class UploadResource(RestServlet): + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload")] def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): super().__init__() @@ -43,10 +44,7 @@ class UploadResource(DirectServeJsonResource): self.max_upload_size = hs.config.media.max_upload_size self.clock = hs.get_clock() - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - respond_with_json(request, 200, {}, send_cors=True) - - async def _async_render_POST(self, request: SynapseRequest) -> None: + async def on_POST(self, request: SynapseRequest) -> None: requester = await self.auth.get_user_by_req(request) raw_content_length = request.getHeader("Content-Length") if raw_content_length is None: diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index 04fc7bdcef..ba00e35a9e 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -28,6 +28,7 @@ from typing_extensions import Literal from twisted.internet import defer from twisted.internet.defer import Deferred from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource from synapse.api.errors import Codes from synapse.events import EventBase @@ -41,12 +42,13 @@ from synapse.module_api import ModuleApi from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers from synapse.rest import admin from synapse.rest.client import login +from synapse.rest.media.thumbnail_resource import ThumbnailResource from synapse.server import HomeServer from synapse.types import JsonDict, RoomAlias from synapse.util import Clock from tests import unittest -from tests.server import FakeChannel, FakeSite, make_request +from tests.server import FakeChannel from tests.test_utils import SMALL_PNG from tests.utils import default_config @@ -288,22 +290,22 @@ class MediaRepoTests(unittest.HomeserverTestCase): return hs def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - media_resource = hs.get_media_repository_resource() - self.download_resource = media_resource.children[b"download"] - self.thumbnail_resource = media_resource.children[b"thumbnail"] self.store = hs.get_datastores().main self.media_repo = hs.get_media_repository() self.media_id = "example.com/12345" + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + def _req( self, content_disposition: Optional[bytes], include_content_type: bool = True ) -> FakeChannel: - channel = make_request( - self.reactor, - FakeSite(self.download_resource, self.reactor), + channel = self.make_request( "GET", - self.media_id, + f"/_matrix/media/v3/download/{self.media_id}", shorthand=False, await_result=False, ) @@ -481,11 +483,9 @@ class MediaRepoTests(unittest.HomeserverTestCase): # 
Fetching again should work, without re-requesting the image from the # remote. params = "?width=32&height=32&method=scale" - channel = make_request( - self.reactor, - FakeSite(self.thumbnail_resource, self.reactor), + channel = self.make_request( "GET", - self.media_id + params, + f"/_matrix/media/v3/thumbnail/{self.media_id}{params}", shorthand=False, await_result=False, ) @@ -511,11 +511,9 @@ class MediaRepoTests(unittest.HomeserverTestCase): ) shutil.rmtree(thumbnail_dir, ignore_errors=True) - channel = make_request( - self.reactor, - FakeSite(self.thumbnail_resource, self.reactor), + channel = self.make_request( "GET", - self.media_id + params, + f"/_matrix/media/v3/thumbnail/{self.media_id}{params}", shorthand=False, await_result=False, ) @@ -549,11 +547,9 @@ class MediaRepoTests(unittest.HomeserverTestCase): """ params = "?width=32&height=32&method=" + method - channel = make_request( - self.reactor, - FakeSite(self.thumbnail_resource, self.reactor), + channel = self.make_request( "GET", - self.media_id + params, + f"/_matrix/media/r0/thumbnail/{self.media_id}{params}", shorthand=False, await_result=False, ) @@ -590,7 +586,7 @@ class MediaRepoTests(unittest.HomeserverTestCase): channel.json_body, { "errcode": "M_UNKNOWN", - "error": "Cannot find any thumbnails for the requested media ([b'example.com', b'12345']). This might mean the media is not a supported_media_format=(image/jpeg, image/jpg, image/webp, image/gif, image/png) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)", + "error": "Cannot find any thumbnails for the requested media ('/_matrix/media/r0/thumbnail/example.com/12345'). This might mean the media is not a supported_media_format=(image/jpeg, image/jpg, image/webp, image/gif, image/png) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)", }, ) else: @@ -600,7 +596,7 @@ class MediaRepoTests(unittest.HomeserverTestCase): channel.json_body, { "errcode": "M_NOT_FOUND", - "error": "Not found [b'example.com', b'12345']", + "error": "Not found '/_matrix/media/r0/thumbnail/example.com/12345'", }, ) @@ -609,12 +605,17 @@ class MediaRepoTests(unittest.HomeserverTestCase): """Test that choosing between thumbnails with the same quality rating succeeds. We are not particular about which thumbnail is chosen.""" + media_repo = self.hs.get_media_repository() + thumbnail_resouce = ThumbnailResource( + self.hs, media_repo, media_repo.media_storage + ) + self.assertIsNotNone( - self.thumbnail_resource._select_thumbnail( + thumbnail_resouce._select_thumbnail( desired_width=desired_size, desired_height=desired_size, desired_method=method, - desired_type=self.test_image.content_type, + desired_type=self.test_image.content_type, # type: ignore[arg-type] # Provide two identical thumbnails which are guaranteed to have the same # quality rating. 
thumbnail_infos=[ @@ -636,7 +637,7 @@ class MediaRepoTests(unittest.HomeserverTestCase): }, ], file_id=f"image{self.test_image.extension.decode()}", - url_cache=None, + url_cache=False, server_name=None, ) ) @@ -725,13 +726,13 @@ class SpamCheckerTestCaseLegacy(unittest.HomeserverTestCase): self.user = self.register_user("user", "pass") self.tok = self.login("user", "pass") - # Allow for uploading and downloading to/from the media repo - self.media_repo = hs.get_media_repository_resource() - self.download_resource = self.media_repo.children[b"download"] - self.upload_resource = self.media_repo.children[b"upload"] - load_legacy_spam_checkers(hs) + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + def default_config(self) -> Dict[str, Any]: config = default_config("test") @@ -751,9 +752,7 @@ class SpamCheckerTestCaseLegacy(unittest.HomeserverTestCase): def test_upload_innocent(self) -> None: """Attempt to upload some innocent data that should be allowed.""" - self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=self.tok, expect_code=200 - ) + self.helper.upload_media(SMALL_PNG, tok=self.tok, expect_code=200) def test_upload_ban(self) -> None: """Attempt to upload some data that includes bytes "evil", which should @@ -762,9 +761,7 @@ class SpamCheckerTestCaseLegacy(unittest.HomeserverTestCase): data = b"Some evil data" - self.helper.upload_media( - self.upload_resource, data, tok=self.tok, expect_code=400 - ) + self.helper.upload_media(data, tok=self.tok, expect_code=400) EVIL_DATA = b"Some evil data" @@ -781,15 +778,15 @@ class SpamCheckerTestCase(unittest.HomeserverTestCase): self.user = self.register_user("user", "pass") self.tok = self.login("user", "pass") - # Allow for uploading and downloading to/from the media repo - self.media_repo = hs.get_media_repository_resource() - self.download_resource = self.media_repo.children[b"download"] - self.upload_resource = self.media_repo.children[b"upload"] - hs.get_module_api().register_spam_checker_callbacks( check_media_file_for_spam=self.check_media_file_for_spam ) + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + async def check_media_file_for_spam( self, file_wrapper: ReadableFileWrapper, file_info: FileInfo ) -> Union[Codes, Literal["NOT_SPAM"], Tuple[Codes, JsonDict]]: @@ -805,21 +802,16 @@ class SpamCheckerTestCase(unittest.HomeserverTestCase): def test_upload_innocent(self) -> None: """Attempt to upload some innocent data that should be allowed.""" - self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=self.tok, expect_code=200 - ) + self.helper.upload_media(SMALL_PNG, tok=self.tok, expect_code=200) def test_upload_ban(self) -> None: """Attempt to upload some data that includes bytes "evil", which should get rejected by the spam checker. 
""" - self.helper.upload_media( - self.upload_resource, EVIL_DATA, tok=self.tok, expect_code=400 - ) + self.helper.upload_media(EVIL_DATA, tok=self.tok, expect_code=400) self.helper.upload_media( - self.upload_resource, EVIL_DATA_EXPERIMENT, tok=self.tok, expect_code=400, diff --git a/tests/media/test_url_previewer.py b/tests/media/test_url_previewer.py index 46ecde5344..04b69f378a 100644 --- a/tests/media/test_url_previewer.py +++ b/tests/media/test_url_previewer.py @@ -61,9 +61,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): return self.setup_test_homeserver(config=config) def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - media_repo_resource = hs.get_media_repository_resource() - preview_url = media_repo_resource.children[b"preview_url"] - self.url_previewer = preview_url._url_previewer + media_repo = hs.get_media_repository() + assert media_repo.url_previewer is not None + self.url_previewer = media_repo.url_previewer def test_all_urls_allowed(self) -> None: self.assertFalse(self.url_previewer._is_url_blocked("http://matrix.org")) diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index 6e78daa830..b230a6c361 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -13,7 +13,7 @@ # limitations under the License. import logging import os -from typing import Optional, Tuple +from typing import Any, Optional, Tuple from twisted.internet.interfaces import IOpenSSLServerConnectionCreator from twisted.internet.protocol import Factory @@ -29,7 +29,7 @@ from synapse.util import Clock from tests.http import TestServerTLSConnectionFactory, get_test_ca_cert_file from tests.replication._base import BaseMultiWorkerStreamTestCase -from tests.server import FakeChannel, FakeSite, FakeTransport, make_request +from tests.server import FakeChannel, FakeTransport, make_request from tests.test_utils import SMALL_PNG logger = logging.getLogger(__name__) @@ -56,6 +56,16 @@ class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase): conf["federation_custom_ca_list"] = [get_test_ca_cert_file()] return conf + def make_worker_hs( + self, worker_app: str, extra_config: Optional[dict] = None, **kwargs: Any + ) -> HomeServer: + worker_hs = super().make_worker_hs(worker_app, extra_config, **kwargs) + # Force the media paths onto the replication resource. + worker_hs.get_media_repository_resource().register_servlets( + self._hs_to_site[worker_hs].resource, worker_hs + ) + return worker_hs + def _get_media_req( self, hs: HomeServer, target: str, media_id: str ) -> Tuple[FakeChannel, Request]: @@ -68,12 +78,11 @@ class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase): The channel for the *client* request and the *outbound* request for the media which the caller should respond to. """ - resource = hs.get_media_repository_resource().children[b"download"] channel = make_request( self.reactor, - FakeSite(resource, self.reactor), + self._hs_to_site[hs], "GET", - f"/{target}/{media_id}", + f"/_matrix/media/r0/download/{target}/{media_id}", shorthand=False, access_token=self.access_token, await_result=False, diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py index 359d131b37..8646b2f0fd 100644 --- a/tests/rest/admin/test_admin.py +++ b/tests/rest/admin/test_admin.py @@ -13,10 +13,12 @@ # limitations under the License. 
import urllib.parse +from typing import Dict from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource import synapse.rest.admin from synapse.http.server import JsonResource @@ -26,7 +28,6 @@ from synapse.server import HomeServer from synapse.util import Clock from tests import unittest -from tests.server import FakeSite, make_request from tests.test_utils import SMALL_PNG @@ -55,21 +56,18 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): room.register_servlets, ] - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - # Allow for uploading and downloading to/from the media repo - self.media_repo = hs.get_media_repository_resource() - self.download_resource = self.media_repo.children[b"download"] - self.upload_resource = self.media_repo.children[b"upload"] + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources def _ensure_quarantined( self, admin_user_tok: str, server_and_media_id: str ) -> None: """Ensure a piece of media is quarantined when trying to access it.""" - channel = make_request( - self.reactor, - FakeSite(self.download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=admin_user_tok, ) @@ -117,20 +115,16 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): non_admin_user_tok = self.login("id_nonadmin", "pass") # Upload some media into the room - response = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=admin_user_tok - ) + response = self.helper.upload_media(SMALL_PNG, tok=admin_user_tok) # Extract media ID from the response server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://' server_name, media_id = server_name_and_media_id.split("/") # Attempt to access the media - channel = make_request( - self.reactor, - FakeSite(self.download_resource, self.reactor), + channel = self.make_request( "GET", - server_name_and_media_id, + f"/_matrix/media/v3/download/{server_name_and_media_id}", shorthand=False, access_token=non_admin_user_tok, ) @@ -173,12 +167,8 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): self.helper.join(room_id, non_admin_user, tok=non_admin_user_tok) # Upload some media - response_1 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) - response_2 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) + response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) + response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract mxcs mxc_1 = response_1["content_uri"] @@ -227,12 +217,8 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): non_admin_user_tok = self.login("user_nonadmin", "pass") # Upload some media - response_1 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) - response_2 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) + response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) + response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract media IDs server_and_media_id_1 = response_1["content_uri"][6:] @@ -265,12 +251,8 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): 
non_admin_user_tok = self.login("user_nonadmin", "pass") # Upload some media - response_1 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) - response_2 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) + response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) + response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract media IDs server_and_media_id_1 = response_1["content_uri"][6:] @@ -304,11 +286,9 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): self._ensure_quarantined(admin_user_tok, server_and_media_id_1) # Attempt to access each piece of media - channel = make_request( - self.reactor, - FakeSite(self.download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id_2, + f"/_matrix/media/v3/download/{server_and_media_id_2}", shorthand=False, access_token=non_admin_user_tok, ) diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py index 6d04911d67..278808abb5 100644 --- a/tests/rest/admin/test_media.py +++ b/tests/rest/admin/test_media.py @@ -13,10 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +from typing import Dict from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource import synapse.rest.admin from synapse.api.errors import Codes @@ -26,22 +28,27 @@ from synapse.server import HomeServer from synapse.util import Clock from tests import unittest -from tests.server import FakeSite, make_request from tests.test_utils import SMALL_PNG VALID_TIMESTAMP = 1609459200000 # 2021-01-01 in milliseconds INVALID_TIMESTAMP_IN_S = 1893456000 # 2030-01-01 in seconds -class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): +class _AdminMediaTests(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets, synapse.rest.admin.register_servlets_for_media_repo, login.register_servlets, ] + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + + +class DeleteMediaByIDTestCase(_AdminMediaTests): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.media_repo = hs.get_media_repository_resource() self.server_name = hs.hostname self.admin_user = self.register_user("admin", "pass", admin=True) @@ -117,12 +124,8 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): Tests that delete a media is successfully """ - download_resource = self.media_repo.children[b"download"] - upload_resource = self.media_repo.children[b"upload"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200, @@ -134,11 +137,9 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): self.assertEqual(server_name, self.server_name) # Attempt to access media - channel = make_request( - self.reactor, - FakeSite(download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=self.admin_user_tok, ) @@ -173,11 +174,9 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): ) # Attempt to access media - channel = make_request( - self.reactor, - FakeSite(download_resource, self.reactor), + 
channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=self.admin_user_tok, ) @@ -194,7 +193,7 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): self.assertFalse(os.path.exists(local_path)) -class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): +class DeleteMediaByDateSizeTestCase(_AdminMediaTests): servlets = [ synapse.rest.admin.register_servlets, synapse.rest.admin.register_servlets_for_media_repo, @@ -529,11 +528,8 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): """ Create a media and return media_id and server_and_media_id """ - upload_resource = self.media_repo.children[b"upload"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200, @@ -553,16 +549,12 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): """ Try to access a media and check the result """ - download_resource = self.media_repo.children[b"download"] - media_id = server_and_media_id.split("/")[1] local_path = self.filepaths.local_media_filepath(media_id) - channel = make_request( - self.reactor, - FakeSite(download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=self.admin_user_tok, ) @@ -591,27 +583,16 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): self.assertFalse(os.path.exists(local_path)) -class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase): - servlets = [ - synapse.rest.admin.register_servlets, - synapse.rest.admin.register_servlets_for_media_repo, - login.register_servlets, - ] - +class QuarantineMediaByIDTestCase(_AdminMediaTests): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - media_repo = hs.get_media_repository_resource() self.store = hs.get_datastores().main self.server_name = hs.hostname self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") - # Create media - upload_resource = media_repo.children[b"upload"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200, @@ -720,26 +701,16 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase): self.assertFalse(media_info["quarantined_by"]) -class ProtectMediaByIDTestCase(unittest.HomeserverTestCase): - servlets = [ - synapse.rest.admin.register_servlets, - synapse.rest.admin.register_servlets_for_media_repo, - login.register_servlets, - ] - +class ProtectMediaByIDTestCase(_AdminMediaTests): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - media_repo = hs.get_media_repository_resource() + hs.get_media_repository_resource() self.store = hs.get_datastores().main self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") - # Create media - upload_resource = media_repo.children[b"upload"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200, @@ -816,7 +787,7 @@ class ProtectMediaByIDTestCase(unittest.HomeserverTestCase): self.assertFalse(media_info["safe_from_quarantine"]) -class PurgeMediaCacheTestCase(unittest.HomeserverTestCase): +class PurgeMediaCacheTestCase(_AdminMediaTests): servlets = [ 
synapse.rest.admin.register_servlets, synapse.rest.admin.register_servlets_for_media_repo, diff --git a/tests/rest/admin/test_statistics.py b/tests/rest/admin/test_statistics.py index b60f16b914..cd8ee274d8 100644 --- a/tests/rest/admin/test_statistics.py +++ b/tests/rest/admin/test_statistics.py @@ -12,9 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Optional +from typing import Dict, List, Optional from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource import synapse.rest.admin from synapse.api.errors import Codes @@ -34,8 +35,6 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase): ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.media_repo = hs.get_media_repository_resource() - self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") @@ -44,6 +43,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase): self.url = "/_synapse/admin/v1/statistics/users/media" + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + def test_no_auth(self) -> None: """ Try to list users without authentication. @@ -470,12 +474,9 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase): user_token: Access token of the user number_media: Number of media to be created for the user """ - upload_resource = self.media_repo.children[b"upload"] for _ in range(number_media): # Upload some media into the room - self.helper.upload_media( - upload_resource, SMALL_PNG, tok=user_token, expect_code=200 - ) + self.helper.upload_media(SMALL_PNG, tok=user_token, expect_code=200) def _check_fields(self, content: List[JsonDict]) -> None: """Checks that all attributes are present in content diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index b326ad2c90..37f37a09d8 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -17,12 +17,13 @@ import hmac import os import urllib.parse from binascii import unhexlify -from typing import List, Optional +from typing import Dict, List, Optional from unittest.mock import AsyncMock, Mock, patch from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource import synapse.rest.admin from synapse.api.constants import ApprovalNoticeMedium, LoginType, UserTypes @@ -45,7 +46,6 @@ from synapse.types import JsonDict, UserID, create_requester from synapse.util import Clock from tests import unittest -from tests.server import FakeSite, make_request from tests.test_utils import SMALL_PNG from tests.unittest import override_config @@ -3421,7 +3421,6 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main - self.media_repo = hs.get_media_repository_resource() self.filepaths = MediaFilePaths(hs.config.media.media_store_path) self.admin_user = self.register_user("admin", "pass", admin=True) @@ -3432,6 +3431,11 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.other_user ) + def create_resource_dict(self) -> Dict[str, Resource]: + resources = 
super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + @parameterized.expand(["GET", "DELETE"]) def test_no_auth(self, method: str) -> None: """Try to list media of an user without authentication.""" @@ -3907,12 +3911,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): Returns: The ID of the newly created media. """ - upload_resource = self.media_repo.children[b"upload"] - download_resource = self.media_repo.children[b"download"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, image_data, user_token, filename, expect_code=200 + image_data, user_token, filename, expect_code=200 ) # Extract media ID from the response @@ -3920,11 +3921,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): media_id = server_and_media_id.split("/")[1] # Try to access a media and to create `last_access_ts` - channel = make_request( - self.reactor, - FakeSite(download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=user_token, ) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 9532e5ddc1..465b696c0b 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -37,7 +37,6 @@ import attr from typing_extensions import Literal from twisted.test.proto_helpers import MemoryReactorClock -from twisted.web.resource import Resource from twisted.web.server import Site from synapse.api.constants import Membership @@ -45,7 +44,7 @@ from synapse.api.errors import Codes from synapse.server import HomeServer from synapse.types import JsonDict -from tests.server import FakeChannel, FakeSite, make_request +from tests.server import FakeChannel, make_request from tests.test_utils.html_parsers import TestHtmlParser from tests.test_utils.oidc import FakeAuthorizationGrant, FakeOidcServer @@ -558,7 +557,6 @@ class RestHelper: def upload_media( self, - resource: Resource, image_data: bytes, tok: str, filename: str = "test.png", @@ -576,7 +574,7 @@ class RestHelper: path = "/_matrix/media/r0/upload?filename=%s" % (filename,) channel = make_request( self.reactor, - FakeSite(resource, self.reactor), + self.site, "POST", path, content=image_data, diff --git a/tests/rest/media/test_url_preview.py b/tests/rest/media/test_url_preview.py index 05d5e39cab..24459c6af4 100644 --- a/tests/rest/media/test_url_preview.py +++ b/tests/rest/media/test_url_preview.py @@ -24,10 +24,10 @@ from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.error import DNSLookupError from twisted.internet.interfaces import IAddress, IResolutionReceiver from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactor +from twisted.web.resource import Resource from synapse.config.oembed import OEmbedEndpointConfig from synapse.media.url_previewer import IMAGE_CACHE_EXPIRY_MS -from synapse.rest.media.media_repository_resource import MediaRepositoryResource from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -117,8 +117,8 @@ class URLPreviewTests(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.media_repo = hs.get_media_repository() - media_repo_resource = hs.get_media_repository_resource() - self.preview_url = media_repo_resource.children[b"preview_url"] + assert self.media_repo.url_previewer is not None + 
self.url_previewer = self.media_repo.url_previewer self.lookups: Dict[str, Any] = {} @@ -143,8 +143,15 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.reactor.nameResolver = Resolver() # type: ignore[assignment] - def create_test_resource(self) -> MediaRepositoryResource: - return self.hs.get_media_repository_resource() + def create_resource_dict(self) -> Dict[str, Resource]: + """Create a resource tree for the test server + + A resource tree is a mapping from path to twisted.web.resource. + + The default implementation creates a JsonResource and calls each function in + `servlets` to register servlets against it. + """ + return {"/_matrix/media": self.hs.get_media_repository_resource()} def _assert_small_png(self, json_body: JsonDict) -> None: """Assert properties from the SMALL_PNG test image.""" @@ -159,7 +166,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -183,7 +190,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): # Check the cache returns the correct response channel = self.make_request( - "GET", "preview_url?url=http://matrix.org", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://matrix.org", + shorthand=False, ) # Check the cache response has the same content @@ -193,13 +202,15 @@ class URLPreviewTests(unittest.HomeserverTestCase): ) # Clear the in-memory cache - self.assertIn("http://matrix.org", self.preview_url._url_previewer._cache) - self.preview_url._url_previewer._cache.pop("http://matrix.org") - self.assertNotIn("http://matrix.org", self.preview_url._url_previewer._cache) + self.assertIn("http://matrix.org", self.url_previewer._cache) + self.url_previewer._cache.pop("http://matrix.org") + self.assertNotIn("http://matrix.org", self.url_previewer._cache) # Check the database cache returns the correct response channel = self.make_request( - "GET", "preview_url?url=http://matrix.org", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://matrix.org", + shorthand=False, ) # Check the cache response has the same content @@ -221,7 +232,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -251,7 +262,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -287,7 +298,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -328,7 +339,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -363,7 +374,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -396,7 +407,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - 
"preview_url?url=http://example.com", + "/_matrix/media/v3/preview_url?url=http://example.com", shorthand=False, await_result=False, ) @@ -425,7 +436,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.lookups["example.com"] = [(IPv4Address, "192.168.1.1")] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) # No requests made. @@ -446,7 +459,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.lookups["example.com"] = [(IPv4Address, "1.1.1.2")] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) self.assertEqual(channel.code, 502) @@ -463,7 +478,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): Blocked IP addresses, accessed directly, are not spidered. """ channel = self.make_request( - "GET", "preview_url?url=http://192.168.1.1", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://192.168.1.1", + shorthand=False, ) # No requests made. @@ -479,7 +496,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): Blocked IP ranges, accessed directly, are not spidered. """ channel = self.make_request( - "GET", "preview_url?url=http://1.1.1.2", shorthand=False + "GET", "/_matrix/media/v3/preview_url?url=http://1.1.1.2", shorthand=False ) self.assertEqual(channel.code, 403) @@ -497,7 +514,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://example.com", + "/_matrix/media/v3/preview_url?url=http://example.com", shorthand=False, await_result=False, ) @@ -533,7 +550,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): ] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) self.assertEqual(channel.code, 502) self.assertEqual( @@ -553,7 +572,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): ] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) # No requests made. @@ -574,7 +595,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.lookups["example.com"] = [(IPv6Address, "2001:800::1")] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) self.assertEqual(channel.code, 502) @@ -591,10 +614,11 @@ class URLPreviewTests(unittest.HomeserverTestCase): OPTIONS returns the OPTIONS. 
""" channel = self.make_request( - "OPTIONS", "preview_url?url=http://example.com", shorthand=False + "OPTIONS", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) - self.assertEqual(channel.code, 200) - self.assertEqual(channel.json_body, {}) + self.assertEqual(channel.code, 204) def test_accept_language_config_option(self) -> None: """ @@ -605,7 +629,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): # Build and make a request to the server channel = self.make_request( "GET", - "preview_url?url=http://example.com", + "/_matrix/media/v3/preview_url?url=http://example.com", shorthand=False, await_result=False, ) @@ -658,7 +682,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -708,7 +732,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -750,7 +774,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -790,7 +814,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -831,7 +855,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - f"preview_url?{query_params}", + f"/_matrix/media/v3/preview_url?{query_params}", shorthand=False, ) self.pump() @@ -852,7 +876,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -889,7 +913,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -949,7 +973,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -998,7 +1022,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://www.hulu.com/watch/12345", + "/_matrix/media/v3/preview_url?url=http://www.hulu.com/watch/12345", shorthand=False, await_result=False, ) @@ -1043,7 +1067,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -1072,7 +1096,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", shorthand=False, 
await_result=False, ) @@ -1164,7 +1188,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -1205,7 +1229,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://cdn.twitter.com/matrixdotorg", + "/_matrix/media/v3/preview_url?url=http://cdn.twitter.com/matrixdotorg", shorthand=False, await_result=False, ) @@ -1247,7 +1271,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): # Check fetching channel = self.make_request( "GET", - f"download/{host}/{media_id}", + f"/_matrix/media/v3/download/{host}/{media_id}", shorthand=False, await_result=False, ) @@ -1260,7 +1284,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - f"download/{host}/{media_id}", + f"/_matrix/media/v3/download/{host}/{media_id}", shorthand=False, await_result=False, ) @@ -1295,7 +1319,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): # Check fetching channel = self.make_request( "GET", - f"thumbnail/{host}/{media_id}?width=32&height=32&method=scale", + f"/_matrix/media/v3/thumbnail/{host}/{media_id}?width=32&height=32&method=scale", shorthand=False, await_result=False, ) @@ -1313,7 +1337,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - f"thumbnail/{host}/{media_id}?width=32&height=32&method=scale", + f"/_matrix/media/v3/thumbnail/{host}/{media_id}?width=32&height=32&method=scale", shorthand=False, await_result=False, ) @@ -1343,7 +1367,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.assertTrue(os.path.isdir(thumbnail_dir)) self.reactor.advance(IMAGE_CACHE_EXPIRY_MS * 1000 + 1) - self.get_success(self.preview_url._url_previewer._expire_url_cache_data()) + self.get_success(self.url_previewer._expire_url_cache_data()) for path in [file_path] + file_dirs + [thumbnail_dir] + thumbnail_dirs: self.assertFalse( @@ -1363,7 +1387,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=" + bad_url, + "/_matrix/media/v3/preview_url?url=" + bad_url, shorthand=False, await_result=False, ) @@ -1372,7 +1396,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=" + good_url, + "/_matrix/media/v3/preview_url?url=" + good_url, shorthand=False, await_result=False, ) @@ -1404,7 +1428,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=" + bad_url, + "/_matrix/media/v3/preview_url?url=" + bad_url, shorthand=False, await_result=False, ) diff --git a/tests/unittest.py b/tests/unittest.py index dbaff361b4..99ad02eb06 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -60,7 +60,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.server import DEFAULT_ROOM_VERSION from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.federation.transport.server import TransportLayerServer -from synapse.http.server import JsonResource +from synapse.http.server import JsonResource, OptionsResource from synapse.http.site import SynapseRequest, SynapseSite from synapse.logging.context import ( SENTINEL_CONTEXT, @@ -459,7 +459,7 @@ class HomeserverTestCase(TestCase): The default calls `self.create_resource_dict` and builds the 
resultant dict into a tree. """ - root_resource = Resource() + root_resource = OptionsResource() create_resource_tree(self.create_resource_dict(), root_resource) return root_resource -- cgit 1.5.1 From fc31b495b3a7f170019591c2e40e699b61c067a1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 6 Oct 2023 07:27:35 -0400 Subject: Stop sending incorrect knock_state_events. (#16403) Synapse was incorrectly implemented with a knock_state_events property on some APIs (instead of knock_room_state). This was correct in Synapse 1.70.0, but *both* fields were sent to also be compatible with Synapse versions expecting the wrong field. Enough time has passed that only the correct field needs to be included/handled. --- changelog.d/16403.bugfix | 1 + synapse/federation/federation_client.py | 4 ++-- synapse/federation/federation_server.py | 9 +-------- synapse/federation/transport/client.py | 2 +- synapse/handlers/federation.py | 13 ++----------- tests/federation/transport/test_knocking.py | 2 +- 6 files changed, 8 insertions(+), 23 deletions(-) create mode 100644 changelog.d/16403.bugfix (limited to 'tests') diff --git a/changelog.d/16403.bugfix b/changelog.d/16403.bugfix new file mode 100644 index 0000000000..453c975a63 --- /dev/null +++ b/changelog.d/16403.bugfix @@ -0,0 +1 @@ +Remove legacy unspecced `knock_state_events` field returned in some responses. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c8bc46415d..1a7fa175ec 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1402,7 +1402,7 @@ class FederationClient(FederationBase): The remote homeserver return some state from the room. The response dictionary is in the form: - {"knock_state_events": [, ...]} + {"knock_room_state": [, ...]} The list of state events may be empty. @@ -1429,7 +1429,7 @@ class FederationClient(FederationBase): The remote homeserver can optionally return some state from the room. The response dictionary is in the form: - {"knock_state_events": [, ...]} + {"knock_room_state": [, ...]} The list of state events may be empty. """ diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index ec8e770430..6ac8d16095 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -850,14 +850,7 @@ class FederationServer(FederationBase): context, self._room_prejoin_state_types ) ) - return { - "knock_room_state": stripped_room_state, - # Since v1.37, Synapse incorrectly used "knock_state_events" for this field. - # Thus, we also populate a 'knock_state_events' with the same content to - # support old instances. - # See https://github.com/matrix-org/synapse/issues/14088. - "knock_state_events": stripped_room_state, - } + return {"knock_room_state": stripped_room_state} async def _on_send_membership_event( self, origin: str, content: JsonDict, membership_type: str, room_id: str diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index b5e4b2680e..fab4800717 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -431,7 +431,7 @@ class TransportLayerClient: The remote homeserver can optionally return some state from the room. The response dictionary is in the form: - {"knock_state_events": [, ...]} + {"knock_room_state": [, ...]} The list of state events may be empty. 
""" diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 29cd45550a..807a0867cc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -868,19 +868,10 @@ class FederationHandler: # This is a bit of a hack and is cribbing off of invites. Basically we # store the room state here and retrieve it again when this event appears # in the invitee's sync stream. It is stripped out for all other local users. - stripped_room_state = ( - knock_response.get("knock_room_state") - # Since v1.37, Synapse incorrectly used "knock_state_events" for this field. - # Thus, we also check for a 'knock_state_events' to support old instances. - # See https://github.com/matrix-org/synapse/issues/14088. - or knock_response.get("knock_state_events") - ) + stripped_room_state = knock_response.get("knock_room_state") if stripped_room_state is None: - raise KeyError( - "Missing 'knock_room_state' (or legacy 'knock_state_events') field in " - "send_knock response" - ) + raise KeyError("Missing 'knock_room_state' field in send_knock response") event.unsigned["knock_room_state"] = stripped_room_state diff --git a/tests/federation/transport/test_knocking.py b/tests/federation/transport/test_knocking.py index 3f42f79f26..b63ef3d4ed 100644 --- a/tests/federation/transport/test_knocking.py +++ b/tests/federation/transport/test_knocking.py @@ -308,7 +308,7 @@ class FederationKnockingTestCase( self.assertEqual(200, channel.code, channel.result) # Check that we got the stripped room state in return - room_state_events = channel.json_body["knock_state_events"] + room_state_events = channel.json_body["knock_room_state"] # Validate the stripped room state events self.check_knock_room_state_against_room_state( -- cgit 1.5.1 From 7615e2bf48d7bed3da7235d60f84a3c847ac78f5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 6 Oct 2023 10:12:43 -0400 Subject: Return ThumbnailInfo in more places (#16438) Improves type hints by using concrete types instead of dictionaries. --- changelog.d/16438.misc | 1 + synapse/media/_base.py | 2 +- synapse/media/media_repository.py | 3 + synapse/rest/media/thumbnail_resource.py | 98 ++++++++++------------ synapse/storage/databases/main/media_repository.py | 30 +++++-- tests/media/test_media_storage.py | 36 ++++---- 6 files changed, 90 insertions(+), 80 deletions(-) create mode 100644 changelog.d/16438.misc (limited to 'tests') diff --git a/changelog.d/16438.misc b/changelog.d/16438.misc new file mode 100644 index 0000000000..bd7cdd42af --- /dev/null +++ b/changelog.d/16438.misc @@ -0,0 +1 @@ +Reduce memory allocations. diff --git a/synapse/media/_base.py b/synapse/media/_base.py index d103b43449..13345acf75 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -332,7 +332,7 @@ class ThumbnailInfo: # Content type of thumbnail, e.g. image/png type: str # The size of the media file, in bytes. 
- length: Optional[int] = None + length: int @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index d11c2ff4ee..7fd46901f7 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -624,6 +624,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) @@ -694,6 +695,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) @@ -839,6 +841,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py index f9cd773f77..85b6bdbe72 100644 --- a/synapse/rest/media/thumbnail_resource.py +++ b/synapse/rest/media/thumbnail_resource.py @@ -15,7 +15,7 @@ import logging import re -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.api.errors import Codes, SynapseError, cs_error from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP @@ -159,30 +159,24 @@ class ThumbnailResource(RestServlet): thumbnail_infos = await self.store.get_local_media_thumbnails(media_id) for info in thumbnail_infos: - t_w = info["thumbnail_width"] == desired_width - t_h = info["thumbnail_height"] == desired_height - t_method = info["thumbnail_method"] == desired_method - t_type = info["thumbnail_type"] == desired_type + t_w = info.width == desired_width + t_h = info.height == desired_height + t_method = info.method == desired_method + t_type = info.type == desired_type if t_w and t_h and t_method and t_type: file_info = FileInfo( server_name=None, file_id=media_id, url_cache=media_info["url_cache"], - thumbnail=ThumbnailInfo( - width=info["thumbnail_width"], - height=info["thumbnail_height"], - type=info["thumbnail_type"], - method=info["thumbnail_method"], - ), + thumbnail=info, ) - t_type = file_info.thumbnail_type - t_length = info["thumbnail_length"] - responder = await self.media_storage.fetch_media(file_info) if responder: - await respond_with_responder(request, responder, t_type, t_length) + await respond_with_responder( + request, responder, info.type, info.length + ) return logger.debug("We don't have a thumbnail of that size. 
Generating") @@ -222,29 +216,23 @@ class ThumbnailResource(RestServlet): file_id = media_info["filesystem_id"] for info in thumbnail_infos: - t_w = info["thumbnail_width"] == desired_width - t_h = info["thumbnail_height"] == desired_height - t_method = info["thumbnail_method"] == desired_method - t_type = info["thumbnail_type"] == desired_type + t_w = info.width == desired_width + t_h = info.height == desired_height + t_method = info.method == desired_method + t_type = info.type == desired_type if t_w and t_h and t_method and t_type: file_info = FileInfo( server_name=server_name, file_id=media_info["filesystem_id"], - thumbnail=ThumbnailInfo( - width=info["thumbnail_width"], - height=info["thumbnail_height"], - type=info["thumbnail_type"], - method=info["thumbnail_method"], - ), + thumbnail=info, ) - t_type = file_info.thumbnail_type - t_length = info["thumbnail_length"] - responder = await self.media_storage.fetch_media(file_info) if responder: - await respond_with_responder(request, responder, t_type, t_length) + await respond_with_responder( + request, responder, info.type, info.length + ) return logger.debug("We don't have a thumbnail of that size. Generating") @@ -304,7 +292,7 @@ class ThumbnailResource(RestServlet): desired_height: int, desired_method: str, desired_type: str, - thumbnail_infos: List[Dict[str, Any]], + thumbnail_infos: List[ThumbnailInfo], media_id: str, file_id: str, url_cache: bool, @@ -319,7 +307,7 @@ class ThumbnailResource(RestServlet): desired_height: The desired height, the returned thumbnail may be larger than this. desired_method: The desired method used to generate the thumbnail. desired_type: The desired content-type of the thumbnail. - thumbnail_infos: A list of dictionaries of candidate thumbnails. + thumbnail_infos: A list of thumbnail info of candidate thumbnails. file_id: The ID of the media that a thumbnail is being requested for. url_cache: True if this is from a URL cache. server_name: The server name, if this is a remote thumbnail. @@ -443,7 +431,7 @@ class ThumbnailResource(RestServlet): desired_height: int, desired_method: str, desired_type: str, - thumbnail_infos: List[Dict[str, Any]], + thumbnail_infos: List[ThumbnailInfo], file_id: str, url_cache: bool, server_name: Optional[str], @@ -456,7 +444,7 @@ class ThumbnailResource(RestServlet): desired_height: The desired height, the returned thumbnail may be larger than this. desired_method: The desired method used to generate the thumbnail. desired_type: The desired content-type of the thumbnail. - thumbnail_infos: A list of dictionaries of candidate thumbnails. + thumbnail_infos: A list of thumbnail infos of candidate thumbnails. file_id: The ID of the media that a thumbnail is being requested for. url_cache: True if this is from a URL cache. server_name: The server name, if this is a remote thumbnail. @@ -474,21 +462,25 @@ class ThumbnailResource(RestServlet): if desired_method == "crop": # Thumbnails that match equal or larger sizes of desired width/height. - crop_info_list: List[Tuple[int, int, int, bool, int, Dict[str, Any]]] = [] + crop_info_list: List[ + Tuple[int, int, int, bool, Optional[int], ThumbnailInfo] + ] = [] # Other thumbnails. - crop_info_list2: List[Tuple[int, int, int, bool, int, Dict[str, Any]]] = [] + crop_info_list2: List[ + Tuple[int, int, int, bool, Optional[int], ThumbnailInfo] + ] = [] for info in thumbnail_infos: # Skip thumbnails generated with different methods. 
- if info["thumbnail_method"] != "crop": + if info.method != "crop": continue - t_w = info["thumbnail_width"] - t_h = info["thumbnail_height"] + t_w = info.width + t_h = info.height aspect_quality = abs(d_w * t_h - d_h * t_w) min_quality = 0 if d_w <= t_w and d_h <= t_h else 1 size_quality = abs((d_w - t_w) * (d_h - t_h)) - type_quality = desired_type != info["thumbnail_type"] - length_quality = info["thumbnail_length"] + type_quality = desired_type != info.type + length_quality = info.length if t_w >= d_w or t_h >= d_h: crop_info_list.append( ( @@ -513,7 +505,7 @@ class ThumbnailResource(RestServlet): ) # Pick the most appropriate thumbnail. Some values of `desired_width` and # `desired_height` may result in a tie, in which case we avoid comparing on - # the thumbnail info dictionary and pick the thumbnail that appears earlier + # the thumbnail info and pick the thumbnail that appears earlier # in the list of candidates. if crop_info_list: thumbnail_info = min(crop_info_list, key=lambda t: t[:-1])[-1] @@ -521,20 +513,20 @@ class ThumbnailResource(RestServlet): thumbnail_info = min(crop_info_list2, key=lambda t: t[:-1])[-1] elif desired_method == "scale": # Thumbnails that match equal or larger sizes of desired width/height. - info_list: List[Tuple[int, bool, int, Dict[str, Any]]] = [] + info_list: List[Tuple[int, bool, int, ThumbnailInfo]] = [] # Other thumbnails. - info_list2: List[Tuple[int, bool, int, Dict[str, Any]]] = [] + info_list2: List[Tuple[int, bool, int, ThumbnailInfo]] = [] for info in thumbnail_infos: # Skip thumbnails generated with different methods. - if info["thumbnail_method"] != "scale": + if info.method != "scale": continue - t_w = info["thumbnail_width"] - t_h = info["thumbnail_height"] + t_w = info.width + t_h = info.height size_quality = abs((d_w - t_w) * (d_h - t_h)) - type_quality = desired_type != info["thumbnail_type"] - length_quality = info["thumbnail_length"] + type_quality = desired_type != info.type + length_quality = info.length if t_w >= d_w or t_h >= d_h: info_list.append((size_quality, type_quality, length_quality, info)) else: @@ -543,7 +535,7 @@ class ThumbnailResource(RestServlet): ) # Pick the most appropriate thumbnail. Some values of `desired_width` and # `desired_height` may result in a tie, in which case we avoid comparing on - # the thumbnail info dictionary and pick the thumbnail that appears earlier + # the thumbnail info and pick the thumbnail that appears earlier # in the list of candidates. if info_list: thumbnail_info = min(info_list, key=lambda t: t[:-1])[-1] @@ -555,13 +547,7 @@ class ThumbnailResource(RestServlet): file_id=file_id, url_cache=url_cache, server_name=server_name, - thumbnail=ThumbnailInfo( - width=thumbnail_info["thumbnail_width"], - height=thumbnail_info["thumbnail_height"], - type=thumbnail_info["thumbnail_type"], - method=thumbnail_info["thumbnail_method"], - length=thumbnail_info["thumbnail_length"], - ), + thumbnail=thumbnail_info, ) # No matching thumbnail was found. 
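An aside on the candidate-ranking pattern in the thumbnail_resource.py hunks above: each candidate is packed into a tuple of quality metrics (lower is better) with the ThumbnailInfo itself in the final slot, and min() compares only t[:-1]. A minimal, self-contained sketch of that idea follows; the plain dataclass here is a simplified stand-in for Synapse's attrs-based ThumbnailInfo, and pick_crop_thumbnail is a hypothetical reduction of the real selection logic, not its actual implementation.

from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class ThumbnailInfo:
    width: int
    height: int
    method: str
    type: str
    length: int


def pick_crop_thumbnail(
    d_w: int, d_h: int, candidates: List[ThumbnailInfo]
) -> ThumbnailInfo:
    ranked: List[Tuple[int, int, ThumbnailInfo]] = []
    for info in candidates:
        # Lower is better for both metrics.
        aspect_quality = abs(d_w * info.height - d_h * info.width)
        size_quality = abs((d_w - info.width) * (d_h - info.height))
        ranked.append((aspect_quality, size_quality, info))
    # Compare on t[:-1] only: the key stays plain ints, and ties resolve to
    # the candidate that appears earlier in the list rather than by comparing
    # the ThumbnailInfo objects themselves.
    return min(ranked, key=lambda t: t[:-1])[-1]


first = ThumbnailInfo(32, 32, "crop", "image/png", 256)
second = ThumbnailInfo(32, 32, "crop", "image/png", 256)
assert pick_crop_thumbnail(32, 32, [first, second]) is first
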
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 8cebeb5189..2e6b176bd2 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -28,6 +28,7 @@ from typing import ( from synapse.api.constants import Direction from synapse.logging.opentracing import trace +from synapse.media._base import ThumbnailInfo from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -435,8 +436,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_url_cache", ) - async def get_local_media_thumbnails(self, media_id: str) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + async def get_local_media_thumbnails(self, media_id: str) -> List[ThumbnailInfo]: + rows = await self.db_pool.simple_select_list( "local_media_repository_thumbnails", {"media_id": media_id}, ( @@ -448,6 +449,16 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): ), desc="get_local_media_thumbnails", ) + return [ + ThumbnailInfo( + width=row["thumbnail_width"], + height=row["thumbnail_height"], + method=row["thumbnail_method"], + type=row["thumbnail_type"], + length=row["thumbnail_length"], + ) + for row in rows + ] @trace async def store_local_thumbnail( @@ -556,8 +567,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def get_remote_media_thumbnails( self, origin: str, media_id: str - ) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + ) -> List[ThumbnailInfo]: + rows = await self.db_pool.simple_select_list( "remote_media_cache_thumbnails", {"media_origin": origin, "media_id": media_id}, ( @@ -566,10 +577,19 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "thumbnail_method", "thumbnail_type", "thumbnail_length", - "filesystem_id", ), desc="get_remote_media_thumbnails", ) + return [ + ThumbnailInfo( + width=row["thumbnail_width"], + height=row["thumbnail_height"], + method=row["thumbnail_method"], + type=row["thumbnail_type"], + length=row["thumbnail_length"], + ) + for row in rows + ] @trace async def get_remote_media_thumbnail( diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index ba00e35a9e..15f5d644e4 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -34,7 +34,7 @@ from synapse.api.errors import Codes from synapse.events import EventBase from synapse.http.types import QueryParams from synapse.logging.context import make_deferred_yieldable -from synapse.media._base import FileInfo +from synapse.media._base import FileInfo, ThumbnailInfo from synapse.media.filepath import MediaFilePaths from synapse.media.media_storage import MediaStorage, ReadableFileWrapper from synapse.media.storage_provider import FileStorageProviderBackend @@ -605,6 +605,8 @@ class MediaRepoTests(unittest.HomeserverTestCase): """Test that choosing between thumbnails with the same quality rating succeeds. 
We are not particular about which thumbnail is chosen.""" + + content_type = self.test_image.content_type.decode() media_repo = self.hs.get_media_repository() thumbnail_resouce = ThumbnailResource( self.hs, media_repo, media_repo.media_storage @@ -615,26 +617,24 @@ class MediaRepoTests(unittest.HomeserverTestCase): desired_width=desired_size, desired_height=desired_size, desired_method=method, - desired_type=self.test_image.content_type, # type: ignore[arg-type] + desired_type=content_type, # Provide two identical thumbnails which are guaranteed to have the same # quality rating. thumbnail_infos=[ - { - "thumbnail_width": 32, - "thumbnail_height": 32, - "thumbnail_method": method, - "thumbnail_type": self.test_image.content_type, - "thumbnail_length": 256, - "filesystem_id": f"thumbnail1{self.test_image.extension.decode()}", - }, - { - "thumbnail_width": 32, - "thumbnail_height": 32, - "thumbnail_method": method, - "thumbnail_type": self.test_image.content_type, - "thumbnail_length": 256, - "filesystem_id": f"thumbnail2{self.test_image.extension.decode()}", - }, + ThumbnailInfo( + width=32, + height=32, + method=method, + type=content_type, + length=256, + ), + ThumbnailInfo( + width=32, + height=32, + method=method, + type=content_type, + length=256, + ), ], file_id=f"image{self.test_image.extension.decode()}", url_cache=False, -- cgit 1.5.1 From 1f10c208068ef8788b6796c54a3604ae51caf951 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 6 Oct 2023 18:31:52 +0100 Subject: Apply join rate limiter outside the lineariser (#16441) --- changelog.d/16441.misc | 1 + synapse/handlers/room_member.py | 43 ++++++++++++++++++++++------------------- tests/rest/client/test_rooms.py | 24 +++++++++++++++++++++++ 3 files changed, 48 insertions(+), 20 deletions(-) create mode 100644 changelog.d/16441.misc (limited to 'tests') diff --git a/changelog.d/16441.misc b/changelog.d/16441.misc new file mode 100644 index 0000000000..32264a62b2 --- /dev/null +++ b/changelog.d/16441.misc @@ -0,0 +1 @@ +Improve rate limiting logic. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 90343c2306..1b50495af1 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -382,8 +382,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): and persist a new event for the new membership change. Args: - requester: - target: + requester: User requesting the membership change, i.e. the sender of the + desired membership event. + target: Use whose membership should change, i.e. the state_key of the + desired membership event. room_id: membership: @@ -415,7 +417,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): Returns: Tuple of event ID and stream ordering position """ - user_id = target.to_string() if content is None: @@ -475,21 +476,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): (EventTypes.Member, user_id), None ) - if event.membership == Membership.JOIN: - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event( - prev_member_event_id - ) - newly_joined = prev_member_event.membership != Membership.JOIN - - # Only rate-limit if the user actually joined the room, otherwise we'll end - # up blocking profile updates. 
- if newly_joined and ratelimit: - await self._join_rate_limiter_local.ratelimit(requester) - await self._join_rate_per_room_limiter.ratelimit( - requester, key=room_id, update=False - ) with opentracing.start_active_span("handle_new_client_event"): result_event = ( await self.event_creation_handler.handle_new_client_event( @@ -618,6 +604,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): Raises: ShadowBanError if a shadow-banned requester attempts to send an invite. """ + if ratelimit: + if action == Membership.JOIN: + # Only rate-limit if the user isn't already joined to the room, otherwise + # we'll end up blocking profile updates. + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + requester.user.to_string(), + room_id, + ) + if current_membership != Membership.JOIN: + await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, key=room_id, update=False + ) + elif action == Membership.INVITE: + await self.ratelimit_invite(requester, room_id, target.to_string()) + if action == Membership.INVITE and requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. await self.clock.sleep(random.randint(1, 10)) @@ -794,8 +799,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if effective_membership_state == Membership.INVITE: target_id = target.to_string() - if ratelimit: - await self.ratelimit_invite(requester, room_id, target_id) # block any attempts to invite the server notices mxid if target_id == self._server_notices_mxid: diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 7627823d3f..aaa4f3bba0 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -1444,6 +1444,30 @@ class RoomJoinRatelimitTestCase(RoomBase): room_ids[3], joiner_user_id, expect_code=HTTPStatus.TOO_MANY_REQUESTS ) + @unittest.override_config( + {"rc_joins": {"local": {"per_second": 0.5, "burst_count": 3}}} + ) + def test_join_attempts_local_ratelimit(self) -> None: + """Tests that unsuccessful joins that end up being denied are rate-limited.""" + # Create 4 rooms + room_ids = [ + self.helper.create_room_as(self.user_id, is_public=True) for _ in range(4) + ] + # Pre-emptively ban the user who will attempt to join. + joiner_user_id = self.register_user("joiner", "secret") + for room_id in room_ids: + self.helper.ban(room_id, self.user_id, joiner_user_id) + + # Now make a new user try to join some of them. + # The user can make 3 requests, each of which should be denied. + for room_id in room_ids[0:3]: + self.helper.join(room_id, joiner_user_id, expect_code=HTTPStatus.FORBIDDEN) + + # The fourth attempt should be rate limited. + self.helper.join( + room_ids[3], joiner_user_id, expect_code=HTTPStatus.TOO_MANY_REQUESTS + ) + @unittest.override_config( {"rc_joins": {"local": {"per_second": 0.5, "burst_count": 3}}} ) -- cgit 1.5.1 From d6b7d49a61ca4c6f87d93ff9eb6a9fa6faef443c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 11 Oct 2023 07:50:34 -0400 Subject: Handle content types with parameters. 
(#16440) --- changelog.d/16440.bugfix | 1 + synapse/media/_base.py | 4 +++- tests/media/test_base.py | 19 ++++++++++++++++++- 3 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 changelog.d/16440.bugfix (limited to 'tests') diff --git a/changelog.d/16440.bugfix b/changelog.d/16440.bugfix new file mode 100644 index 0000000000..6ce0b1e4af --- /dev/null +++ b/changelog.d/16440.bugfix @@ -0,0 +1 @@ +Properly return inline media when content types have parameters. diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 13345acf75..860e5ddca2 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -148,7 +148,9 @@ def add_file_headers( # A strict subset of content types is allowed to be inlined so that they may # be viewed directly in a browser. Other file types are forced to be downloads. - if media_type.lower() in INLINE_CONTENT_TYPES: + # + # Only the type & subtype are important, parameters can be ignored. + if media_type.lower().split(";", 1)[0] in INLINE_CONTENT_TYPES: disposition = "inline" else: disposition = "attachment" diff --git a/tests/media/test_base.py b/tests/media/test_base.py index 119d7ba66f..144948f23c 100644 --- a/tests/media/test_base.py +++ b/tests/media/test_base.py @@ -42,18 +42,35 @@ class GetFileNameFromHeadersTests(unittest.TestCase): class AddFileHeadersTests(unittest.TestCase): TEST_CASES = { + # Safe values use inline. "text/plain": b"inline; filename=file.name", "text/csv": b"inline; filename=file.name", "image/png": b"inline; filename=file.name", + # Unlisted values are set to attachment. "text/html": b"attachment; filename=file.name", "any/thing": b"attachment; filename=file.name", + # Parameters get ignored. + "text/plain; charset=utf-8": b"inline; filename=file.name", + "text/markdown; charset=utf-8; variant=CommonMark": b"attachment; filename=file.name", + # Parsed as lowercase. + "Text/Plain": b"inline; filename=file.name", + # Bad values don't choke. + "": b"attachment; filename=file.name", + ";": b"attachment; filename=file.name", } def test_content_disposition(self) -> None: for media_type, expected in self.TEST_CASES.items(): request = Mock() add_file_headers(request, media_type, 0, "file.name") - request.setHeader.assert_any_call(b"Content-Disposition", expected) + # There should be a single call to set Content-Disposition. + for call in request.setHeader.call_args_list: + args, _ = call + if args[0] == b"Content-Disposition": + break + else: + self.fail(f"No Content-Disposition header found for {media_type}") + self.assertEqual(args[1], expected, media_type) def test_no_filename(self) -> None: request = Mock() -- cgit 1.5.1 From a4904dcb04b31ce8ed0deaa2c5c80657780f6618 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 11 Oct 2023 13:24:56 -0400 Subject: Convert simple_select_many_batch, simple_select_many_txn to tuples. 
(#16444) --- changelog.d/16444.misc | 1 + synapse/storage/database.py | 18 ++-- synapse/storage/databases/main/deviceinbox.py | 42 ++++---- synapse/storage/databases/main/devices.py | 49 ++++++---- synapse/storage/databases/main/end_to_end_keys.py | 19 ++-- synapse/storage/databases/main/event_federation.py | 107 +++++++++++---------- synapse/storage/databases/main/events.py | 79 ++++++++------- .../storage/databases/main/events_bg_updates.py | 62 ++++++------ synapse/storage/databases/main/events_worker.py | 36 ++++--- synapse/storage/databases/main/keys.py | 46 +++++---- synapse/storage/databases/main/presence.py | 51 ++++++---- synapse/storage/databases/main/push_rule.py | 97 +++++++++++++------ synapse/storage/databases/main/relations.py | 19 ++-- synapse/storage/databases/main/room.py | 19 ++-- synapse/storage/databases/main/roommember.py | 78 ++++++++------- synapse/storage/databases/main/state.py | 62 +++++++----- synapse/storage/databases/main/stats.py | 37 +++---- synapse/storage/databases/main/transactions.py | 28 ++++-- synapse/storage/databases/main/ui_auth.py | 41 ++++---- synapse/storage/databases/main/user_directory.py | 54 ++++++----- .../storage/databases/main/user_erasure_store.py | 19 ++-- synapse/storage/databases/state/store.py | 54 +++++++---- tests/storage/test_event_chain.py | 64 +++++++----- 23 files changed, 640 insertions(+), 442 deletions(-) create mode 100644 changelog.d/16444.misc (limited to 'tests') diff --git a/changelog.d/16444.misc b/changelog.d/16444.misc new file mode 100644 index 0000000000..bd7cdd42af --- /dev/null +++ b/changelog.d/16444.misc @@ -0,0 +1 @@ +Reduce memory allocations. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 7714ec2bf9..81f661160c 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1874,9 +1874,9 @@ class DatabasePool: keyvalues: Optional[Dict[str, Any]] = None, desc: str = "simple_select_many_batch", batch_size: int = 100, - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """Executes a SELECT query on the named table, which may return zero or - more rows, returning the result as a list of dicts. + more rows. Filters rows by whether the value of `column` is in `iterable`. @@ -1888,10 +1888,13 @@ class DatabasePool: keyvalues: dict of column names and values to select the rows with desc: description of the transaction, for logging and metrics batch_size: the number of rows for each select query + + Returns: + The results as a list of tuples. """ keyvalues = keyvalues or {} - results: List[Dict[str, Any]] = [] + results: List[Tuple[Any, ...]] = [] for chunk in batch_iter(iterable, batch_size): rows = await self.runInteraction( @@ -1918,9 +1921,9 @@ class DatabasePool: iterable: Collection[Any], keyvalues: Dict[str, Any], retcols: Iterable[str], - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """Executes a SELECT query on the named table, which may return zero or - more rows, returning the result as a list of dicts. + more rows. Filters rows by whether the value of `column` is in `iterable`. @@ -1931,6 +1934,9 @@ class DatabasePool: iterable: list keyvalues: dict of column names and values to select the rows with retcols: list of strings giving the names of the columns to return + + Returns: + The results as a list of tuples. 
""" if not iterable: return [] @@ -1949,7 +1955,7 @@ class DatabasePool: ) txn.execute(sql, values) - return cls.cursor_to_dict(txn) + return txn.fetchall() async def simple_update( self, diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 744e98c6d0..1cf649d371 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -344,18 +344,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): # Note that this is more efficient than just dropping `device_id` from the query, # since device_inbox has an index on `(user_id, device_id, stream_id)` if not device_ids_to_query: - user_device_dicts = self.db_pool.simple_select_many_txn( - txn, - table="devices", - column="user_id", - iterable=user_ids_to_query, - keyvalues={"hidden": False}, - retcols=("device_id",), + user_device_dicts = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="devices", + column="user_id", + iterable=user_ids_to_query, + keyvalues={"hidden": False}, + retcols=("device_id",), + ), ) - device_ids_to_query.update( - {row["device_id"] for row in user_device_dicts} - ) + device_ids_to_query.update({row[0] for row in user_device_dicts}) if not device_ids_to_query: # We've ended up with no devices to query. @@ -845,20 +846,21 @@ class DeviceInboxWorkerStore(SQLBaseStore): # We exclude hidden devices (such as cross-signing keys) here as they are # not expected to receive to-device messages. - rows = self.db_pool.simple_select_many_txn( - txn, - table="devices", - keyvalues={"user_id": user_id, "hidden": False}, - column="device_id", - iterable=devices, - retcols=("device_id",), + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="devices", + keyvalues={"user_id": user_id, "hidden": False}, + column="device_id", + iterable=devices, + retcols=("device_id",), + ), ) - for row in rows: + for (device_id,) in rows: # Only insert into the local inbox if the device exists on # this server - device_id = row["device_id"] - with start_active_span("serialise_to_device_message"): msg = messages_by_device[device_id] set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"]) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 9f3804a504..fc23d18eba 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1052,16 +1052,19 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): async def get_device_list_last_stream_id_for_remotes( self, user_ids: Iterable[str] ) -> Mapping[str, Optional[str]]: - rows = await self.db_pool.simple_select_many_batch( - table="device_lists_remote_extremeties", - column="user_id", - iterable=user_ids, - retcols=("user_id", "stream_id"), - desc="get_device_list_last_stream_id_for_remotes", + rows = cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_many_batch( + table="device_lists_remote_extremeties", + column="user_id", + iterable=user_ids, + retcols=("user_id", "stream_id"), + desc="get_device_list_last_stream_id_for_remotes", + ), ) results: Dict[str, Optional[str]] = {user_id: None for user_id in user_ids} - results.update({row["user_id"]: row["stream_id"] for row in rows}) + results.update(rows) return results @@ -1077,22 +1080,30 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): The IDs of users whose device lists need resync. 
""" if user_ids: - rows = await self.db_pool.simple_select_many_batch( - table="device_lists_remote_resync", - column="user_id", - iterable=user_ids, - retcols=("user_id",), - desc="get_user_ids_requiring_device_list_resync_with_iterable", + row_tuples = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="device_lists_remote_resync", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync_with_iterable", + ), ) + + return {row[0] for row in row_tuples} else: - rows = await self.db_pool.simple_select_list( - table="device_lists_remote_resync", - keyvalues=None, - retcols=("user_id",), - desc="get_user_ids_requiring_device_list_resync", + rows = cast( + List[Dict[str, str]], + await self.db_pool.simple_select_list( + table="device_lists_remote_resync", + keyvalues=None, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync", + ), ) - return {row["user_id"] for row in rows} + return {row["user_id"] for row in rows} async def mark_remote_users_device_caches_as_stale( self, user_ids: StrCollection diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 749ae54e20..f13d776b0d 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -493,15 +493,18 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker A map from (algorithm, key_id) to json string for key """ - rows = await self.db_pool.simple_select_many_batch( - table="e2e_one_time_keys_json", - column="key_id", - iterable=key_ids, - retcols=("algorithm", "key_id", "key_json"), - keyvalues={"user_id": user_id, "device_id": device_id}, - desc="add_e2e_one_time_keys_check", + rows = cast( + List[Tuple[str, str, str]], + await self.db_pool.simple_select_many_batch( + table="e2e_one_time_keys_json", + column="key_id", + iterable=key_ids, + retcols=("algorithm", "key_id", "key_json"), + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="add_e2e_one_time_keys_check", + ), ) - result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows} + result = {(algorithm, key_id): key_json for algorithm, key_id, key_json in rows} log_kv({"message": "Fetched one time keys for user", "one_time_keys": result}) return result diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index afffa54985..4f80ce75cc 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1049,15 +1049,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas Args: event_ids: The event IDs to calculate the max depth of. 
""" - rows = await self.db_pool.simple_select_many_batch( - table="events", - column="event_id", - iterable=event_ids, - retcols=( - "event_id", - "depth", + rows = cast( + List[Tuple[str, int]], + await self.db_pool.simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=( + "event_id", + "depth", + ), + desc="get_max_depth_of", ), - desc="get_max_depth_of", ) if not rows: @@ -1065,10 +1068,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas else: max_depth_event_id = "" current_max_depth = 0 - for row in rows: - if row["depth"] > current_max_depth: - max_depth_event_id = row["event_id"] - current_max_depth = row["depth"] + for event_id, depth in rows: + if depth > current_max_depth: + max_depth_event_id = event_id + current_max_depth = depth return max_depth_event_id, current_max_depth @@ -1078,15 +1081,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas Args: event_ids: The event IDs to calculate the max depth of. """ - rows = await self.db_pool.simple_select_many_batch( - table="events", - column="event_id", - iterable=event_ids, - retcols=( - "event_id", - "depth", + rows = cast( + List[Tuple[str, int]], + await self.db_pool.simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=( + "event_id", + "depth", + ), + desc="get_min_depth_of", ), - desc="get_min_depth_of", ) if not rows: @@ -1094,10 +1100,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas else: min_depth_event_id = "" current_min_depth = MAX_DEPTH - for row in rows: - if row["depth"] < current_min_depth: - min_depth_event_id = row["event_id"] - current_min_depth = row["depth"] + for event_id, depth in rows: + if depth < current_min_depth: + min_depth_event_id = event_id + current_min_depth = depth return min_depth_event_id, current_min_depth @@ -1553,19 +1559,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas A filtered down list of `event_ids` that have previous failed pull attempts. """ - rows = await self.db_pool.simple_select_many_batch( - table="event_failed_pull_attempts", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=("event_id",), - desc="get_event_ids_with_failed_pull_attempts", + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id",), + desc="get_event_ids_with_failed_pull_attempts", + ), ) - event_ids_with_failed_pull_attempts: Set[str] = { - row["event_id"] for row in rows - } - - return event_ids_with_failed_pull_attempts + return {row[0] for row in rows} @trace async def get_event_ids_to_not_pull_from_backoff( @@ -1585,32 +1590,34 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas A dictionary of event_ids that should not be attempted to be pulled and the next timestamp at which we may try pulling them again. 
""" - event_failed_pull_attempts = await self.db_pool.simple_select_many_batch( - table="event_failed_pull_attempts", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=( - "event_id", - "last_attempt_ts", - "num_attempts", + event_failed_pull_attempts = cast( + List[Tuple[str, int, int]], + await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=( + "event_id", + "last_attempt_ts", + "num_attempts", + ), + desc="get_event_ids_to_not_pull_from_backoff", ), - desc="get_event_ids_to_not_pull_from_backoff", ) current_time = self._clock.time_msec() event_ids_with_backoff = {} - for event_failed_pull_attempt in event_failed_pull_attempts: - event_id = event_failed_pull_attempt["event_id"] + for event_id, last_attempt_ts, num_attempts in event_failed_pull_attempts: # Exponential back-off (up to the upper bound) so we don't try to # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. backoff_end_time = ( - event_failed_pull_attempt["last_attempt_ts"] + last_attempt_ts + ( 2 ** min( - event_failed_pull_attempt["num_attempts"], + num_attempts, BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, ) ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index d4dcdb898c..ef6766b5e0 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -27,6 +27,7 @@ from typing import ( Optional, Set, Tuple, + Union, cast, ) @@ -501,16 +502,19 @@ class PersistEventsStore: # We ignore legacy rooms that we aren't filling the chain cover index # for. - rows = self.db_pool.simple_select_many_txn( - txn, - table="rooms", - column="room_id", - iterable={event.room_id for event in events if event.is_state()}, - keyvalues={}, - retcols=("room_id", "has_auth_chain_index"), + rows = cast( + List[Tuple[str, Optional[Union[int, bool]]]], + self.db_pool.simple_select_many_txn( + txn, + table="rooms", + column="room_id", + iterable={event.room_id for event in events if event.is_state()}, + keyvalues={}, + retcols=("room_id", "has_auth_chain_index"), + ), ) rooms_using_chain_index = { - row["room_id"] for row in rows if row["has_auth_chain_index"] + room_id for room_id, has_auth_chain_index in rows if has_auth_chain_index } state_events = { @@ -571,19 +575,18 @@ class PersistEventsStore: # We check if there are any events that need to be handled in the rooms # we're looking at. These should just be out of band memberships, where # we didn't have the auth chain when we first persisted. - rows = db_pool.simple_select_many_txn( - txn, - table="event_auth_chain_to_calculate", - keyvalues={}, - column="room_id", - iterable=set(event_to_room_id.values()), - retcols=("event_id", "type", "state_key"), + auth_chain_to_calc_rows = cast( + List[Tuple[str, str, str]], + db_pool.simple_select_many_txn( + txn, + table="event_auth_chain_to_calculate", + keyvalues={}, + column="room_id", + iterable=set(event_to_room_id.values()), + retcols=("event_id", "type", "state_key"), + ), ) - for row in rows: - event_id = row["event_id"] - event_type = row["type"] - state_key = row["state_key"] - + for event_id, event_type, state_key in auth_chain_to_calc_rows: # (We could pull out the auth events for all rows at once using # simple_select_many, but this case happens rarely and almost always # with a single row.) 
@@ -753,23 +756,31 @@ class PersistEventsStore: # Step 1, fetch all existing links from all the chains we've seen # referenced. chain_links = _LinkMap() - rows = db_pool.simple_select_many_txn( - txn, - table="event_auth_chain_links", - column="origin_chain_id", - iterable={chain_id for chain_id, _ in chain_map.values()}, - keyvalues={}, - retcols=( - "origin_chain_id", - "origin_sequence_number", - "target_chain_id", - "target_sequence_number", + auth_chain_rows = cast( + List[Tuple[int, int, int, int]], + db_pool.simple_select_many_txn( + txn, + table="event_auth_chain_links", + column="origin_chain_id", + iterable={chain_id for chain_id, _ in chain_map.values()}, + keyvalues={}, + retcols=( + "origin_chain_id", + "origin_sequence_number", + "target_chain_id", + "target_sequence_number", + ), ), ) - for row in rows: + for ( + origin_chain_id, + origin_sequence_number, + target_chain_id, + target_sequence_number, + ) in auth_chain_rows: chain_links.add_link( - (row["origin_chain_id"], row["origin_sequence_number"]), - (row["target_chain_id"], row["target_sequence_number"]), + (origin_chain_id, origin_sequence_number), + (target_chain_id, target_sequence_number), new=False, ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index daef3685b0..c5fce1c82b 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -369,18 +369,20 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)] for chunk in chunks: - ev_rows = self.db_pool.simple_select_many_txn( - txn, - table="event_json", - column="event_id", - iterable=chunk, - retcols=["event_id", "json"], - keyvalues={}, + ev_rows = cast( + List[Tuple[str, str]], + self.db_pool.simple_select_many_txn( + txn, + table="event_json", + column="event_id", + iterable=chunk, + retcols=["event_id", "json"], + keyvalues={}, + ), ) - for row in ev_rows: - event_id = row["event_id"] - event_json = db_to_json(row["json"]) + for event_id, json in ev_rows: + event_json = db_to_json(json) try: origin_server_ts = event_json["origin_server_ts"] except (KeyError, AttributeError): @@ -563,15 +565,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if deleted: # We now need to invalidate the caches of these rooms - rows = self.db_pool.simple_select_many_txn( - txn, - table="events", - column="event_id", - iterable=to_delete, - keyvalues={}, - retcols=("room_id",), + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="events", + column="event_id", + iterable=to_delete, + keyvalues={}, + retcols=("room_id",), + ), ) - room_ids = {row["room_id"] for row in rows} + room_ids = {row[0] for row in rows} for room_id in room_ids: txn.call_after( self.get_latest_event_ids_in_room.invalidate, (room_id,) # type: ignore[attr-defined] @@ -1038,18 +1043,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): count = len(rows) # We also need to fetch the auth events for them. 
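Both the `chunks` list comprehension above and the `batch_iter` helper used by the batch variants split a large set of IDs into fixed-size groups so each SELECT stays bounded. A generic sketch of the idea, written from scratch:

from typing import Iterator, List, TypeVar

T = TypeVar("T")

def chunked(items: List[T], size: int) -> Iterator[List[T]]:
    # Yield consecutive slices of at most `size` items.
    for i in range(0, len(items), size):
        yield items[i : i + size]

assert list(chunked([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]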
- auth_events = self.db_pool.simple_select_many_txn( - txn, - table="event_auth", - column="event_id", - iterable=event_to_room_id, - keyvalues={}, - retcols=("event_id", "auth_id"), + auth_events = cast( + List[Tuple[str, str]], + self.db_pool.simple_select_many_txn( + txn, + table="event_auth", + column="event_id", + iterable=event_to_room_id, + keyvalues={}, + retcols=("event_id", "auth_id"), + ), ) event_to_auth_chain: Dict[str, List[str]] = {} - for row in auth_events: - event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"]) + for event_id, auth_id in auth_events: + event_to_auth_chain.setdefault(event_id, []).append(auth_id) # Calculate and persist the chain cover index for this set of events. # diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b788d70fc5..8af638d60f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1584,16 +1584,19 @@ class EventsWorkerStore(SQLBaseStore): """Given a list of event ids, check if we have already processed and stored them as non outliers. """ - rows = await self.db_pool.simple_select_many_batch( - table="events", - retcols=("event_id",), - column="event_id", - iterable=list(event_ids), - keyvalues={"outlier": False}, - desc="have_events_in_timeline", + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ), ) - return {r["event_id"] for r in rows} + return {r[0] for r in rows} @trace @tag_args @@ -2336,15 +2339,18 @@ class EventsWorkerStore(SQLBaseStore): a dict mapping from event id to partial-stateness. We return True for any of the events which are unknown (or are outliers). """ - result = await self.db_pool.simple_select_many_batch( - table="partial_state_events", - column="event_id", - iterable=event_ids, - retcols=["event_id"], - desc="get_partial_state_events", + result = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="partial_state_events", + column="event_id", + iterable=event_ids, + retcols=["event_id"], + desc="get_partial_state_events", + ), ) # convert the result to a dict, to make @cachedList work - partial = {r["event_id"] for r in result} + partial = {r[0] for r in result} return {e_id: e_id in partial for e_id in event_ids} @cached() diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index 889c578b9c..ea797864b9 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -16,7 +16,7 @@ import itertools import json import logging -from typing import Dict, Iterable, Mapping, Optional, Tuple +from typing import Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes @@ -205,35 +205,39 @@ class KeyStore(CacheInvalidationWorkerStore): If we have multiple entries for a given key ID, returns the most recent. 
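The auth-chain hunk above groups `(event_id, auth_id)` pairs into a multimap with `setdefault`, a pattern this patch leans on now that rows unpack directly into loop variables. A compact sketch with sample data:

from typing import Dict, List

pairs = [("$e1", "$a1"), ("$e1", "$a2"), ("$e2", "$a1")]

event_to_auth_chain: Dict[str, List[str]] = {}
for event_id, auth_id in pairs:
    # Create the list on first sight of the key, then append.
    event_to_auth_chain.setdefault(event_id, []).append(auth_id)

assert event_to_auth_chain == {"$e1": ["$a1", "$a2"], "$e2": ["$a1"]}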
""" - rows = await self.db_pool.simple_select_many_batch( - table="server_keys_json", - column="key_id", - iterable=key_ids, - keyvalues={"server_name": server_name}, - retcols=( - "key_id", - "from_server", - "ts_added_ms", - "ts_valid_until_ms", - "key_json", + rows = cast( + List[Tuple[str, str, int, int, Union[bytes, memoryview]]], + await self.db_pool.simple_select_many_batch( + table="server_keys_json", + column="key_id", + iterable=key_ids, + keyvalues={"server_name": server_name}, + retcols=( + "key_id", + "from_server", + "ts_added_ms", + "ts_valid_until_ms", + "key_json", + ), + desc="get_server_keys_json_for_remote", ), - desc="get_server_keys_json_for_remote", ) if not rows: return {} - # We sort the rows so that the most recently added entry is picked up. - rows.sort(key=lambda r: r["ts_added_ms"]) + # We sort the rows by ts_added_ms so that the most recently added entry + # will stomp over older entries in the dictionary. + rows.sort(key=lambda r: r[2]) return { - row["key_id"]: FetchKeyResultForRemote( + key_id: FetchKeyResultForRemote( # Cast to bytes since postgresql returns a memoryview. - key_json=bytes(row["key_json"]), - valid_until_ts=row["ts_valid_until_ms"], - added_ts=row["ts_added_ms"], + key_json=bytes(key_json), + valid_until_ts=ts_valid_until_ms, + added_ts=ts_added_ms, ) - for row in rows + for key_id, from_server, ts_added_ms, ts_valid_until_ms, key_json in rows } async def get_all_server_keys_json_for_remote( @@ -260,6 +264,8 @@ class KeyStore(CacheInvalidationWorkerStore): if not rows: return {} + # We sort the rows by ts_added_ms so that the most recently added entry + # will stomp over older entries in the dictionary. rows.sort(key=lambda r: r["ts_added_ms"]) return { diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 519f05fb60..3b444d2d07 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -261,27 +261,40 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) async def get_presence_for_users( self, user_ids: Iterable[str] ) -> Mapping[str, UserPresenceState]: - rows = await self.db_pool.simple_select_many_batch( - table="presence_stream", - column="user_id", - iterable=user_ids, - keyvalues={}, - retcols=( - "user_id", - "state", - "last_active_ts", - "last_federation_update_ts", - "last_user_sync_ts", - "status_msg", - "currently_active", + # TODO All these columns are nullable, but we don't expect that: + # https://github.com/matrix-org/synapse/issues/16467 + rows = cast( + List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], + await self.db_pool.simple_select_many_batch( + table="presence_stream", + column="user_id", + iterable=user_ids, + keyvalues={}, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + desc="get_presence_for_users", ), - desc="get_presence_for_users", ) - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return {row["user_id"]: UserPresenceState(**row) for row in rows} + return { + user_id: UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) + for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, 
currently_active in rows + } async def should_user_receive_full_presence_with_token( self, @@ -386,6 +399,8 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) limit = 100 offset = 0 while True: + # TODO All these columns are nullable, but we don't expect that: + # https://github.com/matrix-org/synapse/issues/16467 rows = cast( List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 923166974c..f5356e7f80 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -62,20 +62,34 @@ logger = logging.getLogger(__name__) def _load_rules( - rawrules: List[JsonDict], + rawrules: List[Tuple[str, int, str, str]], enabled_map: Dict[str, bool], experimental_config: ExperimentalConfig, ) -> FilteredPushRules: """Take the DB rows returned from the DB and convert them into a full `FilteredPushRules` object. + + Args: + rawrules: List of tuples of: + * rule ID + * Priority class + * Conditions (as serialized JSON) + * Actions (as serialized JSON) + enabled_map: A dictionary of rule ID to a boolean of whether the rule is + enabled. This might not include all rule IDs from rawrules. + experimental_config: The `experimental_features` section of the Synapse + config. (Used to check if various features are enabled.) + + Returns: + A new FilteredPushRules object. """ ruleslist = [ PushRule.from_db( - rule_id=rawrule["rule_id"], - priority_class=rawrule["priority_class"], - conditions=rawrule["conditions"], - actions=rawrule["actions"], + rule_id=rawrule[0], + priority_class=rawrule[1], + conditions=rawrule[2], + actions=rawrule[3], ) for rawrule in rawrules ] @@ -183,7 +197,19 @@ class PushRulesWorkerStore( enabled_map = await self.get_push_rules_enabled_for_user(user_id) - return _load_rules(rows, enabled_map, self.hs.config.experimental) + return _load_rules( + [ + ( + row["rule_id"], + row["priority_class"], + row["conditions"], + row["actions"], + ) + for row in rows + ], + enabled_map, + self.hs.config.experimental, + ) async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]: results = await self.db_pool.simple_select_list( @@ -221,21 +247,36 @@ class PushRulesWorkerStore( if not user_ids: return {} - raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids} + raw_rules: Dict[str, List[Tuple[str, int, str, str]]] = { + user_id: [] for user_id in user_ids + } - rows = await self.db_pool.simple_select_many_batch( - table="push_rules", - column="user_name", - iterable=user_ids, - retcols=("*",), - desc="bulk_get_push_rules", - batch_size=1000, + rows = cast( + List[Tuple[str, str, int, int, str, str]], + await self.db_pool.simple_select_many_batch( + table="push_rules", + column="user_name", + iterable=user_ids, + retcols=( + "user_name", + "rule_id", + "priority_class", + "priority", + "conditions", + "actions", + ), + desc="bulk_get_push_rules", + batch_size=1000, + ), ) - rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) + # Sort by highest priority_class, then highest priority.
+ rows.sort(key=lambda row: (-int(row[2]), -int(row[3]))) - for row in rows: - raw_rules.setdefault(row["user_name"], []).append(row) + for user_name, rule_id, priority_class, _, conditions, actions in rows: + raw_rules.setdefault(user_name, []).append( + (rule_id, priority_class, conditions, actions) + ) enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids) @@ -256,17 +297,19 @@ class PushRulesWorkerStore( results: Dict[str, Dict[str, bool]] = {user_id: {} for user_id in user_ids} - rows = await self.db_pool.simple_select_many_batch( - table="push_rules_enable", - column="user_name", - iterable=user_ids, - retcols=("user_name", "rule_id", "enabled"), - desc="bulk_get_push_rules_enabled", - batch_size=1000, + rows = cast( + List[Tuple[str, str, Optional[int]]], + await self.db_pool.simple_select_many_batch( + table="push_rules_enable", + column="user_name", + iterable=user_ids, + retcols=("user_name", "rule_id", "enabled"), + desc="bulk_get_push_rules_enabled", + batch_size=1000, + ), ) - for row in rows: - enabled = bool(row["enabled"]) - results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled + for user_name, rule_id, enabled in rows: + results.setdefault(user_name, {})[rule_id] = bool(enabled) return results async def get_all_push_rule_updates( diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 9246b418f5..7f40e2c446 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -349,16 +349,19 @@ class RelationsWorkerStore(SQLBaseStore): def get_all_relation_ids_for_event_with_types_txn( txn: LoggingTransaction, ) -> List[str]: - rows = self.db_pool.simple_select_many_txn( - txn=txn, - table="event_relations", - column="relation_type", - iterable=relation_types, - keyvalues={"relates_to_id": event_id}, - retcols=["event_id"], + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn=txn, + table="event_relations", + column="relation_type", + iterable=relation_types, + keyvalues={"relates_to_id": event_id}, + retcols=["event_id"], + ), ) - return [row["event_id"] for row in rows] + return [row[0] for row in rows] return await self.db_pool.runInteraction( desc="get_all_relation_ids_for_event_with_types", diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 1d4d99932b..9d24d2c347 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1296,14 +1296,17 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): complete. 
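The push-rules sort near the top of this hunk orders tuples by two fields descending at once by negating both keys. A standalone sketch of the technique with sample data:

rows = [
    ("alice", "rule_a", 1, 5),
    ("alice", "rule_b", 2, 1),
    ("bob", "rule_c", 1, 9),
]

# Negated keys turn Python's ascending sort into highest-priority_class,
# then highest-priority order.
rows.sort(key=lambda row: (-int(row[2]), -int(row[3])))
assert [r[1] for r in rows] == ["rule_b", "rule_c", "rule_a"]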
""" - rows: List[Dict[str, str]] = await self.db_pool.simple_select_many_batch( - table="partial_state_rooms", - column="room_id", - iterable=room_ids, - retcols=("room_id",), - desc="is_partial_state_room_batched", - ) - partial_state_rooms = {row_dict["room_id"] for row_dict in rows} + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="partial_state_rooms", + column="room_id", + iterable=room_ids, + retcols=("room_id",), + desc="is_partial_state_room_batched", + ), + ) + partial_state_rooms = {row[0] for row in rows} return {room_id: room_id in partial_state_rooms for room_id in room_ids} async def get_join_event_id_and_device_lists_stream_id_for_partial_state( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index bbe08368db..3a87eba430 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -27,6 +27,7 @@ from typing import ( Set, Tuple, Union, + cast, ) import attr @@ -683,25 +684,28 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): Map from user_id to set of rooms that is currently in. """ - rows = await self.db_pool.simple_select_many_batch( - table="current_state_events", - column="state_key", - iterable=user_ids, - retcols=( - "state_key", - "room_id", + rows = cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_many_batch( + table="current_state_events", + column="state_key", + iterable=user_ids, + retcols=( + "state_key", + "room_id", + ), + keyvalues={ + "type": EventTypes.Member, + "membership": Membership.JOIN, + }, + desc="get_rooms_for_users", ), - keyvalues={ - "type": EventTypes.Member, - "membership": Membership.JOIN, - }, - desc="get_rooms_for_users", ) user_rooms: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids} - for row in rows: - user_rooms[row["state_key"]].add(row["room_id"]) + for state_key, room_id in rows: + user_rooms[state_key].add(room_id) return {key: frozenset(rooms) for key, rooms in user_rooms.items()} @@ -892,17 +896,20 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): Map from event ID to `user_id`, or None if event is not a join. """ - rows = await self.db_pool.simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=event_ids, - retcols=("user_id", "event_id"), - keyvalues={"membership": Membership.JOIN}, - batch_size=1000, - desc="_get_user_ids_from_membership_event_ids", + rows = cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=event_ids, + retcols=("event_id", "user_id"), + keyvalues={"membership": Membership.JOIN}, + batch_size=1000, + desc="_get_user_ids_from_membership_event_ids", + ), ) - return {row["event_id"]: row["user_id"] for row in rows} + return dict(rows) @cached(max_entries=10000) async def is_host_joined(self, room_id: str, host: str) -> bool: @@ -1202,21 +1209,22 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): membership event, otherwise the value is None. 
""" - rows = await self.db_pool.simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=member_event_ids, - retcols=("user_id", "membership", "event_id"), - keyvalues={}, - batch_size=500, - desc="get_membership_from_event_ids", + rows = cast( + List[Tuple[str, str, str]], + await self.db_pool.simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=member_event_ids, + retcols=("user_id", "membership", "event_id"), + keyvalues={}, + batch_size=500, + desc="get_membership_from_event_ids", + ), ) return { - row["event_id"]: EventIdMembership( - membership=row["membership"], user_id=row["user_id"] - ) - for row in rows + event_id: EventIdMembership(membership=membership, user_id=user_id) + for user_id, membership, event_id in rows } async def is_local_host_in_room_ignoring_users( diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 5eaaff5b68..598025dd91 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -20,10 +20,12 @@ from typing import ( Collection, Dict, Iterable, + List, Mapping, Optional, Set, Tuple, + cast, ) import attr @@ -388,16 +390,19 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): Raises: RuntimeError if the state is unknown at any of the given events """ - rows = await self.db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=("event_id", "state_group"), - desc="_get_state_group_for_events", + rows = cast( + List[Tuple[str, int]], + await self.db_pool.simple_select_many_batch( + table="event_to_state_groups", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id", "state_group"), + desc="_get_state_group_for_events", + ), ) - res = {row["event_id"]: row["state_group"] for row in rows} + res = dict(rows) for e in event_ids: if e not in res: raise RuntimeError("No state group for unknown or outlier event %s" % e) @@ -415,16 +420,19 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): The subset of state groups that are referenced. """ - rows = await self.db_pool.simple_select_many_batch( - table="event_to_state_groups", - column="state_group", - iterable=state_groups, - keyvalues={}, - retcols=("DISTINCT state_group",), - desc="get_referenced_state_groups", + rows = cast( + List[Tuple[int]], + await self.db_pool.simple_select_many_batch( + table="event_to_state_groups", + column="state_group", + iterable=state_groups, + keyvalues={}, + retcols=("DISTINCT state_group",), + desc="get_referenced_state_groups", + ), ) - return {row["state_group"] for row in rows} + return {row[0] for row in rows} async def update_state_for_partial_state_event( self, @@ -624,16 +632,22 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): # potentially stale, since there may have been a period where the # server didn't share a room with the remote user and therefore may # have missed any device updates. 
- rows = self.db_pool.simple_select_many_txn( - txn, - table="current_state_events", - column="room_id", - iterable=to_delete, - keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN}, - retcols=("state_key",), + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="current_state_events", + column="room_id", + iterable=to_delete, + keyvalues={ + "type": EventTypes.Member, + "membership": Membership.JOIN, + }, + retcols=("state_key",), + ), ) - potentially_left_users = {row["state_key"] for row in rows} + potentially_left_users = {row[0] for row in rows} # Now lets actually delete the rooms from the DB. self.db_pool.simple_delete_many_txn( diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index 9d403919e4..5b2d0ba870 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -506,25 +506,28 @@ class StatsStore(StateDeltasStore): ) -> Tuple[List[str], Dict[str, int], int, List[str], int]: pos = self.get_room_max_stream_ordering() # type: ignore[attr-defined] - rows = self.db_pool.simple_select_many_txn( - txn, - table="current_state_events", - column="type", - iterable=[ - EventTypes.Create, - EventTypes.JoinRules, - EventTypes.RoomHistoryVisibility, - EventTypes.RoomEncryption, - EventTypes.Name, - EventTypes.Topic, - EventTypes.RoomAvatar, - EventTypes.CanonicalAlias, - ], - keyvalues={"room_id": room_id, "state_key": ""}, - retcols=["event_id"], + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="current_state_events", + column="type", + iterable=[ + EventTypes.Create, + EventTypes.JoinRules, + EventTypes.RoomHistoryVisibility, + EventTypes.RoomEncryption, + EventTypes.Name, + EventTypes.Topic, + EventTypes.RoomAvatar, + EventTypes.CanonicalAlias, + ], + keyvalues={"room_id": room_id, "state_key": ""}, + retcols=["event_id"], + ), ) - event_ids = cast(List[str], [row["event_id"] for row in rows]) + event_ids = [row[0] for row in rows] txn.execute( """ diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index f35757280d..c4a6475060 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -211,18 +211,28 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): async def get_destination_retry_timings_batch( self, destinations: StrCollection ) -> Mapping[str, Optional[DestinationRetryTimings]]: - rows = await self.db_pool.simple_select_many_batch( - table="destinations", - iterable=destinations, - column="destination", - retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), - desc="get_destination_retry_timings_batch", + rows = cast( + List[Tuple[str, Optional[int], Optional[int], Optional[int]]], + await self.db_pool.simple_select_many_batch( + table="destinations", + iterable=destinations, + column="destination", + retcols=( + "destination", + "failure_ts", + "retry_last_ts", + "retry_interval", + ), + desc="get_destination_retry_timings_batch", + ), ) return { - row.pop("destination"): DestinationRetryTimings(**row) - for row in rows - if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"] + destination: DestinationRetryTimings( + failure_ts, retry_last_ts, retry_interval + ) + for destination, failure_ts, retry_last_ts, retry_interval in rows + if retry_last_ts and failure_ts and retry_interval } async def set_destination_retry_timings( diff --git 
a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py index f38bedbbcd..919c66f553 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py @@ -337,13 +337,16 @@ class UIAuthWorkerStore(SQLBaseStore): # If a registration token was used, decrement the pending counter # before deleting the session. - rows = self.db_pool.simple_select_many_txn( - txn, - table="ui_auth_sessions_credentials", - column="session_id", - iterable=session_ids, - keyvalues={"stage_type": LoginType.REGISTRATION_TOKEN}, - retcols=["result"], + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="ui_auth_sessions_credentials", + column="session_id", + iterable=session_ids, + keyvalues={"stage_type": LoginType.REGISTRATION_TOKEN}, + retcols=["result"], + ), ) # Get the tokens used and how much pending needs to be decremented by. @@ -353,23 +356,25 @@ class UIAuthWorkerStore(SQLBaseStore): # registration token stage for that session will be True. # If a token was used to authenticate, but registration was # never completed, the result will be the token used. - token = db_to_json(r["result"]) + token = db_to_json(r[0]) if isinstance(token, str): token_counts[token] = token_counts.get(token, 0) + 1 # Update the `pending` counters. if len(token_counts) > 0: - token_rows = self.db_pool.simple_select_many_txn( - txn, - table="registration_tokens", - column="token", - iterable=list(token_counts.keys()), - keyvalues={}, - retcols=["token", "pending"], + token_rows = cast( + List[Tuple[str, int]], + self.db_pool.simple_select_many_txn( + txn, + table="registration_tokens", + column="token", + iterable=list(token_counts.keys()), + keyvalues={}, + retcols=["token", "pending"], + ), ) - for token_row in token_rows: - token = token_row["token"] - new_pending = token_row["pending"] - token_counts[token] + for token, pending in token_rows: + new_pending = pending - token_counts[token] self.db_pool.simple_update_one_txn( txn, table="registration_tokens", diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index f0dc31fee6..23eb92c514 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -410,25 +410,24 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) # Next fetch their profiles. Note that not all users have profiles. 
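The registration-token accounting in the ui_auth hunk above counts occurrences with the manual `token_counts.get(token, 0) + 1` idiom; `collections.Counter` expresses the same thing more directly, at the cost of another import. A sketch of the equivalence:

from collections import Counter

tokens_used = ["abc", "abc", "xyz"]

manual: dict = {}
for token in tokens_used:
    manual[token] = manual.get(token, 0) + 1

assert manual == Counter(tokens_used) == {"abc": 2, "xyz": 1}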
- profile_rows = self.db_pool.simple_select_many_txn( - txn, - table="profiles", - column="full_user_id", - iterable=list(users_to_insert), - retcols=( - "full_user_id", - "displayname", - "avatar_url", + profile_rows = cast( + List[Tuple[str, Optional[str], Optional[str]]], + self.db_pool.simple_select_many_txn( + txn, + table="profiles", + column="full_user_id", + iterable=list(users_to_insert), + retcols=( + "full_user_id", + "displayname", + "avatar_url", + ), + keyvalues={}, ), - keyvalues={}, ) profiles = { - row["full_user_id"]: _UserDirProfile( - row["full_user_id"], - row["displayname"], - row["avatar_url"], - ) - for row in profile_rows + full_user_id: _UserDirProfile(full_user_id, displayname, avatar_url) + for full_user_id, displayname, avatar_url in profile_rows } profiles_to_insert = [ @@ -517,18 +516,21 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined] ] - rows = self.db_pool.simple_select_many_txn( - txn, - table="users", - column="name", - iterable=users, - keyvalues={ - "deactivated": 0, - }, - retcols=("name", "user_type"), + rows = cast( + List[Tuple[str, Optional[str]]], + self.db_pool.simple_select_many_txn( + txn, + table="users", + column="name", + iterable=users, + keyvalues={ + "deactivated": 0, + }, + retcols=("name", "user_type"), + ), ) - return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT] + return [name for name, user_type in rows if user_type != UserTypes.SUPPORT] async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool: """Check if the room is either world_readable or publically joinable""" diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py index 06fcbe5e54..8bd58c6e3d 100644 --- a/synapse/storage/databases/main/user_erasure_store.py +++ b/synapse/storage/databases/main/user_erasure_store.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterable, Mapping +from typing import Iterable, List, Mapping, Tuple, cast from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main import CacheInvalidationWorkerStore @@ -50,14 +50,17 @@ class UserErasureWorkerStore(CacheInvalidationWorkerStore): Returns: for each user, whether the user has requested erasure. """ - rows = await self.db_pool.simple_select_many_batch( - table="erased_users", - column="user_id", - iterable=user_ids, - retcols=("user_id",), - desc="are_users_erased", + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="erased_users", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="are_users_erased", + ), ) - erased_users = {row["user_id"] for row in rows} + erased_users = {row[0] for row in rows} return {u: u in erased_users for u in user_ids} diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 6984d11352..09d2a8c5b3 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -13,7 +13,17 @@ # limitations under the License. 
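`are_users_erased` above turns a set of positive rows into a per-input boolean map, the same shape used for the partial-state lookups elsewhere in this patch. A tiny sketch with sample data:

user_ids = ["@a:x", "@b:x"]
erased_users = {"@a:x"}  # users that had a row in erased_users

# Every requested ID gets an answer, not just the ones with rows.
result = {u: u in erased_users for u in user_ids}
assert result == {"@a:x": True, "@b:x": False}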
import logging -from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + cast, +) import attr @@ -730,19 +740,22 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "[purge] found %i state groups to delete", len(state_groups_to_delete) ) - rows = self.db_pool.simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=state_groups_to_delete, - keyvalues={}, - retcols=("state_group",), + rows = cast( + List[Tuple[int]], + self.db_pool.simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=state_groups_to_delete, + keyvalues={}, + retcols=("state_group",), + ), ) remaining_state_groups = { - row["state_group"] - for row in rows - if row["state_group"] not in state_groups_to_delete + state_group + for state_group, in rows + if state_group not in state_groups_to_delete } logger.info( @@ -799,16 +812,19 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): A mapping from state group to previous state group. """ - rows = await self.db_pool.simple_select_many_batch( - table="state_group_edges", - column="prev_state_group", - iterable=state_groups, - keyvalues={}, - retcols=("prev_state_group", "state_group"), - desc="get_previous_state_groups", + rows = cast( + List[Tuple[int, int]], + await self.db_pool.simple_select_many_batch( + table="state_group_edges", + column="prev_state_group", + iterable=state_groups, + keyvalues={}, + retcols=("state_group", "prev_state_group"), + desc="get_previous_state_groups", + ), ) - return {row["state_group"]: row["prev_state_group"] for row in rows} + return dict(rows) async def purge_room_state( self, room_id: str, state_groups_to_delete: Collection[int] diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index b55dd07f14..2f6499966c 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Set, Tuple, cast from twisted.test.proto_helpers import MemoryReactor from twisted.trial import unittest @@ -421,41 +421,53 @@ class EventChainStoreTestCase(HomeserverTestCase): self, events: List[EventBase] ) -> Tuple[Dict[str, Tuple[int, int]], _LinkMap]: # Fetch the map from event ID -> (chain ID, sequence number) - rows = self.get_success( - self.store.db_pool.simple_select_many_batch( - table="event_auth_chains", - column="event_id", - iterable=[e.event_id for e in events], - retcols=("event_id", "chain_id", "sequence_number"), - keyvalues={}, - ) + rows = cast( + List[Tuple[str, int, int]], + self.get_success( + self.store.db_pool.simple_select_many_batch( + table="event_auth_chains", + column="event_id", + iterable=[e.event_id for e in events], + retcols=("event_id", "chain_id", "sequence_number"), + keyvalues={}, + ) + ), ) chain_map = { - row["event_id"]: (row["chain_id"], row["sequence_number"]) for row in rows + event_id: (chain_id, sequence_number) + for event_id, chain_id, sequence_number in rows } # Fetch all the links and pass them to the _LinkMap. 
- rows = self.get_success( - self.store.db_pool.simple_select_many_batch( - table="event_auth_chain_links", - column="origin_chain_id", - iterable=[chain_id for chain_id, _ in chain_map.values()], - retcols=( - "origin_chain_id", - "origin_sequence_number", - "target_chain_id", - "target_sequence_number", - ), - keyvalues={}, - ) + auth_chain_rows = cast( + List[Tuple[int, int, int, int]], + self.get_success( + self.store.db_pool.simple_select_many_batch( + table="event_auth_chain_links", + column="origin_chain_id", + iterable=[chain_id for chain_id, _ in chain_map.values()], + retcols=( + "origin_chain_id", + "origin_sequence_number", + "target_chain_id", + "target_sequence_number", + ), + keyvalues={}, + ) + ), ) link_map = _LinkMap() - for row in rows: + for ( + origin_chain_id, + origin_sequence_number, + target_chain_id, + target_sequence_number, + ) in auth_chain_rows: added = link_map.add_link( - (row["origin_chain_id"], row["origin_sequence_number"]), - (row["target_chain_id"], row["target_sequence_number"]), + (origin_chain_id, origin_sequence_number), + (target_chain_id, target_sequence_number), ) # We shouldn't have persisted any redundant links -- cgit 1.5.1 From cc865fffc0e556005a6ab596717a77230ba82ee7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 11 Oct 2023 20:08:11 -0400 Subject: Convert user_get_threepids response to attrs. (#16468) This improves type annotations by not having a dictionary of Any values. --- changelog.d/16468.misc | 1 + synapse/handlers/account_validity.py | 4 ++-- synapse/handlers/admin.py | 4 +++- synapse/handlers/deactivate_account.py | 4 ++-- synapse/module_api/__init__.py | 2 +- synapse/rest/admin/users.py | 3 +-- synapse/rest/client/account.py | 4 +++- synapse/storage/databases/main/registration.py | 19 ++++++++++++++----- tests/module_api/test_api.py | 8 ++++---- 9 files changed, 31 insertions(+), 18 deletions(-) create mode 100644 changelog.d/16468.misc (limited to 'tests') diff --git a/changelog.d/16468.misc b/changelog.d/16468.misc new file mode 100644 index 0000000000..93ceaeafc9 --- /dev/null +++ b/changelog.d/16468.misc @@ -0,0 +1 @@ +Improve type hints. 
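This commit replaces the `List[Dict[str, Any]]` threepid response with a frozen attrs class carrying typed fields, converting back to plain dicts only at serialization boundaries via `attr.asdict`. A minimal sketch of the pattern, using the field names from the `ThreepidResult` class added later in this commit:

import attr

@attr.s(frozen=True, slots=True, auto_attribs=True)
class ThreepidResult:
    medium: str
    address: str
    validated_at: int
    added_at: int

t = ThreepidResult(medium="email", address="bob@bobinator.bob", validated_at=0, added_at=0)
assert t.medium == "email"  # typed attribute access replaces t["medium"]
assert attr.asdict(t)["address"] == "bob@bobinator.bob"  # dict only at the edge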
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index f1a7a05df6..6c2a49a3b9 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -212,8 +212,8 @@ class AccountValidityHandler: addresses = [] for threepid in threepids: - if threepid["medium"] == "email": - addresses.append(threepid["address"]) + if threepid.medium == "email": + addresses.append(threepid.address) return addresses diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 97fd1fd427..2c2baeac67 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -16,6 +16,8 @@ import abc import logging from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set +import attr + from synapse.api.constants import Direction, Membership from synapse.events import EventBase from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo @@ -93,7 +95,7 @@ class AdminHandler: ] user_info_dict["displayname"] = profile.display_name user_info_dict["avatar_url"] = profile.avatar_url - user_info_dict["threepids"] = threepids + user_info_dict["threepids"] = [attr.asdict(t) for t in threepids] user_info_dict["external_ids"] = external_ids user_info_dict["erased"] = await self._store.is_user_erased(user.to_string()) diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 67adeae6a7..6a8f8f2fd1 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -117,9 +117,9 @@ class DeactivateAccountHandler: # Remove any local threepid associations for this account. local_threepids = await self.store.user_get_threepids(user_id) - for threepid in local_threepids: + for local_threepid in local_threepids: await self._auth_handler.delete_local_threepid( - user_id, threepid["medium"], threepid["address"] + user_id, local_threepid.medium, local_threepid.address ) # delete any devices belonging to the user, which will also diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 65e2aca456..0786d20635 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -678,7 +678,7 @@ class ModuleApi: "msisdn" for phone numbers, and an "address" key which value is the threepid's address. """ - return await self._store.user_get_threepids(user_id) + return [attr.asdict(t) for t in await self._store.user_get_threepids(user_id)] def check_user_exists(self, user_id: str) -> "defer.Deferred[Optional[str]]": """Check if user exists. 
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index cd995e8dbb..7fe16130e7 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -329,9 +329,8 @@ class UserRestServletV2(RestServlet): if threepids is not None: # get changed threepids (added and removed) - # convert List[Dict[str, Any]] into Set[Tuple[str, str]] cur_threepids = { - (threepid["medium"], threepid["address"]) + (threepid.medium, threepid.address) for threepid in await self.store.user_get_threepids(user_id) } add_threepids = new_threepids - cur_threepids diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index e74a87af4d..641390cb30 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -24,6 +24,8 @@ if TYPE_CHECKING or HAS_PYDANTIC_V2: from pydantic.v1 import StrictBool, StrictStr, constr else: from pydantic import StrictBool, StrictStr, constr + +import attr from typing_extensions import Literal from twisted.web.server import Request @@ -595,7 +597,7 @@ class ThreepidRestServlet(RestServlet): threepids = await self.datastore.user_get_threepids(requester.user.to_string()) - return 200, {"threepids": threepids} + return 200, {"threepids": [attr.asdict(t) for t in threepids]} # NOTE(dmr): I have chosen not to use Pydantic to parse this request's body, because # the endpoint is deprecated. (If you really want to, you could do this by reusing diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 64a2c31a5d..9e8643ae4d 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -143,6 +143,14 @@ class LoginTokenLookupResult: """The session ID advertised by the SSO Identity Provider.""" +@attr.s(frozen=True, slots=True, auto_attribs=True) +class ThreepidResult: + medium: str + address: str + validated_at: int + added_at: int + + class RegistrationWorkerStore(CacheInvalidationWorkerStore): def __init__( self, @@ -988,13 +996,14 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): {"user_id": user_id, "validated_at": validated_at, "added_at": added_at}, ) - async def user_get_threepids(self, user_id: str) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + async def user_get_threepids(self, user_id: str) -> List[ThreepidResult]: + results = await self.db_pool.simple_select_list( "user_threepids", - {"user_id": user_id}, - ["medium", "address", "validated_at", "added_at"], - "user_get_threepids", + keyvalues={"user_id": user_id}, + retcols=["medium", "address", "validated_at", "added_at"], + desc="user_get_threepids", ) + return [ThreepidResult(**r) for r in results] async def user_delete_threepid( self, user_id: str, medium: str, address: str diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 172fc3a736..1dabf52156 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -94,12 +94,12 @@ class ModuleApiTestCase(BaseModuleApiTestCase): self.assertEqual(len(emails), 1) email = emails[0] - self.assertEqual(email["medium"], "email") - self.assertEqual(email["address"], "bob@bobinator.bob") + self.assertEqual(email.medium, "email") + self.assertEqual(email.address, "bob@bobinator.bob") # Should these be 0? 
- self.assertEqual(email["validated_at"], 0) - self.assertEqual(email["added_at"], 0) + self.assertEqual(email.validated_at, 0) + self.assertEqual(email.added_at, 0) # Check that the displayname was assigned displayname = self.get_success( -- cgit 1.5.1 From e3e0ae4ab1f48974ca66a4c4e6be8019aaa38fd1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 16 Oct 2023 07:35:22 -0400 Subject: Convert state delta processing from a dict to attrs. (#16469) For improved type checking & memory usage. --- changelog.d/16469.misc | 1 + synapse/handlers/presence.py | 32 ++++++------- synapse/handlers/room_member.py | 21 ++++----- synapse/handlers/stats.py | 64 +++++++++++++------------- synapse/handlers/user_directory.py | 34 +++++++------- synapse/storage/controllers/state.py | 14 +----- synapse/storage/databases/main/state_deltas.py | 52 +++++++++++++-------- tests/handlers/test_typing.py | 2 +- 8 files changed, 111 insertions(+), 109 deletions(-) create mode 100644 changelog.d/16469.misc (limited to 'tests') diff --git a/changelog.d/16469.misc b/changelog.d/16469.misc new file mode 100644 index 0000000000..93ceaeafc9 --- /dev/null +++ b/changelog.d/16469.misc @@ -0,0 +1 @@ +Improve type hints. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7c7cda3e95..dfc0b9db07 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -110,6 +110,7 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream from synapse.storage.databases.main import DataStore +from synapse.storage.databases.main.state_deltas import StateDelta from synapse.streams import EventSource from synapse.types import ( JsonDict, @@ -1499,9 +1500,9 @@ class PresenceHandler(BasePresenceHandler): # We may get multiple deltas for different rooms, but we want to # handle them on a room by room basis, so we batch them up by # room. - deltas_by_room: Dict[str, List[JsonDict]] = {} + deltas_by_room: Dict[str, List[StateDelta]] = {} for delta in deltas: - deltas_by_room.setdefault(delta["room_id"], []).append(delta) + deltas_by_room.setdefault(delta.room_id, []).append(delta) for room_id, deltas_for_room in deltas_by_room.items(): await self._handle_state_delta(room_id, deltas_for_room) @@ -1513,7 +1514,7 @@ class PresenceHandler(BasePresenceHandler): max_pos ) - async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None: + async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None: """Process current state deltas for the room to find new joins that need to be handled. """ @@ -1524,31 +1525,30 @@ class PresenceHandler(BasePresenceHandler): newly_joined_users = set() for delta in deltas: - assert room_id == delta["room_id"] + assert room_id == delta.room_id - typ = delta["type"] - state_key = delta["state_key"] - event_id = delta["event_id"] - prev_event_id = delta["prev_event_id"] - - logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + logger.debug( + "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id + ) # Drop any event that isn't a membership join - if typ != EventTypes.Member: + if delta.event_type != EventTypes.Member: continue - if event_id is None: + if delta.event_id is None: # state has been deleted, so this is not a join. We only care about # joins. 
continue - event = await self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(delta.event_id, allow_none=True) if not event or event.content.get("membership") != Membership.JOIN: # We only care about joins continue - if prev_event_id: - prev_event = await self.store.get_event(prev_event_id, allow_none=True) + if delta.prev_event_id: + prev_event = await self.store.get_event( + delta.prev_event_id, allow_none=True + ) if ( prev_event and prev_event.content.get("membership") == Membership.JOIN @@ -1556,7 +1556,7 @@ class PresenceHandler(BasePresenceHandler): # Ignore changes to join events. continue - newly_joined_users.add(state_key) + newly_joined_users.add(delta.state_key) if not newly_joined_users: # If nobody has joined then there's nothing to do. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 130eee7e1d..918eb203e2 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -16,7 +16,7 @@ import abc import logging import random from http import HTTPStatus -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple from synapse import types from synapse.api.constants import ( @@ -44,6 +44,7 @@ from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging import opentracing from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.databases.main.state_deltas import StateDelta from synapse.types import ( JsonDict, Requester, @@ -2146,24 +2147,18 @@ class RoomForgetterHandler(StateDeltasHandler): await self._store.update_room_forgetter_stream_pos(max_pos) - async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: + async def _handle_deltas(self, deltas: List[StateDelta]) -> None: """Called with the state deltas to process""" for delta in deltas: - typ = delta["type"] - state_key = delta["state_key"] - room_id = delta["room_id"] - event_id = delta["event_id"] - prev_event_id = delta["prev_event_id"] - - if typ != EventTypes.Member: + if delta.event_type != EventTypes.Member: continue - if not self._hs.is_mine_id(state_key): + if not self._hs.is_mine_id(delta.state_key): continue change = await self._get_key_change( - prev_event_id, - event_id, + delta.prev_event_id, + delta.event_id, key_name="membership", public_value=Membership.JOIN, ) @@ -2172,7 +2167,7 @@ class RoomForgetterHandler(StateDeltasHandler): if is_leave: try: await self._room_member_handler.forget( - UserID.from_string(state_key), room_id + UserID.from_string(delta.state_key), delta.room_id ) except SynapseError as e: if e.code == 400: diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 3dde19fc81..817b41aa37 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -27,6 +27,7 @@ from typing import ( from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.databases.main.state_deltas import StateDelta from synapse.types import JsonDict if TYPE_CHECKING: @@ -142,7 +143,7 @@ class StatsHandler: self.pos = max_pos async def _handle_deltas( - self, deltas: Iterable[JsonDict] + self, deltas: Iterable[StateDelta] ) -> Tuple[Dict[str, CounterType[str]], Dict[str, CounterType[str]]]: 
"""Called with the state deltas to process @@ -157,51 +158,50 @@ class StatsHandler: room_to_state_updates: Dict[str, Dict[str, Any]] = {} for delta in deltas: - typ = delta["type"] - state_key = delta["state_key"] - room_id = delta["room_id"] - event_id = delta["event_id"] - stream_id = delta["stream_id"] - prev_event_id = delta["prev_event_id"] - - logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id) + logger.debug( + "Handling: %r, %r %r, %s", + delta.room_id, + delta.event_type, + delta.state_key, + delta.event_id, + ) - token = await self.store.get_earliest_token_for_stats("room", room_id) + token = await self.store.get_earliest_token_for_stats("room", delta.room_id) # If the earliest token to begin from is larger than our current # stream ID, skip processing this delta. - if token is not None and token >= stream_id: + if token is not None and token >= delta.stream_id: logger.debug( "Ignoring: %s as earlier than this room's initial ingestion event", - event_id, + delta.event_id, ) continue - if event_id is None and prev_event_id is None: + if delta.event_id is None and delta.prev_event_id is None: logger.error( "event ID is None and so is the previous event ID. stream_id: %s", - stream_id, + delta.stream_id, ) continue event_content: JsonDict = {} - if event_id is not None: - event = await self.store.get_event(event_id, allow_none=True) + if delta.event_id is not None: + event = await self.store.get_event(delta.event_id, allow_none=True) if event: event_content = event.content or {} # All the values in this dict are deltas (RELATIVE changes) - room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter()) + room_stats_delta = room_to_stats_deltas.setdefault(delta.room_id, Counter()) - room_state = room_to_state_updates.setdefault(room_id, {}) + room_state = room_to_state_updates.setdefault(delta.room_id, {}) - if prev_event_id is None: + if delta.prev_event_id is None: # this state event doesn't overwrite another, # so it is a new effective/current state event room_stats_delta["current_state_events"] += 1 - if typ == EventTypes.Member: + if delta.event_type == EventTypes.Member: # we could use StateDeltasHandler._get_key_change here but it's # a bit inefficient given we're not testing for a specific # result; might as well just grab the prev_membership and @@ -210,9 +210,9 @@ class StatsHandler: # in the absence of a previous event because we do not want to # reduce the leave count when a new-to-the-room user joins. prev_membership = None - if prev_event_id is not None: + if delta.prev_event_id is not None: prev_event = await self.store.get_event( - prev_event_id, allow_none=True + delta.prev_event_id, allow_none=True ) if prev_event: prev_event_content = prev_event.content @@ -256,7 +256,7 @@ class StatsHandler: else: raise ValueError("%r is not a valid membership" % (membership,)) - user_id = state_key + user_id = delta.state_key if self.is_mine_id(user_id): # this accounts for transitions like leave → ban and so on. 
has_changed_joinedness = (prev_membership == Membership.JOIN) != ( @@ -272,30 +272,30 @@ class StatsHandler: room_stats_delta["local_users_in_room"] += membership_delta - elif typ == EventTypes.Create: + elif delta.event_type == EventTypes.Create: room_state["is_federatable"] = ( event_content.get(EventContentFields.FEDERATE, True) is True ) room_type = event_content.get(EventContentFields.ROOM_TYPE) if isinstance(room_type, str): room_state["room_type"] = room_type - elif typ == EventTypes.JoinRules: + elif delta.event_type == EventTypes.JoinRules: room_state["join_rules"] = event_content.get("join_rule") - elif typ == EventTypes.RoomHistoryVisibility: + elif delta.event_type == EventTypes.RoomHistoryVisibility: room_state["history_visibility"] = event_content.get( "history_visibility" ) - elif typ == EventTypes.RoomEncryption: + elif delta.event_type == EventTypes.RoomEncryption: room_state["encryption"] = event_content.get("algorithm") - elif typ == EventTypes.Name: + elif delta.event_type == EventTypes.Name: room_state["name"] = event_content.get("name") - elif typ == EventTypes.Topic: + elif delta.event_type == EventTypes.Topic: room_state["topic"] = event_content.get("topic") - elif typ == EventTypes.RoomAvatar: + elif delta.event_type == EventTypes.RoomAvatar: room_state["avatar"] = event_content.get("url") - elif typ == EventTypes.CanonicalAlias: + elif delta.event_type == EventTypes.CanonicalAlias: room_state["canonical_alias"] = event_content.get("alias") - elif typ == EventTypes.GuestAccess: + elif delta.event_type == EventTypes.GuestAccess: room_state["guest_access"] = event_content.get( EventContentFields.GUEST_ACCESS ) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a0f5568000..75717ba4f9 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -14,7 +14,7 @@ import logging from http import HTTPStatus -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, List, Optional, Set, Tuple from twisted.internet.interfaces import IDelayedCall @@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Memb from synapse.api.errors import Codes, SynapseError from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.databases.main.state_deltas import StateDelta from synapse.storage.databases.main.user_directory import SearchResult from synapse.storage.roommember import ProfileInfo from synapse.types import UserID @@ -247,32 +248,31 @@ class UserDirectoryHandler(StateDeltasHandler): await self.store.update_user_directory_stream_pos(max_pos) - async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: + async def _handle_deltas(self, deltas: List[StateDelta]) -> None: """Called with the state deltas to process""" for delta in deltas: - typ = delta["type"] - state_key = delta["state_key"] - room_id = delta["room_id"] - event_id: Optional[str] = delta["event_id"] - prev_event_id: Optional[str] = delta["prev_event_id"] - - logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + logger.debug( + "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id + ) # For join rule and visibility changes we need to check if the room # may have become public or not and add/remove the users in said room - if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): + if delta.event_type 
in ( + EventTypes.RoomHistoryVisibility, + EventTypes.JoinRules, + ): await self._handle_room_publicity_change( - room_id, prev_event_id, event_id, typ + delta.room_id, delta.prev_event_id, delta.event_id, delta.event_type ) - elif typ == EventTypes.Member: + elif delta.event_type == EventTypes.Member: await self._handle_room_membership_event( - room_id, - prev_event_id, - event_id, - state_key, + delta.room_id, + delta.prev_event_id, + delta.event_id, + delta.state_key, ) else: - logger.debug("Ignoring irrelevant type: %r", typ) + logger.debug("Ignoring irrelevant type: %r", delta.event_type) async def _handle_room_publicity_change( self, diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 46957723a1..9f7959c45d 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -16,7 +16,6 @@ from itertools import chain from typing import ( TYPE_CHECKING, AbstractSet, - Any, Callable, Collection, Dict, @@ -32,6 +31,7 @@ from typing import ( from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.logging.opentracing import tag_args, trace +from synapse.storage.databases.main.state_deltas import StateDelta from synapse.storage.roommember import ProfileInfo from synapse.storage.util.partial_state_events_tracker import ( PartialCurrentStateTracker, @@ -531,19 +531,9 @@ class StateStorageController: @tag_args async def get_current_state_deltas( self, prev_stream_id: int, max_stream_id: int - ) -> Tuple[int, List[Dict[str, Any]]]: + ) -> Tuple[int, List[StateDelta]]: """Fetch a list of room state changes since the given stream id - Each entry in the result contains the following fields: - - stream_id (int) - - room_id (str) - - type (str): event type - - state_key (str): - - event_id (str|None): new event_id for this state key. None if the - state has been deleted. - - prev_event_id (str|None): previous event_id for this state key. None - if it's new state. - Args: prev_stream_id: point to get changes since (exclusive) max_stream_id: the point that we know has been correctly persisted diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index 445213e12a..3151186e0c 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -13,7 +13,9 @@ # limitations under the License. import logging -from typing import Any, Dict, List, Tuple +from typing import List, Optional, Tuple + +import attr from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction @@ -22,6 +24,20 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class StateDelta: + stream_id: int + room_id: str + event_type: str + state_key: str + + event_id: Optional[str] + """new event_id for this state key. None if the state has been deleted.""" + + prev_event_id: Optional[str] + """previous event_id for this state key. None if it's new state.""" + + class StateDeltasStore(SQLBaseStore): # This class must be mixed in with a child class which provides the following # attribute. TODO: can we get static analysis to enforce this? 
@@ -29,31 +45,21 @@ class StateDeltasStore(SQLBaseStore): async def get_partial_current_state_deltas( self, prev_stream_id: int, max_stream_id: int - ) -> Tuple[int, List[Dict[str, Any]]]: + ) -> Tuple[int, List[StateDelta]]: """Fetch a list of room state changes since the given stream id - Each entry in the result contains the following fields: - - stream_id (int) - - room_id (str) - - type (str): event type - - state_key (str): - - event_id (str|None): new event_id for this state key. None if the - state has been deleted. - - prev_event_id (str|None): previous event_id for this state key. None - if it's new state. - This may be the partial state if we're lazy joining the room. Args: prev_stream_id: point to get changes since (exclusive) max_stream_id: the point that we know has been correctly persisted - - ie, an upper limit to return changes from. + - ie, an upper limit to return changes from. Returns: A tuple consisting of: - - the stream id which these results go up to - - list of current_state_delta_stream rows. If it is empty, we are - up to date. + - the stream id which these results go up to + - list of current_state_delta_stream rows. If it is empty, we are + up to date. """ prev_stream_id = int(prev_stream_id) @@ -72,7 +78,7 @@ class StateDeltasStore(SQLBaseStore): def get_current_state_deltas_txn( txn: LoggingTransaction, - ) -> Tuple[int, List[Dict[str, Any]]]: + ) -> Tuple[int, List[StateDelta]]: # First we calculate the max stream id that will give us less than # N results. # We arbitrarily limit to 100 stream_id entries to ensure we don't @@ -112,7 +118,17 @@ class StateDeltasStore(SQLBaseStore): ORDER BY stream_id ASC """ txn.execute(sql, (prev_stream_id, clipped_stream_id)) - return clipped_stream_id, self.db_pool.cursor_to_dict(txn) + return clipped_stream_id, [ + StateDelta( + stream_id=row[0], + room_id=row[1], + event_type=row[2], + state_key=row[3], + event_id=row[4], + prev_event_id=row[5], + ) + for row in txn.fetchall() + ] return await self.db_pool.runInteraction( "get_current_state_deltas", get_current_state_deltas_txn diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 3060bc9744..d7025c6f2c 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -174,7 +174,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): return_value=1 ) - self.datastore.get_partial_current_state_deltas = Mock(return_value=(0, None)) # type: ignore[method-assign] + self.datastore.get_partial_current_state_deltas = Mock(return_value=(0, [])) # type: ignore[method-assign] self.datastore.get_to_device_stream_token = Mock( # type: ignore[method-assign] return_value=0 -- cgit 1.5.1 From 77dfc1f93967f4157ba961c3b5201206c1bbf797 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 17 Oct 2023 07:32:40 -0400 Subject: Fix a bug where servers could be marked as up when they were failing (#16506) After this change a server will only be reported as back online if they were previously having requests fail. 
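
The crux of the fix is visible in the diff below: two snapshots, `previously_failing` taken before the request's outcome is applied and `currently_failing` taken after, gate the "server is back up" notifications. A minimal, self-contained sketch of that gating logic follows; the class and method names here are illustrative, not the actual RetryDestinationLimiter API:

from typing import Optional


class BackoffTracker:
    """Sketch: track failures for one destination, report down -> up transitions."""

    def __init__(self) -> None:
        # Timestamp of the first failure in the current failing streak,
        # or None while the destination is healthy.
        self.failure_ts: Optional[int] = None

    def record_result(self, success: bool, now_ms: int) -> bool:
        """Record one request outcome; return True only when the server recovers."""
        # Snapshot taken *before* applying this result, mirroring the
        # `previously_failing = bool(self.failure_ts)` check in the patch.
        previously_failing = self.failure_ts is not None

        if success:
            self.failure_ts = None
        elif self.failure_ts is None:
            self.failure_ts = now_ms

        currently_failing = self.failure_ts is not None

        # Notify only on a failing -> healthy transition. Before the fix, the
        # notification fired whenever retry timings were stored, even after a
        # failed request, which is what marked failing servers as available.
        return previously_failing and not currently_failing

With this tracker, a fail/fail/success sequence returns False, False, True: only the final request, the one that actually ends a failing streak, triggers a notification.
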
---
 changelog.d/16506.bugfix      |  1 +
 synapse/util/retryutils.py    | 30 +++++++++--------
 tests/util/test_retryutils.py | 75 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 93 insertions(+), 13 deletions(-)
 create mode 100644 changelog.d/16506.bugfix

(limited to 'tests')

diff --git a/changelog.d/16506.bugfix b/changelog.d/16506.bugfix
new file mode 100644
index 0000000000..a2c7e82b9e
--- /dev/null
+++ b/changelog.d/16506.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.59.0 where servers would be incorrectly marked as available when a request resulted in an error.
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 0e1f907667..547202c96b 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -170,10 +170,10 @@ class RetryDestinationLimiter:
                 database in milliseconds, or zero if the last request was
                 successful.
             backoff_on_404: Back off if we get a 404
-
             backoff_on_failure: set to False if we should not increase the
                 retry interval on a failure.
-
+            notifier: A notifier used to mark servers as up.
+            replication_client: A replication client used to mark servers as up.
             backoff_on_all_error_codes: Whether we should back off on any
                 error code.
         """
@@ -237,6 +237,9 @@ class RetryDestinationLimiter:
         else:
             valid_err_code = False
 
+        # Whether previous requests to the destination had been failing.
+        previously_failing = bool(self.failure_ts)
+
         if success:
             # We connected successfully.
             if not self.retry_interval:
@@ -282,6 +285,9 @@ class RetryDestinationLimiter:
             if self.failure_ts is None:
                 self.failure_ts = retry_last_ts
 
+        # Whether the current request to the destination is failing.
+        currently_failing = bool(self.failure_ts)
+
         async def store_retry_timings() -> None:
             try:
                 await self.store.set_destination_retry_timings(
@@ -291,17 +297,15 @@ class RetryDestinationLimiter:
                     self.retry_interval,
                 )
 
-                if self.notifier:
-                    # Inform the relevant places that the remote server is back up.
-                    self.notifier.notify_remote_server_up(self.destination)
-
-                if self.replication_client:
-                    # If we're on a worker we try and inform master about this. The
-                    # replication client doesn't hook into the notifier to avoid
-                    # infinite loops where we send a `REMOTE_SERVER_UP` command to
-                    # master, which then echoes it back to us which in turn pokes
-                    # the notifier.
-                    self.replication_client.send_remote_server_up(self.destination)
+                # If the server was previously failing, but is no longer.
+                if previously_failing and not currently_failing:
+                    if self.notifier:
+                        # Inform the relevant places that the remote server is back up.
+                        self.notifier.notify_remote_server_up(self.destination)
+
+                    if self.replication_client:
+                        # Inform other workers that the remote server is up.
+                        self.replication_client.send_remote_server_up(self.destination)
 
             except Exception:
                 logger.exception("Failed to store destination_retry_timings")
diff --git a/tests/util/test_retryutils.py b/tests/util/test_retryutils.py
index 4bcd17a6fc..ad88b24566 100644
--- a/tests/util/test_retryutils.py
+++ b/tests/util/test_retryutils.py
@@ -11,6 +11,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from unittest import mock + +from synapse.notifier import Notifier +from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from tests.unittest import HomeserverTestCase @@ -109,6 +113,77 @@ class RetryLimiterTestCase(HomeserverTestCase): new_timings = self.get_success(store.get_destination_retry_timings("test_dest")) self.assertIsNone(new_timings) + def test_notifier_replication(self) -> None: + """Ensure the notifier/replication client is called only when expected.""" + store = self.hs.get_datastores().main + + notifier = mock.Mock(spec=Notifier) + replication_client = mock.Mock(spec=ReplicationCommandHandler) + + limiter = self.get_success( + get_retry_limiter( + "test_dest", + self.clock, + store, + notifier=notifier, + replication_client=replication_client, + ) + ) + + # The server is already up, nothing should occur. + self.pump(1) + with limiter: + pass + self.pump() + + new_timings = self.get_success(store.get_destination_retry_timings("test_dest")) + self.assertIsNone(new_timings) + notifier.notify_remote_server_up.assert_not_called() + replication_client.send_remote_server_up.assert_not_called() + + # Attempt again, but return an error. This will cause new retry timings, but + # should not trigger server up notifications. + self.pump(1) + try: + with limiter: + raise AssertionError("argh") + except AssertionError: + pass + self.pump() + + new_timings = self.get_success(store.get_destination_retry_timings("test_dest")) + # The exact retry timings are tested separately. + self.assertIsNotNone(new_timings) + notifier.notify_remote_server_up.assert_not_called() + replication_client.send_remote_server_up.assert_not_called() + + # A second failing request should be treated as the above. + self.pump(1) + try: + with limiter: + raise AssertionError("argh") + except AssertionError: + pass + self.pump() + + new_timings = self.get_success(store.get_destination_retry_timings("test_dest")) + # The exact retry timings are tested separately. + self.assertIsNotNone(new_timings) + notifier.notify_remote_server_up.assert_not_called() + replication_client.send_remote_server_up.assert_not_called() + + # A final successful attempt should generate a server up notification. + self.pump(1) + with limiter: + pass + self.pump() + + new_timings = self.get_success(store.get_destination_retry_timings("test_dest")) + # The exact retry timings are tested separately. + self.assertIsNone(new_timings) + notifier.notify_remote_server_up.assert_called_once_with("test_dest") + replication_client.send_remote_server_up.assert_called_once_with("test_dest") + def test_max_retry_interval(self) -> None: """Test that `destination_max_retry_interval` setting works as expected""" store = self.hs.get_datastores().main -- cgit 1.5.1 From 6ad1f9eac2c5ffc496597acbc5728482441c64c7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 17 Oct 2023 08:47:42 -0400 Subject: Convert DeviceLastConnectionInfo to attrs. (#16507) To improve type safety & memory usage. 
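
As with #16469 above, the conversion swaps a TypedDict for a frozen, slotted attrs class. The sketch below reconstructs the class from the constructor calls in the diff that follows; treat the field list and ordering as illustrative rather than the authoritative definition:

from typing import Optional

import attr


@attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceLastConnectionInfo:
    """Metadata for the last connection seen for a user and device combination."""

    # These types must match the columns in the `devices` table
    user_id: str
    device_id: str
    ip: Optional[str]
    user_agent: Optional[str]
    last_seen: Optional[int]


# attrs generates __eq__, so the tests can compare whole values with
# assertEqual instead of the old dict-subset checks via assertLessEqual(...items()).
a = DeviceLastConnectionInfo(
    user_id="@user:test", device_id="DEV", ip="ip",
    user_agent="user_agent", last_seen=12345678000,
)
b = DeviceLastConnectionInfo(
    user_id="@user:test", device_id="DEV", ip="ip",
    user_agent="user_agent", last_seen=12345678000,
)
assert a == b
# slots=True drops the per-instance __dict__, saving memory for large result maps.
assert not hasattr(a, "__dict__")
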
--- changelog.d/16507.misc | 1 + synapse/handlers/device.py | 23 ++--- synapse/storage/databases/main/client_ips.py | 46 +++++---- tests/storage/test_client_ips.py | 137 ++++++++++++++------------- 4 files changed, 104 insertions(+), 103 deletions(-) create mode 100644 changelog.d/16507.misc (limited to 'tests') diff --git a/changelog.d/16507.misc b/changelog.d/16507.misc new file mode 100644 index 0000000000..93ceaeafc9 --- /dev/null +++ b/changelog.d/16507.misc @@ -0,0 +1 @@ +Improve type hints. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 50df4f2b06..544bc7c13d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,17 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import ( - TYPE_CHECKING, - Any, - Dict, - Iterable, - List, - Mapping, - Optional, - Set, - Tuple, -) +from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple from synapse.api import errors from synapse.api.constants import EduTypes, EventTypes @@ -41,6 +31,7 @@ from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) +from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo from synapse.types import ( JsonDict, JsonMapping, @@ -1008,14 +999,14 @@ class DeviceHandler(DeviceWorkerHandler): def _update_device_from_client_ips( - device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]] + device: JsonDict, client_ips: Mapping[Tuple[str, str], DeviceLastConnectionInfo] ) -> None: - ip = client_ips.get((device["user_id"], device["device_id"]), {}) + ip = client_ips.get((device["user_id"], device["device_id"])) device.update( { - "last_seen_user_agent": ip.get("user_agent"), - "last_seen_ts": ip.get("last_seen"), - "last_seen_ip": ip.get("ip"), + "last_seen_user_agent": ip.user_agent if ip else None, + "last_seen_ts": ip.last_seen if ip else None, + "last_seen_ip": ip.ip if ip else None, } ) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 7da47c3dd7..8be1511859 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union, cast +import attr from typing_extensions import TypedDict from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -42,7 +43,8 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -class DeviceLastConnectionInfo(TypedDict): +@attr.s(slots=True, frozen=True, auto_attribs=True) +class DeviceLastConnectionInfo: """Metadata for the last connection seen for a user and device combination""" # These types must match the columns in the `devices` table @@ -499,24 +501,29 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke device_id: If None fetches all devices for the user Returns: - A dictionary mapping a tuple of (user_id, device_id) to dicts, with - keys giving the column names from the devices table. + A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo. 
""" keyvalues = {"user_id": user_id} if device_id is not None: keyvalues["device_id"] = device_id - res = cast( - List[DeviceLastConnectionInfo], - await self.db_pool.simple_select_list( - table="devices", - keyvalues=keyvalues, - retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), - ), + res = await self.db_pool.simple_select_list( + table="devices", + keyvalues=keyvalues, + retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), ) - return {(d["user_id"], d["device_id"]): d for d in res} + return { + (d["user_id"], d["device_id"]): DeviceLastConnectionInfo( + user_id=d["user_id"], + device_id=d["device_id"], + ip=d["ip"], + user_agent=d["user_agent"], + last_seen=d["last_seen"], + ) + for d in res + } async def _get_user_ip_and_agents_from_database( self, user: UserID, since_ts: int = 0 @@ -683,8 +690,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke device_id: If None fetches all devices for the user Returns: - A dictionary mapping a tuple of (user_id, device_id) to dicts, with - keys giving the column names from the devices table. + A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo. """ ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id) @@ -705,13 +711,13 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke continue if not device_id or did == device_id: - ret[(user_id, did)] = { - "user_id": user_id, - "ip": ip, - "user_agent": user_agent, - "device_id": did, - "last_seen": last_seen, - } + ret[(user_id, did)] = DeviceLastConnectionInfo( + user_id=user_id, + ip=ip, + user_agent=user_agent, + device_id=did, + last_seen=last_seen, + ) return ret async def get_user_ip_and_agents( diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py index 6b9692c486..0c054a598f 100644 --- a/tests/storage/test_client_ips.py +++ b/tests/storage/test_client_ips.py @@ -24,7 +24,10 @@ import synapse.rest.admin from synapse.http.site import XForwardedForRequest from synapse.rest.client import login from synapse.server import HomeServer -from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY +from synapse.storage.databases.main.client_ips import ( + LAST_SEEN_GRANULARITY, + DeviceLastConnectionInfo, +) from synapse.types import UserID from synapse.util import Clock @@ -65,15 +68,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): ) r = result[(user_id, device_id)] - self.assertLessEqual( - { - "user_id": user_id, - "device_id": device_id, - "ip": "ip", - "user_agent": "user_agent", - "last_seen": 12345678000, - }.items(), - r.items(), + self.assertEqual( + DeviceLastConnectionInfo( + user_id=user_id, + device_id=device_id, + ip="ip", + user_agent="user_agent", + last_seen=12345678000, + ), + r, ) def test_insert_new_client_ip_none_device_id(self) -> None: @@ -201,13 +204,13 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): self.assertEqual( result, { - (user_id, device_id): { - "user_id": user_id, - "device_id": device_id, - "ip": "ip", - "user_agent": "user_agent", - "last_seen": 12345678000, - }, + (user_id, device_id): DeviceLastConnectionInfo( + user_id=user_id, + device_id=device_id, + ip="ip", + user_agent="user_agent", + last_seen=12345678000, + ), }, ) @@ -292,20 +295,20 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): self.assertEqual( result, { - (user_id, device_id_1): { - "user_id": user_id, - "device_id": device_id_1, - "ip": "ip_1", - "user_agent": "user_agent_1", - 
"last_seen": 12345678000, - }, - (user_id, device_id_2): { - "user_id": user_id, - "device_id": device_id_2, - "ip": "ip_2", - "user_agent": "user_agent_3", - "last_seen": 12345688000 + LAST_SEEN_GRANULARITY, - }, + (user_id, device_id_1): DeviceLastConnectionInfo( + user_id=user_id, + device_id=device_id_1, + ip="ip_1", + user_agent="user_agent_1", + last_seen=12345678000, + ), + (user_id, device_id_2): DeviceLastConnectionInfo( + user_id=user_id, + device_id=device_id_2, + ip="ip_2", + user_agent="user_agent_3", + last_seen=12345688000 + LAST_SEEN_GRANULARITY, + ), }, ) @@ -526,15 +529,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): ) r = result[(user_id, device_id)] - self.assertLessEqual( - { - "user_id": user_id, - "device_id": device_id, - "ip": None, - "user_agent": None, - "last_seen": None, - }.items(), - r.items(), + self.assertEqual( + DeviceLastConnectionInfo( + user_id=user_id, + device_id=device_id, + ip=None, + user_agent=None, + last_seen=None, + ), + r, ) # Register the background update to run again. @@ -561,15 +564,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): ) r = result[(user_id, device_id)] - self.assertLessEqual( - { - "user_id": user_id, - "device_id": device_id, - "ip": "ip", - "user_agent": "user_agent", - "last_seen": 0, - }.items(), - r.items(), + self.assertEqual( + DeviceLastConnectionInfo( + user_id=user_id, + device_id=device_id, + ip="ip", + user_agent="user_agent", + last_seen=0, + ), + r, ) def test_old_user_ips_pruned(self) -> None: @@ -640,15 +643,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): ) r = result2[(user_id, device_id)] - self.assertLessEqual( - { - "user_id": user_id, - "device_id": device_id, - "ip": "ip", - "user_agent": "user_agent", - "last_seen": 0, - }.items(), - r.items(), + self.assertEqual( + DeviceLastConnectionInfo( + user_id=user_id, + device_id=device_id, + ip="ip", + user_agent="user_agent", + last_seen=0, + ), + r, ) def test_invalid_user_agents_are_ignored(self) -> None: @@ -777,13 +780,13 @@ class ClientIpAuthTestCase(unittest.HomeserverTestCase): self.store.get_last_client_ip_by_device(self.user_id, device_id) ) r = result[(self.user_id, device_id)] - self.assertLessEqual( - { - "user_id": self.user_id, - "device_id": device_id, - "ip": expected_ip, - "user_agent": "Mozzila pizza", - "last_seen": 123456100, - }.items(), - r.items(), + self.assertEqual( + DeviceLastConnectionInfo( + user_id=self.user_id, + device_id=device_id, + ip=expected_ip, + user_agent="Mozzila pizza", + last_seen=123456100, + ), + r, ) -- cgit 1.5.1