diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 49 | ||||
-rw-r--r-- | synapse/handlers/message.py | 206 | ||||
-rw-r--r-- | synapse/handlers/room.py | 76 |
3 files changed, 169 insertions, 162 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 17f4ddd325..3aff80bf59 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1511,52 +1511,3 @@ class FederationHandler(BaseHandler): }, "missing": [e.event_id for e in missing_locals], }) - - @defer.inlineCallbacks - def _handle_auth_events(self, origin, auth_events): - auth_ids_to_deferred = {} - - def process_auth_ev(ev): - auth_ids = [e_id for e_id, _ in ev.auth_events] - - prev_ds = [ - auth_ids_to_deferred[i] - for i in auth_ids - if i in auth_ids_to_deferred - ] - - d = defer.Deferred() - - auth_ids_to_deferred[ev.event_id] = d - - @defer.inlineCallbacks - def f(*_): - ev.internal_metadata.outlier = True - - try: - auth = { - (e.type, e.state_key): e for e in auth_events - if e.event_id in auth_ids - } - - yield self._handle_new_event( - origin, ev, auth_events=auth - ) - except: - logger.exception( - "Failed to handle auth event %s", - ev.event_id, - ) - - d.callback(None) - - if prev_ds: - dx = defer.DeferredList(prev_ds) - dx.addBoth(f) - else: - f() - - for e in auth_events: - process_auth_ev(e) - - yield defer.DeferredList(auth_ids_to_deferred.values()) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 23b779ad7c..bda8eb5f3f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,13 +16,13 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import RoomError, SynapseError +from synapse.api.errors import SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext -from synapse.types import UserID, RoomStreamToken +from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler @@ -71,7 +71,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False, as_client_event=True): + as_client_event=True): """Get messages in a room. Args: @@ -79,26 +79,52 @@ class MessageHandler(BaseHandler): room_id (str): The room they want messages from. pagin_config (synapse.api.streams.PaginationConfig): The pagination config rules to apply, if any. - feedback (bool): True to get compressed feedback with the messages as_client_event (bool): True to get events in client-server format. Returns: dict: Pagination API results """ - yield self.auth.check_joined_room(room_id, user_id) + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) data_source = self.hs.get_event_sources().sources["room"] - if not pagin_config.from_token: + if pagin_config.from_token: + room_token = pagin_config.from_token.room_key + else: pagin_config.from_token = ( yield self.hs.get_event_sources().get_current_token( direction='b' ) ) + room_token = pagin_config.from_token.room_key - room_token = RoomStreamToken.parse(pagin_config.from_token.room_key) + room_token = RoomStreamToken.parse(room_token) if room_token.topological is None: raise SynapseError(400, "Invalid token") + pagin_config.from_token = pagin_config.from_token.copy_and_replace( + "room_key", str(room_token) + ) + + source_config = pagin_config.get_source_config("room") + + if member_event.membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room + leave_token = yield self.store.get_topological_token_for_event( + member_event.event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < room_token.topological: + source_config.from_key = str(leave_token) + + if source_config.direction == "f": + if source_config.to_key is None: + source_config.to_key = str(leave_token) + else: + to_token = RoomStreamToken.parse(source_config.to_key) + if leave_token.topological < to_token.topological: + source_config.to_key = str(leave_token) + yield self.hs.get_handlers().federation_handler.maybe_backfill( room_id, room_token.topological ) @@ -106,7 +132,7 @@ class MessageHandler(BaseHandler): user = UserID.from_string(user_id) events, next_key = yield data_source.get_pagination_rows( - user, pagin_config.get_source_config("room"), room_id + user, source_config, room_id ) next_token = pagin_config.from_token.copy_and_replace( @@ -255,29 +281,26 @@ class MessageHandler(BaseHandler): Raises: SynapseError if something went wrong. """ - have_joined = yield self.auth.check_joined_room(room_id, user_id) - if not have_joined: - raise RoomError(403, "User not in room.") + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - data = yield self.state_handler.get_current_state( - room_id, event_type, state_key - ) - defer.returnValue(data) - - @defer.inlineCallbacks - def get_feedback(self, event_id): - # yield self.auth.check_joined_room(room_id, user_id) - - # Pull out the feedback from the db - fb = yield self.store.get_feedback(event_id) + if member_event.membership == Membership.JOIN: + data = yield self.state_handler.get_current_state( + room_id, event_type, state_key + ) + elif member_event.membership == Membership.LEAVE: + key = (event_type, state_key) + room_state = yield self.store.get_state_for_events( + room_id, [member_event.event_id], [key] + ) + data = room_state[member_event.event_id].get(key) - if fb: - defer.returnValue(fb) - defer.returnValue(None) + defer.returnValue(data) @defer.inlineCallbacks def get_state_events(self, user_id, room_id): - """Retrieve all state events for a given room. + """Retrieve all state events for a given room. If the user is + joined to the room then return the current state. If the user has + left the room return the state events from when they left. Args: user_id(str): The user requesting state events. @@ -285,18 +308,23 @@ class MessageHandler(BaseHandler): Returns: A list of dicts representing state events. [{}, {}, {}] """ - yield self.auth.check_joined_room(room_id, user_id) + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + + if member_event.membership == Membership.JOIN: + room_state = yield self.state_handler.get_current_state(room_id) + elif member_event.membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + room_id, [member_event.event_id], None + ) + room_state = room_state[member_event.event_id] - # TODO: This is duplicating logic from snapshot_all_rooms - current_state = yield self.state_handler.get_current_state(room_id) now = self.clock.time_msec() defer.returnValue( - [serialize_event(c, now) for c in current_state.values()] + [serialize_event(c, now) for c in room_state.values()] ) @defer.inlineCallbacks - def snapshot_all_rooms(self, user_id=None, pagin_config=None, - feedback=False, as_client_event=True): + def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is @@ -306,7 +334,6 @@ class MessageHandler(BaseHandler): user_id (str): The ID of the user making the request. pagin_config (synapse.api.streams.PaginationConfig): The pagination config used to determine how many messages *PER ROOM* to return. - feedback (bool): True to get feedback along with these messages. as_client_event (bool): True to get events in client-server format. Returns: A list of dicts with "room_id" and "membership" keys for all rooms @@ -316,7 +343,9 @@ class MessageHandler(BaseHandler): """ room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=user_id, - membership_list=[Membership.INVITE, Membership.JOIN] + membership_list=[ + Membership.INVITE, Membership.JOIN, Membership.LEAVE + ] ) user = UserID.from_string(user_id) @@ -358,19 +387,32 @@ class MessageHandler(BaseHandler): rooms_ret.append(d) - if event.membership != Membership.JOIN: + if event.membership not in (Membership.JOIN, Membership.LEAVE): return + try: + if event.membership == Membership.JOIN: + room_end_token = now_token.room_key + deferred_room_state = self.state_handler.get_current_state( + event.room_id + ) + elif event.membership == Membership.LEAVE: + room_end_token = "s%d" % (event.stream_ordering,) + deferred_room_state = self.store.get_state_for_events( + event.room_id, [event.event_id], None + ) + deferred_room_state.addCallback( + lambda states: states[event.event_id] + ) + (messages, token), current_state = yield defer.gatherResults( [ self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_token.room_key, - ), - self.state_handler.get_current_state( - event.room_id + end_token=room_end_token, ), + deferred_room_state, ] ).addErrback(unwrapFirstError) @@ -417,15 +459,85 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) @defer.inlineCallbacks - def room_initial_sync(self, user_id, room_id, pagin_config=None, - feedback=False): - current_state = yield self.state.get_current_state( - room_id=room_id, + def room_initial_sync(self, user_id, room_id, pagin_config=None): + """Capture the a snapshot of a room. If user is currently a member of + the room this will be what is currently in the room. If the user left + the room this will be what was in the room when they left. + + Args: + user_id(str): The user to get a snapshot for. + room_id(str): The room to get a snapshot of. + pagin_config(synapse.streams.config.PaginationConfig): + The pagination config used to determine how many messages to + return. + Raises: + AuthError if the user wasn't in the room. + Returns: + A JSON serialisable dict with the snapshot of the room. + """ + + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + + if member_event.membership == Membership.JOIN: + result = yield self._room_initial_sync_joined( + user_id, room_id, pagin_config, member_event + ) + elif member_event.membership == Membership.LEAVE: + result = yield self._room_initial_sync_parted( + user_id, room_id, pagin_config, member_event + ) + defer.returnValue(result) + + @defer.inlineCallbacks + def _room_initial_sync_parted(self, user_id, room_id, pagin_config, + member_event): + room_state = yield self.store.get_state_for_events( + member_event.room_id, [member_event.event_id], None + ) + + room_state = room_state[member_event.event_id] + + limit = pagin_config.limit if pagin_config else None + if limit is None: + limit = 10 + + stream_token = yield self.store.get_stream_token_for_event( + member_event.event_id + ) + + messages, token = yield self.store.get_recent_events_for_room( + room_id, + limit=limit, + end_token=stream_token + ) + + messages = yield self._filter_events_for_client( + user_id, room_id, messages ) - yield self.auth.check_joined_room( - room_id, user_id, - current_state=current_state + start_token = StreamToken(token[0], 0, 0, 0) + end_token = StreamToken(token[1], 0, 0, 0) + + time_now = self.clock.time_msec() + + defer.returnValue({ + "membership": member_event.membership, + "room_id": room_id, + "messages": { + "chunk": [serialize_event(m, time_now) for m in messages], + "start": start_token.to_string(), + "end": end_token.to_string(), + }, + "state": [serialize_event(s, time_now) for s in room_state.values()], + "presence": [], + "receipts": [], + }) + + @defer.inlineCallbacks + def _room_initial_sync_joined(self, user_id, room_id, pagin_config, + member_event): + current_state = yield self.state.get_current_state( + room_id=room_id, ) # TODO(paul): I wish I was called with user objects not user_id @@ -439,8 +551,6 @@ class MessageHandler(BaseHandler): for x in current_state.values() ] - member_event = current_state.get((EventTypes.Member, user_id,)) - now_token = yield self.hs.get_event_sources().get_current_token() limit = pagin_config.limit if pagin_config else None diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index c5d1001b50..b677955523 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,7 +25,6 @@ from synapse.api.constants import ( from synapse.api.errors import StoreError, SynapseError from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor -from synapse.events.utils import serialize_event from collections import OrderedDict import logging @@ -39,7 +38,7 @@ class RoomCreationHandler(BaseHandler): PRESETS_DICT = { RoomCreationPreset.PRIVATE_CHAT: { "join_rules": JoinRules.INVITE, - "history_visibility": "invited", + "history_visibility": "shared", "original_invitees_have_ops": False, }, RoomCreationPreset.PUBLIC_CHAT: { @@ -156,6 +155,7 @@ class RoomCreationHandler(BaseHandler): preset_config=preset_config, invite_list=invite_list, initial_state=initial_state, + room_alias=room_alias, ) msg_handler = self.hs.get_handlers().message_handler @@ -203,7 +203,7 @@ class RoomCreationHandler(BaseHandler): defer.returnValue(result) def _create_events_for_new_room(self, creator, room_id, preset_config, - invite_list, initial_state): + invite_list, initial_state, room_alias): config = RoomCreationHandler.PRESETS_DICT[preset_config] creator_id = creator.to_string() @@ -272,6 +272,14 @@ class RoomCreationHandler(BaseHandler): returned_events.append(power_levels_event) + if room_alias and (EventTypes.CanonicalAlias, '') not in initial_state: + room_alias_event = create( + etype=EventTypes.CanonicalAlias, + content={"alias": room_alias.to_string()}, + ) + + returned_events.append(room_alias_event) + if (EventTypes.JoinRules, '') not in initial_state: join_rules_event = create( etype=EventTypes.JoinRules, @@ -343,41 +351,6 @@ class RoomMemberHandler(BaseHandler): remotedomains.add(member.domain) @defer.inlineCallbacks - def get_room_members_as_pagination_chunk(self, room_id=None, user_id=None, - limit=0, start_tok=None, - end_tok=None): - """Retrieve a list of room members in the room. - - Args: - room_id (str): The room to get the member list for. - user_id (str): The ID of the user making the request. - limit (int): The max number of members to return. - start_tok (str): Optional. The start token if known. - end_tok (str): Optional. The end token if known. - Returns: - dict: A Pagination streamable dict. - Raises: - SynapseError if something goes wrong. - """ - yield self.auth.check_joined_room(room_id, user_id) - - member_list = yield self.store.get_room_members(room_id=room_id) - time_now = self.clock.time_msec() - event_list = [ - serialize_event(entry, time_now) - for entry in member_list - ] - chunk_data = { - "start": "START", # FIXME (erikj): START is no longer valid - "end": "END", - "chunk": event_list - } - # TODO honor Pagination stream params - # TODO snapshot this list to return on subsequent requests when - # paginating - defer.returnValue(chunk_data) - - @defer.inlineCallbacks def change_membership(self, event, context, do_auth=True): """ Change the membership status of a user in a room. @@ -529,32 +502,6 @@ class RoomMemberHandler(BaseHandler): ) @defer.inlineCallbacks - def _should_invite_join(self, room_id, prev_state, do_auth): - logger.debug("_should_invite_join: room_id: %s", room_id) - - # XXX: We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - - # Only do an invite join dance if a) we were invited, - # b) the person inviting was from a differnt HS and c) we are - # not currently in the room - room_host = None - if prev_state and prev_state.membership == Membership.INVITE: - room = yield self.store.get_room(room_id) - inviter = UserID.from_string( - prev_state.sender - ) - - is_remote_invite_join = not self.hs.is_mine(inviter) and not room - room_host = inviter.domain - else: - is_remote_invite_join = False - - defer.returnValue((is_remote_invite_join, room_host)) - - @defer.inlineCallbacks def get_joined_rooms_for_user(self, user): """Returns a list of roomids that the user has any of the given membership states in.""" @@ -646,7 +593,6 @@ class RoomEventSource(object): to_key=config.to_key, direction=config.direction, limit=config.limit, - with_feedback=True ) defer.returnValue((events, next_key)) |