summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py23
1 files changed, 13 insertions, 10 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 367ffc9543..8908d5b5da 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -157,7 +157,8 @@ class StreamStore(SQLBaseStore):
         defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
+    def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
+                                         order='DESC'):
         from_id = RoomStreamToken.parse_stream_token(from_key).stream
 
         room_ids = yield self._events_stream_cache.get_entities_changed(
@@ -172,16 +173,17 @@ class StreamStore(SQLBaseStore):
         for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
             res = yield defer.gatherResults([
                 preserve_fn(self.get_room_events_stream_for_room)(
-                    room_id, from_key, to_key, limit,
+                    room_id, from_key, to_key, limit, order=order,
                 )
-                for room_id in room_ids
+                for room_id in rm_ids
             ])
             results.update(dict(zip(rm_ids, res)))
 
         defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
+    def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
+                                        order='DESC'):
         if from_key is not None:
             from_id = RoomStreamToken.parse_stream_token(from_key).stream
         else:
@@ -206,8 +208,8 @@ class StreamStore(SQLBaseStore):
                     " room_id = ?"
                     " AND not outlier"
                     " AND stream_ordering > ? AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering DESC LIMIT ?"
-                )
+                    " ORDER BY stream_ordering %s LIMIT ?"
+                ) % (order,)
                 txn.execute(sql, (room_id, from_id, to_id, limit))
             else:
                 sql = (
@@ -215,8 +217,8 @@ class StreamStore(SQLBaseStore):
                     " room_id = ?"
                     " AND not outlier"
                     " AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering DESC LIMIT ?"
-                )
+                    " ORDER BY stream_ordering %s LIMIT ?"
+                ) % (order,)
                 txn.execute(sql, (room_id, to_id, limit))
 
             rows = self.cursor_to_dict(txn)
@@ -232,7 +234,8 @@ class StreamStore(SQLBaseStore):
 
         self._set_before_and_after(ret, rows, topo_order=False)
 
-        ret.reverse()
+        if order.lower() == "desc":
+            ret.reverse()
 
         if rows:
             key = "s%d" % min(r["stream_ordering"] for r in rows)
@@ -528,7 +531,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_max_id(self, direction='f'):
-        token = yield self._stream_id_gen.get_max_token(self)
+        token = yield self._stream_id_gen.get_max_token()
         if direction != 'b':
             defer.returnValue("s%d" % (token,))
         else: