diff options
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r-- | synapse/handlers/message.py | 363 |
1 files changed, 248 insertions, 115 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f12465fa2c..14051aee99 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,13 +16,13 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import RoomError, SynapseError +from synapse.api.errors import SynapseError, AuthError, Codes from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext -from synapse.types import UserID, RoomStreamToken +from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler @@ -71,34 +71,64 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False, as_client_event=True): + as_client_event=True, is_guest=False): """Get messages in a room. Args: user_id (str): The user requesting messages. room_id (str): The room they want messages from. pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - feedback (bool): True to get compressed feedback with the messages + config rules to apply, if any. as_client_event (bool): True to get events in client-server format. + is_guest (bool): Whether the requesting user is a guest (as opposed + to a fully registered user). Returns: dict: Pagination API results """ - yield self.auth.check_joined_room(room_id, user_id) - data_source = self.hs.get_event_sources().sources["room"] - if not pagin_config.from_token: + if pagin_config.from_token: + room_token = pagin_config.from_token.room_key + else: pagin_config.from_token = ( yield self.hs.get_event_sources().get_current_token( direction='b' ) ) + room_token = pagin_config.from_token.room_key - room_token = RoomStreamToken.parse(pagin_config.from_token.room_key) + room_token = RoomStreamToken.parse(room_token) if room_token.topological is None: raise SynapseError(400, "Invalid token") + pagin_config.from_token = pagin_config.from_token.copy_and_replace( + "room_key", str(room_token) + ) + + source_config = pagin_config.get_source_config("room") + + if not is_guest: + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + if member_event.membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room. + # If they're a guest, we'll just 403 them if they're asking for + # events they can't see. + leave_token = yield self.store.get_topological_token_for_event( + member_event.event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < room_token.topological: + source_config.from_key = str(leave_token) + + if source_config.direction == "f": + if source_config.to_key is None: + source_config.to_key = str(leave_token) + else: + to_token = RoomStreamToken.parse(source_config.to_key) + if leave_token.topological < to_token.topological: + source_config.to_key = str(leave_token) + yield self.hs.get_handlers().federation_handler.maybe_backfill( room_id, room_token.topological ) @@ -106,7 +136,7 @@ class MessageHandler(BaseHandler): user = UserID.from_string(user_id) events, next_key = yield data_source.get_pagination_rows( - user, pagin_config.get_source_config("room"), room_id + user, source_config, room_id ) next_token = pagin_config.from_token.copy_and_replace( @@ -120,7 +150,7 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) - events = yield self._filter_events_for_client(user_id, room_id, events) + events = yield self._filter_events_for_client(user_id, events, is_guest=is_guest) time_now = self.clock.time_msec() @@ -136,54 +166,8 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def _filter_events_for_client(self, user_id, room_id, events): - event_id_to_state = yield self.store.get_state_for_events( - room_id, frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_id), - ) - ) - - def allowed(event, state): - if event.type == EventTypes.RoomHistoryVisibility: - return True - - membership_ev = state.get((EventTypes.Member, user_id), None) - if membership_ev: - membership = membership_ev.membership - else: - membership = Membership.LEAVE - - if membership == Membership.JOIN: - return True - - history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history: - visibility = history.content.get("history_visibility", "shared") - else: - visibility = "shared" - - if visibility == "public": - return True - elif visibility == "shared": - return True - elif visibility == "joined": - return membership == Membership.JOIN - elif visibility == "invited": - return membership == Membership.INVITE - - return True - - defer.returnValue([ - event - for event in events - if allowed(event, event_id_to_state[event.event_id]) - ]) - - @defer.inlineCallbacks def create_and_send_event(self, event_dict, ratelimit=True, - client=None, txn_id=None): + token_id=None, txn_id=None, is_guest=False): """ Given a dict from a client, create and handle a new event. Creates an FrozenEvent object, filling out auth_events, prev_events, @@ -217,11 +201,8 @@ class MessageHandler(BaseHandler): builder.content ) - if client is not None: - if client.token_id is not None: - builder.internal_metadata.token_id = client.token_id - if client.device_id is not None: - builder.internal_metadata.device_id = client.device_id + if token_id is not None: + builder.internal_metadata.token_id = token_id if txn_id is not None: builder.internal_metadata.txn_id = txn_id @@ -232,7 +213,7 @@ class MessageHandler(BaseHandler): if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.change_membership(event, context) + yield member_handler.change_membership(event, context, is_guest=is_guest) else: yield self.handle_new_client_event( event=event, @@ -248,7 +229,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, - event_type=None, state_key=""): + event_type=None, state_key="", is_guest=False): """ Get data from a room. Args: @@ -258,29 +239,55 @@ class MessageHandler(BaseHandler): Raises: SynapseError if something went wrong. """ - have_joined = yield self.auth.check_joined_room(room_id, user_id) - if not have_joined: - raise RoomError(403, "User not in room.") - - data = yield self.state_handler.get_current_state( - room_id, event_type, state_key + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id, is_guest ) - defer.returnValue(data) - @defer.inlineCallbacks - def get_feedback(self, event_id): - # yield self.auth.check_joined_room(room_id, user_id) + if membership == Membership.JOIN: + data = yield self.state_handler.get_current_state( + room_id, event_type, state_key + ) + elif membership == Membership.LEAVE: + key = (event_type, state_key) + room_state = yield self.store.get_state_for_events( + [membership_event_id], [key] + ) + data = room_state[membership_event_id].get(key) - # Pull out the feedback from the db - fb = yield self.store.get_feedback(event_id) + defer.returnValue(data) - if fb: - defer.returnValue(fb) - defer.returnValue(None) + @defer.inlineCallbacks + def _check_in_room_or_world_readable(self, room_id, user_id, is_guest): + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError, auth_error: + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + if not is_guest: + raise auth_error + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) @defer.inlineCallbacks - def get_state_events(self, user_id, room_id): - """Retrieve all state events for a given room. + def get_state_events(self, user_id, room_id, is_guest=False): + """Retrieve all state events for a given room. If the user is + joined to the room then return the current state. If the user has + left the room return the state events from when they left. Args: user_id(str): The user requesting state events. @@ -288,18 +295,26 @@ class MessageHandler(BaseHandler): Returns: A list of dicts representing state events. [{}, {}, {}] """ - yield self.auth.check_joined_room(room_id, user_id) + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id, is_guest + ) + + if membership == Membership.JOIN: + room_state = yield self.state_handler.get_current_state(room_id) + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], None + ) + room_state = room_state[membership_event_id] - # TODO: This is duplicating logic from snapshot_all_rooms - current_state = yield self.state_handler.get_current_state(room_id) now = self.clock.time_msec() defer.returnValue( - [serialize_event(c, now) for c in current_state.values()] + [serialize_event(c, now) for c in room_state.values()] ) @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, - feedback=False, as_client_event=True): + as_client_event=True, include_archived=False): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is @@ -309,17 +324,20 @@ class MessageHandler(BaseHandler): user_id (str): The ID of the user making the request. pagin_config (synapse.api.streams.PaginationConfig): The pagination config used to determine how many messages *PER ROOM* to return. - feedback (bool): True to get feedback along with these messages. as_client_event (bool): True to get events in client-server format. + include_archived (bool): True to get rooms that the user has left Returns: A list of dicts with "room_id" and "membership" keys for all rooms the user is currently invited or joined in on. Rooms where the user is joined on, may return a "messages" key with messages, depending on the specified PaginationConfig. """ + memberships = [Membership.INVITE, Membership.JOIN] + if include_archived: + memberships.append(Membership.LEAVE) + room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=user_id, - membership_list=[Membership.INVITE, Membership.JOIN] + user_id=user_id, membership_list=memberships ) user = UserID.from_string(user_id) @@ -339,6 +357,8 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("receipt"), None ) + tags_by_room = yield self.store.get_tags_for_user(user_id) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -357,28 +377,45 @@ class MessageHandler(BaseHandler): } if event.membership == Membership.INVITE: + time_now = self.clock.time_msec() d["inviter"] = event.sender + invite_event = yield self.store.get_event(event.event_id) + d["invite"] = serialize_event(invite_event, time_now, as_client_event) + rooms_ret.append(d) - if event.membership != Membership.JOIN: + if event.membership not in (Membership.JOIN, Membership.LEAVE): return + try: + if event.membership == Membership.JOIN: + room_end_token = now_token.room_key + deferred_room_state = self.state_handler.get_current_state( + event.room_id + ) + elif event.membership == Membership.LEAVE: + room_end_token = "s%d" % (event.stream_ordering,) + deferred_room_state = self.store.get_state_for_events( + [event.event_id], None + ) + deferred_room_state.addCallback( + lambda states: states[event.event_id] + ) + (messages, token), current_state = yield defer.gatherResults( [ self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_token.room_key, - ), - self.state_handler.get_current_state( - event.room_id + end_token=room_end_token, ), + deferred_room_state, ] ).addErrback(unwrapFirstError) messages = yield self._filter_events_for_client( - user_id, event.room_id, messages + user_id, messages ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -398,6 +435,15 @@ class MessageHandler(BaseHandler): serialize_event(c, time_now, as_client_event) for c in current_state.values() ] + + private_user_data = [] + tags = tags_by_room.get(event.room_id) + if tags: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + d["private_user_data"] = private_user_data except: logger.exception("Failed to get snapshot") @@ -420,15 +466,99 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) @defer.inlineCallbacks - def room_initial_sync(self, user_id, room_id, pagin_config=None, - feedback=False): - current_state = yield self.state.get_current_state( - room_id=room_id, + def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False): + """Capture the a snapshot of a room. If user is currently a member of + the room this will be what is currently in the room. If the user left + the room this will be what was in the room when they left. + + Args: + user_id(str): The user to get a snapshot for. + room_id(str): The room to get a snapshot of. + pagin_config(synapse.streams.config.PaginationConfig): + The pagination config used to determine how many messages to + return. + Raises: + AuthError if the user wasn't in the room. + Returns: + A JSON serialisable dict with the snapshot of the room. + """ + + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, + user_id, + is_guest ) - yield self.auth.check_joined_room( - room_id, user_id, - current_state=current_state + if membership == Membership.JOIN: + result = yield self._room_initial_sync_joined( + user_id, room_id, pagin_config, membership, is_guest + ) + elif membership == Membership.LEAVE: + result = yield self._room_initial_sync_parted( + user_id, room_id, pagin_config, membership, member_event_id, is_guest + ) + + private_user_data = [] + tags = yield self.store.get_tags_for_room(user_id, room_id) + if tags: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + result["private_user_data"] = private_user_data + + defer.returnValue(result) + + @defer.inlineCallbacks + def _room_initial_sync_parted(self, user_id, room_id, pagin_config, + membership, member_event_id, is_guest): + room_state = yield self.store.get_state_for_events( + [member_event_id], None + ) + + room_state = room_state[member_event_id] + + limit = pagin_config.limit if pagin_config else None + if limit is None: + limit = 10 + + stream_token = yield self.store.get_stream_token_for_event( + member_event_id + ) + + messages, token = yield self.store.get_recent_events_for_room( + room_id, + limit=limit, + end_token=stream_token + ) + + messages = yield self._filter_events_for_client( + user_id, messages, is_guest=is_guest + ) + + start_token = StreamToken(token[0], 0, 0, 0, 0) + end_token = StreamToken(token[1], 0, 0, 0, 0) + + time_now = self.clock.time_msec() + + defer.returnValue({ + "membership": membership, + "room_id": room_id, + "messages": { + "chunk": [serialize_event(m, time_now) for m in messages], + "start": start_token.to_string(), + "end": end_token.to_string(), + }, + "state": [serialize_event(s, time_now) for s in room_state.values()], + "presence": [], + "receipts": [], + }) + + @defer.inlineCallbacks + def _room_initial_sync_joined(self, user_id, room_id, pagin_config, + membership, is_guest): + current_state = yield self.state.get_current_state( + room_id=room_id, ) # TODO(paul): I wish I was called with user objects not user_id @@ -442,8 +572,6 @@ class MessageHandler(BaseHandler): for x in current_state.values() ] - member_event = current_state.get((EventTypes.Member, user_id,)) - now_token = yield self.hs.get_event_sources().get_current_token() limit = pagin_config.limit if pagin_config else None @@ -460,12 +588,14 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - states = yield presence_handler.get_states( - target_users=[UserID.from_string(m.user_id) for m in room_members], - auth_user=auth_user, - as_event=True, - check_auth=False, - ) + states = {} + if not is_guest: + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, + ) defer.returnValue(states.values()) @@ -485,7 +615,7 @@ class MessageHandler(BaseHandler): ).addErrback(unwrapFirstError) messages = yield self._filter_events_for_client( - user_id, room_id, messages + user_id, messages, is_guest=is_guest, require_all_visible_for_guests=False ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -493,8 +623,7 @@ class MessageHandler(BaseHandler): time_now = self.clock.time_msec() - defer.returnValue({ - "membership": member_event.membership, + ret = { "room_id": room_id, "messages": { "chunk": [serialize_event(m, time_now) for m in messages], @@ -504,4 +633,8 @@ class MessageHandler(BaseHandler): "state": state, "presence": presence, "receipts": receipts, - }) + } + if not is_guest: + ret["membership"] = membership + + defer.returnValue(ret) |