diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 874d41447a..6017c2a6e8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -466,6 +466,9 @@ class SQLBaseStore(object):
)
def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
+ # We need to lock the table :(
+ self.database_engine.lock_table(txn, table)
+
# Try to update
sql = "UPDATE %s SET %s WHERE %s" % (
table,
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 00dbae7b60..b8cca9b187 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -42,3 +42,6 @@ class PostgresEngine(object):
def is_connection_closed(self, conn):
return bool(conn)
+
+ def lock_table(self, txn, table):
+ txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 39828a597c..f62d5d1205 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -38,3 +38,6 @@ class Sqlite3Engine(object):
def is_connection_closed(self, conn):
return False
+
+ def lock_table(self, txn, table):
+ return
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7dbf7a396a..a3c260ddc4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -283,6 +283,35 @@ class EventsStore(SQLBaseStore):
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
+ for hash_alg, hash_base64 in event.hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_event_content_hash_txn(
+ txn, event.event_id, hash_alg, hash_bytes,
+ )
+
+ for prev_event_id, prev_hashes in event.prev_events:
+ for alg, hash_base64 in prev_hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_prev_event_hash_txn(
+ txn, event.event_id, prev_event_id, alg, hash_bytes
+ )
+
+ for auth_id, _ in event.auth_events:
+ self._simple_insert_txn(
+ txn,
+ table="event_auth",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "auth_id": auth_id,
+ },
+ )
+
+ (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+ self._store_event_reference_hash_txn(
+ txn, event.event_id, ref_alg, ref_hash_bytes
+ )
+
if event.is_state():
vals = {
"event_id": event.event_id,
@@ -301,20 +330,6 @@ class EventsStore(SQLBaseStore):
vals,
)
- if is_new_state and not context.rejected:
- self._simple_upsert_txn(
- txn,
- "current_state_events",
- keyvalues={
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- values={
- "event_id": event.event_id,
- }
- )
-
for e_id, h in event.prev_state:
self._simple_insert_txn(
txn,
@@ -327,35 +342,20 @@ class EventsStore(SQLBaseStore):
},
)
- for hash_alg, hash_base64 in event.hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_event_content_hash_txn(
- txn, event.event_id, hash_alg, hash_bytes,
- )
-
- for prev_event_id, prev_hashes in event.prev_events:
- for alg, hash_base64 in prev_hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_prev_event_hash_txn(
- txn, event.event_id, prev_event_id, alg, hash_bytes
+ if is_new_state and not context.rejected:
+ self._simple_upsert_txn(
+ txn,
+ "current_state_events",
+ keyvalues={
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ }
)
- for auth_id, _ in event.auth_events:
- self._simple_insert_txn(
- txn,
- table="event_auth",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "auth_id": auth_id,
- },
- )
-
- (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
- self._store_event_reference_hash_txn(
- txn, event.event_id, ref_alg, ref_hash_bytes
- )
-
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
self._invalidate_get_event_cache(event.redacts)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 7e3add5280..89dd7d8947 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -76,25 +76,16 @@ class TransactionStore(SQLBaseStore):
response_json (str)
"""
- return self.runInteraction(
- "set_received_txn_response",
- self._set_received_txn_response,
- transaction_id, origin, code, response_dict
- )
-
- def _set_received_txn_response(self, txn, transaction_id, origin, code,
- response_json):
- self._simple_upsert_txn(
- txn,
+ return self._simple_insert(
table=ReceivedTransactionsTable.table_name,
- keyvalues={
+ values={
"transaction_id": transaction_id,
"origin": origin,
- },
- values={
"response_code": code,
- "response_json": response_json,
- }
+ "response_json": response_dict,
+ },
+ or_ignore=True,
+ desc="set_received_txn_response",
)
def prep_send_transaction(self, transaction_id, destination,
|