diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3aa2e2b7ba..554c820f79 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -953,7 +953,7 @@ class SyncHandler:
batch: TimelineBatch,
sync_config: SyncConfig,
since_token: Optional[StreamToken],
- now_token: StreamToken,
+ end_token: StreamToken,
full_state: bool,
) -> MutableStateMap[EventBase]:
"""Works out the difference in state between the end of the previous sync and
@@ -964,7 +964,9 @@ class SyncHandler:
batch: The timeline batch for the room that will be sent to the user.
sync_config:
since_token: Token of the end of the previous batch. May be `None`.
- now_token: Token of the end of the current batch.
+ end_token: Token of the end of the current batch. Normally this will be
+ the same as the global "now_token", but if the user has left the room,
+ this will be the point just after their leave event.
full_state: Whether to force returning the full state.
`lazy_load_members` still applies when `full_state` is `True`.
@@ -1044,7 +1046,7 @@ class SyncHandler:
room_id,
sync_config.user,
batch,
- now_token,
+ end_token,
members_to_fetch,
timeline_state,
)
@@ -1058,7 +1060,7 @@ class SyncHandler:
room_id,
batch,
since_token,
- now_token,
+ end_token,
members_to_fetch,
timeline_state,
)
@@ -1130,7 +1132,7 @@ class SyncHandler:
room_id: str,
syncing_user: UserID,
batch: TimelineBatch,
- now_token: StreamToken,
+ end_token: StreamToken,
members_to_fetch: Optional[Set[str]],
timeline_state: StateMap[str],
) -> StateMap[str]:
@@ -1143,7 +1145,9 @@ class SyncHandler:
room_id: The room we are calculating for.
syncing_user: The user that is calling `/sync`.
batch: The timeline batch for the room that will be sent to the user.
- now_token: Token of the end of the current batch.
+ end_token: Token of the end of the current batch. Normally this will be
+ the same as the global "now_token", but if the user has left the room,
+ this will be the point just after their leave event.
members_to_fetch: If lazy-loading is enabled, the memberships needed for
events in the timeline.
timeline_state: The contribution to the room state from state events in
@@ -1183,15 +1187,16 @@ class SyncHandler:
await_full_state = True
lazy_load_members = False
- if batch:
- state_at_timeline_end = (
- await self._state_storage_controller.get_state_ids_for_event(
- batch.events[-1].event_id,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- )
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=end_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+ if batch:
+ # Strictly speaking, this returns the state *after* the first event in the
+ # timeline, but that is good enough here.
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
@@ -1200,13 +1205,6 @@ class SyncHandler:
)
)
else:
- state_at_timeline_end = await self.get_state_at(
- room_id,
- stream_position=now_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
-
state_at_timeline_start = state_at_timeline_end
state_ids = _calculate_state(
@@ -1223,7 +1221,7 @@ class SyncHandler:
room_id: str,
batch: TimelineBatch,
since_token: StreamToken,
- now_token: StreamToken,
+ end_token: StreamToken,
members_to_fetch: Optional[Set[str]],
timeline_state: StateMap[str],
) -> StateMap[str]:
@@ -1239,7 +1237,9 @@ class SyncHandler:
room_id: The room we are calculating for.
batch: The timeline batch for the room that will be sent to the user.
since_token: Token of the end of the previous batch.
- now_token: Token of the end of the current batch.
+ end_token: Token of the end of the current batch. Normally this will be
+ the same as the global "now_token", but if the user has left the room,
+ this will be the point just after their leave event.
members_to_fetch: If lazy-loading is enabled, the memberships needed for
events in the timeline. Otherwise, `None`.
timeline_state: The contribution to the room state from state events in
@@ -1259,25 +1259,25 @@ class SyncHandler:
await_full_state = True
lazy_load_members = False
- if batch.limited:
- if batch:
- state_at_timeline_start = (
- await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- )
- else:
- # We can get here if the user has ignored the senders of all
- # the recent events.
- state_at_timeline_start = await self.get_state_at(
- room_id,
- stream_position=now_token,
+ if batch:
+ state_at_timeline_start = (
+ await self._state_storage_controller.get_state_ids_for_event(
+ batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
+ )
+ else:
+ # We can get here if the user has ignored the senders of all
+ # the recent events.
+ state_at_timeline_start = await self.get_state_at(
+ room_id,
+ stream_position=end_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+ if batch.limited:
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
@@ -1292,58 +1292,28 @@ class SyncHandler:
# about them).
state_filter = StateFilter.all()
- state_at_previous_sync = await self.get_state_at(
- room_id,
- stream_position=since_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
+ state_at_previous_sync = await self.get_state_at(
+ room_id,
+ stream_position=since_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
- if batch:
- state_at_timeline_end = (
- await self._state_storage_controller.get_state_ids_for_event(
- batch.events[-1].event_id,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- )
- else:
- # We can get here if the user has ignored the senders of all
- # the recent events.
- state_at_timeline_end = await self.get_state_at(
- room_id,
- stream_position=now_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=end_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+
+ state_ids = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state_at_timeline_start,
+ timeline_end=state_at_timeline_end,
+ previous_timeline_end=state_at_previous_sync,
+ lazy_load_members=lazy_load_members,
+ )
- state_ids = _calculate_state(
- timeline_contains=timeline_state,
- timeline_start=state_at_timeline_start,
- timeline_end=state_at_timeline_end,
- previous_timeline_end=state_at_previous_sync,
- lazy_load_members=lazy_load_members,
- )
- else:
- state_ids = {}
- if lazy_load_members:
- if members_to_fetch and batch.events:
- # We're returning an incremental sync, with no
- # "gap" since the previous sync, so normally there would be
- # no state to return.
- # But we're lazy-loading, so the client might need some more
- # member events to understand the events in this timeline.
- # So we fish out all the member events corresponding to the
- # timeline here. The caller will then dedupe any redundant ones.
-
- state_ids = await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id,
- # we only want members!
- state_filter=StateFilter.from_types(
- (EventTypes.Member, member) for member in members_to_fetch
- ),
- await_full_state=False,
- )
return state_ids
async def _find_missing_partial_state_memberships(
@@ -2344,6 +2314,7 @@ class SyncHandler:
full_state=False,
since_token=since_token,
upto_token=leave_token,
+ end_token=leave_token,
out_of_band=leave_event.internal_metadata.is_out_of_band_membership(),
)
)
@@ -2381,6 +2352,7 @@ class SyncHandler:
full_state=False,
since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
+ end_token=now_token,
)
else:
entry = RoomSyncResultBuilder(
@@ -2391,6 +2363,7 @@ class SyncHandler:
full_state=False,
since_token=since_token,
upto_token=since_token,
+ end_token=now_token,
)
room_entries.append(entry)
@@ -2449,6 +2422,7 @@ class SyncHandler:
full_state=True,
since_token=since_token,
upto_token=now_token,
+ end_token=now_token,
)
)
elif event.membership == Membership.INVITE:
@@ -2478,6 +2452,7 @@ class SyncHandler:
full_state=True,
since_token=since_token,
upto_token=leave_token,
+ end_token=leave_token,
)
)
@@ -2548,6 +2523,7 @@ class SyncHandler:
{
"since_token": since_token,
"upto_token": upto_token,
+ "end_token": room_builder.end_token,
}
)
@@ -2621,7 +2597,7 @@ class SyncHandler:
batch,
sync_config,
since_token,
- now_token,
+ room_builder.end_token,
full_state=full_state,
)
else:
@@ -2781,6 +2757,61 @@ def _calculate_state(
e for t, e in timeline_start.items() if t[0] == EventTypes.Member
)
+ # Naively, we would just return the difference between the state at the start
+ # of the timeline (`timeline_start_ids`) and that at the end of the previous sync
+ # (`previous_timeline_end_ids`). However, that fails in the presence of forks in
+ # the DAG.
+ #
+ # For example, consider a DAG such as the following:
+ #
+ # E1
+ # ↗ ↖
+ # | S2
+ # | ↑
+ # --|------|----
+ # | |
+ # E3 |
+ # ↖ /
+ # E4
+ #
+ # ... and a filter that means we only return 2 events, represented by the dashed
+ # horizontal line. Assuming S2 was *not* included in the previous sync, we need to
+ # include it in the `state` section.
+ #
+ # Note that the state at the start of the timeline (E3) does not include S2. So,
+ # to make sure it gets included in the calculation here, we actually look at
+ # the state at the *end* of the timeline, and subtract any events that are present
+ # in the timeline.
+ #
+ # ----------
+ #
+ # Aside 1: You may then wonder if we need to include `timeline_start` in the
+ # calculation. Consider a linear DAG:
+ #
+ # E1
+ # ↑
+ # S2
+ # ↑
+ # ----|------
+ # |
+ # E3
+ # ↑
+ # S4
+ # ↑
+ # E5
+ #
+ # ... where S2 and S4 change the same piece of state; and where we have a filter
+ # that returns 3 events (E3, S4, E5). We still need to tell the client about S2,
+ # because it might affect the display of E3. However, the state at the end of the
+ # timeline only tells us about S4; if we don't inspect `timeline_start` we won't
+ # find out about S2.
+ #
+ # (There are yet more complicated cases in which a state event is excluded from the
+ # timeline, but whose effect actually lands in the DAG in the *middle* of the
+ # timeline. We have no way to represent that in the /sync response, and we don't
+ # even try; it is either omitted or plonked into `state` as if it were at the start
+ # of the timeline, depending on what else is in the timeline.)
+
state_ids = (
(timeline_end_ids | timeline_start_ids)
- previous_timeline_end_ids
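
To make the set arithmetic described in the comment concrete, here is a minimal standalone sketch using the fork DAG (E1, S2, E3, E4) from the comment. The event IDs, state map contents, and the final `timeline_contains` subtraction are illustrative assumptions drawn from the comment, not code from this patch:

# State maps are keyed by (event type, state key) and hold event IDs.
previous_timeline_end = {("m.room.create", ""): "$E1"}  # state at the end of the last sync
timeline_start = {("m.room.create", ""): "$E1"}         # state at E3: S2 not yet visible
timeline_end = {
    ("m.room.create", ""): "$E1",
    ("m.room.topic", ""): "$S2",                        # state at E4: S2 has landed
}
timeline_contains = {}                                  # S2 itself is filtered out of the timeline

state_ids = (
    (set(timeline_end.values()) | set(timeline_start.values()))
    - set(previous_timeline_end.values())
    - set(timeline_contains.values())
)
assert state_ids == {"$S2"}  # S2 must be returned in the `state` section

Looking only at `timeline_start` minus `previous_timeline_end` would have produced the empty set here, which is exactly the failure mode the comment describes.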
@@ -2883,13 +2914,30 @@ class RoomSyncResultBuilder:
Attributes:
room_id
+
rtype: One of `"joined"` or `"archived"`
+
events: List of events to include in the room (more events may be added
when generating result).
+
newly_joined: If the user has newly joined the room
+
full_state: Whether the full state should be sent in result
+
since_token: Earliest point to return events from, or None
- upto_token: Latest point to return events from.
+
+ upto_token: Latest point to return events from. If `events` is populated,
+ this is set to the token at the start of `events`.
+
+ end_token: The last point in the timeline that the client should see events
+ from. Normally this will be the same as the global `now_token`, but if the
+ user has left the room, this will be the point just after their leave event.
+
+ This is used in the calculation of the state which is returned in `state`:
+ any state changes *up to* `end_token` (and not beyond!) which are not
+ reflected in the timeline need to be returned in `state`.
+
out_of_band: whether the events in the room are "out of band" events
and the server isn't in the room.
"""
@@ -2901,5 +2949,5 @@ class RoomSyncResultBuilder:
full_state: bool
since_token: Optional[StreamToken]
upto_token: StreamToken
-
+ end_token: StreamToken
out_of_band: bool = False