diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 88479a16db..e20c5c5302 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1819,7 +1819,7 @@ class DatabasePool:
keyvalues: Optional[Dict[str, Any]] = None,
desc: str = "simple_select_many_batch",
batch_size: int = 100,
- ) -> List[Any]:
+ ) -> List[Dict[str, Any]]:
"""Executes a SELECT query on the named table, which may return zero or
more rows, returning the result as a list of dicts.
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 903606fb46..e8b6cc6b80 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -99,6 +99,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
+ ("device_lists_remote_pending", "stream_id"),
],
is_writer=hs.config.worker.worker_app is None,
)
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 3aa7b94560..fbbc018887 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -60,9 +60,9 @@ from synapse.storage.util.id_generators import (
MultiWriterIdGenerator,
StreamIdGenerator,
)
-from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
+from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
from synapse.util import json_encoder
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.stringutils import MXC_REGEX
if TYPE_CHECKING:
@@ -1255,7 +1255,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return room_servers
- @cached()
+ @cached(max_entries=10000)
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
@@ -1274,6 +1274,27 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return entry is not None
+ @cachedList(cached_method_name="is_partial_state_room", list_name="room_ids")
+ async def is_partial_state_room_batched(
+ self, room_ids: StrCollection
+ ) -> Mapping[str, bool]:
+ """Checks if the given rooms have partial state.
+
+ Returns true for "partial-state" rooms, which means that the state
+ at events in the room, and `current_state_events`, may not yet be
+ complete.
+ """
+
+ rows: List[Dict[str, str]] = await self.db_pool.simple_select_many_batch(
+ table="partial_state_rooms",
+ column="room_id",
+ iterable=room_ids,
+ retcols=("room_id",),
+ desc="is_partial_state_room_batched",
+ )
+ partial_state_rooms = {row_dict["room_id"] for row_dict in rows}
+ return {room_id: room_id in partial_state_rooms for room_id in room_ids}
+
async def get_join_event_id_and_device_lists_stream_id_for_partial_state(
self, room_id: str
) -> Tuple[str, int]:
|