summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r--synapse/handlers/sliding_sync.py127
1 files changed, 88 insertions, 39 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py

index ac65b30290..531089c279 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py
@@ -810,6 +810,7 @@ class SlidingSyncHandler: connection_position = await self.connection_store.record_rooms( sync_config=sync_config, + room_configs=relevant_room_map, from_token=from_token, sent_room_ids=relevant_rooms_to_send_map.keys(), unsent_room_ids=unsent_room_ids, @@ -1910,6 +1911,7 @@ class SlidingSyncHandler: # # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 from_bound = None + ignore_timeline_bound = False initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: room_status = await self.connection_store.have_sent_room( @@ -1917,6 +1919,7 @@ class SlidingSyncHandler: connection_token=from_token.connection_position, room_id=room_id, ) + if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -1930,9 +1933,24 @@ class SlidingSyncHandler: else: assert_never(room_status.status) + if room_status.timeline_limit is not None and ( + room_status.timeline_limit < room_sync_config.timeline_limit + ): + # If the timeline limit has been increased since previous + # requests then we treat it as request for more events. We do + # this by sending down a `limited` sync, ignoring the from + # bound. + ignore_timeline_bound = True + log_kv({"sliding_sync.room_status": room_status}) - log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial}) + log_kv( + { + "sliding_sync.from_bound": from_bound, + "sliding_sync.ignore_timeline_bound": ignore_timeline_bound, + "sliding_sync.initial": initial, + } + ) # Assemble the list of timeline events # @@ -2004,7 +2022,7 @@ class SlidingSyncHandler: # (from newer to older events) starting at to_bound. # This ensures we fill the `limit` with the newest events first, from_key=to_bound, - to_key=from_bound, + to_key=from_bound if not ignore_timeline_bound else None, direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) @@ -2343,24 +2361,37 @@ class SlidingSyncHandler: ) # Figure out the last bump event in the room - last_bump_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + bump_stamp = None + if timeline_events: + for e in reversed(timeline_events): + assert e.internal_metadata.stream_ordering is not None + if ( + e.type in DEFAULT_BUMP_EVENT_TYPES + and e.internal_metadata.stream_ordering > 0 + ): + bump_stamp = e.internal_metadata.stream_ordering + break + + if bump_stamp is None: + # By default, just choose the membership event position + bump_stamp = room_membership_for_user_at_to_token.event_pos.stream + + last_bump_event_result = ( + await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + ) ) - ) - # By default, just choose the membership event position - bump_stamp = room_membership_for_user_at_to_token.event_pos.stream - # But if we found a bump event, use that instead - if last_bump_event_result is not None: - _, new_bump_event_pos = last_bump_event_result + # But if we found a bump event, use that instead + if last_bump_event_result is not None: + _, new_bump_event_pos = last_bump_event_result - # If we've just joined a remote room, then the last bump event may - # have been backfilled (and so have a negative stream ordering). - # These negative stream orderings can't sensibly be compared, so - # instead we use the membership event position. - if new_bump_event_pos.stream > 0: - bump_stamp = new_bump_event_pos.stream + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. + if new_bump_event_pos.stream > 0: + bump_stamp = new_bump_event_pos.stream set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) @@ -2606,12 +2637,13 @@ class SlidingSyncHandler: up_to_stream_id=since_stream_id, ) - logger.debug( - "Deleted %d to-device messages up to %d for %s", - deleted, - since_stream_id, - user_id, - ) + if deleted: + logger.debug( + "Deleted %d to-device messages up to %d for %s", + deleted, + since_stream_id, + user_id, + ) messages, stream_id = await self.store.get_messages_for_device( user_id=user_id, @@ -2948,19 +2980,26 @@ class HaveSentRoom: contains the last stream token of the last updates we sent down the room, i.e. we still need to send everything since then to the client. + timeline_limit: The timeline limit config for the room, if LIVE or + PREVIOUSLY. This is used to track if the client has increased + the timeline limit to request more events. """ status: HaveSentRoomFlag last_token: Optional[RoomStreamToken] + timeline_limit: Optional[int] + + @staticmethod + def live(timeline_limit: int) -> "HaveSentRoom": + return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit) @staticmethod - def previously(last_token: RoomStreamToken) -> "HaveSentRoom": + def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom": """Constructor for `PREVIOUSLY` flag.""" - return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token) + return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit) -HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) -HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) +HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None) @attr.s(auto_attribs=True) @@ -3026,6 +3065,7 @@ class SlidingSyncConnectionStore: async def record_rooms( self, sync_config: SlidingSyncConfig, + room_configs: Dict[str, RoomSyncConfig], from_token: Optional[SlidingSyncStreamToken], *, sent_room_ids: StrCollection, @@ -3066,8 +3106,12 @@ class SlidingSyncConnectionStore: # end we can treat this as a noop. have_updated = False for room_id in sent_room_ids: - new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE - have_updated = True + prev_state = new_room_statuses.get(room_id) + new_room_statuses[room_id] = HaveSentRoom.live( + room_configs[room_id].timeline_limit + ) + if prev_state != new_room_statuses[room_id]: + have_updated = True # Whether we add/update the entries for unsent rooms depends on the # existing entry: @@ -3078,18 +3122,23 @@ class SlidingSyncConnectionStore: # given token, so we don't need to update the entry. # - NEVER: We have never previously sent down the room, and we haven't # sent anything down this time either so we leave it as NEVER. + # + # We only need to do this if `from_token` is not None, as if it is then + # we know that there are no existing entires. - # Work out the new state for unsent rooms that were `LIVE`. if from_token: - new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) - else: - new_unsent_state = HAVE_SENT_ROOM_NEVER - - for room_id in unsent_room_ids: - prev_state = new_room_statuses.get(room_id) - if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: - new_room_statuses[room_id] = new_unsent_state - have_updated = True + for room_id in unsent_room_ids: + prev_state = new_room_statuses.get(room_id) + if ( + prev_state is not None + and prev_state.status == HaveSentRoomFlag.LIVE + ): + assert prev_state.timeline_limit is not None + new_room_statuses[room_id] = HaveSentRoom.previously( + from_token.stream_token.room_key, + prev_state.timeline_limit, + ) + have_updated = True if not have_updated: return prev_connection_token