summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-04-27 13:22:30 +0100
committerErik Johnston <erik@matrix.org>2015-04-27 13:22:30 +0100
commit2732be83d9e883184f4a783fb7ba15487f30c20d (patch)
tree55ca53c5d3f1c0641d65cbe040ebe32d5ee4992c /synapse/storage
parentHandle the fact that postgres databases can be restarted from under us (diff)
downloadsynapse-2732be83d9e883184f4a783fb7ba15487f30c20d.tar.xz
Shuffle operations so that locking upsert happens last in the txn. This ensures the lock is held for the least amount of time possible.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py3
-rw-r--r--synapse/storage/engines/postgres.py3
-rw-r--r--synapse/storage/engines/sqlite3.py3
-rw-r--r--synapse/storage/events.py82
-rw-r--r--synapse/storage/transactions.py21
5 files changed, 56 insertions, 56 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 874d41447a..6017c2a6e8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -466,6 +466,9 @@ class SQLBaseStore(object):
         )
 
     def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
+        # We need to lock the table :(
+        self.database_engine.lock_table(txn, table)
+
         # Try to update
         sql = "UPDATE %s SET %s WHERE %s" % (
             table,
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 00dbae7b60..b8cca9b187 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -42,3 +42,6 @@ class PostgresEngine(object):
 
     def is_connection_closed(self, conn):
         return bool(conn)
+
+    def lock_table(self, txn, table):
+        txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 39828a597c..f62d5d1205 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -38,3 +38,6 @@ class Sqlite3Engine(object):
 
     def is_connection_closed(self, conn):
         return False
+
+    def lock_table(self, txn, table):
+        return
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7dbf7a396a..a3c260ddc4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -283,6 +283,35 @@ class EventsStore(SQLBaseStore):
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
 
+        for hash_alg, hash_base64 in event.hashes.items():
+            hash_bytes = decode_base64(hash_base64)
+            self._store_event_content_hash_txn(
+                txn, event.event_id, hash_alg, hash_bytes,
+            )
+
+        for prev_event_id, prev_hashes in event.prev_events:
+            for alg, hash_base64 in prev_hashes.items():
+                hash_bytes = decode_base64(hash_base64)
+                self._store_prev_event_hash_txn(
+                    txn, event.event_id, prev_event_id, alg, hash_bytes
+                )
+
+        for auth_id, _ in event.auth_events:
+            self._simple_insert_txn(
+                txn,
+                table="event_auth",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                },
+            )
+
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
         if event.is_state():
             vals = {
                 "event_id": event.event_id,
@@ -301,20 +330,6 @@ class EventsStore(SQLBaseStore):
                 vals,
             )
 
-            if is_new_state and not context.rejected:
-                self._simple_upsert_txn(
-                    txn,
-                    "current_state_events",
-                    keyvalues={
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    values={
-                        "event_id": event.event_id,
-                    }
-                )
-
             for e_id, h in event.prev_state:
                 self._simple_insert_txn(
                     txn,
@@ -327,35 +342,20 @@ class EventsStore(SQLBaseStore):
                     },
                 )
 
-        for hash_alg, hash_base64 in event.hashes.items():
-            hash_bytes = decode_base64(hash_base64)
-            self._store_event_content_hash_txn(
-                txn, event.event_id, hash_alg, hash_bytes,
-            )
-
-        for prev_event_id, prev_hashes in event.prev_events:
-            for alg, hash_base64 in prev_hashes.items():
-                hash_bytes = decode_base64(hash_base64)
-                self._store_prev_event_hash_txn(
-                    txn, event.event_id, prev_event_id, alg, hash_bytes
+            if is_new_state and not context.rejected:
+                self._simple_upsert_txn(
+                    txn,
+                    "current_state_events",
+                    keyvalues={
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    values={
+                        "event_id": event.event_id,
+                    }
                 )
 
-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                },
-            )
-
-        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
-        )
-
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
         self._invalidate_get_event_cache(event.redacts)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 7e3add5280..89dd7d8947 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -76,25 +76,16 @@ class TransactionStore(SQLBaseStore):
             response_json (str)
         """
 
-        return self.runInteraction(
-            "set_received_txn_response",
-            self._set_received_txn_response,
-            transaction_id, origin, code, response_dict
-        )
-
-    def _set_received_txn_response(self, txn, transaction_id, origin, code,
-                                   response_json):
-        self._simple_upsert_txn(
-            txn,
+        return self._simple_insert(
             table=ReceivedTransactionsTable.table_name,
-            keyvalues={
+            values={
                 "transaction_id": transaction_id,
                 "origin": origin,
-            },
-            values={
                 "response_code": code,
-                "response_json": response_json,
-            }
+                "response_json": response_dict,
+            },
+            or_ignore=True,
+            desc="set_received_txn_response",
         )
 
     def prep_send_transaction(self, transaction_id, destination,