summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/e2e_keys.py26
-rw-r--r--synapse/handlers/sliding_sync.py73
2 files changed, 63 insertions, 36 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 668cec513b..f78e66ad0a 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -291,13 +291,20 @@ class E2eKeysHandler:
 
             # Only try and fetch keys for destinations that are not marked as
             # down.
-            filtered_destinations = await filter_destinations_by_retry_limiter(
-                remote_queries_not_in_cache.keys(),
-                self.clock,
-                self.store,
-                # Let's give an arbitrary grace period for those hosts that are
-                # only recently down
-                retry_due_within_ms=60 * 1000,
+            unfiltered_destinations = remote_queries_not_in_cache.keys()
+            filtered_destinations = set(
+                await filter_destinations_by_retry_limiter(
+                    unfiltered_destinations,
+                    self.clock,
+                    self.store,
+                    # Let's give an arbitrary grace period for those hosts that are
+                    # only recently down
+                    retry_due_within_ms=60 * 1000,
+                )
+            )
+            failures.update(
+                (dest, _NOT_READY_FOR_RETRY_FAILURE)
+                for dest in (unfiltered_destinations - filtered_destinations)
             )
 
             await concurrently_execute(
@@ -1641,6 +1648,9 @@ def _check_device_signature(
         raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE)
 
 
+_NOT_READY_FOR_RETRY_FAILURE = {"status": 503, "message": "Not ready for retry"}
+
+
 def _exception_to_failure(e: Exception) -> JsonDict:
     if isinstance(e, SynapseError):
         return {"status": e.code, "errcode": e.errcode, "message": str(e)}
@@ -1649,7 +1659,7 @@ def _exception_to_failure(e: Exception) -> JsonDict:
         return {"status": e.code, "message": str(e)}
 
     if isinstance(e, NotRetryingDestination):
-        return {"status": 503, "message": "Not ready for retry"}
+        return _NOT_READY_FOR_RETRY_FAILURE
 
     # include ConnectionRefused and other errors
     #
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 3e8e833367..ebb15a8451 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -41,7 +41,7 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
-from synapse.logging.opentracing import set_tag, start_active_span, tag_args, trace
+from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.storage.roommember import MemberSummary
@@ -631,21 +631,39 @@ class SlidingSyncHandler:
         # previously.
         if from_token:
             rooms_should_send = set()
+
+            # First we check if there are rooms that match a list/room
+            # subscription and have updates we need to send (i.e. either because
+            # we haven't sent the room down, or we have but there are missing
+            # updates).
             for room_id in relevant_room_map:
                 status = await self.connection_store.have_sent_room(
                     sync_config,
                     from_token.connection_position,
                     room_id,
                 )
-                if status.status != HaveSentRoomFlag.LIVE:
+                if (
+                    # The room was never sent down before so the client needs to know
+                    # about it regardless of any updates.
+                    status.status == HaveSentRoomFlag.NEVER
+                    # `PREVIOUSLY` literally means the "room was sent down before *AND*
+                    # there are updates we haven't sent down" so we already know this
+                    # room has updates.
+                    or status.status == HaveSentRoomFlag.PREVIOUSLY
+                ):
                     rooms_should_send.add(room_id)
+                elif status.status == HaveSentRoomFlag.LIVE:
+                    # We know that we've sent all updates up until `from_token`,
+                    # so we just need to check if there have been updates since
+                    # then.
+                    pass
+                else:
+                    assert_never(status.status)
 
-            # We only need to check for any new events and not state changes, as
-            # state changes can only happen if an event has also been sent.
-            rooms_that_have_updates = (
-                self.store._events_stream_cache.get_entities_changed(
-                    relevant_room_map, from_token.stream_token.room_key.stream
-                )
+            # We only need to check for new events since any state changes
+            # will also come down as new events.
+            rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+                relevant_room_map.keys(), from_token.stream_token.room_key
             )
             rooms_should_send.update(rooms_that_have_updates)
             relevant_room_map = {
@@ -672,8 +690,9 @@ class SlidingSyncHandler:
             if room_sync_result or not from_token:
                 rooms[room_id] = room_sync_result
 
-        with start_active_span("sliding_sync.generate_room_entries"):
-            await concurrently_execute(handle_room, relevant_room_map, 10)
+        if relevant_room_map:
+            with start_active_span("sliding_sync.generate_room_entries"):
+                await concurrently_execute(handle_room, relevant_room_map, 10)
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
@@ -684,22 +703,22 @@ class SlidingSyncHandler:
         )
 
         if has_lists or has_room_subscriptions:
-            connection_token = await self.connection_store.record_rooms(
+            connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
-                from_token=from_token,
                 relevant_room_map=relevant_room_map,
+                from_token=from_token,
                 sent_room_ids=relevant_room_map.keys(),
                 # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
                 unsent_room_ids=[],
             )
         elif from_token:
-            connection_token = from_token.connection_position
+            connection_position = from_token.connection_position
         else:
             # Initial sync without a `from_token` starts at `0`
-            connection_token = 0
+            connection_position = 0
 
         return SlidingSyncResult(
-            next_pos=SlidingSyncStreamToken(to_token, connection_token),
+            next_pos=SlidingSyncStreamToken(to_token, connection_position),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
@@ -1473,7 +1492,6 @@ class SlidingSyncHandler:
                 connection_token=from_token.connection_position,
                 room_id=room_id,
             )
-
             if room_status.status == HaveSentRoomFlag.LIVE:
                 from_bound = from_token.stream_token.room_key
                 initial = False
@@ -1493,11 +1511,9 @@ class SlidingSyncHandler:
             ):
                 from_bound = None
 
-            set_tag("sliding_sync.from_bound", from_bound)
-            set_tag("sliding_sync.room_status", room_status.status)
+            log_kv({"sliding_sync.room_status": room_status})
 
-        set_tag("sliding_sync.initial", initial)
-        set_tag("room_id", room_id)
+        log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
 
         # Assemble the list of timeline events
         #
@@ -1914,6 +1930,7 @@ class SlidingSyncHandler:
             highlight_count=0,
         )
 
+    @trace
     async def get_extensions_response(
         self,
         sync_config: SlidingSyncConfig,
@@ -2384,10 +2401,13 @@ class SlidingSyncConnectionStore:
     """In-memory store of per-connection state, including what rooms we have
     previously sent down a sliding sync connection.
 
-    Note: This is NOT safe to run in a worker setup.
+    Note: This is NOT safe to run in a worker setup because connection positions will
+    point to different sets of rooms on different workers. e.g. for the same connection,
+    a connection position of 5 might have totally different states on worker A and
+    worker B.
 
-    The complication here is that we need to handle requests being resent, i.e.
-    if we sent down a room in a response that the client received, we must
+    One complication that we need to deal with here is needing to handle requests being
+    resent, i.e. if we sent down a room in a response that the client received, we must
     consider the room *not* sent when we get the request again.
 
     This is handled by using an integer "token", which is returned to the client
@@ -2428,9 +2448,9 @@ class SlidingSyncConnectionStore:
     async def record_rooms(
         self,
         sync_config: SlidingSyncConfig,
+        relevant_room_map: Dict[str, RoomSyncConfig],
         from_token: Optional[SlidingSyncStreamToken],
         *,
-        relevant_room_map: Dict[str, RoomSyncConfig],
         sent_room_ids: StrCollection,
         unsent_room_ids: StrCollection,
     ) -> int:
@@ -2469,9 +2489,7 @@ class SlidingSyncConnectionStore:
         # end we can treat this as a noop.
         have_updated = False
         for room_id in sent_room_ids:
-            new_room_statuses[room_id] = HaveSentRoom(
-                HaveSentRoomFlag.LIVE, None, relevant_room_map[room_id].timeline_limit
-            )
+            new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
             have_updated = True
 
         # Whether we add/update the entries for unsent rooms depends on the
@@ -2494,7 +2512,6 @@ class SlidingSyncConnectionStore:
                 if from_token:
                     new_room_statuses[room_id] = HaveSentRoom.previously(
                         from_token.stream_token.room_key,
-                        None,
                         relevant_room_map[room_id].timeline_limit,
                     )
                 else: