diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index c236dafafb..cf84938be5 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -36,7 +36,7 @@ what sort order was used:
from twisted.internet import defer
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logcontext import preserve_fn
@@ -184,6 +184,9 @@ class StreamStore(SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'):
+ # Note: If from_key is None then we return in topological order. This
+ # is because in that case we're using this as a "get the last few messages
+ # in a room" function, rather than "get new messages since last sync"
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
@@ -217,8 +220,8 @@ class StreamStore(SQLBaseStore):
" room_id = ?"
" AND not outlier"
" AND stream_ordering <= ?"
- " ORDER BY stream_ordering %s LIMIT ?"
- ) % (order,)
+ " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
+ ) % (order, order,)
txn.execute(sql, (room_id, to_id, limit))
rows = self.cursor_to_dict(txn)
@@ -232,7 +235,7 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
- self._set_before_and_after(ret, rows, topo_order=False)
+ self._set_before_and_after(ret, rows, topo_order=from_id is None)
if order.lower() == "desc":
ret.reverse()
@@ -462,9 +465,25 @@ class StreamStore(SQLBaseStore):
defer.returnValue((events, token))
- @cachedInlineCallbacks(num_args=4)
+ @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+ rows, token = yield self.get_recent_event_ids_for_room(
+ room_id, limit, end_token, from_token
+ )
+
+ logger.debug("stream before")
+ events = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+ logger.debug("stream after")
+
+ self._set_before_and_after(events, rows)
+ defer.returnValue((events, token))
+
+ @cached(num_args=4)
+ def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
end_token = RoomStreamToken.parse_stream_token(end_token)
if from_token is None:
@@ -514,24 +533,13 @@ class StreamStore(SQLBaseStore):
return rows, token
- rows, token = yield self.runInteraction(
+ return self.runInteraction(
"get_recent_events_for_room", get_recent_events_for_room_txn
)
- logger.debug("stream before")
- events = yield self._get_events(
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
- logger.debug("stream after")
-
- self._set_before_and_after(events, rows)
-
- defer.returnValue((events, token))
-
@defer.inlineCallbacks
def get_room_events_max_id(self, direction='f'):
- token = yield self._stream_id_gen.get_max_token(self)
+ token = yield self._stream_id_gen.get_max_token()
if direction != 'b':
defer.returnValue("s%d" % (token,))
else:
|