diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 59c8e05c39..48f88a6f8a 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -276,9 +276,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
# no active writes in progress.
self._max_position_of_local_instance = self._max_seen_allocated_stream_id
- # This goes and fills out the above state from the database.
- self._load_current_ids(db_conn, tables)
-
self._sequence_gen = build_sequence_generator(
db_conn=db_conn,
database_engine=db.engine,
@@ -303,6 +300,13 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
positive=positive,
)
+ # This goes and fills out the above state from the database.
+ # This may read on the PostgreSQL sequence, and
+ # SequenceGenerator.check_consistency might have fixed up the sequence, which
+ # means the SequenceGenerator needs to be setup before we read the value from
+ # the sequence.
+ self._load_current_ids(db_conn, tables, sequence_name)
+
self._max_seen_allocated_stream_id = max(
self._current_positions.values(), default=1
)
@@ -327,6 +331,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
self,
db_conn: LoggingDatabaseConnection,
tables: List[Tuple[str, str, str]],
+ sequence_name: str,
) -> None:
cur = db_conn.cursor(txn_name="_load_current_ids")
@@ -360,6 +365,18 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
if instance in self._writers
}
+ # If we're a writer, we can assume we're at the end of the stream
+ # Usually, we would get that from the stream_positions, but in some cases,
+ # like if we rolled back Synapse, the stream_positions table might not be up to
+ # date. If we're using Postgres for the sequences, we can just use the current
+ # sequence value as our own position.
+ if self._instance_name in self._writers:
+ if isinstance(self._db.engine, PostgresEngine):
+ cur.execute(f"SELECT last_value FROM {sequence_name}")
+ row = cur.fetchone()
+ assert row is not None
+ self._current_positions[self._instance_name] = row[0]
+
# We set the `_persisted_upto_position` to be the minimum of all current
# positions. If empty we use the max stream ID from the DB table.
min_stream_id = min(self._current_positions.values(), default=None)
|