summary refs log tree commit diff
path: root/synapse/storage/util/sequence.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/util/sequence.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index c4c0602b28..cac3eba1a5 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -88,6 +88,10 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
         """
         ...
 
+    @abc.abstractmethod
+    def get_max_allocated(self, txn: Cursor) -> int:
+        """Get the maximum ID that we have allocated"""
+
 
 class PostgresSequenceGenerator(SequenceGenerator):
     """An implementation of SequenceGenerator which uses a postgres sequence"""
@@ -190,6 +194,17 @@ class PostgresSequenceGenerator(SequenceGenerator):
                 % {"seq": self._sequence_name, "stream_name": stream_name}
             )
 
+    def get_max_allocated(self, txn: Cursor) -> int:
+        # We just read from the sequence what the last value we fetched was.
+        txn.execute(f"SELECT last_value, is_called FROM {self._sequence_name}")
+        row = txn.fetchone()
+        assert row is not None
+
+        last_value, is_called = row
+        if not is_called:
+            last_value -= 1
+        return last_value
+
 
 GetFirstCallbackType = Callable[[Cursor], int]
 
@@ -248,6 +263,15 @@ class LocalSequenceGenerator(SequenceGenerator):
         # There is nothing to do for in memory sequences
         pass
 
+    def get_max_allocated(self, txn: Cursor) -> int:
+        with self._lock:
+            if self._current_max_id is None:
+                assert self._callback is not None
+                self._current_max_id = self._callback(txn)
+                self._callback = None
+
+            return self._current_max_id
+
 
 def build_sequence_generator(
     db_conn: "LoggingDatabaseConnection",