diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/acme.py | 62 | ||||
-rw-r--r-- | synapse/handlers/acme_issuing_service.py | 117 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 2 | ||||
-rw-r--r-- | synapse/handlers/device.py | 14 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 4 | ||||
-rw-r--r-- | synapse/handlers/room.py | 140 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 1 | ||||
-rw-r--r-- | synapse/handlers/set_password.py | 3 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 70 |
9 files changed, 280 insertions, 133 deletions
diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py index 01e0ef408d..fbef2f3d38 100644 --- a/synapse/handlers/acme.py +++ b/synapse/handlers/acme.py @@ -15,14 +15,9 @@ import logging -import attr -from zope.interface import implementer - import twisted import twisted.internet.error from twisted.internet import defer -from twisted.python.filepath import FilePath -from twisted.python.url import URL from twisted.web import server, static from twisted.web.resource import Resource @@ -30,27 +25,6 @@ from synapse.app import check_bind_error logger = logging.getLogger(__name__) -try: - from txacme.interfaces import ICertificateStore - - @attr.s - @implementer(ICertificateStore) - class ErsatzStore(object): - """ - A store that only stores in memory. - """ - - certs = attr.ib(default=attr.Factory(dict)) - - def store(self, server_name, pem_objects): - self.certs[server_name] = [o.as_bytes() for o in pem_objects] - return defer.succeed(None) - - -except ImportError: - # txacme is missing - pass - class AcmeHandler(object): def __init__(self, hs): @@ -60,6 +34,7 @@ class AcmeHandler(object): @defer.inlineCallbacks def start_listening(self): + from synapse.handlers import acme_issuing_service # Configure logging for txacme, if you need to debug # from eliot import add_destinations @@ -67,37 +42,18 @@ class AcmeHandler(object): # # add_destinations(TwistedDestination()) - from txacme.challenges import HTTP01Responder - from txacme.service import AcmeIssuingService - from txacme.endpoint import load_or_create_client_key - from txacme.client import Client - from josepy.jwa import RS256 - - self._store = ErsatzStore() - responder = HTTP01Responder() - - self._issuer = AcmeIssuingService( - cert_store=self._store, - client_creator=( - lambda: Client.from_url( - reactor=self.reactor, - url=URL.from_text(self.hs.config.acme_url), - key=load_or_create_client_key( - FilePath(self.hs.config.config_dir_path) - ), - alg=RS256, - ) - ), - clock=self.reactor, - responders=[responder], + well_known = Resource() + + self._issuer = acme_issuing_service.create_issuing_service( + self.reactor, + acme_url=self.hs.config.acme_url, + account_key_file=self.hs.config.acme_account_key_file, + well_known_resource=well_known, ) - well_known = Resource() - well_known.putChild(b"acme-challenge", responder.resource) responder_resource = Resource() responder_resource.putChild(b".well-known", well_known) responder_resource.putChild(b"check", static.Data(b"OK", b"text/plain")) - srv = server.Site(responder_resource) bind_addresses = self.hs.config.acme_bind_addresses @@ -128,7 +84,7 @@ class AcmeHandler(object): logger.exception("Fail!") raise logger.warning("Reprovisioned %s, saving.", self._acme_domain) - cert_chain = self._store.certs[self._acme_domain] + cert_chain = self._issuer.cert_store.certs[self._acme_domain] try: with open(self.hs.config.tls_private_key_file, "wb") as private_key_file: diff --git a/synapse/handlers/acme_issuing_service.py b/synapse/handlers/acme_issuing_service.py new file mode 100644 index 0000000000..e1d4224e74 --- /dev/null +++ b/synapse/handlers/acme_issuing_service.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utility function to create an ACME issuing service. + +This file contains the unconditional imports on the acme and cryptography bits that we +only need (and may only have available) if we are doing ACME, so is designed to be +imported conditionally. +""" +import logging + +import attr +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from josepy import JWKRSA +from josepy.jwa import RS256 +from txacme.challenges import HTTP01Responder +from txacme.client import Client +from txacme.interfaces import ICertificateStore +from txacme.service import AcmeIssuingService +from txacme.util import generate_private_key +from zope.interface import implementer + +from twisted.internet import defer +from twisted.python.filepath import FilePath +from twisted.python.url import URL + +logger = logging.getLogger(__name__) + + +def create_issuing_service(reactor, acme_url, account_key_file, well_known_resource): + """Create an ACME issuing service, and attach it to a web Resource + + Args: + reactor: twisted reactor + acme_url (str): URL to use to request certificates + account_key_file (str): where to store the account key + well_known_resource (twisted.web.IResource): web resource for .well-known. + we will attach a child resource for "acme-challenge". + + Returns: + AcmeIssuingService + """ + responder = HTTP01Responder() + + well_known_resource.putChild(b"acme-challenge", responder.resource) + + store = ErsatzStore() + + return AcmeIssuingService( + cert_store=store, + client_creator=( + lambda: Client.from_url( + reactor=reactor, + url=URL.from_text(acme_url), + key=load_or_create_client_key(account_key_file), + alg=RS256, + ) + ), + clock=reactor, + responders=[responder], + ) + + +@attr.s +@implementer(ICertificateStore) +class ErsatzStore(object): + """ + A store that only stores in memory. + """ + + certs = attr.ib(default=attr.Factory(dict)) + + def store(self, server_name, pem_objects): + self.certs[server_name] = [o.as_bytes() for o in pem_objects] + return defer.succeed(None) + + +def load_or_create_client_key(key_file): + """Load the ACME account key from a file, creating it if it does not exist. + + Args: + key_file (str): name of the file to use as the account key + """ + # this is based on txacme.endpoint.load_or_create_client_key, but doesn't + # hardcode the 'client.key' filename + acme_key_file = FilePath(key_file) + if acme_key_file.exists(): + logger.info("Loading ACME account key from '%s'", acme_key_file) + key = serialization.load_pem_private_key( + acme_key_file.getContent(), password=None, backend=default_backend() + ) + else: + logger.info("Saving new ACME account key to '%s'", acme_key_file) + key = generate_private_key("rsa") + acme_key_file.setContent( + key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + ) + return JWKRSA(key=key) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 97b21c4093..c8c1ed3246 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -743,7 +743,7 @@ class AuthHandler(BaseHandler): result = (result, None) defer.returnValue(result) - if login_type == LoginType.PASSWORD: + if login_type == LoginType.PASSWORD and self.hs.config.password_localdb_enabled: known_login_type = True canonical_user_id = yield self._check_local_password( diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f59d0479b5..99e8413092 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -101,9 +101,13 @@ class DeviceWorkerHandler(BaseHandler): room_ids = yield self.store.get_rooms_for_user(user_id) - # First we check if any devices have changed - changed = yield self.store.get_user_whose_devices_changed( - from_token.device_list_key + # First we check if any devices have changed for users that we share + # rooms with. + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + changed = yield self.store.get_users_whose_devices_changed( + from_token.device_list_key, users_who_share_room ) # Then work out if any users have since joined @@ -188,10 +192,6 @@ class DeviceWorkerHandler(BaseHandler): break if possibly_changed or possibly_left: - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) - # Take the intersection of the users whose devices may have changed # and those that actually still share a room with the user possibly_joined = possibly_changed & users_who_share_room diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 062e026e5f..76ee97ddd3 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -180,9 +180,7 @@ class PaginationHandler(object): room_token = pagin_config.from_token.room_key else: pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token_for_room( - room_id=room_id - ) + yield self.hs.get_event_sources().get_current_token_for_pagination() ) room_token = pagin_config.from_token.room_key diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 89d89fc27c..db3f8cb76b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -32,6 +32,7 @@ from synapse.storage.state import StateFilter from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils from synapse.util.async_helpers import Linearizer +from synapse.util.caches.response_cache import ResponseCache from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -40,6 +41,8 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" +FIVE_MINUTES_IN_MS = 5 * 60 * 1000 + class RoomCreationHandler(BaseHandler): @@ -75,6 +78,12 @@ class RoomCreationHandler(BaseHandler): # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") + # If a user tries to update the same room multiple times in quick + # succession, only process the first attempt and return its result to + # subsequent requests + self._upgrade_response_cache = ResponseCache( + hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS + ) self._server_notices_mxid = hs.config.server_notices_mxid self.third_party_event_rules = hs.get_third_party_event_rules() @@ -95,67 +104,96 @@ class RoomCreationHandler(BaseHandler): user_id = requester.user.to_string() - with (yield self._upgrade_linearizer.queue(old_room_id)): - # start by allocating a new room id - r = yield self.store.get_room(old_room_id) - if r is None: - raise NotFoundError("Unknown room id %s" % (old_room_id,)) - new_room_id = yield self._generate_room_id( - creator_id=user_id, is_public=r["is_public"] - ) + # Check if this room is already being upgraded by another person + for key in self._upgrade_response_cache.pending_result_cache: + if key[0] == old_room_id and key[1] != user_id: + # Two different people are trying to upgrade the same room. + # Send the second an error. + # + # Note that this of course only gets caught if both users are + # on the same homeserver. + raise SynapseError( + 400, "An upgrade for this room is currently in progress" + ) - logger.info("Creating new room %s to replace %s", new_room_id, old_room_id) + # Upgrade the room + # + # If this user has sent multiple upgrade requests for the same room + # and one of them is not complete yet, cache the response and + # return it to all subsequent requests + ret = yield self._upgrade_response_cache.wrap( + (old_room_id, user_id), + self._upgrade_room, + requester, + old_room_id, + new_version, # args for _upgrade_room + ) + defer.returnValue(ret) - # we create and auth the tombstone event before properly creating the new - # room, to check our user has perms in the old room. - tombstone_event, tombstone_context = ( - yield self.event_creation_handler.create_event( - requester, - { - "type": EventTypes.Tombstone, - "state_key": "", - "room_id": old_room_id, - "sender": user_id, - "content": { - "body": "This room has been replaced", - "replacement_room": new_room_id, - }, - }, - token_id=requester.access_token_id, - ) - ) - old_room_version = yield self.store.get_room_version(old_room_id) - yield self.auth.check_from_context( - old_room_version, tombstone_event, tombstone_context - ) + @defer.inlineCallbacks + def _upgrade_room(self, requester, old_room_id, new_version): + user_id = requester.user.to_string() + + # start by allocating a new room id + r = yield self.store.get_room(old_room_id) + if r is None: + raise NotFoundError("Unknown room id %s" % (old_room_id,)) + new_room_id = yield self._generate_room_id( + creator_id=user_id, is_public=r["is_public"] + ) + + logger.info("Creating new room %s to replace %s", new_room_id, old_room_id) - yield self.clone_existing_room( + # we create and auth the tombstone event before properly creating the new + # room, to check our user has perms in the old room. + tombstone_event, tombstone_context = ( + yield self.event_creation_handler.create_event( requester, - old_room_id=old_room_id, - new_room_id=new_room_id, - new_room_version=new_version, - tombstone_event_id=tombstone_event.event_id, + { + "type": EventTypes.Tombstone, + "state_key": "", + "room_id": old_room_id, + "sender": user_id, + "content": { + "body": "This room has been replaced", + "replacement_room": new_room_id, + }, + }, + token_id=requester.access_token_id, ) + ) + old_room_version = yield self.store.get_room_version(old_room_id) + yield self.auth.check_from_context( + old_room_version, tombstone_event, tombstone_context + ) - # now send the tombstone - yield self.event_creation_handler.send_nonmember_event( - requester, tombstone_event, tombstone_context - ) + yield self.clone_existing_room( + requester, + old_room_id=old_room_id, + new_room_id=new_room_id, + new_room_version=new_version, + tombstone_event_id=tombstone_event.event_id, + ) - old_room_state = yield tombstone_context.get_current_state_ids(self.store) + # now send the tombstone + yield self.event_creation_handler.send_nonmember_event( + requester, tombstone_event, tombstone_context + ) - # update any aliases - yield self._move_aliases_to_new_room( - requester, old_room_id, new_room_id, old_room_state - ) + old_room_state = yield tombstone_context.get_current_state_ids(self.store) - # and finally, shut down the PLs in the old room, and update them in the new - # room. - yield self._update_upgraded_room_pls( - requester, old_room_id, new_room_id, old_room_state - ) + # update any aliases + yield self._move_aliases_to_new_room( + requester, old_room_id, new_room_id, old_room_state + ) + + # and finally, shut down the PLs in the old room, and update them in the new + # room. + yield self._update_upgraded_room_pls( + requester, old_room_id, new_room_id, old_room_state + ) - defer.returnValue(new_room_id) + defer.returnValue(new_room_id) @defer.inlineCallbacks def _update_upgraded_room_pls( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 4d6e883802..66b05b4732 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -823,6 +823,7 @@ class RoomMemberHandler(object): "sender": user.to_string(), "state_key": token, }, + ratelimit=False, txn_id=txn_id, ) diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py index 5a0995d4fe..d90c9e0108 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py @@ -33,6 +33,9 @@ class SetPasswordHandler(BaseHandler): @defer.inlineCallbacks def set_password(self, user_id, newpassword, requester=None): + if not self.hs.config.password_localdb_enabled: + raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN) + password_hash = yield self._auth_handler.hash(newpassword) except_device_id = requester.device_id if requester else None diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c5188a1f8e..a3f550554f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1058,40 +1058,74 @@ class SyncHandler(object): newly_left_rooms, newly_left_users, ): + """Generate the DeviceLists section of sync + + Args: + sync_result_builder (SyncResultBuilder) + newly_joined_rooms (set[str]): Set of rooms user has joined since + previous sync + newly_joined_or_invited_users (set[str]): Set of users that have + joined or been invited to a room since previous sync. + newly_left_rooms (set[str]): Set of rooms user has left since + previous sync + newly_left_users (set[str]): Set of users that have left a room + we're in since previous sync + + Returns: + Deferred[DeviceLists] + """ + user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token + # We're going to mutate these fields, so lets copy them rather than + # assume they won't get used later. + newly_joined_or_invited_users = set(newly_joined_or_invited_users) + newly_left_users = set(newly_left_users) + if since_token and since_token.device_list_key: - changed = yield self.store.get_user_whose_devices_changed( - since_token.device_list_key + # We want to figure out what user IDs the client should refetch + # device keys for, and which users we aren't going to track changes + # for anymore. + # + # For the first step we check: + # a. if any users we share a room with have updated their devices, + # and + # b. we also check if we've joined any new rooms, or if a user has + # joined a room we're in. + # + # For the second step we just find any users we no longer share a + # room with by looking at all users that have left a room plus users + # that were in a room we've left. + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + # Step 1a, check for changes in devices of users we share a room with + users_that_have_changed = yield self.store.get_users_whose_devices_changed( + since_token.device_list_key, users_who_share_room ) - # TODO: Be more clever than this, i.e. remove users who we already - # share a room with? + # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: joined_users = yield self.state.get_current_users_in_room(room_id) newly_joined_or_invited_users.update(joined_users) - for room_id in newly_left_rooms: - left_users = yield self.state.get_current_users_in_room(room_id) - newly_left_users.update(left_users) - # TODO: Check that these users are actually new, i.e. either they # weren't in the previous sync *or* they left and rejoined. - changed.update(newly_joined_or_invited_users) + users_that_have_changed.update(newly_joined_or_invited_users) - if not changed and not newly_left_users: - defer.returnValue(DeviceLists(changed=[], left=newly_left_users)) + # Now find users that we no longer track + for room_id in newly_left_rooms: + left_users = yield self.state.get_current_users_in_room(room_id) + newly_left_users.update(left_users) - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) + # Remove any users that we still share a room with. + newly_left_users -= users_who_share_room defer.returnValue( - DeviceLists( - changed=users_who_share_room & changed, - left=set(newly_left_users) - users_who_share_room, - ) + DeviceLists(changed=users_that_have_changed, left=newly_left_users) ) else: defer.returnValue(DeviceLists(changed=[], left=[])) |