diff options
author | Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> | 2023-03-02 18:27:00 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-02 18:27:00 +0000 |
commit | 1eea662780a6325af0a61ceb447b4c91a2d3ac98 (patch) | |
tree | 368aa5bc8af7bb2d2d553e08afd552180f1088fb /synapse/storage/util | |
parent | Add support for knocking to workers. (#15133) (diff) | |
download | synapse-1eea662780a6325af0a61ceb447b4c91a2d3ac98.tar.xz |
Add a `get_next_txn` method to `StreamIdGenerator` to match `MultiWriterIdGenerator` (#15191
Diffstat (limited to 'synapse/storage/util')
-rw-r--r-- | synapse/storage/util/id_generators.py | 45 | ||||
-rw-r--r-- | synapse/storage/util/sequence.py | 2 |
2 files changed, 45 insertions, 2 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 9adff3f4f5..334d3d718b 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -158,6 +158,15 @@ class AbstractStreamIdGenerator(AbstractStreamIdTracker): """ raise NotImplementedError() + @abc.abstractmethod + def get_next_txn(self, txn: LoggingTransaction) -> int: + """ + Usage: + stream_id_gen.get_next_txn(txn) + # ... persist events ... + """ + raise NotImplementedError() + class StreamIdGenerator(AbstractStreamIdGenerator): """Generates and tracks stream IDs for a stream with a single writer. @@ -263,6 +272,40 @@ class StreamIdGenerator(AbstractStreamIdGenerator): return _AsyncCtxManagerWrapper(manager()) + def get_next_txn(self, txn: LoggingTransaction) -> int: + """ + Retrieve the next stream ID from within a database transaction. + + Clean-up functions will be called when the transaction finishes. + + Args: + txn: The database transaction object. + + Returns: + The next stream ID. + """ + if not self._is_writer: + raise Exception("Tried to allocate stream ID on non-writer") + + # Get the next stream ID. + with self._lock: + self._current += self._step + next_id = self._current + + self._unfinished_ids[next_id] = next_id + + def clear_unfinished_id(id_to_clear: int) -> None: + """A function to mark processing this ID as finished""" + with self._lock: + self._unfinished_ids.pop(id_to_clear) + + # Mark this ID as finished once the database transaction itself finishes. + txn.call_after(clear_unfinished_id, next_id) + txn.call_on_exception(clear_unfinished_id, next_id) + + # Return the new ID. + return next_id + def get_current_token(self) -> int: if not self._is_writer: return self._current @@ -568,7 +611,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): """ Usage: - stream_id = stream_id_gen.get_next(txn) + stream_id = stream_id_gen.get_next_txn(txn) # ... persist event ... """ diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 75268cbe15..80915216de 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -205,7 +205,7 @@ class LocalSequenceGenerator(SequenceGenerator): """ Args: get_first_callback: a callback which is called on the first call to - get_next_id_txn; should return the curreent maximum id + get_next_id_txn; should return the current maximum id """ # the callback. this is cleared after it is called, so that it can be GCed. self._callback: Optional[GetFirstCallbackType] = get_first_callback |