diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/filtering.py | 3 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 65 |
3 files changed, 59 insertions, 13 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 2b5af264b4..4cf8f0cc8e 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -283,9 +283,6 @@ class FilterCollection: await self._room_filter.filter(events) ) - def blocks_all_rooms(self) -> bool: - return self._room_filter.filters_all_rooms() - def blocks_all_presence(self) -> bool: return ( self._presence_filter.filters_all_types() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6942e06c77..20ee2f203a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1793,10 +1793,6 @@ class SyncHandler: - newly_left_users """ - # If the request doesn't care about rooms then nothing to do! - if sync_result_builder.sync_config.filter_collection.blocks_all_rooms(): - return set(), set(), set(), set() - since_token = sync_result_builder.since_token # 1. Start by fetching all ephemeral events in rooms we've joined (if required). diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index cc27ec3804..63d8350530 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -801,13 +801,66 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): before this stream ordering. """ - last_row = await self.get_room_event_before_stream_ordering( - room_id=room_id, - stream_ordering=end_token.stream, + def get_last_event_in_room_before_stream_ordering_txn( + txn: LoggingTransaction, + ) -> Optional[str]: + # We need to handle the fact that the stream tokens can be vector + # clocks. We do this by getting all rows between the minimum and + # maximum stream ordering in the token, plus one row less than the + # minimum stream ordering. We then filter the results against the + # token and return the first row that matches. + + sql = """ + SELECT * FROM ( + SELECT instance_name, stream_ordering, topological_ordering, event_id + FROM events + LEFT JOIN rejections USING (event_id) + WHERE room_id = ? + AND ? < stream_ordering AND stream_ordering <= ? + AND NOT outlier + AND rejections.event_id IS NULL + ORDER BY stream_ordering DESC + ) AS a + UNION + SELECT * FROM ( + SELECT instance_name, stream_ordering, topological_ordering, event_id + FROM events + LEFT JOIN rejections USING (event_id) + WHERE room_id = ? + AND stream_ordering <= ? + AND NOT outlier + AND rejections.event_id IS NULL + ORDER BY stream_ordering DESC + LIMIT 1 + ) AS b + """ + txn.execute( + sql, + ( + room_id, + end_token.stream, + end_token.get_max_stream_pos(), + room_id, + end_token.stream, + ), + ) + + for instance_name, stream_ordering, topological_ordering, event_id in txn: + if _filter_results( + lower_token=None, + upper_token=end_token, + instance_name=instance_name, + topological_ordering=topological_ordering, + stream_ordering=stream_ordering, + ): + return event_id + + return None + + return await self.db_pool.runInteraction( + "get_last_event_in_room_before_stream_ordering", + get_last_event_in_room_before_stream_ordering_txn, ) - if last_row: - return last_row[2] - return None async def get_current_room_stream_token_for_room_id( self, room_id: str |