diff options
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r-- | synapse/handlers/message.py | 206 |
1 files changed, 158 insertions, 48 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 23b779ad7c..bda8eb5f3f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,13 +16,13 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import RoomError, SynapseError +from synapse.api.errors import SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext -from synapse.types import UserID, RoomStreamToken +from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler @@ -71,7 +71,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False, as_client_event=True): + as_client_event=True): """Get messages in a room. Args: @@ -79,26 +79,52 @@ class MessageHandler(BaseHandler): room_id (str): The room they want messages from. pagin_config (synapse.api.streams.PaginationConfig): The pagination config rules to apply, if any. - feedback (bool): True to get compressed feedback with the messages as_client_event (bool): True to get events in client-server format. Returns: dict: Pagination API results """ - yield self.auth.check_joined_room(room_id, user_id) + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) data_source = self.hs.get_event_sources().sources["room"] - if not pagin_config.from_token: + if pagin_config.from_token: + room_token = pagin_config.from_token.room_key + else: pagin_config.from_token = ( yield self.hs.get_event_sources().get_current_token( direction='b' ) ) + room_token = pagin_config.from_token.room_key - room_token = RoomStreamToken.parse(pagin_config.from_token.room_key) + room_token = RoomStreamToken.parse(room_token) if room_token.topological is None: raise SynapseError(400, "Invalid token") + pagin_config.from_token = pagin_config.from_token.copy_and_replace( + "room_key", str(room_token) + ) + + source_config = pagin_config.get_source_config("room") + + if member_event.membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room + leave_token = yield self.store.get_topological_token_for_event( + member_event.event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < room_token.topological: + source_config.from_key = str(leave_token) + + if source_config.direction == "f": + if source_config.to_key is None: + source_config.to_key = str(leave_token) + else: + to_token = RoomStreamToken.parse(source_config.to_key) + if leave_token.topological < to_token.topological: + source_config.to_key = str(leave_token) + yield self.hs.get_handlers().federation_handler.maybe_backfill( room_id, room_token.topological ) @@ -106,7 +132,7 @@ class MessageHandler(BaseHandler): user = UserID.from_string(user_id) events, next_key = yield data_source.get_pagination_rows( - user, pagin_config.get_source_config("room"), room_id + user, source_config, room_id ) next_token = pagin_config.from_token.copy_and_replace( @@ -255,29 +281,26 @@ class MessageHandler(BaseHandler): Raises: SynapseError if something went wrong. """ - have_joined = yield self.auth.check_joined_room(room_id, user_id) - if not have_joined: - raise RoomError(403, "User not in room.") + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - data = yield self.state_handler.get_current_state( - room_id, event_type, state_key - ) - defer.returnValue(data) - - @defer.inlineCallbacks - def get_feedback(self, event_id): - # yield self.auth.check_joined_room(room_id, user_id) - - # Pull out the feedback from the db - fb = yield self.store.get_feedback(event_id) + if member_event.membership == Membership.JOIN: + data = yield self.state_handler.get_current_state( + room_id, event_type, state_key + ) + elif member_event.membership == Membership.LEAVE: + key = (event_type, state_key) + room_state = yield self.store.get_state_for_events( + room_id, [member_event.event_id], [key] + ) + data = room_state[member_event.event_id].get(key) - if fb: - defer.returnValue(fb) - defer.returnValue(None) + defer.returnValue(data) @defer.inlineCallbacks def get_state_events(self, user_id, room_id): - """Retrieve all state events for a given room. + """Retrieve all state events for a given room. If the user is + joined to the room then return the current state. If the user has + left the room return the state events from when they left. Args: user_id(str): The user requesting state events. @@ -285,18 +308,23 @@ class MessageHandler(BaseHandler): Returns: A list of dicts representing state events. [{}, {}, {}] """ - yield self.auth.check_joined_room(room_id, user_id) + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + + if member_event.membership == Membership.JOIN: + room_state = yield self.state_handler.get_current_state(room_id) + elif member_event.membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + room_id, [member_event.event_id], None + ) + room_state = room_state[member_event.event_id] - # TODO: This is duplicating logic from snapshot_all_rooms - current_state = yield self.state_handler.get_current_state(room_id) now = self.clock.time_msec() defer.returnValue( - [serialize_event(c, now) for c in current_state.values()] + [serialize_event(c, now) for c in room_state.values()] ) @defer.inlineCallbacks - def snapshot_all_rooms(self, user_id=None, pagin_config=None, - feedback=False, as_client_event=True): + def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is @@ -306,7 +334,6 @@ class MessageHandler(BaseHandler): user_id (str): The ID of the user making the request. pagin_config (synapse.api.streams.PaginationConfig): The pagination config used to determine how many messages *PER ROOM* to return. - feedback (bool): True to get feedback along with these messages. as_client_event (bool): True to get events in client-server format. Returns: A list of dicts with "room_id" and "membership" keys for all rooms @@ -316,7 +343,9 @@ class MessageHandler(BaseHandler): """ room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=user_id, - membership_list=[Membership.INVITE, Membership.JOIN] + membership_list=[ + Membership.INVITE, Membership.JOIN, Membership.LEAVE + ] ) user = UserID.from_string(user_id) @@ -358,19 +387,32 @@ class MessageHandler(BaseHandler): rooms_ret.append(d) - if event.membership != Membership.JOIN: + if event.membership not in (Membership.JOIN, Membership.LEAVE): return + try: + if event.membership == Membership.JOIN: + room_end_token = now_token.room_key + deferred_room_state = self.state_handler.get_current_state( + event.room_id + ) + elif event.membership == Membership.LEAVE: + room_end_token = "s%d" % (event.stream_ordering,) + deferred_room_state = self.store.get_state_for_events( + event.room_id, [event.event_id], None + ) + deferred_room_state.addCallback( + lambda states: states[event.event_id] + ) + (messages, token), current_state = yield defer.gatherResults( [ self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_token.room_key, - ), - self.state_handler.get_current_state( - event.room_id + end_token=room_end_token, ), + deferred_room_state, ] ).addErrback(unwrapFirstError) @@ -417,15 +459,85 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) @defer.inlineCallbacks - def room_initial_sync(self, user_id, room_id, pagin_config=None, - feedback=False): - current_state = yield self.state.get_current_state( - room_id=room_id, + def room_initial_sync(self, user_id, room_id, pagin_config=None): + """Capture the a snapshot of a room. If user is currently a member of + the room this will be what is currently in the room. If the user left + the room this will be what was in the room when they left. + + Args: + user_id(str): The user to get a snapshot for. + room_id(str): The room to get a snapshot of. + pagin_config(synapse.streams.config.PaginationConfig): + The pagination config used to determine how many messages to + return. + Raises: + AuthError if the user wasn't in the room. + Returns: + A JSON serialisable dict with the snapshot of the room. + """ + + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + + if member_event.membership == Membership.JOIN: + result = yield self._room_initial_sync_joined( + user_id, room_id, pagin_config, member_event + ) + elif member_event.membership == Membership.LEAVE: + result = yield self._room_initial_sync_parted( + user_id, room_id, pagin_config, member_event + ) + defer.returnValue(result) + + @defer.inlineCallbacks + def _room_initial_sync_parted(self, user_id, room_id, pagin_config, + member_event): + room_state = yield self.store.get_state_for_events( + member_event.room_id, [member_event.event_id], None + ) + + room_state = room_state[member_event.event_id] + + limit = pagin_config.limit if pagin_config else None + if limit is None: + limit = 10 + + stream_token = yield self.store.get_stream_token_for_event( + member_event.event_id + ) + + messages, token = yield self.store.get_recent_events_for_room( + room_id, + limit=limit, + end_token=stream_token + ) + + messages = yield self._filter_events_for_client( + user_id, room_id, messages ) - yield self.auth.check_joined_room( - room_id, user_id, - current_state=current_state + start_token = StreamToken(token[0], 0, 0, 0) + end_token = StreamToken(token[1], 0, 0, 0) + + time_now = self.clock.time_msec() + + defer.returnValue({ + "membership": member_event.membership, + "room_id": room_id, + "messages": { + "chunk": [serialize_event(m, time_now) for m in messages], + "start": start_token.to_string(), + "end": end_token.to_string(), + }, + "state": [serialize_event(s, time_now) for s in room_state.values()], + "presence": [], + "receipts": [], + }) + + @defer.inlineCallbacks + def _room_initial_sync_joined(self, user_id, room_id, pagin_config, + member_event): + current_state = yield self.state.get_current_state( + room_id=room_id, ) # TODO(paul): I wish I was called with user objects not user_id @@ -439,8 +551,6 @@ class MessageHandler(BaseHandler): for x in current_state.values() ] - member_event = current_state.get((EventTypes.Member, user_id,)) - now_token = yield self.hs.get_event_sources().get_current_token() limit = pagin_config.limit if pagin_config else None |