summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-29 13:19:10 +0100
committerGitHub <noreply@github.com>2024-05-29 12:19:10 +0000
commit466f344547fc6bea2c43257dd65286380fbb512d (patch)
tree40ba5abd666ac6584b7d7cbae3603a3898ce91c5 /synapse/storage/util
parentDon't invalidate all `get_relations_for_event` on history purge (#17083) (diff)
downloadsynapse-466f344547fc6bea2c43257dd65286380fbb512d.tar.xz
Move towards using `MultiWriterIdGenerator` everywhere (#17226)
There is a problem with `StreamIdGenerator` where it can go backwards
over restarts when a stream ID is requested but then not inserted into
the DB. This is problematic if we want to land #17215, and is generally
a potential cause for all sorts of nastiness.

Instead of trying to fix `StreamIdGenerator`, we may as well move to
`MultiWriterIdGenerator` that does not suffer from this problem (the
latest positions are stored in `stream_positions` table). This involves
adding SQLite support to the class.

This only changes id generators that were already using
`MultiWriterIdGenerator` under postgres, a separate PR will move the
rest of the uses of `StreamIdGenerator` over.
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/id_generators.py49
1 files changed, 38 insertions, 11 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index fadc75cc80..0cf5851ad7 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -53,9 +53,11 @@ from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
     LoggingTransaction,
+    make_in_list_sql_clause,
 )
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
-from synapse.storage.util.sequence import PostgresSequenceGenerator
+from synapse.storage.util.sequence import build_sequence_generator
 
 if TYPE_CHECKING:
     from synapse.notifier import ReplicationNotifier
@@ -432,7 +434,22 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
         # no active writes in progress.
         self._max_position_of_local_instance = self._max_seen_allocated_stream_id
 
-        self._sequence_gen = PostgresSequenceGenerator(sequence_name)
+        # This goes and fills out the above state from the database.
+        self._load_current_ids(db_conn, tables)
+
+        self._sequence_gen = build_sequence_generator(
+            db_conn=db_conn,
+            database_engine=db.engine,
+            get_first_callback=lambda _: self._persisted_upto_position,
+            sequence_name=sequence_name,
+            # We only need to set the below if we want it to call
+            # `check_consistency`, but we do that ourselves below so we can
+            # leave them blank.
+            table=None,
+            id_column=None,
+            stream_name=None,
+            positive=positive,
+        )
 
         # We check that the table and sequence haven't diverged.
         for table, _, id_column in tables:
@@ -444,9 +461,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
                 positive=positive,
             )
 
-        # This goes and fills out the above state from the database.
-        self._load_current_ids(db_conn, tables)
-
         self._max_seen_allocated_stream_id = max(
             self._current_positions.values(), default=1
         )
@@ -480,13 +494,17 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
             # important if we add back a writer after a long time; we want to
             # consider that a "new" writer, rather than using the old stale
             # entry here.
-            sql = """
+            clause, args = make_in_list_sql_clause(
+                self._db.engine, "instance_name", self._writers, negative=True
+            )
+
+            sql = f"""
                 DELETE FROM stream_positions
                 WHERE
                     stream_name = ?
-                    AND instance_name != ALL(?)
+                    AND {clause}
             """
-            cur.execute(sql, (self._stream_name, self._writers))
+            cur.execute(sql, [self._stream_name] + args)
 
             sql = """
                 SELECT instance_name, stream_id FROM stream_positions
@@ -508,12 +526,16 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
             # We add a GREATEST here to ensure that the result is always
             # positive. (This can be a problem for e.g. backfill streams where
             # the server has never backfilled).
+            greatest_func = (
+                "GREATEST" if isinstance(self._db.engine, PostgresEngine) else "MAX"
+            )
             max_stream_id = 1
             for table, _, id_column in tables:
                 sql = """
-                    SELECT GREATEST(COALESCE(%(agg)s(%(id)s), 1), 1)
+                    SELECT %(greatest_func)s(COALESCE(%(agg)s(%(id)s), 1), 1)
                     FROM %(table)s
                 """ % {
+                    "greatest_func": greatest_func,
                     "id": id_column,
                     "table": table,
                     "agg": "MAX" if self._positive else "-MIN",
@@ -913,6 +935,11 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
 
         # We upsert the value, ensuring on conflict that we always increase the
         # value (or decrease if stream goes backwards).
+        if isinstance(self._db.engine, PostgresEngine):
+            agg = "GREATEST" if self._positive else "LEAST"
+        else:
+            agg = "MAX" if self._positive else "MIN"
+
         sql = """
             INSERT INTO stream_positions (stream_name, instance_name, stream_id)
             VALUES (?, ?, ?)
@@ -920,10 +947,10 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
             DO UPDATE SET
                 stream_id = %(agg)s(stream_positions.stream_id, EXCLUDED.stream_id)
         """ % {
-            "agg": "GREATEST" if self._positive else "LEAST",
+            "agg": agg,
         }
 
-        pos = (self.get_current_token_for_writer(self._instance_name),)
+        pos = self.get_current_token_for_writer(self._instance_name)
         txn.execute(sql, (self._stream_name, self._instance_name, pos))