diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ad2753c1b5..327565c967 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -165,6 +165,101 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
+ def get_files(self, requester, room_id, pagin_config):
+ """Get files in a room.
+
+ Args:
+ requester (Requester): The user requesting files.
+ room_id (str): The room they want files from.
+ pagin_config (synapse.api.streams.PaginationConfig): The pagination
+ config rules to apply, if any.
+ as_client_event (bool): True to get events in client-server format.
+ Returns:
+ dict: Pagination API results
+ """
+ user_id = requester.user.to_string()
+
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
+ pagin_config.from_token = (
+ yield self.hs.get_event_sources().get_current_token(
+ direction='b'
+ )
+ )
+ room_token = pagin_config.from_token.room_key
+
+ room_token = RoomStreamToken.parse(room_token)
+
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ membership, member_event_id = yield self._check_in_room_or_world_readable(
+ room_id, user_id
+ )
+
+ if source_config.direction == 'b':
+ if room_token.topological:
+ max_topo = room_token.topological
+ else:
+ max_topo = yield self.store.get_max_topological_token(
+ room_id, room_token.stream
+ )
+
+ if membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < max_topo:
+ source_config.from_key = str(leave_token)
+
+ events, next_key = yield self.store.paginate_room_file_events(
+ room_id,
+ from_key=source_config.from_key,
+ to_key=source_config.to_key,
+ direction=source_config.direction,
+ limit=source_config.limit,
+ )
+
+ next_token = pagin_config.from_token.copy_and_replace(
+ "room_key", next_key
+ )
+
+ if not events:
+ defer.returnValue({
+ "chunk": [],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ })
+
+ events = yield filter_events_for_client(
+ self.store,
+ user_id,
+ events,
+ is_peeking=(member_event_id is None),
+ )
+
+ time_now = self.clock.time_msec()
+
+ chunk = {
+ "chunk": [
+ serialize_event(e, time_now)
+ for e in events
+ ],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ }
+
+ defer.returnValue(chunk)
+
+ @defer.inlineCallbacks
def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
"""
Given a dict from a client, create a new event.
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 86fbe2747d..2b1d53dc55 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -338,6 +338,25 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
defer.returnValue((200, msgs))
+class RoomFileListRestServlet(ClientV1RestServlet):
+ PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/files$")
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id):
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+ pagination_config = PaginationConfig.from_request(
+ request, default_limit=10, default_dir='b',
+ )
+ handler = self.handlers.message_handler
+ msgs = yield handler.get_files(
+ room_id=room_id,
+ requester=requester,
+ pagin_config=pagination_config,
+ )
+
+ defer.returnValue((200, msgs))
+
+
# TODO: Needs unit testing
class RoomStateRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/state$")
@@ -667,6 +686,7 @@ def register_servlets(hs, http_server):
RoomCreateRestServlet(hs).register(http_server)
RoomMemberListRestServlet(hs).register(http_server)
RoomMessageListRestServlet(hs).register(http_server)
+ RoomFileListRestServlet(hs).register(http_server)
JoinRoomAliasServlet(hs).register(http_server)
RoomForgetRestServlet(hs).register(http_server)
RoomMembershipRestServlet(hs).register(http_server)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index c33ac5a8d7..59788944e4 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -395,6 +395,82 @@ class StreamStore(SQLBaseStore):
defer.returnValue((events, token))
@defer.inlineCallbacks
+ def paginate_room_file_events(self, room_id, from_key, to_key=None,
+ direction='b', limit=-1):
+ # Tokens really represent positions between elements, but we use
+ # the convention of pointing to the event before the gap. Hence
+ # we have a bit of asymmetry when it comes to equalities.
+ args = [room_id]
+ if direction == 'b':
+ order = "DESC"
+ bounds = upper_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
+ if to_key:
+ bounds = "%s AND %s" % (bounds, lower_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
+ else:
+ order = "ASC"
+ bounds = lower_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
+ if to_key:
+ bounds = "%s AND %s" % (bounds, upper_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
+
+ if int(limit) > 0:
+ args.append(int(limit))
+ limit_str = " LIMIT ?"
+ else:
+ limit_str = ""
+
+ sql = (
+ "SELECT * FROM event_files"
+ " WHERE room_id = ? AND %(bounds)s"
+ " ORDER BY topological_ordering %(order)s,"
+ " stream_ordering %(order)s %(limit)s"
+ ) % {
+ "bounds": bounds,
+ "order": order,
+ "limit": limit_str
+ }
+
+ def f(txn):
+ txn.execute(sql, args)
+
+ rows = self.cursor_to_dict(txn)
+
+ if rows:
+ topo = rows[-1]["topological_ordering"]
+ toke = rows[-1]["stream_ordering"]
+ if direction == 'b':
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # when we are going backwards so we subtract one from the
+ # stream part.
+ toke -= 1
+ next_token = str(RoomStreamToken(topo, toke))
+ else:
+ # TODO (erikj): We should work out what to do here instead.
+ next_token = to_key if to_key else from_key
+
+ return rows, next_token,
+
+ rows, token = yield self.runInteraction("paginate_file_events", f)
+
+ events = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(events, rows)
+
+ defer.returnValue((events, token))
+
+ @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 4f089bfb94..49725d184d 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -56,7 +56,7 @@ class PaginationConfig(object):
@classmethod
def from_request(cls, request, raise_invalid_params=True,
- default_limit=None):
+ default_limit=None, default_dir='f'):
def get_param(name, default=None):
lst = request.args.get(name, [])
if len(lst) > 1:
@@ -68,7 +68,7 @@ class PaginationConfig(object):
else:
return default
- direction = get_param("dir", 'f')
+ direction = get_param("dir", default_dir)
if direction not in ['f', 'b']:
raise SynapseError(400, "'dir' parameter is invalid.")
|