diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index b8c78baa6c..2084776543 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -53,6 +53,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.roommember import (
MemberSummary,
@@ -65,6 +66,7 @@ from synapse.types import (
PersistedEventPosition,
StateMap,
StrCollection,
+ StreamToken,
get_domain_from_id,
)
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -1389,7 +1391,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
txn, self.get_forgotten_rooms_for_user, (user_id,)
)
self._invalidate_cache_and_stream(
- txn, self.get_sliding_sync_rooms_for_user, (user_id,)
+ txn,
+ self.get_sliding_sync_rooms_for_user_from_membership_snapshots,
+ (user_id,),
)
await self.db_pool.runInteraction("forget_membership", f)
@@ -1421,25 +1425,30 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
)
@cached(iterable=True, max_entries=10000)
- async def get_sliding_sync_rooms_for_user(
- self,
- user_id: str,
+ async def get_sliding_sync_rooms_for_user_from_membership_snapshots(
+ self, user_id: str
) -> Mapping[str, RoomsForUserSlidingSync]:
- """Get all the rooms for a user to handle a sliding sync request.
+ """
+ Get all the rooms for a user to handle a sliding sync request from the
+ `sliding_sync_membership_snapshots` table. These will be current memberships and
+ need to be rewound to the token range.
Ignores forgotten rooms and rooms that the user has left themselves.
+ Args:
+ user_id: The user ID to get the rooms for.
+
Returns:
Map from room ID to membership info
"""
- def get_sliding_sync_rooms_for_user_txn(
+ def _txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
# XXX: If you use any new columns that can change (like from
# `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the
- # `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add
- # tests).
+ # `get_sliding_sync_rooms_for_user_from_membership_snapshots` cache in the
+ # appropriate places (and add tests).
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
@@ -1455,6 +1464,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
AND (m.membership != 'leave' OR m.user_id != m.sender)
"""
txn.execute(sql, (user_id,))
+
return {
row[0]: RoomsForUserSlidingSync(
room_id=row[0],
@@ -1475,8 +1485,113 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
}
return await self.db_pool.runInteraction(
- "get_sliding_sync_rooms_for_user",
- get_sliding_sync_rooms_for_user_txn,
+ "get_sliding_sync_rooms_for_user_from_membership_snapshots",
+ _txn,
+ )
+
+    async def get_sliding_sync_self_leave_rooms_after_to_token(
+        self,
+        user_id: str,
+        to_token: StreamToken,
+    ) -> Dict[str, RoomsForUserSlidingSync]:
+        """
+        Get all the self-leave rooms for a user after the `to_token` (outside the token
+        range) that are potentially relevant[1] and needed to handle a sliding sync
+        request. The results are from the `sliding_sync_membership_snapshots` table and
+        will be current memberships and need to be rewound to the token range.
+
+        [1] If a leave happens after the token range, the user may still have been
+        joined (or had any other non-self-leave membership relevant to sync) in the
+        room before that, so we need to include it in the list of potentially relevant
+        rooms and apply our rewind logic (outside of this function) to decide.
+
+        This is basically a sister-function to
+        `get_sliding_sync_rooms_for_user_from_membership_snapshots`. We could
+        alternatively incorporate this logic into
+        `get_sliding_sync_rooms_for_user_from_membership_snapshots` but those results
+        are cached and the `to_token` isn't very cache friendly (people are constantly
+        requesting with new tokens) so we separate it out here.
+
+        Args:
+            user_id: The user ID to get the rooms for.
+            to_token: Any self-leave memberships after this position will be returned.
+
+        Returns:
+            Map from room ID to membership info (unknown room versions are left out)
+        """
+        # TODO: Potential to check
+        # `self._membership_stream_cache.has_entity_changed(...)` as an early-return
+        # shortcut.
+
+        def _txn(
+            txn: LoggingTransaction,
+        ) -> Dict[str, RoomsForUserSlidingSync]:
+            sql = """
+                SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
+                    r.room_version,
+                    m.event_instance_name, m.event_stream_ordering,
+                    m.has_known_state,
+                    m.room_type,
+                    m.is_encrypted
+                FROM sliding_sync_membership_snapshots AS m
+                INNER JOIN rooms AS r USING (room_id)
+                WHERE user_id = ?
+                    AND m.forgotten = 0
+                    AND m.membership = 'leave'
+                    AND m.user_id = m.sender
+                    AND (m.event_stream_ordering > ?)
+            """
+            # If a leave happens after the token range, we may have still been joined
+            # (or any non-self-leave which is relevant to sync) to the room before so we
+            # need to include it in the list of potentially relevant rooms and apply our
+            # rewind logic (outside of this function).
+            #
+            # To handle tokens with a non-empty instance_map we query from the token's
+            # base `stream` value (over-fetching) and filter precisely per-row below.
+            min_to_token_position = to_token.room_key.stream
+            txn.execute(sql, (user_id, min_to_token_position))
+
+            # Map from room_id to membership info
+            room_membership_for_user_map: Dict[str, RoomsForUserSlidingSync] = {}
+            for row in txn:
+                # We filter out unknown room versions proactively. They shouldn't go
+                # down sync and their metadata may be in a broken state (causing
+                # errors), so bail out before building anything from the row.
+                if row[4] not in KNOWN_ROOM_VERSIONS:
+                    continue
+
+                room_for_user = RoomsForUserSlidingSync(
+                    room_id=row[0],
+                    sender=row[1],
+                    membership=row[2],
+                    event_id=row[3],
+                    room_version_id=row[4],
+                    event_pos=PersistedEventPosition(row[5], row[6]),
+                    has_known_state=bool(row[7]),
+                    room_type=row[8],
+                    is_encrypted=bool(row[9]),
+                )
+
+                # We only want to include the self-leave membership if it happened after
+                # the token range.
+                #
+                # Since the database pulls out more than necessary, skip any row the
+                # helper reports as falling at or before `to_token` (i.e. in range).
+                if _filter_results_by_stream(
+                    lower_token=None,
+                    upper_token=to_token.room_key,
+                    instance_name=room_for_user.event_pos.instance_name,
+                    stream_ordering=room_for_user.event_pos.stream,
+                ):
+                    continue
+
+                room_membership_for_user_map[room_for_user.room_id] = room_for_user
+
+            return room_membership_for_user_map
+
+        return await self.db_pool.runInteraction(
+            "get_sliding_sync_self_leave_rooms_after_to_token",
+            _txn,
         )
async def get_sliding_sync_room_for_user(
|