diff options
Diffstat (limited to 'synapse')
52 files changed, 599 insertions, 696 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 2178e623da..d8190f92ab 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -13,12 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Optional +from typing import List, Optional, Tuple import pymacaroons from netaddr import IPAddress -from twisted.internet import defer from twisted.web.server import Request import synapse.types @@ -80,13 +79,14 @@ class Auth(object): self._track_appservice_user_ips = hs.config.track_appservice_user_ips self._macaroon_secret_key = hs.config.macaroon_secret_key - @defer.inlineCallbacks - def check_from_context(self, room_version: str, event, context, do_sig_check=True): - prev_state_ids = yield defer.ensureDeferred(context.get_prev_state_ids()) - auth_events_ids = yield self.compute_auth_events( + async def check_from_context( + self, room_version: str, event, context, do_sig_check=True + ): + prev_state_ids = await context.get_prev_state_ids() + auth_events_ids = self.compute_auth_events( event, prev_state_ids, for_verification=True ) - auth_events = yield self.store.get_events(auth_events_ids) + auth_events = await self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events.values()} room_version_obj = KNOWN_ROOM_VERSIONS[room_version] @@ -94,14 +94,13 @@ class Auth(object): room_version_obj, event, auth_events=auth_events, do_sig_check=do_sig_check ) - @defer.inlineCallbacks - def check_user_in_room( + async def check_user_in_room( self, room_id: str, user_id: str, current_state: Optional[StateMap[EventBase]] = None, allow_departed_users: bool = False, - ): + ) -> EventBase: """Check if the user is in the room, or was at some point. Args: room_id: The room to check. @@ -119,37 +118,35 @@ class Auth(object): Raises: AuthError if the user is/was not in the room. Returns: - Deferred[Optional[EventBase]]: - Membership event for the user if the user was in the - room. This will be the join event if they are currently joined to - the room. This will be the leave event if they have left the room. + Membership event for the user if the user was in the + room. This will be the join event if they are currently joined to + the room. This will be the leave event if they have left the room. """ if current_state: member = current_state.get((EventTypes.Member, user_id), None) else: - member = yield defer.ensureDeferred( - self.state.get_current_state( - room_id=room_id, event_type=EventTypes.Member, state_key=user_id - ) + member = await self.state.get_current_state( + room_id=room_id, event_type=EventTypes.Member, state_key=user_id ) - membership = member.membership if member else None - if membership == Membership.JOIN: - return member + if member: + membership = member.membership - # XXX this looks totally bogus. Why do we not allow users who have been banned, - # or those who were members previously and have been re-invited? - if allow_departed_users and membership == Membership.LEAVE: - forgot = yield self.store.did_forget(user_id, room_id) - if not forgot: + if membership == Membership.JOIN: return member + # XXX this looks totally bogus. Why do we not allow users who have been banned, + # or those who were members previously and have been re-invited? + if allow_departed_users and membership == Membership.LEAVE: + forgot = await self.store.did_forget(user_id, room_id) + if not forgot: + return member + raise AuthError(403, "User %s not in room %s" % (user_id, room_id)) - @defer.inlineCallbacks - def check_host_in_room(self, room_id, host): + async def check_host_in_room(self, room_id, host): with Measure(self.clock, "check_host_in_room"): - latest_event_ids = yield self.store.is_host_joined(room_id, host) + latest_event_ids = await self.store.is_host_joined(room_id, host) return latest_event_ids def can_federate(self, event, auth_events): @@ -160,14 +157,13 @@ class Auth(object): def get_public_keys(self, invite_event): return event_auth.get_public_keys(invite_event) - @defer.inlineCallbacks - def get_user_by_req( + async def get_user_by_req( self, request: Request, allow_guest: bool = False, rights: str = "access", allow_expired: bool = False, - ): + ) -> synapse.types.Requester: """ Get a registered user's ID. Args: @@ -180,7 +176,7 @@ class Auth(object): /login will deliver access tokens regardless of expiration. Returns: - defer.Deferred: resolves to a `synapse.types.Requester` object + Resolves to the requester Raises: InvalidClientCredentialsError if no user by that token exists or the token is invalid. @@ -194,14 +190,14 @@ class Auth(object): access_token = self.get_access_token_from_request(request) - user_id, app_service = yield self._get_appservice_user_id(request) + user_id, app_service = await self._get_appservice_user_id(request) if user_id: request.authenticated_entity = user_id opentracing.set_tag("authenticated_entity", user_id) opentracing.set_tag("appservice_id", app_service.id) if ip_addr and self._track_appservice_user_ips: - yield self.store.insert_client_ip( + await self.store.insert_client_ip( user_id=user_id, access_token=access_token, ip=ip_addr, @@ -211,7 +207,7 @@ class Auth(object): return synapse.types.create_requester(user_id, app_service=app_service) - user_info = yield self.get_user_by_access_token( + user_info = await self.get_user_by_access_token( access_token, rights, allow_expired=allow_expired ) user = user_info["user"] @@ -221,7 +217,7 @@ class Auth(object): # Deny the request if the user account has expired. if self._account_validity.enabled and not allow_expired: user_id = user.to_string() - expiration_ts = yield self.store.get_expiration_ts_for_user(user_id) + expiration_ts = await self.store.get_expiration_ts_for_user(user_id) if ( expiration_ts is not None and self.clock.time_msec() >= expiration_ts @@ -235,7 +231,7 @@ class Auth(object): device_id = user_info.get("device_id") if user and access_token and ip_addr: - yield self.store.insert_client_ip( + await self.store.insert_client_ip( user_id=user.to_string(), access_token=access_token, ip=ip_addr, @@ -261,8 +257,7 @@ class Auth(object): except KeyError: raise MissingClientTokenError() - @defer.inlineCallbacks - def _get_appservice_user_id(self, request): + async def _get_appservice_user_id(self, request): app_service = self.store.get_app_service_by_token( self.get_access_token_from_request(request) ) @@ -283,14 +278,13 @@ class Auth(object): if not app_service.is_interested_in_user(user_id): raise AuthError(403, "Application service cannot masquerade as this user.") - if not (yield self.store.get_user_by_id(user_id)): + if not (await self.store.get_user_by_id(user_id)): raise AuthError(403, "Application service has not registered this user") return user_id, app_service - @defer.inlineCallbacks - def get_user_by_access_token( + async def get_user_by_access_token( self, token: str, rights: str = "access", allow_expired: bool = False, - ): + ) -> dict: """ Validate access token and get user_id from it Args: @@ -300,7 +294,7 @@ class Auth(object): allow_expired: If False, raises an InvalidClientTokenError if the token is expired Returns: - Deferred[dict]: dict that includes: + dict that includes: `user` (UserID) `is_guest` (bool) `token_id` (int|None): access token id. May be None if guest @@ -314,7 +308,7 @@ class Auth(object): if rights == "access": # first look in the database - r = yield self._look_up_user_by_access_token(token) + r = await self._look_up_user_by_access_token(token) if r: valid_until_ms = r["valid_until_ms"] if ( @@ -352,7 +346,7 @@ class Auth(object): # It would of course be much easier to store guest access # tokens in the database as well, but that would break existing # guest tokens. - stored_user = yield self.store.get_user_by_id(user_id) + stored_user = await self.store.get_user_by_id(user_id) if not stored_user: raise InvalidClientTokenError("Unknown user_id %s" % user_id) if not stored_user["is_guest"]: @@ -482,9 +476,8 @@ class Auth(object): now = self.hs.get_clock().time_msec() return now < expiry - @defer.inlineCallbacks - def _look_up_user_by_access_token(self, token): - ret = yield self.store.get_user_by_access_token(token) + async def _look_up_user_by_access_token(self, token): + ret = await self.store.get_user_by_access_token(token) if not ret: return None @@ -507,7 +500,7 @@ class Auth(object): logger.warning("Unrecognised appservice access token.") raise InvalidClientTokenError() request.authenticated_entity = service.sender - return defer.succeed(service) + return service async def is_server_admin(self, user: UserID) -> bool: """ Check if the given user is a local server admin. @@ -522,7 +515,7 @@ class Auth(object): def compute_auth_events( self, event, current_state_ids: StateMap[str], for_verification: bool = False, - ): + ) -> List[str]: """Given an event and current state return the list of event IDs used to auth an event. @@ -530,11 +523,11 @@ class Auth(object): should be added to the event's `auth_events`. Returns: - defer.Deferred(list[str]): List of event IDs. + List of event IDs. """ if event.type == EventTypes.Create: - return defer.succeed([]) + return [] # Currently we ignore the `for_verification` flag even though there are # some situations where we can drop particular auth events when adding @@ -553,7 +546,7 @@ class Auth(object): if auth_ev_id: auth_ids.append(auth_ev_id) - return defer.succeed(auth_ids) + return auth_ids async def check_can_change_room_list(self, room_id: str, user: UserID): """Determine whether the user is allowed to edit the room's entry in the @@ -636,10 +629,9 @@ class Auth(object): return query_params[0].decode("ascii") - @defer.inlineCallbacks - def check_user_in_room_or_world_readable( + async def check_user_in_room_or_world_readable( self, room_id: str, user_id: str, allow_departed_users: bool = False - ): + ) -> Tuple[str, Optional[str]]: """Checks that the user is or was in the room or the room is world readable. If it isn't then an exception is raised. @@ -650,10 +642,9 @@ class Auth(object): members but have now departed Returns: - Deferred[tuple[str, str|None]]: Resolves to the current membership of - the user in the room and the membership event ID of the user. If - the user is not in the room and never has been, then - `(Membership.JOIN, None)` is returned. + Resolves to the current membership of the user in the room and the + membership event ID of the user. If the user is not in the room and + never has been, then `(Membership.JOIN, None)` is returned. """ try: @@ -662,15 +653,13 @@ class Auth(object): # * The user is a non-guest user, and was ever in the room # * The user is a guest user, and has joined the room # else it will throw. - member_event = yield self.check_user_in_room( + member_event = await self.check_user_in_room( room_id, user_id, allow_departed_users=allow_departed_users ) return member_event.membership, member_event.event_id except AuthError: - visibility = yield defer.ensureDeferred( - self.state.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) + visibility = await self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" ) if ( visibility diff --git a/synapse/api/auth_blocking.py b/synapse/api/auth_blocking.py index 5c499b6b4e..49093bf181 100644 --- a/synapse/api/auth_blocking.py +++ b/synapse/api/auth_blocking.py @@ -15,8 +15,6 @@ import logging -from twisted.internet import defer - from synapse.api.constants import LimitBlockingTypes, UserTypes from synapse.api.errors import Codes, ResourceLimitError from synapse.config.server import is_threepid_reserved @@ -36,8 +34,7 @@ class AuthBlocking(object): self._limit_usage_by_mau = hs.config.limit_usage_by_mau self._mau_limits_reserved_threepids = hs.config.mau_limits_reserved_threepids - @defer.inlineCallbacks - def check_auth_blocking(self, user_id=None, threepid=None, user_type=None): + async def check_auth_blocking(self, user_id=None, threepid=None, user_type=None): """Checks if the user should be rejected for some external reason, such as monthly active user limiting or global disable flag @@ -60,7 +57,7 @@ class AuthBlocking(object): if user_id is not None: if user_id == self._server_notices_mxid: return - if (yield self.store.is_support_user(user_id)): + if await self.store.is_support_user(user_id): return if self._hs_disabled: @@ -76,11 +73,11 @@ class AuthBlocking(object): # If the user is already part of the MAU cohort or a trial user if user_id: - timestamp = yield self.store.user_last_seen_monthly_active(user_id) + timestamp = await self.store.user_last_seen_monthly_active(user_id) if timestamp: return - is_trial = yield self.store.is_trial_user(user_id) + is_trial = await self.store.is_trial_user(user_id) if is_trial: return elif threepid: @@ -93,7 +90,7 @@ class AuthBlocking(object): # allow registration. Support users are excluded from MAU checks. return # Else if there is no room in the MAU bucket, bail - current_mau = yield self.store.get_monthly_active_count() + current_mau = await self.store.get_monthly_active_count() if current_mau >= self._max_mau_value: raise ResourceLimitError( 403, diff --git a/synapse/api/errors.py b/synapse/api/errors.py index b3bab1aa52..6e40630ab6 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -238,14 +238,16 @@ class InteractiveAuthIncompleteError(Exception): (This indicates we should return a 401 with 'result' as the body) Attributes: + session_id: The ID of the ongoing interactive auth session. result: the server response to the request, which should be passed back to the client """ - def __init__(self, result: "JsonDict"): + def __init__(self, session_id: str, result: "JsonDict"): super(InteractiveAuthIncompleteError, self).__init__( "Interactive auth not yet complete" ) + self.session_id = session_id self.result = result diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index f988f62a1e..7393d6cb74 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -21,8 +21,6 @@ import jsonschema from canonicaljson import json from jsonschema import FormatChecker -from twisted.internet import defer - from synapse.api.constants import EventContentFields from synapse.api.errors import SynapseError from synapse.storage.presence import UserPresenceState @@ -137,9 +135,8 @@ class Filtering(object): super(Filtering, self).__init__() self.store = hs.get_datastore() - @defer.inlineCallbacks - def get_user_filter(self, user_localpart, filter_id): - result = yield self.store.get_user_filter(user_localpart, filter_id) + async def get_user_filter(self, user_localpart, filter_id): + result = await self.store.get_user_filter(user_localpart, filter_id) return FilterCollection(result) def add_user_filter(self, user_localpart, user_filter): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 1a16d0b9f8..7957586d69 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -123,6 +123,7 @@ from synapse.rest.client.v2_alpha.account_data import ( from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.rest.client.versions import VersionsRestServlet +from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer from synapse.storage.databases.main.censor_events import CensorEventsStore @@ -493,7 +494,10 @@ class GenericWorkerServer(HomeServer): site_tag = listener_config.http_options.tag if site_tag is None: site_tag = port - resources = {} + + # We always include a health resource. + resources = {"/health": HealthResource()} + for res in listener_config.http_options.resources: for name in res.names: if name == "metrics": diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index d87a77718e..98d0d14a12 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -68,6 +68,7 @@ from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.rest import ClientRestResource from synapse.rest.admin import AdminRestResource +from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource from synapse.rest.well_known import WellKnownResource from synapse.server import HomeServer @@ -98,7 +99,9 @@ class SynapseHomeServer(HomeServer): if site_tag is None: site_tag = port - resources = {} + # We always include a health resource. + resources = {"/health": HealthResource()} + for res in listener_config.http_options.resources: for name in res.names: if name == "openid" and "federation" in res.names: diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index a5a2a7815d..777c0f00b1 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -48,6 +48,14 @@ class ServerContextFactory(ContextFactory): connections.""" def __init__(self, config): + # TODO: once pyOpenSSL exposes TLS_METHOD and SSL_CTX_set_min_proto_version, + # switch to those (see https://github.com/pyca/cryptography/issues/5379). + # + # note that, despite the confusing name, SSLv23_METHOD does *not* enforce SSLv2 + # or v3, but is a synonym for TLS_METHOD, which allows the client and server + # to negotiate an appropriate version of TLS constrained by the version options + # set with context.set_options. + # self._context = SSL.Context(SSL.SSLv23_METHOD) self.configure_context(self._context, config) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 69b53ca2bc..4e179d49b3 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -106,7 +106,7 @@ class EventBuilder(object): state_ids = await self._state.get_current_state_ids( self.room_id, prev_event_ids ) - auth_ids = await self._auth.compute_auth_events(self, state_ids) + auth_ids = self._auth.compute_auth_events(self, state_ids) format_version = self.room_version.event_format if format_version == EventFormatVersions.V1: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fbc56c351b..c9044a5019 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -101,7 +101,7 @@ class ApplicationServicesHandler(object): async def start_scheduler(): try: - return self.scheduler.start() + return await self.scheduler.start() except Exception: logger.error("Application Services Failure") diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index c7d921c21a..c24e7bafe0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -162,7 +162,7 @@ class AuthHandler(BaseHandler): request_body: Dict[str, Any], clientip: str, description: str, - ) -> dict: + ) -> Tuple[dict, str]: """ Checks that the user is who they claim to be, via a UI auth. @@ -183,9 +183,14 @@ class AuthHandler(BaseHandler): describes the operation happening on their account. Returns: - The parameters for this request (which may + A tuple of (params, session_id). + + 'params' contains the parameters for this request (which may have been given only in a previous call). + 'session_id' is the ID of this session, either passed in by the + client or assigned by this call + Raises: InteractiveAuthIncompleteError if the client has not yet completed any of the permitted login flows @@ -207,7 +212,7 @@ class AuthHandler(BaseHandler): flows = [[login_type] for login_type in self._supported_ui_auth_types] try: - result, params, _ = await self.check_auth( + result, params, session_id = await self.check_ui_auth( flows, request, request_body, clientip, description ) except LoginError: @@ -230,7 +235,7 @@ class AuthHandler(BaseHandler): if user_id != requester.user.to_string(): raise AuthError(403, "Invalid auth") - return params + return params, session_id def get_enabled_auth_types(self): """Return the enabled user-interactive authentication types @@ -240,7 +245,7 @@ class AuthHandler(BaseHandler): """ return self.checkers.keys() - async def check_auth( + async def check_ui_auth( self, flows: List[List[str]], request: SynapseRequest, @@ -363,7 +368,7 @@ class AuthHandler(BaseHandler): if not authdict: raise InteractiveAuthIncompleteError( - self._auth_dict_for_flows(flows, session.session_id) + session.session_id, self._auth_dict_for_flows(flows, session.session_id) ) # check auth type currently being presented @@ -410,7 +415,7 @@ class AuthHandler(BaseHandler): ret = self._auth_dict_for_flows(flows, session.session_id) ret["completed"] = list(creds) ret.update(errordict) - raise InteractiveAuthIncompleteError(ret) + raise InteractiveAuthIncompleteError(session.session_id, ret) async def add_oob_auth( self, stagetype: str, authdict: Dict[str, Any], clientip: str diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b3764dedae..593932adb7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2064,7 +2064,7 @@ class FederationHandler(BaseHandler): if not auth_events: prev_state_ids = await context.get_prev_state_ids() - auth_events_ids = await self.auth.compute_auth_events( + auth_events_ids = self.auth.compute_auth_events( event, prev_state_ids, for_verification=True ) auth_events_x = await self.store.get_events(auth_events_ids) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 43901d0934..708533d4d1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1061,7 +1061,7 @@ class EventCreationHandler(object): raise SynapseError(400, "Cannot redact event from a different room") prev_state_ids = await context.get_prev_state_ids() - auth_events_ids = await self.auth.compute_auth_events( + auth_events_ids = self.auth.compute_auth_events( event, prev_state_ids, for_verification=True ) auth_events = await self.store.get_events(auth_events_ids) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5a19bac929..c42dac18f5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -103,7 +103,6 @@ class JoinedSyncResult: account_data = attr.ib(type=List[JsonDict]) unread_notifications = attr.ib(type=JsonDict) summary = attr.ib(type=Optional[JsonDict]) - unread_count = attr.ib(type=int) def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used @@ -1887,10 +1886,6 @@ class SyncHandler(object): if room_builder.rtype == "joined": unread_notifications = {} # type: Dict[str, str] - - unread_count = await self.store.get_unread_message_count_for_user( - room_id, sync_config.user.to_string(), - ) room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, @@ -1899,7 +1894,6 @@ class SyncHandler(object): account_data=account_data_events, unread_notifications=unread_notifications, summary=summary, - unread_count=unread_count, ) if room_sync or always_include: diff --git a/synapse/http/server.py b/synapse/http/server.py index 94ab29974a..ffe6cfa09e 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -25,7 +25,7 @@ from io import BytesIO from typing import Any, Callable, Dict, Tuple, Union import jinja2 -from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json +from canonicaljson import encode_canonical_json, encode_pretty_printed_json from twisted.internet import defer from twisted.python import failure @@ -46,6 +46,7 @@ from synapse.api.errors import ( from synapse.http.site import SynapseRequest from synapse.logging.context import preserve_fn from synapse.logging.opentracing import trace_servlet +from synapse.util import json_encoder from synapse.util.caches import intern_dict logger = logging.getLogger(__name__) @@ -538,7 +539,7 @@ def respond_with_json( # canonicaljson already encodes to bytes json_bytes = encode_canonical_json(json_object) else: - json_bytes = json.dumps(json_object).encode("utf-8") + json_bytes = json_encoder.encode(json_object).encode("utf-8") return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors) diff --git a/synapse/http/site.py b/synapse/http/site.py index 6f3b2258cc..6e79b47828 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -146,10 +146,9 @@ class SynapseRequest(Request): Returns a context manager; the correct way to use this is: - @defer.inlineCallbacks - def handle_request(request): + async def handle_request(request): with request.processing("FooServlet"): - yield really_handle_the_request() + await really_handle_the_request() Once the context manager is closed, the completion of the request will be logged, and the various metrics will be updated. @@ -287,7 +286,9 @@ class SynapseRequest(Request): # the connection dropped) code += "!" - self.site.access_logger.info( + log_level = logging.INFO if self._should_log_request() else logging.DEBUG + self.site.access_logger.log( + log_level, "%s - %s - {%s}" " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" ' %sB %s "%s %s %s" "%s" [%d dbevts]', @@ -315,6 +316,17 @@ class SynapseRequest(Request): except Exception as e: logger.warning("Failed to stop metrics: %r", e) + def _should_log_request(self) -> bool: + """Whether we should log at INFO that we processed the request. + """ + if self.path == b"/health": + return False + + if self.method == b"OPTIONS": + return False + + return True + class XForwardedForRequest(SynapseRequest): def __init__(self, *args, **kw): diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index a9269196b3..f766d16db6 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -13,16 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect import logging import threading -from asyncio import iscoroutine from functools import wraps from typing import TYPE_CHECKING, Dict, Optional, Set from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer -from twisted.python.failure import Failure from synapse.logging.context import LoggingContext, PreserveLoggingContext @@ -167,7 +166,7 @@ class _BackgroundProcess(object): ) -def run_as_background_process(desc, func, *args, **kwargs): +def run_as_background_process(desc: str, func, *args, **kwargs): """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs): normal synapse inlineCallbacks function). Args: - desc (str): a description for this background process type + desc: a description for this background process type func: a function, which may return a Deferred or a coroutine args: positional args for func kwargs: keyword args for func @@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs): follow the synapse logcontext rules. """ - @defer.inlineCallbacks - def run(): + async def run(): with _bg_metrics_lock: count = _background_process_counts.get(desc, 0) _background_process_counts[desc] = count + 1 @@ -203,29 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs): try: result = func(*args, **kwargs) - # We probably don't have an ensureDeferred in our call stack to handle - # coroutine results, so we need to ensureDeferred here. - # - # But we need this check because ensureDeferred doesn't like being - # called on immediate values (as opposed to Deferreds or coroutines). - if iscoroutine(result): - result = defer.ensureDeferred(result) + if inspect.isawaitable(result): + result = await result - return (yield result) + return result except Exception: - # failure.Failure() fishes the original Failure out of our stack, and - # thus gives us a sensible stack trace. - f = Failure() - logger.error( - "Background process '%s' threw an exception", - desc, - exc_info=(f.type, f.value, f.getTracebackObject()), + logger.exception( + "Background process '%s' threw an exception", desc, ) finally: _background_process_in_flight_count.labels(desc).dec() with PreserveLoggingContext(): - return run() + # Note that we return a Deferred here so that it can be used in a + # looping_call and other places that expect a Deferred. + return defer.ensureDeferred(run()) def wrap_as_background_process(desc): diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 8201849951..c2fb757d9a 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -194,12 +194,16 @@ class ModuleApi(object): synapse.api.errors.AuthError: the access token is invalid """ # see if the access token corresponds to a device - user_info = yield self._auth.get_user_by_access_token(access_token) + user_info = yield defer.ensureDeferred( + self._auth.get_user_by_access_token(access_token) + ) device_id = user_info.get("device_id") user_id = user_info["user"].to_string() if device_id: # delete the device, which will also delete its access tokens - yield self._hs.get_device_handler().delete_device(user_id, device_id) + yield defer.ensureDeferred( + self._hs.get_device_handler().delete_device(user_id, device_id) + ) else: # no associated device. Just delete the access token. yield defer.ensureDeferred( diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 04b9d8ac82..e7fcee0e87 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -120,7 +120,7 @@ class BulkPushRuleEvaluator(object): pl_event = await self.store.get_event(pl_event_id) auth_events = {POWER_KEY: pl_event} else: - auth_events_ids = await self.auth.compute_auth_events( + auth_events_ids = self.auth.compute_auth_events( event, prev_state_ids, for_verification=False ) auth_events = await self.store.get_events(auth_events_ids) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index bc8f71916b..d0145666bf 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -21,13 +21,22 @@ async def get_badge_count(store, user_id): invites = await store.get_invited_rooms_for_local_user(user_id) joins = await store.get_rooms_for_user(user_id) + my_receipts_by_room = await store.get_receipts_for_user(user_id, "m.read") + badge = len(invites) for room_id in joins: - unread_count = await store.get_unread_message_count_for_user(room_id, user_id) - # return one badge count per conversation, as count per - # message is so noisy as to be almost useless - badge += 1 if unread_count else 0 + if room_id in my_receipts_by_room: + last_unread_event_id = my_receipts_by_room[room_id] + + notifs = await ( + store.get_unread_event_push_actions_by_room_for_user( + room_id, user_id, last_unread_event_id + ) + ) + # return one badge count per conversation, as count per + # message is so noisy as to be almost useless + badge += 1 if notifs["notify_count"] else 0 return badge diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index 60dd3f6701..a6fdedde63 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -28,7 +28,7 @@ class SlavedClientIpStore(BaseSlavedStore): name="client_ip_last_seen", keylen=4, max_entries=50000 ) - def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id): + async def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) key = (user_id, access_token, ip) diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index f33801f883..d853e4447e 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -18,11 +18,12 @@ The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are allowed to be sent by which side. """ import abc -import json import logging from typing import Tuple, Type -_json_encoder = json.JSONEncoder() +from canonicaljson import json + +from synapse.util import json_encoder as _json_encoder logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 5934b1fe8b..b210015173 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -89,7 +89,7 @@ class ClientDirectoryServer(RestServlet): dir_handler = self.handlers.directory_handler try: - service = await self.auth.get_appservice_by_req(request) + service = self.auth.get_appservice_by_req(request) room_alias = RoomAlias.from_string(room_alias) await dir_handler.delete_appservice_association(service, room_alias) logger.info( diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 3767a809a4..fead85074b 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -18,7 +18,12 @@ import logging from http import HTTPStatus from synapse.api.constants import LoginType -from synapse.api.errors import Codes, SynapseError, ThreepidValidationError +from synapse.api.errors import ( + Codes, + InteractiveAuthIncompleteError, + SynapseError, + ThreepidValidationError, +) from synapse.config.emailconfig import ThreepidBehaviour from synapse.http.server import finish_request, respond_with_html from synapse.http.servlet import ( @@ -239,18 +244,12 @@ class PasswordRestServlet(RestServlet): # we do basic sanity checks here because the auth layer will store these # in sessions. Pull out the new password provided to us. - if "new_password" in body: - new_password = body.pop("new_password") + new_password = body.pop("new_password", None) + if new_password is not None: if not isinstance(new_password, str) or len(new_password) > 512: raise SynapseError(400, "Invalid password") self.password_policy_handler.validate_password(new_password) - # If the password is valid, hash it and store it back on the body. - # This ensures that only the hashed password is handled everywhere. - if "new_password_hash" in body: - raise SynapseError(400, "Unexpected property: new_password_hash") - body["new_password_hash"] = await self.auth_handler.hash(new_password) - # there are two possibilities here. Either the user does not have an # access token, and needs to do a password reset; or they have one and # need to validate their identity. @@ -263,23 +262,49 @@ class PasswordRestServlet(RestServlet): if self.auth.has_access_token(request): requester = await self.auth.get_user_by_req(request) - params = await self.auth_handler.validate_user_via_ui_auth( - requester, - request, - body, - self.hs.get_ip_from_request(request), - "modify your account password", - ) + try: + params, session_id = await self.auth_handler.validate_user_via_ui_auth( + requester, + request, + body, + self.hs.get_ip_from_request(request), + "modify your account password", + ) + except InteractiveAuthIncompleteError as e: + # The user needs to provide more steps to complete auth, but + # they're not required to provide the password again. + # + # If a password is available now, hash the provided password and + # store it for later. + if new_password: + password_hash = await self.auth_handler.hash(new_password) + await self.auth_handler.set_session_data( + e.session_id, "password_hash", password_hash + ) + raise user_id = requester.user.to_string() else: requester = None - result, params, _ = await self.auth_handler.check_auth( - [[LoginType.EMAIL_IDENTITY]], - request, - body, - self.hs.get_ip_from_request(request), - "modify your account password", - ) + try: + result, params, session_id = await self.auth_handler.check_ui_auth( + [[LoginType.EMAIL_IDENTITY]], + request, + body, + self.hs.get_ip_from_request(request), + "modify your account password", + ) + except InteractiveAuthIncompleteError as e: + # The user needs to provide more steps to complete auth, but + # they're not required to provide the password again. + # + # If a password is available now, hash the provided password and + # store it for later. + if new_password: + password_hash = await self.auth_handler.hash(new_password) + await self.auth_handler.set_session_data( + e.session_id, "password_hash", password_hash + ) + raise if LoginType.EMAIL_IDENTITY in result: threepid = result[LoginType.EMAIL_IDENTITY] @@ -304,12 +329,21 @@ class PasswordRestServlet(RestServlet): logger.error("Auth succeeded but no known type! %r", result.keys()) raise SynapseError(500, "", Codes.UNKNOWN) - assert_params_in_dict(params, ["new_password_hash"]) - new_password_hash = params["new_password_hash"] + # If we have a password in this request, prefer it. Otherwise, there + # must be a password hash from an earlier request. + if new_password: + password_hash = await self.auth_handler.hash(new_password) + else: + password_hash = await self.auth_handler.get_session_data( + session_id, "password_hash", None + ) + if not password_hash: + raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM) + logout_devices = params.get("logout_devices", True) await self._set_password_handler.set_password( - user_id, new_password_hash, logout_devices, requester + user_id, password_hash, logout_devices, requester ) return 200, {} diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 370742ce59..f808175698 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -24,6 +24,7 @@ import synapse.types from synapse.api.constants import LoginType from synapse.api.errors import ( Codes, + InteractiveAuthIncompleteError, SynapseError, ThreepidValidationError, UnrecognizedRequestError, @@ -387,6 +388,7 @@ class RegisterRestServlet(RestServlet): self.ratelimiter = hs.get_registration_ratelimiter() self.password_policy_handler = hs.get_password_policy_handler() self.clock = hs.get_clock() + self._registration_enabled = self.hs.config.enable_registration self._registration_flows = _calculate_registration_flows( hs.config, self.auth_handler @@ -412,20 +414,8 @@ class RegisterRestServlet(RestServlet): "Do not understand membership kind: %s" % (kind.decode("utf8"),) ) - # we do basic sanity checks here because the auth layer will store these - # in sessions. Pull out the username/password provided to us. - if "password" in body: - password = body.pop("password") - if not isinstance(password, str) or len(password) > 512: - raise SynapseError(400, "Invalid password") - self.password_policy_handler.validate_password(password) - - # If the password is valid, hash it and store it back on the body. - # This ensures that only the hashed password is handled everywhere. - if "password_hash" in body: - raise SynapseError(400, "Unexpected property: password_hash") - body["password_hash"] = await self.auth_handler.hash(password) - + # Pull out the provided username and do basic sanity checks early since + # the auth layer will store these in sessions. desired_username = None if "username" in body: if not isinstance(body["username"], str) or len(body["username"]) > 512: @@ -434,7 +424,7 @@ class RegisterRestServlet(RestServlet): appservice = None if self.auth.has_access_token(request): - appservice = await self.auth.get_appservice_by_req(request) + appservice = self.auth.get_appservice_by_req(request) # fork off as soon as possible for ASes which have completely # different registration flows to normal users @@ -459,22 +449,35 @@ class RegisterRestServlet(RestServlet): ) return 200, result # we throw for non 200 responses - # for regular registration, downcase the provided username before - # attempting to register it. This should mean - # that people who try to register with upper-case in their usernames - # don't get a nasty surprise. (Note that we treat username - # case-insenstively in login, so they are free to carry on imagining - # that their username is CrAzYh4cKeR if that keeps them happy) - if desired_username is not None: - desired_username = desired_username.lower() - # == Normal User Registration == (everyone else) - if not self.hs.config.enable_registration: + if not self._registration_enabled: raise SynapseError(403, "Registration has been disabled") + # For regular registration, convert the provided username to lowercase + # before attempting to register it. This should mean that people who try + # to register with upper-case in their usernames don't get a nasty surprise. + # + # Note that we treat usernames case-insensitively in login, so they are + # free to carry on imagining that their username is CrAzYh4cKeR if that + # keeps them happy. + if desired_username is not None: + desired_username = desired_username.lower() + + # Check if this account is upgrading from a guest account. guest_access_token = body.get("guest_access_token", None) - if "initial_device_display_name" in body and "password_hash" not in body: + # Pull out the provided password and do basic sanity checks early. + # + # Note that we remove the password from the body since the auth layer + # will store the body in the session and we don't want a plaintext + # password store there. + password = body.pop("password", None) + if password is not None: + if not isinstance(password, str) or len(password) > 512: + raise SynapseError(400, "Invalid password") + self.password_policy_handler.validate_password(password) + + if "initial_device_display_name" in body and password is None: # ignore 'initial_device_display_name' if sent without # a password to work around a client bug where it sent # the 'initial_device_display_name' param alone, wiping out @@ -484,6 +487,7 @@ class RegisterRestServlet(RestServlet): session_id = self.auth_handler.get_session_id(body) registered_user_id = None + password_hash = None if session_id: # if we get a registered user id out of here, it means we previously # registered a user for this session, so we could just return the @@ -492,7 +496,12 @@ class RegisterRestServlet(RestServlet): registered_user_id = await self.auth_handler.get_session_data( session_id, "registered_user_id", None ) + # Extract the previously-hashed password from the session. + password_hash = await self.auth_handler.get_session_data( + session_id, "password_hash", None + ) + # Ensure that the username is valid. if desired_username is not None: await self.registration_handler.check_username( desired_username, @@ -500,20 +509,38 @@ class RegisterRestServlet(RestServlet): assigned_user_id=registered_user_id, ) - auth_result, params, session_id = await self.auth_handler.check_auth( - self._registration_flows, - request, - body, - self.hs.get_ip_from_request(request), - "register a new account", - ) + # Check if the user-interactive authentication flows are complete, if + # not this will raise a user-interactive auth error. + try: + auth_result, params, session_id = await self.auth_handler.check_ui_auth( + self._registration_flows, + request, + body, + self.hs.get_ip_from_request(request), + "register a new account", + ) + except InteractiveAuthIncompleteError as e: + # The user needs to provide more steps to complete auth. + # + # Hash the password and store it with the session since the client + # is not required to provide the password again. + # + # If a password hash was previously stored we will not attempt to + # re-hash and store it for efficiency. This assumes the password + # does not change throughout the authentication flow, but this + # should be fine since the data is meant to be consistent. + if not password_hash and password: + password_hash = await self.auth_handler.hash(password) + await self.auth_handler.set_session_data( + e.session_id, "password_hash", password_hash + ) + raise # Check that we're not trying to register a denied 3pid. # # the user-facing checks will probably already have happened in # /register/email/requestToken when we requested a 3pid, but that's not # guaranteed. - if auth_result: for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]: if login_type in auth_result: @@ -535,12 +562,15 @@ class RegisterRestServlet(RestServlet): # don't re-register the threepids registered = False else: - # NB: This may be from the auth handler and NOT from the POST - assert_params_in_dict(params, ["password_hash"]) + # If we have a password in this request, prefer it. Otherwise, there + # might be a password hash from an earlier request. + if password: + password_hash = await self.auth_handler.hash(password) + if not password_hash: + raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM) desired_username = params.get("username", None) guest_access_token = params.get("guest_access_token", None) - new_password_hash = params.get("password_hash", None) if desired_username is not None: desired_username = desired_username.lower() @@ -582,7 +612,7 @@ class RegisterRestServlet(RestServlet): registered_user_id = await self.registration_handler.register_user( localpart=desired_username, - password_hash=new_password_hash, + password_hash=password_hash, guest_access_token=guest_access_token, threepid=threepid, address=client_addr, @@ -595,8 +625,8 @@ class RegisterRestServlet(RestServlet): ): await self.store.upsert_monthly_active_user(registered_user_id) - # remember that we've now registered that user account, and with - # what user ID (since the user may not have specified) + # Remember that the user account has been registered (and the user + # ID it was registered with, since it might not have been specified). await self.auth_handler.set_session_data( session_id, "registered_user_id", registered_user_id ) @@ -635,7 +665,7 @@ class RegisterRestServlet(RestServlet): (object) params: registration parameters, from which we pull device_id, initial_device_name and inhibit_login Returns: - defer.Deferred: (object) dictionary for response from /register + (object) dictionary for response from /register """ result = {"user_id": user_id, "home_server": self.hs.hostname} if not params.get("inhibit_login", False): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 3f5bf75e59..a5c24fbd63 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -426,7 +426,6 @@ class SyncRestServlet(RestServlet): result["ephemeral"] = {"events": ephemeral_events} result["unread_notifications"] = room.unread_notifications result["summary"] = room.summary - result["org.matrix.msc2654.unread_count"] = room.unread_count return result diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py index 4386eb4e72..b3e4d5612e 100644 --- a/synapse/rest/consent/consent_resource.py +++ b/synapse/rest/consent/consent_resource.py @@ -22,8 +22,6 @@ from os import path import jinja2 from jinja2 import TemplateNotFound -from twisted.internet import defer - from synapse.api.errors import NotFoundError, StoreError, SynapseError from synapse.config import ConfigError from synapse.http.server import DirectServeHtmlResource, respond_with_html @@ -135,7 +133,7 @@ class ConsentResource(DirectServeHtmlResource): else: qualified_user_id = UserID(username, self.hs.hostname).to_string() - u = await defer.maybeDeferred(self.store.get_user_by_id, qualified_user_id) + u = await self.store.get_user_by_id(qualified_user_id) if u is None: raise NotFoundError("Unknown user") diff --git a/synapse/rest/health.py b/synapse/rest/health.py new file mode 100644 index 0000000000..0170950bf3 --- /dev/null +++ b/synapse/rest/health.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.web.resource import Resource + + +class HealthResource(Resource): + """A resource that does nothing except return a 200 with a body of `OK`, + which can be used as a health check. + + Note: `SynapseRequest._should_log_request` ensures that requests to + `/health` do not get logged at INFO. + """ + + isLeaf = 1 + + def render_GET(self, request): + request.setHeader(b"Content-Type", b"text/plain") + return b"OK" diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index f4768a9e8b..cd8c246594 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -27,9 +27,7 @@ from typing import Dict, Optional from urllib import parse as urlparse import attr -from canonicaljson import json -from twisted.internet import defer from twisted.internet.error import DNSLookupError from synapse.api.errors import Codes, SynapseError @@ -43,6 +41,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.media.v1._base import get_filename_from_headers +from synapse.util import json_encoder from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.stringutils import random_string @@ -228,7 +227,7 @@ class PreviewUrlResource(DirectServeJsonResource): else: logger.info("Returning cached response") - og = await make_deferred_yieldable(defer.maybeDeferred(observable.observe)) + og = await make_deferred_yieldable(observable.observe()) respond_with_json_bytes(request, 200, og, send_cors=True) async def _do_preview(self, url: str, user: str, ts: int) -> bytes: @@ -355,7 +354,7 @@ class PreviewUrlResource(DirectServeJsonResource): logger.debug("Calculated OG for %s as %s", url, og) - jsonog = json.dumps(og) + jsonog = json_encoder.encode(og) # store OG in history-aware DB cache await self.store.store_url_cache( diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 2193d8fdc5..cf039e7f7d 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -18,13 +18,12 @@ import abc import logging from typing import List, Tuple -from canonicaljson import json - from twisted.internet import defer from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -327,7 +326,7 @@ class AccountDataStore(AccountDataWorkerStore): Returns: A deferred that completes once the account_data has been added. """ - content_json = json.dumps(content) + content_json = json_encoder.encode(content) with self._account_data_id_gen.get_next() as next_id: # no need to lock here as room_account_data has a unique constraint @@ -373,7 +372,7 @@ class AccountDataStore(AccountDataWorkerStore): Returns: A deferred that completes once the account_data has been added. """ - content_json = json.dumps(content) + content_json = json_encoder.encode(content) with self._account_data_id_gen.get_next() as next_id: # no need to lock here as account_data has a unique constraint on diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 683afde52b..10de446065 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -172,7 +172,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self.get_latest_event_ids_in_room.invalidate((room_id,)) - self.get_unread_message_count_for_user.invalidate_many((room_id,)) self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,)) if not backfilled: diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 712c8d0264..216a5925fc 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -14,8 +14,7 @@ # limitations under the License. import logging - -from twisted.internet import defer +from typing import Dict, Optional, Tuple from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore @@ -82,21 +81,19 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): "devices_last_seen", self._devices_last_seen_update ) - @defer.inlineCallbacks - def _remove_user_ip_nonunique(self, progress, batch_size): + async def _remove_user_ip_nonunique(self, progress, batch_size): def f(conn): txn = conn.cursor() txn.execute("DROP INDEX IF EXISTS user_ips_user_ip") txn.close() - yield self.db_pool.runWithConnection(f) - yield self.db_pool.updates._end_background_update( + await self.db_pool.runWithConnection(f) + await self.db_pool.updates._end_background_update( "user_ips_drop_nonunique_index" ) return 1 - @defer.inlineCallbacks - def _analyze_user_ip(self, progress, batch_size): + async def _analyze_user_ip(self, progress, batch_size): # Background update to analyze user_ips table before we run the # deduplication background update. The table may not have been analyzed # for ages due to the table locks. @@ -106,14 +103,13 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): def user_ips_analyze(txn): txn.execute("ANALYZE user_ips") - yield self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze) + await self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze) - yield self.db_pool.updates._end_background_update("user_ips_analyze") + await self.db_pool.updates._end_background_update("user_ips_analyze") return 1 - @defer.inlineCallbacks - def _remove_user_ip_dupes(self, progress, batch_size): + async def _remove_user_ip_dupes(self, progress, batch_size): # This works function works by scanning the user_ips table in batches # based on `last_seen`. For each row in a batch it searches the rest of # the table to see if there are any duplicates, if there are then they @@ -140,7 +136,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): return None # Get a last seen that has roughly `batch_size` since `begin_last_seen` - end_last_seen = yield self.db_pool.runInteraction( + end_last_seen = await self.db_pool.runInteraction( "user_ips_dups_get_last_seen", get_last_seen ) @@ -275,15 +271,14 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): txn, "user_ips_remove_dupes", {"last_seen": end_last_seen} ) - yield self.db_pool.runInteraction("user_ips_dups_remove", remove) + await self.db_pool.runInteraction("user_ips_dups_remove", remove) if last: - yield self.db_pool.updates._end_background_update("user_ips_remove_dupes") + await self.db_pool.updates._end_background_update("user_ips_remove_dupes") return batch_size - @defer.inlineCallbacks - def _devices_last_seen_update(self, progress, batch_size): + async def _devices_last_seen_update(self, progress, batch_size): """Background update to insert last seen info into devices table """ @@ -346,12 +341,12 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): return len(rows) - updated = yield self.db_pool.runInteraction( + updated = await self.db_pool.runInteraction( "_devices_last_seen_update", _devices_last_seen_update_txn ) if not updated: - yield self.db_pool.updates._end_background_update("devices_last_seen") + await self.db_pool.updates._end_background_update("devices_last_seen") return updated @@ -380,8 +375,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): if self.user_ips_max_age: self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) - @defer.inlineCallbacks - def insert_client_ip( + async def insert_client_ip( self, user_id, access_token, ip, user_agent, device_id, now=None ): if not now: @@ -392,7 +386,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): last_seen = self.client_ip_last_seen.get(key) except KeyError: last_seen = None - yield self.populate_monthly_active_users(user_id) + await self.populate_monthly_active_users(user_id) # Rate-limited inserts if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: return @@ -461,25 +455,25 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): # Failed to upsert, log and continue logger.error("Failed to insert client IP %r: %r", entry, e) - @defer.inlineCallbacks - def get_last_client_ip_by_device(self, user_id, device_id): + async def get_last_client_ip_by_device( + self, user_id: str, device_id: Optional[str] + ) -> Dict[Tuple[str, str], dict]: """For each device_id listed, give the user_ip it was last seen on Args: - user_id (str) - device_id (str): If None fetches all devices for the user + user_id: The user to fetch devices for. + device_id: If None fetches all devices for the user Returns: - defer.Deferred: resolves to a dict, where the keys - are (user_id, device_id) tuples. The values are also dicts, with - keys giving the column names + A dictionary mapping a tuple of (user_id, device_id) to dicts, with + keys giving the column names from the devices table. """ keyvalues = {"user_id": user_id} if device_id is not None: keyvalues["device_id"] = device_id - res = yield self.db_pool.simple_select_list( + res = await self.db_pool.simple_select_list( table="devices", keyvalues=keyvalues, retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), @@ -501,8 +495,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): } return ret - @defer.inlineCallbacks - def get_user_ip_and_agents(self, user): + async def get_user_ip_and_agents(self, user): user_id = user.to_string() results = {} @@ -512,7 +505,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): user_agent, _, last_seen = self._batch_row_update[key] results[(access_token, ip)] = (user_agent, last_seen) - rows = yield self.db_pool.simple_select_list( + rows = await self.db_pool.simple_select_list( table="user_ips", keyvalues={"user_id": user_id}, retcols=["access_token", "ip", "user_agent", "last_seen"], diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 874ecdf8d2..76ec954f44 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -16,13 +16,12 @@ import logging from typing import List, Tuple -from canonicaljson import json - from twisted.internet import defer from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool +from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache logger = logging.getLogger(__name__) @@ -354,7 +353,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) ) rows = [] for destination, edu in remote_messages_by_destination.items(): - edu_json = json.dumps(edu) + edu_json = json_encoder.encode(edu) rows.append((destination, stream_id, now_ms, edu_json)) txn.executemany(sql, rows) @@ -432,7 +431,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) # Handle wildcard device_ids. sql = "SELECT device_id FROM devices WHERE user_id = ?" txn.execute(sql, (user_id,)) - message_json = json.dumps(messages_by_device["*"]) + message_json = json_encoder.encode(messages_by_device["*"]) for row in txn: # Add the message for all devices for this user on this # server. @@ -454,7 +453,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) # Only insert into the local inbox if the device exists on # this server device = row[0] - message_json = json.dumps(messages_by_device[device]) + message_json = json_encoder.encode(messages_by_device[device]) messages_json_for_user[device] = message_json if messages_json_for_user: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 88a7aadfc6..7a5f0bab05 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -17,8 +17,6 @@ import logging from typing import List, Optional, Set, Tuple -from canonicaljson import json - from twisted.internet import defer from synapse.api.errors import Codes, StoreError @@ -36,6 +34,7 @@ from synapse.storage.database import ( make_tuple_comparison_clause, ) from synapse.types import Collection, get_verify_key_from_cross_signing_key +from synapse.util import json_encoder from synapse.util.caches.descriptors import ( Cache, cached, @@ -137,7 +136,9 @@ class DeviceWorkerStore(SQLBaseStore): master_key_by_user = {} self_signing_key_by_user = {} for user in users: - cross_signing_key = yield self.get_e2e_cross_signing_key(user, "master") + cross_signing_key = yield defer.ensureDeferred( + self.get_e2e_cross_signing_key(user, "master") + ) if cross_signing_key: key_id, verify_key = get_verify_key_from_cross_signing_key( cross_signing_key @@ -150,8 +151,8 @@ class DeviceWorkerStore(SQLBaseStore): "device_id": verify_key.version, } - cross_signing_key = yield self.get_e2e_cross_signing_key( - user, "self_signing" + cross_signing_key = yield defer.ensureDeferred( + self.get_e2e_cross_signing_key(user, "self_signing") ) if cross_signing_key: key_id, verify_key = get_verify_key_from_cross_signing_key( @@ -247,7 +248,7 @@ class DeviceWorkerStore(SQLBaseStore): destination (str): The host the device updates are intended for from_stream_id (int): The minimum stream_id to filter updates by, exclusive query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping - user_id/device_id to update stream_id and the relevent json-encoded + user_id/device_id to update stream_id and the relevant json-encoded opentracing context Returns: @@ -397,7 +398,7 @@ class DeviceWorkerStore(SQLBaseStore): values={ "stream_id": stream_id, "from_user_id": from_user_id, - "user_ids": json.dumps(user_ids), + "user_ids": json_encoder.encode(user_ids), }, ) @@ -600,7 +601,7 @@ class DeviceWorkerStore(SQLBaseStore): between the requested tokens due to the limit. The token returned can be used in a subsequent call to this - function to get further updatees. + function to get further updates. The updates are a list of 2-tuples of stream ID and the row data """ @@ -1032,7 +1033,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id, "device_id": device_id}, - values={"content": json.dumps(content)}, + values={"content": json_encoder.encode(content)}, # we don't need to lock, because we assume we are the only thread # updating this user's devices. lock=False, @@ -1088,7 +1089,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): { "user_id": user_id, "device_id": content["device_id"], - "content": json.dumps(content), + "content": json_encoder.encode(content), } for content in devices ], @@ -1209,7 +1210,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): "device_id": device_id, "sent": False, "ts": now, - "opentracing_context": json.dumps(context) + "opentracing_context": json_encoder.encode(context) if whitelisted_homeserver(destination) else "{}", } diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py index 7819bfcbb3..037e02603c 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py @@ -14,30 +14,29 @@ # limitations under the License. from collections import namedtuple -from typing import Optional - -from twisted.internet import defer +from typing import Iterable, Optional from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore +from synapse.types import RoomAlias from synapse.util.caches.descriptors import cached RoomAliasMapping = namedtuple("RoomAliasMapping", ("room_id", "room_alias", "servers")) class DirectoryWorkerStore(SQLBaseStore): - @defer.inlineCallbacks - def get_association_from_room_alias(self, room_alias): - """ Get's the room_id and server list for a given room_alias + async def get_association_from_room_alias( + self, room_alias: RoomAlias + ) -> Optional[RoomAliasMapping]: + """Gets the room_id and server list for a given room_alias Args: - room_alias (RoomAlias) + room_alias: The alias to translate to an ID. Returns: - Deferred: results in namedtuple with keys "room_id" and - "servers" or None if no association can be found + The room alias mapping or None if no association can be found. """ - room_id = yield self.db_pool.simple_select_one_onecol( + room_id = await self.db_pool.simple_select_one_onecol( "room_aliases", {"room_alias": room_alias.to_string()}, "room_id", @@ -48,7 +47,7 @@ class DirectoryWorkerStore(SQLBaseStore): if not room_id: return None - servers = yield self.db_pool.simple_select_onecol( + servers = await self.db_pool.simple_select_onecol( "room_alias_servers", {"room_alias": room_alias.to_string()}, "server", @@ -79,18 +78,20 @@ class DirectoryWorkerStore(SQLBaseStore): class DirectoryStore(DirectoryWorkerStore): - @defer.inlineCallbacks - def create_room_alias_association(self, room_alias, room_id, servers, creator=None): + async def create_room_alias_association( + self, + room_alias: RoomAlias, + room_id: str, + servers: Iterable[str], + creator: Optional[str] = None, + ) -> None: """ Creates an association between a room alias and room_id/servers Args: - room_alias (RoomAlias) - room_id (str) - servers (list) - creator (str): Optional user_id of creator. - - Returns: - Deferred + room_alias: The alias to create. + room_id: The target of the alias. + servers: A list of servers through which it may be possible to join the room + creator: Optional user_id of creator. """ def alias_txn(txn): @@ -118,24 +119,22 @@ class DirectoryStore(DirectoryWorkerStore): ) try: - ret = yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "create_room_alias_association", alias_txn ) except self.database_engine.module.IntegrityError: raise SynapseError( 409, "Room alias %s already exists" % room_alias.to_string() ) - return ret - @defer.inlineCallbacks - def delete_room_alias(self, room_alias): - room_id = yield self.db_pool.runInteraction( + async def delete_room_alias(self, room_alias: RoomAlias) -> str: + room_id = await self.db_pool.runInteraction( "delete_room_alias", self._delete_room_alias_txn, room_alias ) return room_id - def _delete_room_alias_txn(self, txn, room_alias): + def _delete_room_alias_txn(self, txn, room_alias: RoomAlias) -> str: txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", (room_alias.to_string(),), diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index 90152edc3c..2eeb9f97dc 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -14,18 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from canonicaljson import json - -from twisted.internet import defer - from synapse.api.errors import StoreError from synapse.logging.opentracing import log_kv, trace from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.util import json_encoder class EndToEndRoomKeyStore(SQLBaseStore): - @defer.inlineCallbacks - def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key): + async def update_e2e_room_key( + self, user_id, version, room_id, session_id, room_key + ): """Replaces the encrypted E2E room key for a given session in a given backup Args: @@ -38,7 +36,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): StoreError """ - yield self.db_pool.simple_update_one( + await self.db_pool.simple_update_one( table="e2e_room_keys", keyvalues={ "user_id": user_id, @@ -50,13 +48,12 @@ class EndToEndRoomKeyStore(SQLBaseStore): "first_message_index": room_key["first_message_index"], "forwarded_count": room_key["forwarded_count"], "is_verified": room_key["is_verified"], - "session_data": json.dumps(room_key["session_data"]), + "session_data": json_encoder.encode(room_key["session_data"]), }, desc="update_e2e_room_key", ) - @defer.inlineCallbacks - def add_e2e_room_keys(self, user_id, version, room_keys): + async def add_e2e_room_keys(self, user_id, version, room_keys): """Bulk add room keys to a given backup. Args: @@ -77,7 +74,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): "first_message_index": room_key["first_message_index"], "forwarded_count": room_key["forwarded_count"], "is_verified": room_key["is_verified"], - "session_data": json.dumps(room_key["session_data"]), + "session_data": json_encoder.encode(room_key["session_data"]), } ) log_kv( @@ -89,13 +86,12 @@ class EndToEndRoomKeyStore(SQLBaseStore): } ) - yield self.db_pool.simple_insert_many( + await self.db_pool.simple_insert_many( table="e2e_room_keys", values=values, desc="add_e2e_room_keys" ) @trace - @defer.inlineCallbacks - def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): + async def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given room, or a given session. @@ -110,7 +106,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): the backup (or for the specified room) Returns: - A deferred list of dicts giving the session_data and message metadata for + A list of dicts giving the session_data and message metadata for these room keys. """ @@ -125,7 +121,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): if session_id: keyvalues["session_id"] = session_id - rows = yield self.db_pool.simple_select_list( + rows = await self.db_pool.simple_select_list( table="e2e_room_keys", keyvalues=keyvalues, retcols=( @@ -243,8 +239,9 @@ class EndToEndRoomKeyStore(SQLBaseStore): ) @trace - @defer.inlineCallbacks - def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): + async def delete_e2e_room_keys( + self, user_id, version, room_id=None, session_id=None + ): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given room or a given session. @@ -259,7 +256,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): the backup (or for the specified room) Returns: - A deferred of the deletion transaction + The deletion transaction """ keyvalues = {"user_id": user_id, "version": int(version)} @@ -268,7 +265,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): if session_id: keyvalues["session_id"] = session_id - yield self.db_pool.simple_delete( + await self.db_pool.simple_delete( table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys" ) @@ -360,7 +357,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): "user_id": user_id, "version": new_version, "algorithm": info["algorithm"], - "auth_data": json.dumps(info["auth_data"]), + "auth_data": json_encoder.encode(info["auth_data"]), }, ) @@ -387,7 +384,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): updatevalues = {} if info is not None and "auth_data" in info: - updatevalues["auth_data"] = json.dumps(info["auth_data"]) + updatevalues["auth_data"] = json_encoder.encode(info["auth_data"]) if version_etag is not None: updatevalues["etag"] = version_etag diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 40354b8304..f93e0d320d 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -14,24 +14,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List, Tuple +from typing import Dict, Iterable, List, Optional, Tuple -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from twisted.enterprise.adbapi import Connection -from twisted.internet import defer from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import make_in_list_sql_clause +from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.iterutils import batch_iter class EndToEndKeyWorkerStore(SQLBaseStore): @trace - @defer.inlineCallbacks - def get_e2e_device_keys( + async def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False ): """Fetch a list of device keys. @@ -51,7 +50,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): if not query_list: return {} - results = yield self.db_pool.runInteraction( + results = await self.db_pool.runInteraction( "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list, @@ -174,8 +173,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): log_kv(result) return result - @defer.inlineCallbacks - def get_e2e_one_time_keys(self, user_id, device_id, key_ids): + async def get_e2e_one_time_keys( + self, user_id: str, device_id: str, key_ids: List[str] + ) -> Dict[Tuple[str, str], str]: """Retrieve a number of one-time keys for a user Args: @@ -185,11 +185,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore): retrieve Returns: - deferred resolving to Dict[(str, str), str]: map from (algorithm, - key_id) to json string for key + A map from (algorithm, key_id) to json string for key """ - rows = yield self.db_pool.simple_select_many_batch( + rows = await self.db_pool.simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", iterable=key_ids, @@ -201,17 +200,21 @@ class EndToEndKeyWorkerStore(SQLBaseStore): log_kv({"message": "Fetched one time keys for user", "one_time_keys": result}) return result - @defer.inlineCallbacks - def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): + async def add_e2e_one_time_keys( + self, + user_id: str, + device_id: str, + time_now: int, + new_keys: Iterable[Tuple[str, str, str]], + ) -> None: """Insert some new one time keys for a device. Errors if any of the keys already exist. Args: - user_id(str): id of user to get keys for - device_id(str): id of device to get keys for - time_now(long): insertion time to record (ms since epoch) - new_keys(iterable[(str, str, str)]: keys to add - each a tuple of - (algorithm, key_id, key json) + user_id: id of user to get keys for + device_id: id of device to get keys for + time_now: insertion time to record (ms since epoch) + new_keys: keys to add - each a tuple of (algorithm, key_id, key json) """ def _add_e2e_one_time_keys(txn): @@ -241,7 +244,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): txn, self.count_e2e_one_time_keys, (user_id, device_id) ) - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys ) @@ -268,22 +271,23 @@ class EndToEndKeyWorkerStore(SQLBaseStore): "count_e2e_one_time_keys", _count_e2e_one_time_keys ) - @defer.inlineCallbacks - def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None): + async def get_e2e_cross_signing_key( + self, user_id: str, key_type: str, from_user_id: Optional[str] = None + ) -> Optional[dict]: """Returns a user's cross-signing key. Args: - user_id (str): the user whose key is being requested - key_type (str): the type of key that is being requested: either 'master' + user_id: the user whose key is being requested + key_type: the type of key that is being requested: either 'master' for a master key, 'self_signing' for a self-signing key, or 'user_signing' for a user-signing key - from_user_id (str): if specified, signatures made by this user on + from_user_id: if specified, signatures made by this user on the self-signing key will be included in the result Returns: dict of the key data or None if not found """ - res = yield self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id) + res = await self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id) user_keys = res.get(user_id) if not user_keys: return None @@ -449,28 +453,26 @@ class EndToEndKeyWorkerStore(SQLBaseStore): return keys - @defer.inlineCallbacks - def get_e2e_cross_signing_keys_bulk( - self, user_ids: List[str], from_user_id: str = None - ) -> defer.Deferred: + async def get_e2e_cross_signing_keys_bulk( + self, user_ids: List[str], from_user_id: Optional[str] = None + ) -> Dict[str, Dict[str, dict]]: """Returns the cross-signing keys for a set of users. Args: - user_ids (list[str]): the users whose keys are being requested - from_user_id (str): if specified, signatures made by this user on + user_ids: the users whose keys are being requested + from_user_id: if specified, signatures made by this user on the self-signing keys will be included in the result Returns: - Deferred[dict[str, dict[str, dict]]]: map of user ID to key type to - key data. If a user's cross-signing keys were not found, either - their user ID will not be in the dict, or their user ID will map - to None. + A map of user ID to key type to key data. If a user's cross-signing + keys were not found, either their user ID will not be in the dict, + or their user ID will map to None. """ - result = yield self._get_bare_e2e_cross_signing_keys_bulk(user_ids) + result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids) if from_user_id: - result = yield self.db_pool.runInteraction( + result = await self.db_pool.runInteraction( "get_e2e_cross_signing_signatures", self._get_e2e_cross_signing_signatures_txn, result, @@ -700,7 +702,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): values={ "user_id": user_id, "keytype": key_type, - "keydata": json.dumps(key), + "keydata": json_encoder.encode(key), "stream_id": stream_id, }, ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b8cefb4d5e..7c246d3e4c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -17,11 +17,10 @@ import logging from typing import List -from canonicaljson import json - from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool +from synapse.util import json_encoder from synapse.util.caches.descriptors import cachedInlineCallbacks logger = logging.getLogger(__name__) @@ -50,7 +49,7 @@ def _serialize_action(actions, is_highlight): else: if actions == DEFAULT_NOTIF_ACTION: return "" - return json.dumps(actions) + return json_encoder.encode(actions) def _deserialize_action(actions, is_highlight): diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 4d8a24ce4b..1a68bf32cb 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -53,47 +53,6 @@ event_counter = Counter( ["type", "origin_type", "origin_entity"], ) -STATE_EVENT_TYPES_TO_MARK_UNREAD = { - EventTypes.Topic, - EventTypes.Name, - EventTypes.RoomAvatar, - EventTypes.Tombstone, -} - - -def should_count_as_unread(event: EventBase, context: EventContext) -> bool: - # Exclude rejected and soft-failed events. - if context.rejected or event.internal_metadata.is_soft_failed(): - return False - - # Exclude notices. - if ( - not event.is_state() - and event.type == EventTypes.Message - and event.content.get("msgtype") == "m.notice" - ): - return False - - # Exclude edits. - relates_to = event.content.get("m.relates_to", {}) - if relates_to.get("rel_type") == RelationTypes.REPLACE: - return False - - # Mark events that have a non-empty string body as unread. - body = event.content.get("body") - if isinstance(body, str) and body: - return True - - # Mark some state events as unread. - if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD: - return True - - # Mark encrypted events as unread. - if not event.is_state() and event.type == EventTypes.Encrypted: - return True - - return False - def encode_json(json_object): """ @@ -239,10 +198,6 @@ class PersistEventsStore: event_counter.labels(event.type, origin_type, origin_entity).inc() - self.store.get_unread_message_count_for_user.invalidate_many( - (event.room_id,), - ) - for room_id, new_state in current_state_for_room.items(): self.store.get_current_state_ids.prefill((room_id,), new_state) @@ -864,9 +819,8 @@ class PersistEventsStore: "contains_url": ( "url" in event.content and isinstance(event.content["url"], str) ), - "count_as_unread": should_count_as_unread(event, context), } - for event, context in events_and_contexts + for event, _ in events_and_contexts ], ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a7b7393f6e..755b7a2a85 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -41,15 +41,9 @@ from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool -from synapse.storage.types import Cursor from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import get_domain_from_id -from synapse.util.caches.descriptors import ( - Cache, - _CacheContext, - cached, - cachedInlineCallbacks, -) +from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -1364,84 +1358,6 @@ class EventsWorkerStore(SQLBaseStore): desc="get_next_event_to_expire", func=get_next_event_to_expire_txn ) - @cached(tree=True, cache_context=True) - async def get_unread_message_count_for_user( - self, room_id: str, user_id: str, cache_context: _CacheContext, - ) -> int: - """Retrieve the count of unread messages for the given room and user. - - Args: - room_id: The ID of the room to count unread messages in. - user_id: The ID of the user to count unread messages for. - - Returns: - The number of unread messages for the given user in the given room. - """ - with Measure(self._clock, "get_unread_message_count_for_user"): - last_read_event_id = await self.get_last_receipt_event_id_for_user( - user_id=user_id, - room_id=room_id, - receipt_type="m.read", - on_invalidate=cache_context.invalidate, - ) - - return await self.db_pool.runInteraction( - "get_unread_message_count_for_user", - self._get_unread_message_count_for_user_txn, - user_id, - room_id, - last_read_event_id, - ) - - def _get_unread_message_count_for_user_txn( - self, - txn: Cursor, - user_id: str, - room_id: str, - last_read_event_id: Optional[str], - ) -> int: - if last_read_event_id: - # Get the stream ordering for the last read event. - stream_ordering = self.db_pool.simple_select_one_onecol_txn( - txn=txn, - table="events", - keyvalues={"room_id": room_id, "event_id": last_read_event_id}, - retcol="stream_ordering", - ) - else: - # If there's no read receipt for that room, it probably means the user hasn't - # opened it yet, in which case use the stream ID of their join event. - # We can't just set it to 0 otherwise messages from other local users from - # before this user joined will be counted as well. - txn.execute( - """ - SELECT stream_ordering FROM local_current_membership - LEFT JOIN events USING (event_id, room_id) - WHERE membership = 'join' - AND user_id = ? - AND room_id = ? - """, - (user_id, room_id), - ) - row = txn.fetchone() - - if row is None: - return 0 - - stream_ordering = row[0] - - # Count the messages that qualify as unread after the stream ordering we've just - # retrieved. - sql = """ - SELECT COUNT(*) FROM events - WHERE sender != ? AND room_id = ? AND stream_ordering > ? AND count_as_unread - """ - - txn.execute(sql, (user_id, room_id, stream_ordering)) - row = txn.fetchone() - - return row[0] if row else 0 - AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py index a98181f445..75ea6d4b2f 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py @@ -16,12 +16,11 @@ from typing import List, Tuple -from canonicaljson import json - from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.util import json_encoder # The category ID for the "default" category. We don't store as null in the # database to avoid the fun of null != null @@ -752,7 +751,7 @@ class GroupServerStore(GroupServerWorkerStore): if profile is None: insertion_values["profile"] = "{}" else: - update_values["profile"] = json.dumps(profile) + update_values["profile"] = json_encoder.encode(profile) if is_public is None: insertion_values["is_public"] = True @@ -783,7 +782,7 @@ class GroupServerStore(GroupServerWorkerStore): if profile is None: insertion_values["profile"] = "{}" else: - update_values["profile"] = json.dumps(profile) + update_values["profile"] = json_encoder.encode(profile) if is_public is None: insertion_values["is_public"] = True @@ -1007,7 +1006,7 @@ class GroupServerStore(GroupServerWorkerStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation_json": json.dumps(remote_attestation), + "attestation_json": json_encoder.encode(remote_attestation), }, ) @@ -1131,7 +1130,7 @@ class GroupServerStore(GroupServerWorkerStore): "is_admin": is_admin, "membership": membership, "is_publicised": is_publicised, - "content": json.dumps(content), + "content": json_encoder.encode(content), }, ) @@ -1143,7 +1142,7 @@ class GroupServerStore(GroupServerWorkerStore): "group_id": group_id, "user_id": user_id, "type": "membership", - "content": json.dumps( + "content": json_encoder.encode( {"membership": membership, "content": content} ), }, @@ -1171,7 +1170,7 @@ class GroupServerStore(GroupServerWorkerStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation_json": json.dumps(remote_attestation), + "attestation_json": json_encoder.encode(remote_attestation), }, ) else: @@ -1240,7 +1239,7 @@ class GroupServerStore(GroupServerWorkerStore): keyvalues={"group_id": group_id, "user_id": user_id}, updatevalues={ "valid_until_ms": attestation["valid_until_ms"], - "attestation_json": json.dumps(attestation), + "attestation_json": json_encoder.encode(attestation), }, desc="update_remote_attestion", ) diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index 02b01d9619..e71cdd2cb4 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -15,8 +15,6 @@ import logging from typing import List -from twisted.internet import defer - from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_in_list_sql_clause from synapse.util.caches.descriptors import cached @@ -252,16 +250,12 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): "reap_monthly_active_users", _reap_users, reserved_users ) - @defer.inlineCallbacks - def upsert_monthly_active_user(self, user_id): + async def upsert_monthly_active_user(self, user_id: str) -> None: """Updates or inserts the user into the monthly active user table, which is used to track the current MAU usage of the server Args: - user_id (str): user to add/update - - Returns: - Deferred + user_id: user to add/update """ # Support user never to be included in MAU stats. Note I can't easily call this # from upsert_monthly_active_user_txn because then I need a _txn form of @@ -271,11 +265,11 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): # _initialise_reserved_users reasoning that it would be very strange to # include a support user in this context. - is_support = yield self.is_support_user(user_id) + is_support = await self.is_support_user(user_id) if is_support: return - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id ) @@ -322,8 +316,7 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): return is_insert - @defer.inlineCallbacks - def populate_monthly_active_users(self, user_id): + async def populate_monthly_active_users(self, user_id): """Checks on the state of monthly active user limits and optionally add the user to the monthly active tables @@ -332,14 +325,14 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): """ if self._limit_usage_by_mau or self._mau_stats_only: # Trial users and guests should not be included as part of MAU group - is_guest = yield self.is_guest(user_id) + is_guest = await self.is_guest(user_id) if is_guest: return - is_trial = yield self.is_trial_user(user_id) + is_trial = await self.is_trial_user(user_id) if is_trial: return - last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id) + last_seen_timestamp = await self.user_last_seen_monthly_active(user_id) now = self.hs.get_clock().time_msec() # We want to reduce to the total number of db writes, and are happy @@ -352,10 +345,10 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): # False, there is no point in checking get_monthly_active_count - it # adds no value and will break the logic if max_mau_value is exceeded. if not self._limit_usage_by_mau: - yield self.upsert_monthly_active_user(user_id) + await self.upsert_monthly_active_user(user_id) else: - count = yield self.get_monthly_active_count() + count = await self.get_monthly_active_count() if count < self._max_mau_value: - yield self.upsert_monthly_active_user(user_id) + await self.upsert_monthly_active_user(user_id) elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY: - yield self.upsert_monthly_active_user(user_id) + await self.upsert_monthly_active_user(user_id) diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 5fd899326a..19a0211a03 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -18,8 +18,6 @@ import abc import logging from typing import List, Tuple, Union -from canonicaljson import json - from twisted.internet import defer from synapse.push.baserules import list_with_base_rules @@ -33,6 +31,7 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException from synapse.storage.util.id_generators import ChainedIdGenerator +from synapse.util import json_encoder from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -419,8 +418,8 @@ class PushRuleStore(PushRulesWorkerStore): before=None, after=None, ): - conditions_json = json.dumps(conditions) - actions_json = json.dumps(actions) + conditions_json = json_encoder.encode(conditions) + actions_json = json_encoder.encode(actions) with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids if before or after: @@ -689,7 +688,7 @@ class PushRuleStore(PushRulesWorkerStore): @defer.inlineCallbacks def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): - actions_json = json.dumps(actions) + actions_json = json_encoder.encode(actions) def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): if is_default_rule: diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 6255977c92..1920a8a152 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -18,13 +18,12 @@ import abc import logging from typing import List, Tuple -from canonicaljson import json - from twisted.internet import defer from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.util import json_encoder from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -459,7 +458,7 @@ class ReceiptsStore(ReceiptsWorkerStore): values={ "stream_id": stream_id, "event_id": event_id, - "data": json.dumps(data), + "data": json_encoder.encode(data), }, # receipts_linearized has a unique constraint on # (user_id, room_id, receipt_type), so no need to lock @@ -585,7 +584,7 @@ class ReceiptsStore(ReceiptsWorkerStore): "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, - "event_ids": json.dumps(event_ids), - "data": json.dumps(data), + "event_ids": json_encoder.encode(event_ids), + "data": json_encoder.encode(data), }, ) diff --git a/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql b/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql deleted file mode 100644 index 531b532c73..0000000000 --- a/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql +++ /dev/null @@ -1,18 +0,0 @@ -/* Copyright 2020 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- Store a boolean value in the events table for whether the event should be counted in --- the unread_count property of sync responses. -ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN; diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index 2162d0712d..7f8d1880e5 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -16,8 +16,7 @@ import logging import re from collections import namedtuple - -from twisted.internet import defer +from typing import List, Optional from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -114,8 +113,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search ) - @defer.inlineCallbacks - def _background_reindex_search(self, progress, batch_size): + async def _background_reindex_search(self, progress, batch_size): # we work through the events table from highest stream id to lowest target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -206,19 +204,18 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): return len(event_search_rows) - result = yield self.db_pool.runInteraction( + result = await self.db_pool.runInteraction( self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn ) if not result: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( self.EVENT_SEARCH_UPDATE_NAME ) return result - @defer.inlineCallbacks - def _background_reindex_gin_search(self, progress, batch_size): + async def _background_reindex_gin_search(self, progress, batch_size): """This handles old synapses which used GIST indexes, if any; converting them back to be GIN as per the actual schema. """ @@ -255,15 +252,14 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): conn.set_session(autocommit=False) if isinstance(self.database_engine, PostgresEngine): - yield self.db_pool.runWithConnection(create_index) + await self.db_pool.runWithConnection(create_index) - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME ) return 1 - @defer.inlineCallbacks - def _background_reindex_search_order(self, progress, batch_size): + async def _background_reindex_search_order(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) @@ -288,12 +284,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): ) conn.set_session(autocommit=False) - yield self.db_pool.runWithConnection(create_index) + await self.db_pool.runWithConnection(create_index) pg = dict(progress) pg["have_added_indexes"] = True - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, self.db_pool.updates._background_update_progress_txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, @@ -331,12 +327,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): return len(rows), True - num_rows, finished = yield self.db_pool.runInteraction( + num_rows, finished = await self.db_pool.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn ) if not finished: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( self.EVENT_SEARCH_ORDER_UPDATE_NAME ) @@ -347,8 +343,7 @@ class SearchStore(SearchBackgroundUpdateStore): def __init__(self, database: DatabasePool, db_conn, hs): super(SearchStore, self).__init__(database, db_conn, hs) - @defer.inlineCallbacks - def search_msgs(self, room_ids, search_term, keys): + async def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. Args: @@ -425,7 +420,7 @@ class SearchStore(SearchBackgroundUpdateStore): # entire table from the database. sql += " ORDER BY rank DESC LIMIT 500" - results = yield self.db_pool.execute( + results = await self.db_pool.execute( "search_msgs", self.db_pool.cursor_to_dict, sql, *args ) @@ -433,7 +428,7 @@ class SearchStore(SearchBackgroundUpdateStore): # We set redact_behaviour to BLOCK here to prevent redacted events being returned in # search results (which is a data leak) - events = yield self.get_events_as_list( + events = await self.get_events_as_list( [r["event_id"] for r in results], redact_behaviour=EventRedactBehaviour.BLOCK, ) @@ -442,11 +437,11 @@ class SearchStore(SearchBackgroundUpdateStore): highlights = None if isinstance(self.database_engine, PostgresEngine): - highlights = yield self._find_highlights_in_postgres(search_query, events) + highlights = await self._find_highlights_in_postgres(search_query, events) count_sql += " GROUP BY room_id" - count_results = yield self.db_pool.execute( + count_results = await self.db_pool.execute( "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args ) @@ -462,19 +457,25 @@ class SearchStore(SearchBackgroundUpdateStore): "count": count, } - @defer.inlineCallbacks - def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None): + async def search_rooms( + self, + room_ids: List[str], + search_term: str, + keys: List[str], + limit, + pagination_token: Optional[str] = None, + ) -> List[dict]: """Performs a full text search over events with given keys. Args: - room_id (list): The room_ids to search in - search_term (str): Search term to search for - keys (list): List of keys to search in, currently supports - "content.body", "content.name", "content.topic" - pagination_token (str): A pagination token previously returned + room_ids: The room_ids to search in + search_term: Search term to search for + keys: List of keys to search in, currently supports "content.body", + "content.name", "content.topic" + pagination_token: A pagination token previously returned Returns: - list of dicts + Each match as a dictionary. """ clauses = [] @@ -577,7 +578,7 @@ class SearchStore(SearchBackgroundUpdateStore): args.append(limit) - results = yield self.db_pool.execute( + results = await self.db_pool.execute( "search_rooms", self.db_pool.cursor_to_dict, sql, *args ) @@ -585,7 +586,7 @@ class SearchStore(SearchBackgroundUpdateStore): # We set redact_behaviour to BLOCK here to prevent redacted events being returned in # search results (which is a data leak) - events = yield self.get_events_as_list( + events = await self.get_events_as_list( [r["event_id"] for r in results], redact_behaviour=EventRedactBehaviour.BLOCK, ) @@ -594,11 +595,11 @@ class SearchStore(SearchBackgroundUpdateStore): highlights = None if isinstance(self.database_engine, PostgresEngine): - highlights = yield self._find_highlights_in_postgres(search_query, events) + highlights = await self._find_highlights_in_postgres(search_query, events) count_sql += " GROUP BY room_id" - count_results = yield self.db_pool.execute( + count_results = await self.db_pool.execute( "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args ) diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py index dae8e8bd29..be191dd870 100644 --- a/synapse/storage/databases/main/signatures.py +++ b/synapse/storage/databases/main/signatures.py @@ -15,8 +15,6 @@ from unpaddedbase64 import encode_base64 -from twisted.internet import defer - from synapse.storage._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList @@ -40,9 +38,8 @@ class SignatureWorkerStore(SQLBaseStore): return self.db_pool.runInteraction("get_event_reference_hashes", f) - @defer.inlineCallbacks - def add_event_hashes(self, event_ids): - hashes = yield self.get_event_reference_hashes(event_ids) + async def add_event_hashes(self, event_ids): + hashes = await self.get_event_reference_hashes(event_ids) hashes = { e_id: {k: encode_base64(v) for k, v in h.items() if k == "sha256"} for e_id, h in hashes.items() diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index d73a8e8ab9..af21fe457a 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -16,8 +16,6 @@ import logging import re -from twisted.internet import defer - from synapse.api.constants import EventTypes, JoinRules from synapse.storage.database import DatabasePool from synapse.storage.databases.main.state import StateFilter @@ -59,8 +57,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): "populate_user_directory_cleanup", self._populate_user_directory_cleanup ) - @defer.inlineCallbacks - def _populate_user_directory_createtables(self, progress, batch_size): + async def _populate_user_directory_createtables(self, progress, batch_size): # Get all the rooms that we want to process. def _make_staging_area(txn): @@ -102,45 +99,43 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) - new_pos = yield self.get_max_stream_id_in_current_state_deltas() - yield self.db_pool.runInteraction( + new_pos = await self.get_max_stream_id_in_current_state_deltas() + await self.db_pool.runInteraction( "populate_user_directory_temp_build", _make_staging_area ) - yield self.db_pool.simple_insert( + await self.db_pool.simple_insert( TEMP_TABLE + "_position", {"position": new_pos} ) - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_createtables" ) return 1 - @defer.inlineCallbacks - def _populate_user_directory_cleanup(self, progress, batch_size): + async def _populate_user_directory_cleanup(self, progress, batch_size): """ Update the user directory stream position, then clean up the old tables. """ - position = yield self.db_pool.simple_select_one_onecol( + position = await self.db_pool.simple_select_one_onecol( TEMP_TABLE + "_position", None, "position" ) - yield self.update_user_directory_stream_pos(position) + await self.update_user_directory_stream_pos(position) def _delete_staging_area(txn): txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users") txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "populate_user_directory_cleanup", _delete_staging_area ) - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_cleanup" ) return 1 - @defer.inlineCallbacks - def _populate_user_directory_process_rooms(self, progress, batch_size): + async def _populate_user_directory_process_rooms(self, progress, batch_size): """ Args: progress (dict) @@ -151,7 +146,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): # If we don't have progress filed, delete everything. if not progress: - yield self.delete_all_from_user_dir() + await self.delete_all_from_user_dir() def _get_next_batch(txn): # Only fetch 250 rooms, so we don't fetch too many at once, even @@ -176,13 +171,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return rooms_to_work_on - rooms_to_work_on = yield self.db_pool.runInteraction( + rooms_to_work_on = await self.db_pool.runInteraction( "populate_user_directory_temp_read", _get_next_batch ) # No more rooms -- complete the transaction. if not rooms_to_work_on: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_process_rooms" ) return 1 @@ -195,21 +190,19 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): processed_event_count = 0 for room_id, event_count in rooms_to_work_on: - is_in_room = yield self.is_host_joined(room_id, self.server_name) + is_in_room = await self.is_host_joined(room_id, self.server_name) if is_in_room: - is_public = yield self.is_room_world_readable_or_publicly_joinable( + is_public = await self.is_room_world_readable_or_publicly_joinable( room_id ) - users_with_profile = yield defer.ensureDeferred( - state.get_current_users_in_room(room_id) - ) + users_with_profile = await state.get_current_users_in_room(room_id) user_ids = set(users_with_profile) # Update each user in the user directory. for user_id, profile in users_with_profile.items(): - yield self.update_profile_in_user_dir( + await self.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url ) @@ -223,7 +216,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): to_insert.add(user_id) if to_insert: - yield self.add_users_in_public_rooms(room_id, to_insert) + await self.add_users_in_public_rooms(room_id, to_insert) to_insert.clear() else: for user_id in user_ids: @@ -243,22 +236,22 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): # If it gets too big, stop and write to the database # to prevent storing too much in RAM. if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET: - yield self.add_users_who_share_private_room( + await self.add_users_who_share_private_room( room_id, to_insert ) to_insert.clear() if to_insert: - yield self.add_users_who_share_private_room(room_id, to_insert) + await self.add_users_who_share_private_room(room_id, to_insert) to_insert.clear() # We've finished a room. Delete it from the table. - yield self.db_pool.simple_delete_one( + await self.db_pool.simple_delete_one( TEMP_TABLE + "_rooms", {"room_id": room_id} ) # Update the remaining counter. progress["remaining"] -= 1 - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "populate_user_directory", self.db_pool.updates._background_update_progress_txn, "populate_user_directory_process_rooms", @@ -273,13 +266,12 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return processed_event_count - @defer.inlineCallbacks - def _populate_user_directory_process_users(self, progress, batch_size): + async def _populate_user_directory_process_users(self, progress, batch_size): """ If search_all_users is enabled, add all of the users to the user directory. """ if not self.hs.config.user_directory_search_all_users: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_process_users" ) return 1 @@ -305,13 +297,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return users_to_work_on - users_to_work_on = yield self.db_pool.runInteraction( + users_to_work_on = await self.db_pool.runInteraction( "populate_user_directory_temp_read", _get_next_batch ) # No more users -- complete the transaction. if not users_to_work_on: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_process_users" ) return 1 @@ -322,18 +314,18 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) for user_id in users_to_work_on: - profile = yield self.get_profileinfo(get_localpart_from_id(user_id)) - yield self.update_profile_in_user_dir( + profile = await self.get_profileinfo(get_localpart_from_id(user_id)) + await self.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url ) # We've finished processing a user. Delete it from the table. - yield self.db_pool.simple_delete_one( + await self.db_pool.simple_delete_one( TEMP_TABLE + "_users", {"user_id": user_id} ) # Update the remaining counter. progress["remaining"] -= 1 - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "populate_user_directory", self.db_pool.updates._background_update_progress_txn, "populate_user_directory_process_users", @@ -342,8 +334,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return len(users_to_work_on) - @defer.inlineCallbacks - def is_room_world_readable_or_publicly_joinable(self, room_id): + async def is_room_world_readable_or_publicly_joinable(self, room_id): """Check if the room is either world_readable or publically joinable """ @@ -353,20 +344,20 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): (EventTypes.RoomHistoryVisibility, ""), ) - current_state_ids = yield self.get_filtered_current_state_ids( + current_state_ids = await self.get_filtered_current_state_ids( room_id, StateFilter.from_types(types_to_filter) ) join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) if join_rules_id: - join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) + join_rule_ev = await self.get_event(join_rules_id, allow_none=True) if join_rule_ev: if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC: return True hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) if hist_vis_id: - hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True) + hist_vis_ev = await self.get_event(hist_vis_id, allow_none=True) if hist_vis_ev: if hist_vis_ev.content.get("history_visibility") == "world_readable": return True @@ -590,19 +581,18 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): "remove_from_user_dir", _remove_from_user_dir_txn ) - @defer.inlineCallbacks - def get_users_in_dir_due_to_room(self, room_id): + async def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory because they're in the given room_id """ - user_ids_share_pub = yield self.db_pool.simple_select_onecol( + user_ids_share_pub = await self.db_pool.simple_select_onecol( table="users_in_public_rooms", keyvalues={"room_id": room_id}, retcol="user_id", desc="get_users_in_dir_due_to_room", ) - user_ids_share_priv = yield self.db_pool.simple_select_onecol( + user_ids_share_priv = await self.db_pool.simple_select_onecol( table="users_who_share_private_rooms", keyvalues={"room_id": room_id}, retcol="other_user_id", @@ -645,8 +635,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): "remove_user_who_share_room", _remove_user_who_share_room_txn ) - @defer.inlineCallbacks - def get_user_dir_rooms_user_is_in(self, user_id): + async def get_user_dir_rooms_user_is_in(self, user_id): """ Returns the rooms that a user is in. @@ -656,14 +645,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): Returns: list: user_id """ - rows = yield self.db_pool.simple_select_onecol( + rows = await self.db_pool.simple_select_onecol( table="users_who_share_private_rooms", keyvalues={"user_id": user_id}, retcol="room_id", desc="get_rooms_user_is_in", ) - pub_rows = yield self.db_pool.simple_select_onecol( + pub_rows = await self.db_pool.simple_select_onecol( table="users_in_public_rooms", keyvalues={"user_id": user_id}, retcol="room_id", @@ -674,32 +663,6 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): users.update(rows) return list(users) - @defer.inlineCallbacks - def get_rooms_in_common_for_users(self, user_id, other_user_id): - """Given two user_ids find out the list of rooms they share. - """ - sql = """ - SELECT room_id FROM ( - SELECT c.room_id FROM current_state_events AS c - INNER JOIN room_memberships AS m USING (event_id) - WHERE type = 'm.room.member' - AND m.membership = 'join' - AND state_key = ? - ) AS f1 INNER JOIN ( - SELECT c.room_id FROM current_state_events AS c - INNER JOIN room_memberships AS m USING (event_id) - WHERE type = 'm.room.member' - AND m.membership = 'join' - AND state_key = ? - ) f2 USING (room_id) - """ - - rows = yield self.db_pool.execute( - "get_rooms_in_common_for_users", None, sql, user_id, other_user_id - ) - - return [room_id for room_id, in rows] - def get_user_directory_stream_pos(self): return self.db_pool.simple_select_one_onecol( table="user_directory_stream_pos", @@ -708,8 +671,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): desc="get_user_directory_stream_pos", ) - @defer.inlineCallbacks - def search_user_dir(self, user_id, search_term, limit): + async def search_user_dir(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -806,7 +768,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): # This should be unreachable. raise Exception("Unrecognized database engine") - results = yield self.db_pool.execute( + results = await self.db_pool.execute( "search_user_dir", self.db_pool.cursor_to_dict, sql, *args ) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c63256d3bd..b3f76428b6 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -17,6 +17,7 @@ import logging import re import attr +from canonicaljson import json from twisted.internet import defer, task @@ -24,6 +25,9 @@ from synapse.logging import context logger = logging.getLogger(__name__) +# Create a custom encoder to reduce the whitespace produced by JSON encoding. +json_encoder = json.JSONEncoder(separators=(",", ":")) + def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 9b09c08b89..c2d72a82cf 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -192,7 +192,7 @@ class Cache(object): callbacks = [callback] if callback else [] self.check_thread() observable = ObservableDeferred(value, consumeErrors=True) - observer = defer.maybeDeferred(observable.observe) + observer = observable.observe() entry = CacheEntry(deferred=observable, callbacks=callbacks) existing_entry = self._pending_deferred_cache.pop(key, None) diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index eab78dd256..0e445e01d7 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -63,5 +63,8 @@ def _handle_frozendict(obj): ) -# A JSONEncoder which is capable of encoding frozendicts without barfing -frozendict_json_encoder = json.JSONEncoder(default=_handle_frozendict) +# A JSONEncoder which is capable of encoding frozendicts without barfing. +# Additionally reduce the whitespace produced by JSON encoding. +frozendict_json_encoder = json.JSONEncoder( + default=_handle_frozendict, separators=(",", ":"), +) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index ec61e14423..a805f51df1 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -13,14 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import inspect import logging from functools import wraps from prometheus_client import Counter -from twisted.internet import defer - from synapse.logging.context import LoggingContext, current_context from synapse.metrics import InFlightGauge @@ -62,25 +59,31 @@ in_flight = InFlightGauge( def measure_func(name=None): - def wrapper(func): - block_name = func.__name__ if name is None else name + """ + Used to decorate an async function with a `Measure` context manager. + + Usage: - if inspect.iscoroutinefunction(func): + @measure_func() + async def foo(...): + ... - @wraps(func) - async def measured_func(self, *args, **kwargs): - with Measure(self.clock, block_name): - r = await func(self, *args, **kwargs) - return r + Which is analogous to: - else: + async def foo(...): + with Measure(...): + ... + + """ + + def wrapper(func): + block_name = func.__name__ if name is None else name - @wraps(func) - @defer.inlineCallbacks - def measured_func(self, *args, **kwargs): - with Measure(self.clock, block_name): - r = yield func(self, *args, **kwargs) - return r + @wraps(func) + async def measured_func(self, *args, **kwargs): + with Measure(self.clock, block_name): + r = await func(self, *args, **kwargs) + return r return measured_func diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 8794317caa..919988d3bc 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -15,8 +15,6 @@ import logging import random -from twisted.internet import defer - import synapse.logging.context from synapse.api.errors import CodeMessageException @@ -54,8 +52,7 @@ class NotRetryingDestination(Exception): self.destination = destination -@defer.inlineCallbacks -def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs): +async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs): """For a given destination check if we have previously failed to send a request there and are waiting before retrying the destination. If we are not ready to retry the destination, this will raise a @@ -73,9 +70,9 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs) Example usage: try: - limiter = yield get_retry_limiter(destination, clock, store) + limiter = await get_retry_limiter(destination, clock, store) with limiter: - response = yield do_request() + response = await do_request() except NotRetryingDestination: # We aren't ready to retry that destination. raise @@ -83,7 +80,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs) failure_ts = None retry_last_ts, retry_interval = (0, 0) - retry_timings = yield store.get_destination_retry_timings(destination) + retry_timings = await store.get_destination_retry_timings(destination) if retry_timings: failure_ts = retry_timings["failure_ts"] @@ -222,10 +219,9 @@ class RetryDestinationLimiter(object): if self.failure_ts is None: self.failure_ts = retry_last_ts - @defer.inlineCallbacks - def store_retry_timings(): + async def store_retry_timings(): try: - yield self.store.set_destination_retry_timings( + await self.store.set_destination_retry_timings( self.destination, self.failure_ts, retry_last_ts, |