diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81cff0870e..c350c93c7e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -235,7 +235,7 @@ class MessageHandler(BaseHandler):
room_id, max_topo
)
- events, next_key = yield self.store.paginate_room_events(
+ events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b5850db42f..d627b6db13 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -534,7 +534,7 @@ class RoomEventSource(object):
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
- events, next_key = yield self.store.paginate_room_events(
+ events, next_key, _ = yield self.store.paginate_room_events(
room_id=key,
from_key=config.from_key,
to_key=config.to_key,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 387120090a..6655bb76f1 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
from synapse.storage.engines import PostgresEngine
import abc
@@ -394,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
end_token = RoomStreamToken.parse(end_token)
- rows, token = yield self.runInteraction(
+ rows, token, _ = yield self.runInteraction(
"get_recent_event_ids_for_room", self._paginate_room_events_txn,
room_id, from_token=end_token, limit=limit,
)
@@ -632,12 +633,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
results["stream_ordering"],
)
- rows, start_token = self._paginate_room_events_txn(
+ rows, start_token, _ = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
)
events_before = [r.event_id for r in rows]
- rows, end_token = self._paginate_room_events_txn(
+ rows, end_token, _ = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
)
events_after = [r.event_id for r in rows]
@@ -720,9 +721,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
those that match the filter.
Returns:
- Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
- as a list of _EventDictReturn and a token that points to the end
- of the result set.
+ tuple[list[_EventDictReturn], str, list[int]]: Returns the results
+ as a list of _EventDictReturn, a token that points to the end of
+ the result set, and a list of the chunk IDs iterated over.
"""
assert int(limit) >= 0
@@ -771,7 +772,57 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.execute(sql, args)
- rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+ rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
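+ # Track which chunks we iterate over so that callers can look up
+ # the backwards extremities of each chunk afterwards.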
+ iterated_chunks = []
+
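+ # Work out which chunk to continue the pagination from: the chunk
+ # of the last row we fetched if there were any rows, otherwise the
+ # chunk pointed to by the token itself.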
+ chunk_id = None
+ if from_token.chunk: # FIXME: token may be topological but have no chunk ID.
+ if rows:
+ chunk_id = rows[-1].chunk_id
+ iterated_chunks = [r.chunk_id for r in rows]
+ else:
+ chunk_id = from_token.chunk
+ iterated_chunks = [chunk_id]
+
+ table = ChunkDBOrderedListStore(
+ txn, room_id, self.clock,
+ )
+
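+ # If the initial query didn't fill up `limit`, walk the chunk graph
+ # (kept in topological order by ChunkDBOrderedListStore) and pull
+ # events from each neighbouring chunk until we have enough rows.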
+ while chunk_id and (limit <= 0 or len(rows) < limit):
+ if chunk_id not in iterated_chunks:
+ iterated_chunks.append(chunk_id)
+
+ if direction == 'b':
+ # FIXME: There may be multiple previous chunks here; we only follow one.
+ chunk_id = table.get_prev(chunk_id)
+ else:
+ chunk_id = table.get_next(chunk_id)
+
+ if chunk_id is None:
+ break
+
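+ # Fetch the events in the next chunk. chunk_id comes from our own
+ # chunk table and is an integer, so interpolating it directly into
+ # the SQL is safe.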
+ sql = (
+ "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
+ " FROM events"
+ " WHERE outlier = ? AND room_id = ? AND chunk_id = %(chunk_id)d"
+ " ORDER BY topological_ordering %(order)s,"
+ " stream_ordering %(order)s LIMIT ?"
+ ) % {
+ "chunk_id": chunk_id,
+ "order": order,
+ }
+
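+ # FIXME: `args` is reused from the initial query above; if an
+ # event_filter added placeholder arguments there they will no
+ # longer match the three placeholders in this query.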
+ txn.execute(sql, args)
+ new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+ if not new_rows:
+ break
+
+ rows.extend(new_rows)
+
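+ # Pulling in whole chunks may have overshot the requested limit,
+ # so truncate the combined result set.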
+ if limit > 0:
+ rows = rows[:limit]
if rows:
chunk = rows[-1].chunk_id
@@ -811,16 +862,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Returns:
- tuple[list[dict], str]: Returns the results as a list of dicts and
- a token that points to the end of the result set. The dicts have
- the keys "event_id", "topological_ordering" and "stream_orderign".
+ Deferred[tuple[list[dict], str, list[str]]]: Returns the results
+ as a list of dicts, a token that points to the end of the result
+ set, and a list of backwards extremity event IDs for the chunks
+ iterated over. The dicts have the keys "event_id",
+ "topological_ordering" and "stream_ordering".
"""
from_key = RoomStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)
- rows, token = yield self.runInteraction(
- "paginate_room_events", self._paginate_room_events_txn,
- room_id, from_key, to_key, direction, limit, event_filter,
+ def _do_paginate_room_events(txn):
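+ # Run the pagination and the extremity lookups in a single
+ # transaction so they see a consistent view of the chunks.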
+ rows, token, chunks = self._paginate_room_events_txn(
+ txn, room_id, from_key, to_key, direction, limit, event_filter,
+ )
+
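+ # For each chunk we iterated over, look up its backwards
+ # extremities: events referenced by the chunk that we don't have.
+ # These indicate where the caller may need to backfill from.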
+ extremities = []
+ seen = set()
+ for chunk_id in chunks:
+ if chunk_id in seen:
+ continue
+ seen.add(chunk_id)
+
+ event_ids = self._simple_select_onecol_txn(
+ txn,
+ table="chunk_backwards_extremities",
+ keyvalues={"chunk_id": chunk_id},
+ retcol="event_id"
+ )
+
+ extremities.extend(e for e in event_ids if e not in extremities)
+
+ return rows, token, extremities
+
+ rows, token, extremities = yield self.runInteraction(
+ "paginate_room_events", _do_paginate_room_events,
)
events = yield self._get_events(
@@ -830,7 +903,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._set_before_and_after(events, rows)
- defer.returnValue((events, token))
+ defer.returnValue((events, token, extremities))
class StreamStore(StreamWorkerStore):