summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-29 13:19:10 +0100
committerGitHub <noreply@github.com>2024-05-29 12:19:10 +0000
commit466f344547fc6bea2c43257dd65286380fbb512d (patch)
tree40ba5abd666ac6584b7d7cbae3603a3898ce91c5 /synapse/storage
parentDon't invalidate all `get_relations_for_event` on history purge (#17083) (diff)
downloadsynapse-466f344547fc6bea2c43257dd65286380fbb512d.tar.xz
Move towards using `MultiWriterIdGenerator` everywhere (#17226)
There is a problem with `StreamIdGenerator` where it can go backwards
over restarts when a stream ID is requested but then not inserted into
the DB. This is problematic if we want to land #17215, and is generally
a potential cause for all sorts of nastiness.

Instead of trying to fix `StreamIdGenerator`, we may as well move to
`MultiWriterIdGenerator` that does not suffer from this problem (the
latest positions are stored in `stream_positions` table). This involves
adding SQLite support to the class.

This only changes id generators that were already using
`MultiWriterIdGenerator` under postgres, a separate PR will move the
rest of the uses of `StreamIdGenerator` over.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/database.py21
-rw-r--r--synapse/storage/databases/main/account_data.py47
-rw-r--r--synapse/storage/databases/main/deviceinbox.py46
-rw-r--r--synapse/storage/databases/main/events_worker.py101
-rw-r--r--synapse/storage/databases/main/presence.py27
-rw-r--r--synapse/storage/databases/main/receipts.py43
-rw-r--r--synapse/storage/databases/main/room.py34
-rw-r--r--synapse/storage/util/id_generators.py49
8 files changed, 153 insertions, 215 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index d9c85e411e..569f618193 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -2461,7 +2461,11 @@ class DatabasePool:
 
 
 def make_in_list_sql_clause(
-    database_engine: BaseDatabaseEngine, column: str, iterable: Collection[Any]
+    database_engine: BaseDatabaseEngine,
+    column: str,
+    iterable: Collection[Any],
+    *,
+    negative: bool = False,
 ) -> Tuple[str, list]:
     """Returns an SQL clause that checks the given column is in the iterable.
 
@@ -2474,6 +2478,7 @@ def make_in_list_sql_clause(
         database_engine
         column: Name of the column
         iterable: The values to check the column against.
+        negative: Whether we should check for inequality, i.e. `NOT IN`
 
     Returns:
         A tuple of SQL query and the args
@@ -2482,9 +2487,19 @@ def make_in_list_sql_clause(
     if database_engine.supports_using_any_list:
         # This should hopefully be faster, but also makes postgres query
         # stats easier to understand.
-        return "%s = ANY(?)" % (column,), [list(iterable)]
+        if not negative:
+            clause = f"{column} = ANY(?)"
+        else:
+            clause = f"{column} != ALL(?)"
+
+        return clause, [list(iterable)]
     else:
-        return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
+        params = ",".join("?" for _ in iterable)
+        if not negative:
+            clause = f"{column} IN ({params})"
+        else:
+            clause = f"{column} NOT IN ({params})"
+        return clause, list(iterable)
 
 
 # These overloads ensure that `columns` and `iterable` values have the same length.
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 563450a97e..9611a84932 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -43,11 +43,9 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict, JsonMapping
 from synapse.util import json_encoder
@@ -75,37 +73,20 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
 
         self._account_data_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._account_data_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="account_data",
-                instance_name=self._instance_name,
-                tables=[
-                    ("room_account_data", "instance_name", "stream_id"),
-                    ("room_tags_revisions", "instance_name", "stream_id"),
-                    ("account_data", "instance_name", "stream_id"),
-                ],
-                sequence_name="account_data_sequence",
-                writers=hs.config.worker.writers.account_data,
-            )
-        else:
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._account_data_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "room_account_data",
-                "stream_id",
-                extra_tables=[
-                    ("account_data", "stream_id"),
-                    ("room_tags_revisions", "stream_id"),
-                ],
-                is_writer=self._instance_name in hs.config.worker.writers.account_data,
-            )
+        self._account_data_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="account_data",
+            instance_name=self._instance_name,
+            tables=[
+                ("room_account_data", "instance_name", "stream_id"),
+                ("room_tags_revisions", "instance_name", "stream_id"),
+                ("account_data", "instance_name", "stream_id"),
+            ],
+            sequence_name="account_data_sequence",
+            writers=hs.config.worker.writers.account_data,
+        )
 
         account_max = self.get_max_account_data_stream_id()
         self._account_data_stream_cache = StreamChangeCache(
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index e17821ff6e..25023b5e7a 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -50,11 +50,9 @@ from synapse.storage.database import (
     LoggingTransaction,
     make_in_list_sql_clause,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict
 from synapse.util import json_encoder
@@ -89,35 +87,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             expiry_ms=30 * 60 * 1000,
         )
 
-        if isinstance(database.engine, PostgresEngine):
-            self._can_write_to_device = (
-                self._instance_name in hs.config.worker.writers.to_device
-            )
+        self._can_write_to_device = (
+            self._instance_name in hs.config.worker.writers.to_device
+        )
 
-            self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
-                MultiWriterIdGenerator(
-                    db_conn=db_conn,
-                    db=database,
-                    notifier=hs.get_replication_notifier(),
-                    stream_name="to_device",
-                    instance_name=self._instance_name,
-                    tables=[
-                        ("device_inbox", "instance_name", "stream_id"),
-                        ("device_federation_outbox", "instance_name", "stream_id"),
-                    ],
-                    sequence_name="device_inbox_sequence",
-                    writers=hs.config.worker.writers.to_device,
-                )
-            )
-        else:
-            self._can_write_to_device = True
-            self._to_device_msg_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "device_inbox",
-                "stream_id",
-                extra_tables=[("device_federation_outbox", "stream_id")],
-            )
+        self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="to_device",
+            instance_name=self._instance_name,
+            tables=[
+                ("device_inbox", "instance_name", "stream_id"),
+                ("device_federation_outbox", "instance_name", "stream_id"),
+            ],
+            sequence_name="device_inbox_sequence",
+            writers=hs.config.worker.writers.to_device,
+        )
 
         max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
         device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e39d4b9624..426df2a9d2 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -75,12 +75,10 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
@@ -195,51 +193,28 @@ class EventsWorkerStore(SQLBaseStore):
 
         self._stream_id_gen: AbstractStreamIdGenerator
         self._backfill_id_gen: AbstractStreamIdGenerator
-        if isinstance(database.engine, PostgresEngine):
-            # If we're using Postgres than we can use `MultiWriterIdGenerator`
-            # regardless of whether this process writes to the streams or not.
-            self._stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="events",
-                instance_name=hs.get_instance_name(),
-                tables=[("events", "instance_name", "stream_ordering")],
-                sequence_name="events_stream_seq",
-                writers=hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="backfill",
-                instance_name=hs.get_instance_name(),
-                tables=[("events", "instance_name", "stream_ordering")],
-                sequence_name="events_backfill_stream_seq",
-                positive=False,
-                writers=hs.config.worker.writers.events,
-            )
-        else:
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "events",
-                "stream_ordering",
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "events",
-                "stream_ordering",
-                step=-1,
-                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
+
+        self._stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="events",
+            instance_name=hs.get_instance_name(),
+            tables=[("events", "instance_name", "stream_ordering")],
+            sequence_name="events_stream_seq",
+            writers=hs.config.worker.writers.events,
+        )
+        self._backfill_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="backfill",
+            instance_name=hs.get_instance_name(),
+            tables=[("events", "instance_name", "stream_ordering")],
+            sequence_name="events_backfill_stream_seq",
+            positive=False,
+            writers=hs.config.worker.writers.events,
+        )
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
@@ -309,27 +284,17 @@ class EventsWorkerStore(SQLBaseStore):
 
         self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="un_partial_stated_event_stream",
-                instance_name=hs.get_instance_name(),
-                tables=[
-                    ("un_partial_stated_event_stream", "instance_name", "stream_id")
-                ],
-                sequence_name="un_partial_stated_event_stream_sequence",
-                # TODO(faster_joins, multiple writers) Support multiple writers.
-                writers=["master"],
-            )
-        else:
-            self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "un_partial_stated_event_stream",
-                "stream_id",
-            )
+        self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="un_partial_stated_event_stream",
+            instance_name=hs.get_instance_name(),
+            tables=[("un_partial_stated_event_stream", "instance_name", "stream_id")],
+            sequence_name="un_partial_stated_event_stream_sequence",
+            # TODO(faster_joins, multiple writers) Support multiple writers.
+            writers=["master"],
+        )
 
     def get_un_partial_stated_events_token(self, instance_name: str) -> int:
         return (
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 567c2d30bd..923e764491 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -40,13 +40,11 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.engines._base import IsolationLevel
 from synapse.storage.types import Connection
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -91,21 +89,16 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
             self._instance_name in hs.config.worker.writers.presence
         )
 
-        if isinstance(database.engine, PostgresEngine):
-            self._presence_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="presence_stream",
-                instance_name=self._instance_name,
-                tables=[("presence_stream", "instance_name", "stream_id")],
-                sequence_name="presence_stream_sequence",
-                writers=hs.config.worker.writers.presence,
-            )
-        else:
-            self._presence_id_gen = StreamIdGenerator(
-                db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id"
-            )
+        self._presence_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="presence_stream",
+            instance_name=self._instance_name,
+            tables=[("presence_stream", "instance_name", "stream_id")],
+            sequence_name="presence_stream_sequence",
+            writers=hs.config.worker.writers.presence,
+        )
 
         self.hs = hs
         self._presence_on_startup = self._get_active_presence(db_conn)
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 13387a3839..8432560a89 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -44,12 +44,10 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.engines._base import IsolationLevel
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import (
     JsonDict,
@@ -80,35 +78,20 @@ class ReceiptsWorkerStore(SQLBaseStore):
         # class below that is used on the main process.
         self._receipts_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._can_write_to_receipts = (
-                self._instance_name in hs.config.worker.writers.receipts
-            )
+        self._can_write_to_receipts = (
+            self._instance_name in hs.config.worker.writers.receipts
+        )
 
-            self._receipts_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="receipts",
-                instance_name=self._instance_name,
-                tables=[("receipts_linearized", "instance_name", "stream_id")],
-                sequence_name="receipts_sequence",
-                writers=hs.config.worker.writers.receipts,
-            )
-        else:
-            self._can_write_to_receipts = True
-
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._receipts_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "receipts_linearized",
-                "stream_id",
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
-            )
+        self._receipts_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="receipts",
+            instance_name=self._instance_name,
+            tables=[("receipts_linearized", "instance_name", "stream_id")],
+            sequence_name="receipts_sequence",
+            writers=hs.config.worker.writers.receipts,
+        )
 
         super().__init__(database, db_conn, hs)
 
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 8205109548..616c941687 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -58,13 +58,11 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     IdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
 from synapse.util import json_encoder
@@ -155,27 +153,17 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
         self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="un_partial_stated_room_stream",
-                instance_name=self._instance_name,
-                tables=[
-                    ("un_partial_stated_room_stream", "instance_name", "stream_id")
-                ],
-                sequence_name="un_partial_stated_room_stream_sequence",
-                # TODO(faster_joins, multiple writers) Support multiple writers.
-                writers=["master"],
-            )
-        else:
-            self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "un_partial_stated_room_stream",
-                "stream_id",
-            )
+        self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="un_partial_stated_room_stream",
+            instance_name=self._instance_name,
+            tables=[("un_partial_stated_room_stream", "instance_name", "stream_id")],
+            sequence_name="un_partial_stated_room_stream_sequence",
+            # TODO(faster_joins, multiple writers) Support multiple writers.
+            writers=["master"],
+        )
 
     def process_replication_position(
         self, stream_name: str, instance_name: str, token: int
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index fadc75cc80..0cf5851ad7 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -53,9 +53,11 @@ from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
     LoggingTransaction,
+    make_in_list_sql_clause,
 )
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
-from synapse.storage.util.sequence import PostgresSequenceGenerator
+from synapse.storage.util.sequence import build_sequence_generator
 
 if TYPE_CHECKING:
     from synapse.notifier import ReplicationNotifier
@@ -432,7 +434,22 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
         # no active writes in progress.
         self._max_position_of_local_instance = self._max_seen_allocated_stream_id
 
-        self._sequence_gen = PostgresSequenceGenerator(sequence_name)
+        # This goes and fills out the above state from the database.
+        self._load_current_ids(db_conn, tables)
+
+        self._sequence_gen = build_sequence_generator(
+            db_conn=db_conn,
+            database_engine=db.engine,
+            get_first_callback=lambda _: self._persisted_upto_position,
+            sequence_name=sequence_name,
+            # We only need to set the below if we want it to call
+            # `check_consistency`, but we do that ourselves below so we can
+            # leave them blank.
+            table=None,
+            id_column=None,
+            stream_name=None,
+            positive=positive,
+        )
 
         # We check that the table and sequence haven't diverged.
         for table, _, id_column in tables:
@@ -444,9 +461,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
                 positive=positive,
             )
 
-        # This goes and fills out the above state from the database.
-        self._load_current_ids(db_conn, tables)
-
         self._max_seen_allocated_stream_id = max(
             self._current_positions.values(), default=1
         )
@@ -480,13 +494,17 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
             # important if we add back a writer after a long time; we want to
             # consider that a "new" writer, rather than using the old stale
             # entry here.
-            sql = """
+            clause, args = make_in_list_sql_clause(
+                self._db.engine, "instance_name", self._writers, negative=True
+            )
+
+            sql = f"""
                 DELETE FROM stream_positions
                 WHERE
                     stream_name = ?
-                    AND instance_name != ALL(?)
+                    AND {clause}
             """
-            cur.execute(sql, (self._stream_name, self._writers))
+            cur.execute(sql, [self._stream_name] + args)
 
             sql = """
                 SELECT instance_name, stream_id FROM stream_positions
@@ -508,12 +526,16 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
             # We add a GREATEST here to ensure that the result is always
             # positive. (This can be a problem for e.g. backfill streams where
             # the server has never backfilled).
+            greatest_func = (
+                "GREATEST" if isinstance(self._db.engine, PostgresEngine) else "MAX"
+            )
             max_stream_id = 1
             for table, _, id_column in tables:
                 sql = """
-                    SELECT GREATEST(COALESCE(%(agg)s(%(id)s), 1), 1)
+                    SELECT %(greatest_func)s(COALESCE(%(agg)s(%(id)s), 1), 1)
                     FROM %(table)s
                 """ % {
+                    "greatest_func": greatest_func,
                     "id": id_column,
                     "table": table,
                     "agg": "MAX" if self._positive else "-MIN",
@@ -913,6 +935,11 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
 
         # We upsert the value, ensuring on conflict that we always increase the
         # value (or decrease if stream goes backwards).
+        if isinstance(self._db.engine, PostgresEngine):
+            agg = "GREATEST" if self._positive else "LEAST"
+        else:
+            agg = "MAX" if self._positive else "MIN"
+
         sql = """
             INSERT INTO stream_positions (stream_name, instance_name, stream_id)
             VALUES (?, ?, ?)
@@ -920,10 +947,10 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
             DO UPDATE SET
                 stream_id = %(agg)s(stream_positions.stream_id, EXCLUDED.stream_id)
         """ % {
-            "agg": "GREATEST" if self._positive else "LEAST",
+            "agg": agg,
         }
 
-        pos = (self.get_current_token_for_writer(self._instance_name),)
+        pos = self.get_current_token_for_writer(self._instance_name)
         txn.execute(sql, (self._stream_name, self._instance_name, pos))