diff options
Diffstat (limited to 'synapse')
96 files changed, 1463 insertions, 1261 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 7b3a5a8221..ddc195bc32 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -276,25 +276,25 @@ class Auth(object): self.get_access_token_from_request(request) ) if app_service is None: - return (None, None) + return None, None if app_service.ip_range_whitelist: ip_address = IPAddress(self.hs.get_ip_from_request(request)) if ip_address not in app_service.ip_range_whitelist: - return (None, None) + return None, None if b"user_id" not in request.args: - return (app_service.sender, app_service) + return app_service.sender, app_service user_id = request.args[b"user_id"][0].decode("utf8") if app_service.sender == user_id: - return (app_service.sender, app_service) + return app_service.sender, app_service if not app_service.is_interested_in_user(user_id): raise AuthError(403, "Application service cannot masquerade as this user.") if not (yield self.store.get_user_by_id(user_id)): raise AuthError(403, "Application service has not registered this user") - return (user_id, app_service) + return user_id, app_service @defer.inlineCallbacks def get_user_by_access_token(self, token, rights="access"): @@ -694,7 +694,7 @@ class Auth(object): # * The user is a guest user, and has joined the room # else it will throw. member_event = yield self.check_user_was_in_room(room_id, user_id) - return (member_event.membership, member_event.event_id) + return member_event.membership, member_event.event_id except AuthError: visibility = yield self.state.get_current_state( room_id, EventTypes.RoomHistoryVisibility, "" @@ -703,8 +703,7 @@ class Auth(object): visibility and visibility.content["history_visibility"] == "world_readable" ): - return (Membership.JOIN, None) - return + return Membership.JOIN, None raise AuthError( 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN ) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 611d285421..9504bfbc70 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -70,12 +70,12 @@ class PresenceStatusStubServlet(RestServlet): except HttpResponseException as e: raise e.to_synapse_error() - return (200, result) + return 200, result @defer.inlineCallbacks def on_PUT(self, request, user_id): yield self.auth.get_user_by_req(request) - return (200, {}) + return 200, {} class KeyUploadServlet(RestServlet): @@ -126,11 +126,11 @@ class KeyUploadServlet(RestServlet): self.main_uri + request.uri.decode("ascii"), body, headers=headers ) - return (200, result) + return 200, result else: # Just interested in counts. result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - return (200, {"one_time_key_counts": result}) + return 200, {"one_time_key_counts": result} class FrontendProxySlavedStore( diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 007ca75a94..3e25bf5747 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -107,7 +107,6 @@ class ApplicationServiceApi(SimpleHttpClient): except CodeMessageException as e: if e.code == 404: return False - return logger.warning("query_user to %s received %s", uri, e.code) except Exception as ex: logger.warning("query_user to %s threw exception %s", uri, ex) @@ -127,7 +126,6 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("query_alias to %s received %s", uri, e.code) if e.code == 404: return False - return except Exception as ex: logger.warning("query_alias to %s threw exception %s", uri, ex) return False @@ -230,7 +228,6 @@ class ApplicationServiceApi(SimpleHttpClient): sent_transactions_counter.labels(service.id).inc() sent_events_counter.labels(service.id).inc(len(events)) return True - return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/config/stats.py b/synapse/config/stats.py index b518a3ed9c..b18ddbd1fa 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -27,19 +27,16 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True - self.stats_bucket_size = 86400 + self.stats_bucket_size = 86400 * 1000 self.stats_retention = sys.maxsize stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) - self.stats_bucket_size = ( - self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + self.stats_bucket_size = self.parse_duration( + stats_config.get("bucket_size", "1d") ) - self.stats_retention = ( - self.parse_duration( - stats_config.get("retention", "%ds" % (sys.maxsize,)) - ) - / 1000 + self.stats_retention = self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) ) def generate_config_section(self, config_dir_path, server_name, **kwargs): diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 41eabbe717..694fb2c816 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -83,7 +83,7 @@ def compute_content_hash(event_dict, hash_algorithm): event_json_bytes = encode_canonical_json(event_dict) hashed = hash_algorithm(event_json_bytes) - return (hashed.name, hashed.digest()) + return hashed.name, hashed.digest() def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): @@ -106,7 +106,7 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): event_dict.pop("unsigned", None) event_json_bytes = encode_canonical_json(event_dict) hashed = hash_algorithm(event_json_bytes) - return (hashed.name, hashed.digest()) + return hashed.name, hashed.digest() def compute_event_signature(event_dict, signature_name, signing_key): diff --git a/synapse/event_auth.py b/synapse/event_auth.py index cd52e3f867..4e91df60e6 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -637,11 +637,11 @@ def auth_types_for_event(event): if event.type == EventTypes.Create: return [] - auth_types = [] - - auth_types.append((EventTypes.PowerLevels, "")) - auth_types.append((EventTypes.Member, event.sender)) - auth_types.append((EventTypes.Create, "")) + auth_types = [ + (EventTypes.PowerLevels, ""), + (EventTypes.Member, event.sender), + (EventTypes.Create, ""), + ] if event.type == EventTypes.Member: membership = event.content["membership"] diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index bec3080895..6ee6216660 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -355,7 +355,7 @@ class FederationClient(FederationBase): auth_chain.sort(key=lambda e: e.depth) - return (pdus, auth_chain) + return pdus, auth_chain except HttpResponseException as e: if e.code == 400 or e.code == 404: logger.info("Failed to use get_room_state_ids API, falling back") @@ -404,7 +404,7 @@ class FederationClient(FederationBase): signed_auth.sort(key=lambda e: e.depth) - return (signed_pdus, signed_auth) + return signed_pdus, signed_auth @defer.inlineCallbacks def get_events_from_store_or_dest(self, destination, room_id, event_ids): @@ -429,7 +429,7 @@ class FederationClient(FederationBase): missing_events.discard(k) if not missing_events: - return (signed_events, failed_to_fetch) + return signed_events, failed_to_fetch logger.debug( "Fetching unknown state/auth events %s for room %s", @@ -465,7 +465,7 @@ class FederationClient(FederationBase): # We removed all events we successfully fetched from `batch` failed_to_fetch.update(batch) - return (signed_events, failed_to_fetch) + return signed_events, failed_to_fetch @defer.inlineCallbacks @log_function diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 05fd49f3c1..e5f0b90aec 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -100,7 +100,7 @@ class FederationServer(FederationBase): res = self._transaction_from_pdus(pdus).get_dict() - return (200, res) + return 200, res @defer.inlineCallbacks @log_function @@ -163,7 +163,7 @@ class FederationServer(FederationBase): yield self.transaction_actions.set_response( origin, transaction, 400, response ) - return (400, response) + return 400, response received_pdus_counter.inc(len(transaction.pdus)) @@ -265,7 +265,7 @@ class FederationServer(FederationBase): logger.debug("Returning: %s", str(response)) yield self.transaction_actions.set_response(origin, transaction, 200, response) - return (200, response) + return 200, response @defer.inlineCallbacks def received_edu(self, origin, edu_type, content): @@ -298,7 +298,7 @@ class FederationServer(FederationBase): event_id, ) - return (200, resp) + return 200, resp @defer.inlineCallbacks def on_state_ids_request(self, origin, room_id, event_id): @@ -315,7 +315,7 @@ class FederationServer(FederationBase): state_ids = yield self.handler.get_state_ids_for_pdu(room_id, event_id) auth_chain_ids = yield self.store.get_auth_chain_ids(state_ids) - return (200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}) + return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} @defer.inlineCallbacks def _on_context_state_request_compute(self, room_id, event_id): @@ -345,15 +345,15 @@ class FederationServer(FederationBase): pdu = yield self.handler.get_persisted_pdu(origin, event_id) if pdu: - return (200, self._transaction_from_pdus([pdu]).get_dict()) + return 200, self._transaction_from_pdus([pdu]).get_dict() else: - return (404, "") + return 404, "" @defer.inlineCallbacks def on_query_request(self, query_type, args): received_queries_counter.labels(query_type).inc() resp = yield self.registry.on_query(query_type, args) - return (200, resp) + return 200, resp @defer.inlineCallbacks def on_make_join_request(self, origin, room_id, user_id, supported_versions): @@ -435,7 +435,7 @@ class FederationServer(FederationBase): logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures) yield self.handler.on_send_leave_request(origin, pdu) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_event_auth(self, origin, room_id, event_id): @@ -446,7 +446,7 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() auth_pdus = yield self.handler.on_event_auth(event_id) res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]} - return (200, res) + return 200, res @defer.inlineCallbacks def on_query_auth_request(self, origin, content, room_id, event_id): @@ -499,7 +499,7 @@ class FederationServer(FederationBase): "missing": ret.get("missing", []), } - return (200, send_content) + return 200, send_content @log_function def on_query_client_keys(self, origin, content): diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 8acd9f9a83..38bc67191c 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -51,8 +51,8 @@ class AccountDataEventSource(object): {"type": account_data_type, "content": content, "room_id": room_id} ) - return (results, current_stream_id) + return results, current_stream_id @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): - return ([], config.to_id) + return [], config.to_id diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index d1a51df6f9..3e9b298154 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -294,12 +294,10 @@ class ApplicationServicesHandler(object): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. return False - return user_info = yield self.store.get_user_by_id(user_id) if user_info: return False - return # user not found; could be the AS though, so check. services = self.store.get_app_services() diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 0f3ebf7ef8..f844409d21 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -280,7 +280,7 @@ class AuthHandler(BaseHandler): creds, list(clientdict), ) - return (creds, clientdict, session["id"]) + return creds, clientdict, session["id"] ret = self._auth_dict_for_flows(flows, session) ret["completed"] = list(creds) @@ -722,7 +722,7 @@ class AuthHandler(BaseHandler): known_login_type = True is_valid = yield provider.check_password(qualified_user_id, password) if is_valid: - return (qualified_user_id, None) + return qualified_user_id, None if not hasattr(provider, "get_supported_login_types") or not hasattr( provider, "check_auth" @@ -766,7 +766,7 @@ class AuthHandler(BaseHandler): ) if canonical_user_id: - return (canonical_user_id, None) + return canonical_user_id, None if not known_login_type: raise SynapseError(400, "Unknown login type %s" % login_type) @@ -816,7 +816,7 @@ class AuthHandler(BaseHandler): result = (result, None) return result - return (None, None) + return None, None @defer.inlineCallbacks def _check_local_password(self, user_id, password): diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d27a9f7337..5f23ee4488 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -27,6 +27,7 @@ from synapse.api.errors import ( HttpResponseException, RequestSendFailed, ) +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import stringutils from synapse.util.async_helpers import Linearizer @@ -47,6 +48,7 @@ class DeviceWorkerHandler(BaseHandler): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() + @trace @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -58,6 +60,7 @@ class DeviceWorkerHandler(BaseHandler): defer.Deferred: list[dict[str, X]]: info on each device """ + set_tag("user_id", user_id) device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -66,8 +69,10 @@ class DeviceWorkerHandler(BaseHandler): for device in devices: _update_device_from_client_ips(device, ips) + log_kv(device_map) return devices + @trace @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -87,9 +92,14 @@ class DeviceWorkerHandler(BaseHandler): raise errors.NotFoundError ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) + + set_tag("device", device) + set_tag("ips", ips) + return device @measure_func("device.get_user_ids_changed") + @trace @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -99,6 +109,9 @@ class DeviceWorkerHandler(BaseHandler): user_id (str) from_token (StreamToken) """ + + set_tag("user_id", user_id) + set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() room_ids = yield self.store.get_rooms_for_user(user_id) @@ -150,6 +163,9 @@ class DeviceWorkerHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: + log_kv( + {"event": "encountered empty previous state", "room_id": room_id} + ) for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: @@ -202,7 +218,11 @@ class DeviceWorkerHandler(BaseHandler): possibly_joined = [] possibly_left = [] - return {"changed": list(possibly_joined), "left": list(possibly_left)} + result = {"changed": list(possibly_joined), "left": list(possibly_left)} + + log_kv(result) + + return result class DeviceHandler(DeviceWorkerHandler): @@ -269,6 +289,7 @@ class DeviceHandler(DeviceWorkerHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") + @trace @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -286,6 +307,10 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match + set_tag("error", True) + log_kv( + {"reason": "User doesn't have device id.", "device_id": device_id} + ) pass else: raise @@ -298,6 +323,7 @@ class DeviceHandler(DeviceWorkerHandler): yield self.notify_device_update(user_id, [device_id]) + @trace @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -333,6 +359,8 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match + set_tag("error", True) + set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -373,6 +401,7 @@ class DeviceHandler(DeviceWorkerHandler): else: raise + @trace @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): @@ -388,6 +417,8 @@ class DeviceHandler(DeviceWorkerHandler): hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) + set_tag("target_hosts", hosts) + position = yield self.store.add_device_change_to_streams( user_id, device_ids, list(hosts) ) @@ -407,6 +438,7 @@ class DeviceHandler(DeviceWorkerHandler): ) for host in hosts: self.federation_sender.send_device_messages(host) + log_kv({"message": "sent device update to host", "host": host}) @defer.inlineCallbacks def notify_user_signature_update(self, from_user_id, user_ids): @@ -468,12 +500,15 @@ class DeviceListUpdater(object): iterable=True, ) + @trace @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ + set_tag("origin", origin) + set_tag("edu_content", edu_content) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -488,12 +523,30 @@ class DeviceListUpdater(object): device_id, origin, ) + + set_tag("error", True) + log_kv( + { + "message": "Got a device list update edu from a user and " + "device which does not match the origin of the request.", + "user_id": user_id, + "device_id": device_id, + } + ) return room_ids = yield self.store.get_rooms_for_user(user_id) if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. + set_tag("error", True) + log_kv( + { + "message": "Got an update from a user for which " + "we don't share any rooms", + "other user_id": user_id, + } + ) logger.warning( "Got device list update edu for %r/%r, but don't share a room", user_id, @@ -595,6 +648,7 @@ class DeviceListUpdater(object): request: https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid """ + log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. origin = get_domain_from_id(user_id) try: @@ -611,13 +665,20 @@ class DeviceListUpdater(object): # eventually become consistent. return except FederationDeniedError as e: + set_tag("error", True) + log_kv({"reason": "FederationDeniedError"}) logger.info(e) return - except Exception: + except Exception as e: # TODO: Remember that we are now out of sync and try again # later + set_tag("error", True) + log_kv( + {"message": "Exception raised by federation request", "exception": e} + ) logger.exception("Failed to handle device list update for %s", user_id) return + log_kv({"result": result}) stream_id = result["stream_id"] devices = result["devices"] diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index c7d56779b8..01731cb2d0 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -22,6 +22,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.logging.opentracing import ( get_active_span_text_map, + log_kv, set_tag, start_active_span, whitelisted_homeserver, @@ -86,7 +87,8 @@ class DeviceMessageHandler(object): @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): - + set_tag("number_of_messages", len(messages)) + set_tag("sender", sender_user_id) local_messages = {} remote_messages = {} for user_id, by_device in messages.items(): @@ -124,6 +126,7 @@ class DeviceMessageHandler(object): else None, } + log_kv({"local_messages": local_messages}) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -132,6 +135,7 @@ class DeviceMessageHandler(object): "to_device_key", stream_id, users=local_messages.keys() ) + log_kv({"remote_messages": remote_messages}) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 2f1f10a9af..5e748687e3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -167,7 +167,6 @@ class EventHandler(BaseHandler): if not event: return None - return users = yield self.store.get_users_in_room(event.room_id) is_peeking = user.to_string() not in users diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 94306c94a9..538b16efd6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1428,7 +1428,7 @@ class FederationHandler(BaseHandler): assert event.user_id == user_id assert event.state_key == user_id assert event.room_id == room_id - return (origin, event, format_ver) + return origin, event, format_ver @defer.inlineCallbacks @log_function diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 97daca5fee..d199521b58 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -282,16 +282,3 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() - - -class LookupAlgorithm: - """ - Supported hashing algorithms when performing a 3PID lookup. - - SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64 - encoding - NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext - """ - - SHA256 = "sha256" - NONE = "none" diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 42d6650ed9..f991efeee3 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -449,8 +449,7 @@ class InitialSyncHandler(BaseHandler): # * The user is a guest user, and has joined the room # else it will throw. member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - return (member_event.membership, member_event.event_id) - return + return member_event.membership, member_event.event_id except AuthError: visibility = yield self.state_handler.get_current_state( room_id, EventTypes.RoomHistoryVisibility, "" @@ -459,8 +458,7 @@ class InitialSyncHandler(BaseHandler): visibility and visibility.content["history_visibility"] == "world_readable" ): - return (Membership.JOIN, None) - return + return Membership.JOIN, None raise AuthError( 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 94a9ca0357..053cf66b28 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -255,7 +255,7 @@ class PresenceHandler(object): self.unpersisted_users_changes = set() if unpersisted: - logger.info("Persisting %d upersisted presence updates", len(unpersisted)) + logger.info("Persisting %d unpersisted presence updates", len(unpersisted)) yield self.store.update_presence( [self.user_to_current_state[user_id] for user_id in unpersisted] ) @@ -1032,7 +1032,7 @@ class PresenceEventSource(object): # # Hence this guard where we just return nothing so that the sync # doesn't return. C.f. #5503. - return ([], max_token) + return [], max_token presence = self.get_presence_handler() stream_change_cache = self.store.presence_stream_cache @@ -1279,7 +1279,7 @@ def get_interested_parties(store, states): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - return (room_ids_to_states, users_to_states) + return room_ids_to_states, users_to_states @defer.inlineCallbacks diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 73973502a4..6854c751a6 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -148,7 +148,7 @@ class ReceiptEventSource(object): to_key = yield self.get_current_key() if from_key == to_key: - return ([], to_key) + return [], to_key events = yield self.store.get_linearized_receipts_for_rooms( room_ids, from_key=from_key, to_key=to_key diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 4631fab94e..975da57ffd 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -24,13 +24,11 @@ from synapse.api.errors import ( AuthError, Codes, ConsentNotGivenError, - InvalidCaptchaError, LimitExceededError, RegistrationError, SynapseError, ) from synapse.config.server import is_threepid_reserved -from synapse.http.client import CaptchaServerHttpClient from synapse.http.servlet import assert_params_in_dict from synapse.replication.http.login import RegisterDeviceReplicationServlet from synapse.replication.http.register import ( @@ -39,7 +37,6 @@ from synapse.replication.http.register import ( ) from synapse.types import RoomAlias, RoomID, UserID, create_requester from synapse.util.async_helpers import Linearizer -from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler @@ -59,7 +56,6 @@ class RegistrationHandler(BaseHandler): self._auth_handler = hs.get_auth_handler() self.profile_handler = hs.get_profile_handler() self.user_directory_handler = hs.get_user_directory_handler() - self.captcha_client = CaptchaServerHttpClient(hs) self.identity_handler = self.hs.get_handlers().identity_handler self.ratelimiter = hs.get_registration_ratelimiter() @@ -362,70 +358,6 @@ class RegistrationHandler(BaseHandler): ) return user_id - @defer.inlineCallbacks - def check_recaptcha(self, ip, private_key, challenge, response): - """ - Checks a recaptcha is correct. - - Used only by c/s api v1 - """ - - captcha_response = yield self._validate_captcha( - ip, private_key, challenge, response - ) - if not captcha_response["valid"]: - logger.info( - "Invalid captcha entered from %s. Error: %s", - ip, - captcha_response["error_url"], - ) - raise InvalidCaptchaError(error_url=captcha_response["error_url"]) - else: - logger.info("Valid captcha entered from %s", ip) - - @defer.inlineCallbacks - def register_email(self, threepidCreds): - """ - Registers emails with an identity server. - - Used only by c/s api v1 - """ - - for c in threepidCreds: - logger.info( - "validating threepidcred sid %s on id server %s", - c["sid"], - c["idServer"], - ) - try: - threepid = yield self.identity_handler.threepid_from_creds(c) - except Exception: - logger.exception("Couldn't validate 3pid") - raise RegistrationError(400, "Couldn't validate 3pid") - - if not threepid: - raise RegistrationError(400, "Couldn't validate 3pid") - logger.info( - "got threepid with medium '%s' and address '%s'", - threepid["medium"], - threepid["address"], - ) - - if not check_3pid_allowed(self.hs, threepid["medium"], threepid["address"]): - raise RegistrationError(403, "Third party identifier is not allowed") - - @defer.inlineCallbacks - def bind_emails(self, user_id, threepidCreds): - """Links emails with a user ID and informs an identity server. - - Used only by c/s api v1 - """ - - # Now we have a matrix ID, bind it to the threepids we were given - for c in threepidCreds: - # XXX: This should be a deferred list, shouldn't it? - yield self.identity_handler.bind_threepid(c, user_id) - def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None): # don't allow people to register the server notices mxid if self._server_notices_mxid is not None: @@ -464,44 +396,7 @@ class RegistrationHandler(BaseHandler): return str(id) @defer.inlineCallbacks - def _validate_captcha(self, ip_addr, private_key, challenge, response): - """Validates the captcha provided. - - Used only by c/s api v1 - - Returns: - dict: Containing 'valid'(bool) and 'error_url'(str) if invalid. - - """ - response = yield self._submit_captcha(ip_addr, private_key, challenge, response) - # parse Google's response. Lovely format.. - lines = response.split("\n") - json = { - "valid": lines[0] == "true", - "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?" - + "error=%s" % lines[1], - } - return json - - @defer.inlineCallbacks - def _submit_captcha(self, ip_addr, private_key, challenge, response): - """ - Used only by c/s api v1 - """ - data = yield self.captcha_client.post_urlencoded_get_raw( - "http://www.recaptcha.net:80/recaptcha/api/verify", - args={ - "privatekey": private_key, - "remoteip": ip_addr, - "challenge": challenge, - "response": response, - }, - ) - return data - - @defer.inlineCallbacks def _join_user_to_room(self, requester, room_identifier): - room_id = None room_member_handler = self.hs.get_room_member_handler() if RoomID.is_valid(room_identifier): room_id = room_identifier @@ -622,7 +517,7 @@ class RegistrationHandler(BaseHandler): initial_display_name=initial_display_name, is_guest=is_guest, ) - return (r["device_id"], r["access_token"]) + return r["device_id"], r["access_token"] valid_until_ms = None if self.session_lifetime is not None: @@ -648,9 +543,7 @@ class RegistrationHandler(BaseHandler): return (device_id, access_token) @defer.inlineCallbacks - def post_registration_actions( - self, user_id, auth_result, access_token, bind_email, bind_msisdn - ): + def post_registration_actions(self, user_id, auth_result, access_token): """A user has completed registration Args: @@ -659,18 +552,10 @@ class RegistrationHandler(BaseHandler): registered user. access_token (str|None): The access token of the newly logged in device, or None if `inhibit_login` enabled. - bind_email (bool): Whether to bind the email with the identity - server. - bind_msisdn (bool): Whether to bind the msisdn with the identity - server. """ if self.hs.config.worker_app: yield self._post_registration_client( - user_id=user_id, - auth_result=auth_result, - access_token=access_token, - bind_email=bind_email, - bind_msisdn=bind_msisdn, + user_id=user_id, auth_result=auth_result, access_token=access_token ) return @@ -683,13 +568,11 @@ class RegistrationHandler(BaseHandler): ): yield self.store.upsert_monthly_active_user(user_id) - yield self._register_email_threepid( - user_id, threepid, access_token, bind_email - ) + yield self._register_email_threepid(user_id, threepid, access_token) if auth_result and LoginType.MSISDN in auth_result: threepid = auth_result[LoginType.MSISDN] - yield self._register_msisdn_threepid(user_id, threepid, bind_msisdn) + yield self._register_msisdn_threepid(user_id, threepid) if auth_result and LoginType.TERMS in auth_result: yield self._on_user_consented(user_id, self.hs.config.user_consent_version) @@ -708,14 +591,12 @@ class RegistrationHandler(BaseHandler): yield self.post_consent_actions(user_id) @defer.inlineCallbacks - def _register_email_threepid(self, user_id, threepid, token, bind_email): + def _register_email_threepid(self, user_id, threepid, token): """Add an email address as a 3pid identifier Also adds an email pusher for the email address, if configured in the HS config - Also optionally binds emails to the given user_id on the identity server - Must be called on master. Args: @@ -723,8 +604,6 @@ class RegistrationHandler(BaseHandler): threepid (object): m.login.email.identity auth response token (str|None): access_token for the user, or None if not logged in. - bind_email (bool): true if the client requested the email to be - bound at the identity server Returns: defer.Deferred: """ @@ -766,29 +645,15 @@ class RegistrationHandler(BaseHandler): data={}, ) - if bind_email: - logger.info("bind_email specified: binding") - logger.debug("Binding emails %s to %s" % (threepid, user_id)) - yield self.identity_handler.bind_threepid( - threepid["threepid_creds"], user_id - ) - else: - logger.info("bind_email not specified: not binding email") - @defer.inlineCallbacks - def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn): + def _register_msisdn_threepid(self, user_id, threepid): """Add a phone number as a 3pid identifier - Also optionally binds msisdn to the given user_id on the identity server - Must be called on master. Args: user_id (str): id of user threepid (object): m.login.msisdn auth response - token (str): access_token for the user - bind_email (bool): true if the client requested the email to be - bound at the identity server Returns: defer.Deferred: """ @@ -804,12 +669,3 @@ class RegistrationHandler(BaseHandler): yield self._auth_handler.add_threepid( user_id, threepid["medium"], threepid["address"], threepid["validated_at"] ) - - if bind_msisdn: - logger.info("bind_msisdn specified: binding") - logger.debug("Binding msisdn %s to %s", threepid, user_id) - yield self.identity_handler.bind_threepid( - threepid["threepid_creds"], user_id - ) - else: - logger.info("bind_msisdn not specified: not binding msisdn") diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6e47fe7867..a509e11d69 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -852,7 +852,6 @@ class RoomContextHandler(object): ) if not event: return None - return filtered = yield (filter_evts([event])) if not filtered: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 4605cb9c0b..093f2ea36e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -29,11 +29,9 @@ from twisted.internet import defer from synapse import types from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError -from synapse.handlers.identity import LookupAlgorithm from synapse.types import RoomID, UserID from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room -from synapse.util.hash import sha256_and_url_safe_base64 from ._base import BaseHandler @@ -525,7 +523,7 @@ class RoomMemberHandler(object): event (SynapseEvent): The membership event. context: The context of the event. is_guest (bool): Whether the sender is a guest. - remote_room_hosts (list[str]|None): Homeservers which are likely to already be in + room_hosts ([str]): Homeservers which are likely to already be in the room, and could be danced with in order to join this homeserver for the first time. ratelimit (bool): Whether to rate limit this request. @@ -636,7 +634,7 @@ class RoomMemberHandler(object): servers.remove(room_alias.domain) servers.insert(0, room_alias.domain) - return RoomID.from_string(room_id), servers + return (RoomID.from_string(room_id), servers) @defer.inlineCallbacks def _get_inviter(self, user_id, room_id): @@ -699,44 +697,6 @@ class RoomMemberHandler(object): raise SynapseError( 403, "Looking up third-party identifiers is denied from this server" ) - - # Check what hashing details are supported by this identity server - use_v1 = False - hash_details = None - try: - hash_details = yield self.simple_http_client.get_json( - "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server) - ) - except (HttpResponseException, ValueError) as e: - # Catch HttpResponseExcept for a non-200 response code - # Catch ValueError for non-JSON response body - - # Check if this identity server does not know about v2 lookups - if e.code == 404: - # This is an old identity server that does not yet support v2 lookups - use_v1 = True - else: - logger.warn("Error when looking up hashing details: %s" % (e,)) - return None - - if use_v1: - return (yield self._lookup_3pid_v1(id_server, medium, address)) - - return (yield self._lookup_3pid_v2(id_server, medium, address, hash_details)) - - @defer.inlineCallbacks - def _lookup_3pid_v1(self, id_server, medium, address): - """Looks up a 3pid in the passed identity server using v1 lookup. - - Args: - id_server (str): The server name (including port, if required) - of the identity server to use. - medium (str): The type of the third party identifier (e.g. "email"). - address (str): The third party identifier (e.g. "foo@example.com"). - - Returns: - str: the matrix ID of the 3pid, or None if it is not recognized. - """ try: data = yield self.simple_http_client.get_json( "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server), @@ -751,83 +711,8 @@ class RoomMemberHandler(object): except IOError as e: logger.warn("Error from identity server lookup: %s" % (e,)) - - return None - - @defer.inlineCallbacks - def _lookup_3pid_v2(self, id_server, medium, address, hash_details): - """Looks up a 3pid in the passed identity server using v2 lookup. - - Args: - id_server (str): The server name (including port, if required) - of the identity server to use. - medium (str): The type of the third party identifier (e.g. "email"). - address (str): The third party identifier (e.g. "foo@example.com"). - hash_details (dict[str, str|list]): A dictionary containing hashing information - provided by an identity server. - - Returns: - Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised. - """ - # Extract information from hash_details - supported_lookup_algorithms = hash_details["algorithms"] - lookup_pepper = hash_details["lookup_pepper"] - - # Check if any of the supported lookup algorithms are present - if LookupAlgorithm.SHA256 in supported_lookup_algorithms: - # Perform a hashed lookup - lookup_algorithm = LookupAlgorithm.SHA256 - - # Hash address, medium and the pepper with sha256 - to_hash = "%s %s %s" % (address, medium, lookup_pepper) - lookup_value = sha256_and_url_safe_base64(to_hash) - - elif LookupAlgorithm.NONE in supported_lookup_algorithms: - # Perform a non-hashed lookup - lookup_algorithm = LookupAlgorithm.NONE - - # Combine together plaintext address and medium - lookup_value = "%s %s" % (address, medium) - - else: - logger.warn( - "None of the provided lookup algorithms of %s%s are supported: %s", - id_server_scheme, - id_server, - hash_details["algorithms"], - ) - raise SynapseError( - 400, - "Provided identity server does not support any v2 lookup " - "algorithms that this homeserver supports.", - ) - - try: - lookup_results = yield self.simple_http_client.post_json_get_json( - "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server), - { - "addresses": [lookup_value], - "algorithm": lookup_algorithm, - "pepper": lookup_pepper, - }, - ) - except (HttpResponseException, ValueError) as e: - # Catch HttpResponseExcept for a non-200 response code - # Catch ValueError for non-JSON response body - logger.warn("Error when performing a 3pid lookup: %s" % (e,)) return None - # Check for a mapping from what we looked up to an MXID - if "mappings" not in lookup_results or not isinstance( - lookup_results["mappings"], dict - ): - logger.debug("No results from 3pid lookup") - return None - - # Return the MXID if it's available, or None otherwise - mxid = lookup_results["mappings"].get(lookup_value) - return mxid - @defer.inlineCallbacks def _verify_any_signature(self, data, server_hostname): if server_hostname not in data["signatures"]: @@ -1018,7 +903,7 @@ class RoomMemberHandler(object): if not public_keys: public_keys.append(fallback_public_key) display_name = data["display_name"] - return (token, public_keys, fallback_public_key, display_name) + return token, public_keys, fallback_public_key, display_name @defer.inlineCallbacks def _is_host_in_room(self, current_state_ids): diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 4449da6669..921735edb3 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -14,15 +14,14 @@ # limitations under the License. import logging +from collections import Counter from twisted.internet import defer -from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.api.constants import EventTypes, Membership from synapse.handlers.state_deltas import StateDeltasHandler from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import UserID -from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -62,11 +61,10 @@ class StatsHandler(StateDeltasHandler): def notify_new_event(self): """Called when there may be more deltas to process """ - if not self.hs.config.stats_enabled: + if not self.hs.config.stats_enabled or self._is_processing: return - if self._is_processing: - return + self._is_processing = True @defer.inlineCallbacks def process(): @@ -75,39 +73,72 @@ class StatsHandler(StateDeltasHandler): finally: self._is_processing = False - self._is_processing = True run_as_background_process("stats.notify_new_event", process) @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB if self.pos is None: - self.pos = yield self.store.get_stats_stream_pos() - - # If still None then the initial background update hasn't happened yet - if self.pos is None: - return None + self.pos = yield self.store.get_stats_positions() # Loop round handling deltas until we're up to date + while True: - with Measure(self.clock, "stats_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) - if not deltas: - return + deltas = yield self.store.get_current_state_deltas(self.pos) + + if deltas: + logger.debug("Handling %d state deltas", len(deltas)) + room_deltas, user_deltas = yield self._handle_deltas(deltas) + + max_pos = deltas[-1]["stream_id"] + else: + room_deltas = {} + user_deltas = {} + max_pos = yield self.store.get_room_max_stream_ordering() - logger.info("Handling %d state deltas", len(deltas)) - yield self._handle_deltas(deltas) + # Then count deltas for total_events and total_event_bytes. + room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes( + self.pos, max_pos + ) + + for room_id, fields in room_count.items(): + room_deltas.setdefault(room_id, {}).update(fields) + + for user_id, fields in user_count.items(): + user_deltas.setdefault(user_id, {}).update(fields) + + logger.debug("room_deltas: %s", room_deltas) + logger.debug("user_deltas: %s", user_deltas) - self.pos = deltas[-1]["stream_id"] - yield self.store.update_stats_stream_pos(self.pos) + # Always call this so that we update the stats position. + yield self.store.bulk_update_stats_delta( + self.clock.time_msec(), + updates={"room": room_deltas, "user": user_deltas}, + stream_id=max_pos, + ) + + event_processing_positions.labels("stats").set(max_pos) - event_processing_positions.labels("stats").set(self.pos) + if self.pos == max_pos: + break + + self.pos = max_pos @defer.inlineCallbacks def _handle_deltas(self, deltas): + """Called with the state deltas to process + + Returns: + Deferred[tuple[dict[str, Counter], dict[str, counter]]] + Resovles to two dicts, the room deltas and the user deltas, + mapping from room/user ID to changes in the various fields. """ - Called with the state deltas to process - """ + + room_to_stats_deltas = {} + user_to_stats_deltas = {} + + room_to_state_updates = {} + for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -115,11 +146,10 @@ class StatsHandler(StateDeltasHandler): event_id = delta["event_id"] stream_id = delta["stream_id"] prev_event_id = delta["prev_event_id"] - stream_pos = delta["stream_id"] - logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id) - token = yield self.store.get_earliest_token_for_room_stats(room_id) + token = yield self.store.get_earliest_token_for_stats("room", room_id) # If the earliest token to begin from is larger than our current # stream ID, skip processing this delta. @@ -131,203 +161,130 @@ class StatsHandler(StateDeltasHandler): continue if event_id is None and prev_event_id is None: - # Errr... + logger.error( + "event ID is None and so is the previous event ID. stream_id: %s", + stream_id, + ) continue event_content = {} + sender = None if event_id is not None: event = yield self.store.get_event(event_id, allow_none=True) if event: event_content = event.content or {} + sender = event.sender + + # All the values in this dict are deltas (RELATIVE changes) + room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter()) - # We use stream_pos here rather than fetch by event_id as event_id - # may be None - now = yield self.store.get_received_ts_by_stream_pos(stream_pos) + room_state = room_to_state_updates.setdefault(room_id, {}) - # quantise time to the nearest bucket - now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size + if prev_event_id is None: + # this state event doesn't overwrite another, + # so it is a new effective/current state event + room_stats_delta["current_state_events"] += 1 if typ == EventTypes.Member: # we could use _get_key_change here but it's a bit inefficient # given we're not testing for a specific result; might as well # just grab the prev_membership and membership strings and # compare them. - prev_event_content = {} + # We take None rather than leave as a previous membership + # in the absence of a previous event because we do not want to + # reduce the leave count when a new-to-the-room user joins. + prev_membership = None if prev_event_id is not None: prev_event = yield self.store.get_event( prev_event_id, allow_none=True ) if prev_event: prev_event_content = prev_event.content + prev_membership = prev_event_content.get( + "membership", Membership.LEAVE + ) membership = event_content.get("membership", Membership.LEAVE) - prev_membership = prev_event_content.get("membership", Membership.LEAVE) - - if prev_membership == membership: - continue - if prev_membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", -1 - ) + if prev_membership is None: + logger.debug("No previous membership for this user.") + elif membership == prev_membership: + pass # noop + elif prev_membership == Membership.JOIN: + room_stats_delta["joined_members"] -= 1 elif prev_membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", -1 - ) + room_stats_delta["invited_members"] -= 1 elif prev_membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", -1 - ) + room_stats_delta["left_members"] -= 1 elif prev_membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", -1 - ) + room_stats_delta["banned_members"] -= 1 else: - err = "%s is not a valid prev_membership" % (repr(prev_membership),) - logger.error(err) - raise ValueError(err) + raise ValueError( + "%r is not a valid prev_membership" % (prev_membership,) + ) + if membership == prev_membership: + pass # noop if membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", +1 - ) + room_stats_delta["joined_members"] += 1 elif membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", +1 - ) + room_stats_delta["invited_members"] += 1 + + if sender and self.is_mine_id(sender): + user_to_stats_deltas.setdefault(sender, Counter())[ + "invites_sent" + ] += 1 + elif membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", +1 - ) + room_stats_delta["left_members"] += 1 elif membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", +1 - ) + room_stats_delta["banned_members"] += 1 else: - err = "%s is not a valid membership" % (repr(membership),) - logger.error(err) - raise ValueError(err) + raise ValueError("%r is not a valid membership" % (membership,)) user_id = state_key if self.is_mine_id(user_id): - # update user_stats as it's one of our users - public = yield self._is_public_room(room_id) - - if membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - -1, - ) - elif membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - +1, - ) + # this accounts for transitions like leave → ban and so on. + has_changed_joinedness = (prev_membership == Membership.JOIN) != ( + membership == Membership.JOIN + ) - elif typ == EventTypes.Create: - # Newly created room. Add it with all blank portions. - yield self.store.update_room_state( - room_id, - { - "join_rules": None, - "history_visibility": None, - "encryption": None, - "name": None, - "topic": None, - "avatar": None, - "canonical_alias": None, - }, - ) + if has_changed_joinedness: + delta = +1 if membership == Membership.JOIN else -1 - elif typ == EventTypes.JoinRules: - yield self.store.update_room_state( - room_id, {"join_rules": event_content.get("join_rule")} - ) + user_to_stats_deltas.setdefault(user_id, Counter())[ + "joined_rooms" + ] += delta - is_public = yield self._get_key_change( - prev_event_id, event_id, "join_rule", JoinRules.PUBLIC - ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + room_stats_delta["local_users_in_room"] += delta + elif typ == EventTypes.Create: + room_state["is_federatable"] = event_content.get("m.federate", True) + if sender and self.is_mine_id(sender): + user_to_stats_deltas.setdefault(sender, Counter())[ + "rooms_created" + ] += 1 + elif typ == EventTypes.JoinRules: + room_state["join_rules"] = event_content.get("join_rule") elif typ == EventTypes.RoomHistoryVisibility: - yield self.store.update_room_state( - room_id, - {"history_visibility": event_content.get("history_visibility")}, - ) - - is_public = yield self._get_key_change( - prev_event_id, event_id, "history_visibility", "world_readable" + room_state["history_visibility"] = event_content.get( + "history_visibility" ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) - elif typ == EventTypes.Encryption: - yield self.store.update_room_state( - room_id, {"encryption": event_content.get("algorithm")} - ) + room_state["encryption"] = event_content.get("algorithm") elif typ == EventTypes.Name: - yield self.store.update_room_state( - room_id, {"name": event_content.get("name")} - ) + room_state["name"] = event_content.get("name") elif typ == EventTypes.Topic: - yield self.store.update_room_state( - room_id, {"topic": event_content.get("topic")} - ) + room_state["topic"] = event_content.get("topic") elif typ == EventTypes.RoomAvatar: - yield self.store.update_room_state( - room_id, {"avatar": event_content.get("url")} - ) + room_state["avatar"] = event_content.get("url") elif typ == EventTypes.CanonicalAlias: - yield self.store.update_room_state( - room_id, {"canonical_alias": event_content.get("alias")} - ) + room_state["canonical_alias"] = event_content.get("alias") + elif typ == EventTypes.GuestAccess: + room_state["guest_access"] = event_content.get("guest_access") - @defer.inlineCallbacks - def update_public_room_stats(self, ts, room_id, is_public): - """ - Increment/decrement a user's number of public rooms when a room they are - in changes to/from public visibility. + for room_id, state in room_to_state_updates.items(): + yield self.store.update_room_state(room_id, state) - Args: - ts (int): Timestamp in seconds - room_id (str) - is_public (bool) - """ - # For now, blindly iterate over all local users in the room so that - # we can handle the whole problem of copying buckets over as needed - user_ids = yield self.store.get_users_in_room(room_id) - - for user_id in user_ids: - if self.hs.is_mine(UserID.from_string(user_id)): - yield self.store.update_stats_delta( - ts, "user", user_id, "public_rooms", +1 if is_public else -1 - ) - yield self.store.update_stats_delta( - ts, "user", user_id, "private_rooms", -1 if is_public else +1 - ) - - @defer.inlineCallbacks - def _is_public_room(self, room_id): - join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules) - history_visibility = yield self.state.get_current_state( - room_id, EventTypes.RoomHistoryVisibility - ) - - if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or ( - ( - history_visibility - and history_visibility.content.get("history_visibility") - == "world_readable" - ) - ): - return True - else: - return False + return room_to_stats_deltas, user_to_stats_deltas diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 986ac38f77..d99160e9d7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -378,7 +378,7 @@ class SyncHandler(object): event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) - return (now_token, ephemeral_by_room) + return now_token, ephemeral_by_room @defer.inlineCallbacks def _load_filtered_recents( @@ -578,7 +578,6 @@ class SyncHandler(object): if not last_events: return None - return last_event = last_events[-1] state_ids = yield self.store.get_state_ids_for_event( @@ -1337,7 +1336,7 @@ class SyncHandler(object): ) if not tags_by_room: logger.debug("no-oping sync") - return ([], [], [], []) + return [], [], [], [] ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id @@ -1647,7 +1646,7 @@ class SyncHandler(object): ) room_entries.append(entry) - return (room_entries, invited, newly_joined_rooms, newly_left_rooms) + return room_entries, invited, newly_joined_rooms, newly_left_rooms @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builder, ignored_users): @@ -1721,7 +1720,7 @@ class SyncHandler(object): ) ) - return (room_entries, invited, []) + return room_entries, invited, [] @defer.inlineCallbacks def _generate_room_entry( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index f882330293..ca8ae9fb5b 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -319,4 +319,4 @@ class TypingNotificationEventSource(object): return self.get_typing_handler()._latest_room_serial def get_pagination_rows(self, user, pagination_config, key): - return ([], pagination_config.from_key) + return [], pagination_config.from_key diff --git a/synapse/http/client.py b/synapse/http/client.py index 0ac20ebefc..0ae6db8ea7 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -35,7 +35,7 @@ from twisted.internet.interfaces import ( ) from twisted.python.failure import Failure from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, HTTPConnectionPool, PartialDownloadError, readBody +from twisted.web.client import Agent, HTTPConnectionPool, readBody from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers @@ -599,38 +599,6 @@ def _readBodyToFile(response, stream, max_size): return d -class CaptchaServerHttpClient(SimpleHttpClient): - """ - Separate HTTP client for talking to google's captcha servers - Only slightly special because accepts partial download responses - - used only by c/s api v1 - """ - - @defer.inlineCallbacks - def post_urlencoded_get_raw(self, url, args={}): - query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True) - - response = yield self.request( - "POST", - url, - data=query_bytes, - headers=Headers( - { - b"Content-Type": [b"application/x-www-form-urlencoded"], - b"User-Agent": [self.user_agent], - } - ), - ) - - try: - body = yield make_deferred_yieldable(readBody(response)) - return body - except PartialDownloadError as e: - # twisted dislikes google's response, no content length. - return e.response - - def encode_urlencode_args(args): return {k: encode_urlencode_arg(v) for k, v in args.items()} diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 5e9b0befb0..7ddfad286d 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -207,7 +207,7 @@ class WellKnownResolver(object): cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID, ) - return (result, cache_period) + return result, cache_period @defer.inlineCallbacks def _make_well_known_request(self, server_name, retry): diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index dd296027a1..256b972aaa 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -85,14 +85,14 @@ the function becomes the operation name for the span. return something_usual_and_useful -Operation names can be explicitly set for functions by using -``trace_using_operation_name`` +Operation names can be explicitly set for a function by passing the +operation name to ``trace`` .. code-block:: python - from synapse.logging.opentracing import trace_using_operation_name + from synapse.logging.opentracing import trace - @trace_using_operation_name("A *much* better operation name") + @trace(opname="a_better_operation_name") def interesting_badly_named_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful @@ -641,66 +641,26 @@ def extract_text_map(carrier): # Tracing decorators -def trace(func): +def trace(func=None, opname=None): """ Decorator to trace a function. - Sets the operation name to that of the function's. + Sets the operation name to that of the function's or that given + as operation_name. See the module's doc string for usage + examples. """ - if opentracing is None: - return func - @wraps(func) - def _trace_inner(self, *args, **kwargs): - if opentracing is None: - return func(self, *args, **kwargs) - - scope = start_active_span(func.__name__) - scope.__enter__() - - try: - result = func(self, *args, **kwargs) - if isinstance(result, defer.Deferred): - - def call_back(result): - scope.__exit__(None, None, None) - return result - - def err_back(result): - scope.span.set_tag(tags.ERROR, True) - scope.__exit__(None, None, None) - return result - - result.addCallbacks(call_back, err_back) - - else: - scope.__exit__(None, None, None) - - return result - - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise - - return _trace_inner - - -def trace_using_operation_name(operation_name): - """Decorator to trace a function. Explicitely sets the operation_name.""" - - def trace(func): - """ - Decorator to trace a function. - Sets the operation name to that of the function's. - """ + def decorator(func): if opentracing is None: return func + _opname = opname if opname else func.__name__ + @wraps(func) def _trace_inner(self, *args, **kwargs): if opentracing is None: return func(self, *args, **kwargs) - scope = start_active_span(operation_name) + scope = start_active_span(_opname) scope.__enter__() try: @@ -717,6 +677,7 @@ def trace_using_operation_name(operation_name): return result result.addCallbacks(call_back, err_back) + else: scope.__exit__(None, None, None) @@ -728,7 +689,10 @@ def trace_using_operation_name(operation_name): return _trace_inner - return trace + if func: + return decorator(func) + else: + return decorator def tag_args(func): diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 41147d4292..735b882363 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -101,7 +101,7 @@ class ModuleApi(object): ) user_id = yield self.register_user(localpart, displayname, emails) _, access_token = yield self.register_device(user_id) - return (user_id, access_token) + return user_id, access_token def register_user(self, localpart, displayname=None, emails=[]): """Registers a new user with given localpart and optional displayname, emails. diff --git a/synapse/notifier.py b/synapse/notifier.py index bd80c801b6..4e091314e6 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -472,11 +472,11 @@ class Notifier(object): joined_room_ids = yield self.store.get_rooms_for_user(user.to_string()) if explicit_room_id: if explicit_room_id in joined_room_ids: - return ([explicit_room_id], True) + return [explicit_room_id], True if (yield self._is_world_readable(explicit_room_id)): - return ([explicit_room_id], False) + return [explicit_room_id], False raise AuthError(403, "Non-joined access not allowed") - return (joined_room_ids, True) + return joined_room_ids, True @defer.inlineCallbacks def _is_world_readable(self, room_id): diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index c831975635..22491f3700 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -134,7 +134,7 @@ class BulkPushRuleEvaluator(object): pl_event = auth_events.get(POWER_KEY) - return (pl_event.content if pl_event else {}, sender_level) + return pl_event.content if pl_event else {}, sender_level @defer.inlineCallbacks def action_for_event_by_user(self, event, context): diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index fed4f08820..2f16955954 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -113,7 +113,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): event_and_contexts, backfilled ) - return (200, {}) + return 200, {} class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): @@ -156,7 +156,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): result = yield self.registry.on_edu(edu_type, origin, edu_content) - return (200, result) + return 200, result class ReplicationGetQueryRestServlet(ReplicationEndpoint): @@ -204,7 +204,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): result = yield self.registry.on_query(query_type, args) - return (200, result) + return 200, result class ReplicationCleanRoomRestServlet(ReplicationEndpoint): @@ -238,7 +238,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint): def _handle_request(self, request, room_id): yield self.store.clean_room_for_join(room_id) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py index f17d3a2da4..786f5232b2 100644 --- a/synapse/replication/http/login.py +++ b/synapse/replication/http/login.py @@ -64,7 +64,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint): user_id, device_id, initial_display_name, is_guest ) - return (200, {"device_id": device_id, "access_token": access_token}) + return 200, {"device_id": device_id, "access_token": access_token} def register_servlets(hs, http_server): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 4217335d88..b9ce3477ad 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -83,7 +83,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): remote_room_hosts, room_id, user_id, event_content ) - return (200, {}) + return 200, {} class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): @@ -153,7 +153,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): yield self.store.locally_reject_invite(user_id, room_id) ret = {} - return (200, ret) + return 200, ret class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): @@ -202,7 +202,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): else: raise Exception("Unrecognized change: %r", change) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 3341320a87..38260256cf 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -90,7 +90,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): address=content["address"], ) - return (200, {}) + return 200, {} class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): @@ -106,7 +106,7 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): self.registration_handler = hs.get_registration_handler() @staticmethod - def _serialize_payload(user_id, auth_result, access_token, bind_email, bind_msisdn): + def _serialize_payload(user_id, auth_result, access_token): """ Args: user_id (str): The user ID that consented @@ -114,17 +114,8 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): registered user. access_token (str|None): The access token of the newly logged in device, or None if `inhibit_login` enabled. - bind_email (bool): Whether to bind the email with the identity - server - bind_msisdn (bool): Whether to bind the msisdn with the identity - server """ - return { - "auth_result": auth_result, - "access_token": access_token, - "bind_email": bind_email, - "bind_msisdn": bind_msisdn, - } + return {"auth_result": auth_result, "access_token": access_token} @defer.inlineCallbacks def _handle_request(self, request, user_id): @@ -132,18 +123,12 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): auth_result = content["auth_result"] access_token = content["access_token"] - bind_email = content["bind_email"] - bind_msisdn = content["bind_msisdn"] yield self.registration_handler.post_registration_actions( - user_id=user_id, - auth_result=auth_result, - access_token=access_token, - bind_email=bind_email, - bind_msisdn=bind_msisdn, + user_id=user_id, auth_result=auth_result, access_token=access_token ) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index eff7bd7305..adb9b2f7f4 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -117,7 +117,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index c10b85d2ff..f03111c259 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -158,7 +158,7 @@ class Stream(object): updates, current_token = yield self.get_updates_since(self.last_token) self.last_token = current_token - return (updates, current_token) + return updates, current_token @defer.inlineCallbacks def get_updates_since(self, from_token): @@ -172,14 +172,14 @@ class Stream(object): sent over the replication steam. """ if from_token in ("NOW", "now"): - return ([], self.upto_token) + return [], self.upto_token current_token = self.upto_token from_token = int(from_token) if from_token == current_token: - return ([], current_token) + return [], current_token if self._LIMITED: rows = yield self.update_function( @@ -198,7 +198,7 @@ class Stream(object): if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND: raise Exception("stream %s has fallen behind" % (self.NAME)) - return (updates, current_token) + return updates, current_token def current_token(self): """Gets the current token of the underlying streams. Should be provided diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index fa91cc8dee..81b6bd8816 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -41,7 +41,7 @@ from synapse.rest.admin._base import ( assert_user_is_admin, historical_admin_path_patterns, ) -from synapse.rest.admin.media import register_servlets_for_media_repo +from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet from synapse.rest.admin.users import UserAdminServlet @@ -69,7 +69,7 @@ class UsersRestServlet(RestServlet): ret = yield self.handlers.admin_handler.get_users() - return (200, ret) + return 200, ret class VersionServlet(RestServlet): @@ -120,7 +120,7 @@ class UserRegisterServlet(RestServlet): nonce = self.hs.get_secrets().token_hex(64) self.nonces[nonce] = int(self.reactor.seconds()) - return (200, {"nonce": nonce}) + return 200, {"nonce": nonce} @defer.inlineCallbacks def on_POST(self, request): @@ -212,7 +212,7 @@ class UserRegisterServlet(RestServlet): ) result = yield register._create_registration_details(user_id, body) - return (200, result) + return 200, result class WhoisRestServlet(RestServlet): @@ -237,7 +237,7 @@ class WhoisRestServlet(RestServlet): ret = yield self.handlers.admin_handler.get_whois(target_user) - return (200, ret) + return 200, ret class PurgeHistoryRestServlet(RestServlet): @@ -322,7 +322,7 @@ class PurgeHistoryRestServlet(RestServlet): room_id, token, delete_local_events=delete_local_events ) - return (200, {"purge_id": purge_id}) + return 200, {"purge_id": purge_id} class PurgeHistoryStatusRestServlet(RestServlet): @@ -347,7 +347,7 @@ class PurgeHistoryStatusRestServlet(RestServlet): if purge_status is None: raise NotFoundError("purge id '%s' not found" % purge_id) - return (200, purge_status.asdict()) + return 200, purge_status.asdict() class DeactivateAccountRestServlet(RestServlet): @@ -379,7 +379,7 @@ class DeactivateAccountRestServlet(RestServlet): else: id_server_unbind_result = "no-support" - return (200, {"id_server_unbind_result": id_server_unbind_result}) + return 200, {"id_server_unbind_result": id_server_unbind_result} class ShutdownRoomRestServlet(RestServlet): @@ -549,7 +549,7 @@ class ResetPasswordRestServlet(RestServlet): yield self._set_password_handler.set_password( target_user_id, new_password, requester ) - return (200, {}) + return 200, {} class GetUsersPaginatedRestServlet(RestServlet): @@ -591,7 +591,7 @@ class GetUsersPaginatedRestServlet(RestServlet): logger.info("limit: %s, start: %s", limit, start) ret = yield self.handlers.admin_handler.get_users_paginate(order, start, limit) - return (200, ret) + return 200, ret @defer.inlineCallbacks def on_POST(self, request, target_user_id): @@ -619,7 +619,7 @@ class GetUsersPaginatedRestServlet(RestServlet): logger.info("limit: %s, start: %s", limit, start) ret = yield self.handlers.admin_handler.get_users_paginate(order, start, limit) - return (200, ret) + return 200, ret class SearchUsersRestServlet(RestServlet): @@ -662,7 +662,7 @@ class SearchUsersRestServlet(RestServlet): logger.info("term: %s ", term) ret = yield self.handlers.admin_handler.search_users(term) - return (200, ret) + return 200, ret class DeleteGroupAdminRestServlet(RestServlet): @@ -685,7 +685,7 @@ class DeleteGroupAdminRestServlet(RestServlet): raise SynapseError(400, "Can only delete local groups") yield self.group_server.delete_group(group_id, requester.user.to_string()) - return (200, {}) + return 200, {} class AccountValidityRenewServlet(RestServlet): @@ -716,7 +716,7 @@ class AccountValidityRenewServlet(RestServlet): ) res = {"expiration_ts": expiration_ts} - return (200, res) + return 200, res ######################################################################################## @@ -761,9 +761,12 @@ def register_servlets_for_client_rest_resource(hs, http_server): DeleteGroupAdminRestServlet(hs).register(http_server) AccountValidityRenewServlet(hs).register(http_server) - # Load the media repo ones if we're using them. + # Load the media repo ones if we're using them. Otherwise load the servlets which + # don't need a media repo (typically readonly admin APIs). if hs.config.can_load_media_repo: register_servlets_for_media_repo(hs, http_server) + else: + ListMediaInRoom(hs).register(http_server) # don't add more things here: new servlets should only be exposed on # /_synapse/admin so should not go here. Instead register them in AdminRestResource. diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py index 824df919f2..ed7086d09c 100644 --- a/synapse/rest/admin/media.py +++ b/synapse/rest/admin/media.py @@ -49,7 +49,7 @@ class QuarantineMediaInRoom(RestServlet): room_id, requester.user.to_string() ) - return (200, {"num_quarantined": num_quarantined}) + return 200, {"num_quarantined": num_quarantined} class ListMediaInRoom(RestServlet): @@ -60,6 +60,7 @@ class ListMediaInRoom(RestServlet): def __init__(self, hs): self.store = hs.get_datastore() + self.auth = hs.get_auth() @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -70,7 +71,7 @@ class ListMediaInRoom(RestServlet): local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id) - return (200, {"local": local_mxcs, "remote": remote_mxcs}) + return 200, {"local": local_mxcs, "remote": remote_mxcs} class PurgeMediaCacheRestServlet(RestServlet): @@ -89,7 +90,7 @@ class PurgeMediaCacheRestServlet(RestServlet): ret = yield self.media_repository.delete_old_remote_media(before_ts) - return (200, ret) + return 200, ret def register_servlets_for_media_repo(hs, http_server): diff --git a/synapse/rest/admin/purge_room_servlet.py b/synapse/rest/admin/purge_room_servlet.py index 2922eb543e..f474066542 100644 --- a/synapse/rest/admin/purge_room_servlet.py +++ b/synapse/rest/admin/purge_room_servlet.py @@ -54,4 +54,4 @@ class PurgeRoomServlet(RestServlet): await self.pagination_handler.purge_room(body["room_id"]) - return (200, {}) + return 200, {} diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index 656526fea5..ae2cbe2e0a 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -92,7 +92,7 @@ class SendServerNoticeServlet(RestServlet): event_content=body["content"], ) - return (200, {"event_id": event.event_id}) + return 200, {"event_id": event.event_id} def on_PUT(self, request, txn_id): return self.txns.fetch_or_execute_request( diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 5364117420..9720a3bab0 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -71,7 +71,7 @@ class UserAdminServlet(RestServlet): is_admin = yield self.handlers.admin_handler.get_user_server_admin(target_user) is_admin = bool(is_admin) - return (200, {"admin": is_admin}) + return 200, {"admin": is_admin} @defer.inlineCallbacks def on_PUT(self, request, user_id): @@ -97,4 +97,4 @@ class UserAdminServlet(RestServlet): target_user, set_admin_to ) - return (200, {}) + return 200, {} diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 4284738021..4ea3666874 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -54,7 +54,7 @@ class ClientDirectoryServer(RestServlet): dir_handler = self.handlers.directory_handler res = yield dir_handler.get_association(room_alias) - return (200, res) + return 200, res @defer.inlineCallbacks def on_PUT(self, request, room_alias): @@ -87,7 +87,7 @@ class ClientDirectoryServer(RestServlet): requester, room_alias, room_id, servers ) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_DELETE(self, request, room_alias): @@ -102,7 +102,7 @@ class ClientDirectoryServer(RestServlet): service.url, room_alias.to_string(), ) - return (200, {}) + return 200, {} except InvalidClientCredentialsError: # fallback to default user behaviour if they aren't an AS pass @@ -118,7 +118,7 @@ class ClientDirectoryServer(RestServlet): "User %s deleted alias %s", user.to_string(), room_alias.to_string() ) - return (200, {}) + return 200, {} class ClientDirectoryListServer(RestServlet): @@ -136,7 +136,7 @@ class ClientDirectoryListServer(RestServlet): if room is None: raise NotFoundError("Unknown room") - return (200, {"visibility": "public" if room["is_public"] else "private"}) + return 200, {"visibility": "public" if room["is_public"] else "private"} @defer.inlineCallbacks def on_PUT(self, request, room_id): @@ -149,7 +149,7 @@ class ClientDirectoryListServer(RestServlet): requester, room_id, visibility ) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_DELETE(self, request, room_id): @@ -159,7 +159,7 @@ class ClientDirectoryListServer(RestServlet): requester, room_id, "private" ) - return (200, {}) + return 200, {} class ClientAppserviceDirectoryListServer(RestServlet): @@ -193,4 +193,4 @@ class ClientAppserviceDirectoryListServer(RestServlet): requester.app_service.id, network_id, room_id, visibility ) - return (200, {}) + return 200, {} diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index 53ebed2203..6651b4cf07 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -67,10 +67,10 @@ class EventStreamRestServlet(RestServlet): is_guest=is_guest, ) - return (200, chunk) + return 200, chunk def on_OPTIONS(self, request): - return (200, {}) + return 200, {} # TODO: Unit test gets, with and without auth, with different kinds of events. @@ -91,9 +91,9 @@ class EventRestServlet(RestServlet): time_now = self.clock.time_msec() if event: event = yield self._event_serializer.serialize_event(event, time_now) - return (200, event) + return 200, event else: - return (404, "Event not found.") + return 404, "Event not found." def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py index 70b8478e90..2da3cd7511 100644 --- a/synapse/rest/client/v1/initial_sync.py +++ b/synapse/rest/client/v1/initial_sync.py @@ -42,7 +42,7 @@ class InitialSyncRestServlet(RestServlet): include_archived=include_archived, ) - return (200, content) + return 200, content def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 5762b9fd06..25a1b67092 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -121,10 +121,10 @@ class LoginRestServlet(RestServlet): ({"type": t} for t in self.auth_handler.get_supported_login_types()) ) - return (200, {"flows": flows}) + return 200, {"flows": flows} def on_OPTIONS(self, request): - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_POST(self, request): @@ -152,7 +152,7 @@ class LoginRestServlet(RestServlet): well_known_data = self._well_known_builder.get_well_known() if well_known_data: result["well_known"] = well_known_data - return (200, result) + return 200, result @defer.inlineCallbacks def _do_other_login(self, login_submission): diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py index 2769f3a189..4785a34d75 100644 --- a/synapse/rest/client/v1/logout.py +++ b/synapse/rest/client/v1/logout.py @@ -33,7 +33,7 @@ class LogoutRestServlet(RestServlet): self._device_handler = hs.get_device_handler() def on_OPTIONS(self, request): - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_POST(self, request): @@ -49,7 +49,7 @@ class LogoutRestServlet(RestServlet): requester.user.to_string(), requester.device_id ) - return (200, {}) + return 200, {} class LogoutAllRestServlet(RestServlet): @@ -62,7 +62,7 @@ class LogoutAllRestServlet(RestServlet): self._device_handler = hs.get_device_handler() def on_OPTIONS(self, request): - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_POST(self, request): @@ -75,7 +75,7 @@ class LogoutAllRestServlet(RestServlet): # .. and then delete any access tokens which weren't associated with # devices. yield self._auth_handler.delete_access_tokens_for_user(user_id) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 1eb1068c98..0153525cef 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -56,7 +56,7 @@ class PresenceStatusRestServlet(RestServlet): state = yield self.presence_handler.get_state(target_user=user) state = format_user_presence_state(state, self.clock.time_msec()) - return (200, state) + return 200, state @defer.inlineCallbacks def on_PUT(self, request, user_id): @@ -88,10 +88,10 @@ class PresenceStatusRestServlet(RestServlet): if self.hs.config.use_presence: yield self.presence_handler.set_state(user, state) - return (200, {}) + return 200, {} def on_OPTIONS(self, request): - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 2657ae45bb..bbce2e2b71 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -48,7 +48,7 @@ class ProfileDisplaynameRestServlet(RestServlet): if displayname is not None: ret["displayname"] = displayname - return (200, ret) + return 200, ret @defer.inlineCallbacks def on_PUT(self, request, user_id): @@ -61,14 +61,14 @@ class ProfileDisplaynameRestServlet(RestServlet): try: new_name = content["displayname"] except Exception: - return (400, "Unable to parse name") + return 400, "Unable to parse name" yield self.profile_handler.set_displayname(user, requester, new_name, is_admin) - return (200, {}) + return 200, {} def on_OPTIONS(self, request, user_id): - return (200, {}) + return 200, {} class ProfileAvatarURLRestServlet(RestServlet): @@ -98,7 +98,7 @@ class ProfileAvatarURLRestServlet(RestServlet): if avatar_url is not None: ret["avatar_url"] = avatar_url - return (200, ret) + return 200, ret @defer.inlineCallbacks def on_PUT(self, request, user_id): @@ -110,14 +110,14 @@ class ProfileAvatarURLRestServlet(RestServlet): try: new_name = content["avatar_url"] except Exception: - return (400, "Unable to parse name") + return 400, "Unable to parse name" yield self.profile_handler.set_avatar_url(user, requester, new_name, is_admin) - return (200, {}) + return 200, {} def on_OPTIONS(self, request, user_id): - return (200, {}) + return 200, {} class ProfileRestServlet(RestServlet): @@ -150,7 +150,7 @@ class ProfileRestServlet(RestServlet): if avatar_url is not None: ret["avatar_url"] = avatar_url - return (200, ret) + return 200, ret def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index c3ae8b98a8..9f8c3d09e3 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -69,7 +69,7 @@ class PushRuleRestServlet(RestServlet): if "attr" in spec: yield self.set_rule_attr(user_id, spec, content) self.notify_user(user_id) - return (200, {}) + return 200, {} if spec["rule_id"].startswith("."): # Rule ids starting with '.' are reserved for server default rules. @@ -106,7 +106,7 @@ class PushRuleRestServlet(RestServlet): except RuleNotFoundException as e: raise SynapseError(400, str(e)) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_DELETE(self, request, path): @@ -123,7 +123,7 @@ class PushRuleRestServlet(RestServlet): try: yield self.store.delete_push_rule(user_id, namespaced_rule_id) self.notify_user(user_id) - return (200, {}) + return 200, {} except StoreError as e: if e.code == 404: raise NotFoundError() @@ -151,10 +151,10 @@ class PushRuleRestServlet(RestServlet): ) if path[0] == "": - return (200, rules) + return 200, rules elif path[0] == "global": result = _filter_ruleset_with_path(rules["global"], path[1:]) - return (200, result) + return 200, result else: raise UnrecognizedRequestError() diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index ebc3dec516..41660682d9 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -62,7 +62,7 @@ class PushersRestServlet(RestServlet): if k not in allowed_keys: del p[k] - return (200, {"pushers": pushers}) + return 200, {"pushers": pushers} def on_OPTIONS(self, _): return 200, {} @@ -94,7 +94,7 @@ class PushersSetRestServlet(RestServlet): yield self.pusher_pool.remove_pusher( content["app_id"], content["pushkey"], user_id=user.to_string() ) - return (200, {}) + return 200, {} assert_params_in_dict( content, @@ -143,7 +143,7 @@ class PushersSetRestServlet(RestServlet): self.notifier.on_new_replication_data() - return (200, {}) + return 200, {} def on_OPTIONS(self, _): return 200, {} diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 4b2344e696..3582259026 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -91,14 +91,14 @@ class RoomCreateRestServlet(TransactionRestServlet): requester, self.get_room_config(request) ) - return (200, info) + return 200, info def get_room_config(self, request): user_supplied_config = parse_json_object_from_request(request) return user_supplied_config def on_OPTIONS(self, request): - return (200, {}) + return 200, {} # TODO: Needs unit testing for generic events @@ -173,9 +173,9 @@ class RoomStateEventRestServlet(TransactionRestServlet): if format == "event": event = format_event_for_client_v2(data.get_dict()) - return (200, event) + return 200, event elif format == "content": - return (200, data.get_dict()["content"]) + return 200, data.get_dict()["content"] @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, state_key, txn_id=None): @@ -210,7 +210,7 @@ class RoomStateEventRestServlet(TransactionRestServlet): ret = {} if event: ret = {"event_id": event.event_id} - return (200, ret) + return 200, ret # TODO: Needs unit testing for generic events + feedback @@ -244,10 +244,10 @@ class RoomSendEventRestServlet(TransactionRestServlet): requester, event_dict, txn_id=txn_id ) - return (200, {"event_id": event.event_id}) + return 200, {"event_id": event.event_id} def on_GET(self, request, room_id, event_type, txn_id): - return (200, "Not implemented") + return 200, "Not implemented" def on_PUT(self, request, room_id, event_type, txn_id): return self.txns.fetch_or_execute_request( @@ -307,7 +307,7 @@ class JoinRoomAliasServlet(TransactionRestServlet): third_party_signed=content.get("third_party_signed", None), ) - return (200, {"room_id": room_id}) + return 200, {"room_id": room_id} def on_PUT(self, request, room_identifier, txn_id): return self.txns.fetch_or_execute_request( @@ -360,7 +360,7 @@ class PublicRoomListRestServlet(TransactionRestServlet): limit=limit, since_token=since_token ) - return (200, data) + return 200, data @defer.inlineCallbacks def on_POST(self, request): @@ -405,7 +405,7 @@ class PublicRoomListRestServlet(TransactionRestServlet): network_tuple=network_tuple, ) - return (200, data) + return 200, data # TODO: Needs unit testing @@ -456,7 +456,7 @@ class RoomMemberListRestServlet(RestServlet): continue chunk.append(event) - return (200, {"chunk": chunk}) + return 200, {"chunk": chunk} # deprecated in favour of /members?membership=join? @@ -477,7 +477,7 @@ class JoinedRoomMemberListRestServlet(RestServlet): requester, room_id ) - return (200, {"joined": users_with_profile}) + return 200, {"joined": users_with_profile} # TODO: Needs better unit testing @@ -510,7 +510,7 @@ class RoomMessageListRestServlet(RestServlet): event_filter=event_filter, ) - return (200, msgs) + return 200, msgs # TODO: Needs unit testing @@ -531,7 +531,7 @@ class RoomStateRestServlet(RestServlet): user_id=requester.user.to_string(), is_guest=requester.is_guest, ) - return (200, events) + return 200, events # TODO: Needs unit testing @@ -550,7 +550,7 @@ class RoomInitialSyncRestServlet(RestServlet): content = yield self.initial_sync_handler.room_initial_sync( room_id=room_id, requester=requester, pagin_config=pagination_config ) - return (200, content) + return 200, content class RoomEventServlet(RestServlet): @@ -581,7 +581,7 @@ class RoomEventServlet(RestServlet): time_now = self.clock.time_msec() if event: event = yield self._event_serializer.serialize_event(event, time_now) - return (200, event) + return 200, event return SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) @@ -633,7 +633,7 @@ class RoomEventContextServlet(RestServlet): results["state"], time_now ) - return (200, results) + return 200, results class RoomForgetRestServlet(TransactionRestServlet): @@ -652,7 +652,7 @@ class RoomForgetRestServlet(TransactionRestServlet): yield self.room_member_handler.forget(user=requester.user, room_id=room_id) - return (200, {}) + return 200, {} def on_PUT(self, request, room_id, txn_id): return self.txns.fetch_or_execute_request( @@ -702,8 +702,7 @@ class RoomMembershipRestServlet(TransactionRestServlet): requester, txn_id, ) - return (200, {}) - return + return 200, {} target = requester.user if membership_action in ["invite", "ban", "unban", "kick"]: @@ -729,7 +728,7 @@ class RoomMembershipRestServlet(TransactionRestServlet): if membership_action == "join": return_value["room_id"] = room_id - return (200, return_value) + return 200, return_value def _has_3pid_invite_keys(self, content): for key in {"id_server", "medium", "address"}: @@ -771,7 +770,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet): txn_id=txn_id, ) - return (200, {"event_id": event.event_id}) + return 200, {"event_id": event.event_id} def on_PUT(self, request, room_id, event_id, txn_id): return self.txns.fetch_or_execute_request( @@ -816,7 +815,7 @@ class RoomTypingRestServlet(RestServlet): target_user=target_user, auth_user=requester.user, room_id=room_id ) - return (200, {}) + return 200, {} class SearchRestServlet(RestServlet): @@ -838,7 +837,7 @@ class SearchRestServlet(RestServlet): requester.user, content, batch ) - return (200, results) + return 200, results class JoinedRoomsRestServlet(RestServlet): @@ -854,7 +853,7 @@ class JoinedRoomsRestServlet(RestServlet): requester = yield self.auth.get_user_by_req(request, allow_guest=True) room_ids = yield self.store.get_rooms_for_user(requester.user.to_string()) - return (200, {"joined_rooms": list(room_ids)}) + return 200, {"joined_rooms": list(room_ids)} def register_txn_path(servlet, regex_string, http_server, with_get=False): diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py index 497cddf8b8..2afdbb89e5 100644 --- a/synapse/rest/client/v1/voip.py +++ b/synapse/rest/client/v1/voip.py @@ -60,7 +60,7 @@ class VoipRestServlet(RestServlet): password = turnPassword else: - return (200, {}) + return 200, {} return ( 200, @@ -73,7 +73,7 @@ class VoipRestServlet(RestServlet): ) def on_OPTIONS(self, request): - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 934ed5d16d..0620a4d0cf 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -117,7 +117,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): # Wrap the session id in a JSON object ret = {"sid": sid} - return (200, ret) + return 200, ret @defer.inlineCallbacks def send_password_reset(self, email, client_secret, send_attempt, next_link=None): @@ -221,7 +221,7 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet): raise SynapseError(400, "MSISDN not found", Codes.THREEPID_NOT_FOUND) ret = yield self.identity_handler.requestMsisdnToken(**body) - return (200, ret) + return 200, ret class PasswordResetSubmitTokenServlet(RestServlet): @@ -330,7 +330,7 @@ class PasswordResetSubmitTokenServlet(RestServlet): ) response_code = 200 if valid else 400 - return (response_code, {"success": valid}) + return response_code, {"success": valid} class PasswordRestServlet(RestServlet): @@ -399,7 +399,7 @@ class PasswordRestServlet(RestServlet): yield self._set_password_handler.set_password(user_id, new_password, requester) - return (200, {}) + return 200, {} def on_OPTIONS(self, _): return 200, {} @@ -434,7 +434,7 @@ class DeactivateAccountRestServlet(RestServlet): yield self._deactivate_account_handler.deactivate_account( requester.user.to_string(), erase ) - return (200, {}) + return 200, {} yield self.auth_handler.validate_user_via_ui_auth( requester, body, self.hs.get_ip_from_request(request) @@ -447,7 +447,7 @@ class DeactivateAccountRestServlet(RestServlet): else: id_server_unbind_result = "no-support" - return (200, {"id_server_unbind_result": id_server_unbind_result}) + return 200, {"id_server_unbind_result": id_server_unbind_result} class EmailThreepidRequestTokenRestServlet(RestServlet): @@ -481,7 +481,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE) ret = yield self.identity_handler.requestEmailToken(**body) - return (200, ret) + return 200, ret class MsisdnThreepidRequestTokenRestServlet(RestServlet): @@ -516,7 +516,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): raise SynapseError(400, "MSISDN is already in use", Codes.THREEPID_IN_USE) ret = yield self.identity_handler.requestMsisdnToken(**body) - return (200, ret) + return 200, ret class ThreepidRestServlet(RestServlet): @@ -536,7 +536,7 @@ class ThreepidRestServlet(RestServlet): threepids = yield self.datastore.user_get_threepids(requester.user.to_string()) - return (200, {"threepids": threepids}) + return 200, {"threepids": threepids} @defer.inlineCallbacks def on_POST(self, request): @@ -568,7 +568,7 @@ class ThreepidRestServlet(RestServlet): logger.debug("Binding threepid %s to %s", threepid, user_id) yield self.identity_handler.bind_threepid(threePidCreds, user_id) - return (200, {}) + return 200, {} class ThreepidDeleteRestServlet(RestServlet): @@ -603,7 +603,7 @@ class ThreepidDeleteRestServlet(RestServlet): else: id_server_unbind_result = "no-support" - return (200, {"id_server_unbind_result": id_server_unbind_result}) + return 200, {"id_server_unbind_result": id_server_unbind_result} class WhoamiRestServlet(RestServlet): @@ -617,7 +617,7 @@ class WhoamiRestServlet(RestServlet): def on_GET(self, request): requester = yield self.auth.get_user_by_req(request) - return (200, {"user_id": requester.user.to_string()}) + return 200, {"user_id": requester.user.to_string()} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index 98f2f6f4b5..f0db204ffa 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -55,7 +55,7 @@ class AccountDataServlet(RestServlet): self.notifier.on_new_event("account_data_key", max_id, users=[user_id]) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_GET(self, request, user_id, account_data_type): @@ -70,7 +70,7 @@ class AccountDataServlet(RestServlet): if event is None: raise NotFoundError("Account data not found") - return (200, event) + return 200, event class RoomAccountDataServlet(RestServlet): @@ -112,7 +112,7 @@ class RoomAccountDataServlet(RestServlet): self.notifier.on_new_event("account_data_key", max_id, users=[user_id]) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_GET(self, request, user_id, room_id, account_data_type): @@ -127,7 +127,7 @@ class RoomAccountDataServlet(RestServlet): if event is None: raise NotFoundError("Room account data not found") - return (200, event) + return 200, event def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/capabilities.py b/synapse/rest/client/v2_alpha/capabilities.py index a4fa45fe11..acd58af193 100644 --- a/synapse/rest/client/v2_alpha/capabilities.py +++ b/synapse/rest/client/v2_alpha/capabilities.py @@ -58,7 +58,7 @@ class CapabilitiesRestServlet(RestServlet): "m.change_password": {"enabled": change_password}, } } - return (200, response) + return 200, response def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index 9adf76cc0c..26d0235208 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -48,7 +48,7 @@ class DevicesRestServlet(RestServlet): devices = yield self.device_handler.get_devices_by_user( requester.user.to_string() ) - return (200, {"devices": devices}) + return 200, {"devices": devices} class DeleteDevicesRestServlet(RestServlet): @@ -91,7 +91,7 @@ class DeleteDevicesRestServlet(RestServlet): yield self.device_handler.delete_devices( requester.user.to_string(), body["devices"] ) - return (200, {}) + return 200, {} class DeviceRestServlet(RestServlet): @@ -114,7 +114,7 @@ class DeviceRestServlet(RestServlet): device = yield self.device_handler.get_device( requester.user.to_string(), device_id ) - return (200, device) + return 200, device @interactive_auth_handler @defer.inlineCallbacks @@ -137,7 +137,7 @@ class DeviceRestServlet(RestServlet): ) yield self.device_handler.delete_device(requester.user.to_string(), device_id) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_PUT(self, request, device_id): @@ -147,7 +147,7 @@ class DeviceRestServlet(RestServlet): yield self.device_handler.update_device( requester.user.to_string(), device_id, body ) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index 22be0ee3c5..c6ddf24c8d 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -56,7 +56,7 @@ class GetFilterRestServlet(RestServlet): user_localpart=target_user.localpart, filter_id=filter_id ) - return (200, filter.get_filter_json()) + return 200, filter.get_filter_json() except (KeyError, StoreError): raise SynapseError(400, "No such filter", errcode=Codes.NOT_FOUND) @@ -89,7 +89,7 @@ class CreateFilterRestServlet(RestServlet): user_localpart=target_user.localpart, user_filter=content ) - return (200, {"filter_id": str(filter_id)}) + return 200, {"filter_id": str(filter_id)} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index e629c4256d..999a0fa80c 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -47,7 +47,7 @@ class GroupServlet(RestServlet): group_id, requester_user_id ) - return (200, group_description) + return 200, group_description @defer.inlineCallbacks def on_POST(self, request, group_id): @@ -59,7 +59,7 @@ class GroupServlet(RestServlet): group_id, requester_user_id, content ) - return (200, {}) + return 200, {} class GroupSummaryServlet(RestServlet): @@ -83,7 +83,7 @@ class GroupSummaryServlet(RestServlet): group_id, requester_user_id ) - return (200, get_group_summary) + return 200, get_group_summary class GroupSummaryRoomsCatServlet(RestServlet): @@ -120,7 +120,7 @@ class GroupSummaryRoomsCatServlet(RestServlet): content=content, ) - return (200, resp) + return 200, resp @defer.inlineCallbacks def on_DELETE(self, request, group_id, category_id, room_id): @@ -131,7 +131,7 @@ class GroupSummaryRoomsCatServlet(RestServlet): group_id, requester_user_id, room_id=room_id, category_id=category_id ) - return (200, resp) + return 200, resp class GroupCategoryServlet(RestServlet): @@ -157,7 +157,7 @@ class GroupCategoryServlet(RestServlet): group_id, requester_user_id, category_id=category_id ) - return (200, category) + return 200, category @defer.inlineCallbacks def on_PUT(self, request, group_id, category_id): @@ -169,7 +169,7 @@ class GroupCategoryServlet(RestServlet): group_id, requester_user_id, category_id=category_id, content=content ) - return (200, resp) + return 200, resp @defer.inlineCallbacks def on_DELETE(self, request, group_id, category_id): @@ -180,7 +180,7 @@ class GroupCategoryServlet(RestServlet): group_id, requester_user_id, category_id=category_id ) - return (200, resp) + return 200, resp class GroupCategoriesServlet(RestServlet): @@ -204,7 +204,7 @@ class GroupCategoriesServlet(RestServlet): group_id, requester_user_id ) - return (200, category) + return 200, category class GroupRoleServlet(RestServlet): @@ -228,7 +228,7 @@ class GroupRoleServlet(RestServlet): group_id, requester_user_id, role_id=role_id ) - return (200, category) + return 200, category @defer.inlineCallbacks def on_PUT(self, request, group_id, role_id): @@ -240,7 +240,7 @@ class GroupRoleServlet(RestServlet): group_id, requester_user_id, role_id=role_id, content=content ) - return (200, resp) + return 200, resp @defer.inlineCallbacks def on_DELETE(self, request, group_id, role_id): @@ -251,7 +251,7 @@ class GroupRoleServlet(RestServlet): group_id, requester_user_id, role_id=role_id ) - return (200, resp) + return 200, resp class GroupRolesServlet(RestServlet): @@ -275,7 +275,7 @@ class GroupRolesServlet(RestServlet): group_id, requester_user_id ) - return (200, category) + return 200, category class GroupSummaryUsersRoleServlet(RestServlet): @@ -312,7 +312,7 @@ class GroupSummaryUsersRoleServlet(RestServlet): content=content, ) - return (200, resp) + return 200, resp @defer.inlineCallbacks def on_DELETE(self, request, group_id, role_id, user_id): @@ -323,7 +323,7 @@ class GroupSummaryUsersRoleServlet(RestServlet): group_id, requester_user_id, user_id=user_id, role_id=role_id ) - return (200, resp) + return 200, resp class GroupRoomServlet(RestServlet): @@ -347,7 +347,7 @@ class GroupRoomServlet(RestServlet): group_id, requester_user_id ) - return (200, result) + return 200, result class GroupUsersServlet(RestServlet): @@ -371,7 +371,7 @@ class GroupUsersServlet(RestServlet): group_id, requester_user_id ) - return (200, result) + return 200, result class GroupInvitedUsersServlet(RestServlet): @@ -395,7 +395,7 @@ class GroupInvitedUsersServlet(RestServlet): group_id, requester_user_id ) - return (200, result) + return 200, result class GroupSettingJoinPolicyServlet(RestServlet): @@ -420,7 +420,7 @@ class GroupSettingJoinPolicyServlet(RestServlet): group_id, requester_user_id, content ) - return (200, result) + return 200, result class GroupCreateServlet(RestServlet): @@ -450,7 +450,7 @@ class GroupCreateServlet(RestServlet): group_id, requester_user_id, content ) - return (200, result) + return 200, result class GroupAdminRoomsServlet(RestServlet): @@ -477,7 +477,7 @@ class GroupAdminRoomsServlet(RestServlet): group_id, requester_user_id, room_id, content ) - return (200, result) + return 200, result @defer.inlineCallbacks def on_DELETE(self, request, group_id, room_id): @@ -488,7 +488,7 @@ class GroupAdminRoomsServlet(RestServlet): group_id, requester_user_id, room_id ) - return (200, result) + return 200, result class GroupAdminRoomsConfigServlet(RestServlet): @@ -516,7 +516,7 @@ class GroupAdminRoomsConfigServlet(RestServlet): group_id, requester_user_id, room_id, config_key, content ) - return (200, result) + return 200, result class GroupAdminUsersInviteServlet(RestServlet): @@ -546,7 +546,7 @@ class GroupAdminUsersInviteServlet(RestServlet): group_id, user_id, requester_user_id, config ) - return (200, result) + return 200, result class GroupAdminUsersKickServlet(RestServlet): @@ -573,7 +573,7 @@ class GroupAdminUsersKickServlet(RestServlet): group_id, user_id, requester_user_id, content ) - return (200, result) + return 200, result class GroupSelfLeaveServlet(RestServlet): @@ -598,7 +598,7 @@ class GroupSelfLeaveServlet(RestServlet): group_id, requester_user_id, requester_user_id, content ) - return (200, result) + return 200, result class GroupSelfJoinServlet(RestServlet): @@ -623,7 +623,7 @@ class GroupSelfJoinServlet(RestServlet): group_id, requester_user_id, content ) - return (200, result) + return 200, result class GroupSelfAcceptInviteServlet(RestServlet): @@ -648,7 +648,7 @@ class GroupSelfAcceptInviteServlet(RestServlet): group_id, requester_user_id, content ) - return (200, result) + return 200, result class GroupSelfUpdatePublicityServlet(RestServlet): @@ -672,7 +672,7 @@ class GroupSelfUpdatePublicityServlet(RestServlet): publicise = content["publicise"] yield self.store.update_group_publicity(group_id, requester_user_id, publicise) - return (200, {}) + return 200, {} class PublicisedGroupsForUserServlet(RestServlet): @@ -694,7 +694,7 @@ class PublicisedGroupsForUserServlet(RestServlet): result = yield self.groups_handler.get_publicised_groups_for_user(user_id) - return (200, result) + return 200, result class PublicisedGroupsForUsersServlet(RestServlet): @@ -719,7 +719,7 @@ class PublicisedGroupsForUsersServlet(RestServlet): result = yield self.groups_handler.bulk_get_publicised_groups(user_ids) - return (200, result) + return 200, result class GroupsForUserServlet(RestServlet): @@ -741,7 +741,7 @@ class GroupsForUserServlet(RestServlet): result = yield self.groups_handler.get_joined_groups(requester_user_id) - return (200, result) + return 200, result def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 7cbec3d4d8..151a70d449 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -25,7 +25,7 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, ) -from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import StreamToken from ._base import client_patterns, interactive_auth_handler @@ -70,7 +70,7 @@ class KeyUploadServlet(RestServlet): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @trace_using_operation_name("upload_keys") + @trace(opname="upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -106,7 +106,7 @@ class KeyUploadServlet(RestServlet): result = yield self.e2e_keys_handler.upload_keys_for_user( user_id, device_id, body ) - return (200, result) + return 200, result class KeyQueryServlet(RestServlet): @@ -161,7 +161,7 @@ class KeyQueryServlet(RestServlet): timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) result = yield self.e2e_keys_handler.query_devices(body, timeout, user_id) - return (200, result) + return 200, result class KeyChangesServlet(RestServlet): @@ -202,7 +202,7 @@ class KeyChangesServlet(RestServlet): results = yield self.device_handler.get_user_ids_changed(user_id, from_token) - return (200, results) + return 200, results class OneTimeKeyServlet(RestServlet): @@ -237,7 +237,7 @@ class OneTimeKeyServlet(RestServlet): timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) result = yield self.e2e_keys_handler.claim_one_time_keys(body, timeout) - return (200, result) + return 200, result class SigningKeyUploadServlet(RestServlet): diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py index d034863a3c..10c1ad5b07 100644 --- a/synapse/rest/client/v2_alpha/notifications.py +++ b/synapse/rest/client/v2_alpha/notifications.py @@ -88,7 +88,7 @@ class NotificationsServlet(RestServlet): returned_push_actions.append(returned_pa) next_token = str(pa["stream_ordering"]) - return (200, {"notifications": returned_push_actions, "next_token": next_token}) + return 200, {"notifications": returned_push_actions, "next_token": next_token} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py index d93d6a9f24..b3bf8567e1 100644 --- a/synapse/rest/client/v2_alpha/read_marker.py +++ b/synapse/rest/client/v2_alpha/read_marker.py @@ -59,7 +59,7 @@ class ReadMarkerRestServlet(RestServlet): event_id=read_marker_event_id, ) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py index 98a97b7059..0dab03d227 100644 --- a/synapse/rest/client/v2_alpha/receipts.py +++ b/synapse/rest/client/v2_alpha/receipts.py @@ -52,7 +52,7 @@ class ReceiptRestServlet(RestServlet): room_id, receipt_type, user_id=requester.user.to_string(), event_id=event_id ) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 9510a1e2b0..1ccd2bed2f 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -94,7 +94,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE) ret = yield self.identity_handler.requestEmailToken(**body) - return (200, ret) + return 200, ret class MsisdnRegisterRequestTokenRestServlet(RestServlet): @@ -137,7 +137,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): ) ret = yield self.identity_handler.requestMsisdnToken(**body) - return (200, ret) + return 200, ret class UsernameAvailabilityRestServlet(RestServlet): @@ -177,7 +177,7 @@ class UsernameAvailabilityRestServlet(RestServlet): yield self.registration_handler.check_username(username) - return (200, {"available": True}) + return 200, {"available": True} class RegisterRestServlet(RestServlet): @@ -230,7 +230,6 @@ class RegisterRestServlet(RestServlet): if kind == b"guest": ret = yield self._do_guest_registration(body, address=client_addr) return ret - return elif kind != b"user": raise UnrecognizedRequestError( "Do not understand membership kind: %s" % (kind,) @@ -279,8 +278,7 @@ class RegisterRestServlet(RestServlet): result = yield self._do_appservice_registration( desired_username, access_token, body ) - return (200, result) # we throw for non 200 responses - return + return 200, result # we throw for non 200 responses # for regular registration, downcase the provided username before # attempting to register it. This should mean @@ -483,11 +481,9 @@ class RegisterRestServlet(RestServlet): user_id=registered_user_id, auth_result=auth_result, access_token=return_dict.get("access_token"), - bind_email=params.get("bind_email"), - bind_msisdn=params.get("bind_msisdn"), ) - return (200, return_dict) + return 200, return_dict def on_OPTIONS(self, _): return 200, {} diff --git a/synapse/rest/client/v2_alpha/relations.py b/synapse/rest/client/v2_alpha/relations.py index 1538b247e5..040b37c504 100644 --- a/synapse/rest/client/v2_alpha/relations.py +++ b/synapse/rest/client/v2_alpha/relations.py @@ -118,7 +118,7 @@ class RelationSendServlet(RestServlet): requester, event_dict=event_dict, txn_id=txn_id ) - return (200, {"event_id": event.event_id}) + return 200, {"event_id": event.event_id} class RelationPaginationServlet(RestServlet): @@ -198,7 +198,7 @@ class RelationPaginationServlet(RestServlet): return_value["chunk"] = events return_value["original_event"] = original_event - return (200, return_value) + return 200, return_value class RelationAggregationPaginationServlet(RestServlet): @@ -270,7 +270,7 @@ class RelationAggregationPaginationServlet(RestServlet): to_token=to_token, ) - return (200, pagination_chunk.to_dict()) + return 200, pagination_chunk.to_dict() class RelationAggregationGroupPaginationServlet(RestServlet): @@ -356,7 +356,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet): return_value = result.to_dict() return_value["chunk"] = events - return (200, return_value) + return 200, return_value def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py index 3fdd4584a3..e7449864cd 100644 --- a/synapse/rest/client/v2_alpha/report_event.py +++ b/synapse/rest/client/v2_alpha/report_event.py @@ -72,7 +72,7 @@ class ReportEventRestServlet(RestServlet): received_ts=self.clock.time_msec(), ) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index 10dec96208..df4f44cd36 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -135,7 +135,7 @@ class RoomKeysServlet(RestServlet): body = {"rooms": {room_id: body}} yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_GET(self, request, room_id, session_id): @@ -218,7 +218,7 @@ class RoomKeysServlet(RestServlet): else: room_keys = room_keys["rooms"][room_id] - return (200, room_keys) + return 200, room_keys @defer.inlineCallbacks def on_DELETE(self, request, room_id, session_id): @@ -242,7 +242,7 @@ class RoomKeysServlet(RestServlet): yield self.e2e_room_keys_handler.delete_room_keys( user_id, version, room_id, session_id ) - return (200, {}) + return 200, {} class RoomKeysNewVersionServlet(RestServlet): @@ -293,7 +293,7 @@ class RoomKeysNewVersionServlet(RestServlet): info = parse_json_object_from_request(request) new_version = yield self.e2e_room_keys_handler.create_version(user_id, info) - return (200, {"version": new_version}) + return 200, {"version": new_version} # we deliberately don't have a PUT /version, as these things really should # be immutable to avoid people footgunning @@ -338,7 +338,7 @@ class RoomKeysVersionServlet(RestServlet): except SynapseError as e: if e.code == 404: raise SynapseError(404, "No backup found", Codes.NOT_FOUND) - return (200, info) + return 200, info @defer.inlineCallbacks def on_DELETE(self, request, version): @@ -358,7 +358,7 @@ class RoomKeysVersionServlet(RestServlet): user_id = requester.user.to_string() yield self.e2e_room_keys_handler.delete_version(user_id, version) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_PUT(self, request, version): @@ -392,7 +392,7 @@ class RoomKeysVersionServlet(RestServlet): ) yield self.e2e_room_keys_handler.update_version(user_id, version, info) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py index 14ba61a63e..d2c3316eb7 100644 --- a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py +++ b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py @@ -80,7 +80,7 @@ class RoomUpgradeRestServlet(RestServlet): ret = {"replacement_room": new_room_id} - return (200, ret) + return 200, ret def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 2613648d82..d90e52ed1a 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request +from synapse.logging.opentracing import set_tag, trace from synapse.rest.client.transactions import HttpTransactionCache from ._base import client_patterns @@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() + @trace(opname="sendToDevice") def on_PUT(self, request, message_type, txn_id): + set_tag("message_type", message_type) + set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 7b32dd2212..c98c5a3802 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -174,7 +174,7 @@ class SyncRestServlet(RestServlet): time_now, sync_result, requester.access_token_id, filter ) - return (200, response_content) + return 200, response_content @defer.inlineCallbacks def encode_response(self, time_now, sync_result, access_token_id, filter): diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index d173544355..3b555669a0 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -45,7 +45,7 @@ class TagListServlet(RestServlet): tags = yield self.store.get_tags_for_room(user_id, room_id) - return (200, {"tags": tags}) + return 200, {"tags": tags} class TagServlet(RestServlet): @@ -76,7 +76,7 @@ class TagServlet(RestServlet): self.notifier.on_new_event("account_data_key", max_id, users=[user_id]) - return (200, {}) + return 200, {} @defer.inlineCallbacks def on_DELETE(self, request, user_id, room_id, tag): @@ -88,7 +88,7 @@ class TagServlet(RestServlet): self.notifier.on_new_event("account_data_key", max_id, users=[user_id]) - return (200, {}) + return 200, {} def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 158e686b01..2e8d672471 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -40,7 +40,7 @@ class ThirdPartyProtocolsServlet(RestServlet): yield self.auth.get_user_by_req(request, allow_guest=True) protocols = yield self.appservice_handler.get_3pe_protocols() - return (200, protocols) + return 200, protocols class ThirdPartyProtocolServlet(RestServlet): @@ -60,9 +60,9 @@ class ThirdPartyProtocolServlet(RestServlet): only_protocol=protocol ) if protocol in protocols: - return (200, protocols[protocol]) + return 200, protocols[protocol] else: - return (404, {"error": "Unknown protocol"}) + return 404, {"error": "Unknown protocol"} class ThirdPartyUserServlet(RestServlet): @@ -85,7 +85,7 @@ class ThirdPartyUserServlet(RestServlet): ThirdPartyEntityKind.USER, protocol, fields ) - return (200, results) + return 200, results class ThirdPartyLocationServlet(RestServlet): @@ -108,7 +108,7 @@ class ThirdPartyLocationServlet(RestServlet): ThirdPartyEntityKind.LOCATION, protocol, fields ) - return (200, results) + return 200, results def register_servlets(hs, http_server): diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index 7ab2b80e46..2863affbab 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -60,7 +60,7 @@ class UserDirectorySearchRestServlet(RestServlet): user_id = requester.user.to_string() if not self.hs.config.user_directory_search_enabled: - return (200, {"limited": False, "results": []}) + return 200, {"limited": False, "results": []} body = parse_json_object_from_request(request) @@ -76,7 +76,7 @@ class UserDirectorySearchRestServlet(RestServlet): user_id, search_term, limit ) - return (200, results) + return 200, results def register_servlets(hs, http_server): diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index cf5759e9a6..b972e152a9 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -318,14 +318,14 @@ class MediaRepository(object): responder = yield self.media_storage.fetch_media(file_info) if responder: - return (responder, media_info) + return responder, media_info # Failed to find the file anywhere, lets download it. media_info = yield self._download_remote_file(server_name, media_id, file_id) responder = yield self.media_storage.fetch_media(file_info) - return (responder, media_info) + return responder, media_info @defer.inlineCallbacks def _download_remote_file(self, server_name, media_id, file_id): @@ -526,7 +526,7 @@ class MediaRepository(object): try: file_info = FileInfo( server_name=server_name, - file_id=media_id, + file_id=file_id, thumbnail=True, thumbnail_width=t_width, thumbnail_height=t_height, diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index bd40891a7f..7a56cd4b6c 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -183,7 +183,6 @@ class PreviewUrlResource(DirectServeResource): if isinstance(og, six.text_type): og = og.encode("utf8") return og - return media_info = yield self._download_url(url, user) diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index 90d8e6bffe..c995d7e043 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -78,9 +78,9 @@ class Thumbnailer(object): """ if max_width * self.height < max_height * self.width: - return (max_width, (max_width * self.height) // self.width) + return max_width, (max_width * self.height) // self.width else: - return ((max_height * self.width) // self.height, max_height) + return (max_height * self.width) // self.height, max_height def scale(self, width, height, output_type): """Rescales the image to the given dimensions. diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py index 729c097e6d..81c4aff496 100644 --- a/synapse/server_notices/resource_limits_server_notices.py +++ b/synapse/server_notices/resource_limits_server_notices.py @@ -193,4 +193,4 @@ class ResourceLimitsServerNotices(object): if event_id in referenced_events: referenced_events.remove(event.event_id) - return (currently_blocked, referenced_events) + return currently_blocked, referenced_events diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index a0d34f16ea..2b0f4c79ee 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -136,7 +136,6 @@ class StateHandler(object): if event_id: event = yield self.store.get_event(event_id, allow_none=True) return event - return state_map = yield self.store.get_events( list(state.values()), get_prev_content=False diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 9fa5b4f3d6..6afbfc0d74 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -90,7 +90,7 @@ class AccountDataWorkerStore(SQLBaseStore): room_data = by_room.setdefault(row["room_id"], {}) room_data[row["account_data_type"]] = json.loads(row["content"]) - return (global_account_data, by_room) + return global_account_data, by_room return self.runInteraction( "get_account_data_for_user", get_account_data_for_user_txn @@ -205,7 +205,7 @@ class AccountDataWorkerStore(SQLBaseStore): ) txn.execute(sql, (last_room_id, current_id, limit)) room_results = txn.fetchall() - return (global_results, room_results) + return global_results, room_results return self.runInteraction( "get_all_updated_account_data_txn", get_updated_account_data_txn @@ -244,13 +244,13 @@ class AccountDataWorkerStore(SQLBaseStore): room_account_data = account_data_by_room.setdefault(row[0], {}) room_account_data[row[1]] = json.loads(row[2]) - return (global_account_data, account_data_by_room) + return global_account_data, account_data_by_room changed = self._account_data_stream_cache.has_entity_changed( user_id, int(stream_id) ) if not changed: - return ({}, {}) + return {}, {} return self.runInteraction( "get_updated_account_data_for_user", get_updated_account_data_for_user_txn diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 05d9c05c3f..435b2acd4d 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -165,7 +165,6 @@ class ApplicationServiceTransactionWorkerStore( ) if result: return result.get("state") - return return None def set_appservice_state(self, service, state): @@ -358,7 +357,7 @@ class ApplicationServiceTransactionWorkerStore( events = yield self.get_events_as_list(event_ids) - return (upper_bound, events) + return upper_bound, events class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore): diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 79bb0ea46d..6b7458304e 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -19,6 +19,7 @@ from canonicaljson import json from twisted.internet import defer +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache @@ -66,12 +67,13 @@ class DeviceInboxWorkerStore(SQLBaseStore): messages.append(json.loads(row[1])) if len(messages) < limit: stream_pos = current_stream_id - return (messages, stream_pos) + return messages, stream_pos return self.runInteraction( "get_new_messages_for_device", get_new_messages_for_device_txn ) + @trace @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore): last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), None ) + + set_tag("last_deleted_stream_id", last_deleted_stream_id) + if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_deleted_stream_id ) if not has_changed: + log_kv({"message": "No changes in cache since last check"}) return 0 def delete_messages_for_device_txn(txn): @@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): "delete_messages_for_device", delete_messages_for_device_txn ) + log_kv( + {"message": "deleted {} messages for device".format(count), "count": count} + ) + # Update the cache, ensuring that we only ever increase the value last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 @@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): return count + @trace def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): @@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore): in the stream the messages got to. """ + set_tag("destination", destination) + set_tag("last_stream_id", last_stream_id) + set_tag("current_stream_id", current_stream_id) + set_tag("limit", limit) + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) if not has_changed or last_stream_id == current_stream_id: + log_kv({"message": "No new messages in stream"}) return defer.succeed(([], current_stream_id)) if limit <= 0: # This can happen if we run out of room for EDUs in the transaction. return defer.succeed(([], last_stream_id)) + @trace def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" @@ -156,14 +174,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): stream_pos = row[0] messages.append(json.loads(row[1])) if len(messages) < limit: + log_kv({"message": "Set stream position to current position"}) stream_pos = current_stream_id - return (messages, stream_pos) + return messages, stream_pos return self.runInteraction( "get_new_device_msgs_for_remote", get_new_messages_for_remote_destination_txn, ) + @trace def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -214,6 +234,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): expiry_ms=30 * 60 * 1000, ) + @trace @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 51eac0f5e7..7b4213f20b 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -25,6 +25,7 @@ from twisted.internet import defer from synapse.api.errors import Codes, StoreError from synapse.logging.opentracing import ( get_active_span_text_map, + set_tag, trace, whitelisted_homeserver, ) @@ -98,7 +99,7 @@ class DeviceWorkerStore(SQLBaseStore): destination, int(from_stream_id) ) if not has_changed: - return (now_stream_id, []) + return now_stream_id, [] # We retrieve n+1 devices from the list of outbound pokes where n is # our outbound device update limit. We then check if the very last @@ -121,7 +122,7 @@ class DeviceWorkerStore(SQLBaseStore): # Return an empty list if there are no updates if not updates: - return (now_stream_id, []) + return now_stream_id, [] # if we have exceeded the limit, we need to exclude any results with the # same stream_id as the last row. @@ -171,13 +172,13 @@ class DeviceWorkerStore(SQLBaseStore): # skip that stream_id and return an empty list, and continue with the next # stream_id next time. if not query_map: - return (stream_id_cutoff, []) + return stream_id_cutoff, [] results = yield self._get_device_update_edus_by_remote( destination, from_stream_id, query_map ) - return (now_stream_id, results) + return now_stream_id, results def _get_devices_by_remote_txn( self, txn, destination, from_stream_id, now_stream_id, limit @@ -360,6 +361,7 @@ class DeviceWorkerStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() + @trace @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. @@ -391,7 +393,10 @@ class DeviceWorkerStore(SQLBaseStore): else: results[user_id] = yield self._get_cached_devices_for_user(user_id) - return (user_ids_not_in_cache, results) + set_tag("in_cache", results) + set_tag("not_in_cache", user_ids_not_in_cache) + + return user_ids_not_in_cache, results @cachedInlineCallbacks(num_args=2, tree=True) def _get_cached_user_device(self, user_id, device_id): diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index e966a73f3d..eed7757ed5 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -47,7 +47,6 @@ class DirectoryWorkerStore(SQLBaseStore): if not room_id: return None - return servers = yield self._simple_select_onecol( "room_alias_servers", @@ -58,7 +57,6 @@ class DirectoryWorkerStore(SQLBaseStore): if not servers: return None - return return RoomAliasMapping(room_id, room_alias.to_string(), servers) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a95c36a8b..1958afe1d7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -810,7 +810,7 @@ class EventsStore( # If they old and new groups are the same then we don't need to do # anything. if old_state_groups == new_state_groups: - return (None, None) + return None, None if len(new_state_groups) == 1 and len(old_state_groups) == 1: # If we're going from one state group to another, lets check if @@ -827,7 +827,7 @@ class EventsStore( # the current state in memory then lets also return that, # but it doesn't matter if we don't. new_state = state_groups_map.get(new_state_group) - return (new_state, delta_ids) + return new_state, delta_ids # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -839,7 +839,7 @@ class EventsStore( if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - return (state_groups_map[new_state_groups.pop()], None) + return state_groups_map[new_state_groups.pop()], None # Ok, we need to defer to the state handler to resolve our state sets. @@ -868,7 +868,7 @@ class EventsStore( state_res_store=StateResolutionStore(self), ) - return (res.state, None) + return res.state, None @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): @@ -891,7 +891,7 @@ class EventsStore( if ev_id != existing_state.get(key) } - return (to_delete, to_insert) + return to_delete, to_insert @log_function def _persist_events_txn( @@ -2270,8 +2270,9 @@ class EventsStore( "room_aliases", "room_depth", "room_memberships", - "room_state", - "room_stats", + "room_stats_state", + "room_stats_current", + "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 1a0f2d5768..5db6f2d84a 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -90,7 +90,7 @@ class PresenceStore(SQLBaseStore): presence_states, ) - return (stream_orderings[-1], self._presence_id_gen.get_current_token()) + return stream_orderings[-1], self._presence_id_gen.get_current_token() def _update_presence_txn(self, txn, stream_orderings, presence_states): for stream_id, state in zip(stream_orderings, presence_states): diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 8a5d8e9b18..912c1df6be 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -35,7 +35,6 @@ class ProfileWorkerStore(SQLBaseStore): if e.code == 404: # no match return ProfileInfo(None, None) - return else: raise diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index b431d24b8a..3e0e834a62 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -133,7 +133,7 @@ class PusherWorkerStore(SQLBaseStore): txn.execute(sql, (last_id, current_id, limit)) deleted = txn.fetchall() - return (updated, deleted) + return updated, deleted return self.runInteraction( "get_all_updated_pushers", get_all_updated_pushers_txn diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 6aa6d98ebb..290ddb30e8 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -478,7 +478,7 @@ class ReceiptsStore(ReceiptsWorkerStore): max_persisted_id = self._receipts_id_gen.get_current_token() - return (stream_id, max_persisted_id) + return stream_id, max_persisted_id def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, data): return self.runInteraction( diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 3f50324253..2d3c7e2dc9 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -869,6 +869,17 @@ class RegistrationStore( (user_id_obj.localpart, create_profile_with_displayname), ) + if self.hs.config.stats_enabled: + # we create a new completed user statistics row + + # we don't strictly need current_token since this user really can't + # have any state deltas before now (as it is a new user), but still, + # we include it for completeness. + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + self._update_stats_delta_txn( + txn, now, "user", user_id, {}, complete_with_stream_id=current_token + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) @@ -1140,6 +1151,7 @@ class RegistrationStore( deferred str|None: A str representing a link to redirect the user to if there is one. """ + # Insert everything into a transaction in order to run atomically def validate_threepid_session_txn(txn): row = self._simple_select_one_txn( diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index eecb276465..f8b682ebd9 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -112,29 +112,31 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(max_entries=100000, iterable=True) def get_users_in_room(self, room_id): - def f(txn): - # If we can assume current_state_events.membership is up to date - # then we can avoid a join, which is a Very Good Thing given how - # frequently this function gets called. - if self._current_state_events_membership_up_to_date: - sql = """ - SELECT state_key FROM current_state_events - WHERE type = 'm.room.member' AND room_id = ? AND membership = ? - """ - else: - sql = """ - SELECT state_key FROM room_memberships as m - INNER JOIN current_state_events as c - ON m.event_id = c.event_id - AND m.room_id = c.room_id - AND m.user_id = c.state_key - WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? - """ + return self.runInteraction( + "get_users_in_room", self.get_users_in_room_txn, room_id + ) - txn.execute(sql, (room_id, Membership.JOIN)) - return [to_ascii(r[0]) for r in txn] + def get_users_in_room_txn(self, txn, room_id): + # If we can assume current_state_events.membership is up to date + # then we can avoid a join, which is a Very Good Thing given how + # frequently this function gets called. + if self._current_state_events_membership_up_to_date: + sql = """ + SELECT state_key FROM current_state_events + WHERE type = 'm.room.member' AND room_id = ? AND membership = ? + """ + else: + sql = """ + SELECT state_key FROM room_memberships as m + INNER JOIN current_state_events as c + ON m.event_id = c.event_id + AND m.room_id = c.room_id + AND m.user_id = c.state_key + WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? + """ - return self.runInteraction("get_users_in_room", f) + txn.execute(sql, (room_id, Membership.JOIN)) + return [to_ascii(r[0]) for r in txn] @cached(max_entries=100000) def get_room_summary(self, room_id): diff --git a/synapse/storage/schema/delta/56/stats_separated.sql b/synapse/storage/schema/delta/56/stats_separated.sql new file mode 100644 index 0000000000..163529c071 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated.sql @@ -0,0 +1,152 @@ +/* Copyright 2018 New Vector Ltd + * Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +----- First clean up from previous versions of room stats. + +-- First remove old stats stuff +DROP TABLE IF EXISTS room_stats; +DROP TABLE IF EXISTS room_state; +DROP TABLE IF EXISTS room_stats_state; +DROP TABLE IF EXISTS user_stats; +DROP TABLE IF EXISTS room_stats_earliest_tokens; +DROP TABLE IF EXISTS _temp_populate_stats_position; +DROP TABLE IF EXISTS _temp_populate_stats_rooms; +DROP TABLE IF EXISTS stats_stream_pos; + +-- Unschedule old background updates if they're still scheduled +DELETE FROM background_updates WHERE update_name IN ( + 'populate_stats_createtables', + 'populate_stats_process_rooms', + 'populate_stats_process_users', + 'populate_stats_cleanup' +); + +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_rooms', '{}', ''); + +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_users', '{}', 'populate_stats_process_rooms'); + +----- Create tables for our version of room stats. + +-- single-row table to track position of incremental updates +DROP TABLE IF EXISTS stats_incremental_position; +CREATE TABLE stats_incremental_position ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +-- insert a null row and make sure it is the only one. +INSERT INTO stats_incremental_position ( + stream_id +) SELECT COALESCE(MAX(stream_ordering), 0) from events; + +-- represents PRESENT room statistics for a room +-- only holds absolute fields +DROP TABLE IF EXISTS room_stats_current; +CREATE TABLE room_stats_current ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- These are absolute counts + current_state_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + + local_users_in_room INT NOT NULL, + + -- The maximum delta stream position that this row takes into account. + completed_delta_stream_id BIGINT NOT NULL +); + + +-- represents HISTORICAL room statistics for a room +DROP TABLE IF EXISTS room_stats_historical; +CREATE TABLE room_stats_historical ( + room_id TEXT NOT NULL, + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms). + -- Note that end_ts is quantised. + end_ts BIGINT NOT NULL, + bucket_size BIGINT NOT NULL, + + -- These stats are absolute counts + current_state_events BIGINT NOT NULL, + joined_members BIGINT NOT NULL, + invited_members BIGINT NOT NULL, + left_members BIGINT NOT NULL, + banned_members BIGINT NOT NULL, + local_users_in_room BIGINT NOT NULL, + + -- These stats are per time slice + total_events BIGINT NOT NULL, + total_event_bytes BIGINT NOT NULL, + + PRIMARY KEY (room_id, end_ts) +); + +-- We use this index to speed up deletion of ancient room stats. +CREATE INDEX room_stats_historical_end_ts ON room_stats_historical (end_ts); + +-- represents PRESENT statistics for a user +-- only holds absolute fields +DROP TABLE IF EXISTS user_stats_current; +CREATE TABLE user_stats_current ( + user_id TEXT NOT NULL PRIMARY KEY, + + joined_rooms BIGINT NOT NULL, + + -- The maximum delta stream position that this row takes into account. + completed_delta_stream_id BIGINT NOT NULL +); + +-- represents HISTORICAL statistics for a user +DROP TABLE IF EXISTS user_stats_historical; +CREATE TABLE user_stats_historical ( + user_id TEXT NOT NULL, + end_ts BIGINT NOT NULL, + bucket_size BIGINT NOT NULL, + + joined_rooms BIGINT NOT NULL, + + invites_sent BIGINT NOT NULL, + rooms_created BIGINT NOT NULL, + total_events BIGINT NOT NULL, + total_event_bytes BIGINT NOT NULL, + + PRIMARY KEY (user_id, end_ts) +); + +-- We use this index to speed up deletion of ancient user stats. +CREATE INDEX user_stats_historical_end_ts ON user_stats_historical (end_ts); + + +CREATE TABLE room_stats_state ( + room_id TEXT NOT NULL, + name TEXT, + canonical_alias TEXT, + join_rules TEXT, + history_visibility TEXT, + encryption TEXT, + avatar TEXT, + guest_access TEXT, + is_federatable BOOLEAN, + topic TEXT +); + +CREATE UNIQUE INDEX room_stats_state_room ON room_stats_state(room_id); diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e13efed417..6560173c08 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,17 +15,22 @@ # limitations under the License. import logging +from itertools import chain from twisted.internet import defer +from twisted.internet.defer import DeferredLock from synapse.api.constants import EventTypes, Membership -from synapse.storage.prepare_database import get_statements +from synapse.storage import PostgresEngine from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) +# You can think of these as Prometheus Gauges. +# You can draw these stats on a line graph. +# Example: number of users in a room ABSOLUTE_STATS_FIELDS = { "room": ( "current_state_events", @@ -32,14 +38,23 @@ ABSOLUTE_STATS_FIELDS = { "invited_members", "left_members", "banned_members", - "state_events", + "local_users_in_room", ), - "user": ("public_rooms", "private_rooms"), + "user": ("joined_rooms",), } -TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} +# these fields are per-timeslice and so should be reset to 0 upon a new slice +# You can draw these stats on a histogram. +# Example: number of events sent locally during a time slice +PER_SLICE_FIELDS = { + "room": ("total_events", "total_event_bytes"), + "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"), +} + +TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -TEMP_TABLE = "_temp_populate_stats" +# these are the tables (& ID columns) which contain our actual subjects +TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")} class StatsStore(StateDeltasStore): @@ -51,136 +66,102 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size - self.register_background_update_handler( - "populate_stats_createtables", self._populate_stats_createtables - ) + self.stats_delta_processing_lock = DeferredLock() + self.register_background_update_handler( "populate_stats_process_rooms", self._populate_stats_process_rooms ) self.register_background_update_handler( - "populate_stats_cleanup", self._populate_stats_cleanup + "populate_stats_process_users", self._populate_stats_process_users ) + # we no longer need to perform clean-up, but we will give ourselves + # the potential to reintroduce it in the future – so documentation + # will still encourage the use of this no-op handler. + self.register_noop_background_update("populate_stats_cleanup") + self.register_noop_background_update("populate_stats_prepare") - @defer.inlineCallbacks - def _populate_stats_createtables(self, progress, batch_size): - - if not self.stats_enabled: - yield self._end_background_update("populate_stats_createtables") - return 1 - - # Get all the rooms that we want to process. - def _make_staging_area(txn): - # Create the temporary tables - stmts = get_statements( - """ - -- We just recreate the table, we'll be reinserting the - -- correct entries again later anyway. - DROP TABLE IF EXISTS {temp}_rooms; - - CREATE TABLE IF NOT EXISTS {temp}_rooms( - room_id TEXT NOT NULL, - events BIGINT NOT NULL - ); - - CREATE INDEX {temp}_rooms_events - ON {temp}_rooms(events); - CREATE INDEX {temp}_rooms_id - ON {temp}_rooms(room_id); - """.format( - temp=TEMP_TABLE - ).splitlines() - ) - - for statement in stmts: - txn.execute(statement) - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) - txn.execute(sql) - - # Get rooms we want to process from the database, only adding - # those that we haven't (i.e. those not in room_stats_earliest_token) - sql = """ - INSERT INTO %s_rooms (room_id, events) - SELECT c.room_id, count(*) FROM current_state_events AS c - LEFT JOIN room_stats_earliest_token AS t USING (room_id) - WHERE t.room_id IS NULL - GROUP BY c.room_id - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) + def quantise_stats_time(self, ts): + """ + Quantises a timestamp to be a multiple of the bucket size. - new_pos = yield self.get_max_stream_id_in_current_state_deltas() - yield self.runInteraction("populate_stats_temp_build", _make_staging_area) - yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) - self.get_earliest_token_for_room_stats.invalidate_all() + Args: + ts (int): the timestamp to quantise, in milliseconds since the Unix + Epoch - yield self._end_background_update("populate_stats_createtables") - return 1 + Returns: + int: a timestamp which + - is divisible by the bucket size; + - is no later than `ts`; and + - is the largest such timestamp. + """ + return (ts // self.stats_bucket_size) * self.stats_bucket_size @defer.inlineCallbacks - def _populate_stats_cleanup(self, progress, batch_size): + def _populate_stats_process_users(self, progress, batch_size): """ - Update the user directory stream position, then clean up the old tables. + This is a background update which regenerates statistics for users. """ if not self.stats_enabled: - yield self._end_background_update("populate_stats_cleanup") + yield self._end_background_update("populate_stats_process_users") return 1 - position = yield self._simple_select_one_onecol( - TEMP_TABLE + "_position", None, "position" + last_user_id = progress.get("last_user_id", "") + + def _get_next_batch(txn): + sql = """ + SELECT DISTINCT name FROM users + WHERE name > ? + ORDER BY name ASC + LIMIT ? + """ + txn.execute(sql, (last_user_id, batch_size)) + return [r for r, in txn] + + users_to_work_on = yield self.runInteraction( + "_populate_stats_process_users", _get_next_batch ) - yield self.update_stats_stream_pos(position) - def _delete_staging_area(txn): - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") + # No more rooms -- complete the transaction. + if not users_to_work_on: + yield self._end_background_update("populate_stats_process_users") + return 1 - yield self.runInteraction("populate_stats_cleanup", _delete_staging_area) + for user_id in users_to_work_on: + yield self._calculate_and_set_initial_state_for_user(user_id) + progress["last_user_id"] = user_id - yield self._end_background_update("populate_stats_cleanup") - return 1 + yield self.runInteraction( + "populate_stats_process_users", + self._background_update_progress_txn, + "populate_stats_process_users", + progress, + ) + + return len(users_to_work_on) @defer.inlineCallbacks def _populate_stats_process_rooms(self, progress, batch_size): - + """ + This is a background update which regenerates statistics for rooms. + """ if not self.stats_enabled: yield self._end_background_update("populate_stats_process_rooms") return 1 - # If we don't have progress filed, delete everything. - if not progress: - yield self.delete_all_stats() + last_room_id = progress.get("last_room_id", "") def _get_next_batch(txn): - # Only fetch 250 rooms, so we don't fetch too many at once, even - # if those 250 rooms have less than batch_size state events. sql = """ - SELECT room_id, events FROM %s_rooms - ORDER BY events DESC - LIMIT 250 - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) - rooms_to_work_on = txn.fetchall() - - if not rooms_to_work_on: - return None - - # Get how many are left to process, so we can give status on how - # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") - progress["remaining"] = txn.fetchone()[0] - - return rooms_to_work_on + SELECT DISTINCT room_id FROM current_state_events + WHERE room_id > ? + ORDER BY room_id ASC + LIMIT ? + """ + txn.execute(sql, (last_room_id, batch_size)) + return [r for r, in txn] rooms_to_work_on = yield self.runInteraction( - "populate_stats_temp_read", _get_next_batch + "populate_stats_rooms_get_batch", _get_next_batch ) # No more rooms -- complete the transaction. @@ -188,154 +169,28 @@ class StatsStore(StateDeltasStore): yield self._end_background_update("populate_stats_process_rooms") return 1 - logger.info( - "Processing the next %d rooms of %d remaining", - len(rooms_to_work_on), - progress["remaining"], - ) - - # Number of state events we've processed by going through each room - processed_event_count = 0 - - for room_id, event_count in rooms_to_work_on: - - current_state_ids = yield self.get_current_state_ids(room_id) - - join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) - history_visibility_id = current_state_ids.get( - (EventTypes.RoomHistoryVisibility, "") - ) - encryption_id = current_state_ids.get((EventTypes.RoomEncryption, "")) - name_id = current_state_ids.get((EventTypes.Name, "")) - topic_id = current_state_ids.get((EventTypes.Topic, "")) - avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) - canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) - - event_ids = [ - join_rules_id, - history_visibility_id, - encryption_id, - name_id, - topic_id, - avatar_id, - canonical_alias_id, - ] - - state_events = yield self.get_events( - [ev for ev in event_ids if ev is not None] - ) - - def _get_or_none(event_id, arg): - event = state_events.get(event_id) - if event: - return event.content.get(arg) - return None - - yield self.update_room_state( - room_id, - { - "join_rules": _get_or_none(join_rules_id, "join_rule"), - "history_visibility": _get_or_none( - history_visibility_id, "history_visibility" - ), - "encryption": _get_or_none(encryption_id, "algorithm"), - "name": _get_or_none(name_id, "name"), - "topic": _get_or_none(topic_id, "topic"), - "avatar": _get_or_none(avatar_id, "url"), - "canonical_alias": _get_or_none(canonical_alias_id, "alias"), - }, - ) + for room_id in rooms_to_work_on: + yield self._calculate_and_set_initial_state_for_room(room_id) + progress["last_room_id"] = room_id - now = self.hs.get_reactor().seconds() - - # quantise time to the nearest bucket - now = (now // self.stats_bucket_size) * self.stats_bucket_size - - def _fetch_data(txn): - - # Get the current token of the room - current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) - - current_state_events = len(current_state_ids) - - membership_counts = self._get_user_counts_in_room_txn(txn, room_id) - - total_state_events = self._get_total_state_event_counts_txn( - txn, room_id - ) - - self._update_stats_txn( - txn, - "room", - room_id, - now, - { - "bucket_size": self.stats_bucket_size, - "current_state_events": current_state_events, - "joined_members": membership_counts.get(Membership.JOIN, 0), - "invited_members": membership_counts.get(Membership.INVITE, 0), - "left_members": membership_counts.get(Membership.LEAVE, 0), - "banned_members": membership_counts.get(Membership.BAN, 0), - "state_events": total_state_events, - }, - ) - self._simple_insert_txn( - txn, - "room_stats_earliest_token", - {"room_id": room_id, "token": current_token}, - ) - - # We've finished a room. Delete it from the table. - self._simple_delete_one_txn( - txn, TEMP_TABLE + "_rooms", {"room_id": room_id} - ) - - yield self.runInteraction("update_room_stats", _fetch_data) - - # Update the remaining counter. - progress["remaining"] -= 1 - yield self.runInteraction( - "populate_stats", - self._background_update_progress_txn, - "populate_stats_process_rooms", - progress, - ) - - processed_event_count += event_count - - if processed_event_count > batch_size: - # Don't process any more rooms, we've hit our batch size. - return processed_event_count + yield self.runInteraction( + "_populate_stats_process_rooms", + self._background_update_progress_txn, + "populate_stats_process_rooms", + progress, + ) - return processed_event_count + return len(rooms_to_work_on) - def delete_all_stats(self): + def get_stats_positions(self): """ - Delete all statistics records. + Returns the stats processor positions. """ - - def _delete_all_stats_txn(txn): - txn.execute("DELETE FROM room_state") - txn.execute("DELETE FROM room_stats") - txn.execute("DELETE FROM room_stats_earliest_token") - txn.execute("DELETE FROM user_stats") - - return self.runInteraction("delete_all_stats", _delete_all_stats_txn) - - def get_stats_stream_pos(self): return self._simple_select_one_onecol( - table="stats_stream_pos", + table="stats_incremental_position", keyvalues={}, retcol="stream_id", - desc="stats_stream_pos", - ) - - def update_stats_stream_pos(self, stream_id): - return self._simple_update_one( - table="stats_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_stats_stream_pos", + desc="stats_incremental_position", ) def update_room_state(self, room_id, fields): @@ -361,42 +216,87 @@ class StatsStore(StateDeltasStore): fields[col] = None return self._simple_upsert( - table="room_state", + table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, desc="update_room_state", ) - def get_deltas_for_room(self, room_id, start, size=100): + def get_statistics_for_subject(self, stats_type, stats_id, start, size=100): """ - Get statistics deltas for a given room. + Get statistics for a given subject. Args: - room_id (str) + stats_type (str): The type of subject + stats_id (str): The ID of the subject (e.g. room_id or user_id) start (int): Pagination start. Number of entries, not timestamp. size (int): How many entries to return. Returns: Deferred[list[dict]], where the dict has the keys of - ABSOLUTE_STATS_FIELDS["room"] and "ts". + ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts". """ - return self._simple_select_list_paginate( - "room_stats", - {"room_id": room_id}, - "ts", + return self.runInteraction( + "get_statistics_for_subject", + self._get_statistics_for_subject_txn, + stats_type, + stats_id, + start, + size, + ) + + def _get_statistics_for_subject_txn( + self, txn, stats_type, stats_id, start, size=100 + ): + """ + Transaction-bound version of L{get_statistics_for_subject}. + """ + + table, id_col = TYPE_TO_TABLE[stats_type] + selected_columns = list( + ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type] + ) + + slice_list = self._simple_select_list_paginate_txn( + txn, + table + "_historical", + {id_col: stats_id}, + "end_ts", start, size, - retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]), + retcols=selected_columns + ["bucket_size", "end_ts"], order_direction="DESC", ) - def get_all_room_state(self): - return self._simple_select_list( - "room_state", None, retcols=("name", "topic", "canonical_alias") + return slice_list + + def get_room_stats_state(self, room_id): + """ + Returns the current room_stats_state for a room. + + Args: + room_id (str): The ID of the room to return state for. + + Returns (dict): + Dictionary containing these keys: + "name", "topic", "canonical_alias", "avatar", "join_rules", + "history_visibility" + """ + return self._simple_select_one( + "room_stats_state", + {"room_id": room_id}, + retcols=( + "name", + "topic", + "canonical_alias", + "avatar", + "join_rules", + "history_visibility", + ), ) @cached() - def get_earliest_token_for_room_stats(self, room_id): + def get_earliest_token_for_stats(self, stats_type, id): """ Fetch the "earliest token". This is used by the room stats delta processor to ignore deltas that have been processed between the @@ -406,79 +306,571 @@ class StatsStore(StateDeltasStore): Returns: Deferred[int] """ + table, id_col = TYPE_TO_TABLE[stats_type] + return self._simple_select_one_onecol( - "room_stats_earliest_token", - {"room_id": room_id}, - retcol="token", + "%s_current" % (table,), + keyvalues={id_col: id}, + retcol="completed_delta_stream_id", allow_none=True, ) - def update_stats(self, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert( - table=table, - keyvalues={id_col: stats_id, "ts": ts}, - values=fields, - desc="update_stats", + def bulk_update_stats_delta(self, ts, updates, stream_id): + """Bulk update stats tables for a given stream_id and updates the stats + incremental position. + + Args: + ts (int): Current timestamp in ms + updates(dict[str, dict[str, dict[str, Counter]]]): The updates to + commit as a mapping stats_type -> stats_id -> field -> delta. + stream_id (int): Current position. + + Returns: + Deferred + """ + + def _bulk_update_stats_delta_txn(txn): + for stats_type, stats_updates in updates.items(): + for stats_id, fields in stats_updates.items(): + self._update_stats_delta_txn( + txn, + ts=ts, + stats_type=stats_type, + stats_id=stats_id, + fields=fields, + complete_with_stream_id=stream_id, + ) + + self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + ) + + return self.runInteraction( + "bulk_update_stats_delta", _bulk_update_stats_delta_txn ) - def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert_txn( - txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields + def update_stats_delta( + self, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id, + absolute_field_overrides=None, + ): + """ + Updates the statistics for a subject, with a delta (difference/relative + change). + + Args: + ts (int): timestamp of the change + stats_type (str): "room" or "user" – the kind of subject + stats_id (str): the subject's ID (room ID or user ID) + fields (dict[str, int]): Deltas of stats values. + complete_with_stream_id (int, optional): + If supplied, converts an incomplete row into a complete row, + with the supplied stream_id marked as the stream_id where the + row was completed. + absolute_field_overrides (dict[str, int]): Current stats values + (i.e. not deltas) of absolute fields. + Does not work with per-slice fields. + """ + + return self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + absolute_field_overrides=absolute_field_overrides, ) - def update_stats_delta(self, ts, stats_type, stats_id, field, value): - def _update_stats_delta(txn): - table, id_col = TYPE_TO_ROOM[stats_type] - - sql = ( - "SELECT * FROM %s" - " WHERE %s=? and ts=(" - " SELECT MAX(ts) FROM %s" - " WHERE %s=?" - ")" - ) % (table, id_col, table, id_col) - txn.execute(sql, (stats_id, stats_id)) - rows = self.cursor_to_dict(txn) - if len(rows) == 0: - # silently skip as we don't have anything to apply a delta to yet. - # this tries to minimise any race between the initial sync and - # subsequent deltas arriving. - return - - current_ts = ts - latest_ts = rows[0]["ts"] - if current_ts < latest_ts: - # This one is in the past, but we're just encountering it now. - # Mark it as part of the current bucket. - current_ts = latest_ts - elif ts != latest_ts: - # we have to copy our absolute counters over to the new entry. - values = { - key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] - } - values[id_col] = stats_id - values["ts"] = ts - values["bucket_size"] = self.stats_bucket_size - - self._simple_insert_txn(txn, table=table, values=values) - - # actually update the new value - if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: - self._simple_update_txn( - txn, - table=table, - keyvalues={id_col: stats_id, "ts": current_ts}, - updatevalues={field: value}, + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id, + absolute_field_overrides=None, + ): + if absolute_field_overrides is None: + absolute_field_overrides = {} + + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + # Lets be paranoid and check that all the given field names are known + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] + for field in chain(fields.keys(), absolute_field_overrides.keys()): + if field not in abs_field_names and field not in slice_field_names: + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) ) + + # Per slice fields do not get added to the _current table + + # This calculates the deltas (`field = field + ?` values) + # for absolute fields, + # * defaulting to 0 if not specified + # (required for the INSERT part of upserting to work) + # * omitting overrides specified in `absolute_field_overrides` + deltas_of_absolute_fields = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_field_overrides + } + + # Keep the delta stream ID field up to date + absolute_field_overrides = absolute_field_overrides.copy() + absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id + + # first upsert the `_current` table + self._upsert_with_additive_relatives_txn( + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_field_overrides, + additive_relatives=deltas_of_absolute_fields, + ) + + per_slice_additive_relatives = { + key: fields.get(key, 0) for key in slice_field_names + } + self._upsert_copy_from_table_with_additive_relatives_txn( + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_insvalues={"bucket_size": self.stats_bucket_size}, + extra_dst_keyvalues={"end_ts": end_ts}, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, + ) + + def _upsert_with_additive_relatives_txn( + self, txn, table, keyvalues, absolutes, additive_relatives + ): + """Used to update values in the stats tables. + + This is basically a slightly convoluted upsert that *adds* to any + existing rows. + + Args: + txn + table (str): Table name + keyvalues (dict[str, any]): Row-identifying key values + absolutes (dict[str, any]): Absolute (set) fields + additive_relatives (dict[str, int]): Fields that will be added onto + if existing row present. + """ + if self.database_engine.can_native_upsert: + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "key_columns": ", ".join(keyvalues), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } + + txn.execute(sql, qargs) + else: + self.database_engine.lock_table(txn, table) + retcols = list(chain(absolutes.keys(), additive_relatives.keys())) + current_row = self._simple_select_one_txn( + txn, table, keyvalues, retcols, allow_none=True + ) + if current_row is None: + merged_dict = {**keyvalues, **absolutes, **additive_relatives} + self._simple_insert_txn(txn, table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + current_row[key] += val + current_row.update(absolutes) + self._simple_update_one_txn(txn, table, keyvalues, current_row) + + def _upsert_copy_from_table_with_additive_relatives_txn( + self, + txn, + into_table, + keyvalues, + extra_dst_keyvalues, + extra_dst_insvalues, + additive_relatives, + src_table, + copy_columns, + ): + """Updates the historic stats table with latest updates. + + This involves copying "absolute" fields from the `_current` table, and + adding relative fields to any existing values. + + Args: + txn: Transaction + into_table (str): The destination table to UPSERT the row into + keyvalues (dict[str, any]): Row-identifying key values + extra_dst_keyvalues (dict[str, any]): Additional keyvalues + for `into_table`. + extra_dst_insvalues (dict[str, any]): Additional values to insert + on new row creation for `into_table`. + additive_relatives (dict[str, any]): Fields that will be added onto + if existing row present. (Must be disjoint from copy_columns.) + src_table (str): The source table to copy from + copy_columns (iterable[str]): The list of columns to copy + """ + if self.database_engine.can_native_upsert: + ins_columns = chain( + keyvalues, + copy_columns, + additive_relatives, + extra_dst_keyvalues, + extra_dst_insvalues, + ) + sel_exprs = chain( + keyvalues, + copy_columns, + ( + "?" + for _ in chain( + additive_relatives, extra_dst_keyvalues, extra_dst_insvalues + ) + ), + ) + keyvalues_where = ("%s = ?" % f for f in keyvalues) + + sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) + sets_ar = ( + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) + for f in additive_relatives + ) + + sql = """ + INSERT INTO %(into_table)s (%(ins_columns)s) + SELECT %(sel_exprs)s + FROM %(src_table)s + WHERE %(keyvalues_where)s + ON CONFLICT (%(keyvalues)s) + DO UPDATE SET %(sets)s + """ % { + "into_table": into_table, + "ins_columns": ", ".join(ins_columns), + "sel_exprs": ", ".join(sel_exprs), + "keyvalues_where": " AND ".join(keyvalues_where), + "src_table": src_table, + "keyvalues": ", ".join( + chain(keyvalues.keys(), extra_dst_keyvalues.keys()) + ), + "sets": ", ".join(chain(sets_cc, sets_ar)), + } + + qargs = list( + chain( + additive_relatives.values(), + extra_dst_keyvalues.values(), + extra_dst_insvalues.values(), + keyvalues.values(), + ) + ) + txn.execute(sql, qargs) + else: + self.database_engine.lock_table(txn, into_table) + src_row = self._simple_select_one_txn( + txn, src_table, keyvalues, copy_columns + ) + all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues} + dest_current_row = self._simple_select_one_txn( + txn, + into_table, + keyvalues=all_dest_keyvalues, + retcols=list(chain(additive_relatives.keys(), copy_columns)), + allow_none=True, + ) + + if dest_current_row is None: + merged_dict = { + **keyvalues, + **extra_dst_keyvalues, + **extra_dst_insvalues, + **src_row, + **additive_relatives, + } + self._simple_insert_txn(txn, into_table, merged_dict) else: - sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( - table, - field, - field, - id_col, + for (key, val) in additive_relatives.items(): + src_row[key] = dest_current_row[key] + val + self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row) + + def get_changes_room_total_events_and_bytes(self, min_pos, max_pos): + """Fetches the counts of events in the given range of stream IDs. + + Args: + min_pos (int) + max_pos (int) + + Returns: + Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field + changes. + """ + + return self.runInteraction( + "stats_incremental_total_events_and_bytes", + self.get_changes_room_total_events_and_bytes_txn, + min_pos, + max_pos, + ) + + def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos): + """Gets the total_events and total_event_bytes counts for rooms and + senders, in a range of stream_orderings (including backfilled events). + + Args: + txn + low_pos (int): Low stream ordering + high_pos (int): High stream ordering + + Returns: + tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The + room and user deltas for total_events/total_event_bytes in the + format of `stats_id` -> fields + """ + + if low_pos >= high_pos: + # nothing to do here. + return {}, {} + + if isinstance(self.database_engine, PostgresEngine): + new_bytes_expression = "OCTET_LENGTH(json)" + else: + new_bytes_expression = "LENGTH(CAST(json AS BLOB))" + + sql = """ + SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE (? < stream_ordering AND stream_ordering <= ?) + OR (? <= stream_ordering AND stream_ordering <= ?) + GROUP BY events.room_id + """ % ( + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) + + room_deltas = { + room_id: {"total_events": new_events, "total_event_bytes": new_bytes} + for room_id, new_events, new_bytes in txn + } + + sql = """ + SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE (? < stream_ordering AND stream_ordering <= ?) + OR (? <= stream_ordering AND stream_ordering <= ?) + GROUP BY events.sender + """ % ( + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) + + user_deltas = { + user_id: {"total_events": new_events, "total_event_bytes": new_bytes} + for user_id, new_events, new_bytes in txn + if self.hs.is_mine_id(user_id) + } + + return room_deltas, user_deltas + + @defer.inlineCallbacks + def _calculate_and_set_initial_state_for_room(self, room_id): + """Calculate and insert an entry into room_stats_current. + + Args: + room_id (str) + + Returns: + Deferred[tuple[dict, dict, int]]: A tuple of room state, membership + counts and stream position. + """ + + def _fetch_current_state_stats(txn): + pos = self.get_room_max_stream_ordering() + + rows = self._simple_select_many_txn( + txn, + table="current_state_events", + column="type", + iterable=[ + EventTypes.Create, + EventTypes.JoinRules, + EventTypes.RoomHistoryVisibility, + EventTypes.Encryption, + EventTypes.Name, + EventTypes.Topic, + EventTypes.RoomAvatar, + EventTypes.CanonicalAlias, + ], + keyvalues={"room_id": room_id, "state_key": ""}, + retcols=["event_id"], + ) + + event_ids = [row["event_id"] for row in rows] + + txn.execute( + """ + SELECT membership, count(*) FROM current_state_events + WHERE room_id = ? AND type = 'm.room.member' + GROUP BY membership + """, + (room_id,), + ) + membership_counts = {membership: cnt for membership, cnt in txn} + + txn.execute( + """ + SELECT COALESCE(count(*), 0) FROM current_state_events + WHERE room_id = ? + """, + (room_id,), + ) + + current_state_events_count, = txn.fetchone() + + users_in_room = self.get_users_in_room_txn(txn, room_id) + + return ( + event_ids, + membership_counts, + current_state_events_count, + users_in_room, + pos, + ) + + ( + event_ids, + membership_counts, + current_state_events_count, + users_in_room, + pos, + ) = yield self.runInteraction( + "get_initial_state_for_room", _fetch_current_state_stats + ) + + state_event_map = yield self.get_events(event_ids, get_prev_content=False) + + room_state = { + "join_rules": None, + "history_visibility": None, + "encryption": None, + "name": None, + "topic": None, + "avatar": None, + "canonical_alias": None, + "is_federatable": True, + } + + for event in state_event_map.values(): + if event.type == EventTypes.JoinRules: + room_state["join_rules"] = event.content.get("join_rule") + elif event.type == EventTypes.RoomHistoryVisibility: + room_state["history_visibility"] = event.content.get( + "history_visibility" ) - txn.execute(sql, (value, stats_id, current_ts)) + elif event.type == EventTypes.Encryption: + room_state["encryption"] = event.content.get("algorithm") + elif event.type == EventTypes.Name: + room_state["name"] = event.content.get("name") + elif event.type == EventTypes.Topic: + room_state["topic"] = event.content.get("topic") + elif event.type == EventTypes.RoomAvatar: + room_state["avatar"] = event.content.get("url") + elif event.type == EventTypes.CanonicalAlias: + room_state["canonical_alias"] = event.content.get("alias") + elif event.type == EventTypes.Create: + room_state["is_federatable"] = event.content.get("m.federate", True) + + yield self.update_room_state(room_id, room_state) + + local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)] + + yield self.update_stats_delta( + ts=self.clock.time_msec(), + stats_type="room", + stats_id=room_id, + fields={}, + complete_with_stream_id=pos, + absolute_field_overrides={ + "current_state_events": current_state_events_count, + "joined_members": membership_counts.get(Membership.JOIN, 0), + "invited_members": membership_counts.get(Membership.INVITE, 0), + "left_members": membership_counts.get(Membership.LEAVE, 0), + "banned_members": membership_counts.get(Membership.BAN, 0), + "local_users_in_room": len(local_users_in_room), + }, + ) + + @defer.inlineCallbacks + def _calculate_and_set_initial_state_for_user(self, user_id): + def _calculate_and_set_initial_state_for_user_txn(txn): + pos = self._get_max_stream_id_in_current_state_deltas_txn(txn) - return self.runInteraction("update_stats_delta", _update_stats_delta) + txn.execute( + """ + SELECT COUNT(distinct room_id) FROM current_state_events + WHERE type = 'm.room.member' AND state_key = ? + AND membership = 'join' + """, + (user_id,), + ) + count, = txn.fetchone() + return count, pos + + joined_rooms, pos = yield self.runInteraction( + "calculate_and_set_initial_state_for_user", + _calculate_and_set_initial_state_for_user_txn, + ) + + yield self.update_stats_delta( + ts=self.clock.time_msec(), + stats_type="user", + stats_id=user_id, + fields={}, + complete_with_stream_id=pos, + absolute_field_overrides={"joined_rooms": joined_rooms}, + ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 856c2ee8d8..490454f19a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -364,7 +364,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): the chunk of events returned. """ if from_key == to_key: - return ([], from_key) + return [], from_key from_id = RoomStreamToken.parse_stream_token(from_key).stream to_id = RoomStreamToken.parse_stream_token(to_key).stream @@ -374,7 +374,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) if not has_changed: - return ([], from_key) + return [], from_key def f(txn): sql = ( @@ -407,7 +407,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # get. key = from_key - return (ret, key) + return ret, key @defer.inlineCallbacks def get_membership_changes_for_user(self, user_id, from_key, to_key): @@ -496,7 +496,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): """ # Allow a zero limit here, and no-op. if limit == 0: - return ([], end_token) + return [], end_token end_token = RoomStreamToken.parse(end_token) @@ -511,7 +511,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # We want to return the results in ascending order. rows.reverse() - return (rows, token) + return rows, token def get_room_event_after_stream_ordering(self, room_id, stream_ordering): """Gets details of the first event in a room at or after a stream ordering @@ -783,7 +783,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): events = yield self.get_events_as_list(event_ids) - return (upper_bound, events) + return upper_bound, events def get_federation_out_pos(self, typ): return self._simple_select_one_onecol( diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index f1c8d99419..cbb0a4810a 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -195,6 +195,6 @@ class ChainedIdGenerator(object): with self._lock: if self._unfinished_ids: stream_id, chained_id = self._unfinished_ids[0] - return (stream_id - 1, chained_id) + return stream_id - 1, chained_id - return (self._current_max, self.chained_generator.get_current_token()) + return self._current_max, self.chained_generator.get_current_token() diff --git a/synapse/streams/config.py b/synapse/streams/config.py index f7f5906a99..02994ab2a5 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -37,7 +37,7 @@ class SourcePaginationConfig(object): self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None def __repr__(self): - return ("StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)") % ( + return "StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)" % ( self.from_key, self.to_key, self.direction, diff --git a/synapse/util/hash.py b/synapse/util/hash.py deleted file mode 100644 index 359168704e..0000000000 --- a/synapse/util/hash.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import hashlib - -import unpaddedbase64 - - -def sha256_and_url_safe_base64(input_text): - """SHA256 hash an input string, encode the digest as url-safe base64, and - return - - :param input_text: string to hash - :type input_text: str - - :returns a sha256 hashed and url-safe base64 encoded digest - :rtype: str - """ - digest = hashlib.sha256(input_text.encode()).digest() - return unpaddedbase64.encode_base64(digest, urlsafe=True) |