summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py54
-rw-r--r--synapse/storage/events.py13
-rw-r--r--synapse/storage/registration.py10
-rw-r--r--synapse/storage/state.py6
-rw-r--r--synapse/storage/transactions.py4
5 files changed, 68 insertions, 19 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index de4f661973..9f63f07080 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -163,8 +163,8 @@ class LoggingTransaction(object):
             return self.txn.execute(
                 sql, *args, **kwargs
             )
-        except:
-                logger.exception("[SQL FAIL] {%s}", self.name)
+        except Exception as e:
+                logger.debug("[SQL FAIL] {%s} %s", self.name, e)
                 raise
         finally:
             msecs = (time.time() * 1000) - start
@@ -209,6 +209,46 @@ class PerformanceCounters(object):
         return top_n_counters
 
 
+class IdGenerator(object):
+    def __init__(self, table, column, store):
+        self.table = table
+        self.column = column
+        self.store = store
+        self._lock = threading.Lock()
+        self._next_id = None
+
+    @defer.inlineCallbacks
+    def get_next(self):
+        with self._lock:
+            if not self._next_id:
+                res = yield self.store._execute_and_decode(
+                    "IdGenerator_%s" % (self.table,),
+                    "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,)
+                )
+
+                self._next_id = (res and res[0] and res[0]["mx"]) or 1
+
+            i = self._next_id
+            self._next_id += 1
+            defer.returnValue(i)
+
+    def get_next_txn(self, txn):
+        with self._lock:
+            if self._next_id:
+                i = self._next_id
+                self._next_id += 1
+                return i
+            else:
+                txn.execute(
+                    "SELECT MAX(%s) FROM %s" % (self.column, self.table,)
+                )
+
+                val, = txn.fetchone()
+                self._next_id = val or 2
+
+                return 1
+
+
 class SQLBaseStore(object):
     _TXN_ID = 0
 
@@ -234,8 +274,10 @@ class SQLBaseStore(object):
         # Pretend the getEventCache is just another named cache
         caches_by_name["*getEvent*"] = self._get_event_cache
 
-        self._next_stream_id_lock = threading.Lock()
-        self._next_stream_id = int(hs.get_clock().time_msec()) * 1000
+        self._stream_id_gen = IdGenerator("events", "stream_ordering", self)
+        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
 
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
@@ -292,8 +334,8 @@ class SQLBaseStore(object):
                         LoggingTransaction(txn, name, self.database_engine),
                         *args, **kwargs
                     )
-                except:
-                    logger.exception("[TXN FAIL] {%s}", name)
+                except Exception as e:
+                    logger.debug("[TXN FAIL] {%s}", name, e)
                     raise
                 finally:
                     end = time.time() * 1000
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 69f598967e..514feebcbf 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -96,12 +96,16 @@ class EventsStore(SQLBaseStore):
         # Remove the any existing cache entries for the event_id
         self._get_event_cache.pop(event.event_id)
 
+        if stream_ordering is None:
+            stream_ordering = self._stream_id_gen.get_next_txn(txn)
+
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
+            self._simple_delete_txn(
+                txn,
+                table="current_state_events",
+                keyvalues={"room_id": event.room_id},
             )
 
             for s in current_state:
@@ -240,9 +244,6 @@ class EventsStore(SQLBaseStore):
             "depth": event.depth,
         }
 
-        if stream_ordering is None:
-            stream_ordering = self.get_next_stream_id()
-
         unrec = {
             k: v
             for k, v in event.get_dict().items()
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 0c785ec989..b62b4a3414 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -39,10 +39,12 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        yield self._simple_insert(
+        next_id = yield self._access_tokens_id_gen.get_next()
+
+        self._simple_insert(
             "access_tokens",
             {
-                "id": self.get_next_stream_id(),
+                "id": next_id,
                 "user_id": user_id,
                 "token": token
             },
@@ -68,6 +70,8 @@ class RegistrationStore(SQLBaseStore):
     def _register(self, txn, user_id, token, password_hash):
         now = int(self.clock.time())
 
+        next_id = self._access_tokens_id_gen.get_next_txn(txn)
+
         try:
             txn.execute("INSERT INTO users(name, password_hash, creation_ts) "
                         "VALUES (?,?,?)",
@@ -82,7 +86,7 @@ class RegistrationStore(SQLBaseStore):
         txn.execute(
             "INSERT INTO access_tokens(id, user_id, token)"
             " VALUES (?,?,?)",
-            (self.get_next_stream_id(), user_id, token,)
+            (next_id, user_id, token,)
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 65ea9c4d83..3e55cb81bf 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -93,12 +93,12 @@ class StateStore(SQLBaseStore):
 
         state_group = context.state_group
         if not state_group:
-            group = _make_group_id(self._clock)
-            state_group = self._simple_insert_txn(
+            state_group = _make_group_id(self._clock)
+            self._simple_insert_txn(
                 txn,
                 table="state_groups",
                 values={
-                    "id": group,
+                    "id": state_group,
                     "room_id": event.room_id,
                     "event_id": event.event_id,
                 },
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index e3e484fb2d..9594fe1f2b 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -123,6 +123,8 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
+        next_id = self._transaction_id_gen.get_next_txn(txn)
+
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
@@ -143,7 +145,7 @@ class TransactionStore(SQLBaseStore):
             txn,
             table=SentTransactions.table_name,
             values={
-                "id": self.get_next_stream_id(),
+                "id": next_id,
                 "transaction_id": transaction_id,
                 "destination": destination,
                 "ts": origin_server_ts,