diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index c52389b8a9..3fda49f31f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -80,6 +80,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
+from synapse.storage.roommember import RoomsForUserStateReset
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
from synapse.util.caches.descriptors import cached, cachedList
@@ -993,6 +994,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
available in the `current_state_delta_stream` table. To actually check for a
state reset, you need to check if a membership still exists in the room.
"""
+
+ assert from_key.topological is None
+ assert to_key.topological is None
+
# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
return []
@@ -1138,6 +1143,203 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if membership_change.room_id not in room_ids_to_exclude
]
+ @trace
+ async def get_sliding_sync_membership_changes(
+ self,
+ user_id: str,
+ from_key: RoomStreamToken,
+ to_key: RoomStreamToken,
+ excluded_room_ids: Optional[AbstractSet[str]] = None,
+ ) -> Dict[str, RoomsForUserStateReset]:
+ """
+ Fetch membership events that result in a meaningful membership change for a
+ given user.
+
+ A meaningful membership changes is one where the `membership` value actually
+ changes. This means memberships changes from `join` to `join` (like a display
+ name change) will be filtered out since they result in no meaningful change.
+
+ Note: This function only works with "live" tokens with `stream_ordering` only.
+
+ We're looking for membership changes in the token range (> `from_key` and <=
+ `to_key`).
+
+ Args:
+ user_id: The user ID to fetch membership events for.
+ from_key: The point in the stream to sync from (fetching events > this point).
+ to_key: The token to fetch rooms up to (fetching events <= this point).
+ excluded_room_ids: Optional list of room IDs to exclude from the results.
+
+ Returns:
+ All meaningful membership changes to the current state in the token range.
+ Events are sorted by `stream_ordering` ascending.
+
+ `event_id`/`sender` can be `None` when the server leaves a room (meaning
+ everyone locally left) or a state reset which removed the person from the
+ room. We can't tell the difference between the two cases with what's
+ available in the `current_state_delta_stream` table. To actually check for a
+ state reset, you need to check if a membership still exists in the room.
+ """
+
+ assert from_key.topological is None
+ assert to_key.topological is None
+
+ # Start by ruling out cases where a DB query is not necessary.
+ if from_key == to_key:
+ return {}
+
+ if from_key:
+ has_changed = self._membership_stream_cache.has_entity_changed(
+ user_id, int(from_key.stream)
+ )
+ if not has_changed:
+ return {}
+
+ room_ids_to_exclude: AbstractSet[str] = set()
+ if excluded_room_ids is not None:
+ room_ids_to_exclude = excluded_room_ids
+
+ def f(txn: LoggingTransaction) -> Dict[str, RoomsForUserStateReset]:
+ # To handle tokens with a non-empty instance_map we fetch more
+ # results than necessary and then filter down
+ min_from_id = from_key.stream
+ max_to_id = to_key.get_max_stream_pos()
+
+ # This query looks at membership changes in
+ # `sliding_sync_membership_snapshots` which will not include users
+ # that were state reset out of rooms; so we need to look for that
+ # case in `current_state_delta_stream`.
+ sql = """
+ SELECT
+ room_id,
+ membership_event_id,
+ event_instance_name,
+ event_stream_ordering,
+ membership,
+ sender,
+ prev_membership,
+ room_version
+ FROM
+ (
+ SELECT
+ s.room_id,
+ s.membership_event_id,
+ s.event_instance_name,
+ s.event_stream_ordering,
+ s.membership,
+ s.sender,
+ m_prev.membership AS prev_membership
+ FROM sliding_sync_membership_snapshots as s
+ LEFT JOIN event_edges AS e ON e.event_id = s.membership_event_id
+ LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = e.prev_event_id
+ WHERE s.user_id = ?
+
+ UNION ALL
+
+ SELECT
+ s.room_id,
+ e.event_id,
+ s.instance_name,
+ s.stream_id,
+ m.membership,
+ e.sender,
+ m_prev.membership AS prev_membership
+ FROM current_state_delta_stream AS s
+ LEFT JOIN events AS e ON e.event_id = s.event_id
+ LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
+ LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
+ WHERE
+ s.type = ?
+ AND s.state_key = ?
+ ) AS c
+ INNER JOIN rooms USING (room_id)
+ WHERE event_stream_ordering > ? AND event_stream_ordering <= ?
+ ORDER BY event_stream_ordering ASC
+ """
+
+ txn.execute(
+ sql,
+ (user_id, EventTypes.Member, user_id, min_from_id, max_to_id),
+ )
+
+ membership_changes: Dict[str, RoomsForUserStateReset] = {}
+ for (
+ room_id,
+ membership_event_id,
+ event_instance_name,
+ event_stream_ordering,
+ membership,
+ sender,
+ prev_membership,
+ room_version_id,
+ ) in txn:
+ assert room_id is not None
+ assert event_stream_ordering is not None
+
+ if room_id in room_ids_to_exclude:
+ continue
+
+ if _filter_results_by_stream(
+ from_key,
+ to_key,
+ event_instance_name,
+ event_stream_ordering,
+ ):
+ # When the server leaves a room, it will insert new rows into the
+ # `current_state_delta_stream` table with `event_id = null` for all
+ # current state. This means we might already have a row for the
+ # leave event and then another for the same leave where the
+ # `event_id=null` but the `prev_event_id` is pointing back at the
+ # earlier leave event. We don't want to report the leave, if we
+ # already have a leave event.
+ if (
+ membership_event_id is None
+ and prev_membership == Membership.LEAVE
+ ):
+ continue
+
+ if membership_event_id is None and room_id in membership_changes:
+ # SUSPICIOUS: if we join a room and get state reset out of it
+ # in the same queried window,
+ # won't this ignore the 'state reset out of it' part?
+ continue
+
+ # When `s.event_id = null`, we won't be able to get respective
+ # `room_membership` but can assume the user has left the room
+ # because this only happens when the server leaves a room
+ # (meaning everyone locally left) or a state reset which removed
+ # the person from the room.
+ membership = (
+ membership if membership is not None else Membership.LEAVE
+ )
+
+ if membership == prev_membership:
+ # If `membership` and `prev_membership` are the same then this
+ # is not a meaningful change so we can skip it.
+ # An example of this happening is when the user changes their display name.
+ continue
+
+ membership_change = RoomsForUserStateReset(
+ room_id=room_id,
+ sender=sender,
+ membership=membership,
+ event_id=membership_event_id,
+ event_pos=PersistedEventPosition(
+ event_instance_name, event_stream_ordering
+ ),
+ room_version_id=room_version_id,
+ )
+
+ membership_changes[room_id] = membership_change
+
+ return membership_changes
+
+ membership_changes = await self.db_pool.runInteraction(
+ "get_sliding_sync_membership_changes", f
+ )
+
+ return membership_changes
+
@cancellable
async def get_membership_changes_for_user(
self,
|