diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/acme.py | 27 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 4 | ||||
-rw-r--r-- | synapse/handlers/e2e_room_keys.py | 74 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 160 | ||||
-rw-r--r-- | synapse/handlers/message.py | 61 | ||||
-rw-r--r-- | synapse/handlers/room.py | 21 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 14 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 88 | ||||
-rw-r--r-- | synapse/handlers/search.py | 47 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 27 | ||||
-rw-r--r-- | synapse/handlers/user_directory.py | 94 |
11 files changed, 431 insertions, 186 deletions
diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py index 73ea7ed018..dd0b217965 100644 --- a/synapse/handlers/acme.py +++ b/synapse/handlers/acme.py @@ -18,13 +18,16 @@ import logging import attr from zope.interface import implementer +import twisted +import twisted.internet.error from twisted.internet import defer -from twisted.internet.endpoints import serverFromString from twisted.python.filepath import FilePath from twisted.python.url import URL from twisted.web import server, static from twisted.web.resource import Resource +from synapse.app import check_bind_error + logger = logging.getLogger(__name__) try: @@ -96,16 +99,19 @@ class AcmeHandler(object): srv = server.Site(responder_resource) - listeners = [] - - for host in self.hs.config.acme_bind_addresses: + bind_addresses = self.hs.config.acme_bind_addresses + for host in bind_addresses: logger.info( - "Listening for ACME requests on %s:%s", host, self.hs.config.acme_port - ) - endpoint = serverFromString( - self.reactor, "tcp:%s:interface=%s" % (self.hs.config.acme_port, host) + "Listening for ACME requests on %s:%i", host, self.hs.config.acme_port, ) - listeners.append(endpoint.listen(srv)) + try: + self.reactor.listenTCP( + self.hs.config.acme_port, + srv, + interface=host, + ) + except twisted.internet.error.CannotListenError as e: + check_bind_error(e, host, bind_addresses) # Make sure we are registered to the ACME server. There's no public API # for this, it is usually triggered by startService, but since we don't @@ -114,9 +120,6 @@ class AcmeHandler(object): self._issuer._registered = False yield self._issuer._ensure_registered() - # Return a Deferred that will fire when all the servers have started up. - yield defer.DeferredList(listeners, fireOnOneErrback=True, consumeErrors=True) - @defer.inlineCallbacks def provision_certificate(self): diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 0699731c13..6bb254f899 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -57,8 +57,8 @@ class DirectoryHandler(BaseHandler): # general association creation for both human users and app services for wchar in string.whitespace: - if wchar in room_alias.localpart: - raise SynapseError(400, "Invalid characters in room alias") + if wchar in room_alias.localpart: + raise SynapseError(400, "Invalid characters in room alias") if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 42b040375f..7bc174070e 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -19,7 +19,13 @@ from six import iteritems from twisted.internet import defer -from synapse.api.errors import NotFoundError, RoomKeysVersionError, StoreError +from synapse.api.errors import ( + Codes, + NotFoundError, + RoomKeysVersionError, + StoreError, + SynapseError, +) from synapse.util.async_helpers import Linearizer logger = logging.getLogger(__name__) @@ -267,7 +273,7 @@ class E2eRoomKeysHandler(object): version(str): Optional; if None gives the most recent version otherwise a historical one. Raises: - StoreError: code 404 if the requested backup version doesn't exist + NotFoundError: if the requested backup version doesn't exist Returns: A deferred of a info dict that gives the info about the new version. @@ -279,7 +285,13 @@ class E2eRoomKeysHandler(object): """ with (yield self._upload_linearizer.queue(user_id)): - res = yield self.store.get_e2e_room_keys_version_info(user_id, version) + try: + res = yield self.store.get_e2e_room_keys_version_info(user_id, version) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown backup version") + else: + raise defer.returnValue(res) @defer.inlineCallbacks @@ -290,8 +302,60 @@ class E2eRoomKeysHandler(object): user_id(str): the user whose current backup version we're deleting version(str): the version id of the backup being deleted Raises: - StoreError: code 404 if this backup version doesn't exist + NotFoundError: if this backup version doesn't exist """ with (yield self._upload_linearizer.queue(user_id)): - yield self.store.delete_e2e_room_keys_version(user_id, version) + try: + yield self.store.delete_e2e_room_keys_version(user_id, version) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown backup version") + else: + raise + + @defer.inlineCallbacks + def update_version(self, user_id, version, version_info): + """Update the info about a given version of the user's backup + + Args: + user_id(str): the user whose current backup version we're updating + version(str): the backup version we're updating + version_info(dict): the new information about the backup + Raises: + NotFoundError: if the requested backup version doesn't exist + Returns: + A deferred of an empty dict. + """ + if "version" not in version_info: + raise SynapseError( + 400, + "Missing version in body", + Codes.MISSING_PARAM + ) + if version_info["version"] != version: + raise SynapseError( + 400, + "Version in body does not match", + Codes.INVALID_PARAM + ) + with (yield self._upload_linearizer.queue(user_id)): + try: + old_info = yield self.store.get_e2e_room_keys_version_info( + user_id, version + ) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown backup version") + else: + raise + if old_info["algorithm"] != version_info["algorithm"]: + raise SynapseError( + 400, + "Algorithm does not match", + Codes.INVALID_PARAM + ) + + yield self.store.update_e2e_room_keys_version(user_id, version, version_info) + + defer.returnValue({}) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a3bb864bb2..083f2e0ac3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -34,6 +34,7 @@ from synapse.api.constants import ( EventTypes, Membership, RejectedReason, + RoomVersions, ) from synapse.api.errors import ( AuthError, @@ -43,10 +44,7 @@ from synapse.api.errors import ( StoreError, SynapseError, ) -from synapse.crypto.event_signing import ( - add_hashes_and_signatures, - compute_event_signature, -) +from synapse.crypto.event_signing import compute_event_signature from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -58,7 +56,6 @@ from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room -from synapse.util.frozenutils import unfreeze from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -105,7 +102,7 @@ class FederationHandler(BaseHandler): self.hs = hs - self.store = hs.get_datastore() # type: synapse.storage.DataStore + self.store = hs.get_datastore() self.federation_client = hs.get_federation_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname @@ -342,6 +339,8 @@ class FederationHandler(BaseHandler): room_id, event_id, p, ) + room_version = yield self.store.get_room_version(room_id) + with logcontext.nested_logging_context(p): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped @@ -355,7 +354,7 @@ class FederationHandler(BaseHandler): # we want the state *after* p; get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( - [origin], p, outlier=True, + [origin], p, room_version, outlier=True, ) if remote_event is None: @@ -379,7 +378,6 @@ class FederationHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x - room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_store( room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), @@ -655,6 +653,8 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") + room_version = yield self.store.get_room_version(room_id) + events = yield self.federation_client.backfill( dest, room_id, @@ -748,6 +748,7 @@ class FederationHandler(BaseHandler): self.federation_client.get_pdu, [dest], event_id, + room_version=room_version, outlier=True, timeout=10000, ) @@ -1060,7 +1061,7 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - origin, event = yield self._make_and_verify_event( + origin, event, event_format_version = yield self._make_and_verify_event( target_hosts, room_id, joinee, @@ -1083,7 +1084,6 @@ class FederationHandler(BaseHandler): handled_events = set() try: - event = self._sign_event(event) # Try the host we successfully got a response to /make_join/ # request first. try: @@ -1091,7 +1091,9 @@ class FederationHandler(BaseHandler): target_hosts.insert(0, origin) except ValueError: pass - ret = yield self.federation_client.send_join(target_hosts, event) + ret = yield self.federation_client.send_join( + target_hosts, event, event_format_version, + ) origin = ret["origin"] state = ret["state"] @@ -1164,13 +1166,18 @@ class FederationHandler(BaseHandler): """ event_content = {"membership": Membership.JOIN} - builder = self.event_builder_factory.new({ - "type": EventTypes.Member, - "content": event_content, - "room_id": room_id, - "sender": user_id, - "state_key": user_id, - }) + room_version = yield self.store.get_room_version(room_id) + + builder = self.event_builder_factory.new( + room_version, + { + "type": EventTypes.Member, + "content": event_content, + "room_id": room_id, + "sender": user_id, + "state_key": user_id, + } + ) try: event, context = yield self.event_creation_handler.create_new_client_event( @@ -1182,7 +1189,9 @@ class FederationHandler(BaseHandler): # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_join_request` - yield self.auth.check_from_context(event, context, do_sig_check=False) + yield self.auth.check_from_context( + room_version, event, context, do_sig_check=False, + ) defer.returnValue(event) @@ -1287,11 +1296,11 @@ class FederationHandler(BaseHandler): ) event.internal_metadata.outlier = True - event.internal_metadata.invite_from_remote = True + event.internal_metadata.out_of_band_membership = True event.signatures.update( compute_event_signature( - event, + event.get_pdu_json(), self.hs.hostname, self.hs.config.signing_key[0] ) @@ -1304,7 +1313,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - origin, event = yield self._make_and_verify_event( + origin, event, event_format_version = yield self._make_and_verify_event( target_hosts, room_id, user_id, @@ -1313,7 +1322,7 @@ class FederationHandler(BaseHandler): # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True - event = self._sign_event(event) + event.internal_metadata.out_of_band_membership = True # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1336,7 +1345,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content={}, params=None): - origin, pdu = yield self.federation_client.make_membership_event( + origin, event, format_ver = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, @@ -1345,9 +1354,7 @@ class FederationHandler(BaseHandler): params=params, ) - logger.debug("Got response to make_%s: %s", membership, pdu) - - event = pdu + logger.debug("Got response to make_%s: %s", membership, event) # We should assert some things. # FIXME: Do this in a nicer way @@ -1355,28 +1362,7 @@ class FederationHandler(BaseHandler): assert(event.user_id == user_id) assert(event.state_key == user_id) assert(event.room_id == room_id) - defer.returnValue((origin, event)) - - def _sign_event(self, event): - event.internal_metadata.outlier = False - - builder = self.event_builder_factory.new( - unfreeze(event.get_pdu_json()) - ) - - builder.event_id = self.event_builder_factory.create_event_id() - builder.origin = self.hs.hostname - - if not hasattr(event, "signatures"): - builder.signatures = {} - - add_hashes_and_signatures( - builder, - self.hs.hostname, - self.hs.config.signing_key[0], - ) - - return builder.build() + defer.returnValue((origin, event, format_ver)) @defer.inlineCallbacks @log_function @@ -1385,13 +1371,17 @@ class FederationHandler(BaseHandler): leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. """ - builder = self.event_builder_factory.new({ - "type": EventTypes.Member, - "content": {"membership": Membership.LEAVE}, - "room_id": room_id, - "sender": user_id, - "state_key": user_id, - }) + room_version = yield self.store.get_room_version(room_id) + builder = self.event_builder_factory.new( + room_version, + { + "type": EventTypes.Member, + "content": {"membership": Membership.LEAVE}, + "room_id": room_id, + "sender": user_id, + "state_key": user_id, + } + ) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, @@ -1400,7 +1390,9 @@ class FederationHandler(BaseHandler): try: # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_leave_request` - yield self.auth.check_from_context(event, context, do_sig_check=False) + yield self.auth.check_from_context( + room_version, event, context, do_sig_check=False, + ) except AuthError as e: logger.warn("Failed to create new leave %r because %s", event, e) raise e @@ -1659,6 +1651,13 @@ class FederationHandler(BaseHandler): create_event = e break + if create_event is None: + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. + raise SynapseError(400, "No create event in state") + + room_version = create_event.content.get("room_version", RoomVersions.V1) + missing_auth_events = set() for e in itertools.chain(auth_events, state, [event]): for e_id in e.auth_event_ids(): @@ -1669,6 +1668,7 @@ class FederationHandler(BaseHandler): m_ev = yield self.federation_client.get_pdu( [origin], e_id, + room_version=room_version, outlier=True, timeout=10000, ) @@ -1687,7 +1687,7 @@ class FederationHandler(BaseHandler): auth_for_e[(EventTypes.Create, "")] = create_event try: - self.auth.check(e, auth_events=auth_for_e) + self.auth.check(room_version, e, auth_events=auth_for_e) except SynapseError as err: # we may get SynapseErrors here as well as AuthErrors. For # instance, there are a couple of (ancient) events in some @@ -1931,6 +1931,8 @@ class FederationHandler(BaseHandler): current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state + room_version = yield self.store.get_room_version(event.room_id) + if different_auth and not event.internal_metadata.is_outlier(): # Do auth conflict res. logger.info("Different auth: %s", different_auth) @@ -1955,8 +1957,6 @@ class FederationHandler(BaseHandler): (d.type, d.state_key): d for d in different_events if d }) - room_version = yield self.store.get_room_version(event.room_id) - new_state = yield self.state_handler.resolve_events( room_version, [list(local_view.values()), list(remote_view.values())], @@ -2056,7 +2056,7 @@ class FederationHandler(BaseHandler): ) try: - self.auth.check(event, auth_events=auth_events) + self.auth.check(room_version, event, auth_events=auth_events) except AuthError as e: logger.warn("Failed auth resolution for %r because %s", event, e) raise e @@ -2279,18 +2279,26 @@ class FederationHandler(BaseHandler): } if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): - builder = self.event_builder_factory.new(event_dict) - EventValidator().validate_new(builder) + room_version = yield self.store.get_room_version(room_id) + builder = self.event_builder_factory.new(room_version, event_dict) + + EventValidator().validate_builder(builder) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder ) event, context = yield self.add_display_name_to_third_party_invite( - event_dict, event, context + room_version, event_dict, event, context ) + EventValidator().validate_new(event) + + # We need to tell the transaction queue to send this out, even + # though the sender isn't a local user. + event.internal_metadata.send_on_behalf_of = self.hs.hostname + try: - yield self.auth.check_from_context(event, context) + yield self.auth.check_from_context(room_version, event, context) except AuthError as e: logger.warn("Denying new third party invite %r because %s", event, e) raise e @@ -2317,23 +2325,31 @@ class FederationHandler(BaseHandler): Returns: Deferred: resolves (to None) """ - builder = self.event_builder_factory.new(event_dict) + room_version = yield self.store.get_room_version(room_id) + + # NB: event_dict has a particular specced format we might need to fudge + # if we change event formats too much. + builder = self.event_builder_factory.new(room_version, event_dict) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) event, context = yield self.add_display_name_to_third_party_invite( - event_dict, event, context + room_version, event_dict, event, context ) try: - self.auth.check_from_context(event, context) + self.auth.check_from_context(room_version, event, context) except AuthError as e: logger.warn("Denying third party invite %r because %s", event, e) raise e yield self._check_signature(event, context) + # We need to tell the transaction queue to send this out, even + # though the sender isn't a local user. + event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender) + # XXX we send the invite here, but send_membership_event also sends it, # so we end up making two requests. I think this is redundant. returned_invite = yield self.send_invite(origin, event) @@ -2344,7 +2360,8 @@ class FederationHandler(BaseHandler): yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks - def add_display_name_to_third_party_invite(self, event_dict, event, context): + def add_display_name_to_third_party_invite(self, room_version, event_dict, + event, context): key = ( EventTypes.ThirdPartyInvite, event.content["third_party_invite"]["signed"]["token"] @@ -2368,11 +2385,12 @@ class FederationHandler(BaseHandler): # auth checks. If we need the invite and don't have it then the # auth check code will explode appropriately. - builder = self.event_builder_factory.new(event_dict) - EventValidator().validate_new(builder) + builder = self.event_builder_factory.new(room_version, event_dict) + EventValidator().validate_builder(builder) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) + EventValidator().validate_new(event) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a7cd779b02..3981fe69ce 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed -from synapse.api.constants import MAX_DEPTH, EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, RoomVersions from synapse.api.errors import ( AuthError, Codes, @@ -31,7 +31,6 @@ from synapse.api.errors import ( SynapseError, ) from synapse.api.urls import ConsentURIBuilder -from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet @@ -278,9 +277,17 @@ class EventCreationHandler(object): """ yield self.auth.check_auth_blocking(requester.user.to_string()) - builder = self.event_builder_factory.new(event_dict) + if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "": + room_version = event_dict["content"]["room_version"] + else: + try: + room_version = yield self.store.get_room_version(event_dict["room_id"]) + except NotFoundError: + raise AuthError(403, "Unknown room") - self.validator.validate_new(builder) + builder = self.event_builder_factory.new(room_version, event_dict) + + self.validator.validate_builder(builder) if builder.type == EventTypes.Member: membership = builder.content.get("membership", None) @@ -318,6 +325,8 @@ class EventCreationHandler(object): prev_events_and_hashes=prev_events_and_hashes, ) + self.validator.validate_new(event) + defer.returnValue((event, context)) def _is_exempt_from_privacy_policy(self, builder, requester): @@ -535,40 +544,19 @@ class EventCreationHandler(object): prev_events_and_hashes = \ yield self.store.get_prev_events_for_room(builder.room_id) - if prev_events_and_hashes: - depth = max([d for _, _, d in prev_events_and_hashes]) + 1 - # we cap depth of generated events, to ensure that they are not - # rejected by other servers (and so that they can be persisted in - # the db) - depth = min(depth, MAX_DEPTH) - else: - depth = 1 - prev_events = [ (event_id, prev_hashes) for event_id, prev_hashes, _ in prev_events_and_hashes ] - builder.prev_events = prev_events - builder.depth = depth - - context = yield self.state.compute_event_context(builder) + event = yield builder.build( + prev_event_ids=[p for p, _ in prev_events], + ) + context = yield self.state.compute_event_context(event) if requester: context.app_service = requester.app_service - if builder.is_state(): - builder.prev_state = yield self.store.add_event_hashes( - context.prev_state_events - ) - - yield self.auth.add_auth_events(builder, context) - - signing_key = self.hs.config.signing_key[0] - add_hashes_and_signatures( - builder, self.server_name, signing_key - ) - - event = builder.build() + self.validator.validate_new(event) logger.debug( "Created event %s", @@ -603,8 +591,13 @@ class EventCreationHandler(object): extra_users (list(UserID)): Any extra users to notify about event """ + if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""): + room_version = event.content.get("room_version", RoomVersions.V1) + else: + room_version = yield self.store.get_room_version(event.room_id) + try: - yield self.auth.check_from_context(event, context) + yield self.auth.check_from_context(room_version, event, context) except AuthError as err: logger.warn("Denying new event %r because %s", event, err) raise err @@ -752,7 +745,8 @@ class EventCreationHandler(object): auth_events = { (e.type, e.state_key): e for e in auth_events.values() } - if self.auth.check_redaction(event, auth_events=auth_events): + room_version = yield self.store.get_room_version(event.room_id) + if self.auth.check_redaction(room_version, event, auth_events=auth_events): original_event = yield self.store.get_event( event.redacts, check_redacted=False, @@ -766,6 +760,9 @@ class EventCreationHandler(object): "You don't have permission to redact events" ) + # We've already checked. + event.internal_metadata.recheck_redaction = False + if event.type == EventTypes.Create: prev_state_ids = yield context.get_prev_state_ids(self.store) if prev_state_ids: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cb8c5f77dd..f9af1f0046 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -123,9 +123,12 @@ class RoomCreationHandler(BaseHandler): token_id=requester.access_token_id, ) ) - yield self.auth.check_from_context(tombstone_event, tombstone_context) + old_room_version = yield self.store.get_room_version(old_room_id) + yield self.auth.check_from_context( + old_room_version, tombstone_event, tombstone_context, + ) - yield self.clone_exiting_room( + yield self.clone_existing_room( requester, old_room_id=old_room_id, new_room_id=new_room_id, @@ -230,7 +233,7 @@ class RoomCreationHandler(BaseHandler): ) @defer.inlineCallbacks - def clone_exiting_room( + def clone_existing_room( self, requester, old_room_id, new_room_id, new_room_version, tombstone_event_id, ): @@ -260,8 +263,19 @@ class RoomCreationHandler(BaseHandler): } } + # Check if old room was non-federatable + + # Get old room's create event + old_room_create_event = yield self.store.get_create_event_for_room(old_room_id) + + # Check if the create event specified a non-federatable room + if not old_room_create_event.content.get("m.federate", True): + # If so, mark the new room as non-federatable as well + creation_content["m.federate"] = False + initial_state = dict() + # Replicate relevant room events types_to_copy = ( (EventTypes.JoinRules, ""), (EventTypes.Name, ""), @@ -270,6 +284,7 @@ class RoomCreationHandler(BaseHandler): (EventTypes.GuestAccess, ""), (EventTypes.RoomAvatar, ""), (EventTypes.Encryption, ""), + (EventTypes.ServerACL, ""), ) old_room_state_ids = yield self.store.get_filtered_current_state_ids( diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index dc88620885..13e212d669 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -73,8 +73,14 @@ class RoomListHandler(BaseHandler): # We explicitly don't bother caching searches or requests for # appservice specific lists. logger.info("Bypassing cache as search request.") + + # XXX: Quick hack to stop room directory queries taking too long. + # Timeout request after 60s. Probably want a more fundamental + # solution at some point + timeout = self.clock.time() + 60 return self._get_public_room_list( - limit, since_token, search_filter, network_tuple=network_tuple, + limit, since_token, search_filter, + network_tuple=network_tuple, timeout=timeout, ) key = (limit, since_token, network_tuple) @@ -87,7 +93,8 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMPTY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID, + timeout=None,): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -202,6 +209,9 @@ class RoomListHandler(BaseHandler): chunk = [] for i in range(0, len(rooms_to_scan), step): + if timeout and self.clock.time() > timeout: + raise Exception("Timed out searching room directory") + batch = rooms_to_scan[i:i + step] logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 07fd3e82fc..2beffdf41e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -63,7 +63,7 @@ class RoomMemberHandler(object): self.directory_handler = hs.get_handlers().directory_handler self.registration_handler = hs.get_handlers().registration_handler self.profile_handler = hs.get_profile_handler() - self.event_creation_hander = hs.get_event_creation_handler() + self.event_creation_handler = hs.get_event_creation_handler() self.member_linearizer = Linearizer(name="member") @@ -161,6 +161,8 @@ class RoomMemberHandler(object): ratelimit=True, content=None, ): + user_id = target.to_string() + if content is None: content = {} @@ -168,14 +170,14 @@ class RoomMemberHandler(object): if requester.is_guest: content["kind"] = "guest" - event, context = yield self.event_creation_hander.create_event( + event, context = yield self.event_creation_handler.create_event( requester, { "type": EventTypes.Member, "content": content, "room_id": room_id, "sender": requester.user.to_string(), - "state_key": target.to_string(), + "state_key": user_id, # For backwards compatibility: "membership": membership, @@ -186,14 +188,14 @@ class RoomMemberHandler(object): ) # Check if this event matches the previous membership event for the user. - duplicate = yield self.event_creation_hander.deduplicate_state_event( + duplicate = yield self.event_creation_handler.deduplicate_state_event( event, context, ) if duplicate is not None: # Discard the new event since this membership change is a no-op. defer.returnValue(duplicate) - yield self.event_creation_hander.handle_new_client_event( + yield self.event_creation_handler.handle_new_client_event( requester, event, context, @@ -204,12 +206,12 @@ class RoomMemberHandler(object): prev_state_ids = yield context.get_prev_state_ids(self.store) prev_member_event_id = prev_state_ids.get( - (EventTypes.Member, target.to_string()), + (EventTypes.Member, user_id), None ) if event.membership == Membership.JOIN: - # Only fire user_joined_room if the user has acutally joined the + # Only fire user_joined_room if the user has actually joined the # room. Don't bother if the user is just changing their profile # info. newly_joined = True @@ -218,6 +220,18 @@ class RoomMemberHandler(object): newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: yield self._user_joined_room(target, room_id) + + # Copy over direct message status and room tags if this is a join + # on an upgraded room + + # Check if this is an upgraded room + predecessor = yield self.store.get_room_predecessor(room_id) + + if predecessor: + # It is an upgraded room. Copy over old tags + self.copy_room_tags_and_direct_to_room( + predecessor["room_id"], room_id, user_id, + ) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) @@ -227,6 +241,55 @@ class RoomMemberHandler(object): defer.returnValue(event) @defer.inlineCallbacks + def copy_room_tags_and_direct_to_room( + self, + old_room_id, + new_room_id, + user_id, + ): + """Copies the tags and direct room state from one room to another. + + Args: + old_room_id (str) + new_room_id (str) + user_id (str) + + Returns: + Deferred[None] + """ + # Retrieve user account data for predecessor room + user_account_data, _ = yield self.store.get_account_data_for_user( + user_id, + ) + + # Copy direct message state if applicable + direct_rooms = user_account_data.get("m.direct", {}) + + # Check which key this room is under + if isinstance(direct_rooms, dict): + for key, room_id_list in direct_rooms.items(): + if old_room_id in room_id_list and new_room_id not in room_id_list: + # Add new room_id to this key + direct_rooms[key].append(new_room_id) + + # Save back to user's m.direct account data + yield self.store.add_account_data_for_user( + user_id, "m.direct", direct_rooms, + ) + break + + # Copy room tags if applicable + room_tags = yield self.store.get_tags_for_room( + user_id, old_room_id, + ) + + # Copy each room tag to the new room + for tag, tag_content in room_tags.items(): + yield self.store.add_tag_to_room( + user_id, new_room_id, tag, tag_content + ) + + @defer.inlineCallbacks def update_membership( self, requester, @@ -493,7 +556,7 @@ class RoomMemberHandler(object): else: requester = synapse.types.create_requester(target_user) - prev_event = yield self.event_creation_hander.deduplicate_state_event( + prev_event = yield self.event_creation_handler.deduplicate_state_event( event, context, ) if prev_event is not None: @@ -513,7 +576,7 @@ class RoomMemberHandler(object): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - yield self.event_creation_hander.handle_new_client_event( + yield self.event_creation_handler.handle_new_client_event( requester, event, context, @@ -527,7 +590,7 @@ class RoomMemberHandler(object): ) if event.membership == Membership.JOIN: - # Only fire user_joined_room if the user has acutally joined the + # Only fire user_joined_room if the user has actually joined the # room. Don't bother if the user is just changing their profile # info. newly_joined = True @@ -755,7 +818,7 @@ class RoomMemberHandler(object): ) ) - yield self.event_creation_hander.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.ThirdPartyInvite, @@ -877,7 +940,8 @@ class RoomMemberHandler(object): # first member event? create_event_id = current_state_ids.get(("m.room.create", "")) if len(current_state_ids) == 1 and create_event_id: - defer.returnValue(self.hs.is_mine_id(create_event_id)) + # We can only get here if we're in the process of creating the room + defer.returnValue(True) for etype, state_key in current_state_ids: if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index ec936bbb4e..49c439313e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -38,6 +38,41 @@ class SearchHandler(BaseHandler): super(SearchHandler, self).__init__(hs) @defer.inlineCallbacks + def get_old_rooms_from_upgraded_room(self, room_id): + """Retrieves room IDs of old rooms in the history of an upgraded room. + + We do so by checking the m.room.create event of the room for a + `predecessor` key. If it exists, we add the room ID to our return + list and then check that room for a m.room.create event and so on + until we can no longer find any more previous rooms. + + The full list of all found rooms in then returned. + + Args: + room_id (str): id of the room to search through. + + Returns: + Deferred[iterable[unicode]]: predecessor room ids + """ + + historical_room_ids = [] + + while True: + predecessor = yield self.store.get_room_predecessor(room_id) + + # If no predecessor, assume we've hit a dead end + if not predecessor: + break + + # Add predecessor's room ID + historical_room_ids.append(predecessor["room_id"]) + + # Scan through the old room for further predecessors + room_id = predecessor["room_id"] + + defer.returnValue(historical_room_ids) + + @defer.inlineCallbacks def search(self, user, content, batch=None): """Performs a full text search for a user. @@ -137,6 +172,18 @@ class SearchHandler(BaseHandler): ) room_ids = set(r.room_id for r in rooms) + # If doing a subset of all rooms seearch, check if any of the rooms + # are from an upgraded room, and search their contents as well + if search_filter.rooms: + historical_room_ids = [] + for room_id in search_filter.rooms: + # Add any previous rooms to the search if they exist + ids = yield self.get_old_rooms_from_upgraded_room(room_id) + historical_room_ids += ids + + # Prevent any historical events from being filtered + search_filter = search_filter.with_room_ids(historical_room_ids) + room_ids = search_filter.filter_rooms(room_ids) if batch_group == "room_id": diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f7f768f751..bd97241ab4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -895,14 +895,17 @@ class SyncHandler(object): Returns: Deferred(SyncResult) """ - logger.info("Calculating sync response for %r", sync_config.user) - # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() + logger.info( + "Calculating sync response for %r between %s and %s", + sync_config.user, since_token, now_token, + ) + user_id = sync_config.user.to_string() app_service = self.store.get_app_service_by_user_id(user_id) if app_service: @@ -1390,6 +1393,12 @@ class SyncHandler(object): room_entries = [] invited = [] for room_id, events in iteritems(mem_change_events_by_room_id): + logger.info( + "Membership changes in %s: [%s]", + room_id, + ", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)), + ) + non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) @@ -1473,10 +1482,22 @@ class SyncHandler(object): if since_token and since_token.is_after(leave_token): continue + # If this is an out of band message, like a remote invite + # rejection, we include it in the recents batch. Otherwise, we + # let _load_filtered_recents handle fetching the correct + # batches. + # + # This is all screaming out for a refactor, as the logic here is + # subtle and the moving parts numerous. + if leave_event.internal_metadata.is_out_of_band_membership(): + batch_events = [leave_event] + else: + batch_events = None + room_entries.append(RoomSyncResultBuilder( room_id=room_id, rtype="archived", - events=None, + events=batch_events, newly_joined=room_id in newly_joined_rooms, full_state=False, since_token=since_token, diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 120815b09b..283c6c1b81 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -130,7 +130,7 @@ class UserDirectoryHandler(object): # Support users are for diagnostics and should not appear in the user directory. if not is_support: yield self.store.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url, None, + user_id, profile.display_name, profile.avatar_url, None ) @defer.inlineCallbacks @@ -166,8 +166,9 @@ class UserDirectoryHandler(object): self.pos = deltas[-1]["stream_id"] # Expose current event processing position to prometheus - synapse.metrics.event_processing_positions.labels( - "user_dir").set(self.pos) + synapse.metrics.event_processing_positions.labels("user_dir").set( + self.pos + ) yield self.store.update_user_directory_stream_pos(self.pos) @@ -191,21 +192,25 @@ class UserDirectoryHandler(object): logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids)) yield self._handle_initial_room(room_id) num_processed_rooms += 1 - yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) logger.info("Processed all rooms.") if self.search_all_users: num_processed_users = 0 user_ids = yield self.store.get_all_local_users() - logger.info("Doing initial update of user directory. %d users", len(user_ids)) + logger.info( + "Doing initial update of user directory. %d users", len(user_ids) + ) for user_id in user_ids: # We add profiles for all users even if they don't match the # include pattern, just in case we want to change it in future - logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids)) + logger.info( + "Handling user %d/%d", num_processed_users + 1, len(user_ids) + ) yield self._handle_local_user(user_id) num_processed_users += 1 - yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0) logger.info("Processed all users") @@ -224,24 +229,24 @@ class UserDirectoryHandler(object): if not is_in_room: return - is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) users_with_profile = yield self.state.get_current_user_in_room(room_id) user_ids = set(users_with_profile) unhandled_users = user_ids - self.initially_handled_users yield self.store.add_profiles_to_user_dir( - room_id, { - user_id: users_with_profile[user_id] for user_id in unhandled_users - } + room_id, + {user_id: users_with_profile[user_id] for user_id in unhandled_users}, ) self.initially_handled_users |= unhandled_users if is_public: yield self.store.add_users_to_public_room( - room_id, - user_ids=user_ids - self.initially_handled_users_in_public + room_id, user_ids=user_ids - self.initially_handled_users_in_public ) self.initially_handled_users_in_public |= user_ids @@ -253,7 +258,7 @@ class UserDirectoryHandler(object): count = 0 for user_id in user_ids: if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) if not self.is_mine_id(user_id): count += 1 @@ -268,7 +273,7 @@ class UserDirectoryHandler(object): continue if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0) count += 1 user_set = (user_id, other_user_id) @@ -290,25 +295,23 @@ class UserDirectoryHandler(object): if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: yield self.store.add_users_who_share_room( - room_id, not is_public, to_insert, + room_id, not is_public, to_insert ) to_insert.clear() if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE: yield self.store.update_users_who_share_room( - room_id, not is_public, to_update, + room_id, not is_public, to_update ) to_update.clear() if to_insert: - yield self.store.add_users_who_share_room( - room_id, not is_public, to_insert, - ) + yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) to_insert.clear() if to_update: yield self.store.update_users_who_share_room( - room_id, not is_public, to_update, + room_id, not is_public, to_update ) to_update.clear() @@ -329,11 +332,12 @@ class UserDirectoryHandler(object): # may have become public or not and add/remove the users in said room if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): yield self._handle_room_publicity_change( - room_id, prev_event_id, event_id, typ, + room_id, prev_event_id, event_id, typ ) elif typ == EventTypes.Member: change = yield self._get_key_change( - prev_event_id, event_id, + prev_event_id, + event_id, key_name="membership", public_value=Membership.JOIN, ) @@ -342,14 +346,16 @@ class UserDirectoryHandler(object): # Need to check if the server left the room entirely, if so # we might need to remove all the users in that room is_in_room = yield self.store.is_host_joined( - room_id, self.server_name, + room_id, self.server_name ) if not is_in_room: logger.info("Server left room: %r", room_id) # Fetch all the users that we marked as being in user # directory due to being in the room and then check if # need to remove those users or not - user_ids = yield self.store.get_users_in_dir_due_to_room(room_id) + user_ids = yield self.store.get_users_in_dir_due_to_room( + room_id + ) for user_id in user_ids: yield self._handle_remove_user(room_id, user_id) return @@ -361,7 +367,7 @@ class UserDirectoryHandler(object): if change is None: # Handle any profile changes yield self._handle_profile_change( - state_key, room_id, prev_event_id, event_id, + state_key, room_id, prev_event_id, event_id ) continue @@ -393,13 +399,15 @@ class UserDirectoryHandler(object): if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( - prev_event_id, event_id, + prev_event_id, + event_id, key_name="history_visibility", public_value="world_readable", ) elif typ == EventTypes.JoinRules: change = yield self._get_key_change( - prev_event_id, event_id, + prev_event_id, + event_id, key_name="join_rule", public_value=JoinRules.PUBLIC, ) @@ -524,7 +532,7 @@ class UserDirectoryHandler(object): ) if self.is_mine_id(other_user_id) and not is_appservice: shared_is_private = yield self.store.get_if_users_share_a_room( - other_user_id, user_id, + other_user_id, user_id ) if shared_is_private is True: # We've already marked in the database they share a private room @@ -539,13 +547,11 @@ class UserDirectoryHandler(object): to_insert.add((other_user_id, user_id)) if to_insert: - yield self.store.add_users_who_share_room( - room_id, not is_public, to_insert, - ) + yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) if to_update: yield self.store.update_users_who_share_room( - room_id, not is_public, to_update, + room_id, not is_public, to_update ) @defer.inlineCallbacks @@ -564,15 +570,15 @@ class UserDirectoryHandler(object): row = yield self.store.get_user_in_public_room(user_id) update_user_in_public = row and row["room_id"] == room_id - if (update_user_in_public or update_user_dir): + if update_user_in_public or update_user_dir: # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: - if (not update_user_in_public and not update_user_dir): + if not update_user_in_public and not update_user_dir: break is_in_room = yield self.store.is_host_joined( - j_room_id, self.server_name, + j_room_id, self.server_name ) if not is_in_room: @@ -600,19 +606,19 @@ class UserDirectoryHandler(object): # Get a list of user tuples that were in the DB due to this room and # users (this includes tuples where the other user matches `user_id`) user_tuples = yield self.store.get_users_in_share_dir_with_room_id( - user_id, room_id, + user_id, room_id ) for user_id, other_user_id in user_tuples: # For each user tuple get a list of rooms that they still share, # trying to find a private room, and update the entry in the DB - rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id) + rooms = yield self.store.get_rooms_in_common_for_users( + user_id, other_user_id + ) # If they dont share a room anymore, remove the mapping if not rooms: - yield self.store.remove_user_who_share_room( - user_id, other_user_id, - ) + yield self.store.remove_user_who_share_room(user_id, other_user_id) continue found_public_share = None @@ -626,13 +632,13 @@ class UserDirectoryHandler(object): else: found_public_share = None yield self.store.update_users_who_share_room( - room_id, not is_public, [(user_id, other_user_id)], + room_id, not is_public, [(user_id, other_user_id)] ) break if found_public_share: yield self.store.update_users_who_share_room( - room_id, not is_public, [(user_id, other_user_id)], + room_id, not is_public, [(user_id, other_user_id)] ) @defer.inlineCallbacks @@ -660,7 +666,7 @@ class UserDirectoryHandler(object): if prev_name != new_name or prev_avatar != new_avatar: yield self.store.update_profile_in_user_dir( - user_id, new_name, new_avatar, room_id, + user_id, new_name, new_avatar, room_id ) @defer.inlineCallbacks |