From e5ab9cd24b2f63c6ca00ae6354dbcbfcd9127dfb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:58:35 +0100 Subject: Don't unnecessarily require token to be stream token This allows calling the `get_recent_event_ids_for_room` function in more situations. --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b5baacd32c..5e4327bb96 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -407,7 +407,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if limit == 0: defer.returnValue(([], end_token)) - end_token = RoomStreamToken.parse_stream_token(end_token) + end_token = RoomStreamToken.parse(end_token) rows, token = yield self.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, -- cgit 1.4.1 From e2accd7f1d21e34181dd4543eca30ad1ea971b4c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:59:45 +0100 Subject: Refactor sync APIs to reuse pagination API The sync API often returns events in a topological rather than stream ordering, e.g. when the user joined the room or on initial sync. When this happens we can reuse existing pagination storage functions. --- synapse/handlers/sync.py | 19 ++++++++---- synapse/storage/stream.py | 73 +++++++++++++++++++++++------------------------ 2 files changed, 48 insertions(+), 44 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c25a76d215..b75daa340d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -354,12 +354,19 @@ class SyncHandler(object): since_key = since_token.room_key while limited and len(recents) < timeline_limit and max_repeat: - events, end_key = yield self.store.get_room_events_stream_for_room( - room_id, - limit=load_limit + 1, - from_key=since_key, - to_key=end_key, - ) + if since_key: + events, end_key = yield self.store.get_room_events_stream_for_room( + room_id, + limit=load_limit + 1, + from_key=since_key, + to_key=end_key, + ) + else: + events, end_key = yield self.store.get_recent_events_for_room( + room_id, + limit=load_limit + 1, + end_token=end_key, + ) loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5e4327bb96..8bb4e85709 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -233,52 +233,49 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, order='DESC'): - # Note: If from_key is None then we return in topological order. This - # is because in that case we're using this as a "get the last few messages - # in a room" function, rather than "get new messages since last sync" - if from_key is not None: - from_id = RoomStreamToken.parse_stream_token(from_key).stream - else: - from_id = None - to_id = RoomStreamToken.parse_stream_token(to_key).stream + """Get new room events in stream ordering since `from_key`. + + Args: + room_id (str) + from_key (str): Token from which no events are returned before + to_key (str): Token from which no events are returned after. (This + is typically the current stream token) + limit (int): Maximum number of events to return + order (str): Either "DESC" or "ASC". Determines which events are + returned when the result is limited. If "DESC" then the most + recent `limit` events are returned, otherwise returns the + oldest `limit` events. + + Returns: + Deferred[tuple[list[FrozenEvent], str]]: Returns the list of + events (in ascending order) and the token from the start of + the chunk of events returned. + """ if from_key == to_key: defer.returnValue(([], from_key)) - if from_id: - has_changed = yield self._events_stream_cache.has_entity_changed( - room_id, from_id - ) + from_id = RoomStreamToken.parse_stream_token(from_key).stream + to_id = RoomStreamToken.parse_stream_token(to_key).stream - if not has_changed: - defer.returnValue(([], from_key)) + has_changed = yield self._events_stream_cache.has_entity_changed( + room_id, from_id + ) - def f(txn): - if from_id is not None: - sql = ( - "SELECT event_id, stream_ordering FROM events WHERE" - " room_id = ?" - " AND not outlier" - " AND stream_ordering > ? AND stream_ordering <= ?" - " ORDER BY stream_ordering %s LIMIT ?" - ) % (order,) - txn.execute(sql, (room_id, from_id, to_id, limit)) - - rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] - else: - sql = ( - "SELECT event_id, topological_ordering, stream_ordering" - " FROM events" - " WHERE" - " room_id = ?" - " AND not outlier" - " AND stream_ordering <= ?" - " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?" - ) % (order, order,) - txn.execute(sql, (room_id, to_id, limit)) + if not has_changed: + defer.returnValue(([], from_key)) - rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] + def f(txn): + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering > ? AND stream_ordering <= ?" + " ORDER BY stream_ordering %s LIMIT ?" + ) % (order,) + txn.execute(sql, (room_id, from_id, to_id, limit)) + rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] return rows rows = yield self.runInteraction("get_room_events_stream_for_room", f) -- cgit 1.4.1