diff options
author | Mark Haines <mark.haines@matrix.org> | 2015-04-29 13:15:14 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2015-04-29 13:15:14 +0100 |
commit | 4ad8b451559682645c818f6c1180e5f9d42a7eeb (patch) | |
tree | 94fe0beaaaa4ed8c1bb9451f1274df819d9ce8d7 /synapse/storage/util | |
parent | Update the query format used by keyring to match current key v2 spec (diff) | |
parent | Mention that postgres databases must have the correct charset encoding (diff) | |
download | synapse-4ad8b451559682645c818f6c1180e5f9d42a7eeb.tar.xz |
Merge branch 'develop' into key_distribution
Conflicts: synapse/config/homeserver.py
Diffstat (limited to 'synapse/storage/util')
-rw-r--r-- | synapse/storage/util/__init__.py | 14 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 131 |
2 files changed, 145 insertions, 0 deletions
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py new file mode 100644 index 0000000000..c488b10d3c --- /dev/null +++ b/synapse/storage/util/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py new file mode 100644 index 0000000000..9d461d5e96 --- /dev/null +++ b/synapse/storage/util/id_generators.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from collections import deque +import contextlib +import threading + + +class IdGenerator(object): + def __init__(self, table, column, store): + self.table = table + self.column = column + self.store = store + self._lock = threading.Lock() + self._next_id = None + + @defer.inlineCallbacks + def get_next(self): + with self._lock: + if not self._next_id: + res = yield self.store._execute_and_decode( + "IdGenerator_%s" % (self.table,), + "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) + ) + + self._next_id = (res and res[0] and res[0]["mx"]) or 1 + + i = self._next_id + self._next_id += 1 + defer.returnValue(i) + + def get_next_txn(self, txn): + with self._lock: + if self._next_id: + i = self._next_id + self._next_id += 1 + return i + else: + txn.execute( + "SELECT MAX(%s) FROM %s" % (self.column, self.table,) + ) + + val, = txn.fetchone() + cur = val or 0 + cur += 1 + self._next_id = cur + 1 + + return cur + + +class StreamIdGenerator(object): + """Used to generate new stream ids when persisting events while keeping + track of which transactions have been completed. + + This allows us to get the "current" stream id, i.e. the stream id such that + all ids less than or equal to it have completed. This handles the fact that + persistence of events can complete out of order. + + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + def __init__(self): + self._lock = threading.Lock() + + self._current_max = None + self._unfinished_ids = deque() + + def get_next_txn(self, txn): + """ + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + with self._lock: + if not self._current_max: + self._compute_current_max(txn) + + self._current_max += 1 + next_id = self._current_max + + self._unfinished_ids.append(next_id) + + @contextlib.contextmanager + def manager(): + try: + yield next_id + finally: + with self._lock: + self._unfinished_ids.remove(next_id) + + return manager() + + @defer.inlineCallbacks + def get_max_token(self, store): + """Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + with self._lock: + if self._unfinished_ids: + defer.returnValue(self._unfinished_ids[0] - 1) + + if not self._current_max: + yield store.runInteraction( + "_compute_current_max", + self._compute_current_max, + ) + + defer.returnValue(self._current_max) + + def _compute_current_max(self, txn): + txn.execute("SELECT MAX(stream_ordering) FROM events") + val, = txn.fetchone() + + self._current_max = int(val) if val else 1 + + return self._current_max |