diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0693d39006..5552dd3c5c 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -896,6 +896,9 @@ class DatabasePool:
) -> None:
"""Executes an INSERT query on the named table.
+ The input is given as a list of dicts, with one dict per row.
+ Generally simple_insert_many_values should be preferred for new code.
+
Args:
table: string giving the table name
values: dict of new column names and values for them
@@ -909,6 +912,9 @@ class DatabasePool:
) -> None:
"""Executes an INSERT query on the named table.
+ The input is given as a list of dicts, with one dict per row.
+ Generally simple_insert_many_values_txn should be preferred for new code.
+
Args:
txn: The transaction to use.
table: string giving the table name
@@ -933,23 +939,66 @@ class DatabasePool:
if k != keys[0]:
raise RuntimeError("All items must have the same keys")
+ return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals)
+
+ async def simple_insert_many_values(
+ self,
+ table: str,
+ keys: Collection[str],
+ values: Iterable[Iterable[Any]],
+ desc: str,
+ ) -> None:
+ """Executes an INSERT query on the named table.
+
+ The input is given as a list of rows, where each row is a list of values.
+ (Actually any iterable is fine.)
+
+ Args:
+ table: string giving the table name
+ keys: list of column names
+ values: for each row, a list of values in the same order as `keys`
+ desc: description of the transaction, for logging and metrics
+ """
+ await self.runInteraction(
+ desc, self.simple_insert_many_values_txn, table, keys, values
+ )
+
+ @staticmethod
+ def simple_insert_many_values_txn(
+ txn: LoggingTransaction,
+ table: str,
+ keys: Collection[str],
+ values: Iterable[Iterable[Any]],
+ ) -> None:
+ """Executes an INSERT query on the named table.
+
+ The input is given as a list of rows, where each row is a list of values.
+ (Actually any iterable is fine.)
+
+ Args:
+ txn: The transaction to use.
+ table: string giving the table name
+ keys: list of column names
+ values: for each row, a list of values in the same order as `keys`
+ """
+
if isinstance(txn.database_engine, PostgresEngine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,
# but it's only available on postgres.
sql = "INSERT INTO %s (%s) VALUES ?" % (
table,
- ", ".join(k for k in keys[0]),
+ ", ".join(k for k in keys),
)
- txn.execute_values(sql, vals, fetch=False)
+ txn.execute_values(sql, values, fetch=False)
else:
sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
- ", ".join(k for k in keys[0]),
- ", ".join("?" for _ in keys[0]),
+ ", ".join(k for k in keys),
+ ", ".join("?" for _ in keys),
)
- txn.execute_batch(sql, vals)
+ txn.execute_batch(sql, values)
async def simple_upsert(
self,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index eed453d836..5184e6bf85 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -19,6 +19,7 @@ from collections import OrderedDict
from typing import (
TYPE_CHECKING,
Any,
+ Collection,
Dict,
Generator,
Iterable,
@@ -1319,14 +1320,13 @@ class PersistEventsStore:
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
- def _store_event_txn(self, txn, events_and_contexts):
+ def _store_event_txn(
+ self,
+ txn: LoggingTransaction,
+ events_and_contexts: Collection[Tuple[EventBase, EventContext]],
+ ) -> None:
"""Insert new events into the event, event_json, redaction and
state_events tables.
-
- Args:
- txn (twisted.enterprise.adbapi.Connection): db connection
- events_and_contexts (list[(EventBase, EventContext)]): events
- we are persisting
"""
if not events_and_contexts:
@@ -1339,46 +1339,58 @@ class PersistEventsStore:
d.pop("redacted_because", None)
return d
- self.db_pool.simple_insert_many_txn(
+ self.db_pool.simple_insert_many_values_txn(
txn,
table="event_json",
- values=[
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "internal_metadata": json_encoder.encode(
- event.internal_metadata.get_dict()
- ),
- "json": json_encoder.encode(event_dict(event)),
- "format_version": event.format_version,
- }
+ keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
+ values=(
+ (
+ event.event_id,
+ event.room_id,
+ json_encoder.encode(event.internal_metadata.get_dict()),
+ json_encoder.encode(event_dict(event)),
+ event.format_version,
+ )
for event, _ in events_and_contexts
- ],
+ ),
)
- self.db_pool.simple_insert_many_txn(
+ self.db_pool.simple_insert_many_values_txn(
txn,
table="events",
- values=[
- {
- "instance_name": self._instance_name,
- "stream_ordering": event.internal_metadata.stream_ordering,
- "topological_ordering": event.depth,
- "depth": event.depth,
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "processed": True,
- "outlier": event.internal_metadata.is_outlier(),
- "origin_server_ts": int(event.origin_server_ts),
- "received_ts": self._clock.time_msec(),
- "sender": event.sender,
- "contains_url": (
- "url" in event.content and isinstance(event.content["url"], str)
- ),
- }
+ keys=(
+ "instance_name",
+ "stream_ordering",
+ "topological_ordering",
+ "depth",
+ "event_id",
+ "room_id",
+ "type",
+ "processed",
+ "outlier",
+ "origin_server_ts",
+ "received_ts",
+ "sender",
+ "contains_url",
+ ),
+ values=(
+ (
+ self._instance_name,
+ event.internal_metadata.stream_ordering,
+ event.depth, # topological_ordering
+ event.depth, # depth
+ event.event_id,
+ event.room_id,
+ event.type,
+ True, # processed
+ event.internal_metadata.is_outlier(),
+ int(event.origin_server_ts),
+ self._clock.time_msec(),
+ event.sender,
+ "url" in event.content and isinstance(event.content["url"], str),
+ )
for event, _ in events_and_contexts
- ],
+ ),
)
# If we're persisting an unredacted event we go and ensure
@@ -1397,23 +1409,15 @@ class PersistEventsStore:
)
txn.execute(sql + clause, [False] + args)
- state_events_and_contexts = [
- ec for ec in events_and_contexts if ec[0].is_state()
- ]
-
- state_values = []
- for event, _ in state_events_and_contexts:
- vals = {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- }
-
- state_values.append(vals)
-
- self.db_pool.simple_insert_many_txn(
- txn, table="state_events", values=state_values
+ self.db_pool.simple_insert_many_values_txn(
+ txn,
+ table="state_events",
+ keys=("event_id", "room_id", "type", "state_key"),
+ values=(
+ (event.event_id, event.room_id, event.type, event.state_key)
+ for event, _ in events_and_contexts
+ if event.is_state()
+ ),
)
def _store_rejected_events_txn(self, txn, events_and_contexts):
|