diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 59c8e05c39..48f88a6f8a 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -276,9 +276,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
# no active writes in progress.
self._max_position_of_local_instance = self._max_seen_allocated_stream_id
- # This goes and fills out the above state from the database.
- self._load_current_ids(db_conn, tables)
-
self._sequence_gen = build_sequence_generator(
db_conn=db_conn,
database_engine=db.engine,
@@ -303,6 +300,13 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
positive=positive,
)
+ # This goes and fills out the above state from the database.
+ # This may read on the PostgreSQL sequence, and
+ # SequenceGenerator.check_consistency might have fixed up the sequence, which
+ # means the SequenceGenerator needs to be setup before we read the value from
+ # the sequence.
+ self._load_current_ids(db_conn, tables, sequence_name)
+
self._max_seen_allocated_stream_id = max(
self._current_positions.values(), default=1
)
@@ -327,6 +331,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
self,
db_conn: LoggingDatabaseConnection,
tables: List[Tuple[str, str, str]],
+ sequence_name: str,
) -> None:
cur = db_conn.cursor(txn_name="_load_current_ids")
@@ -360,6 +365,18 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
if instance in self._writers
}
+ # If we're a writer, we can assume we're at the end of the stream
+ # Usually, we would get that from the stream_positions, but in some cases,
+ # like if we rolled back Synapse, the stream_positions table might not be up to
+ # date. If we're using Postgres for the sequences, we can just use the current
+ # sequence value as our own position.
+ if self._instance_name in self._writers:
+ if isinstance(self._db.engine, PostgresEngine):
+ cur.execute(f"SELECT last_value FROM {sequence_name}")
+ row = cur.fetchone()
+ assert row is not None
+ self._current_positions[self._instance_name] = row[0]
+
# We set the `_persisted_upto_position` to be the minimum of all current
# positions. If empty we use the max stream ID from the DB table.
min_stream_id = min(self._current_positions.values(), default=None)
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index f57e7ec41c..c4c0602b28 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -36,21 +36,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-_INCONSISTENT_SEQUENCE_ERROR = """
-Postgres sequence '%(seq)s' is inconsistent with associated
-table '%(table)s'. This can happen if Synapse has been downgraded and
-then upgraded again, or due to a bad migration.
-
-To fix this error, shut down Synapse (including any and all workers)
-and run the following SQL:
-
- SELECT setval('%(seq)s', (
- %(max_id_sql)s
- ));
-
-See docs/postgres.md for more information.
-"""
-
_INCONSISTENT_STREAM_ERROR = """
Postgres sequence '%(seq)s' is inconsistent with associated stream position
of '%(stream_name)s' in the 'stream_positions' table.
@@ -169,25 +154,33 @@ class PostgresSequenceGenerator(SequenceGenerator):
if row:
max_in_stream_positions = row[0]
- txn.close()
-
# If `is_called` is False then `last_value` is actually the value that
# will be generated next, so we decrement to get the true "last value".
if not is_called:
last_value -= 1
if max_stream_id > last_value:
+ # The sequence is lagging behind the tables. This is probably due to
+ # rolling back to a version before the sequence was used and then
+ # forwards again. We resolve this by setting the sequence to the
+ # right value.
logger.warning(
- "Postgres sequence %s is behind table %s: %d < %d",
+ "Postgres sequence %s is behind table %s: %d < %d. Updating sequence.",
self._sequence_name,
table,
last_value,
max_stream_id,
)
- raise IncorrectDatabaseSetup(
- _INCONSISTENT_SEQUENCE_ERROR
- % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
- )
+
+ sql = f"""
+ SELECT setval('{self._sequence_name}', GREATEST(
+ (SELECT last_value FROM {self._sequence_name}),
+ ({table_sql})
+ ));
+ """
+ txn.execute(sql)
+
+ txn.close()
# If we have values in the stream positions table then they have to be
# less than or equal to `last_value`
|