summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/id_generators.py23
-rw-r--r--synapse/storage/util/sequence.py37
2 files changed, 35 insertions, 25 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 59c8e05c39..48f88a6f8a 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -276,9 +276,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
         # no active writes in progress.
         self._max_position_of_local_instance = self._max_seen_allocated_stream_id
 
-        # This goes and fills out the above state from the database.
-        self._load_current_ids(db_conn, tables)
-
         self._sequence_gen = build_sequence_generator(
             db_conn=db_conn,
             database_engine=db.engine,
@@ -303,6 +300,13 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
                 positive=positive,
             )
 
+        # This goes and fills out the above state from the database.
+        # This may read on the PostgreSQL sequence, and
+        # SequenceGenerator.check_consistency might have fixed up the sequence, which
+        # means the SequenceGenerator needs to be setup before we read the value from
+        # the sequence.
+        self._load_current_ids(db_conn, tables, sequence_name)
+
         self._max_seen_allocated_stream_id = max(
             self._current_positions.values(), default=1
         )
@@ -327,6 +331,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
         self,
         db_conn: LoggingDatabaseConnection,
         tables: List[Tuple[str, str, str]],
+        sequence_name: str,
     ) -> None:
         cur = db_conn.cursor(txn_name="_load_current_ids")
 
@@ -360,6 +365,18 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
                 if instance in self._writers
             }
 
+        # If we're a writer, we can assume we're at the end of the stream
+        # Usually, we would get that from the stream_positions, but in some cases,
+        # like if we rolled back Synapse, the stream_positions table might not be up to
+        # date. If we're using Postgres for the sequences, we can just use the current
+        # sequence value as our own position.
+        if self._instance_name in self._writers:
+            if isinstance(self._db.engine, PostgresEngine):
+                cur.execute(f"SELECT last_value FROM {sequence_name}")
+                row = cur.fetchone()
+                assert row is not None
+                self._current_positions[self._instance_name] = row[0]
+
         # We set the `_persisted_upto_position` to be the minimum of all current
         # positions. If empty we use the max stream ID from the DB table.
         min_stream_id = min(self._current_positions.values(), default=None)
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index f57e7ec41c..c4c0602b28 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -36,21 +36,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-_INCONSISTENT_SEQUENCE_ERROR = """
-Postgres sequence '%(seq)s' is inconsistent with associated
-table '%(table)s'. This can happen if Synapse has been downgraded and
-then upgraded again, or due to a bad migration.
-
-To fix this error, shut down Synapse (including any and all workers)
-and run the following SQL:
-
-    SELECT setval('%(seq)s', (
-        %(max_id_sql)s
-    ));
-
-See docs/postgres.md for more information.
-"""
-
 _INCONSISTENT_STREAM_ERROR = """
 Postgres sequence '%(seq)s' is inconsistent with associated stream position
 of '%(stream_name)s' in the 'stream_positions' table.
@@ -169,25 +154,33 @@ class PostgresSequenceGenerator(SequenceGenerator):
             if row:
                 max_in_stream_positions = row[0]
 
-        txn.close()
-
         # If `is_called` is False then `last_value` is actually the value that
         # will be generated next, so we decrement to get the true "last value".
         if not is_called:
             last_value -= 1
 
         if max_stream_id > last_value:
+            # The sequence is lagging behind the tables. This is probably due to
+            # rolling back to a version before the sequence was used and then
+            # forwards again. We resolve this by setting the sequence to the
+            # right value.
             logger.warning(
-                "Postgres sequence %s is behind table %s: %d < %d",
+                "Postgres sequence %s is behind table %s: %d < %d. Updating sequence.",
                 self._sequence_name,
                 table,
                 last_value,
                 max_stream_id,
             )
-            raise IncorrectDatabaseSetup(
-                _INCONSISTENT_SEQUENCE_ERROR
-                % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
-            )
+
+            sql = f"""
+                SELECT setval('{self._sequence_name}', GREATEST(
+                    (SELECT last_value FROM {self._sequence_name}),
+                    ({table_sql})
+                ));
+            """
+            txn.execute(sql)
+
+        txn.close()
 
         # If we have values in the stream positions table then they have to be
         # less than or equal to `last_value`