diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index ac65b30290..531089c279 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -810,6 +810,7 @@ class SlidingSyncHandler:
connection_position = await self.connection_store.record_rooms(
sync_config=sync_config,
+ room_configs=relevant_room_map,
from_token=from_token,
sent_room_ids=relevant_rooms_to_send_map.keys(),
unsent_room_ids=unsent_room_ids,
@@ -1910,6 +1911,7 @@ class SlidingSyncHandler:
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
from_bound = None
+ ignore_timeline_bound = False
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = await self.connection_store.have_sent_room(
@@ -1917,6 +1919,7 @@ class SlidingSyncHandler:
connection_token=from_token.connection_position,
room_id=room_id,
)
+
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
initial = False
@@ -1930,9 +1933,24 @@ class SlidingSyncHandler:
else:
assert_never(room_status.status)
+ if room_status.timeline_limit is not None and (
+ room_status.timeline_limit < room_sync_config.timeline_limit
+ ):
+ # If the timeline limit has been increased since previous
+                # requests then we treat it as a request for more events. We do
+ # this by sending down a `limited` sync, ignoring the from
+ # bound.
+ ignore_timeline_bound = True
+
log_kv({"sliding_sync.room_status": room_status})
- log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
+ log_kv(
+ {
+ "sliding_sync.from_bound": from_bound,
+ "sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
+ "sliding_sync.initial": initial,
+ }
+ )
# Assemble the list of timeline events
#
@@ -2004,7 +2022,7 @@ class SlidingSyncHandler:
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
- to_key=from_bound,
+ to_key=from_bound if not ignore_timeline_bound else None,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
@@ -2343,24 +2361,37 @@ class SlidingSyncHandler:
)
# Figure out the last bump event in the room
- last_bump_event_result = (
- await self.store.get_last_event_pos_in_room_before_stream_ordering(
- room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+ bump_stamp = None
+ if timeline_events:
+ for e in reversed(timeline_events):
+ assert e.internal_metadata.stream_ordering is not None
+ if (
+ e.type in DEFAULT_BUMP_EVENT_TYPES
+ and e.internal_metadata.stream_ordering > 0
+ ):
+ bump_stamp = e.internal_metadata.stream_ordering
+ break
+
+ if bump_stamp is None:
+ # By default, just choose the membership event position
+ bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
+
+ last_bump_event_result = (
+ await self.store.get_last_event_pos_in_room_before_stream_ordering(
+ room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+ )
)
- )
- # By default, just choose the membership event position
- bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
- # But if we found a bump event, use that instead
- if last_bump_event_result is not None:
- _, new_bump_event_pos = last_bump_event_result
+ # But if we found a bump event, use that instead
+ if last_bump_event_result is not None:
+ _, new_bump_event_pos = last_bump_event_result
- # If we've just joined a remote room, then the last bump event may
- # have been backfilled (and so have a negative stream ordering).
- # These negative stream orderings can't sensibly be compared, so
- # instead we use the membership event position.
- if new_bump_event_pos.stream > 0:
- bump_stamp = new_bump_event_pos.stream
+ # If we've just joined a remote room, then the last bump event may
+ # have been backfilled (and so have a negative stream ordering).
+ # These negative stream orderings can't sensibly be compared, so
+ # instead we use the membership event position.
+ if new_bump_event_pos.stream > 0:
+ bump_stamp = new_bump_event_pos.stream
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
@@ -2606,12 +2637,13 @@ class SlidingSyncHandler:
up_to_stream_id=since_stream_id,
)
- logger.debug(
- "Deleted %d to-device messages up to %d for %s",
- deleted,
- since_stream_id,
- user_id,
- )
+ if deleted:
+ logger.debug(
+ "Deleted %d to-device messages up to %d for %s",
+ deleted,
+ since_stream_id,
+ user_id,
+ )
messages, stream_id = await self.store.get_messages_for_device(
user_id=user_id,
@@ -2948,19 +2980,26 @@ class HaveSentRoom:
contains the last stream token of the last updates we sent down
the room, i.e. we still need to send everything since then to the
client.
+ timeline_limit: The timeline limit config for the room, if LIVE or
+ PREVIOUSLY. This is used to track if the client has increased
+ the timeline limit to request more events.
"""
status: HaveSentRoomFlag
last_token: Optional[RoomStreamToken]
+ timeline_limit: Optional[int]
+
+ @staticmethod
+ def live(timeline_limit: int) -> "HaveSentRoom":
+ return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit)
@staticmethod
- def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+ def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom":
"""Constructor for `PREVIOUSLY` flag."""
- return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+ return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit)
-HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
-HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None)
@attr.s(auto_attribs=True)
@@ -3026,6 +3065,7 @@ class SlidingSyncConnectionStore:
async def record_rooms(
self,
sync_config: SlidingSyncConfig,
+ room_configs: Dict[str, RoomSyncConfig],
from_token: Optional[SlidingSyncStreamToken],
*,
sent_room_ids: StrCollection,
@@ -3066,8 +3106,12 @@ class SlidingSyncConnectionStore:
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
- new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
- have_updated = True
+ prev_state = new_room_statuses.get(room_id)
+ new_room_statuses[room_id] = HaveSentRoom.live(
+ room_configs[room_id].timeline_limit
+ )
+ if prev_state != new_room_statuses[room_id]:
+ have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
@@ -3078,18 +3122,23 @@ class SlidingSyncConnectionStore:
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
+ #
+        # We only need to do this if `from_token` is not None, as if it is
+        # `None` then we know that there are no existing entries.
- # Work out the new state for unsent rooms that were `LIVE`.
if from_token:
- new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
- else:
- new_unsent_state = HAVE_SENT_ROOM_NEVER
-
- for room_id in unsent_room_ids:
- prev_state = new_room_statuses.get(room_id)
- if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
- new_room_statuses[room_id] = new_unsent_state
- have_updated = True
+ for room_id in unsent_room_ids:
+ prev_state = new_room_statuses.get(room_id)
+ if (
+ prev_state is not None
+ and prev_state.status == HaveSentRoomFlag.LIVE
+ ):
+ assert prev_state.timeline_limit is not None
+ new_room_statuses[room_id] = HaveSentRoom.previously(
+ from_token.stream_token.room_key,
+ prev_state.timeline_limit,
+ )
+ have_updated = True
if not have_updated:
return prev_connection_token
|