summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/util/id_generators.py23
1 files changed, 20 insertions, 3 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 59c8e05c39..48f88a6f8a 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -276,9 +276,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
         # no active writes in progress.
         self._max_position_of_local_instance = self._max_seen_allocated_stream_id
 
-        # This goes and fills out the above state from the database.
-        self._load_current_ids(db_conn, tables)
-
         self._sequence_gen = build_sequence_generator(
             db_conn=db_conn,
             database_engine=db.engine,
@@ -303,6 +300,13 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
                 positive=positive,
             )
 
+        # This goes and fills out the above state from the database.
+        # This may read on the PostgreSQL sequence, and
+        # SequenceGenerator.check_consistency might have fixed up the sequence, which
+        # means the SequenceGenerator needs to be setup before we read the value from
+        # the sequence.
+        self._load_current_ids(db_conn, tables, sequence_name)
+
         self._max_seen_allocated_stream_id = max(
             self._current_positions.values(), default=1
         )
@@ -327,6 +331,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
         self,
         db_conn: LoggingDatabaseConnection,
         tables: List[Tuple[str, str, str]],
+        sequence_name: str,
     ) -> None:
         cur = db_conn.cursor(txn_name="_load_current_ids")
 
@@ -360,6 +365,18 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
                 if instance in self._writers
             }
 
+        # If we're a writer, we can assume we're at the end of the stream
+        # Usually, we would get that from the stream_positions, but in some cases,
+        # like if we rolled back Synapse, the stream_positions table might not be up to
+        # date. If we're using Postgres for the sequences, we can just use the current
+        # sequence value as our own position.
+        if self._instance_name in self._writers:
+            if isinstance(self._db.engine, PostgresEngine):
+                cur.execute(f"SELECT last_value FROM {sequence_name}")
+                row = cur.fetchone()
+                assert row is not None
+                self._current_positions[self._instance_name] = row[0]
+
         # We set the `_persisted_upto_position` to be the minimum of all current
         # positions. If empty we use the max stream ID from the DB table.
         min_stream_id = min(self._current_positions.values(), default=None)