diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index cc27ec3804..63d8350530 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -801,13 +801,66 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
before this stream ordering.
"""
- last_row = await self.get_room_event_before_stream_ordering(
- room_id=room_id,
- stream_ordering=end_token.stream,
+ def get_last_event_in_room_before_stream_ordering_txn(
+ txn: LoggingTransaction,
+ ) -> Optional[str]:
+ # We need to handle the fact that the stream tokens can be vector
+ # clocks. We do this by getting all rows between the minimum and
+ # maximum stream ordering in the token, plus one row less than the
+ # minimum stream ordering. We then filter the results against the
+ # token and return the first row that matches.
+
+ sql = """
+ SELECT * FROM (
+ SELECT instance_name, stream_ordering, topological_ordering, event_id
+ FROM events
+ LEFT JOIN rejections USING (event_id)
+ WHERE room_id = ?
+ AND ? < stream_ordering AND stream_ordering <= ?
+ AND NOT outlier
+ AND rejections.event_id IS NULL
+ ORDER BY stream_ordering DESC
+ ) AS a
+ UNION
+ SELECT * FROM (
+ SELECT instance_name, stream_ordering, topological_ordering, event_id
+ FROM events
+ LEFT JOIN rejections USING (event_id)
+ WHERE room_id = ?
+ AND stream_ordering <= ?
+ AND NOT outlier
+ AND rejections.event_id IS NULL
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ ) AS b
+ """
+ txn.execute(
+ sql,
+ (
+ room_id,
+ end_token.stream,
+ end_token.get_max_stream_pos(),
+ room_id,
+ end_token.stream,
+ ),
+ )
+
+ for instance_name, stream_ordering, topological_ordering, event_id in txn:
+ if _filter_results(
+ lower_token=None,
+ upper_token=end_token,
+ instance_name=instance_name,
+ topological_ordering=topological_ordering,
+ stream_ordering=stream_ordering,
+ ):
+ return event_id
+
+ return None
+
+ return await self.db_pool.runInteraction(
+ "get_last_event_in_room_before_stream_ordering",
+ get_last_event_in_room_before_stream_ordering_txn,
)
- if last_row:
- return last_row[2]
- return None
async def get_current_room_stream_token_for_room_id(
self, room_id: str
|