summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py97
1 files changed, 85 insertions, 12 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 387120090a..6655bb76f1 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.storage.engines import PostgresEngine
 
 import abc
@@ -394,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         end_token = RoomStreamToken.parse(end_token)
 
-        rows, token = yield self.runInteraction(
+        rows, token, _ = yield self.runInteraction(
             "get_recent_event_ids_for_room", self._paginate_room_events_txn,
             room_id, from_token=end_token, limit=limit,
         )
@@ -632,12 +633,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             results["stream_ordering"],
         )
 
-        rows, start_token = self._paginate_room_events_txn(
+        rows, start_token, _ = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
         )
         events_before = [r.event_id for r in rows]
 
-        rows, end_token = self._paginate_room_events_txn(
+        rows, end_token, _ = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
         )
         events_after = [r.event_id for r in rows]
@@ -720,9 +721,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 those that match the filter.
 
         Returns:
-            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
-            as a list of _EventDictReturn and a token that points to the end
-            of the result set.
+            Deferred[tuple[list[_EventDictReturn], str, list[int]]: Returns
+            the results as a list of _EventDictReturn, a token that points to
+            the end of the result set, and a list of chunks iterated over.
         """
 
         assert int(limit) >= 0
@@ -771,7 +772,57 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         txn.execute(sql, args)
 
-        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+        rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+        iterated_chunks = []
+
+        chunk_id = None
+        if from_token.chunk:  # FIXME: may be topological but no chunk.
+            if rows:
+                chunk_id = rows[-1].chunk_id
+                iterated_chunks = [r.chunk_id for r in rows]
+            else:
+                chunk_id = from_token.chunk
+                iterated_chunks = [chunk_id]
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock,
+        )
+
+        while chunk_id and (limit <= 0 or len(rows) < limit):
+            if chunk_id not in iterated_chunks:
+                iterated_chunks.append(chunk_id)
+
+            if direction == 'b':
+                # FIXME: There may be multiple things here
+                chunk_id = table.get_prev(chunk_id)
+            else:
+                chunk_id = table.get_next(chunk_id)
+
+            if chunk_id is None:
+                break
+
+            sql = (
+                "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
+                " FROM events"
+                " WHERE outlier = ? AND room_id = ? AND chunk_id = %(chunk_id)d"
+                " ORDER BY topological_ordering %(order)s,"
+                " stream_ordering %(order)s LIMIT ?"
+            ) % {
+                "chunk_id": chunk_id,
+                "order": order,
+            }
+
+            txn.execute(sql, args)
+            new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+            if not new_rows:
+                break
+
+            rows.extend(new_rows)
+
+        if limit > 0:
+            rows = rows[:limit]
 
         if rows:
             chunk = rows[-1].chunk_id
@@ -811,16 +862,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Returns:
             tuple[list[dict], str]: Returns the results as a list of dicts and
             a token that points to the end of the result set. The dicts have
-            the keys "event_id", "topological_ordering" and "stream_orderign".
+            the keys "event_id", "topological_ordering" and "stream_ordering".
         """
 
         from_key = RoomStreamToken.parse(from_key)
         if to_key:
             to_key = RoomStreamToken.parse(to_key)
 
-        rows, token = yield self.runInteraction(
-            "paginate_room_events", self._paginate_room_events_txn,
-            room_id, from_key, to_key, direction, limit, event_filter,
+        def _do_paginate_room_events(txn):
+            rows, token, chunks = self._paginate_room_events_txn(
+                txn, room_id, from_key, to_key, direction, limit, event_filter,
+            )
+
+            extremities = []
+            seen = set()
+            for chunk_id in chunks:
+                if chunk_id in seen:
+                    continue
+                seen.add(chunk_id)
+
+                event_ids = self._simple_select_onecol_txn(
+                    txn,
+                    table="chunk_backwards_extremities",
+                    keyvalues={"chunk_id": chunk_id},
+                    retcol="event_id"
+                )
+
+                extremities.extend(e for e in event_ids if e not in extremities)
+
+            return rows, token, extremities
+
+        rows, token, extremities = yield self.runInteraction(
+            "paginate_room_events", _do_paginate_room_events,
         )
 
         events = yield self._get_events(
@@ -830,7 +903,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         self._set_before_and_after(events, rows)
 
-        defer.returnValue((events, token))
+        defer.returnValue((events, token, extremities))
 
 
 class StreamStore(StreamWorkerStore):