diff options
Diffstat (limited to 'synapse/storage/util')
-rw-r--r-- | synapse/storage/util/id_generators.py | 23 | ||||
-rw-r--r-- | synapse/storage/util/sequence.py | 37 |
2 files changed, 35 insertions, 25 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 59c8e05c39..48f88a6f8a 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -276,9 +276,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): # no active writes in progress. self._max_position_of_local_instance = self._max_seen_allocated_stream_id - # This goes and fills out the above state from the database. - self._load_current_ids(db_conn, tables) - self._sequence_gen = build_sequence_generator( db_conn=db_conn, database_engine=db.engine, @@ -303,6 +300,13 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): positive=positive, ) + # This goes and fills out the above state from the database. + # This may read on the PostgreSQL sequence, and + # SequenceGenerator.check_consistency might have fixed up the sequence, which + # means the SequenceGenerator needs to be setup before we read the value from + # the sequence. + self._load_current_ids(db_conn, tables, sequence_name) + self._max_seen_allocated_stream_id = max( self._current_positions.values(), default=1 ) @@ -327,6 +331,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): self, db_conn: LoggingDatabaseConnection, tables: List[Tuple[str, str, str]], + sequence_name: str, ) -> None: cur = db_conn.cursor(txn_name="_load_current_ids") @@ -360,6 +365,18 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): if instance in self._writers } + # If we're a writer, we can assume we're at the end of the stream + # Usually, we would get that from the stream_positions, but in some cases, + # like if we rolled back Synapse, the stream_positions table might not be up to + # date. If we're using Postgres for the sequences, we can just use the current + # sequence value as our own position. + if self._instance_name in self._writers: + if isinstance(self._db.engine, PostgresEngine): + cur.execute(f"SELECT last_value FROM {sequence_name}") + row = cur.fetchone() + assert row is not None + self._current_positions[self._instance_name] = row[0] + # We set the `_persisted_upto_position` to be the minimum of all current # positions. If empty we use the max stream ID from the DB table. min_stream_id = min(self._current_positions.values(), default=None) diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index f57e7ec41c..c4c0602b28 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -36,21 +36,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -_INCONSISTENT_SEQUENCE_ERROR = """ -Postgres sequence '%(seq)s' is inconsistent with associated -table '%(table)s'. This can happen if Synapse has been downgraded and -then upgraded again, or due to a bad migration. - -To fix this error, shut down Synapse (including any and all workers) -and run the following SQL: - - SELECT setval('%(seq)s', ( - %(max_id_sql)s - )); - -See docs/postgres.md for more information. -""" - _INCONSISTENT_STREAM_ERROR = """ Postgres sequence '%(seq)s' is inconsistent with associated stream position of '%(stream_name)s' in the 'stream_positions' table. @@ -169,25 +154,33 @@ class PostgresSequenceGenerator(SequenceGenerator): if row: max_in_stream_positions = row[0] - txn.close() - # If `is_called` is False then `last_value` is actually the value that # will be generated next, so we decrement to get the true "last value". if not is_called: last_value -= 1 if max_stream_id > last_value: + # The sequence is lagging behind the tables. This is probably due to + # rolling back to a version before the sequence was used and then + # forwards again. We resolve this by setting the sequence to the + # right value. logger.warning( - "Postgres sequence %s is behind table %s: %d < %d", + "Postgres sequence %s is behind table %s: %d < %d. Updating sequence.", self._sequence_name, table, last_value, max_stream_id, ) - raise IncorrectDatabaseSetup( - _INCONSISTENT_SEQUENCE_ERROR - % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql} - ) + + sql = f""" + SELECT setval('{self._sequence_name}', GREATEST( + (SELECT last_value FROM {self._sequence_name}), + ({table_sql}) + )); + """ + txn.execute(sql) + + txn.close() # If we have values in the stream positions table then they have to be # less than or equal to `last_value` |