1 files changed, 24 insertions, 0 deletions
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index c4c0602b28..cac3eba1a5 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -88,6 +88,10 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
"""
...
+ @abc.abstractmethod
+ def get_max_allocated(self, txn: Cursor) -> int:
+ """Get the maximum ID that we have allocated"""
+
class PostgresSequenceGenerator(SequenceGenerator):
"""An implementation of SequenceGenerator which uses a postgres sequence"""
@@ -190,6 +194,17 @@ class PostgresSequenceGenerator(SequenceGenerator):
% {"seq": self._sequence_name, "stream_name": stream_name}
)
+ def get_max_allocated(self, txn: Cursor) -> int:
+ # We just read from the sequence what the last value we fetched was.
+ txn.execute(f"SELECT last_value, is_called FROM {self._sequence_name}")
+ row = txn.fetchone()
+ assert row is not None
+
+ last_value, is_called = row
+ if not is_called:
+ last_value -= 1
+ return last_value
+
GetFirstCallbackType = Callable[[Cursor], int]
@@ -248,6 +263,15 @@ class LocalSequenceGenerator(SequenceGenerator):
# There is nothing to do for in memory sequences
pass
+ def get_max_allocated(self, txn: Cursor) -> int:
+ with self._lock:
+ if self._current_max_id is None:
+ assert self._callback is not None
+ self._current_max_id = self._callback(txn)
+ self._callback = None
+
+ return self._current_max_id
+
def build_sequence_generator(
db_conn: "LoggingDatabaseConnection",
|