diff options
author | Erik Johnston <erikj@element.io> | 2024-05-29 13:19:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-29 12:19:10 +0000 |
commit | 466f344547fc6bea2c43257dd65286380fbb512d (patch) | |
tree | 40ba5abd666ac6584b7d7cbae3603a3898ce91c5 /synapse/storage/util | |
parent | Don't invalidate all `get_relations_for_event` on history purge (#17083) (diff) | |
download | synapse-466f344547fc6bea2c43257dd65286380fbb512d.tar.xz |
Move towards using `MultiWriterIdGenerator` everywhere (#17226)
There is a problem with `StreamIdGenerator` where it can go backwards over restarts when a stream ID is requested but then not inserted into the DB. This is problematic if we want to land #17215, and is generally a potential cause for all sorts of nastiness. Instead of trying to fix `StreamIdGenerator`, we may as well move to `MultiWriterIdGenerator` that does not suffer from this problem (the latest positions are stored in `stream_positions` table). This involves adding SQLite support to the class. This only changes id generators that were already using `MultiWriterIdGenerator` under postgres, a separate PR will move the rest of the uses of `StreamIdGenerator` over.
Diffstat (limited to 'synapse/storage/util')
-rw-r--r-- | synapse/storage/util/id_generators.py | 49 |
1 files changed, 38 insertions, 11 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index fadc75cc80..0cf5851ad7 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -53,9 +53,11 @@ from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + make_in_list_sql_clause, ) +from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor -from synapse.storage.util.sequence import PostgresSequenceGenerator +from synapse.storage.util.sequence import build_sequence_generator if TYPE_CHECKING: from synapse.notifier import ReplicationNotifier @@ -432,7 +434,22 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): # no active writes in progress. self._max_position_of_local_instance = self._max_seen_allocated_stream_id - self._sequence_gen = PostgresSequenceGenerator(sequence_name) + # This goes and fills out the above state from the database. + self._load_current_ids(db_conn, tables) + + self._sequence_gen = build_sequence_generator( + db_conn=db_conn, + database_engine=db.engine, + get_first_callback=lambda _: self._persisted_upto_position, + sequence_name=sequence_name, + # We only need to set the below if we want it to call + # `check_consistency`, but we do that ourselves below so we can + # leave them blank. + table=None, + id_column=None, + stream_name=None, + positive=positive, + ) # We check that the table and sequence haven't diverged. for table, _, id_column in tables: @@ -444,9 +461,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): positive=positive, ) - # This goes and fills out the above state from the database. - self._load_current_ids(db_conn, tables) - self._max_seen_allocated_stream_id = max( self._current_positions.values(), default=1 ) @@ -480,13 +494,17 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): # important if we add back a writer after a long time; we want to # consider that a "new" writer, rather than using the old stale # entry here. - sql = """ + clause, args = make_in_list_sql_clause( + self._db.engine, "instance_name", self._writers, negative=True + ) + + sql = f""" DELETE FROM stream_positions WHERE stream_name = ? - AND instance_name != ALL(?) + AND {clause} """ - cur.execute(sql, (self._stream_name, self._writers)) + cur.execute(sql, [self._stream_name] + args) sql = """ SELECT instance_name, stream_id FROM stream_positions @@ -508,12 +526,16 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): # We add a GREATEST here to ensure that the result is always # positive. (This can be a problem for e.g. backfill streams where # the server has never backfilled). + greatest_func = ( + "GREATEST" if isinstance(self._db.engine, PostgresEngine) else "MAX" + ) max_stream_id = 1 for table, _, id_column in tables: sql = """ - SELECT GREATEST(COALESCE(%(agg)s(%(id)s), 1), 1) + SELECT %(greatest_func)s(COALESCE(%(agg)s(%(id)s), 1), 1) FROM %(table)s """ % { + "greatest_func": greatest_func, "id": id_column, "table": table, "agg": "MAX" if self._positive else "-MIN", @@ -913,6 +935,11 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): # We upsert the value, ensuring on conflict that we always increase the # value (or decrease if stream goes backwards). + if isinstance(self._db.engine, PostgresEngine): + agg = "GREATEST" if self._positive else "LEAST" + else: + agg = "MAX" if self._positive else "MIN" + sql = """ INSERT INTO stream_positions (stream_name, instance_name, stream_id) VALUES (?, ?, ?) @@ -920,10 +947,10 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): DO UPDATE SET stream_id = %(agg)s(stream_positions.stream_id, EXCLUDED.stream_id) """ % { - "agg": "GREATEST" if self._positive else "LEAST", + "agg": agg, } - pos = (self.get_current_token_for_writer(self._instance_name),) + pos = self.get_current_token_for_writer(self._instance_name) txn.execute(sql, (self._stream_name, self._instance_name, pos)) |