diff options
Diffstat (limited to 'synapse/handlers')
32 files changed, 690 insertions, 488 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index e62e6cab77..8acd9f9a83 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -51,8 +51,8 @@ class AccountDataEventSource(object): {"type": account_data_type, "content": content, "room_id": room_id} ) - defer.returnValue((results, current_stream_id)) + return (results, current_stream_id) @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): - defer.returnValue(([], config.to_id)) + return ([], config.to_id) diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index 1f1708ba7d..34574f1a12 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -193,7 +193,7 @@ class AccountValidityHandler(object): if threepid["medium"] == "email": addresses.append(threepid["address"]) - defer.returnValue(addresses) + return addresses @defer.inlineCallbacks def _get_renewal_token(self, user_id): @@ -214,7 +214,7 @@ class AccountValidityHandler(object): try: renewal_token = stringutils.random_string(32) yield self.store.set_renewal_token_for_user(user_id, renewal_token) - defer.returnValue(renewal_token) + return renewal_token except StoreError: attempts += 1 raise StoreError(500, "Couldn't generate a unique string as refresh string.") @@ -226,11 +226,19 @@ class AccountValidityHandler(object): Args: renewal_token (str): Token sent with the renewal request. + Returns: + bool: Whether the provided token is valid. """ - user_id = yield self.store.get_user_from_renewal_token(renewal_token) + try: + user_id = yield self.store.get_user_from_renewal_token(renewal_token) + except StoreError: + defer.returnValue(False) + logger.debug("Renewing an account for user %s", user_id) yield self.renew_account_for_user(user_id) + defer.returnValue(True) + @defer.inlineCallbacks def renew_account_for_user(self, user_id, expiration_ts=None, email_sent=False): """Renews the account attached to a given user by pushing back the @@ -254,4 +262,4 @@ class AccountValidityHandler(object): user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent ) - defer.returnValue(expiration_ts) + return expiration_ts diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py index fbef2f3d38..46ac73106d 100644 --- a/synapse/handlers/acme.py +++ b/synapse/handlers/acme.py @@ -100,4 +100,4 @@ class AcmeHandler(object): logger.exception("Failed saving!") raise - defer.returnValue(True) + return True diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index e8a651e231..2f22f56ca4 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -49,7 +49,7 @@ class AdminHandler(BaseHandler): "devices": {"": {"sessions": [{"connections": connections}]}}, } - defer.returnValue(ret) + return ret @defer.inlineCallbacks def get_users(self): @@ -61,7 +61,7 @@ class AdminHandler(BaseHandler): """ ret = yield self.store.get_users() - defer.returnValue(ret) + return ret @defer.inlineCallbacks def get_users_paginate(self, order, start, limit): @@ -78,7 +78,7 @@ class AdminHandler(BaseHandler): """ ret = yield self.store.get_users_paginate(order, start, limit) - defer.returnValue(ret) + return ret @defer.inlineCallbacks def search_users(self, term): @@ -92,7 +92,7 @@ class AdminHandler(BaseHandler): """ ret = yield self.store.search_users(term) - defer.returnValue(ret) + return ret @defer.inlineCallbacks def export_user_data(self, user_id, writer): @@ -225,7 +225,7 @@ class AdminHandler(BaseHandler): state = yield self.store.get_state_for_event(event_id) writer.write_state(room_id, event_id, state) - defer.returnValue(writer.finished()) + return writer.finished() class ExfiltrationWriter(object): diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8f089f0e33..d1a51df6f9 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -167,8 +167,8 @@ class ApplicationServicesHandler(object): for user_service in user_query_services: is_known_user = yield self.appservice_api.query_user(user_service, user_id) if is_known_user: - defer.returnValue(True) - defer.returnValue(False) + return True + return False @defer.inlineCallbacks def query_room_alias_exists(self, room_alias): @@ -192,7 +192,7 @@ class ApplicationServicesHandler(object): if is_known_alias: # the alias exists now so don't query more ASes. result = yield self.store.get_association_from_room_alias(room_alias) - defer.returnValue(result) + return result @defer.inlineCallbacks def query_3pe(self, kind, protocol, fields): @@ -215,7 +215,7 @@ class ApplicationServicesHandler(object): if success: ret.extend(result) - defer.returnValue(ret) + return ret @defer.inlineCallbacks def get_3pe_protocols(self, only_protocol=None): @@ -254,7 +254,7 @@ class ApplicationServicesHandler(object): for p in protocols.keys(): protocols[p] = _merge_instances(protocols[p]) - defer.returnValue(protocols) + return protocols @defer.inlineCallbacks def _get_services_for_event(self, event): @@ -276,7 +276,7 @@ class ApplicationServicesHandler(object): if (yield s.is_interested(event, self.store)): interested_list.append(s) - defer.returnValue(interested_list) + return interested_list def _get_services_for_user(self, user_id): services = self.store.get_app_services() @@ -293,23 +293,23 @@ class ApplicationServicesHandler(object): if not self.is_mine_id(user_id): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. - defer.returnValue(False) + return False return user_info = yield self.store.get_user_by_id(user_id) if user_info: - defer.returnValue(False) + return False return # user not found; could be the AS though, so check. services = self.store.get_app_services() service_list = [s for s in services if s.sender == user_id] - defer.returnValue(len(service_list) == 0) + return len(service_list) == 0 @defer.inlineCallbacks def _check_user_exists(self, user_id): unknown_user = yield self._is_unknown_user(user_id) if unknown_user: exists = yield self.query_user_exists(user_id) - defer.returnValue(exists) - defer.returnValue(True) + return exists + return True diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d4d6574975..0f3ebf7ef8 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -155,7 +155,7 @@ class AuthHandler(BaseHandler): if user_id != requester.user.to_string(): raise AuthError(403, "Invalid auth") - defer.returnValue(params) + return params @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip, password_servlet=False): @@ -280,7 +280,7 @@ class AuthHandler(BaseHandler): creds, list(clientdict), ) - defer.returnValue((creds, clientdict, session["id"])) + return (creds, clientdict, session["id"]) ret = self._auth_dict_for_flows(flows, session) ret["completed"] = list(creds) @@ -307,8 +307,8 @@ class AuthHandler(BaseHandler): if result: creds[stagetype] = result self._save_session(sess) - defer.returnValue(True) - defer.returnValue(False) + return True + return False def get_session_id(self, clientdict): """ @@ -379,7 +379,7 @@ class AuthHandler(BaseHandler): res = yield checker( authdict, clientip=clientip, password_servlet=password_servlet ) - defer.returnValue(res) + return res # build a v1-login-style dict out of the authdict and fall back to the # v1 code @@ -389,7 +389,7 @@ class AuthHandler(BaseHandler): raise SynapseError(400, "", Codes.MISSING_PARAM) (canonical_id, callback) = yield self.validate_login(user_id, authdict) - defer.returnValue(canonical_id) + return canonical_id @defer.inlineCallbacks def _check_recaptcha(self, authdict, clientip, **kwargs): @@ -433,7 +433,7 @@ class AuthHandler(BaseHandler): resp_body.get("hostname"), ) if resp_body["success"]: - defer.returnValue(True) + return True raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) def _check_email_identity(self, authdict, **kwargs): @@ -502,7 +502,7 @@ class AuthHandler(BaseHandler): threepid["threepid_creds"] = authdict["threepid_creds"] - defer.returnValue(threepid) + return threepid def _get_params_recaptcha(self): return {"public_key": self.hs.config.recaptcha_public_key} @@ -606,7 +606,7 @@ class AuthHandler(BaseHandler): yield self.store.delete_access_token(access_token) raise StoreError(400, "Login raced against device deletion") - defer.returnValue(access_token) + return access_token @defer.inlineCallbacks def check_user_exists(self, user_id): @@ -629,8 +629,8 @@ class AuthHandler(BaseHandler): self.ratelimit_login_per_account(user_id) res = yield self._find_user_id_and_pwd_hash(user_id) if res is not None: - defer.returnValue(res[0]) - defer.returnValue(None) + return res[0] + return None @defer.inlineCallbacks def _find_user_id_and_pwd_hash(self, user_id): @@ -661,7 +661,7 @@ class AuthHandler(BaseHandler): user_id, user_infos.keys(), ) - defer.returnValue(result) + return result def get_supported_login_types(self): """Get a the login types supported for the /login API @@ -722,7 +722,7 @@ class AuthHandler(BaseHandler): known_login_type = True is_valid = yield provider.check_password(qualified_user_id, password) if is_valid: - defer.returnValue((qualified_user_id, None)) + return (qualified_user_id, None) if not hasattr(provider, "get_supported_login_types") or not hasattr( provider, "check_auth" @@ -756,7 +756,7 @@ class AuthHandler(BaseHandler): if result: if isinstance(result, str): result = (result, None) - defer.returnValue(result) + return result if login_type == LoginType.PASSWORD and self.hs.config.password_localdb_enabled: known_login_type = True @@ -766,7 +766,7 @@ class AuthHandler(BaseHandler): ) if canonical_user_id: - defer.returnValue((canonical_user_id, None)) + return (canonical_user_id, None) if not known_login_type: raise SynapseError(400, "Unknown login type %s" % login_type) @@ -814,9 +814,9 @@ class AuthHandler(BaseHandler): if isinstance(result, str): # If it's a str, set callback function to None result = (result, None) - defer.returnValue(result) + return result - defer.returnValue((None, None)) + return (None, None) @defer.inlineCallbacks def _check_local_password(self, user_id, password): @@ -838,7 +838,7 @@ class AuthHandler(BaseHandler): """ lookupres = yield self._find_user_id_and_pwd_hash(user_id) if not lookupres: - defer.returnValue(None) + return None (user_id, password_hash) = lookupres # If the password hash is None, the account has likely been deactivated @@ -850,8 +850,8 @@ class AuthHandler(BaseHandler): result = yield self.validate_hash(password, password_hash) if not result: logger.warn("Failed password login for user %s", user_id) - defer.returnValue(None) - defer.returnValue(user_id) + return None + return user_id @defer.inlineCallbacks def validate_short_term_login_token_and_get_user_id(self, login_token): @@ -860,12 +860,12 @@ class AuthHandler(BaseHandler): try: macaroon = pymacaroons.Macaroon.deserialize(login_token) user_id = auth_api.get_user_id_from_macaroon(macaroon) - auth_api.validate_macaroon(macaroon, "login", True, user_id) + auth_api.validate_macaroon(macaroon, "login", user_id) except Exception: raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN) self.ratelimit_login_per_account(user_id) yield self.auth.check_auth_blocking(user_id) - defer.returnValue(user_id) + return user_id @defer.inlineCallbacks def delete_access_token(self, access_token): @@ -976,7 +976,7 @@ class AuthHandler(BaseHandler): ) yield self.store.user_delete_threepid(user_id, medium, address) - defer.returnValue(result) + return result def _save_session(self, session): # TODO: Persistent storage diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index e8f9da6098..5f804d1f13 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -125,7 +125,7 @@ class DeactivateAccountHandler(BaseHandler): # Mark the user as deactivated. yield self.store.set_user_deactivated_status(user_id, True) - defer.returnValue(identity_server_supports_unbinding) + return identity_server_supports_unbinding def _start_user_parting(self): """ diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 99e8413092..5c1cf83c9d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -64,7 +64,7 @@ class DeviceWorkerHandler(BaseHandler): for device in devices: _update_device_from_client_ips(device, ips) - defer.returnValue(devices) + return devices @defer.inlineCallbacks def get_device(self, user_id, device_id): @@ -85,7 +85,7 @@ class DeviceWorkerHandler(BaseHandler): raise errors.NotFoundError ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) - defer.returnValue(device) + return device @measure_func("device.get_user_ids_changed") @defer.inlineCallbacks @@ -200,9 +200,7 @@ class DeviceWorkerHandler(BaseHandler): possibly_joined = [] possibly_left = [] - defer.returnValue( - {"changed": list(possibly_joined), "left": list(possibly_left)} - ) + return {"changed": list(possibly_joined), "left": list(possibly_left)} class DeviceHandler(DeviceWorkerHandler): @@ -211,12 +209,12 @@ class DeviceHandler(DeviceWorkerHandler): self.federation_sender = hs.get_federation_sender() - self._edu_updater = DeviceListEduUpdater(hs, self) + self.device_list_updater = DeviceListUpdater(hs, self) federation_registry = hs.get_federation_registry() federation_registry.register_edu_handler( - "m.device_list_update", self._edu_updater.incoming_device_list_update + "m.device_list_update", self.device_list_updater.incoming_device_list_update ) federation_registry.register_query_handler( "user_devices", self.on_federation_query_user_devices @@ -250,7 +248,7 @@ class DeviceHandler(DeviceWorkerHandler): ) if new_device: yield self.notify_device_update(user_id, [device_id]) - defer.returnValue(device_id) + return device_id # if the device id is not specified, we'll autogen one, but loop a few # times in case of a clash. @@ -264,7 +262,7 @@ class DeviceHandler(DeviceWorkerHandler): ) if new_device: yield self.notify_device_update(user_id, [device_id]) - defer.returnValue(device_id) + return device_id attempts += 1 raise errors.StoreError(500, "Couldn't generate a device ID.") @@ -411,9 +409,7 @@ class DeviceHandler(DeviceWorkerHandler): @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - defer.returnValue( - {"user_id": user_id, "stream_id": stream_id, "devices": devices} - ) + return {"user_id": user_id, "stream_id": stream_id, "devices": devices} @defer.inlineCallbacks def user_left_room(self, user, room_id): @@ -430,7 +426,7 @@ def _update_device_from_client_ips(device, client_ips): device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")}) -class DeviceListEduUpdater(object): +class DeviceListUpdater(object): "Handles incoming device list updates from federation and updates the DB" def __init__(self, hs, device_handler): @@ -523,75 +519,7 @@ class DeviceListEduUpdater(object): logger.debug("Need to re-sync devices for %r? %r", user_id, resync) if resync: - # Fetch all devices for the user. - origin = get_domain_from_id(user_id) - try: - result = yield self.federation.query_user_devices(origin, user_id) - except ( - NotRetryingDestination, - RequestSendFailed, - HttpResponseException, - ): - # TODO: Remember that we are now out of sync and try again - # later - logger.warn("Failed to handle device list update for %s", user_id) - # We abort on exceptions rather than accepting the update - # as otherwise synapse will 'forget' that its device list - # is out of date. If we bail then we will retry the resync - # next time we get a device list update for this user_id. - # This makes it more likely that the device lists will - # eventually become consistent. - return - except FederationDeniedError as e: - logger.info(e) - return - except Exception: - # TODO: Remember that we are now out of sync and try again - # later - logger.exception( - "Failed to handle device list update for %s", user_id - ) - return - - stream_id = result["stream_id"] - devices = result["devices"] - - # If the remote server has more than ~1000 devices for this user - # we assume that something is going horribly wrong (e.g. a bot - # that logs in and creates a new device every time it tries to - # send a message). Maintaining lots of devices per user in the - # cache can cause serious performance issues as if this request - # takes more than 60s to complete, internal replication from the - # inbound federation worker to the synapse master may time out - # causing the inbound federation to fail and causing the remote - # server to retry, causing a DoS. So in this scenario we give - # up on storing the total list of devices and only handle the - # delta instead. - if len(devices) > 1000: - logger.warn( - "Ignoring device list snapshot for %s as it has >1K devs (%d)", - user_id, - len(devices), - ) - devices = [] - - for device in devices: - logger.debug( - "Handling resync update %r/%r, ID: %r", - user_id, - device["device_id"], - stream_id, - ) - - yield self.store.update_remote_device_list_cache( - user_id, devices, stream_id - ) - device_ids = [device["device_id"] for device in devices] - yield self.device_handler.notify_device_update(user_id, device_ids) - - # We clobber the seen updates since we've re-synced from a given - # point. - self._seen_updates[user_id] = set([stream_id]) + yield self.user_device_resync(user_id) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) @@ -623,7 +551,7 @@ class DeviceListEduUpdater(object): for _, stream_id, prev_ids, _ in updates: if not prev_ids: # We always do a resync if there are no previous IDs - defer.returnValue(True) + return True for prev_id in prev_ids: if prev_id == extremity: @@ -633,8 +561,82 @@ class DeviceListEduUpdater(object): elif prev_id in stream_id_in_updates: continue else: - defer.returnValue(True) + return True stream_id_in_updates.add(stream_id) - defer.returnValue(False) + return False + + @defer.inlineCallbacks + def user_device_resync(self, user_id): + """Fetches all devices for a user and updates the device cache with them. + + Args: + user_id (str): The user's id whose device_list will be updated. + Returns: + Deferred[dict]: a dict with device info as under the "devices" in the result of this + request: + https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid + """ + # Fetch all devices for the user. + origin = get_domain_from_id(user_id) + try: + result = yield self.federation.query_user_devices(origin, user_id) + except (NotRetryingDestination, RequestSendFailed, HttpResponseException): + # TODO: Remember that we are now out of sync and try again + # later + logger.warn("Failed to handle device list update for %s", user_id) + # We abort on exceptions rather than accepting the update + # as otherwise synapse will 'forget' that its device list + # is out of date. If we bail then we will retry the resync + # next time we get a device list update for this user_id. + # This makes it more likely that the device lists will + # eventually become consistent. + return + except FederationDeniedError as e: + logger.info(e) + return + except Exception: + # TODO: Remember that we are now out of sync and try again + # later + logger.exception("Failed to handle device list update for %s", user_id) + return + stream_id = result["stream_id"] + devices = result["devices"] + + # If the remote server has more than ~1000 devices for this user + # we assume that something is going horribly wrong (e.g. a bot + # that logs in and creates a new device every time it tries to + # send a message). Maintaining lots of devices per user in the + # cache can cause serious performance issues as if this request + # takes more than 60s to complete, internal replication from the + # inbound federation worker to the synapse master may time out + # causing the inbound federation to fail and causing the remote + # server to retry, causing a DoS. So in this scenario we give + # up on storing the total list of devices and only handle the + # delta instead. + if len(devices) > 1000: + logger.warn( + "Ignoring device list snapshot for %s as it has >1K devs (%d)", + user_id, + len(devices), + ) + devices = [] + + for device in devices: + logger.debug( + "Handling resync update %r/%r, ID: %r", + user_id, + device["device_id"], + stream_id, + ) + + yield self.store.update_remote_device_list_cache(user_id, devices, stream_id) + device_ids = [device["device_id"] for device in devices] + yield self.device_handler.notify_device_update(user_id, device_ids) + + # We clobber the seen updates since we've re-synced from a given + # point. + self._seen_updates[user_id] = set([stream_id]) + + defer.returnValue(result) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 42d5b3db30..526379c6f7 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -210,7 +210,7 @@ class DirectoryHandler(BaseHandler): except AuthError as e: logger.info("Failed to update alias events: %s", e) - defer.returnValue(room_id) + return room_id @defer.inlineCallbacks def delete_appservice_association(self, service, room_alias): @@ -229,7 +229,7 @@ class DirectoryHandler(BaseHandler): room_id = yield self.store.delete_room_alias(room_alias) - defer.returnValue(room_id) + return room_id @defer.inlineCallbacks def get_association(self, room_alias): @@ -277,8 +277,7 @@ class DirectoryHandler(BaseHandler): else: servers = list(servers) - defer.returnValue({"room_id": room_id, "servers": servers}) - return + return {"room_id": room_id, "servers": servers} @defer.inlineCallbacks def on_directory_query(self, args): @@ -289,7 +288,7 @@ class DirectoryHandler(BaseHandler): result = yield self.get_association_from_room_alias(room_alias) if result is not None: - defer.returnValue({"room_id": result.room_id, "servers": result.servers}) + return {"room_id": result.room_id, "servers": result.servers} else: raise SynapseError( 404, @@ -342,7 +341,7 @@ class DirectoryHandler(BaseHandler): # Query AS to see if it exists as_handler = self.appservice_handler result = yield as_handler.query_room_alias_exists(room_alias) - defer.returnValue(result) + return result def can_modify_alias(self, alias, user_id=None): # Any application service "interested" in an alias they are regexing on @@ -369,10 +368,10 @@ class DirectoryHandler(BaseHandler): creator = yield self.store.get_room_alias_creator(alias.to_string()) if creator is not None and creator == user_id: - defer.returnValue(True) + return True is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) - defer.returnValue(is_admin) + return is_admin @defer.inlineCallbacks def edit_published_room_list(self, requester, room_id, visibility): diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index fdfe8611b6..1f90b0d278 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -25,6 +25,7 @@ from twisted.internet import defer from synapse.api.errors import CodeMessageException, SynapseError from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.types import UserID, get_domain_from_id +from synapse.util import unwrapFirstError from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -65,6 +66,7 @@ class E2eKeysHandler(object): } } """ + device_keys_query = query_body.get("device_keys", {}) # separate users by domain. @@ -121,7 +123,56 @@ class E2eKeysHandler(object): # Now fetch any devices that we don't have in our cache @defer.inlineCallbacks def do_remote_query(destination): + """This is called when we are querying the device list of a user on + a remote homeserver and their device list is not in the device list + cache. If we share a room with this user and we're not querying for + specific user we will update the cache + with their device list.""" + destination_query = remote_queries_not_in_cache[destination] + + # We first consider whether we wish to update the device list cache with + # the users device list. We want to track a user's devices when the + # authenticated user shares a room with the queried user and the query + # has not specified a particular device. + # If we update the cache for the queried user we remove them from further + # queries. We use the more efficient batched query_client_keys for all + # remaining users + user_ids_updated = [] + for (user_id, device_list) in destination_query.items(): + if user_id in user_ids_updated: + continue + + if device_list: + continue + + room_ids = yield self.store.get_rooms_for_user(user_id) + if not room_ids: + continue + + # We've decided we're sharing a room with this user and should + # probably be tracking their device lists. However, we haven't + # done an initial sync on the device list so we do it now. + try: + user_devices = yield self.device_handler.device_list_updater.user_device_resync( + user_id + ) + user_devices = user_devices["devices"] + for device in user_devices: + results[user_id] = {device["device_id"]: device["keys"]} + user_ids_updated.append(user_id) + except Exception as e: + failures[destination] = _exception_to_failure(e) + + if len(destination_query) == len(user_ids_updated): + # We've updated all the users in the query and we do not need to + # make any further remote calls. + return + + # Remove all the users from the query which we have updated + for user_id in user_ids_updated: + destination_query.pop(user_id) + try: remote_result = yield self.federation.query_client_keys( destination, {"device_keys": destination_query}, timeout=timeout @@ -132,7 +183,8 @@ class E2eKeysHandler(object): results[user_id] = keys except Exception as e: - failures[destination] = _exception_to_failure(e) + failure = _exception_to_failure(e) + failures[destination] = failure yield make_deferred_yieldable( defer.gatherResults( @@ -141,10 +193,10 @@ class E2eKeysHandler(object): for destination in remote_queries_not_in_cache ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) ) - defer.returnValue({"device_keys": results, "failures": failures}) + return {"device_keys": results, "failures": failures} @defer.inlineCallbacks def query_local_devices(self, query): @@ -189,7 +241,7 @@ class E2eKeysHandler(object): r["unsigned"]["device_display_name"] = display_name result_dict[user_id][device_id] = r - defer.returnValue(result_dict) + return result_dict @defer.inlineCallbacks def on_federation_query_client_keys(self, query_body): @@ -197,7 +249,7 @@ class E2eKeysHandler(object): """ device_keys_query = query_body.get("device_keys", {}) res = yield self.query_local_devices(device_keys_query) - defer.returnValue({"device_keys": res}) + return {"device_keys": res} @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): @@ -234,8 +286,10 @@ class E2eKeysHandler(object): for user_id, keys in remote_result["one_time_keys"].items(): if user_id in device_keys: json_result[user_id] = keys + except Exception as e: - failures[destination] = _exception_to_failure(e) + failure = _exception_to_failure(e) + failures[destination] = failure yield make_deferred_yieldable( defer.gatherResults( @@ -259,10 +313,11 @@ class E2eKeysHandler(object): ), ) - defer.returnValue({"one_time_keys": json_result, "failures": failures}) + return {"one_time_keys": json_result, "failures": failures} @defer.inlineCallbacks def upload_keys_for_user(self, user_id, device_id, keys): + time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. @@ -297,7 +352,7 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - defer.returnValue({"one_time_key_counts": result}) + return {"one_time_key_counts": result} @defer.inlineCallbacks def _upload_one_time_keys_for_user( diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index ebd807bca6..41b871fc59 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -84,7 +84,7 @@ class E2eRoomKeysHandler(object): user_id, version, room_id, session_id ) - defer.returnValue(results) + return results @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): @@ -262,7 +262,7 @@ class E2eRoomKeysHandler(object): new_version = yield self.store.create_e2e_room_keys_version( user_id, version_info ) - defer.returnValue(new_version) + return new_version @defer.inlineCallbacks def get_version_info(self, user_id, version=None): @@ -292,7 +292,7 @@ class E2eRoomKeysHandler(object): raise NotFoundError("Unknown backup version") else: raise - defer.returnValue(res) + return res @defer.inlineCallbacks def delete_version(self, user_id, version=None): @@ -350,4 +350,4 @@ class E2eRoomKeysHandler(object): user_id, version, version_info ) - defer.returnValue({}) + return {} diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 6a38328af3..2f1f10a9af 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -143,7 +143,7 @@ class EventStreamHandler(BaseHandler): "end": tokens[1].to_string(), } - defer.returnValue(chunk) + return chunk class EventHandler(BaseHandler): @@ -166,7 +166,7 @@ class EventHandler(BaseHandler): event = yield self.store.get_event(event_id, check_room_id=room_id) if not event: - defer.returnValue(None) + return None return users = yield self.store.get_users_in_room(event.room_id) @@ -179,4 +179,4 @@ class EventHandler(BaseHandler): if not filtered: raise AuthError(403, "You don't have permission to access that event.") - defer.returnValue(event) + return event diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 30b69af82c..c86903b98b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -210,7 +210,7 @@ class FederationHandler(BaseHandler): event_id, origin, ) - defer.returnValue(None) + return None state = None auth_chain = [] @@ -676,7 +676,7 @@ class FederationHandler(BaseHandler): events = [e for e in events if e.event_id not in seen_events] if not events: - defer.returnValue([]) + return [] event_map = {e.event_id: e for e in events} @@ -838,7 +838,7 @@ class FederationHandler(BaseHandler): # TODO: We can probably do something more clever here. yield self._handle_new_event(dest, event, backfilled=True) - defer.returnValue(events) + return events @defer.inlineCallbacks def maybe_backfill(self, room_id, current_depth): @@ -894,7 +894,7 @@ class FederationHandler(BaseHandler): ) if not filtered_extremities: - defer.returnValue(False) + return False # Check if we reached a point where we should start backfilling. sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1])) @@ -965,7 +965,7 @@ class FederationHandler(BaseHandler): # If this succeeded then we probably already have the # appropriate stuff. # TODO: We can probably do something more intelligent here. - defer.returnValue(True) + return True except SynapseError as e: logger.info("Failed to backfill from %s because %s", dom, e) continue @@ -978,6 +978,9 @@ class FederationHandler(BaseHandler): except NotRetryingDestination as e: logger.info(str(e)) continue + except RequestSendFailed as e: + logger.info("Falied to get backfill from %s because %s", dom, e) + continue except FederationDeniedError as e: logger.info(e) continue @@ -985,11 +988,11 @@ class FederationHandler(BaseHandler): logger.exception("Failed to backfill from %s because %s", dom, e) continue - defer.returnValue(False) + return False success = yield try_backfill(likely_domains) if success: - defer.returnValue(True) + return True # Huh, well *those* domains didn't work out. Lets try some domains # from the time. @@ -1031,11 +1034,11 @@ class FederationHandler(BaseHandler): [dom for dom, _ in likely_domains if dom not in tried_domains] ) if success: - defer.returnValue(True) + return True tried_domains.update(dom for dom, _ in likely_domains) - defer.returnValue(False) + return False def _sanity_check_event(self, ev): """ @@ -1082,7 +1085,7 @@ class FederationHandler(BaseHandler): pdu=event, ) - defer.returnValue(pdu) + return pdu @defer.inlineCallbacks def on_event_auth(self, event_id): @@ -1090,7 +1093,7 @@ class FederationHandler(BaseHandler): auth = yield self.store.get_auth_chain( [auth_id for auth_id in event.auth_event_ids()], include_given=True ) - defer.returnValue([e for e in auth]) + return [e for e in auth] @log_function @defer.inlineCallbacks @@ -1177,7 +1180,7 @@ class FederationHandler(BaseHandler): run_in_background(self._handle_queued_pdus, room_queue) - defer.returnValue(True) + return True @defer.inlineCallbacks def _handle_queued_pdus(self, room_queue): @@ -1264,7 +1267,7 @@ class FederationHandler(BaseHandler): room_version, event, context, do_sig_check=False ) - defer.returnValue(event) + return event @defer.inlineCallbacks @log_function @@ -1325,7 +1328,7 @@ class FederationHandler(BaseHandler): state = yield self.store.get_events(list(prev_state_ids.values())) - defer.returnValue({"state": list(state.values()), "auth_chain": auth_chain}) + return {"state": list(state.values()), "auth_chain": auth_chain} @defer.inlineCallbacks def on_invite_request(self, origin, pdu): @@ -1381,7 +1384,7 @@ class FederationHandler(BaseHandler): context = yield self.state_handler.compute_event_context(event) yield self.persist_events_and_notify([(event, context)]) - defer.returnValue(event) + return event @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): @@ -1406,7 +1409,7 @@ class FederationHandler(BaseHandler): context = yield self.state_handler.compute_event_context(event) yield self.persist_events_and_notify([(event, context)]) - defer.returnValue(event) + return event @defer.inlineCallbacks def _make_and_verify_event( @@ -1424,7 +1427,7 @@ class FederationHandler(BaseHandler): assert event.user_id == user_id assert event.state_key == user_id assert event.room_id == room_id - defer.returnValue((origin, event, format_ver)) + return (origin, event, format_ver) @defer.inlineCallbacks @log_function @@ -1484,7 +1487,7 @@ class FederationHandler(BaseHandler): logger.warn("Failed to create new leave %r because %s", event, e) raise e - defer.returnValue(event) + return event @defer.inlineCallbacks @log_function @@ -1517,7 +1520,7 @@ class FederationHandler(BaseHandler): event.signatures, ) - defer.returnValue(None) + return None @defer.inlineCallbacks def get_state_for_pdu(self, room_id, event_id): @@ -1545,9 +1548,9 @@ class FederationHandler(BaseHandler): del results[(event.type, event.state_key)] res = list(results.values()) - defer.returnValue(res) + return res else: - defer.returnValue([]) + return [] @defer.inlineCallbacks def get_state_ids_for_pdu(self, room_id, event_id): @@ -1572,9 +1575,9 @@ class FederationHandler(BaseHandler): else: results.pop((event.type, event.state_key), None) - defer.returnValue(list(results.values())) + return list(results.values()) else: - defer.returnValue([]) + return [] @defer.inlineCallbacks @log_function @@ -1587,7 +1590,7 @@ class FederationHandler(BaseHandler): events = yield filter_events_for_server(self.store, origin, events) - defer.returnValue(events) + return events @defer.inlineCallbacks @log_function @@ -1617,9 +1620,9 @@ class FederationHandler(BaseHandler): events = yield filter_events_for_server(self.store, origin, [event]) event = events[0] - defer.returnValue(event) + return event else: - defer.returnValue(None) + return None def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) @@ -1651,7 +1654,7 @@ class FederationHandler(BaseHandler): self.store.remove_push_actions_from_staging, event.event_id ) - defer.returnValue(context) + return context @defer.inlineCallbacks def _handle_new_events(self, origin, event_infos, backfilled=False): @@ -1674,7 +1677,7 @@ class FederationHandler(BaseHandler): auth_events=ev_info.get("auth_events"), backfilled=backfilled, ) - defer.returnValue(res) + return res contexts = yield make_deferred_yieldable( defer.gatherResults( @@ -1833,7 +1836,7 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.GuestAccess and not context.rejected: yield self.maybe_kick_guest_users(event) - defer.returnValue(context) + return context @defer.inlineCallbacks def _check_for_soft_fail(self, event, state, backfilled): @@ -1952,7 +1955,7 @@ class FederationHandler(BaseHandler): logger.debug("on_query_auth returning: %s", ret) - defer.returnValue(ret) + return ret @defer.inlineCallbacks def on_get_missing_events( @@ -1975,7 +1978,7 @@ class FederationHandler(BaseHandler): self.store, origin, missing_events ) - defer.returnValue(missing_events) + return missing_events @defer.inlineCallbacks @log_function @@ -2451,16 +2454,14 @@ class FederationHandler(BaseHandler): logger.debug("construct_auth_difference returning") - defer.returnValue( - { - "auth_chain": local_auth, - "rejects": { - e.event_id: {"reason": reason_map[e.event_id], "proof": None} - for e in base_remote_rejected - }, - "missing": [e.event_id for e in missing_locals], - } - ) + return { + "auth_chain": local_auth, + "rejects": { + e.event_id: {"reason": reason_map[e.event_id], "proof": None} + for e in base_remote_rejected + }, + "missing": [e.event_id for e in missing_locals], + } @defer.inlineCallbacks @log_function @@ -2608,7 +2609,7 @@ class FederationHandler(BaseHandler): builder=builder ) EventValidator().validate_new(event) - defer.returnValue((event, context)) + return (event, context) @defer.inlineCallbacks def _check_signature(self, event, context): @@ -2798,3 +2799,28 @@ class FederationHandler(BaseHandler): ) else: return user_joined_room(self.distributor, user, room_id) + + @defer.inlineCallbacks + def get_room_complexity(self, remote_room_hosts, room_id): + """ + Fetch the complexity of a remote room over federation. + + Args: + remote_room_hosts (list[str]): The remote servers to ask. + room_id (str): The room ID to ask about. + + Returns: + Deferred[dict] or Deferred[None]: Dict contains the complexity + metric versions, while None means we could not fetch the complexity. + """ + + for host in remote_room_hosts: + res = yield self.federation_client.get_room_complexity(host, room_id) + + # We got a result, return it. + if res: + defer.returnValue(res) + + # We fell off the bottom, couldn't get the complexity from anyone. Oh + # well. + defer.returnValue(None) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 7da63bb643..46eb9ee88b 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -126,9 +126,12 @@ class GroupsLocalHandler(object): group_id, requester_user_id ) else: - res = yield self.transport_client.get_group_summary( - get_domain_from_id(group_id), group_id, requester_user_id - ) + try: + res = yield self.transport_client.get_group_summary( + get_domain_from_id(group_id), group_id, requester_user_id + ) + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") group_server_name = get_domain_from_id(group_id) @@ -162,7 +165,7 @@ class GroupsLocalHandler(object): res.setdefault("user", {})["is_publicised"] = is_publicised - defer.returnValue(res) + return res @defer.inlineCallbacks def create_group(self, group_id, user_id, content): @@ -183,9 +186,12 @@ class GroupsLocalHandler(object): content["user_profile"] = yield self.profile_handler.get_profile(user_id) - res = yield self.transport_client.create_group( - get_domain_from_id(group_id), group_id, user_id, content - ) + try: + res = yield self.transport_client.create_group( + get_domain_from_id(group_id), group_id, user_id, content + ) + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") remote_attestation = res["attestation"] yield self.attestations.verify_attestation( @@ -207,7 +213,7 @@ class GroupsLocalHandler(object): ) self.notifier.on_new_event("groups_key", token, users=[user_id]) - defer.returnValue(res) + return res @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): @@ -217,13 +223,16 @@ class GroupsLocalHandler(object): res = yield self.groups_server_handler.get_users_in_group( group_id, requester_user_id ) - defer.returnValue(res) + return res group_server_name = get_domain_from_id(group_id) - res = yield self.transport_client.get_users_in_group( - get_domain_from_id(group_id), group_id, requester_user_id - ) + try: + res = yield self.transport_client.get_users_in_group( + get_domain_from_id(group_id), group_id, requester_user_id + ) + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") chunk = res["chunk"] valid_entries = [] @@ -244,7 +253,7 @@ class GroupsLocalHandler(object): res["chunk"] = valid_entries - defer.returnValue(res) + return res @defer.inlineCallbacks def join_group(self, group_id, user_id, content): @@ -258,9 +267,12 @@ class GroupsLocalHandler(object): local_attestation = self.attestations.create_attestation(group_id, user_id) content["attestation"] = local_attestation - res = yield self.transport_client.join_group( - get_domain_from_id(group_id), group_id, user_id, content - ) + try: + res = yield self.transport_client.join_group( + get_domain_from_id(group_id), group_id, user_id, content + ) + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") remote_attestation = res["attestation"] @@ -285,7 +297,7 @@ class GroupsLocalHandler(object): ) self.notifier.on_new_event("groups_key", token, users=[user_id]) - defer.returnValue({}) + return {} @defer.inlineCallbacks def accept_invite(self, group_id, user_id, content): @@ -299,9 +311,12 @@ class GroupsLocalHandler(object): local_attestation = self.attestations.create_attestation(group_id, user_id) content["attestation"] = local_attestation - res = yield self.transport_client.accept_group_invite( - get_domain_from_id(group_id), group_id, user_id, content - ) + try: + res = yield self.transport_client.accept_group_invite( + get_domain_from_id(group_id), group_id, user_id, content + ) + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") remote_attestation = res["attestation"] @@ -326,7 +341,7 @@ class GroupsLocalHandler(object): ) self.notifier.on_new_event("groups_key", token, users=[user_id]) - defer.returnValue({}) + return {} @defer.inlineCallbacks def invite(self, group_id, user_id, requester_user_id, config): @@ -338,15 +353,18 @@ class GroupsLocalHandler(object): group_id, user_id, requester_user_id, content ) else: - res = yield self.transport_client.invite_to_group( - get_domain_from_id(group_id), - group_id, - user_id, - requester_user_id, - content, - ) + try: + res = yield self.transport_client.invite_to_group( + get_domain_from_id(group_id), + group_id, + user_id, + requester_user_id, + content, + ) + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") - defer.returnValue(res) + return res @defer.inlineCallbacks def on_invite(self, group_id, user_id, content): @@ -377,7 +395,7 @@ class GroupsLocalHandler(object): logger.warn("No profile for user %s: %s", user_id, e) user_profile = {} - defer.returnValue({"state": "invite", "user_profile": user_profile}) + return {"state": "invite", "user_profile": user_profile} @defer.inlineCallbacks def remove_user_from_group(self, group_id, user_id, requester_user_id, content): @@ -398,15 +416,18 @@ class GroupsLocalHandler(object): ) else: content["requester_user_id"] = requester_user_id - res = yield self.transport_client.remove_user_from_group( - get_domain_from_id(group_id), - group_id, - requester_user_id, - user_id, - content, - ) + try: + res = yield self.transport_client.remove_user_from_group( + get_domain_from_id(group_id), + group_id, + requester_user_id, + user_id, + content, + ) + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") - defer.returnValue(res) + return res @defer.inlineCallbacks def user_removed_from_group(self, group_id, user_id, content): @@ -421,7 +442,7 @@ class GroupsLocalHandler(object): @defer.inlineCallbacks def get_joined_groups(self, user_id): group_ids = yield self.store.get_joined_groups(user_id) - defer.returnValue({"groups": group_ids}) + return {"groups": group_ids} @defer.inlineCallbacks def get_publicised_groups_for_user(self, user_id): @@ -433,14 +454,18 @@ class GroupsLocalHandler(object): for app_service in self.store.get_app_services(): result.extend(app_service.get_groups_for_user(user_id)) - defer.returnValue({"groups": result}) + return {"groups": result} else: - bulk_result = yield self.transport_client.bulk_get_publicised_groups( - get_domain_from_id(user_id), [user_id] - ) + try: + bulk_result = yield self.transport_client.bulk_get_publicised_groups( + get_domain_from_id(user_id), [user_id] + ) + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") + result = bulk_result.get("users", {}).get(user_id) # TODO: Verify attestations - defer.returnValue({"groups": result}) + return {"groups": result} @defer.inlineCallbacks def bulk_get_publicised_groups(self, user_ids, proxy=True): @@ -475,4 +500,4 @@ class GroupsLocalHandler(object): for app_service in self.store.get_app_services(): results[uid].extend(app_service.get_groups_for_user(uid)) - defer.returnValue({"users": results}) + return {"users": results} diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 546d6169e9..d199521b58 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -82,7 +82,7 @@ class IdentityHandler(BaseHandler): "%s is not a trusted ID server: rejecting 3pid " + "credentials", id_server, ) - defer.returnValue(None) + return None try: data = yield self.http_client.get_json( @@ -95,8 +95,8 @@ class IdentityHandler(BaseHandler): raise e.to_synapse_error() if "medium" in data: - defer.returnValue(data) - defer.returnValue(None) + return data + return None @defer.inlineCallbacks def bind_threepid(self, creds, mxid): @@ -133,7 +133,7 @@ class IdentityHandler(BaseHandler): ) except CodeMessageException as e: data = json.loads(e.msg) # XXX WAT? - defer.returnValue(data) + return data @defer.inlineCallbacks def try_unbind_threepid(self, mxid, threepid): @@ -161,7 +161,7 @@ class IdentityHandler(BaseHandler): # We don't know where to unbind, so we don't have a choice but to return if not id_servers: - defer.returnValue(False) + return False changed = True for id_server in id_servers: @@ -169,7 +169,7 @@ class IdentityHandler(BaseHandler): mxid, threepid, id_server ) - defer.returnValue(changed) + return changed @defer.inlineCallbacks def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server): @@ -224,7 +224,7 @@ class IdentityHandler(BaseHandler): id_server=id_server, ) - defer.returnValue(changed) + return changed @defer.inlineCallbacks def requestEmailToken( @@ -250,7 +250,7 @@ class IdentityHandler(BaseHandler): % (id_server, "/_matrix/identity/api/v1/validate/email/requestToken"), params, ) - defer.returnValue(data) + return data except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() @@ -278,7 +278,7 @@ class IdentityHandler(BaseHandler): % (id_server, "/_matrix/identity/api/v1/validate/msisdn/requestToken"), params, ) - defer.returnValue(data) + return data except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 54c966c8a6..42d6650ed9 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -250,7 +250,7 @@ class InitialSyncHandler(BaseHandler): "end": now_token.to_string(), } - defer.returnValue(ret) + return ret @defer.inlineCallbacks def room_initial_sync(self, requester, room_id, pagin_config=None): @@ -301,7 +301,7 @@ class InitialSyncHandler(BaseHandler): result["account_data"] = account_data_events - defer.returnValue(result) + return result @defer.inlineCallbacks def _room_initial_sync_parted( @@ -330,28 +330,24 @@ class InitialSyncHandler(BaseHandler): time_now = self.clock.time_msec() - defer.returnValue( - { - "membership": membership, - "room_id": room_id, - "messages": { - "chunk": ( - yield self._event_serializer.serialize_events( - messages, time_now - ) - ), - "start": start_token.to_string(), - "end": end_token.to_string(), - }, - "state": ( - yield self._event_serializer.serialize_events( - room_state.values(), time_now - ) + return { + "membership": membership, + "room_id": room_id, + "messages": { + "chunk": ( + yield self._event_serializer.serialize_events(messages, time_now) ), - "presence": [], - "receipts": [], - } - ) + "start": start_token.to_string(), + "end": end_token.to_string(), + }, + "state": ( + yield self._event_serializer.serialize_events( + room_state.values(), time_now + ) + ), + "presence": [], + "receipts": [], + } @defer.inlineCallbacks def _room_initial_sync_joined( @@ -384,13 +380,13 @@ class InitialSyncHandler(BaseHandler): def get_presence(): # If presence is disabled, return an empty list if not self.hs.config.use_presence: - defer.returnValue([]) + return [] states = yield presence_handler.get_states( [m.user_id for m in room_members], as_event=True ) - defer.returnValue(states) + return states @defer.inlineCallbacks def get_receipts(): @@ -399,7 +395,7 @@ class InitialSyncHandler(BaseHandler): ) if not receipts: receipts = [] - defer.returnValue(receipts) + return receipts presence, receipts, (messages, token) = yield make_deferred_yieldable( defer.gatherResults( @@ -442,7 +438,7 @@ class InitialSyncHandler(BaseHandler): if not is_peeking: ret["membership"] = membership - defer.returnValue(ret) + return ret @defer.inlineCallbacks def _check_in_room_or_world_readable(self, room_id, user_id): @@ -453,7 +449,7 @@ class InitialSyncHandler(BaseHandler): # * The user is a guest user, and has joined the room # else it will throw. member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) + return (member_event.membership, member_event.event_id) return except AuthError: visibility = yield self.state_handler.get_current_state( @@ -463,7 +459,7 @@ class InitialSyncHandler(BaseHandler): visibility and visibility.content["history_visibility"] == "world_readable" ): - defer.returnValue((Membership.JOIN, None)) + return (Membership.JOIN, None) return raise AuthError( 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6d7a987f13..a5e23c4caf 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -87,7 +87,7 @@ class MessageHandler(object): ) data = room_state[membership_event_id].get(key) - defer.returnValue(data) + return data @defer.inlineCallbacks def get_state_events( @@ -174,7 +174,7 @@ class MessageHandler(object): # events, as clients won't use them. bundle_aggregations=False, ) - defer.returnValue(events) + return events @defer.inlineCallbacks def get_joined_members(self, requester, room_id): @@ -213,15 +213,13 @@ class MessageHandler(object): # Loop fell through, AS has no interested users in room raise AuthError(403, "Appservice not in room") - defer.returnValue( - { - user_id: { - "avatar_url": profile.avatar_url, - "display_name": profile.display_name, - } - for user_id, profile in iteritems(users_with_profile) + return { + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, } - ) + for user_id, profile in iteritems(users_with_profile) + } class EventCreationHandler(object): @@ -380,7 +378,11 @@ class EventCreationHandler(object): # tolerate them in event_auth.check(). prev_state_ids = yield context.get_prev_state_ids(self.store) prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender)) - prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + prev_event = ( + yield self.store.get_event(prev_event_id, allow_none=True) + if prev_event_id + else None + ) if not prev_event or prev_event.membership != Membership.JOIN: logger.warning( ( @@ -398,7 +400,7 @@ class EventCreationHandler(object): self.validator.validate_new(event) - defer.returnValue((event, context)) + return (event, context) def _is_exempt_from_privacy_policy(self, builder, requester): """"Determine if an event to be sent is exempt from having to consent @@ -425,9 +427,9 @@ class EventCreationHandler(object): @defer.inlineCallbacks def _is_server_notices_room(self, room_id): if self.config.server_notices_mxid is None: - defer.returnValue(False) + return False user_ids = yield self.store.get_users_in_room(room_id) - defer.returnValue(self.config.server_notices_mxid in user_ids) + return self.config.server_notices_mxid in user_ids @defer.inlineCallbacks def assert_accepted_privacy_policy(self, requester): @@ -507,7 +509,7 @@ class EventCreationHandler(object): event.event_id, prev_state.event_id, ) - defer.returnValue(prev_state) + return prev_state yield self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit @@ -523,6 +525,8 @@ class EventCreationHandler(object): """ prev_state_ids = yield context.get_prev_state_ids(self.store) prev_event_id = prev_state_ids.get((event.type, event.state_key)) + if not prev_event_id: + return prev_event = yield self.store.get_event(prev_event_id, allow_none=True) if not prev_event: return @@ -531,7 +535,7 @@ class EventCreationHandler(object): prev_content = encode_canonical_json(prev_event.content) next_content = encode_canonical_json(event.content) if prev_content == next_content: - defer.returnValue(prev_event) + return prev_event return @defer.inlineCallbacks @@ -563,7 +567,7 @@ class EventCreationHandler(object): yield self.send_nonmember_event( requester, event, context, ratelimit=ratelimit ) - defer.returnValue(event) + return event @measure_func("create_new_client_event") @defer.inlineCallbacks @@ -626,7 +630,7 @@ class EventCreationHandler(object): logger.debug("Created event %s", event.event_id) - defer.returnValue((event, context)) + return (event, context) @measure_func("handle_new_client_event") @defer.inlineCallbacks @@ -791,7 +795,6 @@ class EventCreationHandler(object): get_prev_content=False, allow_rejected=False, allow_none=True, - check_room_id=event.room_id, ) # we can make some additional checks now if we have the original event. @@ -799,6 +802,9 @@ class EventCreationHandler(object): if original_event.type == EventTypes.Create: raise AuthError(403, "Redacting create events is not permitted") + if original_event.room_id != event.room_id: + raise SynapseError(400, "Cannot redact event from a different room") + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( event, prev_state_ids, for_verification=True diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 20bcfed334..d83aab3f74 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -242,13 +242,11 @@ class PaginationHandler(object): ) if not events: - defer.returnValue( - { - "chunk": [], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - } - ) + return { + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } state = None if event_filter and event_filter.lazy_load_members() and len(events) > 0: @@ -286,4 +284,4 @@ class PaginationHandler(object): ) ) - defer.returnValue(chunk) + return chunk diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6f3537e435..94a9ca0357 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -333,7 +333,7 @@ class PresenceHandler(object): """Checks the presence of users that have timed out and updates as appropriate. """ - logger.info("Handling presence timeouts") + logger.debug("Handling presence timeouts") now = self.clock.time_msec() # Fetch the list of users that *may* have timed out. Things may have @@ -461,7 +461,7 @@ class PresenceHandler(object): if affect_presence: run_in_background(_end) - defer.returnValue(_user_syncing()) + return _user_syncing() def get_currently_syncing_users(self): """Get the set of user ids that are currently syncing on this HS. @@ -556,7 +556,7 @@ class PresenceHandler(object): """Get the current presence state for a user. """ res = yield self.current_state_for_users([user_id]) - defer.returnValue(res[user_id]) + return res[user_id] @defer.inlineCallbacks def current_state_for_users(self, user_ids): @@ -585,7 +585,7 @@ class PresenceHandler(object): states.update(new) self.user_to_current_state.update(new) - defer.returnValue(states) + return states @defer.inlineCallbacks def _persist_and_notify(self, states): @@ -681,7 +681,7 @@ class PresenceHandler(object): def get_state(self, target_user, as_event=False): results = yield self.get_states([target_user.to_string()], as_event=as_event) - defer.returnValue(results[0]) + return results[0] @defer.inlineCallbacks def get_states(self, target_user_ids, as_event=False): @@ -703,17 +703,15 @@ class PresenceHandler(object): now = self.clock.time_msec() if as_event: - defer.returnValue( - [ - { - "type": "m.presence", - "content": format_user_presence_state(state, now), - } - for state in updates - ] - ) + return [ + { + "type": "m.presence", + "content": format_user_presence_state(state, now), + } + for state in updates + ] else: - defer.returnValue(updates) + return updates @defer.inlineCallbacks def set_state(self, target_user, state, ignore_status_msg=False): @@ -757,9 +755,9 @@ class PresenceHandler(object): ) if observer_room_ids & observed_room_ids: - defer.returnValue(True) + return True - defer.returnValue(False) + return False @defer.inlineCallbacks def get_all_presence_updates(self, last_id, current_id): @@ -778,7 +776,7 @@ class PresenceHandler(object): # TODO(markjh): replicate the unpersisted changes. # This could use the in-memory stores for recent changes. rows = yield self.store.get_all_presence_updates(last_id, current_id) - defer.returnValue(rows) + return rows def notify_new_event(self): """Called when new events have happened. Handles users and servers @@ -1034,7 +1032,7 @@ class PresenceEventSource(object): # # Hence this guard where we just return nothing so that the sync # doesn't return. C.f. #5503. - defer.returnValue(([], max_token)) + return ([], max_token) presence = self.get_presence_handler() stream_change_cache = self.store.presence_stream_cache @@ -1068,17 +1066,11 @@ class PresenceEventSource(object): updates = yield presence.current_state_for_users(user_ids_changed) if include_offline: - defer.returnValue((list(updates.values()), max_token)) + return (list(updates.values()), max_token) else: - defer.returnValue( - ( - [ - s - for s in itervalues(updates) - if s.state != PresenceState.OFFLINE - ], - max_token, - ) + return ( + [s for s in itervalues(updates) if s.state != PresenceState.OFFLINE], + max_token, ) def get_current_key(self): @@ -1107,7 +1099,7 @@ class PresenceEventSource(object): ) users_interested_in.update(user_ids) - defer.returnValue(users_interested_in) + return users_interested_in def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): @@ -1287,7 +1279,7 @@ def get_interested_parties(store, states): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - defer.returnValue((room_ids_to_states, users_to_states)) + return (room_ids_to_states, users_to_states) @defer.inlineCallbacks @@ -1321,4 +1313,4 @@ def get_interested_remotes(store, states, state_handler): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) - defer.returnValue(hosts_and_states) + return hosts_and_states diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index a2388a7091..2cc237e6a5 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -73,7 +73,7 @@ class BaseProfileHandler(BaseHandler): raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) raise - defer.returnValue({"displayname": displayname, "avatar_url": avatar_url}) + return {"displayname": displayname, "avatar_url": avatar_url} else: try: result = yield self.federation.make_query( @@ -82,7 +82,7 @@ class BaseProfileHandler(BaseHandler): args={"user_id": user_id}, ignore_backoff=True, ) - defer.returnValue(result) + return result except RequestSendFailed as e: raise_from(SynapseError(502, "Failed to fetch profile"), e) except HttpResponseException as e: @@ -108,10 +108,10 @@ class BaseProfileHandler(BaseHandler): raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) raise - defer.returnValue({"displayname": displayname, "avatar_url": avatar_url}) + return {"displayname": displayname, "avatar_url": avatar_url} else: profile = yield self.store.get_from_remote_profile_cache(user_id) - defer.returnValue(profile or {}) + return profile or {} @defer.inlineCallbacks def get_displayname(self, target_user): @@ -125,7 +125,7 @@ class BaseProfileHandler(BaseHandler): raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) raise - defer.returnValue(displayname) + return displayname else: try: result = yield self.federation.make_query( @@ -139,7 +139,7 @@ class BaseProfileHandler(BaseHandler): except HttpResponseException as e: raise e.to_synapse_error() - defer.returnValue(result["displayname"]) + return result["displayname"] @defer.inlineCallbacks def set_displayname(self, target_user, requester, new_displayname, by_admin=False): @@ -186,7 +186,7 @@ class BaseProfileHandler(BaseHandler): if e.code == 404: raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) raise - defer.returnValue(avatar_url) + return avatar_url else: try: result = yield self.federation.make_query( @@ -200,7 +200,7 @@ class BaseProfileHandler(BaseHandler): except HttpResponseException as e: raise e.to_synapse_error() - defer.returnValue(result["avatar_url"]) + return result["avatar_url"] @defer.inlineCallbacks def set_avatar_url(self, target_user, requester, new_avatar_url, by_admin=False): @@ -251,7 +251,7 @@ class BaseProfileHandler(BaseHandler): raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) raise - defer.returnValue(response) + return response @defer.inlineCallbacks def _update_join_states(self, requester, target_user): diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index e58bf7e360..73973502a4 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -93,7 +93,7 @@ class ReceiptsHandler(BaseHandler): if min_batch_id is None: # no new receipts - defer.returnValue(False) + return False affected_room_ids = list(set([r.room_id for r in receipts])) @@ -103,7 +103,7 @@ class ReceiptsHandler(BaseHandler): min_batch_id, max_batch_id, affected_room_ids ) - defer.returnValue(True) + return True @defer.inlineCallbacks def received_client_receipt(self, room_id, receipt_type, user_id, event_id): @@ -133,9 +133,9 @@ class ReceiptsHandler(BaseHandler): ) if not result: - defer.returnValue([]) + return [] - defer.returnValue(result) + return result class ReceiptEventSource(object): @@ -148,13 +148,13 @@ class ReceiptEventSource(object): to_key = yield self.get_current_key() if from_key == to_key: - defer.returnValue(([], to_key)) + return ([], to_key) events = yield self.store.get_linearized_receipts_for_rooms( room_ids, from_key=from_key, to_key=to_key ) - defer.returnValue((events, to_key)) + return (events, to_key) def get_current_key(self, direction="f"): return self.store.get_max_receipt_stream_id() @@ -173,4 +173,4 @@ class ReceiptEventSource(object): room_ids, from_key=from_key, to_key=to_key ) - defer.returnValue((events, to_key)) + return (events, to_key) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index bb7cfd71b9..4631fab94e 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -265,7 +265,7 @@ class RegistrationHandler(BaseHandler): # Bind email to new account yield self._register_email_threepid(user_id, threepid_dict, None, False) - defer.returnValue(user_id) + return user_id @defer.inlineCallbacks def _auto_join_rooms(self, user_id): @@ -360,7 +360,7 @@ class RegistrationHandler(BaseHandler): appservice_id=service_id, create_profile_with_displayname=user.localpart, ) - defer.returnValue(user_id) + return user_id @defer.inlineCallbacks def check_recaptcha(self, ip, private_key, challenge, response): @@ -461,7 +461,7 @@ class RegistrationHandler(BaseHandler): id = self._next_generated_user_id self._next_generated_user_id += 1 - defer.returnValue(str(id)) + return str(id) @defer.inlineCallbacks def _validate_captcha(self, ip_addr, private_key, challenge, response): @@ -481,7 +481,7 @@ class RegistrationHandler(BaseHandler): "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?" + "error=%s" % lines[1], } - defer.returnValue(json) + return json @defer.inlineCallbacks def _submit_captcha(self, ip_addr, private_key, challenge, response): @@ -497,7 +497,7 @@ class RegistrationHandler(BaseHandler): "response": response, }, ) - defer.returnValue(data) + return data @defer.inlineCallbacks def _join_user_to_room(self, requester, room_identifier): @@ -622,7 +622,7 @@ class RegistrationHandler(BaseHandler): initial_display_name=initial_display_name, is_guest=is_guest, ) - defer.returnValue((r["device_id"], r["access_token"])) + return (r["device_id"], r["access_token"]) valid_until_ms = None if self.session_lifetime is not None: @@ -645,7 +645,7 @@ class RegistrationHandler(BaseHandler): user_id, device_id=device_id, valid_until_ms=valid_until_ms ) - defer.returnValue((device_id, access_token)) + return (device_id, access_token) @defer.inlineCallbacks def post_registration_actions( @@ -798,7 +798,7 @@ class RegistrationHandler(BaseHandler): if ex.errcode == Codes.MISSING_PARAM: # This will only happen if the ID server returns a malformed response logger.info("Can't add incomplete 3pid") - defer.returnValue(None) + return None raise yield self._auth_handler.add_threepid( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index db3f8cb76b..5caa90c3b7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -128,7 +128,7 @@ class RoomCreationHandler(BaseHandler): old_room_id, new_version, # args for _upgrade_room ) - defer.returnValue(ret) + return ret @defer.inlineCallbacks def _upgrade_room(self, requester, old_room_id, new_version): @@ -193,7 +193,7 @@ class RoomCreationHandler(BaseHandler): requester, old_room_id, new_room_id, old_room_state ) - defer.returnValue(new_room_id) + return new_room_id @defer.inlineCallbacks def _update_upgraded_room_pls( @@ -671,7 +671,7 @@ class RoomCreationHandler(BaseHandler): result["room_alias"] = room_alias.to_string() yield directory_handler.send_room_alias_update_event(requester, room_id) - defer.returnValue(result) + return result @defer.inlineCallbacks def _send_events_for_new_room( @@ -796,7 +796,7 @@ class RoomCreationHandler(BaseHandler): room_creator_user_id=creator_id, is_public=is_public, ) - defer.returnValue(gen_room_id) + return gen_room_id except StoreError: attempts += 1 raise StoreError(500, "Couldn't generate a room ID.") @@ -839,7 +839,7 @@ class RoomContextHandler(object): event_id, get_prev_content=True, allow_none=True ) if not event: - defer.returnValue(None) + return None return filtered = yield (filter_evts([event])) @@ -890,7 +890,7 @@ class RoomContextHandler(object): results["end"] = token.copy_and_replace("room_key", results["end"]).to_string() - defer.returnValue(results) + return results class RoomEventSource(object): @@ -941,7 +941,7 @@ class RoomEventSource(object): else: end_key = to_key - defer.returnValue((events, end_key)) + return (events, end_key) def get_current_key(self): return self.store.get_room_events_max_id() @@ -959,4 +959,4 @@ class RoomEventSource(object): limit=config.limit, ) - defer.returnValue((events, next_key)) + return (events, next_key) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index aae696a7e8..e9094ad02b 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -325,7 +325,7 @@ class RoomListHandler(BaseHandler): current_limit=since_token.current_limit - 1, ).to_token() - defer.returnValue(results) + return results @defer.inlineCallbacks def _append_room_entry_to_chunk( @@ -420,7 +420,7 @@ class RoomListHandler(BaseHandler): if join_rules_event: join_rule = join_rules_event.content.get("join_rule", None) if not allow_private and join_rule and join_rule != JoinRules.PUBLIC: - defer.returnValue(None) + return None # Return whether this room is open to federation users or not create_event = current_state.get((EventTypes.Create, "")) @@ -469,7 +469,7 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - defer.returnValue(result) + return result @defer.inlineCallbacks def get_remote_public_room_list( @@ -482,7 +482,7 @@ class RoomListHandler(BaseHandler): third_party_instance_id=None, ): if not self.enable_room_list_search: - defer.returnValue({"chunk": [], "total_room_count_estimate": 0}) + return {"chunk": [], "total_room_count_estimate": 0} if search_filter: # We currently don't support searching across federation, so we have @@ -507,7 +507,7 @@ class RoomListHandler(BaseHandler): ] } - defer.returnValue(res) + return res def _get_remote_list_cached( self, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e0196ef83e..249a6d9c5d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -26,8 +26,7 @@ from unpaddedbase64 import decode_base64 from twisted.internet import defer -import synapse.server -import synapse.types +from synapse import types from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError from synapse.types import RoomID, UserID @@ -191,7 +190,7 @@ class RoomMemberHandler(object): ) if duplicate is not None: # Discard the new event since this membership change is a no-op. - defer.returnValue(duplicate) + return duplicate yield self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[target], ratelimit=ratelimit @@ -233,7 +232,7 @@ class RoomMemberHandler(object): if prev_member_event.membership == Membership.JOIN: yield self._user_left_room(target, room_id) - defer.returnValue(event) + return event @defer.inlineCallbacks def copy_room_tags_and_direct_to_room(self, old_room_id, new_room_id, user_id): @@ -303,7 +302,7 @@ class RoomMemberHandler(object): require_consent=require_consent, ) - defer.returnValue(result) + return result @defer.inlineCallbacks def _update_membership( @@ -423,7 +422,7 @@ class RoomMemberHandler(object): same_membership = old_membership == effective_membership_state same_sender = requester.user.to_string() == old_state.sender if same_sender and same_membership and same_content: - defer.returnValue(old_state) + return old_state if old_membership in ["ban", "leave"] and action == "kick": raise AuthError(403, "The target user is not in the room") @@ -473,7 +472,7 @@ class RoomMemberHandler(object): ret = yield self._remote_join( requester, remote_room_hosts, room_id, target, content ) - defer.returnValue(ret) + return ret elif effective_membership_state == Membership.LEAVE: if not is_host_in_room: @@ -495,7 +494,7 @@ class RoomMemberHandler(object): res = yield self._remote_reject_invite( requester, remote_room_hosts, room_id, target ) - defer.returnValue(res) + return res res = yield self._local_membership_update( requester=requester, @@ -508,7 +507,7 @@ class RoomMemberHandler(object): content=content, require_consent=require_consent, ) - defer.returnValue(res) + return res @defer.inlineCallbacks def send_membership_event( @@ -543,7 +542,7 @@ class RoomMemberHandler(object): ), "Sender (%s) must be same as requester (%s)" % (sender, requester.user) assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) else: - requester = synapse.types.create_requester(target_user) + requester = types.create_requester(target_user) prev_event = yield self.event_creation_handler.deduplicate_state_event( event, context @@ -596,11 +595,11 @@ class RoomMemberHandler(object): """ guest_access_id = current_state_ids.get((EventTypes.GuestAccess, ""), None) if not guest_access_id: - defer.returnValue(False) + return False guest_access = yield self.store.get_event(guest_access_id) - defer.returnValue( + return ( guest_access and guest_access.content and "guest_access" in guest_access.content @@ -635,7 +634,7 @@ class RoomMemberHandler(object): servers.remove(room_alias.domain) servers.insert(0, room_alias.domain) - defer.returnValue((RoomID.from_string(room_id), servers)) + return (RoomID.from_string(room_id), servers) @defer.inlineCallbacks def _get_inviter(self, user_id, room_id): @@ -643,7 +642,7 @@ class RoomMemberHandler(object): user_id=user_id, room_id=room_id ) if invite: - defer.returnValue(UserID.from_string(invite.sender)) + return UserID.from_string(invite.sender) @defer.inlineCallbacks def do_3pid_invite( @@ -708,11 +707,11 @@ class RoomMemberHandler(object): if "signatures" not in data: raise AuthError(401, "No signatures on 3pid binding") yield self._verify_any_signature(data, id_server) - defer.returnValue(data["mxid"]) + return data["mxid"] except IOError as e: logger.warn("Error from identity server lookup: %s" % (e,)) - defer.returnValue(None) + return None @defer.inlineCallbacks def _verify_any_signature(self, data, server_hostname): @@ -904,7 +903,7 @@ class RoomMemberHandler(object): if not public_keys: public_keys.append(fallback_public_key) display_name = data["display_name"] - defer.returnValue((token, public_keys, fallback_public_key, display_name)) + return (token, public_keys, fallback_public_key, display_name) @defer.inlineCallbacks def _is_host_in_room(self, current_state_ids): @@ -913,7 +912,7 @@ class RoomMemberHandler(object): create_event_id = current_state_ids.get(("m.room.create", "")) if len(current_state_ids) == 1 and create_event_id: # We can only get here if we're in the process of creating the room - defer.returnValue(True) + return True for etype, state_key in current_state_ids: if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): @@ -925,16 +924,16 @@ class RoomMemberHandler(object): continue if event.membership == Membership.JOIN: - defer.returnValue(True) + return True - defer.returnValue(False) + return False @defer.inlineCallbacks def _is_server_notice_room(self, room_id): if self._server_notices_mxid is None: - defer.returnValue(False) + return False user_ids = yield self.store.get_users_in_room(room_id) - defer.returnValue(self._server_notices_mxid in user_ids) + return self._server_notices_mxid in user_ids class RoomMemberMasterHandler(RoomMemberHandler): @@ -946,13 +945,53 @@ class RoomMemberMasterHandler(RoomMemberHandler): self.distributor.declare("user_left_room") @defer.inlineCallbacks + def _is_remote_room_too_complex(self, room_id, remote_room_hosts): + """ + Check if complexity of a remote room is too great. + + Args: + room_id (str) + remote_room_hosts (list[str]) + + Returns: bool of whether the complexity is too great, or None + if unable to be fetched + """ + max_complexity = self.hs.config.limit_remote_rooms.complexity + complexity = yield self.federation_handler.get_room_complexity( + remote_room_hosts, room_id + ) + + if complexity: + if complexity["v1"] > max_complexity: + return True + return False + return None + + @defer.inlineCallbacks + def _is_local_room_too_complex(self, room_id): + """ + Check if the complexity of a local room is too great. + + Args: + room_id (str) + + Returns: bool + """ + max_complexity = self.hs.config.limit_remote_rooms.complexity + complexity = yield self.store.get_room_complexity(room_id) + + if complexity["v1"] > max_complexity: + return True + + return False + + @defer.inlineCallbacks def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join """ # filter ourselves out of remote_room_hosts: do_invite_join ignores it # and if it is the only entry we'd like to return a 404 rather than a # 500. - remote_room_hosts = [ host for host in remote_room_hosts if host != self.hs.hostname ] @@ -960,6 +999,18 @@ class RoomMemberMasterHandler(RoomMemberHandler): if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") + if self.hs.config.limit_remote_rooms.enabled: + # Fetch the room complexity + too_complex = yield self._is_remote_room_too_complex( + room_id, remote_room_hosts + ) + if too_complex is True: + raise SynapseError( + code=400, + msg=self.hs.config.limit_remote_rooms.complexity_error, + errcode=Codes.RESOURCE_LIMIT_EXCEEDED, + ) + # We don't do an auth check if we are doing an invite # join dance for now, since we're kinda implicitly checking # that we are allowed to join when we decide whether or not we @@ -969,6 +1020,31 @@ class RoomMemberMasterHandler(RoomMemberHandler): ) yield self._user_joined_room(user, room_id) + # Check the room we just joined wasn't too large, if we didn't fetch the + # complexity of it before. + if self.hs.config.limit_remote_rooms.enabled: + if too_complex is False: + # We checked, and we're under the limit. + return + + # Check again, but with the local state events + too_complex = yield self._is_local_room_too_complex(room_id) + + if too_complex is False: + # We're under the limit. + return + + # The room is too large. Leave. + requester = types.create_requester(user, None, False, None) + yield self.update_membership( + requester=requester, target=user, room_id=room_id, action="leave" + ) + raise SynapseError( + code=400, + msg=self.hs.config.limit_remote_rooms.complexity_error, + errcode=Codes.RESOURCE_LIMIT_EXCEEDED, + ) + @defer.inlineCallbacks def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): """Implements RoomMemberHandler._remote_reject_invite @@ -978,7 +1054,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): ret = yield fed_handler.do_remotely_reject_invite( remote_room_hosts, room_id, target.to_string() ) - defer.returnValue(ret) + return ret except Exception as e: # if we were unable to reject the exception, just mark # it as rejected on our end and plough ahead. @@ -989,7 +1065,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): logger.warn("Failed to reject invite: %s", e) yield self.store.locally_reject_invite(target.to_string(), room_id) - defer.returnValue({}) + return {} def _user_joined_room(self, target, room_id): """Implements RoomMemberHandler._user_joined_room diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index fc873a3ba6..75e96ae1a2 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -53,7 +53,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): yield self._user_joined_room(user, room_id) - defer.returnValue(ret) + return ret def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): """Implements RoomMemberHandler._remote_reject_invite diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index ddc4430d03..cd5e90bacb 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -69,7 +69,7 @@ class SearchHandler(BaseHandler): # Scan through the old room for further predecessors room_id = predecessor["room_id"] - defer.returnValue(historical_room_ids) + return historical_room_ids @defer.inlineCallbacks def search(self, user, content, batch=None): @@ -186,13 +186,11 @@ class SearchHandler(BaseHandler): room_ids.intersection_update({batch_group_key}) if not room_ids: - defer.returnValue( - { - "search_categories": { - "room_events": {"results": [], "count": 0, "highlights": []} - } + return { + "search_categories": { + "room_events": {"results": [], "count": 0, "highlights": []} } - ) + } rank_map = {} # event_id -> rank of event allowed_events = [] @@ -455,4 +453,4 @@ class SearchHandler(BaseHandler): if global_next_batch: rooms_cat_res["next_batch"] = global_next_batch - defer.returnValue({"search_categories": {"room_events": rooms_cat_res}}) + return {"search_categories": {"room_events": rooms_cat_res}} diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py index 6b364befd5..f065970c40 100644 --- a/synapse/handlers/state_deltas.py +++ b/synapse/handlers/state_deltas.py @@ -48,7 +48,7 @@ class StateDeltasHandler(object): if not event and not prev_event: logger.debug("Neither event exists: %r %r", prev_event_id, event_id) - defer.returnValue(None) + return None prev_value = None value = None @@ -62,8 +62,8 @@ class StateDeltasHandler(object): logger.debug("prev_value: %r -> value: %r", prev_value, value) if value == public_value and prev_value != public_value: - defer.returnValue(True) + return True elif value != public_value and prev_value == public_value: - defer.returnValue(False) + return False else: - defer.returnValue(None) + return None diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index a0ee8db988..4449da6669 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -86,7 +86,7 @@ class StatsHandler(StateDeltasHandler): # If still None then the initial background update hasn't happened yet if self.pos is None: - defer.returnValue(None) + return None # Loop round handling deltas until we're up to date while True: @@ -328,6 +328,6 @@ class StatsHandler(StateDeltasHandler): == "world_readable" ) ): - defer.returnValue(True) + return True else: - defer.returnValue(False) + return False diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cd1ac0a27a..98da2318a0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -263,7 +263,7 @@ class SyncHandler(object): timeout, full_state, ) - defer.returnValue(res) + return res @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state): @@ -303,7 +303,7 @@ class SyncHandler(object): lazy_loaded = "false" non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() - defer.returnValue(result) + return result def current_sync_for_user(self, sync_config, since_token=None, full_state=False): """Get the sync for client needed to match what the server has now. @@ -317,7 +317,7 @@ class SyncHandler(object): user_id = user.to_string() rules = yield self.store.get_push_rules_for_user(user_id) rules = format_push_rules_for_user(user, rules) - defer.returnValue(rules) + return rules @defer.inlineCallbacks def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None): @@ -378,7 +378,7 @@ class SyncHandler(object): event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) - defer.returnValue((now_token, ephemeral_by_room)) + return (now_token, ephemeral_by_room) @defer.inlineCallbacks def _load_filtered_recents( @@ -426,8 +426,8 @@ class SyncHandler(object): recents = [] if not limited or block_all_timeline: - defer.returnValue( - TimelineBatch(events=recents, prev_batch=now_token, limited=False) + return TimelineBatch( + events=recents, prev_batch=now_token, limited=False ) filtering_factor = 2 @@ -490,12 +490,10 @@ class SyncHandler(object): prev_batch_token = now_token.copy_and_replace("room_key", room_key) - defer.returnValue( - TimelineBatch( - events=recents, - prev_batch=prev_batch_token, - limited=limited or newly_joined_room, - ) + return TimelineBatch( + events=recents, + prev_batch=prev_batch_token, + limited=limited or newly_joined_room, ) @defer.inlineCallbacks @@ -517,7 +515,7 @@ class SyncHandler(object): if event.is_state(): state_ids = state_ids.copy() state_ids[(event.type, event.state_key)] = event.event_id - defer.returnValue(state_ids) + return state_ids @defer.inlineCallbacks def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()): @@ -549,7 +547,7 @@ class SyncHandler(object): else: # no events in this room - so presumably no state state = {} - defer.returnValue(state) + return state @defer.inlineCallbacks def compute_summary(self, room_id, sync_config, batch, state, now_token): @@ -579,7 +577,7 @@ class SyncHandler(object): ) if not last_events: - defer.returnValue(None) + return None return last_event = last_events[-1] @@ -611,14 +609,14 @@ class SyncHandler(object): if name_id: name = yield self.store.get_event(name_id, allow_none=True) if name and name.content.get("name"): - defer.returnValue(summary) + return summary if canonical_alias_id: canonical_alias = yield self.store.get_event( canonical_alias_id, allow_none=True ) if canonical_alias and canonical_alias.content.get("alias"): - defer.returnValue(summary) + return summary me = sync_config.user.to_string() @@ -652,7 +650,7 @@ class SyncHandler(object): summary["m.heroes"] = sorted([user_id for user_id in gone_user_ids])[0:5] if not sync_config.filter_collection.lazy_load_members(): - defer.returnValue(summary) + return summary # ensure we send membership events for heroes if needed cache_key = (sync_config.user.to_string(), sync_config.device_id) @@ -686,7 +684,7 @@ class SyncHandler(object): cache.set(s.state_key, s.event_id) state[(EventTypes.Member, s.state_key)] = s - defer.returnValue(summary) + return summary def get_lazy_loaded_members_cache(self, cache_key): cache = self.lazy_loaded_members_cache.get(cache_key) @@ -783,9 +781,17 @@ class SyncHandler(object): lazy_load_members=lazy_load_members, ) elif batch.limited: - state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, state_filter=state_filter - ) + if batch: + state_at_timeline_start = yield self.store.get_state_ids_for_event( + batch.events[0].event_id, state_filter=state_filter + ) + else: + # Its not clear how we get here, but empirically we do + # (#5407). Logging has been added elsewhere to try and + # figure out where this state comes from. + state_at_timeline_start = yield self.get_state_at( + room_id, stream_position=now_token, state_filter=state_filter + ) # for now, we disable LL for gappy syncs - see # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346 @@ -805,9 +811,17 @@ class SyncHandler(object): room_id, stream_position=since_token, state_filter=state_filter ) - current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, state_filter=state_filter - ) + if batch: + current_state_ids = yield self.store.get_state_ids_for_event( + batch.events[-1].event_id, state_filter=state_filter + ) + else: + # Its not clear how we get here, but empirically we do + # (#5407). Logging has been added elsewhere to try and + # figure out where this state comes from. + current_state_ids = yield self.get_state_at( + room_id, stream_position=now_token, state_filter=state_filter + ) state_ids = _calculate_state( timeline_contains=timeline_state, @@ -871,14 +885,12 @@ class SyncHandler(object): if state_ids: state = yield self.store.get_events(list(state_ids.values())) - defer.returnValue( - { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - list(state.values()) - ) - } - ) + return { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + list(state.values()) + ) + } @defer.inlineCallbacks def unread_notifs_for_room_id(self, room_id, sync_config): @@ -894,11 +906,11 @@ class SyncHandler(object): notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), last_unread_event_id ) - defer.returnValue(notifs) + return notifs # There is no new information in this period, so your notification # count is whatever it was last time. - defer.returnValue(None) + return None @defer.inlineCallbacks def generate_sync_result(self, sync_config, since_token=None, full_state=False): @@ -989,19 +1001,17 @@ class SyncHandler(object): "Sync result for newly joined room %s: %r", room_id, joined_room ) - defer.returnValue( - SyncResult( - presence=sync_result_builder.presence, - account_data=sync_result_builder.account_data, - joined=sync_result_builder.joined, - invited=sync_result_builder.invited, - archived=sync_result_builder.archived, - to_device=sync_result_builder.to_device, - device_lists=device_lists, - groups=sync_result_builder.groups, - device_one_time_keys_count=one_time_key_counts, - next_batch=sync_result_builder.now_token, - ) + return SyncResult( + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + archived=sync_result_builder.archived, + to_device=sync_result_builder.to_device, + device_lists=device_lists, + groups=sync_result_builder.groups, + device_one_time_keys_count=one_time_key_counts, + next_batch=sync_result_builder.now_token, ) @measure_func("_generate_sync_entry_for_groups") @@ -1124,11 +1134,9 @@ class SyncHandler(object): # Remove any users that we still share a room with. newly_left_users -= users_who_share_room - defer.returnValue( - DeviceLists(changed=users_that_have_changed, left=newly_left_users) - ) + return DeviceLists(changed=users_that_have_changed, left=newly_left_users) else: - defer.returnValue(DeviceLists(changed=[], left=[])) + return DeviceLists(changed=[], left=[]) @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): @@ -1225,7 +1233,7 @@ class SyncHandler(object): sync_result_builder.account_data = account_data_for_user - defer.returnValue(account_data_by_room) + return account_data_by_room @defer.inlineCallbacks def _generate_sync_entry_for_presence( @@ -1325,7 +1333,7 @@ class SyncHandler(object): ) if not tags_by_room: logger.debug("no-oping sync") - defer.returnValue(([], [], [], [])) + return ([], [], [], []) ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id @@ -1388,13 +1396,11 @@ class SyncHandler(object): newly_left_users -= newly_joined_or_invited_users - defer.returnValue( - ( - newly_joined_rooms, - newly_joined_or_invited_users, - newly_left_rooms, - newly_left_users, - ) + return ( + newly_joined_rooms, + newly_joined_or_invited_users, + newly_left_rooms, + newly_left_users, ) @defer.inlineCallbacks @@ -1414,13 +1420,13 @@ class SyncHandler(object): ) if rooms_changed: - defer.returnValue(True) + return True stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream for room_id in sync_result_builder.joined_room_ids: if self.store.has_room_changed_since(room_id, stream_id): - defer.returnValue(True) - defer.returnValue(False) + return True + return False @defer.inlineCallbacks def _get_rooms_changed(self, sync_result_builder, ignored_users): @@ -1637,7 +1643,7 @@ class SyncHandler(object): ) room_entries.append(entry) - defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms)) + return (room_entries, invited, newly_joined_rooms, newly_left_rooms) @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builder, ignored_users): @@ -1711,7 +1717,7 @@ class SyncHandler(object): ) ) - defer.returnValue((room_entries, invited, [])) + return (room_entries, invited, []) @defer.inlineCallbacks def _generate_room_entry( @@ -1765,6 +1771,21 @@ class SyncHandler(object): newly_joined_room=newly_joined, ) + if not batch and batch.limited: + # This resulted in #5407, which is weird, so lets log! We do it + # here as we have the maximum amount of information. + user_id = sync_result_builder.sync_config.user.to_string() + logger.info( + "Issue #5407: Found limited batch with no events. user %s, room %s," + " sync_config %s, newly_joined %s, events %s, batch %s.", + user_id, + room_id, + sync_config, + newly_joined, + events, + batch, + ) + if newly_joined: # debug for https://github.com/matrix-org/synapse/issues/4422 issue4422_logger.debug( @@ -1912,7 +1933,7 @@ class SyncHandler(object): joined_room_ids.add(room_id) joined_room_ids = frozenset(joined_room_ids) - defer.returnValue(joined_room_ids) + return joined_room_ids def _action_has_highlight(actions): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c3e0c8fc7e..f882330293 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -83,7 +83,7 @@ class TypingHandler(object): self._room_typing = {} def _handle_timeouts(self): - logger.info("Checking for typing timeouts") + logger.debug("Checking for typing timeouts") now = self.clock.time_msec() @@ -140,7 +140,7 @@ class TypingHandler(object): if was_present: # No point sending another notification - defer.returnValue(None) + return None self._push_update(member=member, typing=True) @@ -173,7 +173,7 @@ class TypingHandler(object): def _stopped_typing(self, member): if member.user_id not in self._room_typing.get(member.room_id, set()): # No point - defer.returnValue(None) + return None self._member_typing_until.pop(member, None) self._member_last_federation_poke.pop(member, None) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 5de9630950..e53669e40d 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -133,7 +133,7 @@ class UserDirectoryHandler(StateDeltasHandler): # If still None then the initial background update hasn't happened yet if self.pos is None: - defer.returnValue(None) + return None # Loop round handling deltas until we're up to date while True: |