diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 0d4d764483..893f954ce9 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -958,7 +958,7 @@ class SlidingSyncHandler:
# Filter by room type (space vs room, etc). A room must match one of the types
# provided in the list. `None` is a valid type for rooms which do not have a
# room type.
- if filters.room_types is not None or filters.not_room_types is not None:
+ if False and (filters.room_types is not None or filters.not_room_types is not None):
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
for room_id in filtered_room_id_set.copy():
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 94d5faf9f7..3e40a50d27 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -52,7 +52,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import trace_with_opname
+from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
from synapse.rest.admin.experimental_features import ExperimentalFeature
from synapse.types import JsonDict, Requester, StreamToken
from synapse.types.rest.client import SlidingSyncBody
@@ -897,6 +897,10 @@ class SlidingSyncRestServlet(RestServlet):
# in.
body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
logger.info("Sliding sync request: %r", body)
+ log_kv({"request_body": body})
+
+ if body.lists:
+ set_tag("sliding_sync.lists", True)
sync_config = SlidingSyncConfig(
user=user,
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index dccae56608..0a2db74401 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -82,6 +82,7 @@ from synapse.types import PersistedEventPosition, RoomStreamToken, StrSequence
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
+from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -1185,12 +1186,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
-
async def rough_get_last_pos(self, room_ids: StrSequence) -> Dict[str, int]:
- def rough_get_last_pos_Txn(
+ def rough_get_last_pos_txn(
txn: LoggingTransaction,
+ batch: StrSequence,
) -> Dict[str, int]:
- clause, args = make_in_list_sql_clause(self.database_engine, "room_id", room_ids)
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", batch
+ )
sql = f"""
SELECT room_id, MAX(stream_ordering) FROM events
WHERE {clause}
@@ -1199,14 +1202,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.execute(sql, (args,))
- return {
- room_id: stream_ordering for room_id, stream_ordering in txn
- }
+ return {room_id: stream_ordering for room_id, stream_ordering in txn}
- return await self.db_pool.runInteraction(
- "rough_get_last_pos",
- rough_get_last_pos_Txn,
- )
+ results = {}
+ for batch in batch_iter(room_ids, 100):
+ results.update(
+ await self.db_pool.runInteraction(
+ "rough_get_last_pos", rough_get_last_pos_txn, batch
+ )
+ )
+
+ return results
async def get_last_event_pos_in_room_before_stream_ordering(
self,
|