diff options
-rw-r--r-- | .travis.yml | 22 | ||||
-rw-r--r-- | changelog.d/4671.misc | 1 | ||||
-rw-r--r-- | changelog.d/4677.misc | 1 | ||||
-rw-r--r-- | changelog.d/4682.feature | 1 | ||||
-rw-r--r-- | docs/tcp_replication.rst | 26 | ||||
-rw-r--r-- | synapse/handlers/__init__.py | 2 | ||||
-rw-r--r-- | synapse/handlers/register.py | 200 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 2 | ||||
-rw-r--r-- | synapse/module_api/__init__.py | 2 | ||||
-rw-r--r-- | synapse/replication/http/login.py | 2 | ||||
-rw-r--r-- | synapse/replication/http/membership.py | 4 | ||||
-rw-r--r-- | synapse/replication/http/register.py | 55 | ||||
-rw-r--r-- | synapse/replication/slave/storage/_base.py | 19 | ||||
-rw-r--r-- | synapse/rest/client/v1/login.py | 4 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/auth.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/register.py | 145 | ||||
-rw-r--r-- | synapse/rest/consent/consent_resource.py | 2 | ||||
-rw-r--r-- | synapse/server.py | 5 | ||||
-rw-r--r-- | synapse/storage/_base.py | 58 | ||||
-rw-r--r-- | synapse/storage/events.py | 25 | ||||
-rw-r--r-- | tests/handlers/test_register.py | 2 | ||||
-rw-r--r-- | tox.ini | 29 |
22 files changed, 387 insertions, 222 deletions
diff --git a/.travis.yml b/.travis.yml index f6c91c2621..d88f10324f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,4 @@ -sudo: false +dist: xenial language: python cache: @@ -54,23 +54,23 @@ matrix: python: 3.5 env: TOX_ENV=py35,codecov TRIAL_FLAGS="-j 2" - - name: "py3.6 / sqlite" - python: 3.6 - env: TOX_ENV=py36,codecov TRIAL_FLAGS="-j 2" + - name: "py3.7 / sqlite" + python: 3.7 + env: TOX_ENV=py37,codecov TRIAL_FLAGS="-j 2" - - name: "py3.6 / postgres9.4" - python: 3.6 + - name: "py3.7 / postgres9.4" + python: 3.7 addons: postgresql: "9.4" - env: TOX_ENV=py36-postgres TRIAL_FLAGS="-j 4" + env: TOX_ENV=py37-postgres TRIAL_FLAGS="-j 4" services: - postgresql - - name: "py3.6 / postgres9.5" - python: 3.6 + - name: "py3.7 / postgres9.5" + python: 3.7 addons: postgresql: "9.5" - env: TOX_ENV=py36-postgres,codecov TRIAL_FLAGS="-j 4" + env: TOX_ENV=py37-postgres,codecov TRIAL_FLAGS="-j 4" services: - postgresql @@ -86,7 +86,7 @@ matrix: install: # this just logs the postgres version we will be testing against (if any) - - psql -At -U postgres -c 'select version();' + - psql -At -U postgres -c 'select version();' || true - pip install tox diff --git a/changelog.d/4671.misc b/changelog.d/4671.misc new file mode 100644 index 0000000000..4dc18378e7 --- /dev/null +++ b/changelog.d/4671.misc @@ -0,0 +1 @@ +Improve replication performance by reducing cache invalidation traffic. diff --git a/changelog.d/4677.misc b/changelog.d/4677.misc new file mode 100644 index 0000000000..6f4596be4a --- /dev/null +++ b/changelog.d/4677.misc @@ -0,0 +1 @@ +Run unit tests against python 3.7. diff --git a/changelog.d/4682.feature b/changelog.d/4682.feature new file mode 100644 index 0000000000..b3a3915eb0 --- /dev/null +++ b/changelog.d/4682.feature @@ -0,0 +1 @@ +Allow registration and login to be handled by a worker instance. diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index 62225ba6f4..73436cea62 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -137,7 +137,6 @@ for each stream so that on reconneciton it can start streaming from the correct place. Note: not all RDATA have valid tokens due to batching. See ``RdataCommand`` for more details. - Example ~~~~~~~ @@ -221,3 +220,28 @@ SYNC (S, C) See ``synapse/replication/tcp/commands.py`` for a detailed description and the format of each command. + + +Cache Invalidation Stream +~~~~~~~~~~~~~~~~~~~~~~~~~ + +The cache invalidation stream is used to inform workers when they need to +invalidate any of their caches in the data store. This is done by streaming all +cache invalidations done on master down to the workers, assuming that any caches +on the workers also exist on the master. + +Each individual cache invalidation results in a row being sent down replication, +which includes the cache name (the name of the function) and they key to +invalidate. For example:: + + > RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251] + +However, there are times when a number of caches need to be invalidated at the +same time with the same key. To reduce traffic we batch those invalidations into +a single poke by defining a special cache name that workers understand to mean +to expand to invalidate the correct caches. + +Currently the special cache names are declared in ``synapse/storage/_base.py`` +and are: + +1. ``cs_cache_fake`` ─ invalidates caches that depend on the current state diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 413425fed1..2dd183018a 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,7 +17,6 @@ from .admin import AdminHandler from .directory import DirectoryHandler from .federation import FederationHandler from .identity import IdentityHandler -from .register import RegistrationHandler from .search import SearchHandler @@ -41,7 +40,6 @@ class Handlers(object): """ def __init__(self, hs): - self.registration_handler = RegistrationHandler(hs) self.federation_handler = FederationHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index f92ab4d525..24a4cb5a83 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -19,6 +19,7 @@ import logging from twisted.internet import defer from synapse import types +from synapse.api.constants import LoginType from synapse.api.errors import ( AuthError, Codes, @@ -26,9 +27,14 @@ from synapse.api.errors import ( RegistrationError, SynapseError, ) +from synapse.config.server import is_threepid_reserved from synapse.http.client import CaptchaServerHttpClient +from synapse.http.servlet import assert_params_in_dict from synapse.replication.http.login import RegisterDeviceReplicationServlet -from synapse.replication.http.register import ReplicationRegisterServlet +from synapse.replication.http.register import ( + ReplicationPostRegisterActionsServlet, + ReplicationRegisterServlet, +) from synapse.types import RoomAlias, RoomID, UserID, create_requester from synapse.util.async_helpers import Linearizer from synapse.util.threepids import check_3pid_allowed @@ -53,6 +59,7 @@ class RegistrationHandler(BaseHandler): self.profile_handler = hs.get_profile_handler() self.user_directory_handler = hs.get_user_directory_handler() self.captcha_client = CaptchaServerHttpClient(hs) + self.identity_handler = self.hs.get_handlers().identity_handler self._next_generated_user_id = None @@ -68,8 +75,12 @@ class RegistrationHandler(BaseHandler): self._register_device_client = ( RegisterDeviceReplicationServlet.make_client(hs) ) + self._post_registration_client = ( + ReplicationPostRegisterActionsServlet.make_client(hs) + ) else: self.device_handler = hs.get_device_handler() + self.pusher_pool = hs.get_pusherpool() @defer.inlineCallbacks def check_username(self, localpart, guest_access_token=None, @@ -369,8 +380,7 @@ class RegistrationHandler(BaseHandler): logger.info("validating threepidcred sid %s on id server %s", c['sid'], c['idServer']) try: - identity_handler = self.hs.get_handlers().identity_handler - threepid = yield identity_handler.threepid_from_creds(c) + threepid = yield self.identity_handler.threepid_from_creds(c) except Exception: logger.exception("Couldn't validate 3pid") raise RegistrationError(400, "Couldn't validate 3pid") @@ -394,9 +404,8 @@ class RegistrationHandler(BaseHandler): # Now we have a matrix ID, bind it to the threepids we were given for c in threepidCreds: - identity_handler = self.hs.get_handlers().identity_handler # XXX: This should be a deferred list, shouldn't it? - yield identity_handler.bind_threepid(c, user_id) + yield self.identity_handler.bind_threepid(c, user_id) def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None): # don't allow people to register the server notices mxid @@ -671,3 +680,184 @@ class RegistrationHandler(BaseHandler): ) defer.returnValue((device_id, access_token)) + + @defer.inlineCallbacks + def post_registration_actions(self, user_id, auth_result, access_token, + bind_email, bind_msisdn): + """A user has completed registration + + Args: + user_id (str): The user ID that consented + auth_result (dict): The authenticated credentials of the newly + registered user. + access_token (str|None): The access token of the newly logged in + device, or None if `inhibit_login` enabled. + bind_email (bool): Whether to bind the email with the identity + server + bind_msisdn (bool): Whether to bind the msisdn with the identity + server + """ + if self.hs.config.worker_app: + yield self._post_registration_client( + user_id=user_id, + auth_result=auth_result, + access_token=access_token, + bind_email=bind_email, + bind_msisdn=bind_msisdn, + ) + return + + if auth_result and LoginType.EMAIL_IDENTITY in auth_result: + threepid = auth_result[LoginType.EMAIL_IDENTITY] + # Necessary due to auth checks prior to the threepid being + # written to the db + if is_threepid_reserved( + self.hs.config.mau_limits_reserved_threepids, threepid + ): + yield self.store.upsert_monthly_active_user(user_id) + + yield self._register_email_threepid( + user_id, threepid, access_token, + bind_email, + ) + + if auth_result and LoginType.MSISDN in auth_result: + threepid = auth_result[LoginType.MSISDN] + yield self._register_msisdn_threepid( + user_id, threepid, bind_msisdn, + ) + + if auth_result and LoginType.TERMS in auth_result: + yield self._on_user_consented( + user_id, self.hs.config.user_consent_version, + ) + + @defer.inlineCallbacks + def _on_user_consented(self, user_id, consent_version): + """A user consented to the terms on registration + + Args: + user_id (str): The user ID that consented + consent_version (str): version of the policy the user has + consented to. + """ + logger.info("%s has consented to the privacy policy", user_id) + yield self.store.user_set_consent_version( + user_id, consent_version, + ) + yield self.post_consent_actions(user_id) + + @defer.inlineCallbacks + def _register_email_threepid(self, user_id, threepid, token, bind_email): + """Add an email address as a 3pid identifier + + Also adds an email pusher for the email address, if configured in the + HS config + + Also optionally binds emails to the given user_id on the identity server + + Must be called on master. + + Args: + user_id (str): id of user + threepid (object): m.login.email.identity auth response + token (str|None): access_token for the user, or None if not logged + in. + bind_email (bool): true if the client requested the email to be + bound at the identity server + Returns: + defer.Deferred: + """ + reqd = ('medium', 'address', 'validated_at') + if any(x not in threepid for x in reqd): + # This will only happen if the ID server returns a malformed response + logger.info("Can't add incomplete 3pid") + return + + yield self._auth_handler.add_threepid( + user_id, + threepid['medium'], + threepid['address'], + threepid['validated_at'], + ) + + # And we add an email pusher for them by default, but only + # if email notifications are enabled (so people don't start + # getting mail spam where they weren't before if email + # notifs are set up on a home server) + if (self.hs.config.email_enable_notifs and + self.hs.config.email_notif_for_new_users + and token): + # Pull the ID of the access token back out of the db + # It would really make more sense for this to be passed + # up when the access token is saved, but that's quite an + # invasive change I'd rather do separately. + user_tuple = yield self.store.get_user_by_access_token( + token + ) + token_id = user_tuple["token_id"] + + yield self.pusher_pool.add_pusher( + user_id=user_id, + access_token=token_id, + kind="email", + app_id="m.email", + app_display_name="Email Notifications", + device_display_name=threepid["address"], + pushkey=threepid["address"], + lang=None, # We don't know a user's language here + data={}, + ) + + if bind_email: + logger.info("bind_email specified: binding") + logger.debug("Binding emails %s to %s" % ( + threepid, user_id + )) + yield self.identity_handler.bind_threepid( + threepid['threepid_creds'], user_id + ) + else: + logger.info("bind_email not specified: not binding email") + + @defer.inlineCallbacks + def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn): + """Add a phone number as a 3pid identifier + + Also optionally binds msisdn to the given user_id on the identity server + + Must be called on master. + + Args: + user_id (str): id of user + threepid (object): m.login.msisdn auth response + token (str): access_token for the user + bind_email (bool): true if the client requested the email to be + bound at the identity server + Returns: + defer.Deferred: + """ + try: + assert_params_in_dict(threepid, ['medium', 'address', 'validated_at']) + except SynapseError as ex: + if ex.errcode == Codes.MISSING_PARAM: + # This will only happen if the ID server returns a malformed response + logger.info("Can't add incomplete 3pid") + defer.returnValue(None) + raise + + yield self._auth_handler.add_threepid( + user_id, + threepid['medium'], + threepid['address'], + threepid['validated_at'], + ) + + if bind_msisdn: + logger.info("bind_msisdn specified: binding") + logger.debug("Binding msisdn %s to %s", threepid, user_id) + yield self.identity_handler.bind_threepid( + threepid['threepid_creds'], user_id + ) + else: + logger.info("bind_msisdn not specified: not binding msisdn") diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 2beffdf41e..190ea2c7b1 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -61,7 +61,7 @@ class RoomMemberHandler(object): self.federation_handler = hs.get_handlers().federation_handler self.directory_handler = hs.get_handlers().directory_handler - self.registration_handler = hs.get_handlers().registration_handler + self.registration_handler = hs.get_registration_handler() self.profile_handler = hs.get_profile_handler() self.event_creation_handler = hs.get_event_creation_handler() diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 097c844d31..fc9a20ff59 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -79,7 +79,7 @@ class ModuleApi(object): Returns: Deferred: a 2-tuple of (user_id, access_token) """ - reg = self.hs.get_handlers().registration_handler + reg = self.hs.get_registration_handler() return reg.register(localpart=localpart) @defer.inlineCallbacks diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py index 1590eca317..63bc0405ea 100644 --- a/synapse/replication/http/login.py +++ b/synapse/replication/http/login.py @@ -35,7 +35,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint): def __init__(self, hs): super(RegisterDeviceReplicationServlet, self).__init__(hs) - self.registration_handler = hs.get_handlers().registration_handler + self.registration_handler = hs.get_registration_handler() @staticmethod def _serialize_payload(user_id, device_id, initial_display_name, is_guest): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index e58bebf12a..81a2b204c7 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -191,7 +191,7 @@ class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint): def __init__(self, hs): super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs) - self.registeration_handler = hs.get_handlers().registration_handler + self.registeration_handler = hs.get_registration_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -251,7 +251,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): def __init__(self, hs): super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs) - self.registeration_handler = hs.get_handlers().registration_handler + self.registeration_handler = hs.get_registration_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() self.distributor = hs.get_distributor() diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index bdaf37396c..1d27c9221f 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -87,5 +87,60 @@ class ReplicationRegisterServlet(ReplicationEndpoint): defer.returnValue((200, {})) +class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): + """Run any post registration actions + """ + + NAME = "post_register" + PATH_ARGS = ("user_id",) + + def __init__(self, hs): + super(ReplicationPostRegisterActionsServlet, self).__init__(hs) + self.store = hs.get_datastore() + self.registration_handler = hs.get_registration_handler() + + @staticmethod + def _serialize_payload(user_id, auth_result, access_token, bind_email, + bind_msisdn): + """ + Args: + user_id (str): The user ID that consented + auth_result (dict): The authenticated credentials of the newly + registered user. + access_token (str|None): The access token of the newly logged in + device, or None if `inhibit_login` enabled. + bind_email (bool): Whether to bind the email with the identity + server + bind_msisdn (bool): Whether to bind the msisdn with the identity + server + """ + return { + "auth_result": auth_result, + "access_token": access_token, + "bind_email": bind_email, + "bind_msisdn": bind_msisdn, + } + + @defer.inlineCallbacks + def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + auth_result = content["auth_result"] + access_token = content["access_token"] + bind_email = content["bind_email"] + bind_msisdn = content["bind_msisdn"] + + yield self.registration_handler.post_registration_actions( + user_id=user_id, + auth_result=auth_result, + access_token=access_token, + bind_email=bind_email, + bind_msisdn=bind_msisdn, + ) + + defer.returnValue((200, {})) + + def register_servlets(hs, http_server): ReplicationRegisterServlet(hs).register(http_server) + ReplicationPostRegisterActionsServlet(hs).register(http_server) diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 2d81d49e9a..1353a32d00 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -17,7 +17,7 @@ import logging import six -from synapse.storage._base import SQLBaseStore +from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore from synapse.storage.engines import PostgresEngine from ._slaved_id_tracker import SlavedIdTracker @@ -54,12 +54,17 @@ class BaseSlavedStore(SQLBaseStore): if stream_name == "caches": self._cache_id_gen.advance(token) for row in rows: - try: - getattr(self, row.cache_func).invalidate(tuple(row.keys)) - except AttributeError: - # We probably haven't pulled in the cache in this worker, - # which is fine. - pass + if row.cache_func == _CURRENT_STATE_CACHE_NAME: + room_id = row.keys[0] + members_changed = set(row.keys[1:]) + self._invalidate_state_caches(room_id, members_changed) + else: + try: + getattr(self, row.cache_func).invalidate(tuple(row.keys)) + except AttributeError: + # We probably haven't pulled in the cache in this worker, + # which is fine. + pass def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 4a5775083f..6121c5b6df 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -94,7 +94,7 @@ class LoginRestServlet(ClientV1RestServlet): self.jwt_algorithm = hs.config.jwt_algorithm self.cas_enabled = hs.config.cas_enabled self.auth_handler = self.hs.get_auth_handler() - self.registration_handler = hs.get_handlers().registration_handler + self.registration_handler = hs.get_registration_handler() self.handlers = hs.get_handlers() self._well_known_builder = WellKnownBuilder(hs) @@ -434,7 +434,7 @@ class SSOAuthHandler(object): def __init__(self, hs): self._hostname = hs.hostname self._auth_handler = hs.get_auth_handler() - self._registration_handler = hs.get_handlers().registration_handler + self._registration_handler = hs.get_registration_handler() self._macaroon_gen = hs.get_macaroon_generator() @defer.inlineCallbacks diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index fa73bdf3a1..f7bb710642 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -129,7 +129,7 @@ class AuthRestServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() - self.registration_handler = hs.get_handlers().registration_handler + self.registration_handler = hs.get_registration_handler() def on_GET(self, request, stagetype): session = parse_string(request, "session") diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c1cdb8f9c8..94cbba4303 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -145,7 +145,7 @@ class UsernameAvailabilityRestServlet(RestServlet): """ super(UsernameAvailabilityRestServlet, self).__init__() self.hs = hs - self.registration_handler = hs.get_handlers().registration_handler + self.registration_handler = hs.get_registration_handler() self.ratelimiter = FederationRateLimiter( hs.get_clock(), # Time window of 2s @@ -187,7 +187,7 @@ class RegisterRestServlet(RestServlet): self.auth = hs.get_auth() self.store = hs.get_datastore() self.auth_handler = hs.get_auth_handler() - self.registration_handler = hs.get_handlers().registration_handler + self.registration_handler = hs.get_registration_handler() self.identity_handler = hs.get_handlers().identity_handler self.room_member_handler = hs.get_room_member_handler() self.macaroon_gen = hs.get_macaroon_generator() @@ -389,8 +389,7 @@ class RegisterRestServlet(RestServlet): registered_user_id ) # don't re-register the threepids - add_email = False - add_msisdn = False + registered = False else: # NB: This may be from the auth handler and NOT from the POST assert_params_in_dict(params, ["password"]) @@ -427,34 +426,21 @@ class RegisterRestServlet(RestServlet): session_id, "registered_user_id", registered_user_id ) - add_email = True - add_msisdn = True + registered = True return_dict = yield self._create_registration_details( registered_user_id, params ) - if add_email and auth_result and LoginType.EMAIL_IDENTITY in auth_result: - threepid = auth_result[LoginType.EMAIL_IDENTITY] - yield self._register_email_threepid( - registered_user_id, threepid, return_dict["access_token"], - params.get("bind_email") + if registered: + yield self.registration_handler.post_registration_actions( + user_id=registered_user_id, + auth_result=auth_result, + access_token=return_dict.get("access_token"), + bind_email=params.get("bind_email"), + bind_msisdn=params.get("bind_msisdn"), ) - if add_msisdn and auth_result and LoginType.MSISDN in auth_result: - threepid = auth_result[LoginType.MSISDN] - yield self._register_msisdn_threepid( - registered_user_id, threepid, return_dict["access_token"], - params.get("bind_msisdn") - ) - - if auth_result and LoginType.TERMS in auth_result: - logger.info("%s has consented to the privacy policy" % registered_user_id) - yield self.store.user_set_consent_version( - registered_user_id, self.hs.config.user_consent_version, - ) - yield self.registration_handler.post_consent_actions(registered_user_id) - defer.returnValue((200, return_dict)) def on_OPTIONS(self, _): @@ -506,115 +492,6 @@ class RegisterRestServlet(RestServlet): defer.returnValue(result) @defer.inlineCallbacks - def _register_email_threepid(self, user_id, threepid, token, bind_email): - """Add an email address as a 3pid identifier - - Also adds an email pusher for the email address, if configured in the - HS config - - Also optionally binds emails to the given user_id on the identity server - - Args: - user_id (str): id of user - threepid (object): m.login.email.identity auth response - token (str): access_token for the user - bind_email (bool): true if the client requested the email to be - bound at the identity server - Returns: - defer.Deferred: - """ - reqd = ('medium', 'address', 'validated_at') - if any(x not in threepid for x in reqd): - # This will only happen if the ID server returns a malformed response - logger.info("Can't add incomplete 3pid") - return - - yield self.auth_handler.add_threepid( - user_id, - threepid['medium'], - threepid['address'], - threepid['validated_at'], - ) - - # And we add an email pusher for them by default, but only - # if email notifications are enabled (so people don't start - # getting mail spam where they weren't before if email - # notifs are set up on a home server) - if (self.hs.config.email_enable_notifs and - self.hs.config.email_notif_for_new_users): - # Pull the ID of the access token back out of the db - # It would really make more sense for this to be passed - # up when the access token is saved, but that's quite an - # invasive change I'd rather do separately. - user_tuple = yield self.store.get_user_by_access_token( - token - ) - token_id = user_tuple["token_id"] - - yield self.hs.get_pusherpool().add_pusher( - user_id=user_id, - access_token=token_id, - kind="email", - app_id="m.email", - app_display_name="Email Notifications", - device_display_name=threepid["address"], - pushkey=threepid["address"], - lang=None, # We don't know a user's language here - data={}, - ) - - if bind_email: - logger.info("bind_email specified: binding") - logger.debug("Binding emails %s to %s" % ( - threepid, user_id - )) - yield self.identity_handler.bind_threepid( - threepid['threepid_creds'], user_id - ) - else: - logger.info("bind_email not specified: not binding email") - - @defer.inlineCallbacks - def _register_msisdn_threepid(self, user_id, threepid, token, bind_msisdn): - """Add a phone number as a 3pid identifier - - Also optionally binds msisdn to the given user_id on the identity server - - Args: - user_id (str): id of user - threepid (object): m.login.msisdn auth response - token (str): access_token for the user - bind_email (bool): true if the client requested the email to be - bound at the identity server - Returns: - defer.Deferred: - """ - try: - assert_params_in_dict(threepid, ['medium', 'address', 'validated_at']) - except SynapseError as ex: - if ex.errcode == Codes.MISSING_PARAM: - # This will only happen if the ID server returns a malformed response - logger.info("Can't add incomplete 3pid") - defer.returnValue(None) - raise - - yield self.auth_handler.add_threepid( - user_id, - threepid['medium'], - threepid['address'], - threepid['validated_at'], - ) - - if bind_msisdn: - logger.info("bind_msisdn specified: binding") - logger.debug("Binding msisdn %s to %s", threepid, user_id) - yield self.identity_handler.bind_threepid( - threepid['threepid_creds'], user_id - ) - else: - logger.info("bind_msisdn not specified: not binding msisdn") - - @defer.inlineCallbacks def _create_registration_details(self, user_id, params): """Complete registration of newly-registered user diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py index 008d4edae5..6b371bfa2f 100644 --- a/synapse/rest/consent/consent_resource.py +++ b/synapse/rest/consent/consent_resource.py @@ -89,7 +89,7 @@ class ConsentResource(Resource): self.hs = hs self.store = hs.get_datastore() - self.registration_handler = hs.get_handlers().registration_handler + self.registration_handler = hs.get_registration_handler() # this is required by the request_handler wrapper self.clock = hs.get_clock() diff --git a/synapse/server.py b/synapse/server.py index 8615b67ad4..4d364fccce 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -64,6 +64,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.receipts import ReceiptsHandler +from synapse.handlers.register import RegistrationHandler from synapse.handlers.room import RoomContextHandler, RoomCreationHandler from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberMasterHandler @@ -181,6 +182,7 @@ class HomeServer(object): 'pagination_handler', 'room_context_handler', 'sendmail', + 'registration_handler', ] # This is overridden in derived application classes @@ -481,6 +483,9 @@ class HomeServer(object): def build_room_context_handler(self): return RoomContextHandler(self) + def build_registration_handler(self): + return RegistrationHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e124161845..f1a5366b95 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging import sys import threading @@ -28,6 +29,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.types import get_domain_from_id from synapse.util.caches.descriptors import Cache from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.stringutils import exception_to_unicode @@ -64,6 +66,10 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = { "event_search": "event_search_event_id_idx", } +# This is a special cache name we use to batch multiple invalidations of caches +# based on the current state when notifying workers over replication. +_CURRENT_STATE_CACHE_NAME = "cs_cache_fake" + class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object @@ -1184,6 +1190,56 @@ class SQLBaseStore(object): be invalidated. """ txn.call_after(cache_func.invalidate, keys) + self._send_invalidation_to_replication(txn, cache_func.__name__, keys) + + def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): + """Special case invalidation of caches based on current state. + + We special case this so that we can batch the cache invalidations into a + single replication poke. + + Args: + txn + room_id (str): Room where state changed + members_changed (iterable[str]): The user_ids of members that have changed + """ + txn.call_after(self._invalidate_state_caches, room_id, members_changed) + + keys = itertools.chain([room_id], members_changed) + self._send_invalidation_to_replication( + txn, _CURRENT_STATE_CACHE_NAME, keys, + ) + + def _invalidate_state_caches(self, room_id, members_changed): + """Invalidates caches that are based on the current state, but does + not stream invalidations down replication. + + Args: + room_id (str): Room where state changed + members_changed (iterable[str]): The user_ids of members that have + changed + """ + for member in members_changed: + self.get_rooms_for_user_with_stream_ordering.invalidate((member,)) + + for host in set(get_domain_from_id(u) for u in members_changed): + self.is_host_joined.invalidate((room_id, host)) + self.was_host_joined.invalidate((room_id, host)) + + self.get_users_in_room.invalidate((room_id,)) + self.get_room_summary.invalidate((room_id,)) + self.get_current_state_ids.invalidate((room_id,)) + + def _send_invalidation_to_replication(self, txn, cache_name, keys): + """Notifies replication that given cache has been invalidated. + + Note that this does *not* invalidate the cache locally. + + Args: + txn + cache_name (str) + keys (iterable[str]) + """ if isinstance(self.database_engine, PostgresEngine): # get_next() returns a context manager which is designed to wrap @@ -1201,7 +1257,7 @@ class SQLBaseStore(object): table="cache_invalidation_stream", values={ "stream_id": stream_id, - "cache_func": cache_func.__name__, + "cache_func": cache_name, "keys": list(keys), "invalidation_ts": self.clock.time_msec(), } diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 81b250480d..06db9e56e6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -979,30 +979,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore if ev_type == EventTypes.Member ) - for member in members_changed: - self._invalidate_cache_and_stream( - txn, self.get_rooms_for_user_with_stream_ordering, (member,) - ) - - for host in set(get_domain_from_id(u) for u in members_changed): - self._invalidate_cache_and_stream( - txn, self.is_host_joined, (room_id, host) - ) - self._invalidate_cache_and_stream( - txn, self.was_host_joined, (room_id, host) - ) - - self._invalidate_cache_and_stream( - txn, self.get_users_in_room, (room_id,) - ) - - self._invalidate_cache_and_stream( - txn, self.get_room_summary, (room_id,) - ) - - self._invalidate_cache_and_stream( - txn, self.get_current_state_ids, (room_id,) - ) + self._invalidate_state_caches_and_stream(txn, room_id, members_changed) def _update_forward_extremities_txn(self, txn, new_forward_extremities, max_stream_order): diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py index eb70e1daa6..c9c1506273 100644 --- a/tests/handlers/test_register.py +++ b/tests/handlers/test_register.py @@ -48,7 +48,7 @@ class RegistrationTestCase(unittest.TestCase): generate_access_token=Mock(return_value='secret') ) self.hs.get_macaroon_generator = Mock(return_value=self.macaroon_generator) - self.handler = self.hs.get_handlers().registration_handler + self.handler = self.hs.get_registration_handler() self.store = self.hs.get_datastore() self.hs.config.max_mau_value = 50 self.lots_of_users = 100 diff --git a/tox.ini b/tox.ini index 3e2dba2925..14437e7334 100644 --- a/tox.ini +++ b/tox.ini @@ -3,7 +3,6 @@ envlist = packaging, py27, py36, pep8, check_isort [base] deps = - Twisted>=17.1 mock python-subunit junitxml @@ -38,6 +37,7 @@ whitelist_externals = setenv = {[base]setenv} + postgres: SYNAPSE_POSTGRES = 1 passenv = * @@ -47,8 +47,6 @@ commands = sh -c 'echo "import coverage; coverage.process_startup()" > {envsitepackagesdir}/../sitecustomize.py' {envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:} -[testenv:py27] - # As of twisted 16.4, trial tries to import the tests as a package (previously # it loaded the files explicitly), which means they need to be on the # pythonpath. Our sdist doesn't include the 'tests' package, so normally it @@ -72,14 +70,7 @@ commands = # ) usedevelop=true -[testenv:py27-postgres] -usedevelop=true -deps = - {[base]deps} - psycopg2 -setenv = - {[base]setenv} - SYNAPSE_POSTGRES = 1 + # A test suite for the oldest supported versions of Python libraries, to catch # any uses of APIs not available in them. @@ -101,22 +92,6 @@ commands = pip install -e . {envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:} -[testenv:py35] -usedevelop=true - -[testenv:py36] -usedevelop=true - -[testenv:py36-postgres] -usedevelop=true -deps = - {[base]deps} - psycopg2 -setenv = - {[base]setenv} - SYNAPSE_POSTGRES = 1 - - [testenv:packaging] skip_install=True deps = |