diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 9261984b7e..b0b2441b9f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -97,30 +97,30 @@ class MessageHandler(BaseHandler):
self.notifier.on_new_room_event(event, store_id)
yield self.hs.get_federation().handle_new_event(event)
-#
-# @defer.inlineCallbacks
-# def get_messages(self, user_id=None, room_id=None, pagin_config=None,
-# feedback=False):
-# """Get messages in a room.
-#
-# Args:
-# user_id (str): The user requesting messages.
-# room_id (str): The room they want messages from.
-# pagin_config (synapse.api.streams.PaginationConfig): The pagination
-# config rules to apply, if any.
-# feedback (bool): True to get compressed feedback with the messages
-# Returns:
-# dict: Pagination API results
-# """
-# yield self.auth.check_joined_room(room_id, user_id)
-#
-# data_source = [MessagesStreamData(self.hs, room_id=room_id,
-# feedback=feedback)]
-# event_stream = EventStream(user_id, data_source)
-# pagin_config = yield event_stream.fix_tokens(pagin_config)
-# data_chunk = yield event_stream.get_chunk(config=pagin_config)
-# defer.returnValue(data_chunk)
-#
+
+ @defer.inlineCallbacks
+ def get_messages(self, user_id=None, room_id=None, pagin_config=None,
+ feedback=False):
+ """Get messages in a room.
+
+ Args:
+ user_id (str): The user requesting messages.
+ room_id (str): The room they want messages from.
+ pagin_config (synapse.api.streams.PaginationConfig): The pagination
+ config rules to apply, if any.
+ feedback (bool): True to get compressed feedback with the messages
+ Returns:
+ dict: Pagination API results
+ """
+ yield self.auth.check_joined_room(room_id, user_id)
+
+ data_source = [EventsStreamData(self.hs, room_id=room_id,
+ feedback=feedback)]
+ event_stream = EventStream(user_id, data_source)
+ pagin_config = yield event_stream.fix_tokens(pagin_config)
+ data_chunk = yield event_stream.get_chunk(config=pagin_config)
+ defer.returnValue(data_chunk)
+
@defer.inlineCallbacks
def store_room_data(self, event=None, stamp_event=True):
""" Stores data for a room.
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 1300aee8b0..6bfa00d59a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -53,18 +53,35 @@ class StreamStore(SQLBaseStore):
else:
limit = 1000
+ # From and to keys should be integers from ordering.
+ from_key = int(from_key)
+ to_key = int(to_key)
+
+ if from_key == to_key:
+ defer.returnValue(([], to_key))
+ return
+
+
sql = (
"SELECT * FROM events as e WHERE "
"((room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) "
- " AND e.ordering > ? AND e.ordering < ? "
- "ORDER BY ordering ASC LIMIT %(limit)d"
) % {
"current": current_room_membership_sql,
"invites": invites_sql,
- "limit": limit,
}
+ if from_key < to_key:
+ sql += (
+ "AND e.ordering > ? AND e.ordering < ? "
+ "ORDER BY ordering ASC LIMIT %(limit)d "
+ ) % {"limit": limit}
+ else:
+ sql += (
+ "AND e.ordering < ? AND e.ordering > ? "
+ "ORDER BY ordering DESC LIMIT %(limit)d "
+ ) % {"limit": int(limit)}
+
rows = yield self._execute_and_decode(
sql,
user_id, user_id, Membership.INVITE, from_key, to_key
@@ -72,12 +89,12 @@ class StreamStore(SQLBaseStore):
ret = [self._parse_event_from_row(r) for r in rows]
- if ret:
- max_id = max([r["ordering"] for r in rows])
+ if from_key < to_key:
+ key = max([r["ordering"] for r in rows])
else:
- max_id = to_key
+ key = min([r["ordering"] for r in rows])
- defer.returnValue((ret, max_id))
+ defer.returnValue((ret, key))
@defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
|