summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
authorAndrew Morgan <1342360+anoadragon453@users.noreply.github.com>2023-03-02 18:27:00 +0000
committerGitHub <noreply@github.com>2023-03-02 18:27:00 +0000
commit1eea662780a6325af0a61ceb447b4c91a2d3ac98 (patch)
tree368aa5bc8af7bb2d2d553e08afd552180f1088fb /synapse/storage/util
parentAdd support for knocking to workers. (#15133) (diff)
downloadsynapse-1eea662780a6325af0a61ceb447b4c91a2d3ac98.tar.xz
Add a `get_next_txn` method to `StreamIdGenerator` to match `MultiWriterIdGenerator` (#15191
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/id_generators.py45
-rw-r--r--synapse/storage/util/sequence.py2
2 files changed, 45 insertions, 2 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 9adff3f4f5..334d3d718b 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -158,6 +158,15 @@ class AbstractStreamIdGenerator(AbstractStreamIdTracker):
         """
         raise NotImplementedError()
 
+    @abc.abstractmethod
+    def get_next_txn(self, txn: LoggingTransaction) -> int:
+        """
+        Usage:
+            stream_id_gen.get_next_txn(txn)
+            # ... persist events ...
+        """
+        raise NotImplementedError()
+
 
 class StreamIdGenerator(AbstractStreamIdGenerator):
     """Generates and tracks stream IDs for a stream with a single writer.
@@ -263,6 +272,40 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
 
         return _AsyncCtxManagerWrapper(manager())
 
+    def get_next_txn(self, txn: LoggingTransaction) -> int:
+        """
+        Retrieve the next stream ID from within a database transaction.
+
+        Clean-up functions will be called when the transaction finishes.
+
+        Args:
+            txn: The database transaction object.
+
+        Returns:
+            The next stream ID.
+        """
+        if not self._is_writer:
+            raise Exception("Tried to allocate stream ID on non-writer")
+
+        # Get the next stream ID.
+        with self._lock:
+            self._current += self._step
+            next_id = self._current
+
+            self._unfinished_ids[next_id] = next_id
+
+        def clear_unfinished_id(id_to_clear: int) -> None:
+            """A function to mark processing this ID as finished"""
+            with self._lock:
+                self._unfinished_ids.pop(id_to_clear)
+
+        # Mark this ID as finished once the database transaction itself finishes.
+        txn.call_after(clear_unfinished_id, next_id)
+        txn.call_on_exception(clear_unfinished_id, next_id)
+
+        # Return the new ID.
+        return next_id
+
     def get_current_token(self) -> int:
         if not self._is_writer:
             return self._current
@@ -568,7 +611,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
         """
         Usage:
 
-            stream_id = stream_id_gen.get_next(txn)
+            stream_id = stream_id_gen.get_next_txn(txn)
             # ... persist event ...
         """
 
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 75268cbe15..80915216de 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -205,7 +205,7 @@ class LocalSequenceGenerator(SequenceGenerator):
         """
         Args:
             get_first_callback: a callback which is called on the first call to
-                 get_next_id_txn; should return the curreent maximum id
+                 get_next_id_txn; should return the current maximum id
         """
         # the callback. this is cleared after it is called, so that it can be GCed.
         self._callback: Optional[GetFirstCallbackType] = get_first_callback