diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c236dafafb..cf84938be5 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,7 +36,7 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logcontext import preserve_fn @@ -184,6 +184,9 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, order='DESC'): + # Note: If from_key is None then we return in topological order. This + # is because in that case we're using this as a "get the last few messages + # in a room" function, rather than "get new messages since last sync" if from_key is not None: from_id = RoomStreamToken.parse_stream_token(from_key).stream else: @@ -217,8 +220,8 @@ class StreamStore(SQLBaseStore): " room_id = ?" " AND not outlier" " AND stream_ordering <= ?" - " ORDER BY stream_ordering %s LIMIT ?" - ) % (order,) + " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?" + ) % (order, order,) txn.execute(sql, (room_id, to_id, limit)) rows = self.cursor_to_dict(txn) @@ -232,7 +235,7 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) - self._set_before_and_after(ret, rows, topo_order=False) + self._set_before_and_after(ret, rows, topo_order=from_id is None) if order.lower() == "desc": ret.reverse() @@ -462,9 +465,25 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) - @cachedInlineCallbacks(num_args=4) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): + rows, token = yield self.get_recent_event_ids_for_room( + room_id, limit, end_token, from_token + ) + + logger.debug("stream before") + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + logger.debug("stream after") + + self._set_before_and_after(events, rows) + defer.returnValue((events, token)) + + @cached(num_args=4) + def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): end_token = RoomStreamToken.parse_stream_token(end_token) if from_token is None: @@ -514,24 +533,13 @@ class StreamStore(SQLBaseStore): return rows, token - rows, token = yield self.runInteraction( + return self.runInteraction( "get_recent_events_for_room", get_recent_events_for_room_txn ) - logger.debug("stream before") - events = yield self._get_events( - [r["event_id"] for r in rows], - get_prev_content=True - ) - logger.debug("stream after") - - self._set_before_and_after(events, rows) - - defer.returnValue((events, token)) - @defer.inlineCallbacks def get_room_events_max_id(self, direction='f'): - token = yield self._stream_id_gen.get_max_token(self) + token = yield self._stream_id_gen.get_max_token() if direction != 'b': defer.returnValue("s%d" % (token,)) else: |