diff options
author | Erik Johnston <erik@matrix.org> | 2023-01-12 10:29:09 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-12 10:29:09 +0000 |
commit | 84ce93c12f921063bb6c59400fcf95649a1b7f45 (patch) | |
tree | 5b9c784b9f8d2c4959f6bb1bd6b0302599d81269 /synapse/storage/databases | |
parent | Calculate rooms changed for device lists to work. (#14810) (diff) | |
download | synapse-84ce93c12f921063bb6c59400fcf95649a1b7f45.tar.xz |
Fix race calling `/members?at=` (#14817)
Fixes #14814
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 65 |
1 files changed, 59 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index cc27ec3804..63d8350530 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -801,13 +801,66 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): before this stream ordering. """ - last_row = await self.get_room_event_before_stream_ordering( - room_id=room_id, - stream_ordering=end_token.stream, + def get_last_event_in_room_before_stream_ordering_txn( + txn: LoggingTransaction, + ) -> Optional[str]: + # We need to handle the fact that the stream tokens can be vector + # clocks. We do this by getting all rows between the minimum and + # maximum stream ordering in the token, plus one row less than the + # minimum stream ordering. We then filter the results against the + # token and return the first row that matches. + + sql = """ + SELECT * FROM ( + SELECT instance_name, stream_ordering, topological_ordering, event_id + FROM events + LEFT JOIN rejections USING (event_id) + WHERE room_id = ? + AND ? < stream_ordering AND stream_ordering <= ? + AND NOT outlier + AND rejections.event_id IS NULL + ORDER BY stream_ordering DESC + ) AS a + UNION + SELECT * FROM ( + SELECT instance_name, stream_ordering, topological_ordering, event_id + FROM events + LEFT JOIN rejections USING (event_id) + WHERE room_id = ? + AND stream_ordering <= ? + AND NOT outlier + AND rejections.event_id IS NULL + ORDER BY stream_ordering DESC + LIMIT 1 + ) AS b + """ + txn.execute( + sql, + ( + room_id, + end_token.stream, + end_token.get_max_stream_pos(), + room_id, + end_token.stream, + ), + ) + + for instance_name, stream_ordering, topological_ordering, event_id in txn: + if _filter_results( + lower_token=None, + upper_token=end_token, + instance_name=instance_name, + topological_ordering=topological_ordering, + stream_ordering=stream_ordering, + ): + return event_id + + return None + + return await self.db_pool.runInteraction( + "get_last_event_in_room_before_stream_ordering", + get_last_event_in_room_before_stream_ordering_txn, ) - if last_row: - return last_row[2] - return None async def get_current_room_stream_token_for_room_id( self, room_id: str |