diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 00a6afc963..3ee6a091c5 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -156,7 +156,7 @@ class PaginationHandler(object):
stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
- r = yield self.store.get_room_event_after_stream_ordering(
+ r = yield self.store.get_room_event_before_stream_ordering(
room_id, stream_ordering,
)
if not r:
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 140da8dad6..223ce7fedb 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -536,20 +536,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Deferred[(int, int, str)]:
(stream ordering, topological ordering, event_id)
"""
+ return self.db.runInteraction(
+ "get_room_event_after_stream_ordering",
+ self.get_room_event_around_stream_ordering_txn,
+ room_id, stream_ordering, "f",
+ )
- def _f(txn):
- sql = (
- "SELECT stream_ordering, topological_ordering, event_id"
- " FROM events"
- " WHERE room_id = ? AND stream_ordering >= ?"
- " AND NOT outlier"
- " ORDER BY stream_ordering"
- " LIMIT 1"
- )
- txn.execute(sql, (room_id, stream_ordering))
- return txn.fetchone()
+ def get_room_event_before_stream_ordering(self, room_id, stream_ordering):
+ """Gets details of the first event in a room at or before a stream ordering
+
+ Args:
+ room_id (str):
+ stream_ordering (int):
+
+ Returns:
+ Deferred[(int, int, str)]:
+ (stream ordering, topological ordering, event_id)
+ """
+ return self.db.runInteraction(
+ "get_room_event_before_stream_ordering",
+ self.get_room_event_around_stream_ordering_txn,
+ room_id, stream_ordering, "f",
+ )
+
+ def get_room_event_around_stream_ordering_txn(
+ self, txn, room_id, stream_ordering, dir="f"
+ ):
+ """Gets details of the first event in a room at or either after or before a
+ stream ordering, depending on the provided direction.
+
+ Args:
+ room_id (str):
+ stream_ordering (int):
+ dir (str): Direction in which we're looking towards in the room's history,
+ either "f" (forward) or "b" (backward).
- return self.db.runInteraction("get_room_event_after_stream_ordering", _f)
+ Returns:
+ Deferred[(int, int, str)]:
+ (stream ordering, topological ordering, event_id)
+ """
+ sql = (
+ "SELECT stream_ordering, topological_ordering, event_id"
+ " FROM events"
+ " WHERE room_id = ? AND stream_ordering %s ?"
+ " AND NOT outlier"
+ " ORDER BY stream_ordering"
+ " LIMIT 1"
+ ) % ("<=" if dir == "b" else ">=",)
+ txn.execute(sql, (room_id, stream_ordering))
+ return txn.fetchone()
@defer.inlineCallbacks
def get_room_events_max_id(self, room_id=None):
|