diff --git a/changelog.d/17695.bugfix b/changelog.d/17695.bugfix
new file mode 100644
index 0000000000..c63132704f
--- /dev/null
+++ b/changelog.d/17695.bugfix
@@ -0,0 +1 @@
+Fix bug where room account data would not correctly be sent down sliding sync for old rooms.
diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py
index 287f4b04ad..56e1d9329e 100644
--- a/synapse/handlers/sliding_sync/extensions.py
+++ b/synapse/handlers/sliding_sync/extensions.py
@@ -19,7 +19,6 @@ from typing import (
AbstractSet,
ChainMap,
Dict,
- List,
Mapping,
MutableMapping,
Optional,
@@ -119,6 +118,8 @@ class SlidingSyncExtensionHandler:
if sync_config.extensions.account_data is not None:
account_data_response = await self.get_account_data_extension_response(
sync_config=sync_config,
+ previous_connection_state=previous_connection_state,
+ new_connection_state=new_connection_state,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
account_data_request=sync_config.extensions.account_data,
@@ -361,6 +362,8 @@ class SlidingSyncExtensionHandler:
async def get_account_data_extension_response(
self,
sync_config: SlidingSyncConfig,
+ previous_connection_state: "PerConnectionState",
+ new_connection_state: "MutablePerConnectionState",
actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
@@ -425,15 +428,7 @@ class SlidingSyncExtensionHandler:
# Fetch room account data
#
- # List of -> Mapping from room_id to mapping of `type` to `content` of room
- # account data events.
- #
- # This is is a list so we can avoid making copies of immutable data and instead
- # just provide multiple maps that need to be combined. Normally, we could
- # reach for `ChainMap` in this scenario, but this is a nested map and accessing
- # the ChainMap by room_id won't combine the two maps for that room (we would
- # need a new `NestedChainMap` type class).
- account_data_by_room_maps: List[Mapping[str, Mapping[str, JsonMapping]]] = []
+ account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {}
relevant_room_ids = self.find_relevant_room_ids_for_extension(
requested_lists=account_data_request.lists,
requested_room_ids=account_data_request.rooms,
@@ -441,9 +436,43 @@ class SlidingSyncExtensionHandler:
actual_room_ids=actual_room_ids,
)
if len(relevant_room_ids) > 0:
+ # We need to handle the different cases depending on if we have sent
+ # down account data previously or not, so we split the relevant
+ # rooms up into different collections based on status.
+ live_rooms = set()
+ previously_rooms: Dict[str, int] = {}
+ initial_rooms = set()
+
+ for room_id in relevant_room_ids:
+ if not from_token:
+ initial_rooms.add(room_id)
+ continue
+
+ room_status = previous_connection_state.account_data.have_sent_room(
+ room_id
+ )
+ if room_status.status == HaveSentRoomFlag.LIVE:
+ live_rooms.add(room_id)
+ elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+ assert room_status.last_token is not None
+ previously_rooms[room_id] = room_status.last_token
+ elif room_status.status == HaveSentRoomFlag.NEVER:
+ initial_rooms.add(room_id)
+ else:
+ assert_never(room_status.status)
+
+ # We fetch all room account data since the from_token. This is so
+ # that we can record which rooms have updates that haven't been sent
+ # down.
+ #
+ # Mapping from room_id to mapping of `type` to `content` of room account
+ # data events.
+ all_updates_since_the_from_token: Mapping[
+ str, Mapping[str, JsonMapping]
+ ] = {}
if from_token is not None:
# TODO: This should take into account the `from_token` and `to_token`
- account_data_by_room_map = (
+ all_updates_since_the_from_token = (
await self.store.get_updated_room_account_data_for_user(
user_id, from_token.stream_token.account_data_key
)
@@ -456,58 +485,108 @@ class SlidingSyncExtensionHandler:
user_id, from_token.stream_token.account_data_key
)
for room_id, tags in tags_by_room.items():
- account_data_by_room_map.setdefault(room_id, {})[
+ all_updates_since_the_from_token.setdefault(room_id, {})[
AccountDataTypes.TAG
] = {"tags": tags}
- account_data_by_room_maps.append(account_data_by_room_map)
- else:
- # TODO: This should take into account the `to_token`
- immutable_account_data_by_room_map = (
- await self.store.get_room_account_data_for_user(user_id)
- )
- account_data_by_room_maps.append(immutable_account_data_by_room_map)
+ # For live rooms we just get the updates from `all_updates_since_the_from_token`
+ if live_rooms:
+ for room_id in all_updates_since_the_from_token.keys() & live_rooms:
+ account_data_by_room_map[room_id] = (
+ all_updates_since_the_from_token[room_id]
+ )
- # Add room tags
- #
- # TODO: This should take into account the `to_token`
- tags_by_room = await self.store.get_tags_for_user(user_id)
- account_data_by_room_maps.append(
- {
- room_id: {AccountDataTypes.TAG: {"tags": tags}}
- for room_id, tags in tags_by_room.items()
- }
+ # For previously and initial rooms we query each room individually.
+ if previously_rooms or initial_rooms:
+
+ async def handle_previously(room_id: str) -> None:
+ # Either get updates or all account data in the room
+ # depending on if the room state is PREVIOUSLY or NEVER.
+ previous_token = previously_rooms.get(room_id)
+ if previous_token is not None:
+ room_account_data = await (
+ self.store.get_updated_room_account_data_for_user_for_room(
+ user_id=user_id,
+ room_id=room_id,
+ from_stream_id=previous_token,
+ to_stream_id=to_token.account_data_key,
+ )
+ )
+
+ # Add room tags
+ changed = await self.store.has_tags_changed_for_room(
+ user_id=user_id,
+ room_id=room_id,
+ from_stream_id=previous_token,
+ to_stream_id=to_token.account_data_key,
+ )
+ if changed:
+ # XXX: Ideally, this should take into account the `to_token`
+ # and return the set of tags at that time but we don't track
+ # changes to tags so we just have to return all tags for the
+ # room.
+ immutable_tag_map = await self.store.get_tags_for_room(
+ user_id, room_id
+ )
+ room_account_data[AccountDataTypes.TAG] = {
+ "tags": immutable_tag_map
+ }
+
+ # Only add an entry if there were any updates.
+ if room_account_data:
+ account_data_by_room_map[room_id] = room_account_data
+ else:
+ # TODO: This should take into account the `to_token`
+ immutable_room_account_data = (
+ await self.store.get_account_data_for_room(user_id, room_id)
+ )
+
+ # Add room tags
+ #
+ # XXX: Ideally, this should take into account the `to_token`
+ # and return the set of tags at that time but we don't track
+ # changes to tags so we just have to return all tags for the
+ # room.
+ immutable_tag_map = await self.store.get_tags_for_room(
+ user_id, room_id
+ )
+
+ account_data_by_room_map[room_id] = ChainMap(
+ {AccountDataTypes.TAG: {"tags": immutable_tag_map}}
+ if immutable_tag_map
+ else {},
+ # Cast is safe because `ChainMap` only mutates the top-most map,
+ # see https://github.com/python/typeshed/issues/8430
+ cast(
+ MutableMapping[str, JsonMapping],
+ immutable_room_account_data,
+ ),
+ )
+
+ # We handle these rooms concurrently to speed it up.
+ await concurrently_execute(
+ handle_previously,
+ previously_rooms.keys() | initial_rooms,
+ limit=20,
)
- # Filter down to the relevant rooms ... and combine the maps
- relevant_account_data_by_room_map: MutableMapping[
- str, Mapping[str, JsonMapping]
- ] = {}
- for room_id in relevant_room_ids:
- # We want to avoid adding empty maps for relevant rooms that have no room
- # account data so do a quick check to see if it's in any of the maps.
- is_room_in_maps = False
- for room_map in account_data_by_room_maps:
- if room_id in room_map:
- is_room_in_maps = True
- break
+ # Now record which rooms are now up to data, and which rooms have
+ # pending updates to send.
+ new_connection_state.account_data.record_sent_rooms(relevant_room_ids)
+ missing_updates = (
+ all_updates_since_the_from_token.keys() - relevant_room_ids
+ )
+ if missing_updates:
+ # If we have missing updates then we must have had a from_token.
+ assert from_token is not None
- # If we found the room in any of the maps, combine the maps for that room
- if is_room_in_maps:
- relevant_account_data_by_room_map[room_id] = ChainMap(
- {},
- *(
- # Cast is safe because `ChainMap` only mutates the top-most map,
- # see https://github.com/python/typeshed/issues/8430
- cast(MutableMapping[str, JsonMapping], room_map[room_id])
- for room_map in account_data_by_room_maps
- if room_map.get(room_id)
- ),
+ new_connection_state.account_data.record_unsent_rooms(
+ missing_updates, from_token.stream_token.account_data_key
)
return SlidingSyncResult.Extensions.AccountDataExtension(
global_account_data_map=global_account_data_map,
- account_data_by_room_map=relevant_account_data_by_room_map,
+ account_data_by_room_map=account_data_by_room_map,
)
@trace
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index b30639b4e6..e583c182ba 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -467,6 +467,56 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
get_updated_room_account_data_for_user_txn,
)
+ async def get_updated_room_account_data_for_user_for_room(
+ self,
+ # Since there are multiple arguments with the same type, force keyword arguments
+ # so people don't accidentally swap the order
+ *,
+ user_id: str,
+ room_id: str,
+ from_stream_id: int,
+ to_stream_id: int,
+ ) -> Dict[str, JsonMapping]:
+ """Get the room account_data that's changed for a user in a room.
+
+ (> `from_stream_id` and <= `to_stream_id`)
+
+ Args:
+ user_id: The user to get the account_data for.
+ room_id: The room to check
+ from_stream_id: The point in the stream to fetch from
+ to_stream_id: The point in the stream to fetch to
+
+ Returns:
+ A dict of the room account data.
+ """
+
+ def get_updated_room_account_data_for_user_for_room_txn(
+ txn: LoggingTransaction,
+ ) -> Dict[str, JsonMapping]:
+ sql = """
+ SELECT account_data_type, content FROM room_account_data
+ WHERE user_id = ? AND room_id = ? AND stream_id > ? AND stream_id <= ?
+ """
+ txn.execute(sql, (user_id, room_id, from_stream_id, to_stream_id))
+
+ room_account_data: Dict[str, JsonMapping] = {}
+ for row in txn:
+ room_account_data[row[0]] = db_to_json(row[1])
+
+ return room_account_data
+
+ changed = self._account_data_stream_cache.has_entity_changed(
+ user_id, int(from_stream_id)
+ )
+ if not changed:
+ return {}
+
+ return await self.db_pool.runInteraction(
+ "get_updated_room_account_data_for_user_for_room",
+ get_updated_room_account_data_for_user_for_room_txn,
+ )
+
@cached(max_entries=5000, iterable=True)
async def ignored_by(self, user_id: str) -> FrozenSet[str]:
"""
diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py
index 83939d10b0..f2df37fec1 100644
--- a/synapse/storage/databases/main/sliding_sync.py
+++ b/synapse/storage/databases/main/sliding_sync.py
@@ -267,6 +267,15 @@ class SlidingSyncStore(SQLBaseStore):
(have_sent_room.status.value, have_sent_room.last_token)
)
+ for (
+ room_id,
+ have_sent_room,
+ ) in per_connection_state.account_data._statuses.items():
+ key_values.append((connection_position, "account_data", room_id))
+ value_values.append(
+ (have_sent_room.status.value, have_sent_room.last_token)
+ )
+
self.db_pool.simple_upsert_many_txn(
txn,
table="sliding_sync_connection_streams",
@@ -407,6 +416,7 @@ class SlidingSyncStore(SQLBaseStore):
# Now look up the per-room stream data.
rooms: Dict[str, HaveSentRoom[str]] = {}
receipts: Dict[str, HaveSentRoom[str]] = {}
+ account_data: Dict[str, HaveSentRoom[str]] = {}
receipt_rows = self.db_pool.simple_select_list_txn(
txn,
@@ -427,6 +437,8 @@ class SlidingSyncStore(SQLBaseStore):
rooms[room_id] = have_sent_room
elif stream == "receipts":
receipts[room_id] = have_sent_room
+ elif stream == "account_data":
+ account_data[room_id] = have_sent_room
else:
# For forwards compatibility we ignore unknown streams, as in
# future we want to be able to easily add more stream types.
@@ -435,6 +447,7 @@ class SlidingSyncStore(SQLBaseStore):
return PerConnectionStateDB(
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
+ account_data=RoomStatusMap(account_data),
room_configs=room_configs,
)
@@ -452,6 +465,7 @@ class PerConnectionStateDB:
rooms: "RoomStatusMap[str]"
receipts: "RoomStatusMap[str]"
+ account_data: "RoomStatusMap[str]"
room_configs: Mapping[str, "RoomSyncConfig"]
@@ -484,10 +498,21 @@ class PerConnectionStateDB:
for room_id, status in per_connection_state.receipts.get_updates().items()
}
+ account_data = {
+ room_id: HaveSentRoom(
+ status=status.status,
+ last_token=(
+ str(status.last_token) if status.last_token is not None else None
+ ),
+ )
+ for room_id, status in per_connection_state.account_data.get_updates().items()
+ }
+
log_kv(
{
"rooms": rooms,
"receipts": receipts,
+ "account_data": account_data,
"room_configs": per_connection_state.room_configs.maps[0],
}
)
@@ -495,6 +520,7 @@ class PerConnectionStateDB:
return PerConnectionStateDB(
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
+ account_data=RoomStatusMap(account_data),
room_configs=per_connection_state.room_configs.maps[0],
)
@@ -524,8 +550,19 @@ class PerConnectionStateDB:
for room_id, status in self.receipts._statuses.items()
}
+ account_data = {
+ room_id: HaveSentRoom(
+ status=status.status,
+ last_token=(
+ int(status.last_token) if status.last_token is not None else None
+ ),
+ )
+ for room_id, status in self.account_data._statuses.items()
+ }
+
return PerConnectionState(
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
+ account_data=RoomStatusMap(account_data),
room_configs=self.room_configs,
)
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index b498cb9625..44f395f315 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -158,6 +158,52 @@ class TagsWorkerStore(AccountDataWorkerStore):
return results
+ async def has_tags_changed_for_room(
+ self,
+ # Since there are multiple arguments with the same type, force keyword arguments
+ # so people don't accidentally swap the order
+ *,
+ user_id: str,
+ room_id: str,
+ from_stream_id: int,
+ to_stream_id: int,
+ ) -> bool:
+ """Check if the users tags for a room have been updated in the token range
+
+ (> `from_stream_id` and <= `to_stream_id`)
+
+ Args:
+ user_id: The user to get tags for
+ room_id: The room to get tags for
+ from_stream_id: The point in the stream to fetch from
+ to_stream_id: The point in the stream to fetch to
+
+ Returns:
+ A mapping of tags to tag content.
+ """
+
+ # Shortcut if no room has changed for the user
+ changed = self._account_data_stream_cache.has_entity_changed(
+ user_id, int(from_stream_id)
+ )
+ if not changed:
+ return False
+
+ last_change_position_for_room = await self.db_pool.simple_select_one_onecol(
+ table="room_tags_revisions",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ retcol="stream_id",
+ allow_none=True,
+ )
+
+ if last_change_position_for_room is None:
+ return False
+
+ return (
+ last_change_position_for_room > from_stream_id
+ and last_change_position_for_room <= to_stream_id
+ )
+
@cached(num_args=2, tree=True)
async def get_tags_for_room(
self, user_id: str, room_id: str
diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py
index 149920f883..5dd2c9d411 100644
--- a/synapse/types/handlers/sliding_sync.py
+++ b/synapse/types/handlers/sliding_sync.py
@@ -675,7 +675,7 @@ class HaveSentRoomFlag(Enum):
LIVE = "live"
-T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken)
+T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken, int)
@attr.s(auto_attribs=True, slots=True, frozen=True)
@@ -823,6 +823,7 @@ class PerConnectionState:
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
+ account_data: RoomStatusMap[int] = attr.Factory(RoomStatusMap)
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
@@ -833,6 +834,7 @@ class PerConnectionState:
return MutablePerConnectionState(
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
+ account_data=self.account_data.get_mutable(),
room_configs=ChainMap({}, room_configs),
)
@@ -840,6 +842,7 @@ class PerConnectionState:
return PerConnectionState(
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
+ account_data=self.account_data.copy(),
room_configs=dict(self.room_configs),
)
@@ -853,6 +856,7 @@ class MutablePerConnectionState(PerConnectionState):
rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
+ account_data: MutableRoomStatusMap[int]
room_configs: typing.ChainMap[str, RoomSyncConfig]
@@ -860,6 +864,7 @@ class MutablePerConnectionState(PerConnectionState):
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
+ or bool(self.account_data.get_updates())
or bool(self.get_room_config_updates())
)
diff --git a/tests/rest/client/sliding_sync/test_extension_account_data.py b/tests/rest/client/sliding_sync/test_extension_account_data.py
index 03b2db39b9..799fbb1856 100644
--- a/tests/rest/client/sliding_sync/test_extension_account_data.py
+++ b/tests/rest/client/sliding_sync/test_extension_account_data.py
@@ -11,9 +11,11 @@
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
+import enum
import logging
-from parameterized import parameterized_class
+from parameterized import parameterized, parameterized_class
+from typing_extensions import assert_never
from twisted.test.proto_helpers import MemoryReactor
@@ -30,6 +32,11 @@ from tests.server import TimedOutException
logger = logging.getLogger(__name__)
+class TagAction(enum.Enum):
+ ADD = enum.auto()
+ REMOVE = enum.auto()
+
+
# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
@@ -350,10 +357,20 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
account_data_map[AccountDataTypes.TAG], {"tags": {"m.favourite": {}}}
)
- def test_room_account_data_incremental_sync(self) -> None:
+ @parameterized.expand(
+ [
+ ("add tags", TagAction.ADD),
+ ("remove tags", TagAction.REMOVE),
+ ]
+ )
+ def test_room_account_data_incremental_sync(
+ self, test_description: str, tag_action: TagAction
+ ) -> None:
"""
On incremental sync, we return all account data for a given room but only for
rooms that we request and are being returned in the Sliding Sync response.
+
+ (HaveSentRoomFlag.LIVE)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
@@ -432,42 +449,472 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
content={"roo": "rar"},
)
)
- # Add another room tag
+ if tag_action == TagAction.ADD:
+ # Add another room tag
+ self.get_success(
+ self.account_data_handler.add_tag_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ tag="m.server_notice",
+ content={},
+ )
+ )
+ self.get_success(
+ self.account_data_handler.add_tag_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ tag="m.server_notice",
+ content={},
+ )
+ )
+ elif tag_action == TagAction.REMOVE:
+ # Remove the room tag
+ self.get_success(
+ self.account_data_handler.remove_tag_from_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ tag="m.favourite",
+ )
+ )
+ self.get_success(
+ self.account_data_handler.remove_tag_from_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ tag="m.favourite",
+ )
+ )
+ else:
+ assert_never(tag_action)
+
+ # Make an incremental Sliding Sync request with the account_data extension enabled
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
+ # Even though we requested room2, we only expect room1 to show up because that's
+ # the only room in the Sliding Sync response (room2 is not one of our room
+ # subscriptions or in a sliding window list).
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ {room_id1},
+ exact=True,
+ )
+ # We should only see the new room account data that happened after the `from_token`
+ account_data_map = {
+ event["type"]: event["content"]
+ for event in response_body["extensions"]["account_data"]
+ .get("rooms")
+ .get(room_id1)
+ }
+ self.assertIncludes(
+ account_data_map.keys(),
+ {"org.matrix.roorarraz2", AccountDataTypes.TAG},
+ exact=True,
+ )
+ self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
+ if tag_action == TagAction.ADD:
+ self.assertEqual(
+ account_data_map[AccountDataTypes.TAG],
+ {"tags": {"m.favourite": {}, "m.server_notice": {}}},
+ )
+ elif tag_action == TagAction.REMOVE:
+ # If we previously showed the client that the room has tags, when it no
+ # longer has tags, we need to show them an empty map.
+ self.assertEqual(
+ account_data_map[AccountDataTypes.TAG],
+ {"tags": {}},
+ )
+ else:
+ assert_never(tag_action)
+
+ @parameterized.expand(
+ [
+ ("add tags", TagAction.ADD),
+ ("remove tags", TagAction.REMOVE),
+ ]
+ )
+ def test_room_account_data_incremental_sync_out_of_range_never(
+ self, test_description: str, tag_action: TagAction
+ ) -> None:
+ """Tests that we don't return account data for rooms that are out of
+ range, but then do send all account data once they're in range.
+
+ (initial/HaveSentRoomFlag.NEVER)
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Create a room and add some room account data
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+ # Add a room tag to mark the room as a favourite
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id1,
- tag="m.server_notice",
+ tag="m.favourite",
content={},
)
)
+
+ # Create another room with some room account data
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+ # Add a room tag to mark the room as a favourite
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id2,
- tag="m.server_notice",
+ tag="m.favourite",
content={},
)
)
+ # Now send a message into room1 so that it is at the top of the list
+ self.helper.send(room_id1, body="new event", tok=user1_tok)
+
+ # Make a SS request for only the top room.
+ sync_body = {
+ "lists": {
+ "main": {
+ "ranges": [[0, 0]],
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ },
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ "lists": ["main"],
+ }
+ },
+ }
+ response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Only room1 should be in the response since it's the latest room with activity
+ # and our range only includes 1 room.
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ {room_id1},
+ exact=True,
+ )
+
+ # Add some other room account data
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ account_data_type="org.matrix.roorarraz2",
+ content={"roo": "rar"},
+ )
+ )
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ account_data_type="org.matrix.roorarraz2",
+ content={"roo": "rar"},
+ )
+ )
+ if tag_action == TagAction.ADD:
+ # Add another room tag
+ self.get_success(
+ self.account_data_handler.add_tag_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ tag="m.server_notice",
+ content={},
+ )
+ )
+ self.get_success(
+ self.account_data_handler.add_tag_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ tag="m.server_notice",
+ content={},
+ )
+ )
+ elif tag_action == TagAction.REMOVE:
+ # Remove the room tag
+ self.get_success(
+ self.account_data_handler.remove_tag_from_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ tag="m.favourite",
+ )
+ )
+ self.get_success(
+ self.account_data_handler.remove_tag_from_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ tag="m.favourite",
+ )
+ )
+ else:
+ assert_never(tag_action)
+
+ # Move room2 into range.
+ self.helper.send(room_id2, body="new event", tok=user1_tok)
+
# Make an incremental Sliding Sync request with the account_data extension enabled
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
- # Even though we requested room2, we only expect room1 to show up because that's
- # the only room in the Sliding Sync response (room2 is not one of our room
- # subscriptions or in a sliding window list).
+ # We expect to see the account data of room2, as that has the most
+ # recent update.
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ {room_id2},
+ exact=True,
+ )
+ # Since this is the first time we're seeing room2 down sync, we should see all
+ # room account data for it.
+ account_data_map = {
+ event["type"]: event["content"]
+ for event in response_body["extensions"]["account_data"]
+ .get("rooms")
+ .get(room_id2)
+ }
+ expected_account_data_keys = {
+ "org.matrix.roorarraz",
+ "org.matrix.roorarraz2",
+ }
+ if tag_action == TagAction.ADD:
+ expected_account_data_keys.add(AccountDataTypes.TAG)
+ self.assertIncludes(
+ account_data_map.keys(),
+ expected_account_data_keys,
+ exact=True,
+ )
+ self.assertEqual(account_data_map["org.matrix.roorarraz"], {"roo": "rar"})
+ self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
+ if tag_action == TagAction.ADD:
+ self.assertEqual(
+ account_data_map[AccountDataTypes.TAG],
+ {"tags": {"m.favourite": {}, "m.server_notice": {}}},
+ )
+ elif tag_action == TagAction.REMOVE:
+ # Since we never told the client about the room tags, we don't need to say
+ # anything if there are no tags now (the client doesn't need an update).
+ self.assertIsNone(
+ account_data_map.get(AccountDataTypes.TAG),
+ account_data_map,
+ )
+ else:
+ assert_never(tag_action)
+
+ @parameterized.expand(
+ [
+ ("add tags", TagAction.ADD),
+ ("remove tags", TagAction.REMOVE),
+ ]
+ )
+ def test_room_account_data_incremental_sync_out_of_range_previously(
+ self, test_description: str, tag_action: TagAction
+ ) -> None:
+ """Tests that we don't return account data for rooms that fall out of
+ range, but then do send all account data that has changed they're back in range.
+
+ (HaveSentRoomFlag.PREVIOUSLY)
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Create a room and add some room account data
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+ # Add a room tag to mark the room as a favourite
+ self.get_success(
+ self.account_data_handler.add_tag_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ tag="m.favourite",
+ content={},
+ )
+ )
+
+ # Create another room with some room account data
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+ # Add a room tag to mark the room as a favourite
+ self.get_success(
+ self.account_data_handler.add_tag_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ tag="m.favourite",
+ content={},
+ )
+ )
+
+ # Make an initial Sliding Sync request for only room1 and room2.
+ sync_body = {
+ "lists": {},
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ room_id2: {
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ "rooms": [room_id1, room_id2],
+ }
+ },
+ }
+ response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Both rooms show up because we have a room subscription for each and they're
+ # requested in the `account_data` extension.
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ {room_id1, room_id2},
+ exact=True,
+ )
+
+ # Add some other room account data
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ account_data_type="org.matrix.roorarraz2",
+ content={"roo": "rar"},
+ )
+ )
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ account_data_type="org.matrix.roorarraz2",
+ content={"roo": "rar"},
+ )
+ )
+ if tag_action == TagAction.ADD:
+ # Add another room tag
+ self.get_success(
+ self.account_data_handler.add_tag_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ tag="m.server_notice",
+ content={},
+ )
+ )
+ self.get_success(
+ self.account_data_handler.add_tag_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ tag="m.server_notice",
+ content={},
+ )
+ )
+ elif tag_action == TagAction.REMOVE:
+ # Remove the room tag
+ self.get_success(
+ self.account_data_handler.remove_tag_from_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ tag="m.favourite",
+ )
+ )
+ self.get_success(
+ self.account_data_handler.remove_tag_from_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ tag="m.favourite",
+ )
+ )
+ else:
+ assert_never(tag_action)
+
+ # Make an incremental Sliding Sync request for just room1
+ response_body, from_token = self.do_sync(
+ {
+ **sync_body,
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ },
+ since=from_token,
+ tok=user1_tok,
+ )
+
+ # Only room1 shows up because we only have a room subscription for room1 now.
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id1},
exact=True,
)
- # We should only see the new room account data that happened after the `from_token`
+
+ # Make an incremental Sliding Sync request for just room2 now
+ response_body, from_token = self.do_sync(
+ {
+ **sync_body,
+ "room_subscriptions": {
+ room_id2: {
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ },
+ since=from_token,
+ tok=user1_tok,
+ )
+
+ # Only room2 shows up because we only have a room subscription for room2 now.
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ {room_id2},
+ exact=True,
+ )
+
+ self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
+ # Check for room account data for room2
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ {room_id2},
+ exact=True,
+ )
+ # We should see any room account data updates for room2 since the last
+ # time we saw it down sync
account_data_map = {
event["type"]: event["content"]
for event in response_body["extensions"]["account_data"]
.get("rooms")
- .get(room_id1)
+ .get(room_id2)
}
self.assertIncludes(
account_data_map.keys(),
@@ -475,10 +922,20 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
exact=True,
)
self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
- self.assertEqual(
- account_data_map[AccountDataTypes.TAG],
- {"tags": {"m.favourite": {}, "m.server_notice": {}}},
- )
+ if tag_action == TagAction.ADD:
+ self.assertEqual(
+ account_data_map[AccountDataTypes.TAG],
+ {"tags": {"m.favourite": {}, "m.server_notice": {}}},
+ )
+ elif tag_action == TagAction.REMOVE:
+ # If we previously showed the client that the room has tags, when it no
+ # longer has tags, we need to show them an empty map.
+ self.assertEqual(
+ account_data_map[AccountDataTypes.TAG],
+ {"tags": {}},
+ )
+ else:
+ assert_never(tag_action)
def test_wait_for_new_data(self) -> None:
"""
|