summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/storage/stream.py123
3 files changed, 109 insertions, 18 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81cff0870e..c350c93c7e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -235,7 +235,7 @@ class MessageHandler(BaseHandler):
                     room_id, max_topo
                 )
 
-            events, next_key = yield self.store.paginate_room_events(
+            events, next_key, extremities = yield self.store.paginate_room_events(
                 room_id=room_id,
                 from_key=source_config.from_key,
                 to_key=source_config.to_key,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b5850db42f..d627b6db13 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -534,7 +534,7 @@ class RoomEventSource(object):
 
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
-        events, next_key = yield self.store.paginate_room_events(
+        events, next_key, _ = yield self.store.paginate_room_events(
             room_id=key,
             from_key=config.from_key,
             to_key=config.to_key,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 387120090a..34a5f7e3e7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.storage.engines import PostgresEngine
 
 import abc
@@ -394,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         end_token = RoomStreamToken.parse(end_token)
 
-        rows, token = yield self.runInteraction(
+        rows, token, _ = yield self.runInteraction(
             "get_recent_event_ids_for_room", self._paginate_room_events_txn,
             room_id, from_token=end_token, limit=limit,
         )
@@ -632,12 +633,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             results["stream_ordering"],
         )
 
-        rows, start_token = self._paginate_room_events_txn(
+        rows, start_token, _ = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
         )
         events_before = [r.event_id for r in rows]
 
-        rows, end_token = self._paginate_room_events_txn(
+        rows, end_token, _ = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
         )
         events_after = [r.event_id for r in rows]
@@ -720,12 +721,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 those that match the filter.
 
         Returns:
-            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
-            as a list of _EventDictReturn and a token that points to the end
-            of the result set.
+            Deferred[tuple[list[_EventDictReturn], str, list[int]]: Returns
+            the results as a list of _EventDictReturn, a token that points to
+            the end of the result set, and a list of chunks iterated over.
         """
 
-        assert int(limit) >= 0
+        limit = int(limit)  # Sometimes we are passed a string from somewhere
+        assert limit >= 0
+
+        # There are two modes of fetching events: by stream order or by
+        # topological order. This is determined by whether the from_token is a
+        # stream or topological token. If stream then we can simply do a select
+        # ordered by stream_ordering column. If topological, then we need to
+        # fetch events from one chunk at a time until we hit the limit.
 
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
@@ -756,7 +764,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             bounds += " AND " + filter_clause
             args.extend(filter_args)
 
-        args.append(int(limit))
+        args.append(limit)
 
         sql = (
             "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
@@ -771,7 +779,65 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         txn.execute(sql, args)
 
-        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+        rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+        # If we are paginating topologically and we haven't hit the limit on
+        # number of events then we need to fetch events from the previous or
+        # next chunk.
+
+        iterated_chunks = []
+
+        chunk_id = None
+        if from_token.chunk:  # FIXME: may be topological but no chunk.
+            if rows:
+                chunk_id = rows[-1].chunk_id
+                iterated_chunks = [r.chunk_id for r in rows]
+            else:
+                chunk_id = from_token.chunk
+                iterated_chunks = [chunk_id]
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock,
+        )
+
+        if filter_clause:
+            filter_clause = "AND " + filter_clause
+
+        sql = (
+            "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
+            " FROM events"
+            " WHERE outlier = ? AND room_id = ? %(filter_clause)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s LIMIT ?"
+        ) % {
+            "filter_clause": filter_clause,
+            "order": order,
+        }
+
+        args = [False, room_id] + filter_args + [limit]
+
+        while chunk_id and (limit <= 0 or len(rows) < limit):
+            if chunk_id not in iterated_chunks:
+                iterated_chunks.append(chunk_id)
+
+            if direction == 'b':
+                chunk_id = table.get_prev(chunk_id)
+            else:
+                chunk_id = table.get_next(chunk_id)
+
+            if chunk_id is None:
+                break
+
+            txn.execute(sql, args)
+            new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+            if not new_rows:
+                break
+
+            rows.extend(new_rows)
+
+        if limit > 0:
+            rows = rows[:limit]
 
         if rows:
             chunk = rows[-1].chunk_id
@@ -809,18 +875,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 those that match the filter.
 
         Returns:
-            tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
-            the keys "event_id", "topological_ordering" and "stream_orderign".
+            tuple[list[dict], str, list[str]]: Returns the results as a list of
+            dicts, a token that points to the end of the result set, and a list
+            of backwards extremities. The dicts have the keys "event_id",
+            "topological_ordering" and "stream_ordering".
         """
 
         from_key = RoomStreamToken.parse(from_key)
         if to_key:
             to_key = RoomStreamToken.parse(to_key)
 
-        rows, token = yield self.runInteraction(
-            "paginate_room_events", self._paginate_room_events_txn,
-            room_id, from_key, to_key, direction, limit, event_filter,
+        def _do_paginate_room_events(txn):
+            rows, token, chunks = self._paginate_room_events_txn(
+                txn, room_id, from_key, to_key, direction, limit, event_filter,
+            )
+
+            # We now fetch the extremities by fetching the extremities for
+            # each chunk we iterated over.
+            extremities = []
+            seen = set()
+            for chunk_id in chunks:
+                if chunk_id in seen:
+                    continue
+                seen.add(chunk_id)
+
+                event_ids = self._simple_select_onecol_txn(
+                    txn,
+                    table="chunk_backwards_extremities",
+                    keyvalues={"chunk_id": chunk_id},
+                    retcol="event_id"
+                )
+
+                extremities.extend(e for e in event_ids if e not in extremities)
+
+            return rows, token, extremities
+
+        rows, token, extremities = yield self.runInteraction(
+            "paginate_room_events", _do_paginate_room_events,
         )
 
         events = yield self._get_events(
@@ -830,7 +921,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         self._set_before_and_after(events, rows)
 
-        defer.returnValue((events, token))
+        defer.returnValue((events, token, extremities))
 
 
 class StreamStore(StreamWorkerStore):