summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/room.py48
-rw-r--r--synapse/storage/stream.py31
2 files changed, 48 insertions, 31 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 9261984b7e..b0b2441b9f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -97,30 +97,30 @@ class MessageHandler(BaseHandler):
             self.notifier.on_new_room_event(event, store_id)
 
         yield self.hs.get_federation().handle_new_event(event)
-#
-#    @defer.inlineCallbacks
-#    def get_messages(self, user_id=None, room_id=None, pagin_config=None,
-#                     feedback=False):
-#        """Get messages in a room.
-#
-#        Args:
-#            user_id (str): The user requesting messages.
-#            room_id (str): The room they want messages from.
-#            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-#            config rules to apply, if any.
-#            feedback (bool): True to get compressed feedback with the messages
-#        Returns:
-#            dict: Pagination API results
-#        """
-#        yield self.auth.check_joined_room(room_id, user_id)
-#
-#        data_source = [MessagesStreamData(self.hs, room_id=room_id,
-#                                          feedback=feedback)]
-#        event_stream = EventStream(user_id, data_source)
-#        pagin_config = yield event_stream.fix_tokens(pagin_config)
-#        data_chunk = yield event_stream.get_chunk(config=pagin_config)
-#        defer.returnValue(data_chunk)
-#
+
+    @defer.inlineCallbacks
+    def get_messages(self, user_id=None, room_id=None, pagin_config=None,
+                     feedback=False):
+        """Get messages in a room.
+
+        Args:
+            user_id (str): The user requesting messages.
+            room_id (str): The room they want messages from.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+            config rules to apply, if any.
+            feedback (bool): True to get compressed feedback with the messages
+        Returns:
+            dict: Pagination API results
+        """
+        yield self.auth.check_joined_room(room_id, user_id)
+
+        data_source = [EventsStreamData(self.hs, room_id=room_id,
+                                          feedback=feedback)]
+        event_stream = EventStream(user_id, data_source)
+        pagin_config = yield event_stream.fix_tokens(pagin_config)
+        data_chunk = yield event_stream.get_chunk(config=pagin_config)
+        defer.returnValue(data_chunk)
+
     @defer.inlineCallbacks
     def store_room_data(self, event=None, stamp_event=True):
         """ Stores data for a room.
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 1300aee8b0..6bfa00d59a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -53,18 +53,35 @@ class StreamStore(SQLBaseStore):
         else:
             limit = 1000
 
+        # From and to keys should be integers from ordering.
+        from_key = int(from_key)
+        to_key = int(to_key)
+
+        if from_key == to_key:
+            defer.returnValue(([], to_key))
+            return
+
+
         sql = (
             "SELECT * FROM events as e WHERE "
             "((room_id IN (%(current)s)) OR "
             "(event_id IN (%(invites)s))) "
-            " AND e.ordering > ? AND e.ordering < ? "
-            "ORDER BY ordering ASC LIMIT %(limit)d"
         ) % {
             "current": current_room_membership_sql,
             "invites": invites_sql,
-            "limit": limit,
         }
 
+        if from_key < to_key:
+            sql += (
+                "AND e.ordering > ? AND e.ordering < ? "
+                "ORDER BY ordering ASC LIMIT %(limit)d "
+            ) % {"limit": limit}
+        else:
+            sql += (
+                "AND e.ordering < ? AND e.ordering > ? "
+                "ORDER BY ordering DESC LIMIT %(limit)d "
+            ) % {"limit": int(limit)}
+
         rows = yield self._execute_and_decode(
             sql,
             user_id, user_id, Membership.INVITE, from_key, to_key
@@ -72,12 +89,12 @@ class StreamStore(SQLBaseStore):
 
         ret = [self._parse_event_from_row(r) for r in rows]
 
-        if ret:
-            max_id = max([r["ordering"] for r in rows])
+        if from_key < to_key:
+            key = max([r["ordering"] for r in rows])
         else:
-            max_id = to_key
+            key = min([r["ordering"] for r in rows])
 
-        defer.returnValue((ret, max_id))
+        defer.returnValue((ret, key))
 
     @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, with_feedback=False):