From 2c7866d6643f4fd3bbffa9905ede6c36983ef29c Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 23 May 2018 14:38:56 +0100 Subject: Hit the 3pid unbind endpoint on deactivation --- synapse/handlers/deactivate_account.py | 22 ++++++++++++++++++++- synapse/handlers/identity.py | 35 ++++++++++++++++++++++++++++++++++ synapse/http/matrixfederationclient.py | 9 +++++++-- synapse/storage/registration.py | 9 --------- 4 files changed, 63 insertions(+), 12 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index d58ea6c650..0277f80b75 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -17,6 +17,7 @@ from twisted.internet import defer, reactor from ._base import BaseHandler from synapse.types import UserID, create_requester from synapse.util.logcontext import run_in_background +from synapse.api.errors import SynapseError import logging @@ -30,6 +31,7 @@ class DeactivateAccountHandler(BaseHandler): self._auth_handler = hs.get_auth_handler() self._device_handler = hs.get_device_handler() self._room_member_handler = hs.get_room_member_handler() + self._identity_handler = hs.get_handlers().identity_handler # Flag that indicates whether the process to part users from rooms is running self._user_parter_running = False @@ -51,6 +53,25 @@ class DeactivateAccountHandler(BaseHandler): # FIXME: Theoretically there is a race here wherein user resets # password using threepid. + # delete threepids first. We remove these from the IS so if this fails, + # leave the user still active so they can try again. + # Ideally we would prevent password resets and then do this in the + # background thread. + threepids = yield self.store.user_get_threepids(user_id) + for threepid in threepids: + try: + yield self._identity_handler.unbind_threepid(user_id, + { + 'medium': threepid['medium'], + 'address': threepid['address'], + }, + ) + except: + # Do we want this to be a fatal error or should we carry on? + logger.exception("Failed to remove threepid from ID server") + raise SynapseError(400, "Failed to remove threepid from ID server") + yield self.store.user_delete_threepid(user_id, threepid['medium'], threepid['address']) + # first delete any devices belonging to the user, which will also # delete corresponding access tokens. yield self._device_handler.delete_all_devices_for_user(user_id) @@ -58,7 +79,6 @@ class DeactivateAccountHandler(BaseHandler): # a device. yield self._auth_handler.delete_access_tokens_for_user(user_id) - yield self.store.user_delete_threepids(user_id) yield self.store.user_set_password_hash(user_id, None) # Add the user to a table of users pending deactivation (ie. diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 91a0898860..67a89a1d72 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -38,6 +39,7 @@ class IdentityHandler(BaseHandler): super(IdentityHandler, self).__init__(hs) self.http_client = hs.get_simple_http_client() + self.federation_http_client = hs.get_http_client() self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers) self.trust_any_id_server_just_for_testing_do_not_use = ( @@ -138,6 +140,39 @@ class IdentityHandler(BaseHandler): data = json.loads(e.msg) defer.returnValue(data) + @defer.inlineCallbacks + def unbind_threepid(self, mxid, threepid): + yield run_on_reactor() + logger.debug("unbinding threepid %r from %s", threepid, mxid) + if not self.trusted_id_servers: + logger.warn("Can't unbind threepid: no trusted ID servers set in config") + defer.returnValue(False) + id_server = next(iter(self.trusted_id_servers)) + + url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,) + content = { + "mxid": mxid, + "threepid": threepid, + } + headers = {} + # we abuse the federation http client to sign the request, but we have to send it + # using the normal http client since we don't want the SRV lookup and want normal + # 'browser-like' HTTPS. + self.federation_http_client.sign_request( + destination=None, + method='POST', + url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'), + headers_dict=headers, + content=content, + destination_is=id_server, + ) + yield self.http_client.post_json_get_json( + url, + content, + headers, + ) + defer.returnValue(True) + @defer.inlineCallbacks def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): yield run_on_reactor() diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 4b2b85464d..21eaf77dc4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -262,14 +262,19 @@ class MatrixFederationHttpClient(object): defer.returnValue(response) def sign_request(self, destination, method, url_bytes, headers_dict, - content=None): + content=None, destination_is=None): request = { "method": method, "uri": url_bytes, "origin": self.server_name, - "destination": destination, } + if destination is not None: + request["destination"] = destination + + if destination_is is not None: + request["destination_is"] = destination_is + if content is not None: request["content"] = content diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a530e29f43..5d8a850ac9 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -452,15 +452,6 @@ class RegistrationStore(RegistrationWorkerStore, defer.returnValue(ret['user_id']) defer.returnValue(None) - def user_delete_threepids(self, user_id): - return self._simple_delete( - "user_threepids", - keyvalues={ - "user_id": user_id, - }, - desc="user_delete_threepids", - ) - def user_delete_threepid(self, user_id, medium, address): return self._simple_delete( "user_threepids", -- cgit 1.4.1 From b3bff53178dc8dd9050b84bc953c55835e8410d1 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 24 May 2018 11:08:05 +0100 Subject: Unbind 3pids when they're deleted too --- synapse/handlers/auth.py | 8 ++++++++ synapse/rest/client/v2_alpha/account.py | 13 ++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index a5365c4fe4..c3f20417c7 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -825,6 +825,14 @@ class AuthHandler(BaseHandler): if medium == 'email': address = address.lower() + identity_handler = self.hs.get_handlers().identity_handler + identity_handler.unbind_threepid(user_id, + { + 'medium': medium, + 'address': address, + }, + ) + ret = yield self.store.user_delete_threepid( user_id, medium, address, ) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 30523995af..4310e78733 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -381,9 +381,16 @@ class ThreepidDeleteRestServlet(RestServlet): requester = yield self.auth.get_user_by_req(request) user_id = requester.user.to_string() - yield self.auth_handler.delete_threepid( - user_id, body['medium'], body['address'] - ) + try: + yield self.auth_handler.delete_threepid( + user_id, body['medium'], body['address'] + ) + except Exception as e: + # NB. This endpoint should succeed if there is nothing to + # delete, so it should only throw if something is wrong + # that we ought to care about. + logger.exception("Failed to remove threepid") + raise SynapseError(500, "Failed to remove threepid") defer.returnValue((200, {})) -- cgit 1.4.1 From a21a41bad719cabfbe8ff1e7aea574ff0d0132ba Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 24 May 2018 11:19:59 +0100 Subject: comment --- synapse/handlers/identity.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 67a89a1d72..6bc3479755 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -147,6 +147,10 @@ class IdentityHandler(BaseHandler): if not self.trusted_id_servers: logger.warn("Can't unbind threepid: no trusted ID servers set in config") defer.returnValue(False) + + # We don't track what ID server we added 3pids on (perhaps we ought to) but we assume + # that any of the servers in the trusted list are in the same ID server federation, + # so we can pick any one of them to send the deletion request to. id_server = next(iter(self.trusted_id_servers)) url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,) -- cgit 1.4.1 From 9700d15611ec93d1177d29181362fbd02df92629 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 24 May 2018 11:23:15 +0100 Subject: pep8 --- synapse/handlers/auth.py | 3 ++- synapse/handlers/deactivate_account.py | 9 ++++++--- synapse/handlers/identity.py | 7 ++++--- synapse/rest/client/v2_alpha/account.py | 2 +- 4 files changed, 13 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index c3f20417c7..512c31185d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -826,7 +826,8 @@ class AuthHandler(BaseHandler): address = address.lower() identity_handler = self.hs.get_handlers().identity_handler - identity_handler.unbind_threepid(user_id, + identity_handler.unbind_threepid( + user_id, { 'medium': medium, 'address': address, diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 0277f80b75..f92f953a79 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -60,17 +60,20 @@ class DeactivateAccountHandler(BaseHandler): threepids = yield self.store.user_get_threepids(user_id) for threepid in threepids: try: - yield self._identity_handler.unbind_threepid(user_id, + yield self._identity_handler.unbind_threepid( + user_id, { 'medium': threepid['medium'], 'address': threepid['address'], }, ) - except: + except Exception: # Do we want this to be a fatal error or should we carry on? logger.exception("Failed to remove threepid from ID server") raise SynapseError(400, "Failed to remove threepid from ID server") - yield self.store.user_delete_threepid(user_id, threepid['medium'], threepid['address']) + yield self.store.user_delete_threepid( + user_id, threepid['medium'], threepid['address'], + ) # first delete any devices belonging to the user, which will also # delete corresponding access tokens. diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 6bc3479755..92cd4019d8 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -148,9 +148,10 @@ class IdentityHandler(BaseHandler): logger.warn("Can't unbind threepid: no trusted ID servers set in config") defer.returnValue(False) - # We don't track what ID server we added 3pids on (perhaps we ought to) but we assume - # that any of the servers in the trusted list are in the same ID server federation, - # so we can pick any one of them to send the deletion request to. + # We don't track what ID server we added 3pids on (perhaps we ought to) + # but we assume that any of the servers in the trusted list are in the + # same ID server federation, so we can pick any one of them to send the + # deletion request to. id_server = next(iter(self.trusted_id_servers)) url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 4310e78733..0291fba9e7 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -385,7 +385,7 @@ class ThreepidDeleteRestServlet(RestServlet): yield self.auth_handler.delete_threepid( user_id, body['medium'], body['address'] ) - except Exception as e: + except Exception: # NB. This endpoint should succeed if there is nothing to # delete, so it should only throw if something is wrong # that we ought to care about. -- cgit 1.4.1 From f731e42baf2ffd186a79cb941017389fda030b0b Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jun 2018 12:00:51 +0100 Subject: docstring --- synapse/handlers/identity.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 92cd4019d8..434eb17ef0 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -142,7 +142,15 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def unbind_threepid(self, mxid, threepid): - yield run_on_reactor() + """ + Removes a binding from an identity server + Args: + mxid (str): Matrix user ID of binding to be removed + threepid (dict): Dict with medium & address of binding to be removed + + Returns: + Deferred + """ logger.debug("unbinding threepid %r from %s", threepid, mxid) if not self.trusted_id_servers: logger.warn("Can't unbind threepid: no trusted ID servers set in config") -- cgit 1.4.1 From e44150a6de841dc56a108b7dadaad7ea2c597ae2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jun 2018 12:01:13 +0100 Subject: Missing yield --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 512c31185d..c058b7cabb 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -826,7 +826,7 @@ class AuthHandler(BaseHandler): address = address.lower() identity_handler = self.hs.get_handlers().identity_handler - identity_handler.unbind_threepid( + yield identity_handler.unbind_threepid( user_id, { 'medium': medium, -- cgit 1.4.1 From 6a29e815fc58fec00b6f7001a20f29bc367a55fc Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jun 2018 12:01:23 +0100 Subject: Fix comment --- synapse/handlers/deactivate_account.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index ce6e2b14fe..8ec5ba2012 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -76,7 +76,7 @@ class DeactivateAccountHandler(BaseHandler): user_id, threepid['medium'], threepid['address'], ) - # first delete any devices belonging to the user, which will also + # delete any devices belonging to the user, which will also # delete corresponding access tokens. yield self._device_handler.delete_all_devices_for_user(user_id) # then delete any remaining access tokens which weren't associated with -- cgit 1.4.1 From c5930d513a37f0f143afe49315cf56174e73ce6a Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jun 2018 12:05:58 +0100 Subject: Docstring --- synapse/http/matrixfederationclient.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'synapse') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c7f919498c..1fa9fb3cb2 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -265,6 +265,20 @@ class MatrixFederationHttpClient(object): def sign_request(self, destination, method, url_bytes, headers_dict, content=None, destination_is=None): + """ + Signs a request by adding an Authorization header to headers_dict + Args: + destination (str): The desination home server of the request. May be null if the + destination is an identity server, in which case destination_is must be non-null. + method (str): The HTTP method of the request + url_bytes (str): ? + headers_dict (dict): Dictionary of request headers to append to + content (str): The body of the request + destination_is (str): As 'destination', but if the destination is an identity server + + Returns: + Deferred + """ request = { "method": method, "uri": url_bytes, -- cgit 1.4.1 From d62162bbec27f8d14274ae56c8a6d0bcaa2941fe Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 5 Jun 2018 18:09:13 +0100 Subject: doc fixes --- synapse/handlers/identity.py | 2 +- synapse/http/matrixfederationclient.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 434eb17ef0..529400955d 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -149,7 +149,7 @@ class IdentityHandler(BaseHandler): threepid (dict): Dict with medium & address of binding to be removed Returns: - Deferred + Deferred[bool]: True on success, otherwise False """ logger.debug("unbinding threepid %r from %s", threepid, mxid) if not self.trusted_id_servers: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 1fa9fb3cb2..6fc0c2296b 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -268,16 +268,16 @@ class MatrixFederationHttpClient(object): """ Signs a request by adding an Authorization header to headers_dict Args: - destination (str): The desination home server of the request. May be null if the + destination (bytes): The desination home server of the request. May be None if the destination is an identity server, in which case destination_is must be non-null. - method (str): The HTTP method of the request - url_bytes (str): ? + method (bytes): The HTTP method of the request + url_bytes (bytes): The URI path of the request headers_dict (dict): Dictionary of request headers to append to - content (str): The body of the request - destination_is (str): As 'destination', but if the destination is an identity server + content (bytes): The body of the request + destination_is (bytes): As 'destination', but if the destination is an identity server Returns: - Deferred + None """ request = { "method": method, -- cgit 1.4.1 From 607bd27c83c2fa9236ba88a7167fdb87e6e94f58 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 5 Jun 2018 18:10:35 +0100 Subject: fix pep8 --- synapse/http/matrixfederationclient.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 6fc0c2296b..98797c37df 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -268,13 +268,15 @@ class MatrixFederationHttpClient(object): """ Signs a request by adding an Authorization header to headers_dict Args: - destination (bytes): The desination home server of the request. May be None if the - destination is an identity server, in which case destination_is must be non-null. + destination (bytes): The desination home server of the request. May be None + if the destination is an identity server, in which case destination_is + must be non-null. method (bytes): The HTTP method of the request url_bytes (bytes): The URI path of the request headers_dict (dict): Dictionary of request headers to append to content (bytes): The body of the request - destination_is (bytes): As 'destination', but if the destination is an identity server + destination_is (bytes): As 'destination', but if the destination is an + identity server Returns: None -- cgit 1.4.1 From 3e4bc4488cf044e048935f8dd3bdf8b460aaa55f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jun 2018 09:44:10 +0100 Subject: More doc fixes --- synapse/http/matrixfederationclient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 98797c37df..bce7c631b4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -268,9 +268,9 @@ class MatrixFederationHttpClient(object): """ Signs a request by adding an Authorization header to headers_dict Args: - destination (bytes): The desination home server of the request. May be None + destination (bytes|None): The desination home server of the request. May be None if the destination is an identity server, in which case destination_is - must be non-null. + must be non-None. method (bytes): The HTTP method of the request url_bytes (bytes): The URI path of the request headers_dict (dict): Dictionary of request headers to append to -- cgit 1.4.1 From bf54c1cf6ce5097b817b262340e2e1bd4cb13be9 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jun 2018 10:15:33 +0100 Subject: pep8 --- synapse/http/matrixfederationclient.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index bce7c631b4..bf56e77c7a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -268,9 +268,9 @@ class MatrixFederationHttpClient(object): """ Signs a request by adding an Authorization header to headers_dict Args: - destination (bytes|None): The desination home server of the request. May be None - if the destination is an identity server, in which case destination_is - must be non-None. + destination (bytes|None): The desination home server of the request. + May be None if the destination is an identity server, in which case + destination_is must be non-None. method (bytes): The HTTP method of the request url_bytes (bytes): The URI path of the request headers_dict (dict): Dictionary of request headers to append to -- cgit 1.4.1 From 0834b49c6a9b6c597a154d4b2dfcf8fff90699ec Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 8 Jun 2018 11:34:46 +0100 Subject: Fix event filtering in get_missing_events handler --- synapse/handlers/federation.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fcf94befb7..495ac4c648 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1794,6 +1794,10 @@ class FederationHandler(BaseHandler): min_depth=min_depth, ) + missing_events = yield self._filter_events_for_server( + origin, room_id, missing_events, + ) + defer.returnValue(missing_events) @defer.inlineCallbacks -- cgit 1.4.1 From e82db24a0ece7169d15fdb69cac5096d72d49af7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 8 Jun 2018 12:01:36 +0100 Subject: Try to log more helpful info when a sig verification fails Firstly, don't swallow the reason for the failure Secondly, don't assume all exceptions are verification failures Thirdly, log a bit of info about the key being used if debug is enabled --- synapse/crypto/keyring.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 22ee0fc93f..9b17ef0a08 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -27,10 +27,12 @@ from synapse.util.metrics import Measure from twisted.internet import defer from signedjson.sign import ( - verify_signed_json, signature_ids, sign_json, encode_canonical_json + verify_signed_json, signature_ids, sign_json, encode_canonical_json, + SignatureVerifyException, ) from signedjson.key import ( - is_signing_algorithm_supported, decode_verify_key_bytes + is_signing_algorithm_supported, decode_verify_key_bytes, + encode_verify_key_base64, ) from unpaddedbase64 import decode_base64, encode_base64 @@ -56,7 +58,7 @@ Attributes: key_ids(set(str)): The set of key_ids to that could be used to verify the JSON object json_object(dict): The JSON object to verify. - deferred(twisted.internet.defer.Deferred): + deferred(Deferred[str, str, nacl.signing.VerifyKey]): A deferred (server_name, key_id, verify_key) tuple that resolves when a verify key has been fetched. The deferreds' callbacks are run with no logcontext. @@ -736,6 +738,17 @@ class Keyring(object): @defer.inlineCallbacks def _handle_key_deferred(verify_request): + """Waits for the key to become available, and then performs a verification + + Args: + verify_request (VerifyKeyRequest): + + Returns: + Deferred[None] + + Raises: + SynapseError if there was a problem performing the verification + """ server_name = verify_request.server_name try: with PreserveLoggingContext(): @@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request): )) try: verify_signed_json(json_object, server_name, verify_key) - except Exception: + except SignatureVerifyException as e: + logger.debug( + "Error verifying signature for %s:%s:%s with key %s: %s", + server_name, verify_key.alg, verify_key.version, + encode_verify_key_base64(verify_key), + str(e), + ) raise SynapseError( 401, - "Invalid signature for server %s with key %s:%s" % ( - server_name, verify_key.alg, verify_key.version + "Invalid signature for server %s with key %s:%s: %s" % ( + server_name, verify_key.alg, verify_key.version, str(e), ), Codes.UNAUTHORIZED, ) -- cgit 1.4.1 From c6b1441c52e06b6b8c8715dd3e4d4340aac3a216 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 8 Jun 2018 11:34:46 +0100 Subject: Fix event filtering in get_missing_events handler --- synapse/handlers/federation.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fcf94befb7..495ac4c648 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1794,6 +1794,10 @@ class FederationHandler(BaseHandler): min_depth=min_depth, ) + missing_events = yield self._filter_events_for_server( + origin, room_id, missing_events, + ) + defer.returnValue(missing_events) @defer.inlineCallbacks -- cgit 1.4.1 From 0eb4722932b4d2d92cd8cca97e1b36bd3b20d38d Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Fri, 8 Jun 2018 15:21:46 +0100 Subject: changelog a bump version --- CHANGES.rst | 10 ++++++++++ synapse/__init__.py | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/CHANGES.rst b/CHANGES.rst index 3a2a30873f..ff137f481a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,13 @@ +Changes in synapse v0.31.1 (2018-06-08) +======================================= + +This release fixes a security bug where event visibility rules were not +applied correctly to ``get_missing_events``. + +Bug Fixes: + + * Fix event filtering in get_missing_events handler (PR #3371) + Changes in synapse v0.31.0 (2018-06-06) ======================================= diff --git a/synapse/__init__.py b/synapse/__init__.py index ca113db434..78fc63aa49 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.31.0" +__version__ = "0.31.1" -- cgit 1.4.1 From d6cc36920504a7527fb9aacfc4e6fd35292455b2 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 11 Jun 2018 14:43:55 +0100 Subject: fix idiotic typo in state res --- synapse/state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/state.py b/synapse/state.py index 216418f58d..8098db94b4 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -694,10 +694,10 @@ def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_ma return auth_events -def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids, +def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids, state_map): conflicted_state = {} - for key, event_ids in iteritems(conflicted_state_ds): + for key, event_ids in iteritems(conflicted_state_ids): events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map] if len(events) > 1: conflicted_state[key] = events -- cgit 1.4.1 From eb32b2ca2018814ca8af08751e3b31f35fa71d24 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 11 Jun 2018 22:56:11 +0100 Subject: Optimise state_group_cache update (1) matrix-org-hotfixes has removed the intern calls; let's do the same here. (2) remove redundant iteritems() so we can used an optimised db update. --- synapse/storage/state.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index bdee14a8eb..85c8fffc19 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -567,11 +567,7 @@ class StateGroupWorkerStore(SQLBaseStore): # from the database. for group, group_state_dict in iteritems(group_to_state_dict): state_dict = results[group] - - state_dict.update( - ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) - for k, v in iteritems(group_state_dict) - ) + state_dict.update(group_state_dict) self._state_group_cache.update( cache_seq_num, -- cgit 1.4.1 From bd348f0af66a82e91b71d4313a20798b8d33b832 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 7 Jun 2018 11:37:10 +0100 Subject: remove dead filter_events_for_clients This is only used by filter_events_for_client, so we can simplify the whole thing by just doing one user at a time, and removing a dead storage function to boot. --- synapse/storage/account_data.py | 21 +---- synapse/visibility.py | 98 ++++++++-------------- .../replication/slave/storage/test_account_data.py | 8 -- 3 files changed, 35 insertions(+), 92 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index f83ff0454a..284ec3c970 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -20,7 +20,7 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.util.id_generators import StreamIdGenerator from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks import abc import simplejson as json @@ -114,25 +114,6 @@ class AccountDataWorkerStore(SQLBaseStore): else: defer.returnValue(None) - @cachedList(cached_method_name="get_global_account_data_by_type_for_user", - num_args=2, list_name="user_ids", inlineCallbacks=True) - def get_global_account_data_by_type_for_users(self, data_type, user_ids): - rows = yield self._simple_select_many_batch( - table="account_data", - column="user_id", - iterable=user_ids, - keyvalues={ - "account_data_type": data_type, - }, - retcols=("user_id", "content",), - desc="get_global_account_data_by_type_for_users", - ) - - defer.returnValue({ - row["user_id"]: json.loads(row["content"]) if row["content"] else None - for row in rows - }) - @cached(num_args=2) def get_account_data_for_room(self, user_id, room_id): """Get all the client account_data for a user for a room. diff --git a/synapse/visibility.py b/synapse/visibility.py index aaca2c584c..aef4953c1d 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer from synapse.api.constants import Membership, EventTypes from synapse.util.logcontext import make_deferred_yieldable, preserve_fn -import logging - logger = logging.getLogger(__name__) @@ -43,21 +43,35 @@ MEMBERSHIP_PRIORITY = ( @defer.inlineCallbacks -def filter_events_for_clients(store, user_tuples, events, event_id_to_state, - always_include_ids=frozenset()): - """ Returns dict of user_id -> list of events that user is allowed to - see. +def filter_events_for_client(store, user_id, events, is_peeking=False, + always_include_ids=frozenset()): + """ + Check which events a user is allowed to see Args: - user_tuples (str, bool): (user id, is_peeking) for each user to be - checked. is_peeking should be true if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the - given events - events ([synapse.events.EventBase]): list of events to filter + store (synapse.storage.DataStore): our datastore (can also be a worker + store) + user_id(str): user id to be checked + events(list[synapse.events.EventBase]): sequence of events to be checked + is_peeking(bool): should be True if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the given + events always_include_ids (set(event_id)): set of event ids to specifically include (unless sender is ignored) + + Returns: + Deferred[list[synapse.events.EventBase]] """ + types = ( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + event_id_to_state = yield store.get_state_for_events( + frozenset(e.event_id for e in events), + types=types, + ) + forgotten = yield make_deferred_yieldable(defer.gatherResults([ defer.maybeDeferred( preserve_fn(store.who_forgot_in_room), @@ -71,25 +85,20 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state, row["event_id"] for rows in forgotten for row in rows ) - ignore_dict_content = yield store.get_global_account_data_by_type_for_users( - "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples] + ignore_dict_content = yield store.get_global_account_data_by_type_for_user( + "m.ignored_user_list", user_id, ) # FIXME: This will explode if people upload something incorrect. - ignore_dict = { - user_id: frozenset( - content.get("ignored_users", {}).keys() if content else [] - ) - for user_id, content in ignore_dict_content.items() - } + ignore_list = frozenset( + ignore_dict_content.get("ignored_users", {}).keys() + if ignore_dict_content else [] + ) - def allowed(event, user_id, is_peeking, ignore_list): + def allowed(event): """ Args: event (synapse.events.EventBase): event to check - user_id (str) - is_peeking (bool) - ignore_list (list): list of users to ignore """ if not event.is_state() and event.sender in ignore_list: return False @@ -184,43 +193,4 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state, # we don't know when they left. return not is_peeking - defer.returnValue({ - user_id: [ - event - for event in events - if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, [])) - ] - for user_id, is_peeking in user_tuples - }) - - -@defer.inlineCallbacks -def filter_events_for_client(store, user_id, events, is_peeking=False, - always_include_ids=frozenset()): - """ - Check which events a user is allowed to see - - Args: - user_id(str): user id to be checked - events([synapse.events.EventBase]): list of events to be checked - is_peeking(bool): should be True if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the given - events - - Returns: - [synapse.events.EventBase] - """ - types = ( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_id), - ) - event_id_to_state = yield store.get_state_for_events( - frozenset(e.event_id for e in events), - types=types - ) - res = yield filter_events_for_clients( - store, [(user_id, is_peeking)], events, event_id_to_state, - always_include_ids=always_include_ids, - ) - defer.returnValue(res.get(user_id, [])) + defer.returnValue(list(filter(allowed, events))) diff --git a/tests/replication/slave/storage/test_account_data.py b/tests/replication/slave/storage/test_account_data.py index da54d478ce..f47a42e45d 100644 --- a/tests/replication/slave/storage/test_account_data.py +++ b/tests/replication/slave/storage/test_account_data.py @@ -37,10 +37,6 @@ class SlavedAccountDataStoreTestCase(BaseSlavedStoreTestCase): "get_global_account_data_by_type_for_user", [TYPE, USER_ID], {"a": 1} ) - yield self.check( - "get_global_account_data_by_type_for_users", - [TYPE, [USER_ID]], {USER_ID: {"a": 1}} - ) yield self.master_store.add_account_data_for_user( USER_ID, TYPE, {"a": 2} @@ -50,7 +46,3 @@ class SlavedAccountDataStoreTestCase(BaseSlavedStoreTestCase): "get_global_account_data_by_type_for_user", [TYPE, USER_ID], {"a": 2} ) - yield self.check( - "get_global_account_data_by_type_for_users", - [TYPE, [USER_ID]], {USER_ID: {"a": 2}} - ) -- cgit 1.4.1 From 9fc5b74b24314cc41d8cf54ab73ca8107b1342b4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 7 Jun 2018 16:18:57 +0100 Subject: simplify get_persisted_pdu it doesn't make much sense to use get_persisted_pdu on the receive path: just get the event straight from the store. --- synapse/federation/federation_server.py | 13 +--------- synapse/handlers/federation.py | 44 +++++++++++++++++++-------------- 2 files changed, 27 insertions(+), 30 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2d420a58a2..d4dd967c60 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -277,7 +277,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_pdu_request(self, origin, event_id): - pdu = yield self._get_persisted_pdu(origin, event_id) + pdu = yield self.handler.get_persisted_pdu(origin, event_id) if pdu: defer.returnValue( @@ -470,17 +470,6 @@ class FederationServer(FederationBase): ts_now_ms = self._clock.time_msec() return self.store.get_user_id_for_open_id_token(token, ts_now_ms) - @log_function - def _get_persisted_pdu(self, origin, event_id, do_auth=True): - """ Get a PDU from the database with given origin and id. - - Returns: - Deferred: Results in a `Pdu`. - """ - return self.handler.get_persisted_pdu( - origin, event_id, do_auth=do_auth - ) - def _transaction_from_pdus(self, pdu_list): """Returns a new Transaction containing the given PDUs suitable for transmission. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 495ac4c648..a5dee355e1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -103,8 +103,10 @@ class FederationHandler(BaseHandler): """ # We reprocess pdus when we have seen them only as outliers - existing = yield self.get_persisted_pdu( - origin, pdu.event_id, do_auth=False + existing = yield self.store.get_event( + pdu.event_id, + allow_none=True, + allow_rejected=True, ) # FIXME: Currently we fetch an event again when we already have it @@ -1468,11 +1470,20 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, origin, event_id, do_auth=True): - """ Get a PDU from the database with given origin and id. + def get_persisted_pdu(self, origin, event_id): + """Get an event from the database for the given server. + + Args: + origin [str]: hostname of server which is requesting the event; we + will check that the server is allowed to see it. + event_id [str]: id of the event being requested Returns: - Deferred: Results in a `Pdu`. + Deferred[EventBase|None]: None if we know nothing about the event; + otherwise the (possibly-redacted) event. + + Raises: + AuthError if the server is not currently in the room """ event = yield self.store.get_event( event_id, @@ -1493,20 +1504,17 @@ class FederationHandler(BaseHandler): ) ) - if do_auth: - in_room = yield self.auth.check_host_in_room( - event.room_id, - origin - ) - if not in_room: - raise AuthError(403, "Host not in room.") - - events = yield self._filter_events_for_server( - origin, event.room_id, [event] - ) - - event = events[0] + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + events = yield self._filter_events_for_server( + origin, event.room_id, [event] + ) + event = events[0] defer.returnValue(event) else: defer.returnValue(None) -- cgit 1.4.1 From 3ff8a619f5ed9d12b72858bfbbe3859e77dd064c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 7 Jun 2018 16:37:19 +0100 Subject: UserErasureStore to store which users have been erased --- synapse/storage/__init__.py | 2 + synapse/storage/schema/delta/50/erasure_store.sql | 21 +++++ synapse/storage/user_erasure_store.py | 103 ++++++++++++++++++++++ 3 files changed, 126 insertions(+) create mode 100644 synapse/storage/schema/delta/50/erasure_store.sql create mode 100644 synapse/storage/user_erasure_store.py (limited to 'synapse') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 979fa22438..e843b702b9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,6 +20,7 @@ import time import logging from synapse.storage.devices import DeviceStore +from synapse.storage.user_erasure_store import UserErasureStore from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) @@ -88,6 +89,7 @@ class DataStore(RoomMemberStore, RoomStore, DeviceInboxStore, UserDirectoryStore, GroupServerStore, + UserErasureStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/schema/delta/50/erasure_store.sql b/synapse/storage/schema/delta/50/erasure_store.sql new file mode 100644 index 0000000000..5d8641a9ab --- /dev/null +++ b/synapse/storage/schema/delta/50/erasure_store.sql @@ -0,0 +1,21 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- a table of users who have requested that their details be erased +CREATE TABLE erased_users ( + user_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id); diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py new file mode 100644 index 0000000000..47bfc01e84 --- /dev/null +++ b/synapse/storage/user_erasure_store.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import operator + +from twisted.internet import defer + +from synapse.storage._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedList, cached + + +class UserErasureWorkerStore(SQLBaseStore): + @cached() + def is_user_erased(self, user_id): + """ + Check if the given user id has requested erasure + + Args: + user_id (str): full user id to check + + Returns: + Deferred[bool]: True if the user has requested erasure + """ + return self._simple_select_onecol( + table="erased_users", + keyvalues={"user_id": user_id}, + retcol="1", + desc="is_user_erased", + ).addCallback(operator.truth) + + @cachedList( + cached_method_name="is_user_erased", + list_name="user_ids", + inlineCallbacks=True, + ) + def are_users_erased(self, user_ids): + """ + Checks which users in a list have requested erasure + + Args: + user_ids (iterable[str]): full user id to check + + Returns: + Deferred[dict[str, bool]]: + for each user, whether the user has requested erasure. + """ + # this serves the dual purpose of (a) making sure we can do len and + # iterate it multiple times, and (b) avoiding duplicates. + user_ids = tuple(set(user_ids)) + + def _get_erased_users(txn): + txn.execute( + "SELECT user_id FROM erased_users WHERE user_id IN (%s)" % ( + ",".join("?" * len(user_ids)) + ), + user_ids, + ) + return set(r[0] for r in txn) + + erased_users = yield self.runInteraction( + "are_users_erased", _get_erased_users, + ) + res = dict((u, u in erased_users) for u in user_ids) + defer.returnValue(res) + + +class UserErasureStore(UserErasureWorkerStore): + def mark_user_erased(self, user_id): + """Indicate that user_id wishes their message history to be erased. + + Args: + user_id (str): full user_id to be erased + """ + def f(txn): + # first check if they are already in the list + txn.execute( + "SELECT 1 FROM erased_users WHERE user_id = ?", + (user_id, ) + ) + if txn.fetchone(): + return + + # they are not already there: do the insert. + txn.execute( + "INSERT INTO erased_users (user_id) VALUES (?)", + (user_id, ) + ) + + self._invalidate_cache_and_stream( + txn, self.is_user_erased, (user_id,) + ) + return self.runInteraction("mark_user_erased", f) -- cgit 1.4.1 From f1023ebf4bfff8f3c3b51daeaeb1ba05869ab703 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 12 Jun 2018 09:43:59 +0100 Subject: mark accounts as erased when requested --- synapse/handlers/deactivate_account.py | 7 ++++++- synapse/rest/client/v1/admin.py | 4 +++- synapse/rest/client/v2_alpha/account.py | 13 +++++++++++-- 3 files changed, 20 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 8ec5ba2012..404b662469 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -42,7 +42,7 @@ class DeactivateAccountHandler(BaseHandler): reactor.callWhenRunning(self._start_user_parting) @defer.inlineCallbacks - def deactivate_account(self, user_id): + def deactivate_account(self, user_id, erase_data): """Deactivate a user's account Args: @@ -92,6 +92,11 @@ class DeactivateAccountHandler(BaseHandler): # delete from user directory yield self.user_directory_handler.handle_user_deactivated(user_id) + # Mark the user as erased, if they asked for that + if erase_data: + logger.info("Marking %s as erased", user_id) + yield self.store.mark_user_erased(user_id) + # Now start the process that goes through that list and # parts users from rooms (if it isn't already running) self._start_user_parting() diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index b8665a45eb..ddaedb2a8c 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -254,7 +254,9 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - yield self._deactivate_account_handler.deactivate_account(target_user_id) + yield self._deactivate_account_handler.deactivate_account( + target_user_id, False, + ) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 0291fba9e7..3b822c0cb4 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ # limitations under the License. import logging +from six.moves import http_client from twisted.internet import defer from synapse.api.auth import has_access_token @@ -187,13 +189,20 @@ class DeactivateAccountRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): body = parse_json_object_from_request(request) + erase = body.get("erase", False) + if not isinstance(erase, bool): + raise SynapseError( + http_client.BAD_REQUEST, + "Param 'erase' must be a boolean, if given", + Codes.BAD_JSON, + ) requester = yield self.auth.get_user_by_req(request) # allow ASes to dectivate their own users if requester.app_service: yield self._deactivate_account_handler.deactivate_account( - requester.user.to_string() + requester.user.to_string(), erase, ) defer.returnValue((200, {})) @@ -201,7 +210,7 @@ class DeactivateAccountRestServlet(RestServlet): requester, body, self.hs.get_ip_from_request(request), ) yield self._deactivate_account_handler.deactivate_account( - requester.user.to_string(), + requester.user.to_string(), erase, ) defer.returnValue((200, {})) -- cgit 1.4.1 From b6faef2ad7cf1b427f418441e50c8ca8d7c67b61 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 7 Jun 2018 12:28:03 +0100 Subject: Filter out erased messages Redact any messges sent by erased users. --- synapse/handlers/federation.py | 13 +++++++ synapse/visibility.py | 77 ++++++++++++++++++++++++++++++------------ 2 files changed, 68 insertions(+), 22 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a5dee355e1..6f4adf102b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -495,7 +495,20 @@ class FederationHandler(BaseHandler): for e_id, key_to_eid in event_to_state_ids.iteritems() } + erased_senders = yield self.store.are_users_erased( + e.sender for e in events, + ) + def redact_disallowed(event, state): + # if the sender has been gdpr17ed, always return a redacted + # copy of the event. + if erased_senders[event.sender]: + logger.info( + "Sender of %s has been erased, redacting", + event.event_id, + ) + return prune_event(event) + if not state: return event diff --git a/synapse/visibility.py b/synapse/visibility.py index aef4953c1d..65d79cf0d0 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -12,15 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import itertools import logging +import operator from twisted.internet import defer -from synapse.api.constants import Membership, EventTypes - -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn - +from synapse.api.constants import EventTypes, Membership +from synapse.events.utils import prune_event +from synapse.util.logcontext import ( + make_deferred_yieldable, preserve_fn, +) logger = logging.getLogger(__name__) @@ -95,16 +97,27 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if ignore_dict_content else [] ) + erased_senders = yield store.are_users_erased((e.sender for e in events)) + def allowed(event): """ Args: event (synapse.events.EventBase): event to check + + Returns: + None|EventBase: + None if the user cannot see this event at all + + a redacted copy of the event if they can only see a redacted + version + + the original event if they can see it as normal. """ if not event.is_state() and event.sender in ignore_list: - return False + return None if event.event_id in always_include_ids: - return True + return event state = event_id_to_state[event.event_id] @@ -118,10 +131,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if visibility not in VISIBILITY_PRIORITY: visibility = "shared" - # if it was world_readable, it's easy: everyone can read it - if visibility == "world_readable": - return True - # Always allow history visibility events on boundaries. This is done # by setting the effective visibility to the least restrictive # of the old vs new. @@ -155,7 +164,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if membership == "leave" and ( prev_membership == "join" or prev_membership == "invite" ): - return True + return event new_priority = MEMBERSHIP_PRIORITY.index(membership) old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) @@ -166,31 +175,55 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if membership is None: membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: + # XXX why do we do this? + # https://github.com/matrix-org/synapse/issues/3350 if membership_event.event_id not in event_id_forgotten: membership = membership_event.membership # if the user was a member of the room at the time of the event, # they can see it. if membership == Membership.JOIN: - return True + return event + + # otherwise, it depends on the room visibility. if visibility == "joined": # we weren't a member at the time of the event, so we can't # see this event. - return False + return None elif visibility == "invited": # user can also see the event if they were *invited* at the time # of the event. - return membership == Membership.INVITE - - else: - # visibility is shared: user can also see the event if they have - # become a member since the event + return ( + event if membership == Membership.INVITE else None + ) + + elif visibility == "shared" and is_peeking: + # if the visibility is shared, users cannot see the event unless + # they have *subequently* joined the room (or were members at the + # time, of course) # # XXX: if the user has subsequently joined and then left again, # ideally we would share history up to the point they left. But - # we don't know when they left. - return not is_peeking + # we don't know when they left. We just treat it as though they + # never joined, and restrict access. + return None + + # the visibility is either shared or world_readable, and the user was + # not a member at the time. We allow it, provided the original sender + # has not requested their data to be erased, in which case, we return + # a redacted version. + if erased_senders[event.sender]: + return prune_event(event) + + return event + + # check each event: gives an iterable[None|EventBase] + filtered_events = itertools.imap(allowed, events) + + # remove the None entries + filtered_events = filter(operator.truth, filtered_events) - defer.returnValue(list(filter(allowed, events))) + # we turn it into a list before returning it. + defer.returnValue(list(filtered_events)) -- cgit 1.4.1 From 12285a1a76991f813c71234a14a428192f2c6f93 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Tue, 12 Jun 2018 12:51:31 -0600 Subject: The flag is named enable_metrics, not collect_metrics Signed-off-by: Travis Ralston --- synapse/app/appservice.py | 2 +- synapse/app/client_reader.py | 2 +- synapse/app/event_creator.py | 2 +- synapse/app/federation_reader.py | 2 +- synapse/app/federation_sender.py | 2 +- synapse/app/frontend_proxy.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/app/media_repository.py | 2 +- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- synapse/app/user_dir.py | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) (limited to 'synapse') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index dd114dee07..ea052588ef 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -97,7 +97,7 @@ class AppserviceServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 85dada7f9f..654ddb8414 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -122,7 +122,7 @@ class ClientReaderServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 5ca77c0f1a..441467093a 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -138,7 +138,7 @@ class EventCreatorServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 2a1995d0cd..b2415cc671 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -111,7 +111,7 @@ class FederationReaderServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 81ad574043..13d2b70053 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -125,7 +125,7 @@ class FederationSenderServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 5a164a7a95..d2bae4ad03 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -176,7 +176,7 @@ class FrontendProxyServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 714f98a3e0..f855925fc8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -266,7 +266,7 @@ class SynapseHomeServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 006bba80a8..19a682cce3 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -118,7 +118,7 @@ class MediaRepositoryServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 64df47f9cc..13cfbd08b0 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -128,7 +128,7 @@ class PusherServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 6808d6d3e0..82f06ea185 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index ada1c13cec..f5726e3df6 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -150,7 +150,7 @@ class UserDirectoryServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) -- cgit 1.4.1 From 45768d1640c53d74dda0d2486ce4c441b1a2d501 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Tue, 12 Jun 2018 12:54:55 -0600 Subject: Use the RegistryProxy for appservices too Signed-off-by: Travis Ralston --- synapse/app/appservice.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index dd114dee07..742fbb05c7 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -23,6 +23,7 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore @@ -62,7 +63,7 @@ class AppserviceServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) root_resource = create_resource_tree(resources, NoResource()) -- cgit 1.4.1 From a61738b316db70a4184d5c355696e0a039e7867f Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 14 Jun 2018 18:27:37 +1000 Subject: Remove run_on_reactor (#3395) --- synapse/federation/transaction_queue.py | 4 ---- synapse/handlers/auth.py | 8 ++------ synapse/handlers/federation.py | 6 +----- synapse/handlers/identity.py | 8 -------- synapse/handlers/message.py | 4 +--- synapse/handlers/register.py | 5 +---- synapse/push/pusherpool.py | 3 --- synapse/rest/client/v1/register.py | 7 ------- synapse/rest/client/v2_alpha/account.py | 7 ------- synapse/rest/client/v2_alpha/register.py | 3 --- synapse/util/async.py | 10 +--------- tests/test_distributor.py | 2 -- tests/util/caches/test_descriptors.py | 2 -- 13 files changed, 6 insertions(+), 63 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f0aeb5a0d3..bcbce7f6eb 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -21,7 +21,6 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException, FederationDeniedError from synapse.util import logcontext, PreserveLoggingContext -from synapse.util.async import run_on_reactor from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.handlers.presence import format_user_presence_state, get_interested_remotes @@ -451,9 +450,6 @@ class TransactionQueue(object): # hence why we throw the result away. yield get_retry_limiter(destination, self.clock, self.store) - # XXX: what's this for? - yield run_on_reactor() - pending_pdus = [] while True: device_message_edus, device_stream_id, dev_list_id = ( diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 912136534d..dabc744890 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from twisted.internet import defer, threads from ._base import BaseHandler @@ -23,7 +24,6 @@ from synapse.api.errors import ( ) from synapse.module_api import ModuleApi from synapse.types import UserID -from synapse.util.async import run_on_reactor from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable @@ -423,15 +423,11 @@ class AuthHandler(BaseHandler): def _check_msisdn(self, authdict, _): return self._check_threepid('msisdn', authdict) - @defer.inlineCallbacks def _check_dummy_auth(self, authdict, _): - yield run_on_reactor() - defer.returnValue(True) + return defer.succeed(True) @defer.inlineCallbacks def _check_threepid(self, medium, authdict): - yield run_on_reactor() - if 'threepid_creds' not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 495ac4c648..af94bf33bc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -39,7 +39,7 @@ from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError, logcontext from synapse.util.metrics import measure_func from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor, Linearizer +from synapse.util.async import Linearizer from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, @@ -1381,8 +1381,6 @@ class FederationHandler(BaseHandler): def get_state_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) @@ -1425,8 +1423,6 @@ class FederationHandler(BaseHandler): def get_state_ids_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() - state_groups = yield self.store.get_state_groups_ids( room_id, [event_id] ) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 529400955d..f00dfe1d3e 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -27,7 +27,6 @@ from synapse.api.errors import ( MatrixCodeMessageException, CodeMessageException ) from ._base import BaseHandler -from synapse.util.async import run_on_reactor from synapse.api.errors import SynapseError, Codes logger = logging.getLogger(__name__) @@ -62,8 +61,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def threepid_from_creds(self, creds): - yield run_on_reactor() - if 'id_server' in creds: id_server = creds['id_server'] elif 'idServer' in creds: @@ -106,7 +103,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def bind_threepid(self, creds, mxid): - yield run_on_reactor() logger.debug("binding threepid %r to %s", creds, mxid) data = None @@ -188,8 +184,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): - yield run_on_reactor() - if not self._should_trust_id_server(id_server): raise SynapseError( 400, "Untrusted ID server '%s'" % id_server, @@ -224,8 +218,6 @@ class IdentityHandler(BaseHandler): self, id_server, country, phone_number, client_secret, send_attempt, **kwargs ): - yield run_on_reactor() - if not self._should_trust_id_server(id_server): raise SynapseError( 400, "Untrusted ID server '%s'" % id_server, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1cb81b6cf8..18dcc6d196 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -36,7 +36,7 @@ from synapse.events.validator import EventValidator from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) -from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter +from synapse.util.async import ReadWriteLock, Limiter from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func from synapse.util.frozenutils import frozendict_json_encoder @@ -959,9 +959,7 @@ class EventCreationHandler(object): event_stream_id, max_stream_id ) - @defer.inlineCallbacks def _notify(): - yield run_on_reactor() try: self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 7e52adda3c..e76ef5426d 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -24,7 +24,7 @@ from synapse.api.errors import ( from synapse.http.client import CaptchaServerHttpClient from synapse import types from synapse.types import UserID, create_requester, RoomID, RoomAlias -from synapse.util.async import run_on_reactor, Linearizer +from synapse.util.async import Linearizer from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler @@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler): Raises: RegistrationError if there was a problem registering. """ - yield run_on_reactor() password_hash = None if password: password_hash = yield self.auth_handler().hash(password) @@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler): Raises: RegistrationError if there was a problem registering. """ - yield run_on_reactor() - if localpart is None: raise SynapseError(400, "Request must include user id") diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 750d11ca38..36bb5bbc65 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -19,7 +19,6 @@ import logging from twisted.internet import defer from synapse.push.pusher import PusherFactory -from synapse.util.async import run_on_reactor from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -125,7 +124,6 @@ class PusherPool: @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): - yield run_on_reactor() try: users_affected = yield self.store.get_push_action_users_in_range( min_stream_id, max_stream_id @@ -151,7 +149,6 @@ class PusherPool: @defer.inlineCallbacks def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): - yield run_on_reactor() try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 9b3022e0b0..c10320dedf 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -24,8 +24,6 @@ import synapse.util.stringutils as stringutils from synapse.http.servlet import parse_json_object_from_request from synapse.types import create_requester -from synapse.util.async import run_on_reactor - from hashlib import sha1 import hmac import logging @@ -272,7 +270,6 @@ class RegisterRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def _do_password(self, request, register_json, session): - yield run_on_reactor() if (self.hs.config.enable_registration_captcha and not session[LoginType.RECAPTCHA]): # captcha should've been done by this stage! @@ -333,8 +330,6 @@ class RegisterRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def _do_shared_secret(self, request, register_json, session): - yield run_on_reactor() - if not isinstance(register_json.get("mac", None), string_types): raise SynapseError(400, "Expected mac.") if not isinstance(register_json.get("user", None), string_types): @@ -423,8 +418,6 @@ class CreateUserRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def _do_create(self, requester, user_json): - yield run_on_reactor() - if "localpart" not in user_json: raise SynapseError(400, "Expected 'localpart' key.") diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 0291fba9e7..e1281cfbb6 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -24,7 +24,6 @@ from synapse.http.servlet import ( RestServlet, assert_params_in_request, parse_json_object_from_request, ) -from synapse.util.async import run_on_reactor from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.threepids import check_3pid_allowed from ._base import client_v2_patterns, interactive_auth_handler @@ -300,8 +299,6 @@ class ThreepidRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request): - yield run_on_reactor() - requester = yield self.auth.get_user_by_req(request) threepids = yield self.datastore.user_get_threepids( @@ -312,8 +309,6 @@ class ThreepidRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): - yield run_on_reactor() - body = parse_json_object_from_request(request) threePidCreds = body.get('threePidCreds') @@ -365,8 +360,6 @@ class ThreepidDeleteRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): - yield run_on_reactor() - body = parse_json_object_from_request(request) required = ['medium', 'address'] diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 5cab00aea9..97e7c0f7c6 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -32,7 +32,6 @@ from ._base import client_v2_patterns, interactive_auth_handler import logging import hmac from hashlib import sha1 -from synapse.util.async import run_on_reactor from synapse.util.ratelimitutils import FederationRateLimiter from six import string_types @@ -191,8 +190,6 @@ class RegisterRestServlet(RestServlet): @interactive_auth_handler @defer.inlineCallbacks def on_POST(self, request): - yield run_on_reactor() - body = parse_json_object_from_request(request) kind = "user" diff --git a/synapse/util/async.py b/synapse/util/async.py index 9dd4e6b5bc..b8e57efc54 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from twisted.internet import defer, reactor from twisted.internet.defer import CancelledError from twisted.python import failure @@ -41,13 +40,6 @@ def sleep(seconds): defer.returnValue(res) -def run_on_reactor(): - """ This will cause the rest of the function to be invoked upon the next - iteration of the main loop - """ - return sleep(0) - - class ObservableDeferred(object): """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original @@ -227,7 +219,7 @@ class Linearizer(object): # the context manager, but it needs to happen while we hold the # lock, and the context manager's exit code must be synchronous, # so actually this is the only sensible place. - yield run_on_reactor() + yield sleep(0) else: logger.info("Acquired uncontended linearizer lock %r for key %r", diff --git a/tests/test_distributor.py b/tests/test_distributor.py index 010aeaee7e..c066381698 100644 --- a/tests/test_distributor.py +++ b/tests/test_distributor.py @@ -19,7 +19,6 @@ from twisted.internet import defer from mock import Mock, patch from synapse.util.distributor import Distributor -from synapse.util.async import run_on_reactor class DistributorTestCase(unittest.TestCase): @@ -95,7 +94,6 @@ class DistributorTestCase(unittest.TestCase): @defer.inlineCallbacks def observer(): - yield run_on_reactor() raise MyException("Oopsie") self.dist.observe("whail", observer) diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 2516fe40f4..24754591df 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -18,7 +18,6 @@ import logging import mock from synapse.api.errors import SynapseError -from synapse.util import async from synapse.util import logcontext from twisted.internet import defer from synapse.util.caches import descriptors @@ -195,7 +194,6 @@ class DescriptorTestCase(unittest.TestCase): def fn(self, arg1): @defer.inlineCallbacks def inner_fn(): - yield async.run_on_reactor() raise SynapseError(400, "blah") return inner_fn() -- cgit 1.4.1 From 557b686eacca0b59820489daf6a5ea929cda1636 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 14 Jun 2018 11:26:27 +0100 Subject: Refactor get_send_level to take a power_levels event it makes it easier for me to reason about --- synapse/api/auth.py | 2 +- synapse/event_auth.py | 45 ++++++++++++++++++++++++++++++++++++++------- 2 files changed, 39 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 06fa38366d..66639b0089 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -655,7 +655,7 @@ class Auth(object): auth_events[(EventTypes.PowerLevels, "")] = power_level_event send_level = event_auth.get_send_level( - EventTypes.Aliases, "", auth_events + EventTypes.Aliases, "", power_level_event, ) user_level = event_auth.get_user_power_level(user_id, auth_events) diff --git a/synapse/event_auth.py b/synapse/event_auth.py index eaf9cecde6..138175a554 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -355,9 +355,24 @@ def _check_joined_room(member, user_id, room_id): )) -def get_send_level(etype, state_key, auth_events): - key = (EventTypes.PowerLevels, "", ) - send_level_event = auth_events.get(key) +def get_send_level(etype, state_key, power_levels_event): + """Get the power level required to send an event of a given type + + The federation spec [1] refers to this as "Required Power Level". + + https://matrix.org/docs/spec/server_server/unstable.html#definitions + + Args: + etype (str): type of event + state_key (str|None): state_key of state event, or None if it is not + a state event. + power_levels_event (synapse.events.EventBase|None): power levels event + in force at this point in the room + Returns: + int: power level required to send this event. + """ + + send_level_event = power_levels_event # todo: rename refs below send_level = None if send_level_event: send_level = send_level_event.content.get("events", {}).get( @@ -382,8 +397,10 @@ def get_send_level(etype, state_key, auth_events): def _can_send_event(event, auth_events): + power_levels_event = _get_power_level_event(auth_events) + send_level = get_send_level( - event.type, event.get("state_key", None), auth_events + event.type, event.get("state_key"), power_levels_event, ) user_level = get_user_power_level(event.user_id, auth_events) @@ -524,13 +541,22 @@ def _check_power_levels(event, auth_events): def _get_power_level_event(auth_events): - key = (EventTypes.PowerLevels, "", ) - return auth_events.get(key) + return auth_events.get((EventTypes.PowerLevels, "")) def get_user_power_level(user_id, auth_events): - power_level_event = _get_power_level_event(auth_events) + """Get a user's power level + + Args: + user_id (str): user's id to look up in power_levels + auth_events (dict[(str, str), synapse.events.EventBase]): + state in force at this point in the room (or rather, a subset of + it including at least the create event and power levels event. + Returns: + int: the user's power level in this room. + """ + power_level_event = _get_power_level_event(auth_events) if power_level_event: level = power_level_event.content.get("users", {}).get(user_id) if not level: @@ -541,6 +567,11 @@ def get_user_power_level(user_id, auth_events): else: return int(level) else: + # if there is no power levels event, the creator gets 100 and everyone + # else gets 0. + + # some things which call this don't pass the create event: hack around + # that. key = (EventTypes.Create, "", ) create_event = auth_events.get(key) if (create_event is not None and -- cgit 1.4.1 From f116f32ace4aff8e378b4771ad072a89c7433a70 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 14 Jun 2018 20:26:59 +1000 Subject: add a last seen metric (#3396) --- synapse/metrics/__init__.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'synapse') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 429e79c472..0355d020ef 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -190,6 +190,22 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name" # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) +last_ticked = time.time() + + +class ReactorLastSeenMetric(object): + + def collect(self): + cm = GaugeMetricFamily( + "python_twisted_reactor_last_seen", + "Seconds since the Twisted reactor was last seen", + ) + cm.add_metric([], time.time() - last_ticked) + yield cm + + +REGISTRY.register(ReactorLastSeenMetric()) + def runUntilCurrentTimer(func): @@ -222,6 +238,11 @@ def runUntilCurrentTimer(func): tick_time.observe(end - start) pending_calls_metric.observe(num_pending) + # Update the time we last ticked, for the metric to test whether + # Synapse's reactor has frozen + global last_ticked + last_ticked = end + if running_on_pypy: return ret -- cgit 1.4.1 From 52423607bddf6c1bae5dbc89fb2c94496ad70fde Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 14 Jun 2018 12:26:17 +0100 Subject: Clarify interface for event_auth stop pretending that it returns a boolean, which just almost gave me a heart attack. --- synapse/event_auth.py | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) (limited to 'synapse') diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 138175a554..36a48870a0 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -34,9 +34,11 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True): event: the event being checked. auth_events (dict: event-key -> event): the existing room state. + Raises: + AuthError if the checks fail Returns: - True if the auth checks pass. + if the auth checks pass. """ if do_size_check: _check_size_limits(event) @@ -71,7 +73,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True): # Oh, we don't know what the state of the room was, so we # are trusting that this is allowed (at least for now) logger.warn("Trusting event: %s", event.event_id) - return True + return if event.type == EventTypes.Create: room_id_domain = get_domain_from_id(event.room_id) @@ -81,7 +83,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True): "Creation event's room_id domain does not match sender's" ) # FIXME - return True + logger.debug("Allowing! %s", event) + return creation_event = auth_events.get((EventTypes.Create, ""), None) @@ -118,7 +121,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True): 403, "Alias event's state_key does not match sender's domain" ) - return True + logger.debug("Allowing! %s", event) + return if logger.isEnabledFor(logging.DEBUG): logger.debug( @@ -127,14 +131,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True): ) if event.type == EventTypes.Member: - allowed = _is_membership_change_allowed( - event, auth_events - ) - if allowed: - logger.debug("Allowing! %s", event) - else: - logger.debug("Denying! %s", event) - return allowed + _is_membership_change_allowed(event, auth_events) + logger.debug("Allowing! %s", event) + return _check_event_sender_in_room(event, auth_events) @@ -153,7 +152,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True): ) ) else: - return True + logger.debug("Allowing! %s", event) + return _can_send_event(event, auth_events) @@ -200,7 +200,7 @@ def _is_membership_change_allowed(event, auth_events): create = auth_events.get(key) if create and event.prev_events[0][0] == create.event_id: if create.content["creator"] == event.state_key: - return True + return target_user_id = event.state_key @@ -265,13 +265,13 @@ def _is_membership_change_allowed(event, auth_events): raise AuthError( 403, "%s is banned from the room" % (target_user_id,) ) - return True + return if Membership.JOIN != membership: if (caller_invited and Membership.LEAVE == membership and target_user_id == event.user_id): - return True + return if not caller_in_room: # caller isn't joined raise AuthError( @@ -334,8 +334,6 @@ def _is_membership_change_allowed(event, auth_events): else: raise AuthError(500, "Unknown membership %s" % membership) - return True - def _check_event_sender_in_room(event, auth_events): key = (EventTypes.Member, event.user_id, ) -- cgit 1.4.1 From 5c9afd6f80cf04367fe9b02c396af9f85e02a611 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 14 Jun 2018 12:28:36 +0100 Subject: Make default state_default 50 Make it so that, before there is a power-levels event in the room, you need a power level of at least 50 to send state. Partially addresses https://github.com/matrix-org/matrix-doc/issues/1192 --- synapse/event_auth.py | 34 +++++------ tests/test_event_auth.py | 153 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 20 deletions(-) create mode 100644 tests/test_event_auth.py (limited to 'synapse') diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 36a48870a0..f512d88145 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -370,28 +370,22 @@ def get_send_level(etype, state_key, power_levels_event): int: power level required to send this event. """ - send_level_event = power_levels_event # todo: rename refs below - send_level = None - if send_level_event: - send_level = send_level_event.content.get("events", {}).get( - etype - ) - if send_level is None: - if state_key is not None: - send_level = send_level_event.content.get( - "state_default", 50 - ) - else: - send_level = send_level_event.content.get( - "events_default", 0 - ) - - if send_level: - send_level = int(send_level) + if power_levels_event: + power_levels_content = power_levels_event.content else: - send_level = 0 + power_levels_content = {} + + # see if we have a custom level for this event type + send_level = power_levels_content.get("events", {}).get(etype) + + # otherwise, fall back to the state_default/events_default. + if send_level is None: + if state_key is not None: + send_level = power_levels_content.get("state_default", 50) + else: + send_level = power_levels_content.get("events_default", 0) - return send_level + return int(send_level) def _can_send_event(event, auth_events): diff --git a/tests/test_event_auth.py b/tests/test_event_auth.py new file mode 100644 index 0000000000..3d756b0d85 --- /dev/null +++ b/tests/test_event_auth.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse import event_auth +from synapse.api.errors import AuthError +from synapse.events import FrozenEvent +import unittest + + +class EventAuthTestCase(unittest.TestCase): + @unittest.DEBUG + def test_random_users_cannot_send_state_before_first_pl(self): + """ + Check that, before the first PL lands, the creator is the only user + that can send a state event. + """ + creator = "@creator:example.com" + joiner = "@joiner:example.com" + auth_events = { + ("m.room.create", ""): _create_event(creator), + ("m.room.member", creator): _join_event(creator), + ("m.room.member", joiner): _join_event(joiner), + } + + # creator should be able to send state + event_auth.check( + _random_state_event(creator), auth_events, + do_sig_check=False, + ) + + # joiner should not be able to send state + self.assertRaises( + AuthError, + event_auth.check, + _random_state_event(joiner), + auth_events, + do_sig_check=False, + ), + + @unittest.DEBUG + def test_state_default_level(self): + """ + Check that users above the state_default level can send state and + those below cannot + """ + creator = "@creator:example.com" + pleb = "@joiner:example.com" + king = "@joiner2:example.com" + + auth_events = { + ("m.room.create", ""): _create_event(creator), + ("m.room.member", creator): _join_event(creator), + ("m.room.power_levels", ""): _power_levels_event(creator, { + "state_default": "30", + "users": { + pleb: "29", + king: "30", + }, + }), + ("m.room.member", pleb): _join_event(pleb), + ("m.room.member", king): _join_event(king), + } + + # pleb should not be able to send state + self.assertRaises( + AuthError, + event_auth.check, + _random_state_event(pleb), + auth_events, + do_sig_check=False, + ), + + # king should be able to send state + event_auth.check( + _random_state_event(king), auth_events, + do_sig_check=False, + ) + + +# helpers for making events + +TEST_ROOM_ID = "!test:room" + + +def _create_event(user_id): + return FrozenEvent({ + "room_id": TEST_ROOM_ID, + "event_id": _get_event_id(), + "type": "m.room.create", + "sender": user_id, + "content": { + "creator": user_id, + }, + }) + + +def _join_event(user_id): + return FrozenEvent({ + "room_id": TEST_ROOM_ID, + "event_id": _get_event_id(), + "type": "m.room.member", + "sender": user_id, + "state_key": user_id, + "content": { + "membership": "join", + }, + }) + + +def _power_levels_event(sender, content): + return FrozenEvent({ + "room_id": TEST_ROOM_ID, + "event_id": _get_event_id(), + "type": "m.room.power_levels", + "sender": sender, + "state_key": "", + "content": content, + }) + + +def _random_state_event(sender): + return FrozenEvent({ + "room_id": TEST_ROOM_ID, + "event_id": _get_event_id(), + "type": "test.state", + "sender": sender, + "state_key": "", + "content": { + "membership": "join", + }, + }) + + +event_count = 0 + + +def _get_event_id(): + global event_count + c = event_count + event_count += 1 + return "!%i:example.com" % (c, ) -- cgit 1.4.1 From 19cd3120ec59f6bce3d01a920fe462a914e1ae0e Mon Sep 17 00:00:00 2001 From: Michael Wagner Date: Thu, 14 Jun 2018 14:08:40 +0200 Subject: Add error code to room creation error This error code is mentioned in the documentation at https://matrix.org/docs/api/client-server/#!/Room32creation/createRoom --- synapse/handlers/room.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 2abd63ad05..ab72963d87 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,7 @@ from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken from synapse.api.constants import ( EventTypes, JoinRules, RoomCreationPreset ) -from synapse.api.errors import AuthError, StoreError, SynapseError +from synapse.api.errors import AuthError, Codes, StoreError, SynapseError from synapse.util import stringutils from synapse.visibility import filter_events_for_client @@ -115,7 +115,11 @@ class RoomCreationHandler(BaseHandler): ) if mapping: - raise SynapseError(400, "Room alias already taken") + raise SynapseError( + 400, + "Room alias already taken", + Codes.ROOM_IN_USE + ) else: room_alias = None -- cgit 1.4.1 From 7e1c6164528ee0081ceab754c27b15c656337963 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 14 Jun 2018 16:24:32 +0100 Subject: v0.31.2 --- synapse/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/__init__.py b/synapse/__init__.py index 78fc63aa49..faa183a99e 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,4 +17,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.31.1" +__version__ = "0.31.2" -- cgit 1.4.1 From ccfdaf68be5ffb179a81abf4cc44ec13e6984a60 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sat, 16 Jun 2018 07:10:34 +0100 Subject: spell gauge correctly --- synapse/metrics/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 0355d020ef..7f76969467 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -62,7 +62,7 @@ class LaterGauge(object): calls = self.caller() except Exception: logger.exception( - "Exception running callback for LaterGuage(%s)", + "Exception running callback for LaterGauge(%s)", self.name, ) yield g -- cgit 1.4.1 From f6c4d74f9687aaaa7a3cecfdcd65b365633fd96c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 20 Jun 2018 11:18:57 +0100 Subject: Fix inflight requests metric (incorrect name & traceback) (#3413) --- synapse/http/request_metrics.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index dc06f6c443..1b711ca2de 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -117,13 +117,17 @@ def _get_in_flight_counts(): Returns: dict[tuple[str, str], int] """ - for rm in _in_flight_requests: + # Cast to a list to prevent it changing while the Prometheus + # thread is collecting metrics + reqs = list(_in_flight_requests) + + for rm in reqs: rm.update_metrics() # Map from (method, name) -> int, the number of in flight requests of that # type counts = {} - for rm in _in_flight_requests: + for rm in reqs: key = (rm.method, rm.name,) counts[key] = counts.get(key, 0) + 1 @@ -131,7 +135,7 @@ def _get_in_flight_counts(): LaterGauge( - "synapse_http_request_metrics_in_flight_requests_count", + "synapse_http_server_in_flight_requests_count", "", ["method", "servlet"], _get_in_flight_counts, -- cgit 1.4.1 From 245d53d32a6a98081ba8bf7f7214c8f869e353db Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Jun 2018 15:33:14 +0100 Subject: Write a clear restart indicator in logs I'm fed up with never being able to find the point a server restarted in the logs. --- synapse/app/homeserver.py | 7 +------ synapse/config/logger.py | 24 ++++++++++++++++++------ 2 files changed, 19 insertions(+), 12 deletions(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index f855925fc8..ae5fc751d5 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -318,11 +318,6 @@ def setup(config_options): # check any extra requirements we have now we have a config check_requirements(config) - version_string = "Synapse/" + get_version_string(synapse) - - logger.info("Server hostname: %s", config.server_name) - logger.info("Server version: %s", version_string) - events.USE_FROZEN_DICTS = config.use_frozen_dicts tls_server_context_factory = context_factory.ServerContextFactory(config) @@ -335,7 +330,7 @@ def setup(config_options): db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, config=config, - version_string=version_string, + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 6a7228dc2f..557c270fbe 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -12,17 +12,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from ._base import Config -from synapse.util.logcontext import LoggingContextFilter -from twisted.logger import globalLogBeginner, STDLibLogObserver import logging import logging.config -import yaml -from string import Template import os import signal +from string import Template +import sys +from twisted.logger import STDLibLogObserver, globalLogBeginner +import yaml + +import synapse +from synapse.util.logcontext import LoggingContextFilter +from synapse.util.versionstring import get_version_string +from ._base import Config DEFAULT_LOG_CONFIG = Template(""" version: 1 @@ -202,6 +205,15 @@ def setup_logging(config, use_worker_options=False): if getattr(signal, "SIGHUP"): signal.signal(signal.SIGHUP, sighup) + # make sure that the first thing we log is a thing we can grep backwards + # for + logging.warn("***** STARTING SERVER *****") + logging.warn( + "Server %s version %s", + sys.argv[0], get_version_string(synapse), + ) + logging.info("Server hostname: %s", config.server_name) + # It's critical to point twisted's internal logging somewhere, otherwise it # stacks up and leaks kup to 64K object; # see: https://twistedmatrix.com/trac/ticket/8164 -- cgit 1.4.1 From b088aafcae71fa61433f92ce0b139802a9cbd304 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 21 Jun 2018 06:15:03 +0100 Subject: Log number of events fetched from DB When we finish processing a request, log the number of events we fetched from the database to handle it. [I'm trying to figure out which requests are responsible for large amounts of event cache churn. It may turn out to be more helpful to add counts to the prometheus per-request/block metrics, but that is an extension to this code anyway.] --- synapse/http/site.py | 5 ++++- synapse/storage/events_worker.py | 4 ++++ synapse/util/logcontext.py | 4 ++++ 3 files changed, 12 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/http/site.py b/synapse/http/site.py index 2664006f8c..74a752d6cf 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -99,16 +99,18 @@ class SynapseRequest(Request): db_txn_count = context.db_txn_count db_txn_duration_sec = context.db_txn_duration_sec db_sched_duration_sec = context.db_sched_duration_sec + evt_db_fetch_count = context.evt_db_fetch_count except Exception: ru_utime, ru_stime = (0, 0) db_txn_count, db_txn_duration_sec = (0, 0) + evt_db_fetch_count = 0 end_time = time.time() self.site.access_logger.info( "%s - %s - {%s}" " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" - " %sB %s \"%s %s %s\" \"%s\"", + " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]", self.getClientIP(), self.site.site_tag, self.authenticated_entity, @@ -124,6 +126,7 @@ class SynapseRequest(Request): self.get_redacted_uri(), self.clientproto, self.get_user_agent(), + evt_db_fetch_count, ) try: diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 32d9d00ffb..cf4efa9d12 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -21,6 +21,7 @@ from synapse.events.utils import prune_event from synapse.util.logcontext import ( PreserveLoggingContext, make_deferred_yieldable, run_in_background, + LoggingContext, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -145,6 +146,9 @@ class EventsWorkerStore(SQLBaseStore): missing_events_ids = [e for e in event_ids if e not in event_entry_map] if missing_events_ids: + log_ctx = LoggingContext.current_context() + log_ctx.evt_db_fetch_count += len(missing_events_ids) + missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a58c723403..2bfd4bee96 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -60,6 +60,7 @@ class LoggingContext(object): __slots__ = [ "previous_context", "name", "ru_stime", "ru_utime", "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", + "evt_db_fetch_count", "usage_start", "main_thread", "alive", "request", "tag", @@ -109,6 +110,9 @@ class LoggingContext(object): # sec spent waiting for db txns to be scheduled self.db_sched_duration_sec = 0 + # number of events this thread has fetched from the db + self.evt_db_fetch_count = 0 + # If alive has the thread resource usage when the logcontext last # became active. self.usage_start = None -- cgit 1.4.1 From 99b77aa829bea3a46c72fc3a9932437b431b8b81 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 21 Jun 2018 09:39:27 +0100 Subject: Fix tcp protocol metrics naming (#3410) --- synapse/replication/tcp/protocol.py | 53 ++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 18 deletions(-) (limited to 'synapse') diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index c870475cd1..171a698e14 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -564,11 +564,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # The following simply registers metrics for the replication connections pending_commands = LaterGauge( - "pending_commands", "", ["name", "conn_id"], + "synapse_replication_tcp_protocol_pending_commands", + "", + ["name", "conn_id"], lambda: { - (p.name, p.conn_id): len(p.pending_commands) - for p in connected_connections - }) + (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections + }, +) def transport_buffer_size(protocol): @@ -579,11 +581,13 @@ def transport_buffer_size(protocol): transport_send_buffer = LaterGauge( - "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"], + "synapse_replication_tcp_protocol_transport_send_buffer", + "", + ["name", "conn_id"], lambda: { - (p.name, p.conn_id): transport_buffer_size(p) - for p in connected_connections - }) + (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections + }, +) def transport_kernel_read_buffer_size(protocol, read=True): @@ -602,37 +606,50 @@ def transport_kernel_read_buffer_size(protocol, read=True): tcp_transport_kernel_send_buffer = LaterGauge( - "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"], + "synapse_replication_tcp_protocol_transport_kernel_send_buffer", + "", + ["name", "conn_id"], lambda: { (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False) for p in connected_connections - }) + }, +) tcp_transport_kernel_read_buffer = LaterGauge( - "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"], + "synapse_replication_tcp_protocol_transport_kernel_read_buffer", + "", + ["name", "conn_id"], lambda: { (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True) for p in connected_connections - }) + }, +) tcp_inbound_commands = LaterGauge( - "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"], + "synapse_replication_tcp_protocol_inbound_commands", + "", + ["command", "name", "conn_id"], lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections for k, count in iteritems(p.inbound_commands_counter) - }) + }, +) tcp_outbound_commands = LaterGauge( - "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"], + "synapse_replication_tcp_protocol_outbound_commands", + "", + ["command", "name", "conn_id"], lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections for k, count in iteritems(p.outbound_commands_counter) - }) + }, +) # number of updates received for each RDATA stream -inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "", - ["stream_name"]) +inbound_rdata_count = Counter( + "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"] +) -- cgit 1.4.1 From c2eff937ac357a46a176e2b6552a853fab85ee9c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 21 Jun 2018 09:39:58 +0100 Subject: Populate synapse_federation_client_sent_pdu_destinations:count again (#3386) --- synapse/federation/transaction_queue.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bcbce7f6eb..d72b057e28 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -41,8 +41,11 @@ import logging logger = logging.getLogger(__name__) -sent_pdus_destination_dist = Counter( - "synapse_federation_transaction_queue_sent_pdu_destinations", "" +sent_pdus_destination_dist_count = Counter( + "synapse_federation_client_sent_pdu_destinations:count", "" +) +sent_pdus_destination_dist_total = Counter( + "synapse_federation_client_sent_pdu_destinations:total", "" ) @@ -279,7 +282,8 @@ class TransactionQueue(object): if not destinations: return - sent_pdus_destination_dist.inc(len(destinations)) + sent_pdus_destination_dist_total.inc(len(destinations)) + sent_pdus_destination_dist_count.inc() for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( -- cgit 1.4.1 From cbbfaa4be86f1428e9d55f3b387a167fc36e4aa4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 21 Jun 2018 10:02:42 +0100 Subject: Fix description of "python_gc_time" metric --- synapse/metrics/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 7f76969467..7d6e0232ed 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -140,7 +140,7 @@ gc_time = Histogram( class GCCounts(object): def collect(self): - cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"]) + cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): cm.add_metric([str(n)], m) -- cgit 1.4.1 From 77ac14b960cb8daef76062ce85fc0427749b48af Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 22 Jun 2018 09:37:10 +0100 Subject: Pass around the reactor explicitly (#3385) --- synapse/handlers/auth.py | 30 +++++++++++++++++++---------- synapse/handlers/message.py | 1 + synapse/handlers/user_directory.py | 9 ++++----- synapse/http/client.py | 6 +++--- synapse/http/matrixfederationclient.py | 5 +++-- synapse/notifier.py | 3 +++ synapse/replication/http/send_event.py | 6 +++--- synapse/rest/media/v1/media_repository.py | 3 ++- synapse/rest/media/v1/media_storage.py | 7 +++++-- synapse/server.py | 19 ++++++++++++++---- synapse/storage/background_updates.py | 3 +-- synapse/storage/client_ips.py | 6 ++++-- synapse/storage/event_push_actions.py | 3 +-- synapse/storage/events_worker.py | 6 +++--- synapse/util/__init__.py | 32 +++++++++++++++++++------------ synapse/util/async.py | 25 +++++++++++------------- synapse/util/file_consumer.py | 16 +++++++++++----- synapse/util/ratelimitutils.py | 3 +-- tests/crypto/test_keyring.py | 9 +++++---- tests/rest/client/test_transactions.py | 6 +++--- tests/rest/media/v1/test_media_storage.py | 5 +++-- tests/util/test_file_consumer.py | 6 +++--- tests/util/test_linearizer.py | 7 ++++--- tests/util/test_logcontext.py | 11 ++++++----- tests/utils.py | 7 ++++++- 25 files changed, 141 insertions(+), 93 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index dabc744890..a131b7f73f 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -33,6 +33,7 @@ import logging import bcrypt import pymacaroons import simplejson +import attr import synapse.util.stringutils as stringutils @@ -854,7 +855,11 @@ class AuthHandler(BaseHandler): return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, bcrypt.gensalt(self.bcrypt_rounds)) - return make_deferred_yieldable(threads.deferToThread(_do_hash)) + return make_deferred_yieldable( + threads.deferToThreadPool( + self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash + ), + ) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. @@ -874,16 +879,21 @@ class AuthHandler(BaseHandler): ) if stored_hash: - return make_deferred_yieldable(threads.deferToThread(_do_validate_hash)) + return make_deferred_yieldable( + threads.deferToThreadPool( + self.hs.get_reactor(), + self.hs.get_reactor().getThreadPool(), + _do_validate_hash, + ), + ) else: return defer.succeed(False) -class MacaroonGeneartor(object): - def __init__(self, hs): - self.clock = hs.get_clock() - self.server_name = hs.config.server_name - self.macaroon_secret_key = hs.config.macaroon_secret_key +@attr.s +class MacaroonGenerator(object): + + hs = attr.ib() def generate_access_token(self, user_id, extra_caveats=None): extra_caveats = extra_caveats or [] @@ -901,7 +911,7 @@ class MacaroonGeneartor(object): def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)): macaroon = self._generate_base_macaroon(user_id) macaroon.add_first_party_caveat("type = login") - now = self.clock.time_msec() + now = self.hs.get_clock().time_msec() expiry = now + duration_in_ms macaroon.add_first_party_caveat("time < %d" % (expiry,)) return macaroon.serialize() @@ -913,9 +923,9 @@ class MacaroonGeneartor(object): def _generate_base_macaroon(self, user_id): macaroon = pymacaroons.Macaroon( - location=self.server_name, + location=self.hs.config.server_name, identifier="key", - key=self.macaroon_secret_key) + key=self.hs.config.macaroon_secret_key) macaroon.add_first_party_caveat("gen = 1") macaroon.add_first_party_caveat("user_id = %s" % (user_id,)) return macaroon diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 18dcc6d196..7b9946ab91 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -806,6 +806,7 @@ class EventCreationHandler(object): # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( + self.hs.get_clock(), self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a39f0f7343..7e4a114d4f 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -19,7 +19,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure -from synapse.util.async import sleep from synapse.types import get_localpart_from_id from six import iteritems @@ -174,7 +173,7 @@ class UserDirectoryHandler(object): logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids)) yield self._handle_initial_room(room_id) num_processed_rooms += 1 - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) logger.info("Processed all rooms.") @@ -188,7 +187,7 @@ class UserDirectoryHandler(object): logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids)) yield self._handle_local_user(user_id) num_processed_users += 1 - yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.) logger.info("Processed all users") @@ -236,7 +235,7 @@ class UserDirectoryHandler(object): count = 0 for user_id in user_ids: if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) if not self.is_mine_id(user_id): count += 1 @@ -251,7 +250,7 @@ class UserDirectoryHandler(object): continue if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) count += 1 user_set = (user_id, other_user_id) diff --git a/synapse/http/client.py b/synapse/http/client.py index 8064a84c5c..46ffb41de1 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -98,8 +98,8 @@ class SimpleHttpClient(object): method, uri, *args, **kwargs ) add_timeout_to_deferred( - request_deferred, - 60, cancelled_to_request_timed_out_error, + request_deferred, 60, self.hs.get_reactor(), + cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable(request_deferred) @@ -115,7 +115,7 @@ class SimpleHttpClient(object): "Error sending request to %s %s: %s %s", method, redact_uri(uri), type(e).__name__, e.message ) - raise e + raise @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}, headers=None): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 993dc06e02..4e0399e762 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint import synapse.metrics -from synapse.util.async import sleep, add_timeout_to_deferred +from synapse.util.async import add_timeout_to_deferred from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable import synapse.util.retryutils @@ -193,6 +193,7 @@ class MatrixFederationHttpClient(object): add_timeout_to_deferred( request_deferred, timeout / 1000. if timeout else 60, + self.hs.get_reactor(), cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable( @@ -234,7 +235,7 @@ class MatrixFederationHttpClient(object): delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) - yield sleep(delay) + yield self.clock.sleep(delay) retries_left -= 1 else: raise diff --git a/synapse/notifier.py b/synapse/notifier.py index 6dce20a284..3c0622a294 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -161,6 +161,7 @@ class Notifier(object): self.user_to_user_stream = {} self.room_to_user_streams = {} + self.hs = hs self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() self.pending_new_room_events = [] @@ -340,6 +341,7 @@ class Notifier(object): add_timeout_to_deferred( listener.deferred, (end_time - now) / 1000., + self.hs.get_reactor(), ) with PreserveLoggingContext(): yield listener.deferred @@ -561,6 +563,7 @@ class Notifier(object): add_timeout_to_deferred( listener.deferred.addTimeout, (end_time - now) / 1000., + self.hs.get_reactor(), ) try: with PreserveLoggingContext(): diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index a9baa2c1c3..f080f96cc1 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -21,7 +21,6 @@ from synapse.api.errors import ( from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.util.async import sleep from synapse.util.caches.response_cache import ResponseCache from synapse.util.metrics import Measure from synapse.types import Requester, UserID @@ -33,11 +32,12 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def send_event_to_master(client, host, port, requester, event, context, +def send_event_to_master(clock, client, host, port, requester, event, context, ratelimit, extra_users): """Send event to be handled on the master Args: + clock (synapse.util.Clock) client (SimpleHttpClient) host (str): host of master port (int): port on master listening for HTTP replication @@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context, # If we timed out we probably don't need to worry about backing # off too much, but lets just wait a little anyway. - yield sleep(1) + yield clock.sleep(1) except MatrixCodeMessageException as e: # We convert to SynapseError as we know that it was a SynapseError # on the master process that we should send to the client. (And diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 2ac767d2dc..218ba7a083 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -58,6 +58,7 @@ UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 class MediaRepository(object): def __init__(self, hs): + self.hs = hs self.auth = hs.get_auth() self.client = MatrixFederationHttpClient(hs) self.clock = hs.get_clock() @@ -94,7 +95,7 @@ class MediaRepository(object): storage_providers.append(provider) self.media_storage = MediaStorage( - self.primary_base_path, self.filepaths, storage_providers, + self.hs, self.primary_base_path, self.filepaths, storage_providers, ) self.clock.looping_call( diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index d23fe10b07..d6b8ebbedb 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -37,13 +37,15 @@ class MediaStorage(object): """Responsible for storing/fetching files from local sources. Args: + hs (synapse.server.Homeserver) local_media_directory (str): Base path where we store media on disk filepaths (MediaFilePaths) storage_providers ([StorageProvider]): List of StorageProvider that are used to fetch and store files. """ - def __init__(self, local_media_directory, filepaths, storage_providers): + def __init__(self, hs, local_media_directory, filepaths, storage_providers): + self.hs = hs self.local_media_directory = local_media_directory self.filepaths = filepaths self.storage_providers = storage_providers @@ -175,7 +177,8 @@ class MediaStorage(object): res = yield provider.fetch(path, file_info) if res: with res: - consumer = BackgroundFileConsumer(open(local_path, "w")) + consumer = BackgroundFileConsumer( + open(local_path, "w"), self.hs.get_reactor()) yield res.write_to_consumer(consumer) yield consumer.wait() defer.returnValue(local_path) diff --git a/synapse/server.py b/synapse/server.py index 58dbf78437..c29c19289a 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -40,7 +40,7 @@ from synapse.federation.transport.client import TransportLayerClient from synapse.federation.transaction_queue import TransactionQueue from synapse.handlers import Handlers from synapse.handlers.appservice import ApplicationServicesHandler -from synapse.handlers.auth import AuthHandler, MacaroonGeneartor +from synapse.handlers.auth import AuthHandler, MacaroonGenerator from synapse.handlers.deactivate_account import DeactivateAccountHandler from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.device import DeviceHandler @@ -165,15 +165,19 @@ class HomeServer(object): 'server_notices_sender', ] - def __init__(self, hostname, **kwargs): + def __init__(self, hostname, reactor=None, **kwargs): """ Args: hostname : The hostname for the server. """ + if not reactor: + from twisted.internet import reactor + + self._reactor = reactor self.hostname = hostname self._building = {} - self.clock = Clock() + self.clock = Clock(reactor) self.distributor = Distributor() self.ratelimiter = Ratelimiter() @@ -186,6 +190,12 @@ class HomeServer(object): self.datastore = DataStore(self.get_db_conn(), self) logger.info("Finished setting up.") + def get_reactor(self): + """ + Fetch the Twisted reactor in use by this HomeServer. + """ + return self._reactor + def get_ip_from_request(self, request): # X-Forwarded-For is handled by our custom request type. return request.getClientIP() @@ -261,7 +271,7 @@ class HomeServer(object): return AuthHandler(self) def build_macaroon_generator(self): - return MacaroonGeneartor(self) + return MacaroonGenerator(self) def build_device_handler(self): return DeviceHandler(self) @@ -328,6 +338,7 @@ class HomeServer(object): return adbapi.ConnectionPool( name, + cp_reactor=self.get_reactor(), **self.db_config.get("args", {}) ) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 8af325a9f5..b7e9c716c8 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.async from ._base import SQLBaseStore from . import engines @@ -92,7 +91,7 @@ class BackgroundUpdateStore(SQLBaseStore): logger.info("Starting background schema updates") while True: - yield synapse.util.async.sleep( + yield self.hs.get_clock().sleep( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) try: diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index ce338514e8..968d2fed22 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,7 +15,7 @@ import logging -from twisted.internet import defer, reactor +from twisted.internet import defer from ._base import Cache from . import background_updates @@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self._client_ip_looper = self._clock.looping_call( self._update_client_ips_batch, 5 * 1000 ) - reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) + self.hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", self._update_client_ips_batch + ) def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, now=None): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d0350ee5fe..c4a0208ce4 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -16,7 +16,6 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction from twisted.internet import defer -from synapse.util.async import sleep from synapse.util.caches.descriptors import cachedInlineCallbacks import logging @@ -800,7 +799,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) if caught_up: break - yield sleep(5) + yield self.hs.get_clock().sleep(5) finally: self._doing_notif_rotation = False diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 32d9d00ffb..38fcf7d444 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from twisted.internet import defer, reactor +from twisted.internet import defer from synapse.events import FrozenEvent from synapse.events.utils import prune_event @@ -265,7 +265,7 @@ class EventsWorkerStore(SQLBaseStore): except Exception: logger.exception("Failed to callback") with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list, row_dict) + self.hs.get_reactor().callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") @@ -278,7 +278,7 @@ class EventsWorkerStore(SQLBaseStore): if event_list: with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list) + self.hs.get_reactor().callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index fc11e26623..2a3df7c71d 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,15 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import PreserveLoggingContext - -from twisted.internet import defer, reactor, task - -import time import logging - from itertools import islice +import attr +from twisted.internet import defer, task + +from synapse.util.logcontext import PreserveLoggingContext + logger = logging.getLogger(__name__) @@ -31,16 +30,24 @@ def unwrapFirstError(failure): return failure.value.subFailure +@attr.s class Clock(object): - """A small utility that obtains current time-of-day so that time may be - mocked during unit-tests. - - TODO(paul): Also move the sleep() functionality into it """ + A Clock wraps a Twisted reactor and provides utilities on top of it. + """ + _reactor = attr.ib() + + @defer.inlineCallbacks + def sleep(self, seconds): + d = defer.Deferred() + with PreserveLoggingContext(): + self._reactor.callLater(seconds, d.callback, seconds) + res = yield d + defer.returnValue(res) def time(self): """Returns the current system time in seconds since epoch.""" - return time.time() + return self._reactor.seconds() def time_msec(self): """Returns the current system time in miliseconds since epoch.""" @@ -56,6 +63,7 @@ class Clock(object): msec(float): How long to wait between calls in milliseconds. """ call = task.LoopingCall(f) + call.clock = self._reactor call.start(msec / 1000.0, now=False) return call @@ -73,7 +81,7 @@ class Clock(object): callback(*args, **kwargs) with PreserveLoggingContext(): - return reactor.callLater(delay, wrapped_callback, *args, **kwargs) + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer, ignore_errs=False): try: diff --git a/synapse/util/async.py b/synapse/util/async.py index b8e57efc54..1668df4ce6 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.defer import CancelledError from twisted.python import failure from .logcontext import ( PreserveLoggingContext, make_deferred_yieldable, run_in_background ) -from synapse.util import logcontext, unwrapFirstError +from synapse.util import logcontext, unwrapFirstError, Clock from contextlib import contextmanager @@ -31,15 +31,6 @@ from six.moves import range logger = logging.getLogger(__name__) -@defer.inlineCallbacks -def sleep(seconds): - d = defer.Deferred() - with PreserveLoggingContext(): - reactor.callLater(seconds, d.callback, seconds) - res = yield d - defer.returnValue(res) - - class ObservableDeferred(object): """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original @@ -172,13 +163,18 @@ class Linearizer(object): # do some work. """ - def __init__(self, name=None): + def __init__(self, name=None, clock=None): if name is None: self.name = id(self) else: self.name = name self.key_to_defer = {} + if not clock: + from twisted.internet import reactor + clock = Clock(reactor) + self._clock = clock + @defer.inlineCallbacks def queue(self, key): # If there is already a deferred in the queue, we pull it out so that @@ -219,7 +215,7 @@ class Linearizer(object): # the context manager, but it needs to happen while we hold the # lock, and the context manager's exit code must be synchronous, # so actually this is the only sensible place. - yield sleep(0) + yield self._clock.sleep(0) else: logger.info("Acquired uncontended linearizer lock %r for key %r", @@ -396,7 +392,7 @@ class DeferredTimeoutError(Exception): """ -def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): +def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): """ Add a timeout to a deferred by scheduling it to be cancelled after timeout seconds. @@ -411,6 +407,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): Args: deferred (defer.Deferred): deferred to be timed out timeout (Number): seconds to time out after + reactor (twisted.internet.reactor): the Twisted reactor to use on_timeout_cancel (callable): A callable which is called immediately after the deferred times out, and not if this deferred is diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3380970e4e..c78801015b 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import threads, reactor +from twisted.internet import threads from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -27,6 +27,7 @@ class BackgroundFileConsumer(object): Args: file_obj (file): The file like object to write to. Closed when finished. + reactor (twisted.internet.reactor): the Twisted reactor to use """ # For PushProducers pause if we have this many unwritten slices @@ -34,9 +35,11 @@ class BackgroundFileConsumer(object): # And resume once the size of the queue is less than this _RESUME_ON_QUEUE_SIZE = 2 - def __init__(self, file_obj): + def __init__(self, file_obj, reactor): self._file_obj = file_obj + self._reactor = reactor + # Producer we're registered with self._producer = None @@ -71,7 +74,10 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming self._finished_deferred = run_in_background( - threads.deferToThread, self._writer + threads.deferToThreadPool, + self._reactor, + self._reactor.getThreadPool(), + self._writer, ) if not streaming: self._producer.resumeProducing() @@ -109,7 +115,7 @@ class BackgroundFileConsumer(object): # producer. if self._producer and self._paused_producer: if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: - reactor.callFromThread(self._resume_paused_producer) + self._reactor.callFromThread(self._resume_paused_producer) bytes = self._bytes_queue.get() @@ -121,7 +127,7 @@ class BackgroundFileConsumer(object): # If its a pull producer then we need to explicitly ask for # more stuff. if not self.streaming and self._producer: - reactor.callFromThread(self._producer.resumeProducing) + self._reactor.callFromThread(self._producer.resumeProducing) except Exception as e: self._write_exception = e raise diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 0ab63c3d7d..c5a45cef7c 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError -from synapse.util.async import sleep from synapse.util.logcontext import ( run_in_background, make_deferred_yieldable, PreserveLoggingContext, @@ -153,7 +152,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) + ret_defer = run_in_background(self.clock.sleep, self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 149e443022..cc1c862ba4 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -19,10 +19,10 @@ import signedjson.sign from mock import Mock from synapse.api.errors import SynapseError from synapse.crypto import keyring -from synapse.util import async, logcontext +from synapse.util import logcontext, Clock from synapse.util.logcontext import LoggingContext from tests import unittest, utils -from twisted.internet import defer +from twisted.internet import defer, reactor class MockPerspectiveServer(object): @@ -118,6 +118,7 @@ class KeyringTestCase(unittest.TestCase): @defer.inlineCallbacks def test_verify_json_objects_for_server_awaits_previous_requests(self): + clock = Clock(reactor) key1 = signedjson.key.generate_signing_key(1) kr = keyring.Keyring(self.hs) @@ -167,7 +168,7 @@ class KeyringTestCase(unittest.TestCase): # wait a tick for it to send the request to the perspectives server # (it first tries the datastore) - yield async.sleep(1) # XXX find out why this takes so long! + yield clock.sleep(1) # XXX find out why this takes so long! self.http_client.post_json.assert_called_once() self.assertIs(LoggingContext.current_context(), context_11) @@ -183,7 +184,7 @@ class KeyringTestCase(unittest.TestCase): res_deferreds_2 = kr.verify_json_objects_for_server( [("server10", json1)], ) - yield async.sleep(1) + yield clock.sleep(1) self.http_client.post_json.assert_not_called() res_deferreds_2[0].addBoth(self.check_context, None) diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py index b5bc2fa255..6a757289db 100644 --- a/tests/rest/client/test_transactions.py +++ b/tests/rest/client/test_transactions.py @@ -1,9 +1,9 @@ from synapse.rest.client.transactions import HttpTransactionCache from synapse.rest.client.transactions import CLEANUP_PERIOD_MS -from twisted.internet import defer +from twisted.internet import defer, reactor from mock import Mock, call -from synapse.util import async +from synapse.util import Clock from synapse.util.logcontext import LoggingContext from tests import unittest from tests.utils import MockClock @@ -46,7 +46,7 @@ class HttpTransactionCacheTestCase(unittest.TestCase): def test_logcontexts_with_async_result(self): @defer.inlineCallbacks def cb(): - yield async.sleep(0) + yield Clock(reactor).sleep(0) defer.returnValue("yay") @defer.inlineCallbacks diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py index eef38b6781..c5e2f5549a 100644 --- a/tests/rest/media/v1/test_media_storage.py +++ b/tests/rest/media/v1/test_media_storage.py @@ -14,7 +14,7 @@ # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.rest.media.v1._base import FileInfo from synapse.rest.media.v1.media_storage import MediaStorage @@ -38,6 +38,7 @@ class MediaStorageTests(unittest.TestCase): self.secondary_base_path = os.path.join(self.test_dir, "secondary") hs = Mock() + hs.get_reactor = Mock(return_value=reactor) hs.config.media_store_path = self.primary_base_path storage_providers = [FileStorageProviderBackend( @@ -46,7 +47,7 @@ class MediaStorageTests(unittest.TestCase): self.filepaths = MediaFilePaths(self.primary_base_path) self.media_storage = MediaStorage( - self.primary_base_path, self.filepaths, storage_providers, + hs, self.primary_base_path, self.filepaths, storage_providers, ) def tearDown(self): diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py index d6e1082779..c2aae8f54c 100644 --- a/tests/util/test_file_consumer.py +++ b/tests/util/test_file_consumer.py @@ -30,7 +30,7 @@ class FileConsumerTests(unittest.TestCase): @defer.inlineCallbacks def test_pull_consumer(self): string_file = StringIO() - consumer = BackgroundFileConsumer(string_file) + consumer = BackgroundFileConsumer(string_file, reactor=reactor) try: producer = DummyPullProducer() @@ -54,7 +54,7 @@ class FileConsumerTests(unittest.TestCase): @defer.inlineCallbacks def test_push_consumer(self): string_file = BlockingStringWrite() - consumer = BackgroundFileConsumer(string_file) + consumer = BackgroundFileConsumer(string_file, reactor=reactor) try: producer = NonCallableMock(spec_set=[]) @@ -80,7 +80,7 @@ class FileConsumerTests(unittest.TestCase): @defer.inlineCallbacks def test_push_producer_feedback(self): string_file = BlockingStringWrite() - consumer = BackgroundFileConsumer(string_file) + consumer = BackgroundFileConsumer(string_file, reactor=reactor) try: producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"]) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 4865eb4bc6..bf7e3aa885 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -12,10 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util import async, logcontext + +from synapse.util import logcontext, Clock from tests import unittest -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.util.async import Linearizer from six.moves import range @@ -53,7 +54,7 @@ class LinearizerTestCase(unittest.TestCase): self.assertEqual( logcontext.LoggingContext.current_context(), lc) if sleep: - yield async.sleep(0) + yield Clock(reactor).sleep(0) self.assertEqual( logcontext.LoggingContext.current_context(), lc) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index ad78d884e0..9cf90fcfc4 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -3,8 +3,7 @@ from twisted.internet import defer from twisted.internet import reactor from .. import unittest -from synapse.util.async import sleep -from synapse.util import logcontext +from synapse.util import logcontext, Clock from synapse.util.logcontext import LoggingContext @@ -22,18 +21,20 @@ class LoggingContextTestCase(unittest.TestCase): @defer.inlineCallbacks def test_sleep(self): + clock = Clock(reactor) + @defer.inlineCallbacks def competing_callback(): with LoggingContext() as competing_context: competing_context.request = "competing" - yield sleep(0) + yield clock.sleep(0) self._check_test_key("competing") reactor.callLater(0, competing_callback) with LoggingContext() as context_one: context_one.request = "one" - yield sleep(0) + yield clock.sleep(0) self._check_test_key("one") def _test_run_in_background(self, function): @@ -87,7 +88,7 @@ class LoggingContextTestCase(unittest.TestCase): def test_run_in_background_with_blocking_fn(self): @defer.inlineCallbacks def blocking_function(): - yield sleep(0) + yield Clock(reactor).sleep(0) return self._test_run_in_background(blocking_function) diff --git a/tests/utils.py b/tests/utils.py index 262c4a5714..189fd2711c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -37,11 +37,15 @@ USE_POSTGRES_FOR_TESTS = False @defer.inlineCallbacks -def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): +def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None, + **kargs): """Setup a homeserver suitable for running tests against. Keyword arguments are passed to the Homeserver constructor. If no datastore is supplied a datastore backed by an in-memory sqlite db will be given to the HS. """ + if reactor is None: + from twisted.internet import reactor + if config is None: config = Mock() config.signing_key = [MockKey()] @@ -110,6 +114,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): database_engine=db_engine, room_list_handler=object(), tls_server_context_factory=Mock(), + reactor=reactor, **kargs ) db_conn = hs.get_db_conn() -- cgit 1.4.1 From 0495fe00350a3610cbfd9b0097ff1a8a1c31f5f4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jun 2018 10:42:28 +0100 Subject: Indirect evt_count updates via method call so that we can stub it for the sentinel and not have a billion failing UTs --- synapse/storage/events_worker.py | 2 +- synapse/util/logcontext.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index cf4efa9d12..3a634842ca 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -147,7 +147,7 @@ class EventsWorkerStore(SQLBaseStore): if missing_events_ids: log_ctx = LoggingContext.current_context() - log_ctx.evt_db_fetch_count += len(missing_events_ids) + log_ctx.record_event_fetch(len(missing_events_ids)) missing_events = yield self._enqueue_events( missing_events_ids, diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 2bfd4bee96..df2b71b791 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -91,6 +91,9 @@ class LoggingContext(object): def add_database_scheduled(self, sched_sec): pass + def record_event_fetch(self, event_count): + pass + def __nonzero__(self): return False __bool__ = __nonzero__ # python3 @@ -247,6 +250,14 @@ class LoggingContext(object): """ self.db_sched_duration_sec += sched_sec + def record_event_fetch(self, event_count): + """Record a number of events being fetched from the db + + Args: + event_count (int): number of events being fetched + """ + self.evt_db_fetch_count += event_count + class LoggingContextFilter(logging.Filter): """Logging filter that adds values from the current logging context to each -- cgit 1.4.1 From 43e02c409d163700a293ae67015584699d557c3c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 11 Jun 2018 23:13:06 +0100 Subject: Disable partial state group caching for wildcard lookups When _get_state_for_groups is given a wildcard filter, just do a complete lookup. Hopefully this will give us the best of both worlds by not filling up the ram if we only need one or two keys, but also making the cache still work for the federation reader usecase. --- synapse/storage/state.py | 56 +++++++++++++++++++++++++-------- synapse/util/caches/dictionary_cache.py | 25 +++++++-------- tests/util/test_dict_cache.py | 12 +++---- 3 files changed, 61 insertions(+), 32 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 986a20400c..cd9821c270 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -526,10 +526,23 @@ class StateGroupWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_state_for_groups(self, groups, types=None): - """Given list of groups returns dict of group -> list of state events - with matching types. `types` is a list of `(type, state_key)`, where - a `state_key` of None matches all state_keys. If `types` is None then - all events are returned. + """Gets the state at each of a list of state groups, optionally + filtering by type/state_key + + Args: + groups (iterable[int]): list of state groups for which we want + to get the state. + types (None|iterable[(str, None|str)]): + indicates the state type/keys required. If None, the whole + state is fetched and returned. + + Otherwise, each entry should be a `(type, state_key)` tuple to + include in the response. A `state_key` of None is a wildcard + meaning that we require all state with that type. + + Returns: + Deferred[dict[int, dict[(type, state_key), EventBase]]] + a dictionary mapping from state group to state dictionary. """ if types: types = frozenset(types) @@ -538,7 +551,7 @@ class StateGroupWorkerStore(SQLBaseStore): if types is not None: for group in set(groups): state_dict_ids, _, got_all = self._get_some_state_from_cache( - group, types + group, types, ) results[group] = state_dict_ids @@ -559,22 +572,40 @@ class StateGroupWorkerStore(SQLBaseStore): # Okay, so we have some missing_types, lets fetch them. cache_seq_num = self._state_group_cache.sequence + # the DictionaryCache knows if it has *all* the state, but + # does not know if it has all of the keys of a particular type, + # which makes wildcard lookups expensive unless we have a complete + # cache. Hence, if we are doing a wildcard lookup, populate the + # cache fully so that we can do an efficient lookup next time. + + if types and any(k is None for (t, k) in types): + types_to_fetch = None + else: + types_to_fetch = types + group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types + missing_groups, types_to_fetch, ) - # Now we want to update the cache with all the things we fetched - # from the database. for group, group_state_dict in iteritems(group_to_state_dict): state_dict = results[group] - state_dict.update(group_state_dict) + # update the result, filtering by `types`. + if types: + for k, v in iteritems(group_state_dict): + (typ, _) = k + if k in types or (typ, None) in types: + state_dict[k] = v + else: + state_dict.update(group_state_dict) + + # update the cache with all the things we fetched from the + # database. self._state_group_cache.update( cache_seq_num, key=group, - value=state_dict, - full=(types is None), - known_absent=types, + value=group_state_dict, + fetched_keys=types_to_fetch, ) defer.returnValue(results) @@ -681,7 +712,6 @@ class StateGroupWorkerStore(SQLBaseStore): self._state_group_cache.sequence, key=state_group, value=dict(current_state_ids), - full=True, ) return state_group diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index bdc21e348f..95793d466d 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -107,29 +107,28 @@ class DictionaryCache(object): self.sequence += 1 self.cache.clear() - def update(self, sequence, key, value, full=False, known_absent=None): + def update(self, sequence, key, value, fetched_keys=None): """Updates the entry in the cache Args: sequence - key - value (dict): The value to update the cache with. - full (bool): Whether the given value is the full dict, or just a - partial subset there of. If not full then any existing entries - for the key will be updated. - known_absent (set): Set of keys that we know don't exist in the full - dict. + key (K) + value (dict[X,Y]): The value to update the cache with. + fetched_keys (None|set[X]): All of the dictionary keys which were + fetched from the database. + + If None, this is the complete value for key K. Otherwise, it + is used to infer a list of keys which we know don't exist in + the full dict. """ self.check_thread() if self.sequence == sequence: # Only update the cache if the caches sequence number matches the # number that the cache had before the SELECT was started (SYN-369) - if known_absent is None: - known_absent = set() - if full: - self._insert(key, value, known_absent) + if fetched_keys is None: + self._insert(key, value, set()) else: - self._update_or_insert(key, value, known_absent) + self._update_or_insert(key, value, fetched_keys) def _update_or_insert(self, key, value, known_absent): # We pop and reinsert as we need to tell the cache the size may have diff --git a/tests/util/test_dict_cache.py b/tests/util/test_dict_cache.py index bc92f85fa6..543ac5bed9 100644 --- a/tests/util/test_dict_cache.py +++ b/tests/util/test_dict_cache.py @@ -32,7 +32,7 @@ class DictCacheTestCase(unittest.TestCase): seq = self.cache.sequence test_value = {"test": "test_simple_cache_hit_full"} - self.cache.update(seq, key, test_value, full=True) + self.cache.update(seq, key, test_value) c = self.cache.get(key) self.assertEqual(test_value, c.value) @@ -44,7 +44,7 @@ class DictCacheTestCase(unittest.TestCase): test_value = { "test": "test_simple_cache_hit_partial" } - self.cache.update(seq, key, test_value, full=True) + self.cache.update(seq, key, test_value) c = self.cache.get(key, ["test"]) self.assertEqual(test_value, c.value) @@ -56,7 +56,7 @@ class DictCacheTestCase(unittest.TestCase): test_value = { "test": "test_simple_cache_miss_partial" } - self.cache.update(seq, key, test_value, full=True) + self.cache.update(seq, key, test_value) c = self.cache.get(key, ["test2"]) self.assertEqual({}, c.value) @@ -70,7 +70,7 @@ class DictCacheTestCase(unittest.TestCase): "test2": "test_simple_cache_hit_miss_partial2", "test3": "test_simple_cache_hit_miss_partial3", } - self.cache.update(seq, key, test_value, full=True) + self.cache.update(seq, key, test_value) c = self.cache.get(key, ["test2"]) self.assertEqual({"test2": "test_simple_cache_hit_miss_partial2"}, c.value) @@ -82,13 +82,13 @@ class DictCacheTestCase(unittest.TestCase): test_value_1 = { "test": "test_simple_cache_hit_miss_partial", } - self.cache.update(seq, key, test_value_1, full=False) + self.cache.update(seq, key, test_value_1, fetched_keys=set("test")) seq = self.cache.sequence test_value_2 = { "test2": "test_simple_cache_hit_miss_partial2", } - self.cache.update(seq, key, test_value_2, full=False) + self.cache.update(seq, key, test_value_2, fetched_keys=set("test2")) c = self.cache.get(key) self.assertEqual( -- cgit 1.4.1 From 75dc3ddeab645c280319581d252c078168348470 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Feb 2018 13:37:56 +0000 Subject: Make push actions rotation configurable --- synapse/storage/event_push_actions.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index c4a0208ce4..8cb24b7d59 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -83,6 +83,8 @@ class EventPushActionsWorkerStore(SQLBaseStore): self.find_stream_orderings_looping_call = self._clock.looping_call( self._find_stream_orderings_for_times, 10 * 60 * 1000 ) + self._rotate_delay = 3 + self._rotate_count = 10000 @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000) def get_unread_event_push_actions_by_room_for_user( @@ -799,7 +801,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) if caught_up: break - yield self.hs.get_clock().sleep(5) + yield self.hs.get_clock().sleep(self._rotate_delay) finally: self._doing_notif_rotation = False @@ -820,8 +822,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore): txn.execute(""" SELECT stream_ordering FROM event_push_actions WHERE stream_ordering > ? - ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000 - """, (old_rotate_stream_ordering,)) + ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? + """, (old_rotate_stream_ordering, self._rotate_count)) stream_row = txn.fetchone() if stream_row: offset_stream_ordering, = stream_row -- cgit 1.4.1 From f8272813a928e40ce692d05a6d25d5d15922e4f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Feb 2018 13:53:55 +0000 Subject: Make _get_joined_hosts_cache cache non-iterable --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 48a88f755e..829cc4a207 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -455,7 +455,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): defer.returnValue(joined_hosts) - @cached(max_entries=10000, iterable=True) + @cached(max_entries=10000) def _get_joined_hosts_cache(self, room_id): return _JoinedHostsCache(self, room_id) -- cgit 1.4.1 From 4b4cec3989517e736b7054eabd5f548a5a32a33d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 May 2018 17:21:18 +0100 Subject: Add some logging to search queries --- synapse/handlers/search.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 1eca26aa1e..df3022fdba 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -64,6 +64,13 @@ class SearchHandler(BaseHandler): except Exception: raise SynapseError(400, "Invalid batch") + logger.info( + "Search batch properties: %r, %r, %r", + batch_group, batch_group_key, batch_token, + ) + + logger.info("Search content: %s", content) + try: room_cat = content["search_categories"]["room_events"] @@ -271,6 +278,8 @@ class SearchHandler(BaseHandler): # We should never get here due to the guard earlier. raise NotImplementedError() + logger.info("Found %d events to return", len(allowed_events)) + # If client has asked for "context" for each event (i.e. some surrounding # events and state), fetch that if event_context is not None: -- cgit 1.4.1 From 28ddc6cfbec665db33dd689806f61010095c13ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 May 2018 17:26:46 +0100 Subject: Also log number of events for serach context --- synapse/handlers/search.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index df3022fdba..2e3a77ca4b 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -291,6 +291,11 @@ class SearchHandler(BaseHandler): event.room_id, event.event_id, before_limit, after_limit ) + logger.info( + "Context for search returned %d and %d events", + len(res["events_before"]), len(res["events_after"]), + ) + res["events_before"] = yield filter_events_for_client( self.store, user.to_string(), res["events_before"] ) -- cgit 1.4.1 From 9850f66abe97ecaa54ece3223bd415cf0f581d4a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Dec 2016 16:54:03 +0000 Subject: Deleting from event_push_actions needs to use an index --- synapse/storage/events.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index cb1082e864..7d0e59538a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1044,7 +1044,6 @@ class EventsStore(EventsWorkerStore): "event_edge_hashes", "event_edges", "event_forward_extremities", - "event_push_actions", "event_reference_hashes", "event_search", "event_signatures", @@ -1064,6 +1063,14 @@ class EventsStore(EventsWorkerStore): [(ev.event_id,) for ev, _ in events_and_contexts] ) + for table in ( + "event_push_actions", + ): + txn.executemany( + "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + def _store_event_txn(self, txn, events_and_contexts): """Insert new events into the event and event_json tables -- cgit 1.4.1 From 1d009013b3c3e814177afc59f066e02a202b21cd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jun 2018 16:35:10 +0100 Subject: Revert "Merge pull request #3431 from matrix-org/rav/erasure_visibility" This reverts commit ce0d911156b355c5bf452120bfb08653dad96497, reversing changes made to b4a5d767a94f1680d07edfd583aae54ce422573e. --- synapse/handlers/deactivate_account.py | 7 +- synapse/handlers/federation.py | 13 --- synapse/rest/client/v1/admin.py | 4 +- synapse/rest/client/v2_alpha/account.py | 13 +-- synapse/storage/__init__.py | 2 - synapse/storage/schema/delta/50/erasure_store.sql | 21 ----- synapse/storage/user_erasure_store.py | 103 ---------------------- synapse/visibility.py | 77 +++++----------- 8 files changed, 26 insertions(+), 214 deletions(-) delete mode 100644 synapse/storage/schema/delta/50/erasure_store.sql delete mode 100644 synapse/storage/user_erasure_store.py (limited to 'synapse') diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 404b662469..8ec5ba2012 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -42,7 +42,7 @@ class DeactivateAccountHandler(BaseHandler): reactor.callWhenRunning(self._start_user_parting) @defer.inlineCallbacks - def deactivate_account(self, user_id, erase_data): + def deactivate_account(self, user_id): """Deactivate a user's account Args: @@ -92,11 +92,6 @@ class DeactivateAccountHandler(BaseHandler): # delete from user directory yield self.user_directory_handler.handle_user_deactivated(user_id) - # Mark the user as erased, if they asked for that - if erase_data: - logger.info("Marking %s as erased", user_id) - yield self.store.mark_user_erased(user_id) - # Now start the process that goes through that list and # parts users from rooms (if it isn't already running) self._start_user_parting() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1ca56c2c97..2571758284 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -495,20 +495,7 @@ class FederationHandler(BaseHandler): for e_id, key_to_eid in event_to_state_ids.iteritems() } - erased_senders = yield self.store.are_users_erased( - e.sender for e in events, - ) - def redact_disallowed(event, state): - # if the sender has been gdpr17ed, always return a redacted - # copy of the event. - if erased_senders[event.sender]: - logger.info( - "Sender of %s has been erased, redacting", - event.event_id, - ) - return prune_event(event) - if not state: return event diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index ddaedb2a8c..b8665a45eb 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -254,9 +254,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - yield self._deactivate_account_handler.deactivate_account( - target_user_id, False, - ) + yield self._deactivate_account_handler.deactivate_account(target_user_id) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 80dbc3c92e..e1281cfbb6 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd -# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +15,6 @@ # limitations under the License. import logging -from six.moves import http_client from twisted.internet import defer from synapse.api.auth import has_access_token @@ -188,20 +186,13 @@ class DeactivateAccountRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): body = parse_json_object_from_request(request) - erase = body.get("erase", False) - if not isinstance(erase, bool): - raise SynapseError( - http_client.BAD_REQUEST, - "Param 'erase' must be a boolean, if given", - Codes.BAD_JSON, - ) requester = yield self.auth.get_user_by_req(request) # allow ASes to dectivate their own users if requester.app_service: yield self._deactivate_account_handler.deactivate_account( - requester.user.to_string(), erase, + requester.user.to_string() ) defer.returnValue((200, {})) @@ -209,7 +200,7 @@ class DeactivateAccountRestServlet(RestServlet): requester, body, self.hs.get_ip_from_request(request), ) yield self._deactivate_account_handler.deactivate_account( - requester.user.to_string(), erase, + requester.user.to_string(), ) defer.returnValue((200, {})) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e843b702b9..979fa22438 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,7 +20,6 @@ import time import logging from synapse.storage.devices import DeviceStore -from synapse.storage.user_erasure_store import UserErasureStore from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) @@ -89,7 +88,6 @@ class DataStore(RoomMemberStore, RoomStore, DeviceInboxStore, UserDirectoryStore, GroupServerStore, - UserErasureStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/schema/delta/50/erasure_store.sql b/synapse/storage/schema/delta/50/erasure_store.sql deleted file mode 100644 index 5d8641a9ab..0000000000 --- a/synapse/storage/schema/delta/50/erasure_store.sql +++ /dev/null @@ -1,21 +0,0 @@ -/* Copyright 2018 New Vector Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- a table of users who have requested that their details be erased -CREATE TABLE erased_users ( - user_id TEXT NOT NULL -); - -CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id); diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py deleted file mode 100644 index 47bfc01e84..0000000000 --- a/synapse/storage/user_erasure_store.py +++ /dev/null @@ -1,103 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import operator - -from twisted.internet import defer - -from synapse.storage._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedList, cached - - -class UserErasureWorkerStore(SQLBaseStore): - @cached() - def is_user_erased(self, user_id): - """ - Check if the given user id has requested erasure - - Args: - user_id (str): full user id to check - - Returns: - Deferred[bool]: True if the user has requested erasure - """ - return self._simple_select_onecol( - table="erased_users", - keyvalues={"user_id": user_id}, - retcol="1", - desc="is_user_erased", - ).addCallback(operator.truth) - - @cachedList( - cached_method_name="is_user_erased", - list_name="user_ids", - inlineCallbacks=True, - ) - def are_users_erased(self, user_ids): - """ - Checks which users in a list have requested erasure - - Args: - user_ids (iterable[str]): full user id to check - - Returns: - Deferred[dict[str, bool]]: - for each user, whether the user has requested erasure. - """ - # this serves the dual purpose of (a) making sure we can do len and - # iterate it multiple times, and (b) avoiding duplicates. - user_ids = tuple(set(user_ids)) - - def _get_erased_users(txn): - txn.execute( - "SELECT user_id FROM erased_users WHERE user_id IN (%s)" % ( - ",".join("?" * len(user_ids)) - ), - user_ids, - ) - return set(r[0] for r in txn) - - erased_users = yield self.runInteraction( - "are_users_erased", _get_erased_users, - ) - res = dict((u, u in erased_users) for u in user_ids) - defer.returnValue(res) - - -class UserErasureStore(UserErasureWorkerStore): - def mark_user_erased(self, user_id): - """Indicate that user_id wishes their message history to be erased. - - Args: - user_id (str): full user_id to be erased - """ - def f(txn): - # first check if they are already in the list - txn.execute( - "SELECT 1 FROM erased_users WHERE user_id = ?", - (user_id, ) - ) - if txn.fetchone(): - return - - # they are not already there: do the insert. - txn.execute( - "INSERT INTO erased_users (user_id) VALUES (?)", - (user_id, ) - ) - - self._invalidate_cache_and_stream( - txn, self.is_user_erased, (user_id,) - ) - return self.runInteraction("mark_user_erased", f) diff --git a/synapse/visibility.py b/synapse/visibility.py index 65d79cf0d0..aef4953c1d 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -12,17 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools + import logging -import operator from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership -from synapse.events.utils import prune_event -from synapse.util.logcontext import ( - make_deferred_yieldable, preserve_fn, -) +from synapse.api.constants import Membership, EventTypes + +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn + logger = logging.getLogger(__name__) @@ -97,27 +95,16 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if ignore_dict_content else [] ) - erased_senders = yield store.are_users_erased((e.sender for e in events)) - def allowed(event): """ Args: event (synapse.events.EventBase): event to check - - Returns: - None|EventBase: - None if the user cannot see this event at all - - a redacted copy of the event if they can only see a redacted - version - - the original event if they can see it as normal. """ if not event.is_state() and event.sender in ignore_list: - return None + return False if event.event_id in always_include_ids: - return event + return True state = event_id_to_state[event.event_id] @@ -131,6 +118,10 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if visibility not in VISIBILITY_PRIORITY: visibility = "shared" + # if it was world_readable, it's easy: everyone can read it + if visibility == "world_readable": + return True + # Always allow history visibility events on boundaries. This is done # by setting the effective visibility to the least restrictive # of the old vs new. @@ -164,7 +155,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if membership == "leave" and ( prev_membership == "join" or prev_membership == "invite" ): - return event + return True new_priority = MEMBERSHIP_PRIORITY.index(membership) old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) @@ -175,55 +166,31 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if membership is None: membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: - # XXX why do we do this? - # https://github.com/matrix-org/synapse/issues/3350 if membership_event.event_id not in event_id_forgotten: membership = membership_event.membership # if the user was a member of the room at the time of the event, # they can see it. if membership == Membership.JOIN: - return event - - # otherwise, it depends on the room visibility. + return True if visibility == "joined": # we weren't a member at the time of the event, so we can't # see this event. - return None + return False elif visibility == "invited": # user can also see the event if they were *invited* at the time # of the event. - return ( - event if membership == Membership.INVITE else None - ) - - elif visibility == "shared" and is_peeking: - # if the visibility is shared, users cannot see the event unless - # they have *subequently* joined the room (or were members at the - # time, of course) + return membership == Membership.INVITE + + else: + # visibility is shared: user can also see the event if they have + # become a member since the event # # XXX: if the user has subsequently joined and then left again, # ideally we would share history up to the point they left. But - # we don't know when they left. We just treat it as though they - # never joined, and restrict access. - return None - - # the visibility is either shared or world_readable, and the user was - # not a member at the time. We allow it, provided the original sender - # has not requested their data to be erased, in which case, we return - # a redacted version. - if erased_senders[event.sender]: - return prune_event(event) - - return event - - # check each event: gives an iterable[None|EventBase] - filtered_events = itertools.imap(allowed, events) - - # remove the None entries - filtered_events = filter(operator.truth, filtered_events) + # we don't know when they left. + return not is_peeking - # we turn it into a list before returning it. - defer.returnValue(list(filtered_events)) + defer.returnValue(list(filter(allowed, events))) -- cgit 1.4.1 From ec1e799e177743edee1f613c2118cf1618a67929 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Sun, 24 Jun 2018 12:17:01 -0600 Subject: Don't print invalid access tokens in the logs Tokens shouldn't be appearing the logs, valid or invalid. Signed-off-by: Travis Ralston --- synapse/api/auth.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 66639b0089..54186695cd 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -488,7 +488,7 @@ class Auth(object): def _look_up_user_by_access_token(self, token): ret = yield self.store.get_user_by_access_token(token) if not ret: - logger.warn("Unrecognised access token - not in store: %s" % (token,)) + logger.warn("Unrecognised access token - not in store.") raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.", errcode=Codes.UNKNOWN_TOKEN @@ -511,7 +511,7 @@ class Auth(object): ) service = self.store.get_app_service_by_token(token) if not service: - logger.warn("Unrecognised appservice access token: %s" % (token,)) + logger.warn("Unrecognised appservice access token.") raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.", -- cgit 1.4.1 From 244484bf3c1a8c75186ad58296d6554f18bd400b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jun 2018 13:42:55 +0100 Subject: Revert "Revert "Merge pull request #3431 from matrix-org/rav/erasure_visibility"" This reverts commit 1d009013b3c3e814177afc59f066e02a202b21cd. --- synapse/handlers/deactivate_account.py | 7 +- synapse/handlers/federation.py | 13 +++ synapse/rest/client/v1/admin.py | 4 +- synapse/rest/client/v2_alpha/account.py | 13 ++- synapse/storage/__init__.py | 2 + synapse/storage/schema/delta/50/erasure_store.sql | 21 +++++ synapse/storage/user_erasure_store.py | 103 ++++++++++++++++++++++ synapse/visibility.py | 77 +++++++++++----- 8 files changed, 214 insertions(+), 26 deletions(-) create mode 100644 synapse/storage/schema/delta/50/erasure_store.sql create mode 100644 synapse/storage/user_erasure_store.py (limited to 'synapse') diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 8ec5ba2012..404b662469 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -42,7 +42,7 @@ class DeactivateAccountHandler(BaseHandler): reactor.callWhenRunning(self._start_user_parting) @defer.inlineCallbacks - def deactivate_account(self, user_id): + def deactivate_account(self, user_id, erase_data): """Deactivate a user's account Args: @@ -92,6 +92,11 @@ class DeactivateAccountHandler(BaseHandler): # delete from user directory yield self.user_directory_handler.handle_user_deactivated(user_id) + # Mark the user as erased, if they asked for that + if erase_data: + logger.info("Marking %s as erased", user_id) + yield self.store.mark_user_erased(user_id) + # Now start the process that goes through that list and # parts users from rooms (if it isn't already running) self._start_user_parting() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2571758284..1ca56c2c97 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -495,7 +495,20 @@ class FederationHandler(BaseHandler): for e_id, key_to_eid in event_to_state_ids.iteritems() } + erased_senders = yield self.store.are_users_erased( + e.sender for e in events, + ) + def redact_disallowed(event, state): + # if the sender has been gdpr17ed, always return a redacted + # copy of the event. + if erased_senders[event.sender]: + logger.info( + "Sender of %s has been erased, redacting", + event.event_id, + ) + return prune_event(event) + if not state: return event diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index b8665a45eb..ddaedb2a8c 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -254,7 +254,9 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - yield self._deactivate_account_handler.deactivate_account(target_user_id) + yield self._deactivate_account_handler.deactivate_account( + target_user_id, False, + ) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index e1281cfbb6..80dbc3c92e 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ # limitations under the License. import logging +from six.moves import http_client from twisted.internet import defer from synapse.api.auth import has_access_token @@ -186,13 +188,20 @@ class DeactivateAccountRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): body = parse_json_object_from_request(request) + erase = body.get("erase", False) + if not isinstance(erase, bool): + raise SynapseError( + http_client.BAD_REQUEST, + "Param 'erase' must be a boolean, if given", + Codes.BAD_JSON, + ) requester = yield self.auth.get_user_by_req(request) # allow ASes to dectivate their own users if requester.app_service: yield self._deactivate_account_handler.deactivate_account( - requester.user.to_string() + requester.user.to_string(), erase, ) defer.returnValue((200, {})) @@ -200,7 +209,7 @@ class DeactivateAccountRestServlet(RestServlet): requester, body, self.hs.get_ip_from_request(request), ) yield self._deactivate_account_handler.deactivate_account( - requester.user.to_string(), + requester.user.to_string(), erase, ) defer.returnValue((200, {})) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 979fa22438..e843b702b9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,6 +20,7 @@ import time import logging from synapse.storage.devices import DeviceStore +from synapse.storage.user_erasure_store import UserErasureStore from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) @@ -88,6 +89,7 @@ class DataStore(RoomMemberStore, RoomStore, DeviceInboxStore, UserDirectoryStore, GroupServerStore, + UserErasureStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/schema/delta/50/erasure_store.sql b/synapse/storage/schema/delta/50/erasure_store.sql new file mode 100644 index 0000000000..5d8641a9ab --- /dev/null +++ b/synapse/storage/schema/delta/50/erasure_store.sql @@ -0,0 +1,21 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- a table of users who have requested that their details be erased +CREATE TABLE erased_users ( + user_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id); diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py new file mode 100644 index 0000000000..47bfc01e84 --- /dev/null +++ b/synapse/storage/user_erasure_store.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import operator + +from twisted.internet import defer + +from synapse.storage._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedList, cached + + +class UserErasureWorkerStore(SQLBaseStore): + @cached() + def is_user_erased(self, user_id): + """ + Check if the given user id has requested erasure + + Args: + user_id (str): full user id to check + + Returns: + Deferred[bool]: True if the user has requested erasure + """ + return self._simple_select_onecol( + table="erased_users", + keyvalues={"user_id": user_id}, + retcol="1", + desc="is_user_erased", + ).addCallback(operator.truth) + + @cachedList( + cached_method_name="is_user_erased", + list_name="user_ids", + inlineCallbacks=True, + ) + def are_users_erased(self, user_ids): + """ + Checks which users in a list have requested erasure + + Args: + user_ids (iterable[str]): full user id to check + + Returns: + Deferred[dict[str, bool]]: + for each user, whether the user has requested erasure. + """ + # this serves the dual purpose of (a) making sure we can do len and + # iterate it multiple times, and (b) avoiding duplicates. + user_ids = tuple(set(user_ids)) + + def _get_erased_users(txn): + txn.execute( + "SELECT user_id FROM erased_users WHERE user_id IN (%s)" % ( + ",".join("?" * len(user_ids)) + ), + user_ids, + ) + return set(r[0] for r in txn) + + erased_users = yield self.runInteraction( + "are_users_erased", _get_erased_users, + ) + res = dict((u, u in erased_users) for u in user_ids) + defer.returnValue(res) + + +class UserErasureStore(UserErasureWorkerStore): + def mark_user_erased(self, user_id): + """Indicate that user_id wishes their message history to be erased. + + Args: + user_id (str): full user_id to be erased + """ + def f(txn): + # first check if they are already in the list + txn.execute( + "SELECT 1 FROM erased_users WHERE user_id = ?", + (user_id, ) + ) + if txn.fetchone(): + return + + # they are not already there: do the insert. + txn.execute( + "INSERT INTO erased_users (user_id) VALUES (?)", + (user_id, ) + ) + + self._invalidate_cache_and_stream( + txn, self.is_user_erased, (user_id,) + ) + return self.runInteraction("mark_user_erased", f) diff --git a/synapse/visibility.py b/synapse/visibility.py index aef4953c1d..65d79cf0d0 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -12,15 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import itertools import logging +import operator from twisted.internet import defer -from synapse.api.constants import Membership, EventTypes - -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn - +from synapse.api.constants import EventTypes, Membership +from synapse.events.utils import prune_event +from synapse.util.logcontext import ( + make_deferred_yieldable, preserve_fn, +) logger = logging.getLogger(__name__) @@ -95,16 +97,27 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if ignore_dict_content else [] ) + erased_senders = yield store.are_users_erased((e.sender for e in events)) + def allowed(event): """ Args: event (synapse.events.EventBase): event to check + + Returns: + None|EventBase: + None if the user cannot see this event at all + + a redacted copy of the event if they can only see a redacted + version + + the original event if they can see it as normal. """ if not event.is_state() and event.sender in ignore_list: - return False + return None if event.event_id in always_include_ids: - return True + return event state = event_id_to_state[event.event_id] @@ -118,10 +131,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if visibility not in VISIBILITY_PRIORITY: visibility = "shared" - # if it was world_readable, it's easy: everyone can read it - if visibility == "world_readable": - return True - # Always allow history visibility events on boundaries. This is done # by setting the effective visibility to the least restrictive # of the old vs new. @@ -155,7 +164,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if membership == "leave" and ( prev_membership == "join" or prev_membership == "invite" ): - return True + return event new_priority = MEMBERSHIP_PRIORITY.index(membership) old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) @@ -166,31 +175,55 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if membership is None: membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: + # XXX why do we do this? + # https://github.com/matrix-org/synapse/issues/3350 if membership_event.event_id not in event_id_forgotten: membership = membership_event.membership # if the user was a member of the room at the time of the event, # they can see it. if membership == Membership.JOIN: - return True + return event + + # otherwise, it depends on the room visibility. if visibility == "joined": # we weren't a member at the time of the event, so we can't # see this event. - return False + return None elif visibility == "invited": # user can also see the event if they were *invited* at the time # of the event. - return membership == Membership.INVITE - - else: - # visibility is shared: user can also see the event if they have - # become a member since the event + return ( + event if membership == Membership.INVITE else None + ) + + elif visibility == "shared" and is_peeking: + # if the visibility is shared, users cannot see the event unless + # they have *subequently* joined the room (or were members at the + # time, of course) # # XXX: if the user has subsequently joined and then left again, # ideally we would share history up to the point they left. But - # we don't know when they left. - return not is_peeking + # we don't know when they left. We just treat it as though they + # never joined, and restrict access. + return None + + # the visibility is either shared or world_readable, and the user was + # not a member at the time. We allow it, provided the original sender + # has not requested their data to be erased, in which case, we return + # a redacted version. + if erased_senders[event.sender]: + return prune_event(event) + + return event + + # check each event: gives an iterable[None|EventBase] + filtered_events = itertools.imap(allowed, events) + + # remove the None entries + filtered_events = filter(operator.truth, filtered_events) - defer.returnValue(list(filter(allowed, events))) + # we turn it into a list before returning it. + defer.returnValue(list(filtered_events)) -- cgit 1.4.1 From 07cad26d65dcf237643a43ff2bdfac73ca7a7257 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 25 Jun 2018 14:08:28 +0100 Subject: Remove all global reactor imports & pass it around explicitly (#3424) --- synapse/handlers/deactivate_account.py | 4 ++-- synapse/handlers/message.py | 4 ++-- synapse/handlers/presence.py | 4 ++-- synapse/http/endpoint.py | 16 +++++++++------- synapse/push/emailpusher.py | 4 ++-- synapse/push/httppusher.py | 6 ++++-- synapse/replication/tcp/client.py | 6 +++--- synapse/replication/tcp/resource.py | 4 ++-- synapse/util/__init__.py | 3 +++ 9 files changed, 29 insertions(+), 22 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 8ec5ba2012..6116842764 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor +from twisted.internet import defer from ._base import BaseHandler from synapse.types import UserID, create_requester @@ -39,7 +39,7 @@ class DeactivateAccountHandler(BaseHandler): # Start the user parter loop so it can resume parting users from rooms where # it left off (if it has work left to do). - reactor.callWhenRunning(self._start_user_parting) + hs.get_reactor().callWhenRunning(self._start_user_parting) @defer.inlineCallbacks def deactivate_account(self, user_id): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7b9946ab91..a812117dea 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,7 +20,7 @@ import sys from canonicaljson import encode_canonical_json import six from six import string_types, itervalues, iteritems -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.defer import succeed from twisted.python.failure import Failure @@ -157,7 +157,7 @@ class MessageHandler(BaseHandler): # remove the purge from the list 24 hours after it completes def clear_purge(): del self._purges_by_id[purge_id] - reactor.callLater(24 * 3600, clear_purge) + self.hs.get_reactor().callLater(24 * 3600, clear_purge) def get_purge_status(self, purge_id): """Get the current status of an active purge diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7fe568132f..7db59fba00 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -22,7 +22,7 @@ The methods that define policy are: - should_notify """ -from twisted.internet import defer, reactor +from twisted.internet import defer from contextlib import contextmanager from six import itervalues, iteritems @@ -179,7 +179,7 @@ class PresenceHandler(object): # have not yet been persisted self.unpersisted_users_changes = set() - reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown) self.serial_to_user = {} self._next_serial = 1 diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 87a482650d..928c1c7407 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.error import ConnectError from twisted.names import client, dns from twisted.names.error import DNSNameError, DomainError @@ -78,17 +78,18 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, else: return _WrappingEndpointFac(transport_endpoint( reactor, domain, port, **endpoint_kw_args - )) + ), reactor) class _WrappingEndpointFac(object): - def __init__(self, endpoint_fac): + def __init__(self, endpoint_fac, reactor): self.endpoint_fac = endpoint_fac + self.reactor = reactor @defer.inlineCallbacks def connect(self, protocolFactory): conn = yield self.endpoint_fac.connect(protocolFactory) - conn = _WrappedConnection(conn) + conn = _WrappedConnection(conn, self.reactor) defer.returnValue(conn) @@ -98,9 +99,10 @@ class _WrappedConnection(object): """ __slots__ = ["conn", "last_request"] - def __init__(self, conn): + def __init__(self, conn, reactor): object.__setattr__(self, "conn", conn) object.__setattr__(self, "last_request", time.time()) + self._reactor = reactor def __getattr__(self, name): return getattr(self.conn, name) @@ -131,14 +133,14 @@ class _WrappedConnection(object): # Time this connection out if we haven't send a request in the last # N minutes # TODO: Cancel the previous callLater? - reactor.callLater(3 * 60, self._time_things_out_maybe) + self._reactor.callLater(3 * 60, self._time_things_out_maybe) d = self.conn.request(request) def update_request_time(res): self.last_request = time.time() # TODO: Cancel the previous callLater? - reactor.callLater(3 * 60, self._time_things_out_maybe) + self._reactor.callLater(3 * 60, self._time_things_out_maybe) return res d.addCallback(update_request_time) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index ba7286cb72..52d4f087ee 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled import logging @@ -199,7 +199,7 @@ class EmailPusher(object): self.timed_call = None if soonest_due_at is not None: - self.timed_call = reactor.callLater( + self.timed_call = self.hs.get_reactor().callLater( self.seconds_until(soonest_due_at), self.on_timer ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index bf7ff74a1a..7a481b5a1e 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled from . import push_rule_evaluator @@ -220,7 +220,9 @@ class HttpPusher(object): ) else: logger.info("Push failed: delaying for %ds", self.backoff_delay) - self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) + self.timed_call = self.hs.get_reactor().callLater( + self.backoff_delay, self.on_timer + ) self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC) break diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 6d2513c4e2..bb852b00af 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -15,7 +15,7 @@ """A replication client for use by synapse workers. """ -from twisted.internet import reactor, defer +from twisted.internet import defer from twisted.internet.protocol import ReconnectingClientFactory from .commands import ( @@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory): self.server_name = hs.config.server_name self._clock = hs.get_clock() # As self.clock is defined in super class - reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying) + hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying) def startedConnecting(self, connector): logger.info("Connecting to replication: %r", connector.getDestination()) @@ -95,7 +95,7 @@ class ReplicationClientHandler(object): factory = ReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port - reactor.connectTCP(host, port, factory) + hs.get_reactor().connectTCP(host, port, factory) def on_rdata(self, stream_name, token, rows): """Called when we get new replication data. By default this just pokes diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 63bd6d2652..95ad8c1b4c 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -15,7 +15,7 @@ """The server side of the replication stream. """ -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.protocol import Factory from .streams import STREAMS_MAP, FederationStream @@ -109,7 +109,7 @@ class ReplicationStreamer(object): self.is_looping = False self.pending_updates = False - reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown) + hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown) def on_shutdown(self): # close all connections on shutdown diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 2a3df7c71d..e9886ef299 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -34,6 +34,9 @@ def unwrapFirstError(failure): class Clock(object): """ A Clock wraps a Twisted reactor and provides utilities on top of it. + + Args: + reactor: The Twisted reactor to use. """ _reactor = attr.ib() -- cgit 1.4.1 From eb50c44eafea415f7e01faffaff116171fc5eeb9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jun 2018 14:22:24 +0100 Subject: Add UserErasureWorkerStore to workers --- synapse/replication/slave/storage/events.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index b1f64ef0d8..97d3196633 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberWorkerStore from synapse.storage.state import StateGroupWorkerStore from synapse.storage.stream import StreamWorkerStore from synapse.storage.signatures import SignatureWorkerStore +from synapse.storage.user_erasure_store import UserErasureWorkerStore from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker @@ -45,6 +46,7 @@ class SlavedEventStore(EventFederationWorkerStore, EventsWorkerStore, StateGroupWorkerStore, SignatureWorkerStore, + UserErasureWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): -- cgit 1.4.1 From 947fea67cb9481c958a545ae7a8e7291511f8465 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jun 2018 15:22:57 +0100 Subject: Need to pass reactor to endpoint fac --- synapse/http/endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 928c1c7407..80da870584 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -74,7 +74,7 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, reactor, "matrix", domain, protocol="tcp", default_port=default_port, endpoint=transport_endpoint, endpoint_kw_args=endpoint_kw_args - )) + ), reactor) else: return _WrappingEndpointFac(transport_endpoint( reactor, domain, port, **endpoint_kw_args -- cgit 1.4.1 From 6eb861b67f4f4720eae991235935b00fab370e3a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 20 Jun 2018 16:27:18 +0100 Subject: typo --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 51ec727df0..7f486e48e5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -145,7 +145,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. "to_device", # List of direct messages for the device. - "device_lists", # List of user_ids whose devices have chanegd + "device_lists", # List of user_ids whose devices have changed "device_one_time_keys_count", # Dict of algorithm to count for one time keys # for this device "groups", -- cgit 1.4.1 From 784189b1f4ebfbba4358889dd5a13512f96027b8 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 25 Jun 2018 17:36:52 +0100 Subject: typos --- synapse/handlers/e2e_keys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 8a2d177539..4d7bf5defd 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -80,7 +80,7 @@ class E2eKeysHandler(object): else: remote_queries[user_id] = device_ids - # Firt get local devices. + # First get local devices. failures = {} results = {} if local_query: -- cgit 1.4.1 From 0269367f18267a3ff3c323a580c8c2b9e5c377b3 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 25 Jun 2018 17:56:10 +0100 Subject: allow non-consented users to still part rooms (to let us autopart them) --- synapse/handlers/message.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a812117dea..8467284758 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -491,7 +491,7 @@ class EventCreationHandler(object): target, e ) - is_exempt = yield self._is_exempt_from_privacy_policy(builder) + is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester) if not is_exempt: yield self.assert_accepted_privacy_policy(requester) @@ -509,12 +509,13 @@ class EventCreationHandler(object): defer.returnValue((event, context)) - def _is_exempt_from_privacy_policy(self, builder): + def _is_exempt_from_privacy_policy(self, builder, requester): """"Determine if an event to be sent is exempt from having to consent to the privacy policy Args: builder (synapse.events.builder.EventBuilder): event being created + requester (Requster): user requesting this event Returns: Deferred[bool]: true if the event can be sent without the user @@ -525,6 +526,9 @@ class EventCreationHandler(object): membership = builder.content.get("membership", None) if membership == Membership.JOIN: return self._is_server_notices_room(builder.room_id) + elif membership == Membership.LEAVE: + # the user is always allowed to leave (but not kick people) + return builder.state_key == requester.user.to_string() return succeed(False) @defer.inlineCallbacks -- cgit 1.4.1 From ea7a9c0483d547014404d3266446bb1b26c1076a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jun 2018 19:49:13 +0100 Subject: Add fast path to _filter_events_for_server Most rooms have a trivial history visibility like "shared" or "world_readable", especially large rooms, so lets not bother getting the full membership of those rooms in that case. --- synapse/handlers/federation.py | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1ca56c2c97..123cdb5fab 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -460,11 +460,44 @@ class FederationHandler(BaseHandler): @measure_func("_filter_events_for_server") @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): + # First lets check to see if all the events have a history visibility + # of "shared" or "world_readable". If thats the case then we don't + # need to check membership (as we know the server is in the room). event_to_state_ids = yield self.store.get_state_ids_for_events( frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, None), + ) + ) + + visibility_ids = set() + for sids in event_to_state_ids.itervalues(): + hist = event_to_state_ids.get((EventTypes.RoomHistoryVisibility, "")) + if hist: + visibility_ids.add(hist) + + # If we failed to find any history visibility events then the default + # is "shared" visiblity. + if not visibility_ids: + defer.returnValue(events) + + events = yield self.store.get_events(visibility_ids) + all_open = all( + e.content.get("history_visibility") in (None, "shared", "world_readable") + for e in events + ) + + if all_open: + defer.returnValue(events) + + # Ok, so we're dealing with events that have non-trivial visibility + # rules, so we need to also get the memberships of the room. + + event_to_state_ids = yield self.store.get_state_ids_for_events( + frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, None) ) ) -- cgit 1.4.1 From 7bdc5c8fa37de6256ec2f41a5c683c75ac52f9ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jun 2018 19:56:02 +0100 Subject: Fix bug with assuming wrong type --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 123cdb5fab..aab69550a0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -481,10 +481,10 @@ class FederationHandler(BaseHandler): if not visibility_ids: defer.returnValue(events) - events = yield self.store.get_events(visibility_ids) + event_map = yield self.store.get_events(visibility_ids) all_open = all( e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in events + for e in events.itervalues() ) if all_open: -- cgit 1.4.1 From a0e8a53c6d0ff65629aa8645d43df06558ff88af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jun 2018 19:57:38 +0100 Subject: Comment --- synapse/handlers/federation.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index aab69550a0..c4ffdf887e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -460,6 +460,14 @@ class FederationHandler(BaseHandler): @measure_func("_filter_events_for_server") @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): + """Filter the given events for the given server, redacting those the + server can't see. + + Assumes the server is currently in the room. + + Returns + list[FrozenEvent] + """ # First lets check to see if all the events have a history visibility # of "shared" or "world_readable". If thats the case then we don't # need to check membership (as we know the server is in the room). -- cgit 1.4.1 From df48f7ef37f9236a13293ef73d17c6192c7b0834 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jun 2018 20:03:41 +0100 Subject: Actually fix it --- synapse/handlers/federation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c4ffdf887e..b6f8d4cf82 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -480,7 +480,7 @@ class FederationHandler(BaseHandler): visibility_ids = set() for sids in event_to_state_ids.itervalues(): - hist = event_to_state_ids.get((EventTypes.RoomHistoryVisibility, "")) + hist = sids.get((EventTypes.RoomHistoryVisibility, "")) if hist: visibility_ids.add(hist) @@ -492,7 +492,7 @@ class FederationHandler(BaseHandler): event_map = yield self.store.get_events(visibility_ids) all_open = all( e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in events.itervalues() + for e in event_map.itervalues() ) if all_open: @@ -505,7 +505,7 @@ class FederationHandler(BaseHandler): frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, None) + (EventTypes.Member, None), ) ) -- cgit 1.4.1 From 1e788db430344c897b185de831b385a81c23d899 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 26 Jun 2018 10:26:54 +0100 Subject: add GDPR erase param to deactivate API --- synapse/handlers/deactivate_account.py | 1 + synapse/rest/client/v1/admin.py | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index a18d95397c..a84b7b8b80 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -47,6 +47,7 @@ class DeactivateAccountHandler(BaseHandler): Args: user_id (str): ID of user to be deactivated + erase_data (bool): whether to GDPR-erase the user's data Returns: Deferred diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index ddaedb2a8c..3f231e6b29 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -16,6 +16,8 @@ from twisted.internet import defer +from six.moves import http_client + from synapse.api.constants import Membership from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError from synapse.types import UserID, create_requester @@ -247,6 +249,15 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, target_user_id): + body = parse_json_object_from_request(request) + erase = body.get("erase", False) + if not isinstance(erase, bool): + raise SynapseError( + http_client.BAD_REQUEST, + "Param 'erase' must be a boolean, if given", + Codes.BAD_JSON, + ) + UserID.from_string(target_user_id) requester = yield self.auth.get_user_by_req(request) is_admin = yield self.auth.is_server_admin(requester.user) @@ -255,7 +266,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): raise AuthError(403, "You are not a server admin") yield self._deactivate_account_handler.deactivate_account( - target_user_id, False, + target_user_id, erase, ) defer.returnValue((200, {})) -- cgit 1.4.1 From 9570aa82ebf0d8dc01c8094df232ce16e683c905 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 26 Jun 2018 10:42:50 +0100 Subject: update doc for deactivate API --- docs/admin_api/user_admin_api.rst | 17 +++++++++++++++-- synapse/rest/client/v1/admin.py | 2 +- 2 files changed, 16 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst index 1c9c5a6bde..d17121a188 100644 --- a/docs/admin_api/user_admin_api.rst +++ b/docs/admin_api/user_admin_api.rst @@ -44,13 +44,26 @@ Deactivate Account This API deactivates an account. It removes active access tokens, resets the password, and deletes third-party IDs (to prevent the user requesting a -password reset). +password reset). It can also mark the user as GDPR-erased (stopping their data +from distributed further, and deleting it entirely if there are no other +references to it). The api is:: POST /_matrix/client/r0/admin/deactivate/ -including an ``access_token`` of a server admin, and an empty request body. +with a body of: + +.. code:: json + + { + "erase": true + } + +including an ``access_token`` of a server admin. + +The erase parameter is optional and defaults to 'false'. +An empty body may be passed for backwards compatibility. Reset password diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 3f231e6b29..8fb08dc526 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -249,7 +249,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, target_user_id): - body = parse_json_object_from_request(request) + body = parse_json_object_from_request(request, allow_empty_body=True) erase = body.get("erase", False) if not isinstance(erase, bool): raise SynapseError( -- cgit 1.4.1 From 028490afd4c6a9dd9d05586db8510e6bc848703b Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Jun 2018 10:52:52 +0100 Subject: Fix error on deleting users pending deactivation Use simple_delete instead of simple_delete_one as commented --- synapse/storage/registration.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9c9cf46e7f..0d18f6d869 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -623,7 +623,9 @@ class RegistrationStore(RegistrationWorkerStore, Removes the given user to the table of users who need to be parted from all the rooms they're in, effectively marking that user as fully deactivated. """ - return self._simple_delete_one( + # XXX: This should be simple_delete_one but we failed to put a unique index on + # the table, so somehow duplicate entries have ended up in it. + return self._simple_delete( "users_pending_deactivation", keyvalues={ "user_id": user_id, -- cgit 1.4.1 From abfe4b2957645194c59da359779a4d77dcc6f493 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 26 Jun 2018 17:25:34 +0100 Subject: try and make loading items from the cache faster --- synapse/util/caches/stream_change_cache.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 817118e30f..c81518c6bc 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -78,7 +78,8 @@ class StreamChangeCache(object): not_known_entities = set(entities) - set(self._entity_to_key) result = ( - set(self._cache.values()[self._cache.bisect_right(stream_pos) :]) + {self._cache[k] for k in in self._cache.islice( + start=self._cache.bisect_right(stream_pos))} .intersection(entities) .union(not_known_entities) ) @@ -113,7 +114,8 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - return self._cache.values()[self._cache.bisect_right(stream_pos) :] + return {self._cache[k] for k in in self._cache.islice( + start=self._cache.bisect_right(stream_pos))} else: return None -- cgit 1.4.1 From bd3d329c88a59fa1adff8354d7e7105e721e2e9e Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 26 Jun 2018 17:28:12 +0100 Subject: fixes --- changelog.d/3447.misc | 0 synapse/util/caches/stream_change_cache.py | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 changelog.d/3447.misc (limited to 'synapse') diff --git a/changelog.d/3447.misc b/changelog.d/3447.misc new file mode 100644 index 0000000000..e69de29bb2 diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index c81518c6bc..b19feb3b11 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -78,7 +78,7 @@ class StreamChangeCache(object): not_known_entities = set(entities) - set(self._entity_to_key) result = ( - {self._cache[k] for k in in self._cache.islice( + {self._cache[k] for k in self._cache.islice( start=self._cache.bisect_right(stream_pos))} .intersection(entities) .union(not_known_entities) @@ -114,7 +114,7 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - return {self._cache[k] for k in in self._cache.islice( + return {self._cache[k] for k in self._cache.islice( start=self._cache.bisect_right(stream_pos))} else: return None -- cgit 1.4.1 From 1202508067d392cbc71e1c9e0b4d9eca3494143a Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 26 Jun 2018 17:29:01 +0100 Subject: fixes --- synapse/util/caches/stream_change_cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b19feb3b11..0fb8620001 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -114,8 +114,8 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - return {self._cache[k] for k in self._cache.islice( - start=self._cache.bisect_right(stream_pos))} + return [self._cache[k] for k in self._cache.islice( + start=self._cache.bisect_right(stream_pos))] else: return None -- cgit 1.4.1 From 8057489b260a19cdd36b2f4ac5b587565ca7aac0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 26 Jun 2018 18:09:01 +0100 Subject: Revert "Try to not use as much CPU in the StreamChangeCache" --- changelog.d/3447.misc | 0 synapse/util/caches/stream_change_cache.py | 6 ++---- 2 files changed, 2 insertions(+), 4 deletions(-) delete mode 100644 changelog.d/3447.misc (limited to 'synapse') diff --git a/changelog.d/3447.misc b/changelog.d/3447.misc deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 0fb8620001..817118e30f 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -78,8 +78,7 @@ class StreamChangeCache(object): not_known_entities = set(entities) - set(self._entity_to_key) result = ( - {self._cache[k] for k in self._cache.islice( - start=self._cache.bisect_right(stream_pos))} + set(self._cache.values()[self._cache.bisect_right(stream_pos) :]) .intersection(entities) .union(not_known_entities) ) @@ -114,8 +113,7 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - return [self._cache[k] for k in self._cache.islice( - start=self._cache.bisect_right(stream_pos))] + return self._cache.values()[self._cache.bisect_right(stream_pos) :] else: return None -- cgit 1.4.1 From 77078d6c8ef11e2401406edc1ca340e0d7779267 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 27 Jun 2018 11:27:32 +0100 Subject: handle federation not telling us about prev_events --- synapse/federation/federation_server.py | 4 +- synapse/handlers/federation.py | 87 ++++++++---- tests/test_federation.py | 235 ++++++++++++++++++++++++++++++++ tests/unittest.py | 2 +- 4 files changed, 301 insertions(+), 27 deletions(-) create mode 100644 tests/test_federation.py (limited to 'synapse') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d4dd967c60..4096093527 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -549,7 +549,9 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) - yield self.handler.on_receive_pdu(origin, pdu, get_missing=True) + yield self.handler.on_receive_pdu( + origin, pdu, get_missing=True, sent_to_us_directly=True, + ) def __str__(self): return "" % self.server_name diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b6f8d4cf82..250a5509d8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,6 +44,7 @@ from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, ) +from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.events.utils import prune_event @@ -89,7 +90,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_receive_pdu(self, origin, pdu, get_missing=True): + def on_receive_pdu( + self, origin, pdu, get_missing=True, sent_to_us_directly=False, + ): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -163,7 +166,7 @@ class FederationHandler(BaseHandler): "Ignoring PDU %s for room %s from %s as we've left the room!", pdu.event_id, pdu.room_id, origin, ) - return + defer.returnValue(None) state = None @@ -225,26 +228,54 @@ class FederationHandler(BaseHandler): list(prevs - seen)[:5], ) - if prevs - seen: - logger.info( - "Still missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + if sent_to_us_directly and prevs - seen: + # If they have sent it to us directly, and the server + # isn't telling us about the auth events that it's + # made a message referencing, we explode + raise FederationError( + "ERROR", + 403, + ("Your server isn't divulging details about prev_events " + "referenced in this event."), + affected=pdu.event_id, ) - fetch_state = True + elif prevs - seen: + # If we're walking back up the chain to fetch it, then + # try and find the states. If we can't get the states, + # discard it. + state_groups = [] + auth_chains = set() + try: + # Get the ones we know about + ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) + state_groups.append(ours) - if fetch_state: - # We need to get the state at this event, since we haven't - # processed all the prev events. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - try: - state, auth_chain = yield self.replication_layer.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) - except Exception: - logger.exception("Failed to get state for event: %s", pdu.event_id) + for p in prevs - seen: + state, auth_chain = yield self.replication_layer.get_state_for_room( + origin, pdu.room_id, p + ) + auth_chains.update(auth_chain) + state_group = { + (x.type, x.state_key): x.event_id for x in state + } + state_groups.append(state_group) + + def fetch(ev_ids): + return self.store.get_events( + ev_ids, get_prev_content=False, check_redacted=False, + ) + + state = yield resolve_events_with_factory(state_groups, {pdu.event_id: pdu}, fetch) + + state = yield self.store.get_events(state.values()) + state = state.values() + except Exception: + raise FederationError( + "ERROR", + 403, + "We can't get valid state history.", + affected=pdu.event_id, + ) yield self._process_received_pdu( origin, @@ -322,11 +353,17 @@ class FederationHandler(BaseHandler): for e in missing_events: logger.info("Handling found event %s", e.event_id) - yield self.on_receive_pdu( - origin, - e, - get_missing=False - ) + try: + yield self.on_receive_pdu( + origin, + e, + get_missing=False + ) + except FederationError as e: + if e.code == 403: + logger.warn("Event %s failed history check.") + else: + raise @log_function @defer.inlineCallbacks diff --git a/tests/test_federation.py b/tests/test_federation.py new file mode 100644 index 0000000000..12f4633cd5 --- /dev/null +++ b/tests/test_federation.py @@ -0,0 +1,235 @@ + +from twisted.internet.defer import Deferred, succeed, maybeDeferred + +from synapse.util import Clock +from synapse.events import FrozenEvent +from synapse.types import Requester, UserID +from synapse.replication.slave.storage.events import SlavedEventStore + +from tests import unittest +from tests.server import make_request, setup_test_homeserver, ThreadedMemoryReactorClock + +from mock import Mock + +from synapse.api.errors import CodeMessageException, HttpResponseException + + +class MessageAcceptTests(unittest.TestCase): + def setUp(self): + + self.http_client = Mock() + self.reactor = ThreadedMemoryReactorClock() + self.hs_clock = Clock(self.reactor) + self.homeserver = setup_test_homeserver( + http_client=self.http_client, clock=self.hs_clock, reactor=self.reactor + ) + + user_id = UserID("us", "test") + our_user = Requester(user_id, None, False, None, None) + room_creator = self.homeserver.get_room_creation_handler() + room = room_creator.create_room( + our_user, room_creator.PRESETS_DICT["public_chat"], ratelimit=False + ) + self.reactor.advance(0.1) + self.room_id = self.successResultOf(room)["room_id"] + + # Figure out what the most recent event is + most_recent = self.successResultOf( + self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id) + )[0] + + join_event = FrozenEvent( + { + "room_id": self.room_id, + "sender": "@baduser:test.serv", + "state_key": "@baduser:test.serv", + "event_id": "$join:test.serv", + "depth": 1000, + "origin_server_ts": 1, + "type": "m.room.member", + "origin": "test.servx", + "content": {"membership": "join"}, + "auth_events": [], + "prev_state": [(most_recent, {})], + "prev_events": [(most_recent, {})], + } + ) + + self.handler = self.homeserver.get_handlers().federation_handler + self.handler.do_auth = lambda *a, **b: succeed(True) + self.client = self.homeserver.get_federation_client() + self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed( + pdus + ) + + # Send the join, it should return None (which is not an error) + d = self.handler.on_receive_pdu( + "test.serv", join_event, sent_to_us_directly=True + ) + self.reactor.advance(1) + self.assertEqual(self.successResultOf(d), None) + + # Make sure we actually joined the room + self.assertEqual( + self.successResultOf( + self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id) + )[0], + "$join:test.serv", + ) + + def test_cant_hide_direct_ancestors(self): + """ + If you send a message, you must be able to provide the direct + prev_events that said event references. + """ + + def post_json(destination, path, data, headers=None, timeout=0): + # If it asks us for new missing events, give them NOTHING + if path.startswith("/_matrix/federation/v1/get_missing_events/"): + return {"events": []} + + self.http_client.post_json = post_json + + # Figure out what the most recent event is + most_recent = self.successResultOf( + self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id) + )[0] + + # Now lie about an event + lying_event = FrozenEvent( + { + "room_id": self.room_id, + "sender": "@baduser:test.serv", + "event_id": "one:test.serv", + "depth": 1000, + "origin_server_ts": 1, + "type": "m.room.message", + "origin": "test.serv", + "content": "hewwo?", + "auth_events": [], + "prev_events": [("two:test.serv", {}), (most_recent, {})], + } + ) + + d = self.handler.on_receive_pdu( + "test.serv", lying_event, sent_to_us_directly=True + ) + + # Step the reactor, so the database fetches come back + self.reactor.advance(1) + + # on_receive_pdu should throw an error + failure = self.failureResultOf(d) + self.assertEqual( + failure.value.args[0], + ( + "ERROR 403: Your server isn't divulging details about prev_events " + "referenced in this event." + ), + ) + + # Make sure the invalid event isn't there + extrem = self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id) + self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv") + + @unittest.DEBUG + def test_cant_hide_past_history(self): + """ + If you send a message, you must be able to provide the direct + prev_events that said event references. + """ + + def post_json(destination, path, data, headers=None, timeout=0): + if path.startswith("/_matrix/federation/v1/get_missing_events/"): + return { + "events": [ + { + "room_id": self.room_id, + "sender": "@baduser:test.serv", + "event_id": "three:test.serv", + "depth": 1000, + "origin_server_ts": 1, + "type": "m.room.message", + "origin": "test.serv", + "content": "hewwo?", + "auth_events": [], + "prev_events": [("four:test.serv", {})], + } + ] + } + + self.http_client.post_json = post_json + + def get_json(destination, path, args, headers=None): + print(destination, path, args) + if path.startswith("/_matrix/federation/v1/state_ids/"): + d = self.successResultOf( + self.homeserver.datastore.get_state_ids_for_event("one:test.serv") + ) + + return succeed( + { + "pdu_ids": [ + y + for x, y in d.items() + if x == ("m.room.member", "@us:test") + ], + "auth_chain_ids": d.values(), + } + ) + + self.http_client.get_json = get_json + + # Figure out what the most recent event is + most_recent = self.successResultOf( + self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id) + )[0] + + # Make a good event + good_event = FrozenEvent( + { + "room_id": self.room_id, + "sender": "@baduser:test.serv", + "event_id": "one:test.serv", + "depth": 1000, + "origin_server_ts": 1, + "type": "m.room.message", + "origin": "test.serv", + "content": "hewwo?", + "auth_events": [], + "prev_events": [(most_recent, {})], + } + ) + + d = self.handler.on_receive_pdu( + "test.serv", good_event, sent_to_us_directly=True + ) + self.reactor.advance(1) + self.assertEqual(self.successResultOf(d), None) + + bad_event = FrozenEvent( + { + "room_id": self.room_id, + "sender": "@baduser:test.serv", + "event_id": "two:test.serv", + "depth": 1000, + "origin_server_ts": 1, + "type": "m.room.message", + "origin": "test.serv", + "content": "hewwo?", + "auth_events": [], + "prev_events": [("one:test.serv", {}), ("three:test.serv", {})], + } + ) + + d = self.handler.on_receive_pdu( + "test.serv", bad_event, sent_to_us_directly=True + ) + self.reactor.advance(1) + + extrem = self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id) + self.assertEqual(self.successResultOf(extrem)[0], "two:test.serv") + + state = self.homeserver.get_state_handler().get_current_state_ids(self.room_id) + self.reactor.advance(1) + self.assertIn(("m.room.member", "@us:test"), self.successResultOf(state).keys()) diff --git a/tests/unittest.py b/tests/unittest.py index 184fe880f3..de24b1d2d4 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -35,7 +35,7 @@ class ToTwistedHandler(logging.Handler): def emit(self, record): log_entry = self.format(record) log_level = record.levelname.lower().replace('warning', 'warn') - self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry) + self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry.replace("{", r"(").replace("}", r")")) handler = ToTwistedHandler() -- cgit 1.4.1 From a7ecf34b70b215f1402ce545e1f6a7c163f8ac28 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 27 Jun 2018 11:36:03 +0100 Subject: cleanups --- synapse/handlers/federation.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 250a5509d8..e9c5d1026a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -240,35 +240,37 @@ class FederationHandler(BaseHandler): affected=pdu.event_id, ) elif prevs - seen: - # If we're walking back up the chain to fetch it, then - # try and find the states. If we can't get the states, - # discard it. + # Calculate the state of the previous events, and + # de-conflict them to find the current state. state_groups = [] auth_chains = set() try: - # Get the ones we know about + # Get the state of the events we know about ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) state_groups.append(ours) + # Ask the remote server for the states we don't + # know about for p in prevs - seen: - state, auth_chain = yield self.replication_layer.get_state_for_room( + state, got_auth_chain = yield self.replication_layer.get_state_for_room( origin, pdu.room_id, p ) - auth_chains.update(auth_chain) - state_group = { - (x.type, x.state_key): x.event_id for x in state - } + auth_chains.update(got_auth_chain) + state_group = {(x.type, x.state_key): x.event_id for x in state} state_groups.append(state_group) + # Resolve any conflicting state def fetch(ev_ids): return self.store.get_events( - ev_ids, get_prev_content=False, check_redacted=False, + ev_ids, get_prev_content=False, check_redacted=False ) - state = yield resolve_events_with_factory(state_groups, {pdu.event_id: pdu}, fetch) + state_map = yield resolve_events_with_factory( + state_groups, {pdu.event_id: pdu}, fetch + ) - state = yield self.store.get_events(state.values()) - state = state.values() + state = (yield self.store.get_events(state_map.values())).values() + auth_chain = list(auth_chains) except Exception: raise FederationError( "ERROR", -- cgit 1.4.1 From f03a5d1a1759221bd94d75604f3e4e787cd4133e Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 27 Jun 2018 11:38:14 +0100 Subject: pep8 --- synapse/handlers/federation.py | 10 ++++------ tests/test_federation.py | 7 ++----- 2 files changed, 6 insertions(+), 11 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e9c5d1026a..c4d96749a3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -169,11 +169,8 @@ class FederationHandler(BaseHandler): defer.returnValue(None) state = None - auth_chain = [] - fetch_state = False - # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. @@ -252,9 +249,10 @@ class FederationHandler(BaseHandler): # Ask the remote server for the states we don't # know about for p in prevs - seen: - state, got_auth_chain = yield self.replication_layer.get_state_for_room( - origin, pdu.room_id, p - ) + state, got_auth_chain = ( + yield self.replication_layer.get_state_for_room( + origin, pdu.room_id, p + )) auth_chains.update(got_auth_chain) state_group = {(x.type, x.state_key): x.event_id for x in state} state_groups.append(state_group) diff --git a/tests/test_federation.py b/tests/test_federation.py index 95fa73723c..fc80a69369 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -1,18 +1,15 @@ -from twisted.internet.defer import Deferred, succeed, maybeDeferred +from twisted.internet.defer import succeed, maybeDeferred from synapse.util import Clock from synapse.events import FrozenEvent from synapse.types import Requester, UserID -from synapse.replication.slave.storage.events import SlavedEventStore from tests import unittest -from tests.server import make_request, setup_test_homeserver, ThreadedMemoryReactorClock +from tests.server import setup_test_homeserver, ThreadedMemoryReactorClock from mock import Mock -from synapse.api.errors import CodeMessageException, HttpResponseException - class MessageAcceptTests(unittest.TestCase): def setUp(self): -- cgit 1.4.1 From 99800de63ddec41125c6e1c779c33f96f1cad848 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 27 Jun 2018 11:40:27 +0100 Subject: try and clean up --- synapse/handlers/federation.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c4d96749a3..13117d70fe 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -232,8 +232,10 @@ class FederationHandler(BaseHandler): raise FederationError( "ERROR", 403, - ("Your server isn't divulging details about prev_events " - "referenced in this event."), + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), affected=pdu.event_id, ) elif prevs - seen: @@ -252,7 +254,8 @@ class FederationHandler(BaseHandler): state, got_auth_chain = ( yield self.replication_layer.get_state_for_room( origin, pdu.room_id, p - )) + ) + ) auth_chains.update(got_auth_chain) state_group = {(x.type, x.state_key): x.event_id for x in state} state_groups.append(state_group) -- cgit 1.4.1 From 72d2143ea8fe43464b1db59936acef53293b1b2a Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 28 Jun 2018 11:04:18 +0100 Subject: Revert "Revert "Try to not use as much CPU in the StreamChangeCache"" (#3454) --- changelog.d/3447.misc | 0 synapse/util/caches/stream_change_cache.py | 6 ++++-- 2 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 changelog.d/3447.misc (limited to 'synapse') diff --git a/changelog.d/3447.misc b/changelog.d/3447.misc new file mode 100644 index 0000000000..e69de29bb2 diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 817118e30f..0fb8620001 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -78,7 +78,8 @@ class StreamChangeCache(object): not_known_entities = set(entities) - set(self._entity_to_key) result = ( - set(self._cache.values()[self._cache.bisect_right(stream_pos) :]) + {self._cache[k] for k in self._cache.islice( + start=self._cache.bisect_right(stream_pos))} .intersection(entities) .union(not_known_entities) ) @@ -113,7 +114,8 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - return self._cache.values()[self._cache.bisect_right(stream_pos) :] + return [self._cache[k] for k in self._cache.islice( + start=self._cache.bisect_right(stream_pos))] else: return None -- cgit 1.4.1 From 6350bf925e8651f2fae70a1e7eb7182e9161c34a Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 28 Jun 2018 14:49:57 +0100 Subject: Attempt to be more performant on PyPy (#3462) --- changelog.d/3462.feature | 1 + synapse/api/errors.py | 3 ++- synapse/api/filtering.py | 3 ++- synapse/crypto/keyclient.py | 2 +- synapse/federation/federation_server.py | 2 +- synapse/handlers/auth.py | 5 +++-- synapse/handlers/e2e_keys.py | 5 ++--- synapse/handlers/identity.py | 2 +- synapse/handlers/message.py | 5 ++--- synapse/http/client.py | 2 +- synapse/http/matrixfederationclient.py | 3 +-- synapse/http/server.py | 5 ++--- synapse/http/servlet.py | 6 ++++-- synapse/metrics/__init__.py | 3 ++- synapse/replication/tcp/commands.py | 16 ++++++++++------ synapse/rest/client/v1/login.py | 3 ++- synapse/rest/client/v1/room.py | 2 +- synapse/rest/client/v2_alpha/sync.py | 2 +- synapse/rest/media/v0/content_repository.py | 3 ++- synapse/rest/media/v1/preview_url_resource.py | 3 ++- synapse/storage/account_data.py | 3 ++- synapse/storage/appservice.py | 2 +- synapse/storage/background_updates.py | 3 ++- synapse/storage/deviceinbox.py | 13 +++++++------ synapse/storage/devices.py | 3 ++- synapse/storage/end_to_end_keys.py | 3 +-- synapse/storage/event_push_actions.py | 3 ++- synapse/storage/events.py | 3 ++- synapse/storage/events_worker.py | 3 ++- synapse/storage/filtering.py | 3 +-- synapse/storage/group_server.py | 2 +- synapse/storage/push_rule.py | 3 ++- synapse/storage/pusher.py | 3 +-- synapse/storage/receipts.py | 3 ++- synapse/storage/room.py | 3 ++- synapse/storage/roommember.py | 2 +- synapse/storage/search.py | 2 +- synapse/storage/tags.py | 3 ++- synapse/storage/transactions.py | 3 +-- synapse/util/frozenutils.py | 2 +- 40 files changed, 79 insertions(+), 62 deletions(-) create mode 100644 changelog.d/3462.feature (limited to 'synapse') diff --git a/changelog.d/3462.feature b/changelog.d/3462.feature new file mode 100644 index 0000000000..305dbbeddd --- /dev/null +++ b/changelog.d/3462.feature @@ -0,0 +1 @@ +Synapse now uses the best performing JSON encoder/decoder according to your runtime (simplejson on CPython, stdlib json on PyPy). \ No newline at end of file diff --git a/synapse/api/errors.py b/synapse/api/errors.py index e6ad3768f0..227a0713b2 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -17,7 +17,8 @@ import logging -import simplejson as json +from canonicaljson import json + from six import iteritems from six.moves import http_client diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index dbc0e7e445..aae25e7a47 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -17,7 +17,8 @@ from synapse.storage.presence import UserPresenceState from synapse.types import UserID, RoomID from twisted.internet import defer -import simplejson as json +from canonicaljson import json + import jsonschema from jsonschema import FormatChecker diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index f1fd488b90..2a0eddbea1 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -18,7 +18,7 @@ from twisted.web.http import HTTPClient from twisted.internet.protocol import Factory from twisted.internet import defer, reactor from synapse.http.endpoint import matrix_federation_endpoint -import simplejson as json +from canonicaljson import json import logging diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d4dd967c60..a00420a24b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -import simplejson as json +from canonicaljson import json from twisted.internet import defer from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index a131b7f73f..cbef1f2770 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -16,6 +16,8 @@ from twisted.internet import defer, threads +from canonicaljson import json + from ._base import BaseHandler from synapse.api.constants import LoginType from synapse.api.errors import ( @@ -32,7 +34,6 @@ from twisted.web.client import PartialDownloadError import logging import bcrypt import pymacaroons -import simplejson import attr import synapse.util.stringutils as stringutils @@ -403,7 +404,7 @@ class AuthHandler(BaseHandler): except PartialDownloadError as pde: # Twisted is silly data = pde.response - resp_body = simplejson.loads(data) + resp_body = json.loads(data) if 'success' in resp_body: # Note that we do NOT check the hostname here: we explicitly diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 4d7bf5defd..62b4892a4e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -14,10 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import simplejson as json import logging -from canonicaljson import encode_canonical_json +from canonicaljson import encode_canonical_json, json from twisted.internet import defer from six import iteritems @@ -357,7 +356,7 @@ def _exception_to_failure(e): # include ConnectionRefused and other errors # # Note that some Exceptions (notably twisted's ResponseFailed etc) don't - # give a string for e.message, which simplejson then fails to serialize. + # give a string for e.message, which json then fails to serialize. return { "status": 503, "message": str(e.message), } diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index f00dfe1d3e..277c2b7760 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -19,7 +19,7 @@ import logging -import simplejson as json +from canonicaljson import json from twisted.internet import defer diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8467284758..cbadf3c88e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -14,10 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson import sys -from canonicaljson import encode_canonical_json +from canonicaljson import encode_canonical_json, json import six from six import string_types, itervalues, iteritems from twisted.internet import defer @@ -797,7 +796,7 @@ class EventCreationHandler(object): # Ensure that we can round trip before trying to persist in db try: dump = frozendict_json_encoder.encode(event.content) - simplejson.loads(dump) + json.loads(dump) except Exception: logger.exception("Failed to encode content: %r", event.content) raise diff --git a/synapse/http/client.py b/synapse/http/client.py index 46ffb41de1..5bdc484c15 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -42,7 +42,7 @@ from twisted.web._newclient import ResponseDone from six import StringIO from prometheus_client import Counter -import simplejson as json +from canonicaljson import json import logging import urllib diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 4e0399e762..2cb9e3e231 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -27,7 +27,7 @@ from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable import synapse.util.retryutils -from canonicaljson import encode_canonical_json +from canonicaljson import encode_canonical_json, json from synapse.api.errors import ( SynapseError, Codes, HttpResponseException, FederationDeniedError, @@ -36,7 +36,6 @@ from synapse.api.errors import ( from signedjson.sign import sign_json import cgi -import simplejson as json import logging import random import sys diff --git a/synapse/http/server.py b/synapse/http/server.py index bc09b8b2be..517aaf7b5a 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -29,7 +29,7 @@ import synapse.metrics import synapse.events from canonicaljson import ( - encode_canonical_json, encode_pretty_printed_json + encode_canonical_json, encode_pretty_printed_json, json ) from twisted.internet import defer @@ -41,7 +41,6 @@ from twisted.web.util import redirectTo import collections import logging import urllib -import simplejson logger = logging.getLogger(__name__) @@ -410,7 +409,7 @@ def respond_with_json(request, code, json_object, send_cors=False, if canonical_json or synapse.events.USE_FROZEN_DICTS: json_bytes = encode_canonical_json(json_object) else: - json_bytes = simplejson.dumps(json_object) + json_bytes = json.dumps(json_object) return respond_with_json_bytes( request, code, json_bytes, diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index ef8e62901b..ef3a01ddc7 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -18,7 +18,9 @@ from synapse.api.errors import SynapseError, Codes import logging -import simplejson + +from canonicaljson import json + logger = logging.getLogger(__name__) @@ -171,7 +173,7 @@ def parse_json_value_from_request(request, allow_empty_body=False): return None try: - content = simplejson.loads(content_bytes) + content = json.loads(content_bytes) except Exception as e: logger.warn("Unable to parse JSON: %s", e) raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 7d6e0232ed..2d2397caae 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -147,7 +147,8 @@ class GCCounts(object): yield cm -REGISTRY.register(GCCounts()) +if not running_on_pypy: + REGISTRY.register(GCCounts()) # # Twisted reactor metrics diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 12aac3cc6b..f3908df642 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -19,13 +19,17 @@ allowed to be sent by which side. """ import logging -import simplejson +import platform +if platform.python_implementation() == "PyPy": + import json + _json_encoder = json.JSONEncoder() +else: + import simplejson as json + _json_encoder = json.JSONEncoder(namedtuple_as_object=False) logger = logging.getLogger(__name__) -_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False) - class Command(object): """The base command class. @@ -102,7 +106,7 @@ class RdataCommand(Command): return cls( stream_name, None if token == "batch" else int(token), - simplejson.loads(row_json) + json.loads(row_json) ) def to_line(self): @@ -300,7 +304,7 @@ class InvalidateCacheCommand(Command): def from_line(cls, line): cache_func, keys_json = line.split(" ", 1) - return cls(cache_func, simplejson.loads(keys_json)) + return cls(cache_func, json.loads(keys_json)) def to_line(self): return " ".join(( @@ -329,7 +333,7 @@ class UserIpCommand(Command): def from_line(cls, line): user_id, jsn = line.split(" ", 1) - access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn) + access_token, ip, user_agent, device_id, last_seen = json.loads(jsn) return cls( user_id, access_token, ip, user_agent, device_id, last_seen diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 34df5be4e9..88ca5184cd 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -23,7 +23,8 @@ from synapse.util.msisdn import phone_number_to_msisdn from .base import ClientV1RestServlet, client_path_patterns -import simplejson as json +from canonicaljson import json + import urllib from six.moves.urllib import parse as urlparse diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 0b984987ed..e6ae5db79b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -31,7 +31,7 @@ from synapse.http.servlet import ( from six.moves.urllib import parse as urlparse import logging -import simplejson as json +from canonicaljson import json logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index a291cffbf1..d2aa47b326 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -33,7 +33,7 @@ from ._base import set_timeline_upper_limit import itertools import logging -import simplejson as json +from canonicaljson import json logger = logging.getLogger(__name__) diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py index 956bd5da75..e44d4276d2 100644 --- a/synapse/rest/media/v0/content_repository.py +++ b/synapse/rest/media/v0/content_repository.py @@ -22,8 +22,9 @@ from synapse.api.errors import ( from twisted.protocols.basic import FileSender from twisted.web import server, resource +from canonicaljson import json + import base64 -import simplejson as json import logging import os import re diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 565cef2b8d..adca490640 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -23,7 +23,8 @@ import re import shutil import sys import traceback -import simplejson as json + +from canonicaljson import json from six.moves import urllib_parse as urlparse from six import string_types diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 284ec3c970..7034a61399 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -22,8 +22,9 @@ from synapse.storage.util.id_generators import StreamIdGenerator from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from canonicaljson import json + import abc -import simplejson as json import logging logger = logging.getLogger(__name__) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 12ea8a158c..4d32d0bdf6 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -15,8 +15,8 @@ # limitations under the License. import logging import re -import simplejson as json from twisted.internet import defer +from canonicaljson import json from synapse.appservice import AppServiceTransaction from synapse.config.appservice import load_appservices diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index b7e9c716c8..af18964510 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -18,7 +18,8 @@ from . import engines from twisted.internet import defer -import simplejson as json +from canonicaljson import json + import logging logger = logging.getLogger(__name__) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index a879e5bfc1..38addbf9c0 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -14,7 +14,8 @@ # limitations under the License. import logging -import simplejson + +from canonicaljson import json from twisted.internet import defer @@ -85,7 +86,7 @@ class DeviceInboxStore(BackgroundUpdateStore): ) rows = [] for destination, edu in remote_messages_by_destination.items(): - edu_json = simplejson.dumps(edu) + edu_json = json.dumps(edu) rows.append((destination, stream_id, now_ms, edu_json)) txn.executemany(sql, rows) @@ -177,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore): " WHERE user_id = ?" ) txn.execute(sql, (user_id,)) - message_json = simplejson.dumps(messages_by_device["*"]) + message_json = json.dumps(messages_by_device["*"]) for row in txn: # Add the message for all devices for this user on this # server. @@ -199,7 +200,7 @@ class DeviceInboxStore(BackgroundUpdateStore): # Only insert into the local inbox if the device exists on # this server device = row[0] - message_json = simplejson.dumps(messages_by_device[device]) + message_json = json.dumps(messages_by_device[device]) messages_json_for_user[device] = message_json if messages_json_for_user: @@ -253,7 +254,7 @@ class DeviceInboxStore(BackgroundUpdateStore): messages = [] for row in txn: stream_pos = row[0] - messages.append(simplejson.loads(row[1])) + messages.append(json.loads(row[1])) if len(messages) < limit: stream_pos = current_stream_id return (messages, stream_pos) @@ -389,7 +390,7 @@ class DeviceInboxStore(BackgroundUpdateStore): messages = [] for row in txn: stream_pos = row[0] - messages.append(simplejson.loads(row[1])) + messages.append(json.loads(row[1])) if len(messages) < limit: stream_pos = current_stream_id return (messages, stream_pos) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index d149d8392e..2ed9ada783 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson as json from twisted.internet import defer @@ -21,6 +20,8 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore, Cache from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from canonicaljson import json + from six import itervalues, iteritems logger = logging.getLogger(__name__) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index b146487943..181047c8b7 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -16,8 +16,7 @@ from twisted.internet import defer from synapse.util.caches.descriptors import cached -from canonicaljson import encode_canonical_json -import simplejson as json +from canonicaljson import encode_canonical_json, json from ._base import SQLBaseStore diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 8cb24b7d59..05cb3f61ce 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -19,7 +19,8 @@ from twisted.internet import defer from synapse.util.caches.descriptors import cachedInlineCallbacks import logging -import simplejson as json + +from canonicaljson import json from six import iteritems diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7d0e59538a..d816d4883c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,8 @@ from functools import wraps import itertools import logging -import simplejson as json +from canonicaljson import json + from twisted.internet import defer from synapse.storage.events_worker import EventsWorkerStore diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index f6a6e46b43..896225aab9 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -29,7 +29,8 @@ from synapse.api.errors import SynapseError from collections import namedtuple import logging -import simplejson as json + +from canonicaljson import json # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 2e2763126d..eae6027cee 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -19,8 +19,7 @@ from ._base import SQLBaseStore from synapse.api.errors import SynapseError, Codes from synapse.util.caches.descriptors import cachedInlineCallbacks -from canonicaljson import encode_canonical_json -import simplejson as json +from canonicaljson import encode_canonical_json, json class FilteringStore(SQLBaseStore): diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index da05ccb027..b77402d295 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -20,7 +20,7 @@ from synapse.api.errors import SynapseError from ._base import SQLBaseStore -import simplejson as json +from canonicaljson import json # The category ID for the "default" category. We don't store as null in the diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 04a0b59a39..9e52e992b3 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -25,9 +25,10 @@ from synapse.push.baserules import list_with_base_rules from synapse.api.constants import EventTypes from twisted.internet import defer +from canonicaljson import json + import abc import logging -import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 307660b99a..c6def861cf 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -17,12 +17,11 @@ from ._base import SQLBaseStore from twisted.internet import defer -from canonicaljson import encode_canonical_json +from canonicaljson import encode_canonical_json, json from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList import logging -import simplejson as json import types logger = logging.getLogger(__name__) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index c93c228f6e..f230a3bab7 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -21,9 +21,10 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer +from canonicaljson import json + import abc import logging -import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index ea6a189185..ca0eb187e5 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -20,9 +20,10 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.search import SearchStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from canonicaljson import json + import collections import logging -import simplejson as json import re logger = logging.getLogger(__name__) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 829cc4a207..8fc9549a75 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -28,7 +28,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import get_domain_from_id import logging -import simplejson as json +from canonicaljson import json from six import itervalues, iteritems diff --git a/synapse/storage/search.py b/synapse/storage/search.py index f0fa5d7631..9b77c45318 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -16,7 +16,7 @@ from collections import namedtuple import logging import re -import simplejson as json +from canonicaljson import json from six import string_types diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 6671d3cfca..04d123ed95 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -19,7 +19,8 @@ from synapse.storage.account_data import AccountDataWorkerStore from synapse.util.caches.descriptors import cached from twisted.internet import defer -import simplejson as json +from canonicaljson import json + import logging from six.moves import range diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index e485d19b84..acbc03446e 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -19,12 +19,11 @@ from synapse.util.caches.descriptors import cached from twisted.internet import defer import six -from canonicaljson import encode_canonical_json +from canonicaljson import encode_canonical_json, json from collections import namedtuple import logging -import simplejson as json # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index 15f0a7ba9e..535e7d0e7a 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -14,7 +14,7 @@ # limitations under the License. from frozendict import frozendict -import simplejson as json +from canonicaljson import json from six import string_types -- cgit 1.4.1 From f4f1cda928911197332a9a8bffcfb67c2b5b81e4 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 28 Jun 2018 20:31:53 +0100 Subject: add ip_range_whitelist parameter to limit where ASes can connect from --- synapse/api/auth.py | 6 ++++++ synapse/appservice/__init__.py | 4 +++- synapse/config/appservice.py | 11 ++++++++++- 3 files changed, 19 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 54186695cd..088b4e8b6d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -19,6 +19,7 @@ from six import itervalues import pymacaroons from twisted.internet import defer +from netaddr import IPAddress import synapse.types from synapse import event_auth @@ -244,6 +245,11 @@ class Auth(object): if app_service is None: defer.returnValue((None, None)) + if app_service.ip_range_whitelist: + ip_address = IPAddress(self.hs.get_ip_from_request(request)) + if ip_address not in app_service.ip_range_whitelist: + defer.returnValue((None, None)) + if "user_id" not in request.args: defer.returnValue((app_service.sender, app_service)) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index d1c598622a..328cbfa284 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -85,7 +85,8 @@ class ApplicationService(object): NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None, - sender=None, id=None, protocols=None, rate_limited=True): + sender=None, id=None, protocols=None, rate_limited=True, + ip_range_whitelist=None): self.token = token self.url = url self.hs_token = hs_token @@ -93,6 +94,7 @@ class ApplicationService(object): self.server_name = hostname self.namespaces = self._check_namespaces(namespaces) self.id = id + self.ip_range_whitelist = ip_range_whitelist if "|" in self.id: raise Exception("application service ID cannot contain '|' character") diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 277305e184..89c07f202f 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -17,6 +17,8 @@ from ._base import Config, ConfigError from synapse.appservice import ApplicationService from synapse.types import UserID +from netaddr import IPSet + import yaml import logging @@ -154,6 +156,12 @@ def _load_appservice(hostname, as_info, config_filename): " will not receive events or queries.", config_filename, ) + + if as_info.get('ip_range_whitelist'): + ip_range_whitelist = IPSet( + as_info.get('ip_range_whitelist') + ) + return ApplicationService( token=as_info["as_token"], hostname=hostname, @@ -163,5 +171,6 @@ def _load_appservice(hostname, as_info, config_filename): sender=user_id, id=as_info["id"], protocols=protocols, - rate_limited=rate_limited + rate_limited=rate_limited, + ip_range_whitelist=ip_range_whitelist, ) -- cgit 1.4.1 From e72234f6bda33d89dcca07751e34c62b88215e9d Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 28 Jun 2018 20:56:07 +0100 Subject: fix tests --- synapse/config/appservice.py | 1 + tests/api/test_auth.py | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 89c07f202f..0c27bb2fa7 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -157,6 +157,7 @@ def _load_appservice(hostname, as_info, config_filename): config_filename, ) + ip_range_whitelist = None if as_info.get('ip_range_whitelist'): ip_range_whitelist = IPSet( as_info.get('ip_range_whitelist') diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 4575dd9834..48bd411e49 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -86,11 +86,15 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_user_by_req_appservice_valid_token(self): - app_service = Mock(token="foobar", url="a_url", sender=self.test_user) + app_service = Mock( + token="foobar", url="a_url", sender=self.test_user, + ip_range_whitelist=None, + ) self.store.get_app_service_by_token = Mock(return_value=app_service) self.store.get_user_by_access_token = Mock(return_value=None) request = Mock(args={}) + request.getClientIP.return_value = "127.0.0.1" request.args["access_token"] = [self.test_token] request.requestHeaders.getRawHeaders = mock_getRawHeaders() requester = yield self.auth.get_user_by_req(request) @@ -119,12 +123,16 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_user_by_req_appservice_valid_token_valid_user_id(self): masquerading_user_id = "@doppelganger:matrix.org" - app_service = Mock(token="foobar", url="a_url", sender=self.test_user) + app_service = Mock( + token="foobar", url="a_url", sender=self.test_user, + ip_range_whitelist=None, + ) app_service.is_interested_in_user = Mock(return_value=True) self.store.get_app_service_by_token = Mock(return_value=app_service) self.store.get_user_by_access_token = Mock(return_value=None) request = Mock(args={}) + request.getClientIP.return_value = "127.0.0.1" request.args["access_token"] = [self.test_token] request.args["user_id"] = [masquerading_user_id] request.requestHeaders.getRawHeaders = mock_getRawHeaders() @@ -133,12 +141,16 @@ class AuthTestCase(unittest.TestCase): def test_get_user_by_req_appservice_valid_token_bad_user_id(self): masquerading_user_id = "@doppelganger:matrix.org" - app_service = Mock(token="foobar", url="a_url", sender=self.test_user) + app_service = Mock( + token="foobar", url="a_url", sender=self.test_user, + ip_range_whitelist=None, + ) app_service.is_interested_in_user = Mock(return_value=False) self.store.get_app_service_by_token = Mock(return_value=app_service) self.store.get_user_by_access_token = Mock(return_value=None) request = Mock(args={}) + request.getClientIP.return_value = "127.0.0.1" request.args["access_token"] = [self.test_token] request.args["user_id"] = [masquerading_user_id] request.requestHeaders.getRawHeaders = mock_getRawHeaders() -- cgit 1.4.1 From f131bf8d3e934bb548615214239cfe131f8e33f5 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 1 Jul 2018 05:08:51 +0100 Subject: don't mix unicode strings with utf8-in-byte-strings otherwise we explode with: ``` Traceback (most recent call last): File /usr/lib/python2.7/logging/handlers.py, line 78, in emit logging.FileHandler.emit(self, record) File /usr/lib/python2.7/logging/__init__.py, line 950, in emit StreamHandler.emit(self, record) File /usr/lib/python2.7/logging/__init__.py, line 887, in emit self.handleError(record) File /usr/lib/python2.7/logging/__init__.py, line 810, in handleError None, sys.stderr) File /usr/lib/python2.7/traceback.py, line 124, in print_exception _print(file, 'Traceback (most recent call last):') File /usr/lib/python2.7/traceback.py, line 13, in _print file.write(str+terminator) File /home/matrix/.synapse/local/lib/python2.7/site-packages/twisted/logger/_io.py, line 170, in write self.log.emit(self.level, format=u{log_io}, log_io=line) File /home/matrix/.synapse/local/lib/python2.7/site-packages/twisted/logger/_logger.py, line 144, in emit self.observer(event) File /home/matrix/.synapse/local/lib/python2.7/site-packages/twisted/logger/_observer.py, line 136, in __call__ errorLogger = self._errorLoggerForObserver(brokenObserver) File /home/matrix/.synapse/local/lib/python2.7/site-packages/twisted/logger/_observer.py, line 156, in _errorLoggerForObserver if obs is not observer File /home/matrix/.synapse/local/lib/python2.7/site-packages/twisted/logger/_observer.py, line 81, in __init__ self.log = Logger(observer=self) File /home/matrix/.synapse/local/lib/python2.7/site-packages/twisted/logger/_logger.py, line 64, in __init__ namespace = self._namespaceFromCallingContext() File /home/matrix/.synapse/local/lib/python2.7/site-packages/twisted/logger/_logger.py, line 42, in _namespaceFromCallingContext return currentframe(2).f_globals[__name__] File /home/matrix/.synapse/local/lib/python2.7/site-packages/twisted/python/compat.py, line 93, in currentframe for x in range(n + 1): RuntimeError: maximum recursion depth exceeded while calling a Python object Logged from file site.py, line 129 File /usr/lib/python2.7/logging/__init__.py, line 859, in emit msg = self.format(record) File /usr/lib/python2.7/logging/__init__.py, line 732, in format return fmt.format(record) File /usr/lib/python2.7/logging/__init__.py, line 471, in format record.message = record.getMessage() File /usr/lib/python2.7/logging/__init__.py, line 335, in getMessage msg = msg % self.args UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position 4: ordinal not in range(128) Logged from file site.py, line 129 ``` ...where the logger apparently recurses whilst trying to log the error, hitting the maximum recursion depth and killing everything badly. --- synapse/http/site.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/http/site.py b/synapse/http/site.py index 74a752d6cf..2ab0d8ff78 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -113,7 +113,9 @@ class SynapseRequest(Request): " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]", self.getClientIP(), self.site.site_tag, - self.authenticated_entity, + # need to decode as it could be raw utf-8 bytes + # from a IDN servname in an auth header + self.authenticated_entity.decode("utf-8"), end_time - self.start_time, ru_utime, ru_stime, @@ -125,7 +127,12 @@ class SynapseRequest(Request): self.method, self.get_redacted_uri(), self.clientproto, - self.get_user_agent(), + # need to decode as could be raw utf-8 bytes + # from a utf-8 user-agent. + # N.B. if you don't do this, the logger explodes + # with maximum recursion trying to log errors about + # the charset problem. + self.get_user_agent().decode("utf-8"), evt_db_fetch_count, ) -- cgit 1.4.1 From 1c867f5391bff1b060290f834b177f4c339af65d Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 1 Jul 2018 11:56:33 +0100 Subject: a fix which doesn't NPE everywhere --- synapse/http/site.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/http/site.py b/synapse/http/site.py index 2ab0d8ff78..14ea9c21c8 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -107,15 +107,28 @@ class SynapseRequest(Request): end_time = time.time() + # need to decode as it could be raw utf-8 bytes + # from a IDN servname in an auth header + authenticated_entity = self.authenticated_entity + if authenticated_entity is not None: + authenticated_entity = authenticated_entity.decode("utf-8") + + # ...or could be raw utf-8 bytes in the User-Agent header. + # N.B. if you don't do this, the logger explodes cryptically + # with maximum recursion trying to log errors about + # the charset problem. + # c.f. https://github.com/matrix-org/synapse/issues/3471 + user_agent = self.get_user_agent() + if user_agent is not None: + user_agent = user_agent.decode("utf-8") + self.site.access_logger.info( "%s - %s - {%s}" " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]", self.getClientIP(), self.site.site_tag, - # need to decode as it could be raw utf-8 bytes - # from a IDN servname in an auth header - self.authenticated_entity.decode("utf-8"), + authenticated_entity, end_time - self.start_time, ru_utime, ru_stime, @@ -127,12 +140,7 @@ class SynapseRequest(Request): self.method, self.get_redacted_uri(), self.clientproto, - # need to decode as could be raw utf-8 bytes - # from a utf-8 user-agent. - # N.B. if you don't do this, the logger explodes - # with maximum recursion trying to log errors about - # the charset problem. - self.get_user_agent().decode("utf-8"), + user_agent, evt_db_fetch_count, ) -- cgit 1.4.1 From fc4f8f33be7bcfa8fca5d1aa1f07a1507e8d56e7 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 2 Jul 2018 11:33:02 +0100 Subject: replace invalid utf8 with \ufffd --- synapse/http/site.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/http/site.py b/synapse/http/site.py index 14ea9c21c8..fe93643b1e 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -111,7 +111,7 @@ class SynapseRequest(Request): # from a IDN servname in an auth header authenticated_entity = self.authenticated_entity if authenticated_entity is not None: - authenticated_entity = authenticated_entity.decode("utf-8") + authenticated_entity = authenticated_entity.decode("utf-8", "replace") # ...or could be raw utf-8 bytes in the User-Agent header. # N.B. if you don't do this, the logger explodes cryptically @@ -120,7 +120,7 @@ class SynapseRequest(Request): # c.f. https://github.com/matrix-org/synapse/issues/3471 user_agent = self.get_user_agent() if user_agent is not None: - user_agent = user_agent.decode("utf-8") + user_agent = user_agent.decode("utf-8", "replace") self.site.access_logger.info( "%s - %s - {%s}" -- cgit 1.4.1 From 508196e08a834496daa1bfc5f561e69a430e270c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 3 Jul 2018 14:36:14 +0100 Subject: Reject invalid server names (#3480) Make sure that server_names used in auth headers are sane, and reject them with a sensible error code, before they disappear off into the depths of the system. --- changelog.d/3480.feature | 1 + synapse/federation/transport/server.py | 66 ++++++++++++++++++++++------------ synapse/http/endpoint.py | 34 ++++++++++++++++-- tests/http/__init__.py | 0 tests/http/test_endpoint.py | 46 ++++++++++++++++++++++++ 5 files changed, 122 insertions(+), 25 deletions(-) create mode 100644 changelog.d/3480.feature create mode 100644 tests/http/__init__.py create mode 100644 tests/http/test_endpoint.py (limited to 'synapse') diff --git a/changelog.d/3480.feature b/changelog.d/3480.feature new file mode 100644 index 0000000000..a21580943d --- /dev/null +++ b/changelog.d/3480.feature @@ -0,0 +1 @@ +Reject invalid server names in federation requests diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 19d09f5422..1180d4b69d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError, FederationDeniedError +from synapse.http.endpoint import parse_server_name from synapse.http.server import JsonResource from synapse.http.servlet import ( parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, @@ -99,26 +100,6 @@ class Authenticator(object): origin = None - def parse_auth_header(header_str): - try: - params = auth.split(" ")[1].split(",") - param_dict = dict(kv.split("=") for kv in params) - - def strip_quotes(value): - if value.startswith("\""): - return value[1:-1] - else: - return value - - origin = strip_quotes(param_dict["origin"]) - key = strip_quotes(param_dict["key"]) - sig = strip_quotes(param_dict["sig"]) - return (origin, key, sig) - except Exception: - raise AuthenticationError( - 400, "Malformed Authorization header", Codes.UNAUTHORIZED - ) - auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") if not auth_headers: @@ -127,8 +108,8 @@ class Authenticator(object): ) for auth in auth_headers: - if auth.startswith("X-Matrix"): - (origin, key, sig) = parse_auth_header(auth) + if auth.startswith(b"X-Matrix"): + (origin, key, sig) = _parse_auth_header(auth) json_request["origin"] = origin json_request["signatures"].setdefault(origin, {})[key] = sig @@ -165,6 +146,47 @@ class Authenticator(object): logger.exception("Error resetting retry timings on %s", origin) +def _parse_auth_header(header_bytes): + """Parse an X-Matrix auth header + + Args: + header_bytes (bytes): header value + + Returns: + Tuple[str, str, str]: origin, key id, signature. + + Raises: + AuthenticationError if the header could not be parsed + """ + try: + header_str = header_bytes.decode('utf-8') + params = header_str.split(" ")[1].split(",") + param_dict = dict(kv.split("=") for kv in params) + + def strip_quotes(value): + if value.startswith(b"\""): + return value[1:-1] + else: + return value + + origin = strip_quotes(param_dict["origin"]) + # ensure that the origin is a valid server name + parse_server_name(origin) + + key = strip_quotes(param_dict["key"]) + sig = strip_quotes(param_dict["sig"]) + return origin, key, sig + except Exception as e: + logger.warn( + "Error parsing auth header '%s': %s", + header_bytes.decode('ascii', 'replace'), + e, + ) + raise AuthenticationError( + 400, "Malformed Authorization header", Codes.UNAUTHORIZED, + ) + + class BaseFederationServlet(object): REQUIRE_AUTH = True diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 80da870584..5a9cbb3324 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -38,6 +38,36 @@ _Server = collections.namedtuple( ) +def parse_server_name(server_name): + """Split a server name into host/port parts. + + Does some basic sanity checking of the + + Args: + server_name (str): server name to parse + + Returns: + Tuple[str, int|None]: host/port parts. + + Raises: + ValueError if the server name could not be parsed. + """ + try: + if server_name[-1] == ']': + # ipv6 literal, hopefully + if server_name[0] != '[': + raise Exception() + + return server_name, None + + domain_port = server_name.rsplit(":", 1) + domain = domain_port[0] + port = int(domain_port[1]) if domain_port[1:] else None + return domain, port + except Exception: + raise ValueError("Invalid server name '%s'" % server_name) + + def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, timeout=None): """Construct an endpoint for the given matrix destination. @@ -50,9 +80,7 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, timeout (int): connection timeout in seconds """ - domain_port = destination.split(":") - domain = domain_port[0] - port = int(domain_port[1]) if domain_port[1:] else None + domain, port = parse_server_name(destination) endpoint_kw_args = {} diff --git a/tests/http/__init__.py b/tests/http/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/http/test_endpoint.py b/tests/http/test_endpoint.py new file mode 100644 index 0000000000..cd74825c85 --- /dev/null +++ b/tests/http/test_endpoint.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.http.endpoint import parse_server_name +from tests import unittest + + +class ServerNameTestCase(unittest.TestCase): + def test_parse_server_name(self): + test_data = { + 'localhost': ('localhost', None), + 'my-example.com:1234': ('my-example.com', 1234), + '1.2.3.4': ('1.2.3.4', None), + '[0abc:1def::1234]': ('[0abc:1def::1234]', None), + '1.2.3.4:1': ('1.2.3.4', 1), + '[0abc:1def::1234]:8080': ('[0abc:1def::1234]', 8080), + } + + for i, o in test_data.items(): + self.assertEqual(parse_server_name(i), o) + + def test_parse_bad_server_names(self): + test_data = [ + "", # empty + "localhost:http", # non-numeric port + "1234]", # smells like ipv6 literal but isn't + ] + for i in test_data: + try: + parse_server_name(i) + self.fail( + "Expected parse_server_name(\"%s\") to throw" % i, + ) + except ValueError: + pass -- cgit 1.4.1