summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-11-17 15:45:43 +0000
committerErik Johnston <erik@matrix.org>2015-11-17 15:45:43 +0000
commitd3861b44424aa6f03cc65719bb1527330157abea (patch)
tree4377eb0dc5e221862489bdcc802e50e2f1f41cb1 /synapse/handlers/message.py
parentMerge branch 'hotfixes-v0.10.0-r2' of github.com:matrix-org/synapse (diff)
parentSlightly more aggressive retry timers at HTTP level (diff)
downloadsynapse-d3861b44424aa6f03cc65719bb1527330157abea.tar.xz
Merge branch 'release-v0.11.0' of github.com:matrix-org/synapse v0.11.0
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py363
1 files changed, 248 insertions, 115 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f12465fa2c..14051aee99 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,13 +16,13 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError, SynapseError
+from synapse.api.errors import SynapseError, AuthError, Codes
 from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
 from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID, RoomStreamToken
+from synapse.types import UserID, RoomStreamToken, StreamToken
 
 from ._base import BaseHandler
 
@@ -71,34 +71,64 @@ class MessageHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_messages(self, user_id=None, room_id=None, pagin_config=None,
-                     feedback=False, as_client_event=True):
+                     as_client_event=True, is_guest=False):
         """Get messages in a room.
 
         Args:
             user_id (str): The user requesting messages.
             room_id (str): The room they want messages from.
             pagin_config (synapse.api.streams.PaginationConfig): The pagination
-            config rules to apply, if any.
-            feedback (bool): True to get compressed feedback with the messages
+                config rules to apply, if any.
             as_client_event (bool): True to get events in client-server format.
+            is_guest (bool): Whether the requesting user is a guest (as opposed
+                to a fully registered user).
         Returns:
             dict: Pagination API results
         """
-        yield self.auth.check_joined_room(room_id, user_id)
-
         data_source = self.hs.get_event_sources().sources["room"]
 
-        if not pagin_config.from_token:
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
             pagin_config.from_token = (
                 yield self.hs.get_event_sources().get_current_token(
                     direction='b'
                 )
             )
+            room_token = pagin_config.from_token.room_key
 
-        room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+        room_token = RoomStreamToken.parse(room_token)
         if room_token.topological is None:
             raise SynapseError(400, "Invalid token")
 
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        if not is_guest:
+            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+            if member_event.membership == Membership.LEAVE:
+                # If they have left the room then clamp the token to be before
+                # they left the room.
+                # If they're a guest, we'll just 403 them if they're asking for
+                # events they can't see.
+                leave_token = yield self.store.get_topological_token_for_event(
+                    member_event.event_id
+                )
+                leave_token = RoomStreamToken.parse(leave_token)
+                if leave_token.topological < room_token.topological:
+                    source_config.from_key = str(leave_token)
+
+                if source_config.direction == "f":
+                    if source_config.to_key is None:
+                        source_config.to_key = str(leave_token)
+                    else:
+                        to_token = RoomStreamToken.parse(source_config.to_key)
+                        if leave_token.topological < to_token.topological:
+                            source_config.to_key = str(leave_token)
+
         yield self.hs.get_handlers().federation_handler.maybe_backfill(
             room_id, room_token.topological
         )
@@ -106,7 +136,7 @@ class MessageHandler(BaseHandler):
         user = UserID.from_string(user_id)
 
         events, next_key = yield data_source.get_pagination_rows(
-            user, pagin_config.get_source_config("room"), room_id
+            user, source_config, room_id
         )
 
         next_token = pagin_config.from_token.copy_and_replace(
@@ -120,7 +150,7 @@ class MessageHandler(BaseHandler):
                 "end": next_token.to_string(),
             })
 
-        events = yield self._filter_events_for_client(user_id, room_id, events)
+        events = yield self._filter_events_for_client(user_id, events, is_guest=is_guest)
 
         time_now = self.clock.time_msec()
 
@@ -136,54 +166,8 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
-    def _filter_events_for_client(self, user_id, room_id, events):
-        event_id_to_state = yield self.store.get_state_for_events(
-            room_id, frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, user_id),
-            )
-        )
-
-        def allowed(event, state):
-            if event.type == EventTypes.RoomHistoryVisibility:
-                return True
-
-            membership_ev = state.get((EventTypes.Member, user_id), None)
-            if membership_ev:
-                membership = membership_ev.membership
-            else:
-                membership = Membership.LEAVE
-
-            if membership == Membership.JOIN:
-                return True
-
-            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
-            if history:
-                visibility = history.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            if visibility == "public":
-                return True
-            elif visibility == "shared":
-                return True
-            elif visibility == "joined":
-                return membership == Membership.JOIN
-            elif visibility == "invited":
-                return membership == Membership.INVITE
-
-            return True
-
-        defer.returnValue([
-            event
-            for event in events
-            if allowed(event, event_id_to_state[event.event_id])
-        ])
-
-    @defer.inlineCallbacks
     def create_and_send_event(self, event_dict, ratelimit=True,
-                              client=None, txn_id=None):
+                              token_id=None, txn_id=None, is_guest=False):
         """ Given a dict from a client, create and handle a new event.
 
         Creates an FrozenEvent object, filling out auth_events, prev_events,
@@ -217,11 +201,8 @@ class MessageHandler(BaseHandler):
                     builder.content
                 )
 
-        if client is not None:
-            if client.token_id is not None:
-                builder.internal_metadata.token_id = client.token_id
-            if client.device_id is not None:
-                builder.internal_metadata.device_id = client.device_id
+        if token_id is not None:
+            builder.internal_metadata.token_id = token_id
 
         if txn_id is not None:
             builder.internal_metadata.txn_id = txn_id
@@ -232,7 +213,7 @@ class MessageHandler(BaseHandler):
 
         if event.type == EventTypes.Member:
             member_handler = self.hs.get_handlers().room_member_handler
-            yield member_handler.change_membership(event, context)
+            yield member_handler.change_membership(event, context, is_guest=is_guest)
         else:
             yield self.handle_new_client_event(
                 event=event,
@@ -248,7 +229,7 @@ class MessageHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
-                      event_type=None, state_key=""):
+                      event_type=None, state_key="", is_guest=False):
         """ Get data from a room.
 
         Args:
@@ -258,29 +239,55 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        have_joined = yield self.auth.check_joined_room(room_id, user_id)
-        if not have_joined:
-            raise RoomError(403, "User not in room.")
-
-        data = yield self.state_handler.get_current_state(
-            room_id, event_type, state_key
+        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+            room_id, user_id, is_guest
         )
-        defer.returnValue(data)
 
-    @defer.inlineCallbacks
-    def get_feedback(self, event_id):
-        # yield self.auth.check_joined_room(room_id, user_id)
+        if membership == Membership.JOIN:
+            data = yield self.state_handler.get_current_state(
+                room_id, event_type, state_key
+            )
+        elif membership == Membership.LEAVE:
+            key = (event_type, state_key)
+            room_state = yield self.store.get_state_for_events(
+                [membership_event_id], [key]
+            )
+            data = room_state[membership_event_id].get(key)
 
-        # Pull out the feedback from the db
-        fb = yield self.store.get_feedback(event_id)
+        defer.returnValue(data)
 
-        if fb:
-            defer.returnValue(fb)
-        defer.returnValue(None)
+    @defer.inlineCallbacks
+    def _check_in_room_or_world_readable(self, room_id, user_id, is_guest):
+        try:
+            # check_user_was_in_room will return the most recent membership
+            # event for the user if:
+            #  * The user is a non-guest user, and was ever in the room
+            #  * The user is a guest user, and has joined the room
+            # else it will throw.
+            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+            defer.returnValue((member_event.membership, member_event.event_id))
+            return
+        except AuthError, auth_error:
+            visibility = yield self.state_handler.get_current_state(
+                room_id, EventTypes.RoomHistoryVisibility, ""
+            )
+            if (
+                visibility and
+                visibility.content["history_visibility"] == "world_readable"
+            ):
+                defer.returnValue((Membership.JOIN, None))
+                return
+            if not is_guest:
+                raise auth_error
+            raise AuthError(
+                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+            )
 
     @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id):
-        """Retrieve all state events for a given room.
+    def get_state_events(self, user_id, room_id, is_guest=False):
+        """Retrieve all state events for a given room. If the user is
+        joined to the room then return the current state. If the user has
+        left the room return the state events from when they left.
 
         Args:
             user_id(str): The user requesting state events.
@@ -288,18 +295,26 @@ class MessageHandler(BaseHandler):
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         """
-        yield self.auth.check_joined_room(room_id, user_id)
+        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+            room_id, user_id, is_guest
+        )
+
+        if membership == Membership.JOIN:
+            room_state = yield self.state_handler.get_current_state(room_id)
+        elif membership == Membership.LEAVE:
+            room_state = yield self.store.get_state_for_events(
+                [membership_event_id], None
+            )
+            room_state = room_state[membership_event_id]
 
-        # TODO: This is duplicating logic from snapshot_all_rooms
-        current_state = yield self.state_handler.get_current_state(room_id)
         now = self.clock.time_msec()
         defer.returnValue(
-            [serialize_event(c, now) for c in current_state.values()]
+            [serialize_event(c, now) for c in room_state.values()]
         )
 
     @defer.inlineCallbacks
     def snapshot_all_rooms(self, user_id=None, pagin_config=None,
-                           feedback=False, as_client_event=True):
+                           as_client_event=True, include_archived=False):
         """Retrieve a snapshot of all rooms the user is invited or has joined.
 
         This snapshot may include messages for all rooms where the user is
@@ -309,17 +324,20 @@ class MessageHandler(BaseHandler):
             user_id (str): The ID of the user making the request.
             pagin_config (synapse.api.streams.PaginationConfig): The pagination
             config used to determine how many messages *PER ROOM* to return.
-            feedback (bool): True to get feedback along with these messages.
             as_client_event (bool): True to get events in client-server format.
+            include_archived (bool): True to get rooms that the user has left
         Returns:
             A list of dicts with "room_id" and "membership" keys for all rooms
             the user is currently invited or joined in on. Rooms where the user
             is joined on, may return a "messages" key with messages, depending
             on the specified PaginationConfig.
         """
+        memberships = [Membership.INVITE, Membership.JOIN]
+        if include_archived:
+            memberships.append(Membership.LEAVE)
+
         room_list = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=user_id,
-            membership_list=[Membership.INVITE, Membership.JOIN]
+            user_id=user_id, membership_list=memberships
         )
 
         user = UserID.from_string(user_id)
@@ -339,6 +357,8 @@ class MessageHandler(BaseHandler):
             user, pagination_config.get_source_config("receipt"), None
         )
 
+        tags_by_room = yield self.store.get_tags_for_user(user_id)
+
         public_room_ids = yield self.store.get_public_room_ids()
 
         limit = pagin_config.limit
@@ -357,28 +377,45 @@ class MessageHandler(BaseHandler):
             }
 
             if event.membership == Membership.INVITE:
+                time_now = self.clock.time_msec()
                 d["inviter"] = event.sender
 
+                invite_event = yield self.store.get_event(event.event_id)
+                d["invite"] = serialize_event(invite_event, time_now, as_client_event)
+
             rooms_ret.append(d)
 
-            if event.membership != Membership.JOIN:
+            if event.membership not in (Membership.JOIN, Membership.LEAVE):
                 return
+
             try:
+                if event.membership == Membership.JOIN:
+                    room_end_token = now_token.room_key
+                    deferred_room_state = self.state_handler.get_current_state(
+                        event.room_id
+                    )
+                elif event.membership == Membership.LEAVE:
+                    room_end_token = "s%d" % (event.stream_ordering,)
+                    deferred_room_state = self.store.get_state_for_events(
+                        [event.event_id], None
+                    )
+                    deferred_room_state.addCallback(
+                        lambda states: states[event.event_id]
+                    )
+
                 (messages, token), current_state = yield defer.gatherResults(
                     [
                         self.store.get_recent_events_for_room(
                             event.room_id,
                             limit=limit,
-                            end_token=now_token.room_key,
-                        ),
-                        self.state_handler.get_current_state(
-                            event.room_id
+                            end_token=room_end_token,
                         ),
+                        deferred_room_state,
                     ]
                 ).addErrback(unwrapFirstError)
 
                 messages = yield self._filter_events_for_client(
-                    user_id, event.room_id, messages
+                    user_id, messages
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -398,6 +435,15 @@ class MessageHandler(BaseHandler):
                     serialize_event(c, time_now, as_client_event)
                     for c in current_state.values()
                 ]
+
+                private_user_data = []
+                tags = tags_by_room.get(event.room_id)
+                if tags:
+                    private_user_data.append({
+                        "type": "m.tag",
+                        "content": {"tags": tags},
+                    })
+                d["private_user_data"] = private_user_data
             except:
                 logger.exception("Failed to get snapshot")
 
@@ -420,15 +466,99 @@ class MessageHandler(BaseHandler):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def room_initial_sync(self, user_id, room_id, pagin_config=None,
-                          feedback=False):
-        current_state = yield self.state.get_current_state(
-            room_id=room_id,
+    def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False):
+        """Capture the a snapshot of a room. If user is currently a member of
+        the room this will be what is currently in the room. If the user left
+        the room this will be what was in the room when they left.
+
+        Args:
+            user_id(str): The user to get a snapshot for.
+            room_id(str): The room to get a snapshot of.
+            pagin_config(synapse.streams.config.PaginationConfig):
+                The pagination config used to determine how many messages to
+                return.
+        Raises:
+            AuthError if the user wasn't in the room.
+        Returns:
+            A JSON serialisable dict with the snapshot of the room.
+        """
+
+        membership, member_event_id = yield self._check_in_room_or_world_readable(
+            room_id,
+            user_id,
+            is_guest
         )
 
-        yield self.auth.check_joined_room(
-            room_id, user_id,
-            current_state=current_state
+        if membership == Membership.JOIN:
+            result = yield self._room_initial_sync_joined(
+                user_id, room_id, pagin_config, membership, is_guest
+            )
+        elif membership == Membership.LEAVE:
+            result = yield self._room_initial_sync_parted(
+                user_id, room_id, pagin_config, membership, member_event_id, is_guest
+            )
+
+        private_user_data = []
+        tags = yield self.store.get_tags_for_room(user_id, room_id)
+        if tags:
+            private_user_data.append({
+                "type": "m.tag",
+                "content": {"tags": tags},
+            })
+        result["private_user_data"] = private_user_data
+
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
+                                  membership, member_event_id, is_guest):
+        room_state = yield self.store.get_state_for_events(
+            [member_event_id], None
+        )
+
+        room_state = room_state[member_event_id]
+
+        limit = pagin_config.limit if pagin_config else None
+        if limit is None:
+            limit = 10
+
+        stream_token = yield self.store.get_stream_token_for_event(
+            member_event_id
+        )
+
+        messages, token = yield self.store.get_recent_events_for_room(
+            room_id,
+            limit=limit,
+            end_token=stream_token
+        )
+
+        messages = yield self._filter_events_for_client(
+            user_id, messages, is_guest=is_guest
+        )
+
+        start_token = StreamToken(token[0], 0, 0, 0, 0)
+        end_token = StreamToken(token[1], 0, 0, 0, 0)
+
+        time_now = self.clock.time_msec()
+
+        defer.returnValue({
+            "membership": membership,
+            "room_id": room_id,
+            "messages": {
+                "chunk": [serialize_event(m, time_now) for m in messages],
+                "start": start_token.to_string(),
+                "end": end_token.to_string(),
+            },
+            "state": [serialize_event(s, time_now) for s in room_state.values()],
+            "presence": [],
+            "receipts": [],
+        })
+
+    @defer.inlineCallbacks
+    def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
+                                  membership, is_guest):
+        current_state = yield self.state.get_current_state(
+            room_id=room_id,
         )
 
         # TODO(paul): I wish I was called with user objects not user_id
@@ -442,8 +572,6 @@ class MessageHandler(BaseHandler):
             for x in current_state.values()
         ]
 
-        member_event = current_state.get((EventTypes.Member, user_id,))
-
         now_token = yield self.hs.get_event_sources().get_current_token()
 
         limit = pagin_config.limit if pagin_config else None
@@ -460,12 +588,14 @@ class MessageHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_presence():
-            states = yield presence_handler.get_states(
-                target_users=[UserID.from_string(m.user_id) for m in room_members],
-                auth_user=auth_user,
-                as_event=True,
-                check_auth=False,
-            )
+            states = {}
+            if not is_guest:
+                states = yield presence_handler.get_states(
+                    target_users=[UserID.from_string(m.user_id) for m in room_members],
+                    auth_user=auth_user,
+                    as_event=True,
+                    check_auth=False,
+                )
 
             defer.returnValue(states.values())
 
@@ -485,7 +615,7 @@ class MessageHandler(BaseHandler):
         ).addErrback(unwrapFirstError)
 
         messages = yield self._filter_events_for_client(
-            user_id, room_id, messages
+            user_id, messages, is_guest=is_guest, require_all_visible_for_guests=False
         )
 
         start_token = now_token.copy_and_replace("room_key", token[0])
@@ -493,8 +623,7 @@ class MessageHandler(BaseHandler):
 
         time_now = self.clock.time_msec()
 
-        defer.returnValue({
-            "membership": member_event.membership,
+        ret = {
             "room_id": room_id,
             "messages": {
                 "chunk": [serialize_event(m, time_now) for m in messages],
@@ -504,4 +633,8 @@ class MessageHandler(BaseHandler):
             "state": state,
             "presence": presence,
             "receipts": receipts,
-        })
+        }
+        if not is_guest:
+            ret["membership"] = membership
+
+        defer.returnValue(ret)