diff options
author | Richard van der Hoff <richard@matrix.org> | 2023-01-12 16:45:23 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2023-01-12 16:45:23 +0000 |
commit | 0f061f39f0868d10adfde4e957d74fa3cb7b04c3 (patch) | |
tree | 025169dc3ba1ca84c6e158ad4476464e13d4c8bb /synapse/storage | |
parent | Add rust linting commands to `scripts-dev/lint.sh` (#14822) (diff) | |
parent | Fix race calling `/members?at=` (#14817) (diff) | |
download | synapse-0f061f39f0868d10adfde4e957d74fa3cb7b04c3.tar.xz |
Merge remote-tracking branch 'origin/release-v1.75' into develop
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 65 |
1 files changed, 59 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index cc27ec3804..63d8350530 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -801,13 +801,66 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): before this stream ordering. """ - last_row = await self.get_room_event_before_stream_ordering( - room_id=room_id, - stream_ordering=end_token.stream, + def get_last_event_in_room_before_stream_ordering_txn( + txn: LoggingTransaction, + ) -> Optional[str]: + # We need to handle the fact that the stream tokens can be vector + # clocks. We do this by getting all rows between the minimum and + # maximum stream ordering in the token, plus one row less than the + # minimum stream ordering. We then filter the results against the + # token and return the first row that matches. + + sql = """ + SELECT * FROM ( + SELECT instance_name, stream_ordering, topological_ordering, event_id + FROM events + LEFT JOIN rejections USING (event_id) + WHERE room_id = ? + AND ? < stream_ordering AND stream_ordering <= ? + AND NOT outlier + AND rejections.event_id IS NULL + ORDER BY stream_ordering DESC + ) AS a + UNION + SELECT * FROM ( + SELECT instance_name, stream_ordering, topological_ordering, event_id + FROM events + LEFT JOIN rejections USING (event_id) + WHERE room_id = ? + AND stream_ordering <= ? + AND NOT outlier + AND rejections.event_id IS NULL + ORDER BY stream_ordering DESC + LIMIT 1 + ) AS b + """ + txn.execute( + sql, + ( + room_id, + end_token.stream, + end_token.get_max_stream_pos(), + room_id, + end_token.stream, + ), + ) + + for instance_name, stream_ordering, topological_ordering, event_id in txn: + if _filter_results( + lower_token=None, + upper_token=end_token, + instance_name=instance_name, + topological_ordering=topological_ordering, + stream_ordering=stream_ordering, + ): + return event_id + + return None + + return await self.db_pool.runInteraction( + "get_last_event_in_room_before_stream_ordering", + get_last_event_in_room_before_stream_ordering_txn, ) - if last_row: - return last_row[2] - return None async def get_current_room_stream_token_for_room_id( self, room_id: str |