diff options
Diffstat (limited to 'synapse/storage/util')
-rw-r--r-- | synapse/storage/util/id_generators.py | 36 |
1 files changed, 9 insertions, 27 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index f58bf7fd2c..5c522f4ab9 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -72,28 +72,24 @@ class StreamIdGenerator(object): with stream_id_gen.get_next_txn(txn) as stream_id: # ... persist event ... """ - def __init__(self, table, column): + def __init__(self, db_conn, table, column): self.table = table self.column = column self._lock = threading.Lock() - self._current_max = None + cur = db_conn.cursor() + self._current_max = self._get_or_compute_current_max(cur) + cur.close() + self._unfinished_ids = deque() - @defer.inlineCallbacks def get_next(self, store): """ Usage: with yield stream_id_gen.get_next as stream_id: # ... persist event ... """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: self._current_max += 1 next_id = self._current_max @@ -108,21 +104,14 @@ class StreamIdGenerator(object): with self._lock: self._unfinished_ids.remove(next_id) - defer.returnValue(manager()) + return manager() - @defer.inlineCallbacks def get_next_mult(self, store, n): """ Usage: with yield stream_id_gen.get_next(store, n) as stream_ids: # ... persist events ... """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: next_ids = range(self._current_max + 1, self._current_max + n + 1) self._current_max += n @@ -139,24 +128,17 @@ class StreamIdGenerator(object): for next_id in next_ids: self._unfinished_ids.remove(next_id) - defer.returnValue(manager()) + return manager() - @defer.inlineCallbacks def get_max_token(self, store): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: if self._unfinished_ids: - defer.returnValue(self._unfinished_ids[0] - 1) + return self._unfinished_ids[0] - 1 - defer.returnValue(self._current_max) + return self._current_max def _get_or_compute_current_max(self, txn): with self._lock: |