diff options
author | Erik Johnston <erik@matrix.org> | 2015-04-09 11:41:36 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-04-09 11:41:36 +0100 |
commit | 8ad0f4912ed72daced74ae4d1c939ebdbc517476 (patch) | |
tree | 0d957164f88804f2f5863f739d4b6f7849b7316e /synapse/storage/util | |
parent | Fix tests after commit 9a0579 (diff) | |
download | synapse-8ad0f4912ed72daced74ae4d1c939ebdbc517476.tar.xz |
Stream ordering and out of order insertions.
Handle the fact that events can be persisted out of order, and so to get the "current max" stream token becomes non trivial - as we need to make sure that *all* stream tokens less than the current max have also successfully been persisted.
Diffstat (limited to 'synapse/storage/util')
-rw-r--r-- | synapse/storage/util/__init__.py | 14 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 126 |
2 files changed, 140 insertions, 0 deletions
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py new file mode 100644 index 0000000000..c488b10d3c --- /dev/null +++ b/synapse/storage/util/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py new file mode 100644 index 0000000000..8f419323a7 --- /dev/null +++ b/synapse/storage/util/id_generators.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from collections import deque +import contextlib +import threading + + +class IdGenerator(object): + def __init__(self, table, column, store): + self.table = table + self.column = column + self.store = store + self._lock = threading.Lock() + self._next_id = None + + @defer.inlineCallbacks + def get_next(self): + with self._lock: + if not self._next_id: + res = yield self.store._execute_and_decode( + "IdGenerator_%s" % (self.table,), + "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) + ) + + self._next_id = (res and res[0] and res[0]["mx"]) or 1 + + i = self._next_id + self._next_id += 1 + defer.returnValue(i) + + def get_next_txn(self, txn): + with self._lock: + if self._next_id: + i = self._next_id + self._next_id += 1 + return i + else: + txn.execute( + "SELECT MAX(%s) FROM %s" % (self.column, self.table,) + ) + + val, = txn.fetchone() + self._next_id = val or 2 + + return 1 + + +class StreamIdGenerator(object): + """Used to generate new stream ids when persisting events while keeping + track of which transactions have been completed. + + This allows us to get the "current" stream id, i.e. the stream id such that + all ids less than or equal to it have completed. This handles the fact that + persistence of events can complete out of order. + + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + def __init__(self): + self._lock = threading.Lock() + + self._current_max = None + self._unfinished_ids = deque() + + def get_next_txn(self, txn): + """ + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + with self._lock: + if not self._current_max: + self._compute_current_max(txn) + + self._current_max += 1 + next_id = self._current_max + + self._unfinished_ids.append(next_id) + + @contextlib.contextmanager + def manager(): + yield next_id + with self._lock: + self._unfinished_ids.remove(next_id) + + return manager() + + def get_max_token(self, store): + """Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + with self._lock: + if self._unfinished_ids: + return self._unfinished_ids[0] - 1 + + if not self._current_max: + return store.runInteraction( + "_compute_current_max", + self._compute_current_max, + ) + + return self._current_max + + def _compute_current_max(self, txn): + txn.execute("SELECT MAX(stream_ordering) FROM events") + val, = txn.fetchone() + + self._current_max = int(val) if val else 1 + + return self._current_max |