summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-07-13 16:21:34 +0100
committerErik Johnston <erik@matrix.org>2016-07-13 16:29:35 +0100
commit9854f4c7ff11fd568c0eead9a427ed02461f0196 (patch)
treeb423b13e52250a6f5bdad164eaf7f09d006c1fa4
parentTrack in DB file message events (diff)
downloadsynapse-9854f4c7ff11fd568c0eead9a427ed02461f0196.tar.xz
-rw-r--r--synapse/handlers/message.py95
-rw-r--r--synapse/rest/client/v1/room.py20
-rw-r--r--synapse/storage/stream.py76
-rw-r--r--synapse/streams/config.py4
4 files changed, 193 insertions, 2 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ad2753c1b5..327565c967 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -165,6 +165,101 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
+    def get_files(self, requester, room_id, pagin_config):
+        """Get files in a room.
+
+        Args:
+            requester (Requester): The user requesting files.
+            room_id (str): The room they want files from.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+                config rules to apply, if any.
+            as_client_event (bool): True to get events in client-server format.
+        Returns:
+            dict: Pagination API results
+        """
+        user_id = requester.user.to_string()
+
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
+            pagin_config.from_token = (
+                yield self.hs.get_event_sources().get_current_token(
+                    direction='b'
+                )
+            )
+            room_token = pagin_config.from_token.room_key
+
+        room_token = RoomStreamToken.parse(room_token)
+
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        membership, member_event_id = yield self._check_in_room_or_world_readable(
+            room_id, user_id
+        )
+
+        if source_config.direction == 'b':
+            if room_token.topological:
+                max_topo = room_token.topological
+            else:
+                max_topo = yield self.store.get_max_topological_token(
+                    room_id, room_token.stream
+                )
+
+            if membership == Membership.LEAVE:
+                # If they have left the room then clamp the token to be before
+                # they left the room, to save the effort of loading from the
+                # database.
+                leave_token = yield self.store.get_topological_token_for_event(
+                    member_event_id
+                )
+                leave_token = RoomStreamToken.parse(leave_token)
+                if leave_token.topological < max_topo:
+                    source_config.from_key = str(leave_token)
+
+        events, next_key = yield self.store.paginate_room_file_events(
+            room_id,
+            from_key=source_config.from_key,
+            to_key=source_config.to_key,
+            direction=source_config.direction,
+            limit=source_config.limit,
+        )
+
+        next_token = pagin_config.from_token.copy_and_replace(
+            "room_key", next_key
+        )
+
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        events = yield filter_events_for_client(
+            self.store,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        time_now = self.clock.time_msec()
+
+        chunk = {
+            "chunk": [
+                serialize_event(e, time_now)
+                for e in events
+            ],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        defer.returnValue(chunk)
+
+    @defer.inlineCallbacks
     def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
         """
         Given a dict from a client, create a new event.
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 86fbe2747d..2b1d53dc55 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -338,6 +338,25 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
         defer.returnValue((200, msgs))
 
 
+class RoomFileListRestServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/files$")
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+        pagination_config = PaginationConfig.from_request(
+            request, default_limit=10, default_dir='b',
+        )
+        handler = self.handlers.message_handler
+        msgs = yield handler.get_files(
+            room_id=room_id,
+            requester=requester,
+            pagin_config=pagination_config,
+        )
+
+        defer.returnValue((200, msgs))
+
+
 # TODO: Needs unit testing
 class RoomStateRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/state$")
@@ -667,6 +686,7 @@ def register_servlets(hs, http_server):
     RoomCreateRestServlet(hs).register(http_server)
     RoomMemberListRestServlet(hs).register(http_server)
     RoomMessageListRestServlet(hs).register(http_server)
+    RoomFileListRestServlet(hs).register(http_server)
     JoinRoomAliasServlet(hs).register(http_server)
     RoomForgetRestServlet(hs).register(http_server)
     RoomMembershipRestServlet(hs).register(http_server)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index c33ac5a8d7..59788944e4 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -395,6 +395,82 @@ class StreamStore(SQLBaseStore):
         defer.returnValue((events, token))
 
     @defer.inlineCallbacks
+    def paginate_room_file_events(self, room_id, from_key, to_key=None,
+                                  direction='b', limit=-1):
+        # Tokens really represent positions between elements, but we use
+        # the convention of pointing to the event before the gap. Hence
+        # we have a bit of asymmetry when it comes to equalities.
+        args = [room_id]
+        if direction == 'b':
+            order = "DESC"
+            bounds = upper_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
+            if to_key:
+                bounds = "%s AND %s" % (bounds, lower_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
+        else:
+            order = "ASC"
+            bounds = lower_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
+            if to_key:
+                bounds = "%s AND %s" % (bounds, upper_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
+
+        if int(limit) > 0:
+            args.append(int(limit))
+            limit_str = " LIMIT ?"
+        else:
+            limit_str = ""
+
+        sql = (
+            "SELECT * FROM event_files"
+            " WHERE room_id = ? AND %(bounds)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s %(limit)s"
+        ) % {
+            "bounds": bounds,
+            "order": order,
+            "limit": limit_str
+        }
+
+        def f(txn):
+            txn.execute(sql, args)
+
+            rows = self.cursor_to_dict(txn)
+
+            if rows:
+                topo = rows[-1]["topological_ordering"]
+                toke = rows[-1]["stream_ordering"]
+                if direction == 'b':
+                    # Tokens are positions between events.
+                    # This token points *after* the last event in the chunk.
+                    # We need it to point to the event before it in the chunk
+                    # when we are going backwards so we subtract one from the
+                    # stream part.
+                    toke -= 1
+                next_token = str(RoomStreamToken(topo, toke))
+            else:
+                # TODO (erikj): We should work out what to do here instead.
+                next_token = to_key if to_key else from_key
+
+            return rows, next_token,
+
+        rows, token = yield self.runInteraction("paginate_file_events", f)
+
+        events = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
+
+        self._set_before_and_after(events, rows)
+
+        defer.returnValue((events, token))
+
+    @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
         rows, token = yield self.get_recent_event_ids_for_room(
             room_id, limit, end_token, from_token
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 4f089bfb94..49725d184d 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -56,7 +56,7 @@ class PaginationConfig(object):
 
     @classmethod
     def from_request(cls, request, raise_invalid_params=True,
-                     default_limit=None):
+                     default_limit=None, default_dir='f'):
         def get_param(name, default=None):
             lst = request.args.get(name, [])
             if len(lst) > 1:
@@ -68,7 +68,7 @@ class PaginationConfig(object):
             else:
                 return default
 
-        direction = get_param("dir", 'f')
+        direction = get_param("dir", default_dir)
         if direction not in ['f', 'b']:
             raise SynapseError(400, "'dir' parameter is invalid.")