summary refs log tree commit diff
path: root/synapse/storage/util/sequence.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-02 12:39:49 +0100
committerErik Johnston <erik@matrix.org>2024-07-02 12:40:03 +0100
commit1ce59d7ba002a869ee94fbe375898cc79c6eb4d1 (patch)
tree40781e146a4bc05fd7e0fbe0bc5d2619c28b7683 /synapse/storage/util/sequence.py
parentFix building debian packages for sid (#17389) (diff)
downloadsynapse-1ce59d7ba002a869ee94fbe375898cc79c6eb4d1.tar.xz
Fix sync waiting for an invalid token from the "future" (#17386)
Fixes https://github.com/element-hq/synapse/issues/17274, hopefully.

Basically, old versions of Synapse could advance streams without
persisting anything in the DB (fixed in #17229). On restart those
updates would get lost, and so the position of the stream would revert
to an older position. If this happened across an upgrade to a later
Synapse version which included #17215, then sync could get blocked
indefinitely (until the stream advanced to the position in the token).

We fix this by bounding the stream positions we'll wait for to the
maximum position of the underlying stream ID generator.
Diffstat (limited to 'synapse/storage/util/sequence.py')
-rw-r--r--synapse/storage/util/sequence.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py

index c4c0602b28..cac3eba1a5 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py
@@ -88,6 +88,10 @@ class SequenceGenerator(metaclass=abc.ABCMeta): """ ... + @abc.abstractmethod + def get_max_allocated(self, txn: Cursor) -> int: + """Get the maximum ID that we have allocated""" + class PostgresSequenceGenerator(SequenceGenerator): """An implementation of SequenceGenerator which uses a postgres sequence""" @@ -190,6 +194,17 @@ class PostgresSequenceGenerator(SequenceGenerator): % {"seq": self._sequence_name, "stream_name": stream_name} ) + def get_max_allocated(self, txn: Cursor) -> int: + # We just read from the sequence what the last value we fetched was. + txn.execute(f"SELECT last_value, is_called FROM {self._sequence_name}") + row = txn.fetchone() + assert row is not None + + last_value, is_called = row + if not is_called: + last_value -= 1 + return last_value + GetFirstCallbackType = Callable[[Cursor], int] @@ -248,6 +263,15 @@ class LocalSequenceGenerator(SequenceGenerator): # There is nothing to do for in memory sequences pass + def get_max_allocated(self, txn: Cursor) -> int: + with self._lock: + if self._current_max_id is None: + assert self._callback is not None + self._current_max_id = self._callback(txn) + self._callback = None + + return self._current_max_id + def build_sequence_generator( db_conn: "LoggingDatabaseConnection",