diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/e2e_keys.py | 26 | ||||
-rw-r--r-- | synapse/handlers/sliding_sync.py | 73 |
2 files changed, 63 insertions, 36 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 668cec513b..f78e66ad0a 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -291,13 +291,20 @@ class E2eKeysHandler: # Only try and fetch keys for destinations that are not marked as # down. - filtered_destinations = await filter_destinations_by_retry_limiter( - remote_queries_not_in_cache.keys(), - self.clock, - self.store, - # Let's give an arbitrary grace period for those hosts that are - # only recently down - retry_due_within_ms=60 * 1000, + unfiltered_destinations = remote_queries_not_in_cache.keys() + filtered_destinations = set( + await filter_destinations_by_retry_limiter( + unfiltered_destinations, + self.clock, + self.store, + # Let's give an arbitrary grace period for those hosts that are + # only recently down + retry_due_within_ms=60 * 1000, + ) + ) + failures.update( + (dest, _NOT_READY_FOR_RETRY_FAILURE) + for dest in (unfiltered_destinations - filtered_destinations) ) await concurrently_execute( @@ -1641,6 +1648,9 @@ def _check_device_signature( raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE) +_NOT_READY_FOR_RETRY_FAILURE = {"status": 503, "message": "Not ready for retry"} + + def _exception_to_failure(e: Exception) -> JsonDict: if isinstance(e, SynapseError): return {"status": e.code, "errcode": e.errcode, "message": str(e)} @@ -1649,7 +1659,7 @@ def _exception_to_failure(e: Exception) -> JsonDict: return {"status": e.code, "message": str(e)} if isinstance(e, NotRetryingDestination): - return {"status": 503, "message": "Not ready for retry"} + return _NOT_READY_FOR_RETRY_FAILURE # include ConnectionRefused and other errors # diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 3e8e833367..ebb15a8451 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -41,7 +41,7 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe from synapse.events import EventBase from synapse.events.utils import strip_event from synapse.handlers.relations import BundledAggregations -from synapse.logging.opentracing import set_tag, start_active_span, tag_args, trace +from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.databases.main.stream import CurrentStateDeltaMembership from synapse.storage.roommember import MemberSummary @@ -631,21 +631,39 @@ class SlidingSyncHandler: # previously. if from_token: rooms_should_send = set() + + # First we check if there are rooms that match a list/room + # subscription and have updates we need to send (i.e. either because + # we haven't sent the room down, or we have but there are missing + # updates). for room_id in relevant_room_map: status = await self.connection_store.have_sent_room( sync_config, from_token.connection_position, room_id, ) - if status.status != HaveSentRoomFlag.LIVE: + if ( + # The room was never sent down before so the client needs to know + # about it regardless of any updates. + status.status == HaveSentRoomFlag.NEVER + # `PREVIOUSLY` literally means the "room was sent down before *AND* + # there are updates we haven't sent down" so we already know this + # room has updates. + or status.status == HaveSentRoomFlag.PREVIOUSLY + ): rooms_should_send.add(room_id) + elif status.status == HaveSentRoomFlag.LIVE: + # We know that we've sent all updates up until `from_token`, + # so we just need to check if there have been updates since + # then. + pass + else: + assert_never(status.status) - # We only need to check for any new events and not state changes, as - # state changes can only happen if an event has also been sent. - rooms_that_have_updates = ( - self.store._events_stream_cache.get_entities_changed( - relevant_room_map, from_token.stream_token.room_key.stream - ) + # We only need to check for new events since any state changes + # will also come down as new events. + rooms_that_have_updates = self.store.get_rooms_that_might_have_updates( + relevant_room_map.keys(), from_token.stream_token.room_key ) rooms_should_send.update(rooms_that_have_updates) relevant_room_map = { @@ -672,8 +690,9 @@ class SlidingSyncHandler: if room_sync_result or not from_token: rooms[room_id] = room_sync_result - with start_active_span("sliding_sync.generate_room_entries"): - await concurrently_execute(handle_room, relevant_room_map, 10) + if relevant_room_map: + with start_active_span("sliding_sync.generate_room_entries"): + await concurrently_execute(handle_room, relevant_room_map, 10) extensions = await self.get_extensions_response( sync_config=sync_config, @@ -684,22 +703,22 @@ class SlidingSyncHandler: ) if has_lists or has_room_subscriptions: - connection_token = await self.connection_store.record_rooms( + connection_position = await self.connection_store.record_rooms( sync_config=sync_config, - from_token=from_token, relevant_room_map=relevant_room_map, + from_token=from_token, sent_room_ids=relevant_room_map.keys(), # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids` unsent_room_ids=[], ) elif from_token: - connection_token = from_token.connection_position + connection_position = from_token.connection_position else: # Initial sync without a `from_token` starts at `0` - connection_token = 0 + connection_position = 0 return SlidingSyncResult( - next_pos=SlidingSyncStreamToken(to_token, connection_token), + next_pos=SlidingSyncStreamToken(to_token, connection_position), lists=lists, rooms=rooms, extensions=extensions, @@ -1473,7 +1492,6 @@ class SlidingSyncHandler: connection_token=from_token.connection_position, room_id=room_id, ) - if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -1493,11 +1511,9 @@ class SlidingSyncHandler: ): from_bound = None - set_tag("sliding_sync.from_bound", from_bound) - set_tag("sliding_sync.room_status", room_status.status) + log_kv({"sliding_sync.room_status": room_status}) - set_tag("sliding_sync.initial", initial) - set_tag("room_id", room_id) + log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial}) # Assemble the list of timeline events # @@ -1914,6 +1930,7 @@ class SlidingSyncHandler: highlight_count=0, ) + @trace async def get_extensions_response( self, sync_config: SlidingSyncConfig, @@ -2384,10 +2401,13 @@ class SlidingSyncConnectionStore: """In-memory store of per-connection state, including what rooms we have previously sent down a sliding sync connection. - Note: This is NOT safe to run in a worker setup. + Note: This is NOT safe to run in a worker setup because connection positions will + point to different sets of rooms on different workers. e.g. for the same connection, + a connection position of 5 might have totally different states on worker A and + worker B. - The complication here is that we need to handle requests being resent, i.e. - if we sent down a room in a response that the client received, we must + One complication that we need to deal with here is needing to handle requests being + resent, i.e. if we sent down a room in a response that the client received, we must consider the room *not* sent when we get the request again. This is handled by using an integer "token", which is returned to the client @@ -2428,9 +2448,9 @@ class SlidingSyncConnectionStore: async def record_rooms( self, sync_config: SlidingSyncConfig, + relevant_room_map: Dict[str, RoomSyncConfig], from_token: Optional[SlidingSyncStreamToken], *, - relevant_room_map: Dict[str, RoomSyncConfig], sent_room_ids: StrCollection, unsent_room_ids: StrCollection, ) -> int: @@ -2469,9 +2489,7 @@ class SlidingSyncConnectionStore: # end we can treat this as a noop. have_updated = False for room_id in sent_room_ids: - new_room_statuses[room_id] = HaveSentRoom( - HaveSentRoomFlag.LIVE, None, relevant_room_map[room_id].timeline_limit - ) + new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE have_updated = True # Whether we add/update the entries for unsent rooms depends on the @@ -2494,7 +2512,6 @@ class SlidingSyncConnectionStore: if from_token: new_room_statuses[room_id] = HaveSentRoom.previously( from_token.stream_token.room_key, - None, relevant_room_map[room_id].timeline_limit, ) else: |