diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/message.py | 95 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 20 | ||||
-rw-r--r-- | synapse/storage/stream.py | 76 | ||||
-rw-r--r-- | synapse/streams/config.py | 4 |
4 files changed, 193 insertions, 2 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ad2753c1b5..327565c967 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -165,6 +165,101 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks + def get_files(self, requester, room_id, pagin_config): + """Get files in a room. + + Args: + requester (Requester): The user requesting files. + room_id (str): The room they want files from. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config rules to apply, if any. + as_client_event (bool): True to get events in client-server format. + Returns: + dict: Pagination API results + """ + user_id = requester.user.to_string() + + if pagin_config.from_token: + room_token = pagin_config.from_token.room_key + else: + pagin_config.from_token = ( + yield self.hs.get_event_sources().get_current_token( + direction='b' + ) + ) + room_token = pagin_config.from_token.room_key + + room_token = RoomStreamToken.parse(room_token) + + pagin_config.from_token = pagin_config.from_token.copy_and_replace( + "room_key", str(room_token) + ) + + source_config = pagin_config.get_source_config("room") + + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if source_config.direction == 'b': + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token( + room_id, room_token.stream + ) + + if membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + leave_token = yield self.store.get_topological_token_for_event( + member_event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < max_topo: + source_config.from_key = str(leave_token) + + events, next_key = yield self.store.paginate_room_file_events( + room_id, + from_key=source_config.from_key, + to_key=source_config.to_key, + direction=source_config.direction, + limit=source_config.limit, + ) + + next_token = pagin_config.from_token.copy_and_replace( + "room_key", next_key + ) + + if not events: + defer.returnValue({ + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + }) + + events = yield filter_events_for_client( + self.store, + user_id, + events, + is_peeking=(member_event_id is None), + ) + + time_now = self.clock.time_msec() + + chunk = { + "chunk": [ + serialize_event(e, time_now) + for e in events + ], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) + + @defer.inlineCallbacks def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): """ Given a dict from a client, create a new event. diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 86fbe2747d..2b1d53dc55 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -338,6 +338,25 @@ class RoomMessageListRestServlet(ClientV1RestServlet): defer.returnValue((200, msgs)) +class RoomFileListRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/files$") + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + pagination_config = PaginationConfig.from_request( + request, default_limit=10, default_dir='b', + ) + handler = self.handlers.message_handler + msgs = yield handler.get_files( + room_id=room_id, + requester=requester, + pagin_config=pagination_config, + ) + + defer.returnValue((200, msgs)) + + # TODO: Needs unit testing class RoomStateRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/state$") @@ -667,6 +686,7 @@ def register_servlets(hs, http_server): RoomCreateRestServlet(hs).register(http_server) RoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) + RoomFileListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) RoomForgetRestServlet(hs).register(http_server) RoomMembershipRestServlet(hs).register(http_server) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c33ac5a8d7..59788944e4 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -395,6 +395,82 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) @defer.inlineCallbacks + def paginate_room_file_events(self, room_id, from_key, to_key=None, + direction='b', limit=-1): + # Tokens really represent positions between elements, but we use + # the convention of pointing to the event before the gap. Hence + # we have a bit of asymmetry when it comes to equalities. + args = [room_id] + if direction == 'b': + order = "DESC" + bounds = upper_bound( + RoomStreamToken.parse(from_key), self.database_engine + ) + if to_key: + bounds = "%s AND %s" % (bounds, lower_bound( + RoomStreamToken.parse(to_key), self.database_engine + )) + else: + order = "ASC" + bounds = lower_bound( + RoomStreamToken.parse(from_key), self.database_engine + ) + if to_key: + bounds = "%s AND %s" % (bounds, upper_bound( + RoomStreamToken.parse(to_key), self.database_engine + )) + + if int(limit) > 0: + args.append(int(limit)) + limit_str = " LIMIT ?" + else: + limit_str = "" + + sql = ( + "SELECT * FROM event_files" + " WHERE room_id = ? AND %(bounds)s" + " ORDER BY topological_ordering %(order)s," + " stream_ordering %(order)s %(limit)s" + ) % { + "bounds": bounds, + "order": order, + "limit": limit_str + } + + def f(txn): + txn.execute(sql, args) + + rows = self.cursor_to_dict(txn) + + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + if direction == 'b': + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. + toke -= 1 + next_token = str(RoomStreamToken(topo, toke)) + else: + # TODO (erikj): We should work out what to do here instead. + next_token = to_key if to_key else from_key + + return rows, next_token, + + rows, token = yield self.runInteraction("paginate_file_events", f) + + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) + + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): rows, token = yield self.get_recent_event_ids_for_room( room_id, limit, end_token, from_token diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 4f089bfb94..49725d184d 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -56,7 +56,7 @@ class PaginationConfig(object): @classmethod def from_request(cls, request, raise_invalid_params=True, - default_limit=None): + default_limit=None, default_dir='f'): def get_param(name, default=None): lst = request.args.get(name, []) if len(lst) > 1: @@ -68,7 +68,7 @@ class PaginationConfig(object): else: return default - direction = get_param("dir", 'f') + direction = get_param("dir", default_dir) if direction not in ['f', 'b']: raise SynapseError(400, "'dir' parameter is invalid.") |