diff --git a/changelog.d/18375.bugfix b/changelog.d/18375.bugfix
new file mode 100644
index 0000000000..faebe6f046
--- /dev/null
+++ b/changelog.d/18375.bugfix
@@ -0,0 +1 @@
+Pass leave from remote invite rejection down Sliding Sync.
diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py
index 459d3c3e24..cb56eb53fc 100644
--- a/synapse/handlers/sliding_sync/__init__.py
+++ b/synapse/handlers/sliding_sync/__init__.py
@@ -271,6 +271,7 @@ class SlidingSyncHandler:
from_token=from_token,
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
+ newly_left=room_id in interested_rooms.newly_left_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
)
@@ -542,6 +543,7 @@ class SlidingSyncHandler:
from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
newly_joined: bool,
+ newly_left: bool,
is_dm: bool,
) -> SlidingSyncResult.RoomResult:
"""
@@ -559,6 +561,7 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
+ newly_left: If the user has newly left the room
is_dm: Whether the room is a DM room
"""
user = sync_config.user
@@ -856,6 +859,26 @@ class SlidingSyncHandler:
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
+
+ # For the case of rejecting remote invites, the leave event won't be
+ # returned by `get_current_state_deltas_for_room`. This is due to the current
+ # state only being filled out for rooms the server is in, and so doesn't pick
+ # up out-of-band leaves (including locally rejected invites) as these events
+ # are outliers and not added to the `current_state_delta_stream`.
+ #
+ # We rely on being explicitly told that the room has been `newly_left` to
+ # ensure we extract the out-of-band leave.
+ if newly_left and room_membership_for_user_at_to_token.event_id is not None:
+ membership_changed = True
+ leave_event = await self.store.get_event(
+ room_membership_for_user_at_to_token.event_id
+ )
+ state_key = leave_event.get_state_key()
+ if state_key is not None:
+ room_state_delta_id_map[(leave_event.type, state_key)] = (
+ room_membership_for_user_at_to_token.event_id
+ )
+
deltas = await self.get_current_state_deltas_for_room(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py
index 7e3cf539df..6d1ac91605 100644
--- a/synapse/handlers/sliding_sync/room_lists.py
+++ b/synapse/handlers/sliding_sync/room_lists.py
@@ -1120,7 +1120,7 @@ class SlidingSyncRoomLists:
(
newly_joined_room_ids,
newly_left_room_map,
- ) = await self._get_newly_joined_and_left_rooms(
+ ) = await self._get_newly_joined_and_left_rooms_fallback(
user_id, to_token=to_token, from_token=from_token
)
@@ -1187,6 +1187,53 @@ class SlidingSyncRoomLists:
was state reset out of the room. To actually check for a state reset, you
need to check if a membership still exists in the room.
"""
+
+ newly_joined_room_ids: Set[str] = set()
+ newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}
+
+ if not from_token:
+ return newly_joined_room_ids, newly_left_room_map
+
+ changes = await self.store.get_sliding_sync_membership_changes(
+ user_id,
+ from_key=from_token.room_key,
+ to_key=to_token.room_key,
+ excluded_room_ids=set(self.rooms_to_exclude_globally),
+ )
+
+ for room_id, entry in changes.items():
+ if entry.membership == Membership.JOIN:
+ newly_joined_room_ids.add(room_id)
+ elif entry.membership == Membership.LEAVE:
+ newly_left_room_map[room_id] = entry
+
+ return newly_joined_room_ids, newly_left_room_map
+
+ @trace
+ async def _get_newly_joined_and_left_rooms_fallback(
+ self,
+ user_id: str,
+ to_token: StreamToken,
+ from_token: Optional[StreamToken],
+ ) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]:
+ """Fetch the sets of rooms that the user newly joined or left in the
+ given token range.
+
+ Note: there may be rooms in the newly left rooms where the user was
+ "state reset" out of the room, and so that room would not be part of the
+ "current memberships" of the user.
+
+ Returns:
+ A 2-tuple of newly joined room IDs and a map of newly_left room
+ IDs to the `RoomsForUserStateReset` entry.
+
+ We're using `RoomsForUserStateReset` but that doesn't necessarily mean the
+ user was state reset of the rooms. It's just that the `event_id`/`sender`
+ are optional and we can't tell the difference between the server leaving the
+ room when the user was the last person participating in the room and left or
+ was state reset out of the room. To actually check for a state reset, you
+ need to check if a membership still exists in the room.
+ """
newly_joined_room_ids: Set[str] = set()
newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index c52389b8a9..3fda49f31f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -80,6 +80,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
+from synapse.storage.roommember import RoomsForUserStateReset
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
from synapse.util.caches.descriptors import cached, cachedList
@@ -993,6 +994,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
available in the `current_state_delta_stream` table. To actually check for a
state reset, you need to check if a membership still exists in the room.
"""
+
+ assert from_key.topological is None
+ assert to_key.topological is None
+
# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
return []
@@ -1138,6 +1143,203 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if membership_change.room_id not in room_ids_to_exclude
]
+ @trace
+ async def get_sliding_sync_membership_changes(
+ self,
+ user_id: str,
+ from_key: RoomStreamToken,
+ to_key: RoomStreamToken,
+ excluded_room_ids: Optional[AbstractSet[str]] = None,
+ ) -> Dict[str, RoomsForUserStateReset]:
+ """
+ Fetch membership events that result in a meaningful membership change for a
+ given user.
+
+ A meaningful membership changes is one where the `membership` value actually
+ changes. This means memberships changes from `join` to `join` (like a display
+ name change) will be filtered out since they result in no meaningful change.
+
+ Note: This function only works with "live" tokens with `stream_ordering` only.
+
+ We're looking for membership changes in the token range (> `from_key` and <=
+ `to_key`).
+
+ Args:
+ user_id: The user ID to fetch membership events for.
+ from_key: The point in the stream to sync from (fetching events > this point).
+ to_key: The token to fetch rooms up to (fetching events <= this point).
+ excluded_room_ids: Optional list of room IDs to exclude from the results.
+
+ Returns:
+ All meaningful membership changes to the current state in the token range.
+ Events are sorted by `stream_ordering` ascending.
+
+ `event_id`/`sender` can be `None` when the server leaves a room (meaning
+ everyone locally left) or a state reset which removed the person from the
+ room. We can't tell the difference between the two cases with what's
+ available in the `current_state_delta_stream` table. To actually check for a
+ state reset, you need to check if a membership still exists in the room.
+ """
+
+ assert from_key.topological is None
+ assert to_key.topological is None
+
+ # Start by ruling out cases where a DB query is not necessary.
+ if from_key == to_key:
+ return {}
+
+ if from_key:
+ has_changed = self._membership_stream_cache.has_entity_changed(
+ user_id, int(from_key.stream)
+ )
+ if not has_changed:
+ return {}
+
+ room_ids_to_exclude: AbstractSet[str] = set()
+ if excluded_room_ids is not None:
+ room_ids_to_exclude = excluded_room_ids
+
+ def f(txn: LoggingTransaction) -> Dict[str, RoomsForUserStateReset]:
+ # To handle tokens with a non-empty instance_map we fetch more
+ # results than necessary and then filter down
+ min_from_id = from_key.stream
+ max_to_id = to_key.get_max_stream_pos()
+
+ # This query looks at membership changes in
+ # `sliding_sync_membership_snapshots` which will not include users
+ # that were state reset out of rooms; so we need to look for that
+ # case in `current_state_delta_stream`.
+ sql = """
+ SELECT
+ room_id,
+ membership_event_id,
+ event_instance_name,
+ event_stream_ordering,
+ membership,
+ sender,
+ prev_membership,
+ room_version
+ FROM
+ (
+ SELECT
+ s.room_id,
+ s.membership_event_id,
+ s.event_instance_name,
+ s.event_stream_ordering,
+ s.membership,
+ s.sender,
+ m_prev.membership AS prev_membership
+ FROM sliding_sync_membership_snapshots as s
+ LEFT JOIN event_edges AS e ON e.event_id = s.membership_event_id
+ LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = e.prev_event_id
+ WHERE s.user_id = ?
+
+ UNION ALL
+
+ SELECT
+ s.room_id,
+ e.event_id,
+ s.instance_name,
+ s.stream_id,
+ m.membership,
+ e.sender,
+ m_prev.membership AS prev_membership
+ FROM current_state_delta_stream AS s
+ LEFT JOIN events AS e ON e.event_id = s.event_id
+ LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
+ LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
+ WHERE
+ s.type = ?
+ AND s.state_key = ?
+ ) AS c
+ INNER JOIN rooms USING (room_id)
+ WHERE event_stream_ordering > ? AND event_stream_ordering <= ?
+ ORDER BY event_stream_ordering ASC
+ """
+
+ txn.execute(
+ sql,
+ (user_id, EventTypes.Member, user_id, min_from_id, max_to_id),
+ )
+
+ membership_changes: Dict[str, RoomsForUserStateReset] = {}
+ for (
+ room_id,
+ membership_event_id,
+ event_instance_name,
+ event_stream_ordering,
+ membership,
+ sender,
+ prev_membership,
+ room_version_id,
+ ) in txn:
+ assert room_id is not None
+ assert event_stream_ordering is not None
+
+ if room_id in room_ids_to_exclude:
+ continue
+
+ if _filter_results_by_stream(
+ from_key,
+ to_key,
+ event_instance_name,
+ event_stream_ordering,
+ ):
+ # When the server leaves a room, it will insert new rows into the
+ # `current_state_delta_stream` table with `event_id = null` for all
+ # current state. This means we might already have a row for the
+ # leave event and then another for the same leave where the
+ # `event_id=null` but the `prev_event_id` is pointing back at the
+ # earlier leave event. We don't want to report the leave, if we
+ # already have a leave event.
+ if (
+ membership_event_id is None
+ and prev_membership == Membership.LEAVE
+ ):
+ continue
+
+ if membership_event_id is None and room_id in membership_changes:
+ # SUSPICIOUS: if we join a room and get state reset out of it
+ # in the same queried window,
+ # won't this ignore the 'state reset out of it' part?
+ continue
+
+ # When `s.event_id = null`, we won't be able to get respective
+ # `room_membership` but can assume the user has left the room
+ # because this only happens when the server leaves a room
+ # (meaning everyone locally left) or a state reset which removed
+ # the person from the room.
+ membership = (
+ membership if membership is not None else Membership.LEAVE
+ )
+
+ if membership == prev_membership:
+ # If `membership` and `prev_membership` are the same then this
+ # is not a meaningful change so we can skip it.
+ # An example of this happening is when the user changes their display name.
+ continue
+
+ membership_change = RoomsForUserStateReset(
+ room_id=room_id,
+ sender=sender,
+ membership=membership,
+ event_id=membership_event_id,
+ event_pos=PersistedEventPosition(
+ event_instance_name, event_stream_ordering
+ ),
+ room_version_id=room_version_id,
+ )
+
+ membership_changes[room_id] = membership_change
+
+ return membership_changes
+
+ membership_changes = await self.db_pool.runInteraction(
+ "get_sliding_sync_membership_changes", f
+ )
+
+ return membership_changes
+
@cancellable
async def get_membership_changes_for_user(
self,
diff --git a/synapse/storage/schema/main/delta/92/03_ss_membership_snapshot_idx.sql b/synapse/storage/schema/main/delta/92/03_ss_membership_snapshot_idx.sql
new file mode 100644
index 0000000000..c694203f95
--- /dev/null
+++ b/synapse/storage/schema/main/delta/92/03_ss_membership_snapshot_idx.sql
@@ -0,0 +1,16 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2025 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- So we can fetch all rooms for a given user sorted by stream order
+DROP INDEX IF EXISTS sliding_sync_membership_snapshots_user_id;
+CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id, event_stream_ordering);
diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py
index cbacf21ae7..7144c58217 100644
--- a/tests/handlers/test_sliding_sync.py
+++ b/tests/handlers/test_sliding_sync.py
@@ -594,6 +594,12 @@ class ComputeInterestedRoomsTestCase(SlidingSyncBase):
the correct list of rooms IDs.
"""
+ # FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
+ # instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
+ # in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
+ # well to stress that logic and we shouldn't remove them just because we're removing
+ # the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).
+
servlets = [
admin.register_servlets,
knock.register_servlets,
@@ -2976,6 +2982,12 @@ class ComputeInterestedRoomsShardTestCase(
sharded event stream_writers enabled
"""
+ # FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
+ # instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
+ # in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
+ # well to stress that logic and we shouldn't remove them just because we're removing
+ # the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).
+
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py
index f3cf2111ec..dcec5b4cf0 100644
--- a/tests/rest/client/sliding_sync/test_sliding_sync.py
+++ b/tests/rest/client/sliding_sync/test_sliding_sync.py
@@ -790,6 +790,64 @@ class SlidingSyncTestCase(SlidingSyncBase):
exact=True,
)
+ def test_reject_remote_invite(self) -> None:
+ """Test that rejecting a remote invite comes down incremental sync"""
+
+ user_id = self.register_user("user1", "pass")
+ user_tok = self.login(user_id, "pass")
+
+ # Create a remote room invite (out-of-band membership)
+ room_id = "!room:remote.server"
+ self._create_remote_invite_room_for_user(user_id, None, room_id)
+
+ # Make the Sliding Sync request
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [(EventTypes.Member, StateValues.ME)],
+ "timeline_limit": 3,
+ }
+ }
+ }
+ response_body, from_token = self.do_sync(sync_body, tok=user_tok)
+ # We should see the room (like normal)
+ self.assertIncludes(
+ set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
+ {room_id},
+ exact=True,
+ )
+
+ # Reject the remote room invite
+ self.helper.leave(room_id, user_id, tok=user_tok)
+
+ # Sync again after rejecting the invite
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user_tok)
+
+ # The fix to add the leave event to incremental sync when rejecting a remote
+ # invite relies on the new tables to work.
+ if self.use_new_tables:
+ # We should see the newly_left room
+ self.assertIncludes(
+ set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
+ {room_id},
+ exact=True,
+ )
+ # We should see the leave state for the room so clients don't end up with stuck
+ # invites
+ self.assertIncludes(
+ {
+ (
+ state["type"],
+ state["state_key"],
+ state["content"].get("membership"),
+ )
+ for state in response_body["rooms"][room_id]["required_state"]
+ },
+ {(EventTypes.Member, user_id, Membership.LEAVE)},
+ exact=True,
+ )
+
def test_ignored_user_invites_initial_sync(self) -> None:
"""
Make sure we ignore invites if they are from one of the `m.ignored_user_list` on
|