diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index b30639b4e6..e583c182ba 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -467,6 +467,56 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
get_updated_room_account_data_for_user_txn,
)
+ async def get_updated_room_account_data_for_user_for_room(
+ self,
+ # Since there are multiple arguments with the same type, force keyword arguments
+ # so people don't accidentally swap the order
+ *,
+ user_id: str,
+ room_id: str,
+ from_stream_id: int,
+ to_stream_id: int,
+ ) -> Dict[str, JsonMapping]:
+ """Get the room account_data that's changed for a user in a room.
+
+ (> `from_stream_id` and <= `to_stream_id`)
+
+ Args:
+ user_id: The user to get the account_data for.
+ room_id: The room to check
+ from_stream_id: The point in the stream to fetch from
+ to_stream_id: The point in the stream to fetch to
+
+ Returns:
+ A dict of the room account data.
+ """
+
+ def get_updated_room_account_data_for_user_for_room_txn(
+ txn: LoggingTransaction,
+ ) -> Dict[str, JsonMapping]:
+ sql = """
+ SELECT account_data_type, content FROM room_account_data
+ WHERE user_id = ? AND room_id = ? AND stream_id > ? AND stream_id <= ?
+ """
+ txn.execute(sql, (user_id, room_id, from_stream_id, to_stream_id))
+
+ room_account_data: Dict[str, JsonMapping] = {}
+ for row in txn:
+ room_account_data[row[0]] = db_to_json(row[1])
+
+ return room_account_data
+
+ changed = self._account_data_stream_cache.has_entity_changed(
+ user_id, int(from_stream_id)
+ )
+ if not changed:
+ return {}
+
+ return await self.db_pool.runInteraction(
+ "get_updated_room_account_data_for_user_for_room",
+ get_updated_room_account_data_for_user_for_room_txn,
+ )
+
@cached(max_entries=5000, iterable=True)
async def ignored_by(self, user_id: str) -> FrozenSet[str]:
"""
|