diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 3e8e833367..ebb15a8451 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -41,7 +41,7 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
from synapse.events import EventBase
from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
-from synapse.logging.opentracing import set_tag, start_active_span, tag_args, trace
+from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.roommember import MemberSummary
@@ -631,21 +631,39 @@ class SlidingSyncHandler:
# previously.
if from_token:
rooms_should_send = set()
+
+ # First we check if there are rooms that match a list/room
+ # subscription and have updates we need to send (i.e. either because
+ # we haven't sent the room down, or we have but there are missing
+ # updates).
for room_id in relevant_room_map:
status = await self.connection_store.have_sent_room(
sync_config,
from_token.connection_position,
room_id,
)
- if status.status != HaveSentRoomFlag.LIVE:
+ if (
+ # The room was never sent down before so the client needs to know
+ # about it regardless of any updates.
+ status.status == HaveSentRoomFlag.NEVER
+ # `PREVIOUSLY` literally means the "room was sent down before *AND*
+ # there are updates we haven't sent down" so we already know this
+ # room has updates.
+ or status.status == HaveSentRoomFlag.PREVIOUSLY
+ ):
rooms_should_send.add(room_id)
+ elif status.status == HaveSentRoomFlag.LIVE:
+ # We know that we've sent all updates up until `from_token`,
+ # so we just need to check if there have been updates since
+ # then.
+ pass
+ else:
+ assert_never(status.status)
- # We only need to check for any new events and not state changes, as
- # state changes can only happen if an event has also been sent.
- rooms_that_have_updates = (
- self.store._events_stream_cache.get_entities_changed(
- relevant_room_map, from_token.stream_token.room_key.stream
- )
+ # We only need to check for new events since any state changes
+ # will also come down as new events.
+ rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+ relevant_room_map.keys(), from_token.stream_token.room_key
)
rooms_should_send.update(rooms_that_have_updates)
relevant_room_map = {
@@ -672,8 +690,9 @@ class SlidingSyncHandler:
if room_sync_result or not from_token:
rooms[room_id] = room_sync_result
- with start_active_span("sliding_sync.generate_room_entries"):
- await concurrently_execute(handle_room, relevant_room_map, 10)
+ if relevant_room_map:
+ with start_active_span("sliding_sync.generate_room_entries"):
+ await concurrently_execute(handle_room, relevant_room_map, 10)
extensions = await self.get_extensions_response(
sync_config=sync_config,
@@ -684,22 +703,22 @@ class SlidingSyncHandler:
)
if has_lists or has_room_subscriptions:
- connection_token = await self.connection_store.record_rooms(
+ connection_position = await self.connection_store.record_rooms(
sync_config=sync_config,
- from_token=from_token,
relevant_room_map=relevant_room_map,
+ from_token=from_token,
sent_room_ids=relevant_room_map.keys(),
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
unsent_room_ids=[],
)
elif from_token:
- connection_token = from_token.connection_position
+ connection_position = from_token.connection_position
else:
# Initial sync without a `from_token` starts at `0`
- connection_token = 0
+ connection_position = 0
return SlidingSyncResult(
- next_pos=SlidingSyncStreamToken(to_token, connection_token),
+ next_pos=SlidingSyncStreamToken(to_token, connection_position),
lists=lists,
rooms=rooms,
extensions=extensions,
@@ -1473,7 +1492,6 @@ class SlidingSyncHandler:
connection_token=from_token.connection_position,
room_id=room_id,
)
-
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
initial = False
@@ -1493,11 +1511,9 @@ class SlidingSyncHandler:
):
from_bound = None
- set_tag("sliding_sync.from_bound", from_bound)
- set_tag("sliding_sync.room_status", room_status.status)
+ log_kv({"sliding_sync.room_status": room_status})
- set_tag("sliding_sync.initial", initial)
- set_tag("room_id", room_id)
+ log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
# Assemble the list of timeline events
#
@@ -1914,6 +1930,7 @@ class SlidingSyncHandler:
highlight_count=0,
)
+ @trace
async def get_extensions_response(
self,
sync_config: SlidingSyncConfig,
@@ -2384,10 +2401,13 @@ class SlidingSyncConnectionStore:
"""In-memory store of per-connection state, including what rooms we have
previously sent down a sliding sync connection.
- Note: This is NOT safe to run in a worker setup.
+ Note: This is NOT safe to run in a worker setup because connection positions will
+ point to different sets of rooms on different workers. e.g. for the same connection,
+ a connection position of 5 might have totally different states on worker A and
+ worker B.
- The complication here is that we need to handle requests being resent, i.e.
- if we sent down a room in a response that the client received, we must
+ One complication that we need to deal with here is needing to handle requests being
+ resent, i.e. if we sent down a room in a response that the client received, we must
consider the room *not* sent when we get the request again.
This is handled by using an integer "token", which is returned to the client
@@ -2428,9 +2448,9 @@ class SlidingSyncConnectionStore:
async def record_rooms(
self,
sync_config: SlidingSyncConfig,
+ relevant_room_map: Dict[str, RoomSyncConfig],
from_token: Optional[SlidingSyncStreamToken],
*,
- relevant_room_map: Dict[str, RoomSyncConfig],
sent_room_ids: StrCollection,
unsent_room_ids: StrCollection,
) -> int:
@@ -2469,9 +2489,7 @@ class SlidingSyncConnectionStore:
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
- new_room_statuses[room_id] = HaveSentRoom(
- HaveSentRoomFlag.LIVE, None, relevant_room_map[room_id].timeline_limit
- )
+ new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
@@ -2494,7 +2512,6 @@ class SlidingSyncConnectionStore:
if from_token:
new_room_statuses[room_id] = HaveSentRoom.previously(
from_token.stream_token.room_key,
- None,
relevant_room_map[room_id].timeline_limit,
)
else:
|