diff options
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 65 |
1 files changed, 59 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index cc27ec3804..63d8350530 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -801,13 +801,66 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): before this stream ordering. """ - last_row = await self.get_room_event_before_stream_ordering( - room_id=room_id, - stream_ordering=end_token.stream, + def get_last_event_in_room_before_stream_ordering_txn( + txn: LoggingTransaction, + ) -> Optional[str]: + # We need to handle the fact that the stream tokens can be vector + # clocks. We do this by getting all rows between the minimum and + # maximum stream ordering in the token, plus one row less than the + # minimum stream ordering. We then filter the results against the + # token and return the first row that matches. + + sql = """ + SELECT * FROM ( + SELECT instance_name, stream_ordering, topological_ordering, event_id + FROM events + LEFT JOIN rejections USING (event_id) + WHERE room_id = ? + AND ? < stream_ordering AND stream_ordering <= ? + AND NOT outlier + AND rejections.event_id IS NULL + ORDER BY stream_ordering DESC + ) AS a + UNION + SELECT * FROM ( + SELECT instance_name, stream_ordering, topological_ordering, event_id + FROM events + LEFT JOIN rejections USING (event_id) + WHERE room_id = ? + AND stream_ordering <= ? + AND NOT outlier + AND rejections.event_id IS NULL + ORDER BY stream_ordering DESC + LIMIT 1 + ) AS b + """ + txn.execute( + sql, + ( + room_id, + end_token.stream, + end_token.get_max_stream_pos(), + room_id, + end_token.stream, + ), + ) + + for instance_name, stream_ordering, topological_ordering, event_id in txn: + if _filter_results( + lower_token=None, + upper_token=end_token, + instance_name=instance_name, + topological_ordering=topological_ordering, + stream_ordering=stream_ordering, + ): + return event_id + + return None + + return await self.db_pool.runInteraction( + "get_last_event_in_room_before_stream_ordering", + get_last_event_in_room_before_stream_ordering_txn, ) - if last_row: - return last_row[2] - return None async def get_current_room_stream_token_for_room_id( self, room_id: str |