diff options
Diffstat (limited to 'synapse')
61 files changed, 879 insertions, 655 deletions
diff --git a/synapse/api/auth/internal.py b/synapse/api/auth/internal.py index a75f6f2cc4..36ee9c8b8f 100644 --- a/synapse/api/auth/internal.py +++ b/synapse/api/auth/internal.py @@ -115,7 +115,7 @@ class InternalAuth(BaseAuth): Once get_user_by_req has set up the opentracing span, this does the actual work. """ try: - ip_addr = request.getClientAddress().host + ip_addr = request.get_client_ip_if_available() user_agent = get_request_user_agent(request) access_token = self.get_access_token_from_request(request) diff --git a/synapse/api/presence.py b/synapse/api/presence.py index b78f419994..afef6712e1 100644 --- a/synapse/api/presence.py +++ b/synapse/api/presence.py @@ -80,10 +80,6 @@ class UserPresenceState: def as_dict(self) -> JsonDict: return attr.asdict(self) - @staticmethod - def from_dict(d: JsonDict) -> "UserPresenceState": - return UserPresenceState(**d) - def copy_and_replace(self, **kwargs: Any) -> "UserPresenceState": return attr.evolve(self, **kwargs) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c8bc46415d..1a7fa175ec 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1402,7 +1402,7 @@ class FederationClient(FederationBase): The remote homeserver return some state from the room. The response dictionary is in the form: - {"knock_state_events": [<state event dict>, ...]} + {"knock_room_state": [<state event dict>, ...]} The list of state events may be empty. @@ -1429,7 +1429,7 @@ class FederationClient(FederationBase): The remote homeserver can optionally return some state from the room. The response dictionary is in the form: - {"knock_state_events": [<state event dict>, ...]} + {"knock_room_state": [<state event dict>, ...]} The list of state events may be empty. """ diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index ec8e770430..6ac8d16095 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -850,14 +850,7 @@ class FederationServer(FederationBase): context, self._room_prejoin_state_types ) ) - return { - "knock_room_state": stripped_room_state, - # Since v1.37, Synapse incorrectly used "knock_state_events" for this field. - # Thus, we also populate a 'knock_state_events' with the same content to - # support old instances. - # See https://github.com/matrix-org/synapse/issues/14088. - "knock_state_events": stripped_room_state, - } + return {"knock_room_state": stripped_room_state} async def _on_send_membership_event( self, origin: str, content: JsonDict, membership_type: str, room_id: str diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 6520795635..525968bcba 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -395,7 +395,7 @@ class PresenceDestinationsRow(BaseFederationRow): @staticmethod def from_data(data: JsonDict) -> "PresenceDestinationsRow": return PresenceDestinationsRow( - state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"] + state=UserPresenceState(**data["state"]), destinations=data["dests"] ) def to_data(self) -> JsonDict: diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index fb20fd8a10..7b6b1da090 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -67,7 +67,7 @@ The loop continues so long as there is anything to send. At each iteration of th When the `PerDestinationQueue` has the catch-up flag set, the *Catch-Up Transmission Loop* (`_catch_up_transmission_loop`) is used in lieu of the regular `_transaction_transmission_loop`. -(Only once the catch-up mode has been exited can the regular tranaction transmission behaviour +(Only once the catch-up mode has been exited can the regular transaction transmission behaviour be resumed.) *Catch-Up Mode*, entered upon Synapse startup or once a homeserver has fallen behind due to diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index b5e4b2680e..fab4800717 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -431,7 +431,7 @@ class TransportLayerClient: The remote homeserver can optionally return some state from the room. The response dictionary is in the form: - {"knock_state_events": [<state event dict>, ...]} + {"knock_room_state": [<state event dict>, ...]} The list of state events may be empty. """ diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index ba9704a065..97fd1fd427 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -171,8 +171,8 @@ class AdminHandler: else: stream_ordering = room.stream_ordering - from_key = RoomStreamToken(0, 0) - to_key = RoomStreamToken(None, stream_ordering) + from_key = RoomStreamToken(topological=0, stream=0) + to_key = RoomStreamToken(stream=stream_ordering) # Events that we've processed in this room written_events: Set[str] = set() diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 7de7bd3289..c200a45f3a 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -216,7 +216,7 @@ class ApplicationServicesHandler: def notify_interested_services_ephemeral( self, - stream_key: str, + stream_key: StreamKeyType, new_token: Union[int, RoomStreamToken], users: Collection[Union[str, UserID]], ) -> None: @@ -326,7 +326,7 @@ class ApplicationServicesHandler: async def _notify_interested_services_ephemeral( self, services: List[ApplicationService], - stream_key: str, + stream_key: StreamKeyType, new_token: int, users: Collection[Union[str, UserID]], ) -> None: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 86ad96d030..50df4f2b06 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -845,7 +845,6 @@ class DeviceHandler(DeviceWorkerHandler): else: assert max_stream_id == stream_id # Avoid moving `room_id` backwards. - pass if self._handle_new_device_update_new_data: continue diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 29cd45550a..9d72794e8b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -868,19 +868,10 @@ class FederationHandler: # This is a bit of a hack and is cribbing off of invites. Basically we # store the room state here and retrieve it again when this event appears # in the invitee's sync stream. It is stripped out for all other local users. - stripped_room_state = ( - knock_response.get("knock_room_state") - # Since v1.37, Synapse incorrectly used "knock_state_events" for this field. - # Thus, we also check for a 'knock_state_events' to support old instances. - # See https://github.com/matrix-org/synapse/issues/14088. - or knock_response.get("knock_state_events") - ) + stripped_room_state = knock_response.get("knock_room_state") if stripped_room_state is None: - raise KeyError( - "Missing 'knock_room_state' (or legacy 'knock_state_events') field in " - "send_knock response" - ) + raise KeyError("Missing 'knock_room_state' field in send_knock response") event.unsigned["knock_room_state"] = stripped_room_state @@ -1506,7 +1497,6 @@ class FederationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass else: destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)} @@ -1582,7 +1572,6 @@ class FederationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass async def add_display_name_to_third_party_invite( self, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 5737f8014d..c34bd7db95 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -192,8 +192,7 @@ class InitialSyncHandler: ) elif event.membership == Membership.LEAVE: room_end_token = RoomStreamToken( - None, - event.stream_ordering, + stream=event.stream_ordering, ) deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 44dbbf81dd..8de4b8e816 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -693,13 +693,9 @@ class EventCreationHandler: if require_consent and not is_exempt: await self.assert_accepted_privacy_policy(requester) - # Save the access token ID, the device ID and the transaction ID in the event - # internal metadata. This is useful to determine if we should echo the - # transaction_id in events. + # Save the the device ID and the transaction ID in the event internal metadata. + # This is useful to determine if we should echo the transaction_id in events. # See `synapse.events.utils.EventClientSerializer.serialize_event` - if requester.access_token_id is not None: - builder.internal_metadata.token_id = requester.access_token_id - if requester.device_id is not None: builder.internal_metadata.device_id = requester.device_id @@ -1133,7 +1129,6 @@ class EventCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so must have a stream ordering assert ev.internal_metadata.stream_ordering @@ -2038,7 +2033,6 @@ class EventCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass return True except AuthError: logger.info( diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py index 7ed88a3611..87b428ab1c 100644 --- a/synapse/handlers/push_rules.py +++ b/synapse/handlers/push_rules.py @@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, UnrecognizedRequestError from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.push_rule import RuleNotFoundException from synapse.synapse_rust.push import get_base_rule_ids -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -114,7 +114,9 @@ class PushRulesHandler: user_id: the user ID the change is for. """ stream_id = self._main_store.get_max_push_rules_stream_id() - self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) + self._notifier.on_new_event( + StreamKeyType.PUSH_RULES, stream_id, users=[user_id] + ) async def push_rules_for_user( self, user: UserID diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index a7a29b758b..69ac468f75 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -130,11 +130,10 @@ class ReceiptsHandler: async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool: """Takes a list of receipts, stores them and informs the notifier.""" - min_batch_id: Optional[int] = None - max_batch_id: Optional[int] = None + receipts_persisted: List[ReadReceipt] = [] for receipt in receipts: - res = await self.store.insert_receipt( + stream_id = await self.store.insert_receipt( receipt.room_id, receipt.receipt_type, receipt.user_id, @@ -143,30 +142,26 @@ class ReceiptsHandler: receipt.data, ) - if not res: - # res will be None if this receipt is 'old' + if stream_id is None: + # stream_id will be None if this receipt is 'old' continue - stream_id, max_persisted_id = res + receipts_persisted.append(receipt) - if min_batch_id is None or stream_id < min_batch_id: - min_batch_id = stream_id - if max_batch_id is None or max_persisted_id > max_batch_id: - max_batch_id = max_persisted_id - - # Either both of these should be None or neither. - if min_batch_id is None or max_batch_id is None: + if not receipts_persisted: # no new receipts return False - affected_room_ids = list({r.room_id for r in receipts}) + max_batch_id = self.store.get_max_receipt_stream_id() + + affected_room_ids = list({r.room_id for r in receipts_persisted}) self.notifier.on_new_event( StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids ) # Note that the min here shouldn't be relied upon to be accurate. await self.hs.get_pusherpool().on_new_receipts( - min_batch_id, max_batch_id, affected_room_ids + {r.user_id for r in receipts_persisted} ) return True diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a0c3b16819..97c9f01245 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -261,7 +261,6 @@ class RoomCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # This is to satisfy mypy and should never happen raise PartialStateConflictError() @@ -1708,7 +1707,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): if from_key.topological: logger.warning("Stream has topological part!!!! %r", from_key) - from_key = RoomStreamToken(None, from_key.stream) + from_key = RoomStreamToken(stream=from_key.stream) app_service = self.store.get_app_service_by_user_id(user.to_string()) if app_service: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 90343c2306..130eee7e1d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -382,8 +382,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): and persist a new event for the new membership change. Args: - requester: - target: + requester: User requesting the membership change, i.e. the sender of the + desired membership event. + target: Use whose membership should change, i.e. the state_key of the + desired membership event. room_id: membership: @@ -415,7 +417,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): Returns: Tuple of event ID and stream ordering position """ - user_id = target.to_string() if content is None: @@ -475,21 +476,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): (EventTypes.Member, user_id), None ) - if event.membership == Membership.JOIN: - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event( - prev_member_event_id - ) - newly_joined = prev_member_event.membership != Membership.JOIN - - # Only rate-limit if the user actually joined the room, otherwise we'll end - # up blocking profile updates. - if newly_joined and ratelimit: - await self._join_rate_limiter_local.ratelimit(requester) - await self._join_rate_per_room_limiter.ratelimit( - requester, key=room_id, update=False - ) with opentracing.start_active_span("handle_new_client_event"): result_event = ( await self.event_creation_handler.handle_new_client_event( @@ -514,7 +500,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so should have a stream ordering assert result_event.internal_metadata.stream_ordering @@ -618,6 +603,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): Raises: ShadowBanError if a shadow-banned requester attempts to send an invite. """ + if ratelimit: + if action == Membership.JOIN: + # Only rate-limit if the user isn't already joined to the room, otherwise + # we'll end up blocking profile updates. + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + requester.user.to_string(), + room_id, + ) + if current_membership != Membership.JOIN: + await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, key=room_id, update=False + ) + elif action == Membership.INVITE: + await self.ratelimit_invite(requester, room_id, target.to_string()) + if action == Membership.INVITE and requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. await self.clock.sleep(random.randint(1, 10)) @@ -794,8 +798,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if effective_membership_state == Membership.INVITE: target_id = target.to_string() - if ratelimit: - await self.ratelimit_invite(requester, room_id, target_id) # block any attempts to invite the server notices mxid if target_id == self._server_notices_mxid: @@ -2002,7 +2004,6 @@ class RoomMemberMasterHandler(RoomMemberHandler): # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so must have a stream ordering assert result_event.internal_metadata.stream_ordering diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7bd42f635f..744e080309 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2333,7 +2333,7 @@ class SyncHandler: continue leave_token = now_token.copy_and_replace( - StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering) ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/http/server.py b/synapse/http/server.py index 3bbf91298e..1e4e56f36b 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -266,7 +266,7 @@ class HttpServer(Protocol): def register_paths( self, method: str, - path_patterns: Iterable[Pattern], + path_patterns: Iterable[Pattern[str]], callback: ServletCallback, servlet_classname: str, ) -> None: diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 80c448de2b..13345acf75 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -26,11 +26,11 @@ from twisted.internet.interfaces import IConsumer from twisted.protocols.basic import FileSender from twisted.web.server import Request -from synapse.api.errors import Codes, SynapseError, cs_error +from synapse.api.errors import Codes, cs_error from synapse.http.server import finish_request, respond_with_json from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable -from synapse.util.stringutils import is_ascii, parse_and_validate_server_name +from synapse.util.stringutils import is_ascii logger = logging.getLogger(__name__) @@ -84,52 +84,12 @@ INLINE_CONTENT_TYPES = [ ] -def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]: - """Parses the server name, media ID and optional file name from the request URI - - Also performs some rough validation on the server name. - - Args: - request: The `Request`. - - Returns: - A tuple containing the parsed server name, media ID and optional file name. - - Raises: - SynapseError(404): if parsing or validation fail for any reason - """ - try: - # The type on postpath seems incorrect in Twisted 21.2.0. - postpath: List[bytes] = request.postpath # type: ignore - assert postpath - - # This allows users to append e.g. /test.png to the URL. Useful for - # clients that parse the URL to see content type. - server_name_bytes, media_id_bytes = postpath[:2] - server_name = server_name_bytes.decode("utf-8") - media_id = media_id_bytes.decode("utf8") - - # Validate the server name, raising if invalid - parse_and_validate_server_name(server_name) - - file_name = None - if len(postpath) > 2: - try: - file_name = urllib.parse.unquote(postpath[-1].decode("utf-8")) - except UnicodeDecodeError: - pass - return server_name, media_id, file_name - except Exception: - raise SynapseError( - 404, "Invalid media id token %r" % (request.postpath,), Codes.UNKNOWN - ) - - def respond_404(request: SynapseRequest) -> None: + assert request.path is not None respond_with_json( request, 404, - cs_error("Not found %r" % (request.postpath,), code=Codes.NOT_FOUND), + cs_error("Not found '%s'" % (request.path.decode(),), code=Codes.NOT_FOUND), send_cors=True, ) @@ -372,7 +332,7 @@ class ThumbnailInfo: # Content type of thumbnail, e.g. image/png type: str # The size of the media file, in bytes. - length: Optional[int] = None + length: int @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 1b7b014f9a..7fd46901f7 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -48,6 +48,7 @@ from synapse.media.filepath import MediaFilePaths from synapse.media.media_storage import MediaStorage from synapse.media.storage_provider import StorageProviderWrapper from synapse.media.thumbnailer import Thumbnailer, ThumbnailError +from synapse.media.url_previewer import UrlPreviewer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID from synapse.util.async_helpers import Linearizer @@ -114,7 +115,7 @@ class MediaRepository: ) storage_providers.append(provider) - self.media_storage = MediaStorage( + self.media_storage: MediaStorage = MediaStorage( self.hs, self.primary_base_path, self.filepaths, storage_providers ) @@ -142,6 +143,13 @@ class MediaRepository: MEDIA_RETENTION_CHECK_PERIOD_MS, ) + if hs.config.media.url_preview_enabled: + self.url_previewer: Optional[UrlPreviewer] = UrlPreviewer( + hs, self, self.media_storage + ) + else: + self.url_previewer = None + def _start_update_recently_accessed(self) -> Deferred: return run_as_background_process( "update_recently_accessed_media", self._update_recently_accessed @@ -616,6 +624,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) @@ -686,6 +695,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) @@ -831,6 +841,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) diff --git a/synapse/notifier.py b/synapse/notifier.py index fc39e5c963..99e7715896 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -126,7 +126,7 @@ class _NotifierUserStream: def notify( self, - stream_key: str, + stream_key: StreamKeyType, stream_id: Union[int, RoomStreamToken], time_now_ms: int, ) -> None: @@ -454,7 +454,7 @@ class Notifier: def on_new_event( self, - stream_key: str, + stream_key: StreamKeyType, new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, rooms: Optional[StrCollection] = None, @@ -655,30 +655,29 @@ class Notifier: events: List[Union[JsonDict, EventBase]] = [] end_token = from_token - for name, source in self.event_sources.sources.get_sources(): - keyname = "%s_key" % name - before_id = getattr(before_token, keyname) - after_id = getattr(after_token, keyname) + for keyname, source in self.event_sources.sources.get_sources(): + before_id = before_token.get_field(keyname) + after_id = after_token.get_field(keyname) if before_id == after_id: continue new_events, new_key = await source.get_new_events( user=user, - from_key=getattr(from_token, keyname), + from_key=from_token.get_field(keyname), limit=limit, is_guest=is_peeking, room_ids=room_ids, explicit_room_id=explicit_room_id, ) - if name == "room": + if keyname == StreamKeyType.ROOM: new_events = await filter_events_for_client( self._storage_controllers, user.to_string(), new_events, is_peeking=is_peeking, ) - elif name == "presence": + elif keyname == StreamKeyType.PRESENCE: now = self.clock.time_msec() new_events[:] = [ { diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 9e3a98741a..4d405f2a0c 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -101,7 +101,7 @@ if TYPE_CHECKING: class PusherConfig: """Parameters necessary to configure a pusher.""" - id: Optional[str] + id: Optional[int] user_name: str profile_tag: str @@ -182,7 +182,7 @@ class Pusher(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: + def on_new_receipts(self) -> None: raise NotImplementedError() @abc.abstractmethod diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 1710dd51b9..cf45fd09a8 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -99,7 +99,7 @@ class EmailPusher(Pusher): pass self.timed_call = None - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: + def on_new_receipts(self) -> None: # We could wake up and cancel the timer but there tend to be quite a # lot of read receipts so it's probably less work to just let the # timer fire diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 50027680cb..725910a659 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -160,7 +160,7 @@ class HttpPusher(Pusher): if should_check_for_notifs: self._start_processing() - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: + def on_new_receipts(self) -> None: # Note that the min here shouldn't be relied upon to be accurate. # We could check the receipts are actually m.read receipts here, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 6517e3566f..15a2cc932f 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -292,20 +292,12 @@ class PusherPool: except Exception: logger.exception("Exception in pusher on_new_notifications") - async def on_new_receipts( - self, min_stream_id: int, max_stream_id: int, affected_room_ids: Iterable[str] - ) -> None: + async def on_new_receipts(self, users_affected: StrCollection) -> None: if not self.pushers: # nothing to do here. return try: - # Need to subtract 1 from the minimum because the lower bound here - # is not inclusive - users_affected = await self.store.get_users_sent_receipts_between( - min_stream_id - 1, max_stream_id - ) - for u in users_affected: # Don't push if the user account has expired expired = await self._account_validity_handler.is_user_expired(u) @@ -314,7 +306,7 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_receipts(min_stream_id, max_stream_id) + p.on_new_receipts() except Exception: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index f4f2b29e96..d5337fe588 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -129,9 +129,7 @@ class ReplicationDataHandler: self.notifier.on_new_event( StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows] ) - await self._pusher_pool.on_new_receipts( - token, token, {row.room_id for row in rows} - ) + await self._pusher_pool.on_new_receipts({row.user_id for row in rows}) elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index e616b5e1c8..0f0f851b79 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -18,7 +18,7 @@ allowed to be sent by which side. """ import abc import logging -from typing import Optional, Tuple, Type, TypeVar +from typing import List, Optional, Tuple, Type, TypeVar from synapse.replication.tcp.streams._base import StreamRow from synapse.util import json_decoder, json_encoder @@ -74,6 +74,8 @@ SC = TypeVar("SC", bound="_SimpleCommand") class _SimpleCommand(Command): """An implementation of Command whose argument is just a 'data' string.""" + __slots__ = ["data"] + def __init__(self, data: str): self.data = data @@ -122,6 +124,8 @@ class RdataCommand(Command): RDATA presence master 59 ["@baz:example.com", "online", ...] """ + __slots__ = ["stream_name", "instance_name", "token", "row"] + NAME = "RDATA" def __init__( @@ -179,6 +183,8 @@ class PositionCommand(Command): of the stream. """ + __slots__ = ["stream_name", "instance_name", "prev_token", "new_token"] + NAME = "POSITION" def __init__( @@ -235,6 +241,8 @@ class ReplicateCommand(Command): REPLICATE """ + __slots__: List[str] = [] + NAME = "REPLICATE" def __init__(self) -> None: @@ -264,6 +272,8 @@ class UserSyncCommand(Command): Where <state> is either "start" or "end" """ + __slots__ = ["instance_id", "user_id", "device_id", "is_syncing", "last_sync_ms"] + NAME = "USER_SYNC" def __init__( @@ -316,6 +326,8 @@ class ClearUserSyncsCommand(Command): CLEAR_USER_SYNC <instance_id> """ + __slots__ = ["instance_id"] + NAME = "CLEAR_USER_SYNC" def __init__(self, instance_id: str): @@ -343,6 +355,8 @@ class FederationAckCommand(Command): FEDERATION_ACK <instance_name> <token> """ + __slots__ = ["instance_name", "token"] + NAME = "FEDERATION_ACK" def __init__(self, instance_name: str, token: int): @@ -368,6 +382,15 @@ class UserIpCommand(Command): USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent> """ + __slots__ = [ + "user_id", + "access_token", + "ip", + "user_agent", + "device_id", + "last_seen", + ] + NAME = "USER_IP" def __init__( @@ -423,8 +446,6 @@ class RemoteServerUpCommand(_SimpleCommand): """Sent when a worker has detected that a remote server is no longer "down" and retry timings should be reset. - If sent from a client the server will relay to all other workers. - Format:: REMOTE_SERVER_UP <server> @@ -441,6 +462,8 @@ class LockReleasedCommand(Command): LOCK_RELEASED ["<instance_name>", "<lock_name>", "<lock_key>"] """ + __slots__ = ["instance_name", "lock_name", "lock_key"] + NAME = "LOCK_RELEASED" def __init__( diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index e42dade246..9bd0d764f8 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -146,7 +146,7 @@ class PurgeHistoryRestServlet(RestServlet): # RoomStreamToken expects [int] not Optional[int] assert event.internal_metadata.stream_ordering is not None room_token = RoomStreamToken( - event.depth, event.internal_metadata.stream_ordering + topological=event.depth, stream=event.internal_metadata.stream_ordering ) token = await room_token.to_string(self.store) diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py index e0ee55bd0e..8a617af599 100644 --- a/synapse/rest/admin/federation.py +++ b/synapse/rest/admin/federation.py @@ -198,7 +198,13 @@ class DestinationMembershipRestServlet(RestServlet): rooms, total = await self._store.get_destination_rooms_paginate( destination, start, limit, direction ) - response = {"rooms": rooms, "total": total} + response = { + "rooms": [ + {"room_id": room_id, "stream_ordering": stream_ordering} + for room_id, stream_ordering in rooms + ], + "total": total, + } if (start + limit) < total: response["next_token"] = str(start + len(rooms)) diff --git a/synapse/rest/media/config_resource.py b/synapse/rest/media/config_resource.py index a95804d327..dbf5133c72 100644 --- a/synapse/rest/media/config_resource.py +++ b/synapse/rest/media/config_resource.py @@ -14,17 +14,19 @@ # limitations under the License. # +import re from typing import TYPE_CHECKING -from synapse.http.server import DirectServeJsonResource, respond_with_json +from synapse.http.server import respond_with_json +from synapse.http.servlet import RestServlet from synapse.http.site import SynapseRequest if TYPE_CHECKING: from synapse.server import HomeServer -class MediaConfigResource(DirectServeJsonResource): - isLeaf = True +class MediaConfigResource(RestServlet): + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/config$")] def __init__(self, hs: "HomeServer"): super().__init__() @@ -33,9 +35,6 @@ class MediaConfigResource(DirectServeJsonResource): self.auth = hs.get_auth() self.limits_dict = {"m.upload.size": config.media.max_upload_size} - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET(self, request: SynapseRequest) -> None: await self.auth.get_user_by_req(request) respond_with_json(request, 200, self.limits_dict, send_cors=True) - - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - respond_with_json(request, 200, {}, send_cors=True) diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py index 3c618ef60a..65b9ff52fa 100644 --- a/synapse/rest/media/download_resource.py +++ b/synapse/rest/media/download_resource.py @@ -13,16 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING +import re +from typing import TYPE_CHECKING, Optional -from synapse.http.server import ( - DirectServeJsonResource, - set_corp_headers, - set_cors_headers, -) -from synapse.http.servlet import parse_boolean +from synapse.http.server import set_corp_headers, set_cors_headers +from synapse.http.servlet import RestServlet, parse_boolean from synapse.http.site import SynapseRequest -from synapse.media._base import parse_media_id, respond_404 +from synapse.media._base import respond_404 +from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository @@ -31,15 +29,28 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class DownloadResource(DirectServeJsonResource): - isLeaf = True +class DownloadResource(RestServlet): + PATTERNS = [ + re.compile( + "/_matrix/media/(r0|v3|v1)/download/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)(/(?P<file_name>[^/]*))?$" + ) + ] def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): super().__init__() self.media_repo = media_repo self._is_mine_server_name = hs.is_mine_server_name - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET( + self, + request: SynapseRequest, + server_name: str, + media_id: str, + file_name: Optional[str] = None, + ) -> None: + # Validate the server name, raising if invalid + parse_and_validate_server_name(server_name) + set_cors_headers(request) set_corp_headers(request) request.setHeader( @@ -58,9 +69,8 @@ class DownloadResource(DirectServeJsonResource): b"Referrer-Policy", b"no-referrer", ) - server_name, media_id, name = parse_media_id(request) if self._is_mine_server_name(server_name): - await self.media_repo.get_local_media(request, media_id, name) + await self.media_repo.get_local_media(request, media_id, file_name) else: allow_remote = parse_boolean(request, "allow_remote", default=True) if not allow_remote: @@ -72,4 +82,6 @@ class DownloadResource(DirectServeJsonResource): respond_404(request) return - await self.media_repo.get_remote_media(request, server_name, media_id, name) + await self.media_repo.get_remote_media( + request, server_name, media_id, file_name + ) diff --git a/synapse/rest/media/media_repository_resource.py b/synapse/rest/media/media_repository_resource.py index 5ebaa3b032..2089bb1029 100644 --- a/synapse/rest/media/media_repository_resource.py +++ b/synapse/rest/media/media_repository_resource.py @@ -15,7 +15,7 @@ from typing import TYPE_CHECKING from synapse.config._base import ConfigError -from synapse.http.server import UnrecognizedRequestResource +from synapse.http.server import HttpServer, JsonResource from .config_resource import MediaConfigResource from .download_resource import DownloadResource @@ -27,7 +27,7 @@ if TYPE_CHECKING: from synapse.server import HomeServer -class MediaRepositoryResource(UnrecognizedRequestResource): +class MediaRepositoryResource(JsonResource): """File uploading and downloading. Uploads are POSTed to a resource which returns a token which is used to GET @@ -70,6 +70,11 @@ class MediaRepositoryResource(UnrecognizedRequestResource): width and height are close to the requested size and the aspect matches the requested size. The client should scale the image if it needs to fit within a given rectangle. + + This gets mounted at various points under /_matrix/media, including: + * /_matrix/media/r0 + * /_matrix/media/v1 + * /_matrix/media/v3 """ def __init__(self, hs: "HomeServer"): @@ -77,17 +82,23 @@ class MediaRepositoryResource(UnrecognizedRequestResource): if not hs.config.media.can_load_media_repo: raise ConfigError("Synapse is not configured to use a media repo.") - super().__init__() + JsonResource.__init__(self, hs, canonical_json=False) + self.register_servlets(self, hs) + + @staticmethod + def register_servlets(http_server: HttpServer, hs: "HomeServer") -> None: media_repo = hs.get_media_repository() - self.putChild(b"upload", UploadResource(hs, media_repo)) - self.putChild(b"download", DownloadResource(hs, media_repo)) - self.putChild( - b"thumbnail", ThumbnailResource(hs, media_repo, media_repo.media_storage) + # Note that many of these should not exist as v1 endpoints, but empirically + # a lot of traffic still goes to them. + + UploadResource(hs, media_repo).register(http_server) + DownloadResource(hs, media_repo).register(http_server) + ThumbnailResource(hs, media_repo, media_repo.media_storage).register( + http_server ) if hs.config.media.url_preview_enabled: - self.putChild( - b"preview_url", - PreviewUrlResource(hs, media_repo, media_repo.media_storage), + PreviewUrlResource(hs, media_repo, media_repo.media_storage).register( + http_server ) - self.putChild(b"config", MediaConfigResource(hs)) + MediaConfigResource(hs).register(http_server) diff --git a/synapse/rest/media/preview_url_resource.py b/synapse/rest/media/preview_url_resource.py index 58513c4be4..c8acb65dca 100644 --- a/synapse/rest/media/preview_url_resource.py +++ b/synapse/rest/media/preview_url_resource.py @@ -13,24 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re from typing import TYPE_CHECKING -from synapse.http.server import ( - DirectServeJsonResource, - respond_with_json, - respond_with_json_bytes, -) -from synapse.http.servlet import parse_integer, parse_string +from synapse.http.server import respond_with_json_bytes +from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.media.media_storage import MediaStorage -from synapse.media.url_previewer import UrlPreviewer if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository from synapse.server import HomeServer -class PreviewUrlResource(DirectServeJsonResource): +class PreviewUrlResource(RestServlet): """ The `GET /_matrix/media/r0/preview_url` endpoint provides a generic preview API for URLs which outputs Open Graph (https://ogp.me/) responses (with some Matrix @@ -48,7 +44,7 @@ class PreviewUrlResource(DirectServeJsonResource): * Matrix cannot be used to distribute the metadata between homeservers. """ - isLeaf = True + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/preview_url$")] def __init__( self, @@ -62,14 +58,10 @@ class PreviewUrlResource(DirectServeJsonResource): self.clock = hs.get_clock() self.media_repo = media_repo self.media_storage = media_storage + assert self.media_repo.url_previewer is not None + self.url_previewer = self.media_repo.url_previewer - self._url_previewer = UrlPreviewer(hs, media_repo, media_storage) - - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - request.setHeader(b"Allow", b"OPTIONS, GET") - respond_with_json(request, 200, {}, send_cors=True) - - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET(self, request: SynapseRequest) -> None: # XXX: if get_user_by_req fails, what should we do in an async render? requester = await self.auth.get_user_by_req(request) url = parse_string(request, "url", required=True) @@ -77,5 +69,5 @@ class PreviewUrlResource(DirectServeJsonResource): if ts is None: ts = self.clock.time_msec() - og = await self._url_previewer.preview(url, requester.user, ts) + og = await self.url_previewer.preview(url, requester.user, ts) respond_with_json_bytes(request, 200, og, send_cors=True) diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py index 661e604b85..85b6bdbe72 100644 --- a/synapse/rest/media/thumbnail_resource.py +++ b/synapse/rest/media/thumbnail_resource.py @@ -13,29 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. - import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +import re +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.api.errors import Codes, SynapseError, cs_error from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP -from synapse.http.server import ( - DirectServeJsonResource, - respond_with_json, - set_corp_headers, - set_cors_headers, -) -from synapse.http.servlet import parse_integer, parse_string +from synapse.http.server import respond_with_json, set_corp_headers, set_cors_headers +from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.media._base import ( FileInfo, ThumbnailInfo, - parse_media_id, respond_404, respond_with_file, respond_with_responder, ) from synapse.media.media_storage import MediaStorage +from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository @@ -44,8 +39,12 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class ThumbnailResource(DirectServeJsonResource): - isLeaf = True +class ThumbnailResource(RestServlet): + PATTERNS = [ + re.compile( + "/_matrix/media/(r0|v3|v1)/thumbnail/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$" + ) + ] def __init__( self, @@ -60,12 +59,17 @@ class ThumbnailResource(DirectServeJsonResource): self.media_storage = media_storage self.dynamic_thumbnails = hs.config.media.dynamic_thumbnails self._is_mine_server_name = hs.is_mine_server_name + self._server_name = hs.hostname self.prevent_media_downloads_from = hs.config.media.prevent_media_downloads_from - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET( + self, request: SynapseRequest, server_name: str, media_id: str + ) -> None: + # Validate the server name, raising if invalid + parse_and_validate_server_name(server_name) + set_cors_headers(request) set_corp_headers(request) - server_name, media_id, _ = parse_media_id(request) width = parse_integer(request, "width", required=True) height = parse_integer(request, "height", required=True) method = parse_string(request, "method", "scale") @@ -155,30 +159,24 @@ class ThumbnailResource(DirectServeJsonResource): thumbnail_infos = await self.store.get_local_media_thumbnails(media_id) for info in thumbnail_infos: - t_w = info["thumbnail_width"] == desired_width - t_h = info["thumbnail_height"] == desired_height - t_method = info["thumbnail_method"] == desired_method - t_type = info["thumbnail_type"] == desired_type + t_w = info.width == desired_width + t_h = info.height == desired_height + t_method = info.method == desired_method + t_type = info.type == desired_type if t_w and t_h and t_method and t_type: file_info = FileInfo( server_name=None, file_id=media_id, url_cache=media_info["url_cache"], - thumbnail=ThumbnailInfo( - width=info["thumbnail_width"], - height=info["thumbnail_height"], - type=info["thumbnail_type"], - method=info["thumbnail_method"], - ), + thumbnail=info, ) - t_type = file_info.thumbnail_type - t_length = info["thumbnail_length"] - responder = await self.media_storage.fetch_media(file_info) if responder: - await respond_with_responder(request, responder, t_type, t_length) + await respond_with_responder( + request, responder, info.type, info.length + ) return logger.debug("We don't have a thumbnail of that size. Generating") @@ -218,29 +216,23 @@ class ThumbnailResource(DirectServeJsonResource): file_id = media_info["filesystem_id"] for info in thumbnail_infos: - t_w = info["thumbnail_width"] == desired_width - t_h = info["thumbnail_height"] == desired_height - t_method = info["thumbnail_method"] == desired_method - t_type = info["thumbnail_type"] == desired_type + t_w = info.width == desired_width + t_h = info.height == desired_height + t_method = info.method == desired_method + t_type = info.type == desired_type if t_w and t_h and t_method and t_type: file_info = FileInfo( server_name=server_name, file_id=media_info["filesystem_id"], - thumbnail=ThumbnailInfo( - width=info["thumbnail_width"], - height=info["thumbnail_height"], - type=info["thumbnail_type"], - method=info["thumbnail_method"], - ), + thumbnail=info, ) - t_type = file_info.thumbnail_type - t_length = info["thumbnail_length"] - responder = await self.media_storage.fetch_media(file_info) if responder: - await respond_with_responder(request, responder, t_type, t_length) + await respond_with_responder( + request, responder, info.type, info.length + ) return logger.debug("We don't have a thumbnail of that size. Generating") @@ -300,7 +292,7 @@ class ThumbnailResource(DirectServeJsonResource): desired_height: int, desired_method: str, desired_type: str, - thumbnail_infos: List[Dict[str, Any]], + thumbnail_infos: List[ThumbnailInfo], media_id: str, file_id: str, url_cache: bool, @@ -315,7 +307,7 @@ class ThumbnailResource(DirectServeJsonResource): desired_height: The desired height, the returned thumbnail may be larger than this. desired_method: The desired method used to generate the thumbnail. desired_type: The desired content-type of the thumbnail. - thumbnail_infos: A list of dictionaries of candidate thumbnails. + thumbnail_infos: A list of thumbnail info of candidate thumbnails. file_id: The ID of the media that a thumbnail is being requested for. url_cache: True if this is from a URL cache. server_name: The server name, if this is a remote thumbnail. @@ -418,13 +410,14 @@ class ThumbnailResource(DirectServeJsonResource): # `dynamic_thumbnails` is disabled. logger.info("Failed to find any generated thumbnails") + assert request.path is not None respond_with_json( request, 400, cs_error( - "Cannot find any thumbnails for the requested media (%r). This might mean the media is not a supported_media_format=(%s) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)" + "Cannot find any thumbnails for the requested media ('%s'). This might mean the media is not a supported_media_format=(%s) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)" % ( - request.postpath, + request.path.decode(), ", ".join(THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP.keys()), ), code=Codes.UNKNOWN, @@ -438,7 +431,7 @@ class ThumbnailResource(DirectServeJsonResource): desired_height: int, desired_method: str, desired_type: str, - thumbnail_infos: List[Dict[str, Any]], + thumbnail_infos: List[ThumbnailInfo], file_id: str, url_cache: bool, server_name: Optional[str], @@ -451,7 +444,7 @@ class ThumbnailResource(DirectServeJsonResource): desired_height: The desired height, the returned thumbnail may be larger than this. desired_method: The desired method used to generate the thumbnail. desired_type: The desired content-type of the thumbnail. - thumbnail_infos: A list of dictionaries of candidate thumbnails. + thumbnail_infos: A list of thumbnail infos of candidate thumbnails. file_id: The ID of the media that a thumbnail is being requested for. url_cache: True if this is from a URL cache. server_name: The server name, if this is a remote thumbnail. @@ -469,21 +462,25 @@ class ThumbnailResource(DirectServeJsonResource): if desired_method == "crop": # Thumbnails that match equal or larger sizes of desired width/height. - crop_info_list: List[Tuple[int, int, int, bool, int, Dict[str, Any]]] = [] + crop_info_list: List[ + Tuple[int, int, int, bool, Optional[int], ThumbnailInfo] + ] = [] # Other thumbnails. - crop_info_list2: List[Tuple[int, int, int, bool, int, Dict[str, Any]]] = [] + crop_info_list2: List[ + Tuple[int, int, int, bool, Optional[int], ThumbnailInfo] + ] = [] for info in thumbnail_infos: # Skip thumbnails generated with different methods. - if info["thumbnail_method"] != "crop": + if info.method != "crop": continue - t_w = info["thumbnail_width"] - t_h = info["thumbnail_height"] + t_w = info.width + t_h = info.height aspect_quality = abs(d_w * t_h - d_h * t_w) min_quality = 0 if d_w <= t_w and d_h <= t_h else 1 size_quality = abs((d_w - t_w) * (d_h - t_h)) - type_quality = desired_type != info["thumbnail_type"] - length_quality = info["thumbnail_length"] + type_quality = desired_type != info.type + length_quality = info.length if t_w >= d_w or t_h >= d_h: crop_info_list.append( ( @@ -508,7 +505,7 @@ class ThumbnailResource(DirectServeJsonResource): ) # Pick the most appropriate thumbnail. Some values of `desired_width` and # `desired_height` may result in a tie, in which case we avoid comparing on - # the thumbnail info dictionary and pick the thumbnail that appears earlier + # the thumbnail info and pick the thumbnail that appears earlier # in the list of candidates. if crop_info_list: thumbnail_info = min(crop_info_list, key=lambda t: t[:-1])[-1] @@ -516,20 +513,20 @@ class ThumbnailResource(DirectServeJsonResource): thumbnail_info = min(crop_info_list2, key=lambda t: t[:-1])[-1] elif desired_method == "scale": # Thumbnails that match equal or larger sizes of desired width/height. - info_list: List[Tuple[int, bool, int, Dict[str, Any]]] = [] + info_list: List[Tuple[int, bool, int, ThumbnailInfo]] = [] # Other thumbnails. - info_list2: List[Tuple[int, bool, int, Dict[str, Any]]] = [] + info_list2: List[Tuple[int, bool, int, ThumbnailInfo]] = [] for info in thumbnail_infos: # Skip thumbnails generated with different methods. - if info["thumbnail_method"] != "scale": + if info.method != "scale": continue - t_w = info["thumbnail_width"] - t_h = info["thumbnail_height"] + t_w = info.width + t_h = info.height size_quality = abs((d_w - t_w) * (d_h - t_h)) - type_quality = desired_type != info["thumbnail_type"] - length_quality = info["thumbnail_length"] + type_quality = desired_type != info.type + length_quality = info.length if t_w >= d_w or t_h >= d_h: info_list.append((size_quality, type_quality, length_quality, info)) else: @@ -538,7 +535,7 @@ class ThumbnailResource(DirectServeJsonResource): ) # Pick the most appropriate thumbnail. Some values of `desired_width` and # `desired_height` may result in a tie, in which case we avoid comparing on - # the thumbnail info dictionary and pick the thumbnail that appears earlier + # the thumbnail info and pick the thumbnail that appears earlier # in the list of candidates. if info_list: thumbnail_info = min(info_list, key=lambda t: t[:-1])[-1] @@ -550,13 +547,7 @@ class ThumbnailResource(DirectServeJsonResource): file_id=file_id, url_cache=url_cache, server_name=server_name, - thumbnail=ThumbnailInfo( - width=thumbnail_info["thumbnail_width"], - height=thumbnail_info["thumbnail_height"], - type=thumbnail_info["thumbnail_type"], - method=thumbnail_info["thumbnail_method"], - length=thumbnail_info["thumbnail_length"], - ), + thumbnail=thumbnail_info, ) # No matching thumbnail was found. diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py index 043e8d6077..949326d85d 100644 --- a/synapse/rest/media/upload_resource.py +++ b/synapse/rest/media/upload_resource.py @@ -14,11 +14,12 @@ # limitations under the License. import logging +import re from typing import IO, TYPE_CHECKING, Dict, List, Optional from synapse.api.errors import Codes, SynapseError -from synapse.http.server import DirectServeJsonResource, respond_with_json -from synapse.http.servlet import parse_bytes_from_args +from synapse.http.server import respond_with_json +from synapse.http.servlet import RestServlet, parse_bytes_from_args from synapse.http.site import SynapseRequest from synapse.media.media_storage import SpamMediaException @@ -29,8 +30,8 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class UploadResource(DirectServeJsonResource): - isLeaf = True +class UploadResource(RestServlet): + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload")] def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): super().__init__() @@ -43,10 +44,7 @@ class UploadResource(DirectServeJsonResource): self.max_upload_size = hs.config.media.max_upload_size self.clock = hs.get_clock() - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - respond_with_json(request, 200, {}, send_cors=True) - - async def _async_render_POST(self, request: SynapseRequest) -> None: + async def on_POST(self, request: SynapseRequest) -> None: requester = await self.auth.get_user_by_req(request) raw_content_length = request.getHeader("Content-Length") if raw_content_length is None: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index ca894edd5a..7d8af5c610 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2418,7 +2418,7 @@ class DatabasePool: keyvalues: Optional[Dict[str, Any]] = None, exclude_keyvalues: Optional[Dict[str, Any]] = None, order_direction: str = "ASC", - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """ Executes a SELECT query on the named table with start and limit, of row numbers, which may return zero or number of rows from start to limit, @@ -2447,7 +2447,7 @@ class DatabasePool: order_direction: Whether the results should be ordered "ASC" or "DESC". Returns: - The result as a list of dictionaries. + The result as a list of tuples. """ if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") @@ -2474,7 +2474,7 @@ class DatabasePool: ) txn.execute(sql, arg_list + [limit, start]) - return cls.cursor_to_dict(txn) + return txn.fetchall() async def simple_search_list( self, diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 101403578c..dfcbf0a175 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -142,26 +142,6 @@ class DataStore( super().__init__(database, db_conn, hs) - async def get_users(self) -> List[JsonDict]: - """Function to retrieve a list of users in users table. - - Returns: - A list of dictionaries representing users. - """ - return await self.db_pool.simple_select_list( - table="users", - keyvalues={}, - retcols=[ - "name", - "password_hash", - "is_guest", - "admin", - "user_type", - "deactivated", - ], - desc="get_users", - ) - async def get_users_paginate( self, start: int, diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 80f146dd53..39498d52c6 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -103,6 +103,13 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) "AccountDataAndTagsChangeCache", account_max ) + self.db_pool.updates.register_background_index_update( + update_name="room_account_data_index_room_id", + index_name="room_account_data_room_id", + table="room_account_data", + columns=("room_id",), + ) + self.db_pool.updates.register_background_update_handler( "delete_account_data_for_deactivated_users", self._delete_account_data_for_deactivated_users, @@ -151,10 +158,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) sql += " AND content != '{}'" txn.execute(sql, (user_id,)) - rows = self.db_pool.cursor_to_dict(txn) return { - row["account_data_type"]: db_to_json(row["content"]) for row in rows + account_data_type: db_to_json(content) + for account_data_type, content in txn } return await self.db_pool.runInteraction( @@ -196,13 +203,12 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) sql += " AND content != '{}'" txn.execute(sql, (user_id,)) - rows = self.db_pool.cursor_to_dict(txn) by_room: Dict[str, Dict[str, JsonDict]] = {} - for row in rows: - room_data = by_room.setdefault(row["room_id"], {}) + for room_id, account_data_type, content in txn: + room_data = by_room.setdefault(room_id, {}) - room_data[row["account_data_type"]] = db_to_json(row["content"]) + room_data[account_data_type] = db_to_json(content) return by_room diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 0553a0621a..073a99cd84 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -14,17 +14,7 @@ # limitations under the License. import logging import re -from typing import ( - TYPE_CHECKING, - Any, - Dict, - List, - Optional, - Pattern, - Sequence, - Tuple, - cast, -) +from typing import TYPE_CHECKING, List, Optional, Pattern, Sequence, Tuple, cast from synapse.appservice import ( ApplicationService, @@ -353,21 +343,15 @@ class ApplicationServiceTransactionWorkerStore( def _get_oldest_unsent_txn( txn: LoggingTransaction, - ) -> Optional[Dict[str, Any]]: + ) -> Optional[Tuple[int, str]]: # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) txn.execute( - "SELECT * FROM application_services_txns WHERE as_id=?" + "SELECT txn_id, event_ids FROM application_services_txns WHERE as_id=?" " ORDER BY txn_id ASC LIMIT 1", (service.id,), ) - rows = self.db_pool.cursor_to_dict(txn) - if not rows: - return None - - entry = rows[0] - - return entry + return cast(Optional[Tuple[int, str]], txn.fetchone()) entry = await self.db_pool.runInteraction( "get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn @@ -376,8 +360,9 @@ class ApplicationServiceTransactionWorkerStore( if not entry: return None - event_ids = db_to_json(entry["event_ids"]) + txn_id, event_ids_str = entry + event_ids = db_to_json(event_ids_str) events = await self.get_events_as_list(event_ids) # TODO: to-device messages, one-time key counts, device list summaries and unused @@ -385,7 +370,7 @@ class ApplicationServiceTransactionWorkerStore( # We likely want to populate those for reliability. return AppServiceTransaction( service=service, - id=entry["txn_id"], + id=txn_id, events=events, ephemeral=[], to_device_messages=[], diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index df596f35f9..9f3804a504 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1413,13 +1413,13 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): def get_devices_not_accessed_since_txn( txn: LoggingTransaction, - ) -> List[Dict[str, str]]: + ) -> List[Tuple[str, str]]: sql = """ SELECT user_id, device_id FROM devices WHERE last_seen < ? AND hidden = FALSE """ txn.execute(sql, (since_ms,)) - return self.db_pool.cursor_to_dict(txn) + return cast(List[Tuple[str, str]], txn.fetchall()) rows = await self.db_pool.runInteraction( "get_devices_not_accessed_since", @@ -1427,11 +1427,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ) devices: Dict[str, List[str]] = {} - for row in rows: + for user_id, device_id in rows: # Remote devices are never stale from our point of view. - if self.hs.is_mine_id(row["user_id"]): - user_devices = devices.setdefault(row["user_id"], []) - user_devices.append(row["device_id"]) + if self.hs.is_mine_id(user_id): + user_devices = devices.setdefault(user_id, []) + user_devices.append(device_id) return devices diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index d01f28cc80..aac4cfb054 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -53,6 +53,13 @@ class EndToEndRoomKeyBackgroundStore(SQLBaseStore): ): super().__init__(database, db_conn, hs) + self.db_pool.updates.register_background_index_update( + update_name="e2e_room_keys_index_room_id", + index_name="e2e_room_keys_room_id", + table="e2e_room_keys", + columns=("room_id",), + ) + self.db_pool.updates.register_background_update_handler( "delete_e2e_backup_keys_for_deactivated_users", self._delete_e2e_backup_keys_for_deactivated_users, @@ -208,7 +215,7 @@ class EndToEndRoomKeyStore(EndToEndRoomKeyBackgroundStore): "message": "Set room key", "room_id": room_id, "session_id": session_id, - StreamKeyType.ROOM: room_key, + StreamKeyType.ROOM.value: room_key, } ) diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 89fac23f93..749ae54e20 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -921,14 +921,10 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker } txn.execute(sql, params) - rows = self.db_pool.cursor_to_dict(txn) - for row in rows: - user_id = row["user_id"] - key_type = row["keytype"] - key = db_to_json(row["keydata"]) + for user_id, key_type, key_data, _ in txn: user_keys = result.setdefault(user_id, {}) - user_keys[key_type] = key + user_keys[key_type] = db_to_json(key_data) return result @@ -988,13 +984,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker query_params.extend(item) txn.execute(sql, query_params) - rows = self.db_pool.cursor_to_dict(txn) # and add the signatures to the appropriate keys - for row in rows: - key_id: str = row["key_id"] - target_user_id: str = row["target_user_id"] - target_device_id: str = row["target_device_id"] + for target_user_id, target_device_id, key_id, signature in txn: key_type = devices[(target_user_id, target_device_id)] # We need to copy everything, because the result may have come # from the cache. dict.copy only does a shallow copy, so we @@ -1012,13 +1004,11 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker ].copy() if from_user_id in signatures: user_sigs = signatures[from_user_id] = signatures[from_user_id] - user_sigs[key_id] = row["signature"] + user_sigs[key_id] = signature else: - signatures[from_user_id] = {key_id: row["signature"]} + signatures[from_user_id] = {key_id: signature} else: - target_user_key["signatures"] = { - from_user_id: {key_id: row["signature"]} - } + target_user_key["signatures"] = {from_user_id: {key_id: signature}} return keys diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 790d058c43..d4dcdb898c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1654,8 +1654,6 @@ class PersistEventsStore: ) -> None: to_prefill = [] - rows = [] - ev_map = {e.event_id: e for e, _ in events_and_contexts} if not ev_map: return @@ -1676,10 +1674,9 @@ class PersistEventsStore: ) txn.execute(sql + clause, args) - rows = self.db_pool.cursor_to_dict(txn) - for row in rows: - event = ev_map[row["event_id"]] - if not row["rejects"] and not row["redacts"]: + for event_id, redacts, rejects in txn: + event = ev_map[event_id] + if not rejects and not redacts: to_prefill.append(EventCacheEntry(event=event, redacted_event=None)) async def external_prefill() -> None: diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 8cebeb5189..2e6b176bd2 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -28,6 +28,7 @@ from typing import ( from synapse.api.constants import Direction from synapse.logging.opentracing import trace +from synapse.media._base import ThumbnailInfo from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -435,8 +436,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_url_cache", ) - async def get_local_media_thumbnails(self, media_id: str) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + async def get_local_media_thumbnails(self, media_id: str) -> List[ThumbnailInfo]: + rows = await self.db_pool.simple_select_list( "local_media_repository_thumbnails", {"media_id": media_id}, ( @@ -448,6 +449,16 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): ), desc="get_local_media_thumbnails", ) + return [ + ThumbnailInfo( + width=row["thumbnail_width"], + height=row["thumbnail_height"], + method=row["thumbnail_method"], + type=row["thumbnail_type"], + length=row["thumbnail_length"], + ) + for row in rows + ] @trace async def store_local_thumbnail( @@ -556,8 +567,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def get_remote_media_thumbnails( self, origin: str, media_id: str - ) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + ) -> List[ThumbnailInfo]: + rows = await self.db_pool.simple_select_list( "remote_media_cache_thumbnails", {"media_origin": origin, "media_id": media_id}, ( @@ -566,10 +577,19 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "thumbnail_method", "thumbnail_type", "thumbnail_length", - "filesystem_id", ), desc="get_remote_media_thumbnails", ) + return [ + ThumbnailInfo( + width=row["thumbnail_width"], + height=row["thumbnail_height"], + method=row["thumbnail_method"], + type=row["thumbnail_type"], + length=row["thumbnail_length"], + ) + for row in rows + ] @trace async def get_remote_media_thumbnail( diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 194b4e031f..519f05fb60 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -20,6 +20,7 @@ from typing import ( Mapping, Optional, Tuple, + Union, cast, ) @@ -385,28 +386,47 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) limit = 100 offset = 0 while True: - rows = await self.db_pool.runInteraction( - "get_presence_for_all_users", - self.db_pool.simple_select_list_paginate_txn, - "presence_stream", - orderby="stream_id", - start=offset, - limit=limit, - exclude_keyvalues=exclude_keyvalues, - retcols=( - "user_id", - "state", - "last_active_ts", - "last_federation_update_ts", - "last_user_sync_ts", - "status_msg", - "currently_active", + rows = cast( + List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], + await self.db_pool.runInteraction( + "get_presence_for_all_users", + self.db_pool.simple_select_list_paginate_txn, + "presence_stream", + orderby="stream_id", + start=offset, + limit=limit, + exclude_keyvalues=exclude_keyvalues, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + order_direction="ASC", ), - order_direction="ASC", ) - for row in rows: - users_to_state[row["user_id"]] = UserPresenceState(**row) + for ( + user_id, + state, + last_active_ts, + last_federation_update_ts, + last_user_sync_ts, + status_msg, + currently_active, + ) in rows: + users_to_state[user_id] = UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) # We've run out of updates to query if len(rows) < limit: @@ -434,13 +454,21 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) txn = db_conn.cursor() txn.execute(sql, (PresenceState.OFFLINE,)) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() txn.close() - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return [UserPresenceState(**row) for row in rows] + return [ + UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) + for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, currently_active in rows + ] def take_presence_startup_info(self) -> List[UserPresenceState]: active_on_startup = self._presence_on_startup diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index dea0e0458c..1e11bf2706 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -89,6 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # furthermore, we might already have the table from a previous (failed) # purge attempt, so let's drop the table first. + if isinstance(self.database_engine, PostgresEngine): + # Disable statement timeouts for this transaction; purging rooms can + # take a while! + txn.execute("SET LOCAL statement_timeout = 0") + txn.execute("DROP TABLE IF EXISTS events_to_purge") txn.execute( diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 87e28e22d3..c7eb7fc478 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -47,6 +47,27 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# The type of a row in the pushers table. +PusherRow = Tuple[ + int, # id + str, # user_name + Optional[int], # access_token + str, # profile_tag + str, # kind + str, # app_id + str, # app_display_name + str, # device_display_name + str, # pushkey + int, # ts + str, # lang + str, # data + int, # last_stream_ordering + int, # last_success + int, # failing_since + bool, # enabled + str, # device_id +] + class PusherWorkerStore(SQLBaseStore): def __init__( @@ -83,30 +104,66 @@ class PusherWorkerStore(SQLBaseStore): self._remove_deleted_email_pushers, ) - def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: + def _decode_pushers_rows( + self, + rows: Iterable[PusherRow], + ) -> Iterator[PusherConfig]: """JSON-decode the data in the rows returned from the `pushers` table Drops any rows whose data cannot be decoded """ - for r in rows: - data_json = r["data"] + for ( + id, + user_name, + access_token, + profile_tag, + kind, + app_id, + app_display_name, + device_display_name, + pushkey, + ts, + lang, + data, + last_stream_ordering, + last_success, + failing_since, + enabled, + device_id, + ) in rows: try: - r["data"] = db_to_json(data_json) + data_json = db_to_json(data) except Exception as e: logger.warning( "Invalid JSON in data for pusher %d: %s, %s", - r["id"], - data_json, + id, + data, e.args[0], ) continue - # If we're using SQLite, then boolean values are integers. This is - # troublesome since some code using the return value of this method might - # expect it to be a boolean, or will expose it to clients (in responses). - r["enabled"] = bool(r["enabled"]) - - yield PusherConfig(**r) + yield PusherConfig( + id=id, + user_name=user_name, + profile_tag=profile_tag, + kind=kind, + app_id=app_id, + app_display_name=app_display_name, + device_display_name=device_display_name, + pushkey=pushkey, + ts=ts, + lang=lang, + data=data_json, + last_stream_ordering=last_stream_ordering, + last_success=last_success, + failing_since=failing_since, + # If we're using SQLite, then boolean values are integers. This is + # troublesome since some code using the return value of this method might + # expect it to be a boolean, or will expose it to clients (in responses). + enabled=bool(enabled), + device_id=device_id, + access_token=access_token, + ) def get_pushers_stream_token(self) -> int: return self._pushers_id_gen.get_current_token() @@ -136,7 +193,7 @@ class PusherWorkerStore(SQLBaseStore): The pushers for which the given columns have the given values. """ - def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def get_pushers_by_txn(txn: LoggingTransaction) -> List[PusherRow]: # We could technically use simple_select_list here, but we need to call # COALESCE on the 'enabled' column. While it is technically possible to give # simple_select_list the whole `COALESCE(...) AS ...` as a column name, it @@ -154,7 +211,7 @@ class PusherWorkerStore(SQLBaseStore): txn.execute(sql, list(keyvalues.values())) - return self.db_pool.cursor_to_dict(txn) + return cast(List[PusherRow], txn.fetchall()) ret = await self.db_pool.runInteraction( desc="get_pushers_by", @@ -164,14 +221,22 @@ class PusherWorkerStore(SQLBaseStore): return self._decode_pushers_rows(ret) async def get_enabled_pushers(self) -> Iterator[PusherConfig]: - def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]: - txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)") - rows = self.db_pool.cursor_to_dict(txn) - - return self._decode_pushers_rows(rows) + def get_enabled_pushers_txn(txn: LoggingTransaction) -> List[PusherRow]: + txn.execute( + """ + SELECT id, user_name, access_token, profile_tag, kind, app_id, + app_display_name, device_display_name, pushkey, ts, lang, data, + last_stream_ordering, last_success, failing_since, + enabled, device_id + FROM pushers WHERE COALESCE(enabled, TRUE) + """ + ) + return cast(List[PusherRow], txn.fetchall()) - return await self.db_pool.runInteraction( - "get_enabled_pushers", get_enabled_pushers_txn + return self._decode_pushers_rows( + await self.db_pool.runInteraction( + "get_enabled_pushers", get_enabled_pushers_txn + ) ) async def get_all_updated_pushers_rows( @@ -304,7 +369,7 @@ class PusherWorkerStore(SQLBaseStore): ) async def get_throttle_params_by_room( - self, pusher_id: str + self, pusher_id: int ) -> Dict[str, ThrottleParams]: res = await self.db_pool.simple_select_list( "pusher_throttle", @@ -323,7 +388,7 @@ class PusherWorkerStore(SQLBaseStore): return params_by_room async def set_throttle_params( - self, pusher_id: str, room_id: str, params: ThrottleParams + self, pusher_id: int, room_id: str, params: ThrottleParams ) -> None: await self.db_pool.simple_upsert( "pusher_throttle", @@ -534,7 +599,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): (last_pusher_id, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if len(rows) == 0: return 0 @@ -550,19 +615,19 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): txn=txn, table="pushers", key_names=("id",), - key_values=[(row["pusher_id"],) for row in rows], + key_values=[row[0] for row in rows], value_names=("device_id", "access_token"), # If there was already a device_id on the pusher, we only want to clear # the access_token column, so we keep the existing device_id. Otherwise, # we set the device_id we got from joining the access_tokens table. value_values=[ - (row["pusher_device_id"] or row["token_device_id"], None) - for row in rows + (pusher_device_id or token_device_id, None) + for _, pusher_device_id, token_device_id in rows ], ) self.db_pool.updates._background_update_progress_txn( - txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["pusher_id"]} + txn, "set_device_id_for_pushers", {"pusher_id": rows[-1][0]} ) return len(rows) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 0231f9407b..b2645ab43c 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -313,25 +313,25 @@ class ReceiptsWorkerStore(SQLBaseStore): ) -> Sequence[JsonMapping]: """See get_linearized_receipts_for_room""" - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]: if from_key: sql = ( - "SELECT * FROM receipts_linearized WHERE" + "SELECT receipt_type, user_id, event_id, data" + " FROM receipts_linearized WHERE" " room_id = ? AND stream_id > ? AND stream_id <= ?" ) txn.execute(sql, (room_id, from_key, to_key)) else: sql = ( - "SELECT * FROM receipts_linearized WHERE" + "SELECT receipt_type, user_id, event_id, data" + " FROM receipts_linearized WHERE" " room_id = ? AND stream_id <= ?" ) txn.execute(sql, (room_id, to_key)) - rows = self.db_pool.cursor_to_dict(txn) - - return rows + return cast(List[Tuple[str, str, str, str]], txn.fetchall()) rows = await self.db_pool.runInteraction("get_linearized_receipts_for_room", f) @@ -339,10 +339,10 @@ class ReceiptsWorkerStore(SQLBaseStore): return [] content: JsonDict = {} - for row in rows: - content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[ - row["user_id"] - ] = db_to_json(row["data"]) + for receipt_type, user_id, event_id, data in rows: + content.setdefault(event_id, {}).setdefault(receipt_type, {})[ + user_id + ] = db_to_json(data) return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}] @@ -357,10 +357,13 @@ class ReceiptsWorkerStore(SQLBaseStore): if not room_ids: return {} - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f( + txn: LoggingTransaction, + ) -> List[Tuple[str, str, str, str, Optional[str], str]]: if from_key: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, thread_id, data + FROM receipts_linearized WHERE stream_id > ? AND stream_id <= ? AND """ clause, args = make_in_list_sql_clause( @@ -370,7 +373,8 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql + clause, [from_key, to_key] + list(args)) else: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, thread_id, data + FROM receipts_linearized WHERE stream_id <= ? AND """ @@ -380,29 +384,31 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql + clause, [to_key] + list(args)) - return self.db_pool.cursor_to_dict(txn) + return cast( + List[Tuple[str, str, str, str, Optional[str], str]], txn.fetchall() + ) txn_results = await self.db_pool.runInteraction( "_get_linearized_receipts_for_rooms", f ) results: JsonDict = {} - for row in txn_results: + for room_id, receipt_type, user_id, event_id, thread_id, data in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( - row["room_id"], - {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}}, + room_id, + {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}}, ) # The content is of the form: # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. } - event_entry = room_event["content"].setdefault(row["event_id"], {}) - receipt_type = event_entry.setdefault(row["receipt_type"], {}) + event_entry = room_event["content"].setdefault(event_id, {}) + receipt_type_dict = event_entry.setdefault(receipt_type, {}) - receipt_type[row["user_id"]] = db_to_json(row["data"]) - if row["thread_id"]: - receipt_type[row["user_id"]]["thread_id"] = row["thread_id"] + receipt_type_dict[user_id] = db_to_json(data) + if thread_id: + receipt_type_dict[user_id]["thread_id"] = thread_id results = { room_id: [results[room_id]] if room_id in results else [] @@ -428,10 +434,11 @@ class ReceiptsWorkerStore(SQLBaseStore): A dictionary of roomids to a list of receipts. """ - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: if from_key: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized WHERE stream_id > ? AND stream_id <= ? ORDER BY stream_id DESC LIMIT 100 @@ -439,7 +446,8 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql, [from_key, to_key]) else: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized WHERE stream_id <= ? ORDER BY stream_id DESC LIMIT 100 @@ -447,27 +455,27 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql, [to_key]) - return self.db_pool.cursor_to_dict(txn) + return cast(List[Tuple[str, str, str, str, str]], txn.fetchall()) txn_results = await self.db_pool.runInteraction( "get_linearized_receipts_for_all_rooms", f ) results: JsonDict = {} - for row in txn_results: + for room_id, receipt_type, user_id, event_id, data in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( - row["room_id"], - {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}}, + room_id, + {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}}, ) # The content is of the form: # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. } - event_entry = room_event["content"].setdefault(row["event_id"], {}) - receipt_type = event_entry.setdefault(row["receipt_type"], {}) + event_entry = room_event["content"].setdefault(event_id, {}) + receipt_type_dict = event_entry.setdefault(receipt_type, {}) - receipt_type[row["user_id"]] = db_to_json(row["data"]) + receipt_type_dict[user_id] = db_to_json(data) return results @@ -742,7 +750,7 @@ class ReceiptsWorkerStore(SQLBaseStore): event_ids: List[str], thread_id: Optional[str], data: dict, - ) -> Optional[Tuple[int, int]]: + ) -> Optional[int]: """Insert a receipt, either from local client or remote server. Automatically does conversion between linearized and graph @@ -804,9 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore): data, ) - max_persisted_id = self._receipts_id_gen.get_current_token() - - return stream_id, max_persisted_id + return stream_id async def _insert_graph_receipt( self, diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index cc964604e2..64a2c31a5d 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -195,7 +195,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]: """Returns info about the user account, if it exists.""" - def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]: + def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[UserInfo]: # We could technically use simple_select_one here, but it would not perform # the COALESCEs (unless hacked into the column names), which could yield # confusing results. @@ -213,35 +213,46 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): (user_id,), ) - rows = self.db_pool.cursor_to_dict(txn) - - if len(rows) == 0: + row = txn.fetchone() + if not row: return None - return rows[0] + ( + name, + is_guest, + admin, + consent_version, + consent_ts, + consent_server_notice_sent, + appservice_id, + creation_ts, + user_type, + deactivated, + shadow_banned, + approved, + locked, + ) = row + + return UserInfo( + appservice_id=appservice_id, + consent_server_notice_sent=consent_server_notice_sent, + consent_version=consent_version, + consent_ts=consent_ts, + creation_ts=creation_ts, + is_admin=bool(admin), + is_deactivated=bool(deactivated), + is_guest=bool(is_guest), + is_shadow_banned=bool(shadow_banned), + user_id=UserID.from_string(name), + user_type=user_type, + approved=bool(approved), + locked=bool(locked), + ) - row = await self.db_pool.runInteraction( + return await self.db_pool.runInteraction( desc="get_user_by_id", func=get_user_by_id_txn, ) - if row is None: - return None - - return UserInfo( - appservice_id=row["appservice_id"], - consent_server_notice_sent=row["consent_server_notice_sent"], - consent_version=row["consent_version"], - consent_ts=row["consent_ts"], - creation_ts=row["creation_ts"], - is_admin=bool(row["admin"]), - is_deactivated=bool(row["deactivated"]), - is_guest=bool(row["is_guest"]), - is_shadow_banned=bool(row["shadow_banned"]), - user_id=UserID.from_string(row["name"]), - user_type=row["user_type"], - approved=bool(row["approved"]), - locked=bool(row["locked"]), - ) async def is_trial_user(self, user_id: str) -> bool: """Checks if user is in the "trial" period, i.e. within the first @@ -579,16 +590,31 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """ txn.execute(sql, (token,)) - rows = self.db_pool.cursor_to_dict(txn) - - if rows: - row = rows[0] - - # This field is nullable, ensure it comes out as a boolean - if row["token_used"] is None: - row["token_used"] = False + row = txn.fetchone() - return TokenLookupResult(**row) + if row: + ( + user_id, + is_guest, + shadow_banned, + token_id, + device_id, + valid_until_ms, + token_owner, + token_used, + ) = row + + return TokenLookupResult( + user_id=user_id, + is_guest=is_guest, + shadow_banned=shadow_banned, + token_id=token_id, + device_id=device_id, + valid_until_ms=valid_until_ms, + token_owner=token_owner, + # This field is nullable, ensure it comes out as a boolean + token_used=bool(token_used), + ) return None @@ -833,11 +859,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Counts all users registered on the homeserver.""" def _count_users(txn: LoggingTransaction) -> int: - txn.execute("SELECT COUNT(*) AS users FROM users") - rows = self.db_pool.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + txn.execute("SELECT COUNT(*) FROM users") + row = txn.fetchone() + assert row is not None + return row[0] return await self.db_pool.runInteraction("count_users", _count_users) @@ -891,11 +916,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Counts all users without a special user_type registered on the homeserver.""" def _count_users(txn: LoggingTransaction) -> int: - txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null") - rows = self.db_pool.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + txn.execute("SELECT COUNT(*) FROM users where user_type is null") + row = txn.fetchone() + assert row is not None + return row[0] return await self.db_pool.runInteraction("count_real_users", _count_users) @@ -1252,12 +1276,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): ) txn.execute(sql, []) - res = self.db_pool.cursor_to_dict(txn) - if res: - for user in res: - self.set_expiration_date_for_user_txn( - txn, user["name"], use_delta=True - ) + for (name,) in txn.fetchall(): + self.set_expiration_date_for_user_txn(txn, name, use_delta=True) await self.db_pool.runInteraction( "get_users_with_no_expiration_date", @@ -1963,11 +1983,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): (user_id,), ) - rows = self.db_pool.cursor_to_dict(txn) + row = txn.fetchone() + assert row is not None # We cast to bool because the value returned by the database engine might # be an integer if we're using SQLite. - return bool(rows[0]["approved"]) + return bool(row[0]) return await self.db_pool.runInteraction( desc="is_user_pending_approval", @@ -2045,22 +2066,22 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): (last_user, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return True, 0 rows_processed_nb = 0 - for user in rows: - if not user["count_tokens"] and not user["count_threepids"]: - self.set_user_deactivated_status_txn(txn, user["name"], True) + for name, count_tokens, count_threepids in rows: + if not count_tokens and not count_threepids: + self.set_user_deactivated_status_txn(txn, name, True) rows_processed_nb += 1 logger.info("Marked %d rows as deactivated", rows_processed_nb) self.db_pool.updates._background_update_progress_txn( - txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]} + txn, "users_set_deactivated_flag", {"user_id": rows[-1][0]} ) if batch_size > len(rows): diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 719e11aea6..1d4d99932b 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -831,7 +831,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): def get_retention_policy_for_room_txn( txn: LoggingTransaction, - ) -> List[Dict[str, Optional[int]]]: + ) -> Optional[Tuple[Optional[int], Optional[int]]]: txn.execute( """ SELECT min_lifetime, max_lifetime FROM room_retention @@ -841,7 +841,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): (room_id,), ) - return self.db_pool.cursor_to_dict(txn) + return cast(Optional[Tuple[Optional[int], Optional[int]]], txn.fetchone()) ret = await self.db_pool.runInteraction( "get_retention_policy_for_room", @@ -856,8 +856,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): max_lifetime=self.config.retention.retention_default_max_lifetime, ) - min_lifetime = ret[0]["min_lifetime"] - max_lifetime = ret[0]["max_lifetime"] + min_lifetime, max_lifetime = ret # If one of the room's policy's attributes isn't defined, use the matching # attribute from the default policy. @@ -1162,14 +1161,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, args) - rows = self.db_pool.cursor_to_dict(txn) - rooms_dict = {} - - for row in rows: - rooms_dict[row["room_id"]] = RetentionPolicy( - min_lifetime=row["min_lifetime"], - max_lifetime=row["max_lifetime"], + rooms_dict = { + room_id: RetentionPolicy( + min_lifetime=min_lifetime, + max_lifetime=max_lifetime, ) + for room_id, min_lifetime, max_lifetime in txn + } if include_null: # If required, do a second query that retrieves all of the rooms we know @@ -1178,13 +1176,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql) - rows = self.db_pool.cursor_to_dict(txn) - # If a room isn't already in the dict (i.e. it doesn't have a retention # policy in its state), add it with a null policy. - for row in rows: - if row["room_id"] not in rooms_dict: - rooms_dict[row["room_id"]] = RetentionPolicy() + for (room_id,) in txn: + if room_id not in rooms_dict: + rooms_dict[room_id] = RetentionPolicy() return rooms_dict @@ -1703,24 +1699,24 @@ class RoomBackgroundUpdateStore(SQLBaseStore): (last_room, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return True - for row in rows: - if not row["json"]: + for room_id, event_id, json in rows: + if not json: retention_policy = {} else: - ev = db_to_json(row["json"]) + ev = db_to_json(json) retention_policy = ev["content"] self.db_pool.simple_insert_txn( txn=txn, table="room_retention", values={ - "room_id": row["room_id"], - "event_id": row["event_id"], + "room_id": room_id, + "event_id": event_id, "min_lifetime": retention_policy.get("min_lifetime"), "max_lifetime": retention_policy.get("max_lifetime"), }, @@ -1729,7 +1725,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): logger.info("Inserted %d rows into room_retention", len(rows)) self.db_pool.updates._background_update_progress_txn( - txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]} + txn, "insert_room_retention", {"room_id": rows[-1][0]} ) if batch_size > len(rows): diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index e93573f315..bbe08368db 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1349,18 +1349,16 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return 0 - min_stream_id = rows[-1]["stream_ordering"] + min_stream_id = rows[-1][0] to_update = [] - for row in rows: - event_id = row["event_id"] - room_id = row["room_id"] + for _, event_id, room_id, json in rows: try: - event_json = db_to_json(row["json"]) + event_json = db_to_json(json) content = event_json["content"] except Exception: continue diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index a7aae661d8..1d69c4a5f0 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -179,22 +179,24 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # store_search_entries_txn with a generator function, but that # would mean having two cursors open on the database at once. # Instead we just build a list of results. - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return 0 - min_stream_id = rows[-1]["stream_ordering"] + min_stream_id = rows[-1][0] event_search_rows = [] - for row in rows: + for ( + stream_ordering, + event_id, + room_id, + etype, + json, + origin_server_ts, + ) in rows: try: - event_id = row["event_id"] - room_id = row["room_id"] - etype = row["type"] - stream_ordering = row["stream_ordering"] - origin_server_ts = row["origin_server_ts"] try: - event_json = db_to_json(row["json"]) + event_json = db_to_json(json) content = event_json["content"] except Exception: continue diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 5a3611c415..ea06e4eee0 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -266,7 +266,7 @@ def generate_next_token( # when we are going backwards so we subtract one from the # stream part. last_stream_ordering -= 1 - return RoomStreamToken(last_topo_ordering, last_stream_ordering) + return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering) def _make_generic_sql_bound( @@ -558,7 +558,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if p > min_pos } - return RoomStreamToken(None, min_pos, immutabledict(positions)) + return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions)) async def get_room_events_stream_for_rooms( self, @@ -708,7 +708,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ret.reverse() if rows: - key = RoomStreamToken(None, min(r.stream_ordering for r in rows)) + key = RoomStreamToken(stream=min(r.stream_ordering for r in rows)) else: # Assume we didn't get anything because there was nothing to # get. @@ -969,7 +969,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): topo = await self.db_pool.runInteraction( "_get_max_topological_txn", self._get_max_topological_txn, room_id ) - return RoomStreamToken(topo, stream_ordering) + return RoomStreamToken(topological=topo, stream=stream_ordering) @overload def get_stream_id_for_event_txn( @@ -1033,7 +1033,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcols=("stream_ordering", "topological_ordering"), desc="get_topological_token_for_event", ) - return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) + return RoomStreamToken( + topological=row["topological_ordering"], stream=row["stream_ordering"] + ) async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: """Gets the topological token in a room after or at the given stream @@ -1114,8 +1116,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): else: topo = None internal = event.internal_metadata - internal.before = RoomStreamToken(topo, stream - 1) - internal.after = RoomStreamToken(topo, stream) + internal.before = RoomStreamToken(topological=topo, stream=stream - 1) + internal.after = RoomStreamToken(topological=topo, stream=stream) internal.order = (int(topo) if topo else 0, int(stream)) async def get_events_around( @@ -1191,11 +1193,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Paginating backwards includes the event at the token, but paginating # forward doesn't. before_token = RoomStreamToken( - results["topological_ordering"] - 1, results["stream_ordering"] + topological=results["topological_ordering"] - 1, + stream=results["stream_ordering"], ) after_token = RoomStreamToken( - results["topological_ordering"], results["stream_ordering"] + topological=results["topological_ordering"], + stream=results["stream_ordering"], ) rows, start_token = self._paginate_room_events_txn( diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 5c5372a825..5555b53575 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, List, Optional, Tuple, cast from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -27,6 +27,8 @@ from synapse.util import json_encoder if TYPE_CHECKING: from synapse.server import HomeServer +ScheduledTaskRow = Tuple[str, str, str, int, str, str, str, str] + class TaskSchedulerWorkerStore(SQLBaseStore): def __init__( @@ -38,13 +40,18 @@ class TaskSchedulerWorkerStore(SQLBaseStore): super().__init__(database, db_conn, hs) @staticmethod - def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: - row["status"] = TaskStatus(row["status"]) - if row["params"] is not None: - row["params"] = db_to_json(row["params"]) - if row["result"] is not None: - row["result"] = db_to_json(row["result"]) - return ScheduledTask(**row) + def _convert_row_to_task(row: ScheduledTaskRow) -> ScheduledTask: + task_id, action, status, timestamp, resource_id, params, result, error = row + return ScheduledTask( + id=task_id, + action=action, + status=TaskStatus(status), + timestamp=timestamp, + resource_id=resource_id, + params=db_to_json(params) if params is not None else None, + result=db_to_json(result) if result is not None else None, + error=error, + ) async def get_scheduled_tasks( self, @@ -68,7 +75,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore): Returns: a list of `ScheduledTask`, ordered by increasing timestamps """ - def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[ScheduledTaskRow]: clauses: List[str] = [] args: List[Any] = [] if resource_id: @@ -101,7 +108,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore): args.append(limit) txn.execute(sql, args) - return self.db_pool.cursor_to_dict(txn) + return cast(List[ScheduledTaskRow], txn.fetchall()) rows = await self.db_pool.runInteraction( "get_scheduled_tasks", get_scheduled_tasks_txn @@ -193,7 +200,22 @@ class TaskSchedulerWorkerStore(SQLBaseStore): desc="get_scheduled_task", ) - return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None + return ( + TaskSchedulerWorkerStore._convert_row_to_task( + ( + row["id"], + row["action"], + row["status"], + row["timestamp"], + row["resource_id"], + row["params"], + row["result"], + row["error"], + ) + ) + if row + else None + ) async def delete_scheduled_task(self, id: str) -> None: """Delete a specific task from its id. diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 8f70eff809..f35757280d 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -526,7 +526,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): start: int, limit: int, direction: Direction = Direction.FORWARDS, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: """Function to retrieve a paginated list of destination's rooms. This will return a json list of rooms and the total number of rooms. @@ -537,12 +537,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): limit: number of rows to retrieve direction: sort ascending or descending by room_id Returns: - A tuple of a dict of rooms and a count of total rooms. + A tuple of a list of room tuples and a count of total rooms. + + Each room tuple is room_id, stream_ordering. """ def get_destination_rooms_paginate_txn( txn: LoggingTransaction, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: if direction == Direction.BACKWARDS: order = "DESC" else: @@ -556,14 +558,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, [destination]) count = cast(Tuple[int], txn.fetchone())[0] - rooms = self.db_pool.simple_select_list_paginate_txn( - txn=txn, - table="destination_rooms", - orderby="room_id", - start=start, - limit=limit, - retcols=("room_id", "stream_ordering"), - order_direction=order, + rooms = cast( + List[Tuple[str, int]], + self.db_pool.simple_select_list_paginate_txn( + txn=txn, + table="destination_rooms", + orderby="room_id", + start=start, + limit=limit, + retcols=("room_id", "stream_ordering"), + order_direction=order, + ), ) return rooms, count diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 5b50bd66bc..de89de7d74 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -125,8 +125,8 @@ Changes in SCHEMA_VERSION = 82 SCHEMA_COMPAT_VERSION = ( - # The `event_txn_id_device_id` must be written to for new events. - 80 + # The event_txn_id table and tables from MSC2716 no longer exist. + 82 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql b/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql new file mode 100644 index 0000000000..149020bbd7 --- /dev/null +++ b/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql @@ -0,0 +1,24 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Drop the old event transaction ID table, the event_txn_id_device_id table +-- should be used instead. +DROP TABLE IF EXISTS event_txn_id; + +-- Drop tables related to MSC2716 since the implementation is being removed +DROP TABLE insertion_events; +DROP TABLE insertion_event_edges; +DROP TABLE insertion_event_extremities; +DROP TABLE batch_events; diff --git a/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql new file mode 100644 index 0000000000..fc948166e6 --- /dev/null +++ b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql @@ -0,0 +1,20 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8204, 'e2e_room_keys_index_room_id', '{}'); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8204, 'room_account_data_index_room_id', '{}'); diff --git a/synapse/streams/events.py b/synapse/streams/events.py index d7084d2358..609a0978a9 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Iterator, Tuple +from typing import TYPE_CHECKING, Sequence, Tuple import attr @@ -23,7 +23,7 @@ from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource from synapse.logging.opentracing import trace from synapse.streams import EventSource -from synapse.types import StreamToken +from synapse.types import StreamKeyType, StreamToken if TYPE_CHECKING: from synapse.server import HomeServer @@ -37,9 +37,14 @@ class _EventSourcesInner: receipt: ReceiptEventSource account_data: AccountDataEventSource - def get_sources(self) -> Iterator[Tuple[str, EventSource]]: - for attribute in attr.fields(_EventSourcesInner): - yield attribute.name, getattr(self, attribute.name) + def get_sources(self) -> Sequence[Tuple[StreamKeyType, EventSource]]: + return [ + (StreamKeyType.ROOM, self.room), + (StreamKeyType.PRESENCE, self.presence), + (StreamKeyType.TYPING, self.typing), + (StreamKeyType.RECEIPT, self.receipt), + (StreamKeyType.ACCOUNT_DATA, self.account_data), + ] class EventSources: diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 76b0e3e694..09a88c86a7 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -22,8 +22,8 @@ from typing import ( Any, ClassVar, Dict, - Final, List, + Literal, Mapping, Match, MutableMapping, @@ -34,6 +34,7 @@ from typing import ( Type, TypeVar, Union, + overload, ) import attr @@ -60,6 +61,8 @@ from synapse.util.cancellation import cancellable from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: + from typing_extensions import Self + from synapse.appservice.api import ApplicationService from synapse.storage.databases.main import DataStore, PurgeEventsStore from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore @@ -436,7 +439,78 @@ def map_username_to_mxid_localpart( @attr.s(frozen=True, slots=True, order=False) -class RoomStreamToken: +class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): + """An abstract stream token class for streams that supports multiple + writers. + + This works by keeping track of the stream position of each writer, + represented by a default `stream` attribute and a map of instance name to + stream position of any writers that are ahead of the default stream + position. + """ + + stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True) + + instance_map: "immutabledict[str, int]" = attr.ib( + factory=immutabledict, + validator=attr.validators.deep_mapping( + key_validator=attr.validators.instance_of(str), + value_validator=attr.validators.instance_of(int), + mapping_validator=attr.validators.instance_of(immutabledict), + ), + kw_only=True, + ) + + @classmethod + @abc.abstractmethod + async def parse(cls, store: "DataStore", string: str) -> "Self": + """Parse the string representation of the token.""" + ... + + @abc.abstractmethod + async def to_string(self, store: "DataStore") -> str: + """Serialize the token into its string representation.""" + ... + + def copy_and_advance(self, other: "Self") -> "Self": + """Return a new token such that if an event is after both this token and + the other token, then its after the returned token too. + """ + + max_stream = max(self.stream, other.stream) + + instance_map = { + instance: max( + self.instance_map.get(instance, self.stream), + other.instance_map.get(instance, other.stream), + ) + for instance in set(self.instance_map).union(other.instance_map) + } + + return attr.evolve( + self, stream=max_stream, instance_map=immutabledict(instance_map) + ) + + def get_max_stream_pos(self) -> int: + """Get the maximum stream position referenced in this token. + + The corresponding "min" position is, by definition just `self.stream`. + + This is used to handle tokens that have non-empty `instance_map`, and so + reference stream positions after the `self.stream` position. + """ + return max(self.instance_map.values(), default=self.stream) + + def get_stream_pos_for_instance(self, instance_name: str) -> int: + """Get the stream position that the given writer was at at this token.""" + + # If we don't have an entry for the instance we can assume that it was + # at `self.stream`. + return self.instance_map.get(instance_name, self.stream) + + +@attr.s(frozen=True, slots=True, order=False) +class RoomStreamToken(AbstractMultiWriterStreamToken): """Tokens are positions between events. The token "s1" comes after event 1. s0 s1 @@ -513,16 +587,8 @@ class RoomStreamToken: topological: Optional[int] = attr.ib( validator=attr.validators.optional(attr.validators.instance_of(int)), - ) - stream: int = attr.ib(validator=attr.validators.instance_of(int)) - - instance_map: "immutabledict[str, int]" = attr.ib( - factory=immutabledict, - validator=attr.validators.deep_mapping( - key_validator=attr.validators.instance_of(str), - value_validator=attr.validators.instance_of(int), - mapping_validator=attr.validators.instance_of(immutabledict), - ), + kw_only=True, + default=None, ) def __attrs_post_init__(self) -> None: @@ -582,17 +648,7 @@ class RoomStreamToken: if self.topological or other.topological: raise Exception("Can't advance topological tokens") - max_stream = max(self.stream, other.stream) - - instance_map = { - instance: max( - self.instance_map.get(instance, self.stream), - other.instance_map.get(instance, other.stream), - ) - for instance in set(self.instance_map).union(other.instance_map) - } - - return RoomStreamToken(None, max_stream, immutabledict(instance_map)) + return super().copy_and_advance(other) def as_historical_tuple(self) -> Tuple[int, int]: """Returns a tuple of `(topological, stream)` for historical tokens. @@ -618,16 +674,6 @@ class RoomStreamToken: # at `self.stream`. return self.instance_map.get(instance_name, self.stream) - def get_max_stream_pos(self) -> int: - """Get the maximum stream position referenced in this token. - - The corresponding "min" position is, by definition just `self.stream`. - - This is used to handle tokens that have non-empty `instance_map`, and so - reference stream positions after the `self.stream` position. - """ - return max(self.instance_map.values(), default=self.stream) - async def to_string(self, store: "DataStore") -> str: if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) @@ -649,20 +695,20 @@ class RoomStreamToken: return "s%d" % (self.stream,) -class StreamKeyType: +class StreamKeyType(Enum): """Known stream types. A stream is a list of entities ordered by an incrementing "stream token". """ - ROOM: Final = "room_key" - PRESENCE: Final = "presence_key" - TYPING: Final = "typing_key" - RECEIPT: Final = "receipt_key" - ACCOUNT_DATA: Final = "account_data_key" - PUSH_RULES: Final = "push_rules_key" - TO_DEVICE: Final = "to_device_key" - DEVICE_LIST: Final = "device_list_key" + ROOM = "room_key" + PRESENCE = "presence_key" + TYPING = "typing_key" + RECEIPT = "receipt_key" + ACCOUNT_DATA = "account_data_key" + PUSH_RULES = "push_rules_key" + TO_DEVICE = "to_device_key" + DEVICE_LIST = "device_list_key" UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key" @@ -784,7 +830,7 @@ class StreamToken: def room_stream_id(self) -> int: return self.room_key.stream - def copy_and_advance(self, key: str, new_value: Any) -> "StreamToken": + def copy_and_advance(self, key: StreamKeyType, new_value: Any) -> "StreamToken": """Advance the given key in the token to a new value if and only if the new value is after the old value. @@ -797,35 +843,68 @@ class StreamToken: return new_token new_token = self.copy_and_replace(key, new_value) - new_id = int(getattr(new_token, key)) - old_id = int(getattr(self, key)) + new_id = new_token.get_field(key) + old_id = self.get_field(key) if old_id < new_id: return new_token else: return self - def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken": - return attr.evolve(self, **{key: new_value}) + def copy_and_replace(self, key: StreamKeyType, new_value: Any) -> "StreamToken": + return attr.evolve(self, **{key.value: new_value}) + @overload + def get_field(self, key: Literal[StreamKeyType.ROOM]) -> RoomStreamToken: + ... -StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0) + @overload + def get_field( + self, + key: Literal[ + StreamKeyType.ACCOUNT_DATA, + StreamKeyType.DEVICE_LIST, + StreamKeyType.PRESENCE, + StreamKeyType.PUSH_RULES, + StreamKeyType.RECEIPT, + StreamKeyType.TO_DEVICE, + StreamKeyType.TYPING, + StreamKeyType.UN_PARTIAL_STATED_ROOMS, + ], + ) -> int: + ... + @overload + def get_field(self, key: StreamKeyType) -> Union[int, RoomStreamToken]: + ... -@attr.s(slots=True, frozen=True, auto_attribs=True) -class PersistedEventPosition: - """Position of a newly persisted event with instance that persisted it. + def get_field(self, key: StreamKeyType) -> Union[int, RoomStreamToken]: + """Returns the stream ID for the given key.""" + return getattr(self, key.value) - This can be used to test whether the event is persisted before or after a - RoomStreamToken. - """ + +StreamToken.START = StreamToken(RoomStreamToken(stream=0), 0, 0, 0, 0, 0, 0, 0, 0, 0) + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class PersistedPosition: + """Position of a newly persisted row with instance that persisted it.""" instance_name: str stream: int - def persisted_after(self, token: RoomStreamToken) -> bool: + def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool: return token.get_stream_pos_for_instance(self.instance_name) < self.stream + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class PersistedEventPosition(PersistedPosition): + """Position of a newly persisted event with instance that persisted it. + + This can be used to test whether the event is persisted before or after a + RoomStreamToken. + """ + def to_room_stream_token(self) -> RoomStreamToken: """Converts the position to a room stream token such that events persisted in the same room after this position will be after the @@ -836,7 +915,7 @@ class PersistedEventPosition: """ # Doing the naive thing satisfies the desired properties described in # the docstring. - return RoomStreamToken(None, self.stream) + return RoomStreamToken(stream=self.stream) @attr.s(slots=True, frozen=True, auto_attribs=True) |