diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a2527d2a36..515a04699a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -416,6 +416,33 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"get_recent_events_for_room", get_recent_events_for_room_txn
)
+ def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
+ """Gets details of the first event in a room at or after a stream ordering
+
+ Args:
+ room_id (str):
+ stream_ordering (int):
+
+ Returns:
+ Deferred[(int, int, str)]:
+ (stream ordering, topological ordering, event_id)
+ """
+ def _f(txn):
+ sql = (
+ "SELECT stream_ordering, topological_ordering, event_id"
+ " FROM events"
+ " WHERE room_id = ? AND stream_ordering >= ?"
+ " AND NOT outlier"
+ " ORDER BY stream_ordering"
+ " LIMIT 1"
+ )
+ txn.execute(sql, (room_id, stream_ordering, ))
+ return txn.fetchone()
+
+ return self.runInteraction(
+ "get_room_event_after_stream_ordering", _f,
+ )
+
@defer.inlineCallbacks
def get_room_events_max_id(self, room_id=None):
"""Returns the current token for rooms stream.
|