diff options
author | Richard van der Hoff <richard@matrix.org> | 2018-02-13 12:16:01 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2018-02-13 12:16:01 +0000 |
commit | a9b712e9dce119c65fe24913d2a79212ae07a354 (patch) | |
tree | 2c98b0b3d7d31c59ce41db257c3a18c5b3a2df03 /synapse/handlers | |
parent | Factor out common code for search insert (diff) | |
parent | Merge pull request #2857 from matrix-org/erikj/upload_store (diff) | |
download | synapse-a9b712e9dce119c65fe24913d2a79212ae07a354.tar.xz |
Merge branch 'develop' into matthew/gin_work_mem
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/directory.py | 7 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 54 | ||||
-rw-r--r-- | synapse/handlers/message.py | 321 | ||||
-rw-r--r-- | synapse/handlers/room.py | 11 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 21 |
5 files changed, 216 insertions, 198 deletions
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index a0464ae5c0..8580ada60a 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -34,6 +34,7 @@ class DirectoryHandler(BaseHandler): self.state = hs.get_state_handler() self.appservice_handler = hs.get_application_service_handler() + self.event_creation_handler = hs.get_event_creation_handler() self.federation = hs.get_replication_layer() self.federation.register_query_handler( @@ -249,8 +250,7 @@ class DirectoryHandler(BaseHandler): def send_room_alias_update_event(self, requester, user_id, room_id): aliases = yield self.store.get_aliases_for_room(room_id) - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Aliases, @@ -272,8 +272,7 @@ class DirectoryHandler(BaseHandler): if not alias_event or alias_event.content.get("alias", "") != alias_str: return - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.CanonicalAlias, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 677532c87b..46bcf8b081 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -75,6 +76,7 @@ class FederationHandler(BaseHandler): self.is_mine_id = hs.is_mine_id self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() + self.event_creation_handler = hs.get_event_creation_handler() self.replication_layer.set_handler(self) @@ -808,13 +810,12 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") + resolve = logcontext.preserve_fn( + self.state_handler.resolve_state_groups_for_events + ) states = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.preserve_fn(self.state_handler.resolve_state_groups)( - room_id, [e] - ) - for e in event_ids - ], consumeErrors=True, + [resolve(room_id, [e]) for e in event_ids], + consumeErrors=True, )) states = dict(zip(event_ids, [s.state for s in states])) @@ -1008,8 +1009,7 @@ class FederationHandler(BaseHandler): }) try: - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) except AuthError as e: @@ -1249,8 +1249,7 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) @@ -1832,8 +1831,8 @@ class FederationHandler(BaseHandler): current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state - self._update_context_for_auth_events( - context, auth_events, event_key, + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, ) if different_auth and not event.internal_metadata.is_outlier(): @@ -1914,8 +1913,8 @@ class FederationHandler(BaseHandler): # 4. Look at rejects and their proofs. # TODO. - self._update_context_for_auth_events( - context, auth_events, event_key, + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, ) try: @@ -1924,11 +1923,15 @@ class FederationHandler(BaseHandler): logger.warn("Failed auth resolution for %r because %s", event, e) raise e - def _update_context_for_auth_events(self, context, auth_events, + @defer.inlineCallbacks + def _update_context_for_auth_events(self, event, context, auth_events, event_key): - """Update the state_ids in an event context after auth event resolution + """Update the state_ids in an event context after auth event resolution, + storing the changes as a new state group. Args: + event (Event): The event we're handling the context for + context (synapse.events.snapshot.EventContext): event context to be updated @@ -1951,7 +1954,13 @@ class FederationHandler(BaseHandler): context.prev_state_ids.update({ k: a.event_id for k, a in auth_events.iteritems() }) - context.state_group = self.store.get_next_state_group() + context.state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=context.prev_group, + delta_ids=context.delta_ids, + current_state_ids=context.current_state_ids, + ) @defer.inlineCallbacks def construct_auth_difference(self, local_auth, remote_auth): @@ -2121,8 +2130,7 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder ) @@ -2160,8 +2168,7 @@ class FederationHandler(BaseHandler): """ builder = self.event_builder_factory.new(event_dict) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) @@ -2211,8 +2218,9 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event(builder=builder) + event, context = yield self.event_creation_handler.create_new_client_event( + builder=builder, + ) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21f1717dd2..4e9752ccbd 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd -# Copyright 2017 New Vector Ltd +# Copyright 2017 - 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,23 +47,11 @@ class MessageHandler(BaseHandler): self.hs = hs self.state = hs.get_state_handler() self.clock = hs.get_clock() - self.validator = EventValidator() - self.profile_handler = hs.get_profile_handler() self.pagination_lock = ReadWriteLock() - self.pusher_pool = hs.get_pusherpool() - - # We arbitrarily limit concurrent event creation for a room to 5. - # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) - - self.action_generator = hs.get_action_generator() - - self.spam_checker = hs.get_spam_checker() - @defer.inlineCallbacks - def purge_history(self, room_id, event_id): + def purge_history(self, room_id, event_id, delete_local_events=False): event = yield self.store.get_event(event_id) if event.room_id != room_id: @@ -72,7 +60,7 @@ class MessageHandler(BaseHandler): depth = event.depth with (yield self.pagination_lock.write(room_id)): - yield self.store.delete_old_state(room_id, depth) + yield self.store.purge_history(room_id, depth, delete_local_events) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, @@ -183,6 +171,162 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks + def get_room_data(self, user_id=None, room_id=None, + event_type=None, state_key="", is_guest=False): + """ Get data from a room. + + Args: + event : The room path event + Returns: + The path data content. + Raises: + SynapseError if something went wrong. + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + data = yield self.state_handler.get_current_state( + room_id, event_type, state_key + ) + elif membership == Membership.LEAVE: + key = (event_type, state_key) + room_state = yield self.store.get_state_for_events( + [membership_event_id], [key] + ) + data = room_state[membership_event_id].get(key) + + defer.returnValue(data) + + @defer.inlineCallbacks + def _check_in_room_or_world_readable(self, room_id, user_id): + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError: + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) + + @defer.inlineCallbacks + def get_state_events(self, user_id, room_id, is_guest=False): + """Retrieve all state events for a given room. If the user is + joined to the room then return the current state. If the user has + left the room return the state events from when they left. + + Args: + user_id(str): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A list of dicts representing state events. [{}, {}, {}] + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + room_state = yield self.state_handler.get_current_state(room_id) + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], None + ) + room_state = room_state[membership_event_id] + + now = self.clock.time_msec() + defer.returnValue( + [serialize_event(c, now) for c in room_state.values()] + ) + + @defer.inlineCallbacks + def get_joined_members(self, requester, room_id): + """Get all the joined members in the room and their profile information. + + If the user has left the room return the state events from when they left. + + Args: + requester(Requester): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A dict of user_id to profile info + """ + user_id = requester.user.to_string() + if not requester.app_service: + # We check AS auth after fetching the room membership, as it + # requires us to pull out all joined members anyway. + membership, _ = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + if membership != Membership.JOIN: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # If this is an AS, double check that they are allowed to see the members. + # This can either be because the AS user is in the room or becuase there + # is a user in the room that the AS is "interested in" + if requester.app_service and user_id not in users_with_profile: + for uid in users_with_profile: + if requester.app_service.is_interested_in_user(uid): + break + else: + # Loop fell through, AS has no interested users in room + raise AuthError(403, "Appservice not in room") + + defer.returnValue({ + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in users_with_profile.iteritems() + }) + + +class EventCreationHandler(object): + def __init__(self, hs): + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.clock = hs.get_clock() + self.validator = EventValidator() + self.profile_handler = hs.get_profile_handler() + self.event_builder_factory = hs.get_event_builder_factory() + self.server_name = hs.hostname + self.ratelimiter = hs.get_ratelimiter() + self.notifier = hs.get_notifier() + + # This is only used to get at ratelimit function, and maybe_kick_guest_users + self.base_handler = BaseHandler(hs) + + self.pusher_pool = hs.get_pusherpool() + + # We arbitrarily limit concurrent event creation for a room to 5. + # This is to stop us from diverging history *too* much. + self.limiter = Limiter(max_count=5) + + self.action_generator = hs.get_action_generator() + + self.spam_checker = hs.get_spam_checker() + + @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, prev_event_ids=None): """ @@ -234,7 +378,7 @@ class MessageHandler(BaseHandler): if txn_id is not None: builder.internal_metadata.txn_id = txn_id - event, context = yield self._create_new_client_event( + event, context = yield self.create_new_client_event( builder=builder, requester=requester, prev_event_ids=prev_event_ids, @@ -259,11 +403,6 @@ class MessageHandler(BaseHandler): "Tried to send member event through non-member codepath" ) - # We check here if we are currently being rate limited, so that we - # don't do unnecessary work. We check again just before we actually - # send the event. - yield self.ratelimit(requester, update=False) - user = UserID.from_string(event.sender) assert self.hs.is_mine(user), "User must be our own: %s" % (user,) @@ -342,137 +481,9 @@ class MessageHandler(BaseHandler): ) defer.returnValue(event) + @measure_func("create_new_client_event") @defer.inlineCallbacks - def get_room_data(self, user_id=None, room_id=None, - event_type=None, state_key="", is_guest=False): - """ Get data from a room. - - Args: - event : The room path event - Returns: - The path data content. - Raises: - SynapseError if something went wrong. - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - data = yield self.state_handler.get_current_state( - room_id, event_type, state_key - ) - elif membership == Membership.LEAVE: - key = (event_type, state_key) - room_state = yield self.store.get_state_for_events( - [membership_event_id], [key] - ) - data = room_state[membership_event_id].get(key) - - defer.returnValue(data) - - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - - @defer.inlineCallbacks - def get_state_events(self, user_id, room_id, is_guest=False): - """Retrieve all state events for a given room. If the user is - joined to the room then return the current state. If the user has - left the room return the state events from when they left. - - Args: - user_id(str): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A list of dicts representing state events. [{}, {}, {}] - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) - elif membership == Membership.LEAVE: - room_state = yield self.store.get_state_for_events( - [membership_event_id], None - ) - room_state = room_state[membership_event_id] - - now = self.clock.time_msec() - defer.returnValue( - [serialize_event(c, now) for c in room_state.values()] - ) - - @defer.inlineCallbacks - def get_joined_members(self, requester, room_id): - """Get all the joined members in the room and their profile information. - - If the user has left the room return the state events from when they left. - - Args: - requester(Requester): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A dict of user_id to profile info - """ - user_id = requester.user.to_string() - if not requester.app_service: - # We check AS auth after fetching the room membership, as it - # requires us to pull out all joined members anyway. - membership, _ = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - if membership != Membership.JOIN: - raise NotImplementedError( - "Getting joined members after leaving is not implemented" - ) - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - - # If this is an AS, double check that they are allowed to see the members. - # This can either be because the AS user is in the room or becuase there - # is a user in the room that the AS is "interested in" - if requester.app_service and user_id not in users_with_profile: - for uid in users_with_profile: - if requester.app_service.is_interested_in_user(uid): - break - else: - # Loop fell through, AS has no interested users in room - raise AuthError(403, "Appservice not in room") - - defer.returnValue({ - user_id: { - "avatar_url": profile.avatar_url, - "display_name": profile.display_name, - } - for user_id, profile in users_with_profile.iteritems() - }) - - @measure_func("_create_new_client_event") - @defer.inlineCallbacks - def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): + def create_new_client_event(self, builder, requester=None, prev_event_ids=None): if prev_event_ids: prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) @@ -509,9 +520,7 @@ class MessageHandler(BaseHandler): builder.prev_events = prev_events builder.depth = depth - state_handler = self.state_handler - - context = yield state_handler.compute_event_context(builder) + context = yield self.state.compute_event_context(builder) if requester: context.app_service = requester.app_service @@ -551,7 +560,7 @@ class MessageHandler(BaseHandler): # We now need to go and hit out to wherever we need to hit out to. if ratelimit: - yield self.ratelimit(requester) + yield self.base_handler.ratelimit(requester) try: yield self.auth.check_from_context(event, context) @@ -567,7 +576,7 @@ class MessageHandler(BaseHandler): logger.exception("Failed to encode content: %r", event.content) raise - yield self.maybe_kick_guest_users(event, context) + yield self.base_handler.maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: # Check the alias is acually valid (at this time at least) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d1cc87a016..6ab020bf41 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,6 +65,7 @@ class RoomCreationHandler(BaseHandler): super(RoomCreationHandler, self).__init__(hs) self.spam_checker = hs.get_spam_checker() + self.event_creation_handler = hs.get_event_creation_handler() @defer.inlineCallbacks def create_room(self, requester, config, ratelimit=True): @@ -163,13 +165,11 @@ class RoomCreationHandler(BaseHandler): creation_content = config.get("creation_content", {}) - msg_handler = self.hs.get_handlers().message_handler room_member_handler = self.hs.get_handlers().room_member_handler yield self._send_events_for_new_room( requester, room_id, - msg_handler, room_member_handler, preset_config=preset_config, invite_list=invite_list, @@ -181,7 +181,7 @@ class RoomCreationHandler(BaseHandler): if "name" in config: name = config["name"] - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Name, @@ -194,7 +194,7 @@ class RoomCreationHandler(BaseHandler): if "topic" in config: topic = config["topic"] - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Topic, @@ -249,7 +249,6 @@ class RoomCreationHandler(BaseHandler): self, creator, # A Requester object. room_id, - msg_handler, room_member_handler, preset_config, invite_list, @@ -272,7 +271,7 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def send(etype, content, **kwargs): event = create(etype, content, **kwargs) - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7e6467cd1d..37dc5e99ab 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -46,6 +47,7 @@ class RoomMemberHandler(BaseHandler): super(RoomMemberHandler, self).__init__(hs) self.profile_handler = hs.get_profile_handler() + self.event_creation_hander = hs.get_event_creation_handler() self.member_linearizer = Linearizer(name="member") @@ -66,13 +68,12 @@ class RoomMemberHandler(BaseHandler): ): if content is None: content = {} - msg_handler = self.hs.get_handlers().message_handler content["membership"] = membership if requester.is_guest: content["kind"] = "guest" - event, context = yield msg_handler.create_event( + event, context = yield self.event_creation_hander.create_event( requester, { "type": EventTypes.Member, @@ -90,12 +91,14 @@ class RoomMemberHandler(BaseHandler): ) # Check if this event matches the previous membership event for the user. - duplicate = yield msg_handler.deduplicate_state_event(event, context) + duplicate = yield self.event_creation_hander.deduplicate_state_event( + event, context, + ) if duplicate is not None: # Discard the new event since this membership change is a no-op. defer.returnValue(duplicate) - yield msg_handler.handle_new_client_event( + yield self.event_creation_hander.handle_new_client_event( requester, event, context, @@ -394,8 +397,9 @@ class RoomMemberHandler(BaseHandler): else: requester = synapse.types.create_requester(target_user) - message_handler = self.hs.get_handlers().message_handler - prev_event = yield message_handler.deduplicate_state_event(event, context) + prev_event = yield self.event_creation_hander.deduplicate_state_event( + event, context, + ) if prev_event is not None: return @@ -412,7 +416,7 @@ class RoomMemberHandler(BaseHandler): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - yield message_handler.handle_new_client_event( + yield self.event_creation_hander.handle_new_client_event( requester, event, context, @@ -644,8 +648,7 @@ class RoomMemberHandler(BaseHandler): ) ) - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_hander.create_and_send_nonmember_event( requester, { "type": EventTypes.ThirdPartyInvite, |