diff options
-rw-r--r-- | synapse/handlers/message.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room.py | 2 | ||||
-rw-r--r-- | synapse/storage/stream.py | 123 |
3 files changed, 109 insertions, 18 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 81cff0870e..c350c93c7e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -235,7 +235,7 @@ class MessageHandler(BaseHandler): room_id, max_topo ) - events, next_key = yield self.store.paginate_room_events( + events, next_key, extremities = yield self.store.paginate_room_events( room_id=room_id, from_key=source_config.from_key, to_key=source_config.to_key, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b5850db42f..d627b6db13 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -534,7 +534,7 @@ class RoomEventSource(object): @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): - events, next_key = yield self.store.paginate_room_events( + events, next_key, _ = yield self.store.paginate_room_events( room_id=key, from_key=config.from_key, to_key=config.to_key, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 387120090a..34a5f7e3e7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore from synapse.storage.engines import PostgresEngine import abc @@ -394,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): end_token = RoomStreamToken.parse(end_token) - rows, token = yield self.runInteraction( + rows, token, _ = yield self.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, room_id, from_token=end_token, limit=limit, ) @@ -632,12 +633,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results["stream_ordering"], ) - rows, start_token = self._paginate_room_events_txn( + rows, start_token, _ = self._paginate_room_events_txn( txn, room_id, before_token, direction='b', limit=before_limit, ) events_before = [r.event_id for r in rows] - rows, end_token = self._paginate_room_events_txn( + rows, end_token, _ = self._paginate_room_events_txn( txn, room_id, after_token, direction='f', limit=after_limit, ) events_after = [r.event_id for r in rows] @@ -720,12 +721,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): those that match the filter. Returns: - Deferred[tuple[list[_EventDictReturn], str]]: Returns the results - as a list of _EventDictReturn and a token that points to the end - of the result set. + Deferred[tuple[list[_EventDictReturn], str, list[int]]: Returns + the results as a list of _EventDictReturn, a token that points to + the end of the result set, and a list of chunks iterated over. """ - assert int(limit) >= 0 + limit = int(limit) # Sometimes we are passed a string from somewhere + assert limit >= 0 + + # There are two modes of fetching events: by stream order or by + # topological order. This is determined by whether the from_token is a + # stream or topological token. If stream then we can simply do a select + # ordered by stream_ordering column. If topological, then we need to + # fetch events from one chunk at a time until we hit the limit. # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence @@ -756,7 +764,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): bounds += " AND " + filter_clause args.extend(filter_args) - args.append(int(limit)) + args.append(limit) sql = ( "SELECT event_id, chunk_id, topological_ordering, stream_ordering" @@ -771,7 +779,65 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn.execute(sql, args) - rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] + rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn] + + # If we are paginating topologically and we haven't hit the limit on + # number of events then we need to fetch events from the previous or + # next chunk. + + iterated_chunks = [] + + chunk_id = None + if from_token.chunk: # FIXME: may be topological but no chunk. + if rows: + chunk_id = rows[-1].chunk_id + iterated_chunks = [r.chunk_id for r in rows] + else: + chunk_id = from_token.chunk + iterated_chunks = [chunk_id] + + table = ChunkDBOrderedListStore( + txn, room_id, self.clock, + ) + + if filter_clause: + filter_clause = "AND " + filter_clause + + sql = ( + "SELECT event_id, chunk_id, topological_ordering, stream_ordering" + " FROM events" + " WHERE outlier = ? AND room_id = ? %(filter_clause)s" + " ORDER BY topological_ordering %(order)s," + " stream_ordering %(order)s LIMIT ?" + ) % { + "filter_clause": filter_clause, + "order": order, + } + + args = [False, room_id] + filter_args + [limit] + + while chunk_id and (limit <= 0 or len(rows) < limit): + if chunk_id not in iterated_chunks: + iterated_chunks.append(chunk_id) + + if direction == 'b': + chunk_id = table.get_prev(chunk_id) + else: + chunk_id = table.get_next(chunk_id) + + if chunk_id is None: + break + + txn.execute(sql, args) + new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn] + + if not new_rows: + break + + rows.extend(new_rows) + + if limit > 0: + rows = rows[:limit] if rows: chunk = rows[-1].chunk_id @@ -809,18 +875,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): those that match the filter. Returns: - tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts have - the keys "event_id", "topological_ordering" and "stream_orderign". + tuple[list[dict], str, list[str]]: Returns the results as a list of + dicts, a token that points to the end of the result set, and a list + of backwards extremities. The dicts have the keys "event_id", + "topological_ordering" and "stream_ordering". """ from_key = RoomStreamToken.parse(from_key) if to_key: to_key = RoomStreamToken.parse(to_key) - rows, token = yield self.runInteraction( - "paginate_room_events", self._paginate_room_events_txn, - room_id, from_key, to_key, direction, limit, event_filter, + def _do_paginate_room_events(txn): + rows, token, chunks = self._paginate_room_events_txn( + txn, room_id, from_key, to_key, direction, limit, event_filter, + ) + + # We now fetch the extremities by fetching the extremities for + # each chunk we iterated over. + extremities = [] + seen = set() + for chunk_id in chunks: + if chunk_id in seen: + continue + seen.add(chunk_id) + + event_ids = self._simple_select_onecol_txn( + txn, + table="chunk_backwards_extremities", + keyvalues={"chunk_id": chunk_id}, + retcol="event_id" + ) + + extremities.extend(e for e in event_ids if e not in extremities) + + return rows, token, extremities + + rows, token, extremities = yield self.runInteraction( + "paginate_room_events", _do_paginate_room_events, ) events = yield self._get_events( @@ -830,7 +921,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._set_before_and_after(events, rows) - defer.returnValue((events, token)) + defer.returnValue((events, token, extremities)) class StreamStore(StreamWorkerStore): |