diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81cff0870e..c350c93c7e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -235,7 +235,7 @@ class MessageHandler(BaseHandler):
room_id, max_topo
)
- events, next_key = yield self.store.paginate_room_events(
+ events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b5850db42f..d627b6db13 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -534,7 +534,7 @@ class RoomEventSource(object):
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
- events, next_key = yield self.store.paginate_room_events(
+ events, next_key, _ = yield self.store.paginate_room_events(
room_id=key,
from_key=config.from_key,
to_key=config.to_key,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 387120090a..34a5f7e3e7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
from synapse.storage.engines import PostgresEngine
import abc
@@ -394,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
end_token = RoomStreamToken.parse(end_token)
- rows, token = yield self.runInteraction(
+ rows, token, _ = yield self.runInteraction(
"get_recent_event_ids_for_room", self._paginate_room_events_txn,
room_id, from_token=end_token, limit=limit,
)
@@ -632,12 +633,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
results["stream_ordering"],
)
- rows, start_token = self._paginate_room_events_txn(
+ rows, start_token, _ = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
)
events_before = [r.event_id for r in rows]
- rows, end_token = self._paginate_room_events_txn(
+ rows, end_token, _ = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
)
events_after = [r.event_id for r in rows]
@@ -720,12 +721,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
those that match the filter.
Returns:
- Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
- as a list of _EventDictReturn and a token that points to the end
- of the result set.
+ Deferred[tuple[list[_EventDictReturn], str, list[int]]: Returns
+ the results as a list of _EventDictReturn, a token that points to
+ the end of the result set, and a list of chunks iterated over.
"""
- assert int(limit) >= 0
+ limit = int(limit) # Sometimes we are passed a string from somewhere
+ assert limit >= 0
+
+ # There are two modes of fetching events: by stream order or by
+ # topological order. This is determined by whether the from_token is a
+ # stream or topological token. If stream then we can simply do a select
+ # ordered by stream_ordering column. If topological, then we need to
+ # fetch events from one chunk at a time until we hit the limit.
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
@@ -756,7 +764,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
bounds += " AND " + filter_clause
args.extend(filter_args)
- args.append(int(limit))
+ args.append(limit)
sql = (
"SELECT event_id, chunk_id, topological_ordering, stream_ordering"
@@ -771,7 +779,65 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.execute(sql, args)
- rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+ rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+ # If we are paginating topologically and we haven't hit the limit on
+ # number of events then we need to fetch events from the previous or
+ # next chunk.
+
+ iterated_chunks = []
+
+ chunk_id = None
+ if from_token.chunk: # FIXME: may be topological but no chunk.
+ if rows:
+ chunk_id = rows[-1].chunk_id
+ iterated_chunks = [r.chunk_id for r in rows]
+ else:
+ chunk_id = from_token.chunk
+ iterated_chunks = [chunk_id]
+
+ table = ChunkDBOrderedListStore(
+ txn, room_id, self.clock,
+ )
+
+ if filter_clause:
+ filter_clause = "AND " + filter_clause
+
+ sql = (
+ "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
+ " FROM events"
+ " WHERE outlier = ? AND room_id = ? %(filter_clause)s"
+ " ORDER BY topological_ordering %(order)s,"
+ " stream_ordering %(order)s LIMIT ?"
+ ) % {
+ "filter_clause": filter_clause,
+ "order": order,
+ }
+
+ args = [False, room_id] + filter_args + [limit]
+
+ while chunk_id and (limit <= 0 or len(rows) < limit):
+ if chunk_id not in iterated_chunks:
+ iterated_chunks.append(chunk_id)
+
+ if direction == 'b':
+ chunk_id = table.get_prev(chunk_id)
+ else:
+ chunk_id = table.get_next(chunk_id)
+
+ if chunk_id is None:
+ break
+
+ txn.execute(sql, args)
+ new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+ if not new_rows:
+ break
+
+ rows.extend(new_rows)
+
+ if limit > 0:
+ rows = rows[:limit]
if rows:
chunk = rows[-1].chunk_id
@@ -809,18 +875,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
those that match the filter.
Returns:
- tuple[list[dict], str]: Returns the results as a list of dicts and
- a token that points to the end of the result set. The dicts have
- the keys "event_id", "topological_ordering" and "stream_orderign".
+ tuple[list[dict], str, list[str]]: Returns the results as a list of
+ dicts, a token that points to the end of the result set, and a list
+ of backwards extremities. The dicts have the keys "event_id",
+ "topological_ordering" and "stream_ordering".
"""
from_key = RoomStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)
- rows, token = yield self.runInteraction(
- "paginate_room_events", self._paginate_room_events_txn,
- room_id, from_key, to_key, direction, limit, event_filter,
+ def _do_paginate_room_events(txn):
+ rows, token, chunks = self._paginate_room_events_txn(
+ txn, room_id, from_key, to_key, direction, limit, event_filter,
+ )
+
+ # We now fetch the extremities by fetching the extremities for
+ # each chunk we iterated over.
+ extremities = []
+ seen = set()
+ for chunk_id in chunks:
+ if chunk_id in seen:
+ continue
+ seen.add(chunk_id)
+
+ event_ids = self._simple_select_onecol_txn(
+ txn,
+ table="chunk_backwards_extremities",
+ keyvalues={"chunk_id": chunk_id},
+ retcol="event_id"
+ )
+
+ extremities.extend(e for e in event_ids if e not in extremities)
+
+ return rows, token, extremities
+
+ rows, token, extremities = yield self.runInteraction(
+ "paginate_room_events", _do_paginate_room_events,
)
events = yield self._get_events(
@@ -830,7 +921,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._set_before_and_after(events, rows)
- defer.returnValue((events, token))
+ defer.returnValue((events, token, extremities))
class StreamStore(StreamWorkerStore):
|