diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e01c61d08d..b7c3cf03c8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -160,18 +160,23 @@ class LoggingTransaction(object):
def __setattr__(self, name, value):
setattr(self.txn, name, value)
- def execute(self, sql, *args, **kwargs):
+ def execute(self, sql, *args):
+ self._do_execute(self.txn.execute, sql, *args)
+
+ def executemany(self, sql, *args):
+ self._do_execute(self.txn.executemany, sql, *args)
+
+ def _do_execute(self, func, sql, *args):
# TODO(paul): Maybe use 'info' and 'debug' for values?
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
sql = self.database_engine.convert_param_style(sql)
- if args and args[0]:
+ if args:
try:
sql_logger.debug(
- "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
- self.name,
- *args[0]
+ "[SQL values] {%s} %r",
+ self.name, args[0]
)
except:
# Don't let logging failures stop SQL from working
@@ -180,8 +185,8 @@ class LoggingTransaction(object):
start = time.time() * 1000
try:
- return self.txn.execute(
- sql, *args, **kwargs
+ return func(
+ sql, *args
)
except Exception as e:
logger.debug("[SQL FAIL] {%s} %s", self.name, e)
@@ -434,18 +439,41 @@ class SQLBaseStore(object):
@log_function
def _simple_insert_txn(self, txn, table, values):
+ keys, vals = zip(*values.items())
+
sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
- ", ".join(k for k in values),
- ", ".join("?" for k in values)
+ ", ".join(k for k in keys),
+ ", ".join("?" for _ in keys)
)
- logger.debug(
- "[SQL] %s Args=%s",
- sql, values.values(),
+ txn.execute(sql, vals)
+
+ def _simple_insert_many_txn(self, txn, table, values):
+ if not values:
+ return
+
+ keys, vals = zip(*[
+ zip(
+ *(sorted(i.items(), key=lambda kv: kv[0]))
+ )
+ for i in values
+ if i
+ ])
+
+ for k in keys:
+ if k != keys[0]:
+ raise RuntimeError(
+ "All items must have the same keys"
+ )
+
+ sql = "INSERT INTO %s (%s) VALUES(%s)" % (
+ table,
+ ", ".join(k for k in keys[0]),
+ ", ".join("?" for _ in keys[0])
)
- txn.execute(sql, values.values())
+ txn.executemany(sql, vals)
def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 68f39bd684..0aca4ba17b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -262,18 +262,19 @@ class EventFederationStore(SQLBaseStore):
For the given event, update the event edges table and forward and
backward extremities tables.
"""
- for e_id, _ in prev_events:
- # TODO (erikj): This could be done as a bulk insert
- self._simple_insert_txn(
- txn,
+ self._simple_insert_many_txn(
+ txn,
table="event_edges",
- values={
- "event_id": event_id,
- "prev_event_id": e_id,
- "room_id": room_id,
- "is_state": False,
- },
- )
+ values=[
+ {
+ "event_id": event_id,
+ "prev_event_id": e_id,
+ "room_id": room_id,
+ "is_state": False,
+ }
+ for e_id, _ in prev_events
+ ],
+ )
# Update the extremities table if this is not an outlier.
if not outlier:
@@ -307,16 +308,17 @@ class EventFederationStore(SQLBaseStore):
# Insert all the prev_events as a backwards thing, they'll get
# deleted in a second if they're incorrect anyway.
- for e_id, _ in prev_events:
- # TODO (erikj): This could be done as a bulk insert
- self._simple_insert_txn(
- txn,
- table="event_backward_extremities",
- values={
+ self._simple_insert_many_txn(
+ txn,
+ table="event_backward_extremities",
+ values=[
+ {
"event_id": e_id,
"room_id": room_id,
- },
- )
+ }
+ for e_id, _ in prev_events
+ ],
+ )
# Also delete from the backwards extremities table all ones that
# reference events that we have already seen
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a3c260ddc4..84e446a99c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -113,17 +113,19 @@ class EventsStore(SQLBaseStore):
keyvalues={"room_id": event.room_id},
)
- for s in current_state:
- self._simple_insert_txn(
- txn,
- "current_state_events",
+ self._simple_insert_many_txn(
+ txn,
+ "current_state_events",
+ [
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
- },
- )
+ }
+ for s in current_state
+ ],
+ )
if event.is_state() and is_new_state:
if not backfilled and not context.rejected:
@@ -296,16 +298,18 @@ class EventsStore(SQLBaseStore):
txn, event.event_id, prev_event_id, alg, hash_bytes
)
- for auth_id, _ in event.auth_events:
- self._simple_insert_txn(
- txn,
- table="event_auth",
- values={
+ self._simple_insert_many_txn(
+ txn,
+ table="event_auth",
+ values=[
+ {
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
- },
- )
+ }
+ for auth_id, _ in event.auth_events
+ ],
+ )
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
@@ -330,17 +334,19 @@ class EventsStore(SQLBaseStore):
vals,
)
- for e_id, h in event.prev_state:
- self._simple_insert_txn(
- txn,
- table="event_edges",
- values={
+ self._simple_insert_many_txn(
+ txn,
+ table="event_edges",
+ values=[
+ {
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
"is_state": True,
- },
- )
+ }
+ for e_id, h in event.prev_state
+ ],
+ )
if is_new_state and not context.rejected:
self._simple_upsert_txn(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 7e55e8bed6..dbc0e49c1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -104,18 +104,20 @@ class StateStore(SQLBaseStore):
},
)
- for state in state_events.values():
- self._simple_insert_txn(
- txn,
- table="state_groups_state",
- values={
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
"state_group": state_group,
"room_id": state.room_id,
"type": state.type,
"state_key": state.state_key,
"event_id": state.event_id,
- },
- )
+ }
+ for state in state_events.values()
+ ],
+ )
self._simple_insert_txn(
txn,
|