diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 303ef6ea27..34d6c52e39 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -705,7 +705,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
key_names=("destination", "user_id"),
key_values=[(destination, user_id) for user_id, _ in rows],
value_names=("stream_id",),
- value_values=((stream_id,) for _, stream_id in rows),
+ value_values=[(stream_id,) for _, stream_id in rows],
)
# Delete all sent outbound pokes
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 647ba182f6..7c34bde3e5 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1476,7 +1476,7 @@ class PersistEventsStore:
txn,
table="event_json",
keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
- values=(
+ values=[
(
event.event_id,
event.room_id,
@@ -1485,7 +1485,7 @@ class PersistEventsStore:
event.format_version,
)
for event, _ in events_and_contexts
- ),
+ ],
)
self.db_pool.simple_insert_many_txn(
@@ -1508,7 +1508,7 @@ class PersistEventsStore:
"state_key",
"rejection_reason",
),
- values=(
+ values=[
(
self._instance_name,
event.internal_metadata.stream_ordering,
@@ -1527,7 +1527,7 @@ class PersistEventsStore:
context.rejected,
)
for event, context in events_and_contexts
- ),
+ ],
)
# If we're persisting an unredacted event we go and ensure
@@ -1550,11 +1550,11 @@ class PersistEventsStore:
txn,
table="state_events",
keys=("event_id", "room_id", "type", "state_key"),
- values=(
+ values=[
(event.event_id, event.room_id, event.type, event.state_key)
for event, _ in events_and_contexts
if event.is_state()
- ),
+ ],
)
def _store_rejected_events_txn(
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 22025eca56..37135d431d 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -28,8 +28,11 @@ from typing import (
cast,
)
+from twisted.internet import defer
+
from synapse.api.errors import StoreError
from synapse.config.homeserver import ExperimentalConfig
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.replication.tcp.streams import PushRulesStream
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
@@ -51,7 +54,8 @@ from synapse.storage.util.id_generators import (
)
from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
from synapse.types import JsonDict
-from synapse.util import json_encoder
+from synapse.util import json_encoder, unwrapFirstError
+from synapse.util.async_helpers import gather_results
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -249,23 +253,33 @@ class PushRulesWorkerStore(
user_id: [] for user_id in user_ids
}
- rows = cast(
- List[Tuple[str, str, int, int, str, str]],
- await self.db_pool.simple_select_many_batch(
- table="push_rules",
- column="user_name",
- iterable=user_ids,
- retcols=(
- "user_name",
- "rule_id",
- "priority_class",
- "priority",
- "conditions",
- "actions",
+ # gatherResults loses all type information.
+ rows, enabled_map_by_user = await make_deferred_yieldable(
+ gather_results(
+ (
+ cast(
+ "defer.Deferred[List[Tuple[str, str, int, int, str, str]]]",
+ run_in_background(
+ self.db_pool.simple_select_many_batch,
+ table="push_rules",
+ column="user_name",
+ iterable=user_ids,
+ retcols=(
+ "user_name",
+ "rule_id",
+ "priority_class",
+ "priority",
+ "conditions",
+ "actions",
+ ),
+ desc="bulk_get_push_rules",
+ batch_size=1000,
+ ),
+ ),
+ run_in_background(self.bulk_get_push_rules_enabled, user_ids),
),
- desc="bulk_get_push_rules",
- batch_size=1000,
- ),
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
)
# Sort by highest priority_class, then highest priority.
@@ -276,8 +290,6 @@ class PushRulesWorkerStore(
(rule_id, priority_class, conditions, actions)
)
- enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids)
-
results: Dict[str, FilteredPushRules] = {}
for user_id, rules in raw_rules.items():
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 6d4b9891e7..afb880532e 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -2268,7 +2268,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
txn,
table="partial_state_rooms_servers",
keys=("room_id", "server_name"),
- values=((room_id, s) for s in servers),
+ values=[(room_id, s) for s in servers],
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
self._invalidate_cache_and_stream(
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index dbde9130c6..f4bef4c99b 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -106,7 +106,7 @@ class SearchWorkerStore(SQLBaseStore):
txn,
table="event_search",
keys=("event_id", "room_id", "key", "value"),
- values=(
+ values=[
(
entry.event_id,
entry.room_id,
@@ -114,7 +114,7 @@ class SearchWorkerStore(SQLBaseStore):
_clean_value_for_search(entry.value),
)
for entry in entries
- ),
+ ],
)
else:
|