summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-29 13:19:10 +0100
committerGitHub <noreply@github.com>2024-05-29 12:19:10 +0000
commit466f344547fc6bea2c43257dd65286380fbb512d (patch)
tree40ba5abd666ac6584b7d7cbae3603a3898ce91c5 /synapse/storage/databases
parentDon't invalidate all `get_relations_for_event` on history purge (#17083) (diff)
downloadsynapse-466f344547fc6bea2c43257dd65286380fbb512d.tar.xz
Move towards using `MultiWriterIdGenerator` everywhere (#17226)
There is a problem with `StreamIdGenerator` where it can go backwards
over restarts when a stream ID is requested but then not inserted into
the DB. This is problematic if we want to land #17215, and is generally
a potential cause for all sorts of nastiness.

Instead of trying to fix `StreamIdGenerator`, we may as well move to
`MultiWriterIdGenerator` that does not suffer from this problem (the
latest positions are stored in `stream_positions` table). This involves
adding SQLite support to the class.

This only changes id generators that were already using
`MultiWriterIdGenerator` under postgres, a separate PR will move the
rest of the uses of `StreamIdGenerator` over.
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/account_data.py47
-rw-r--r--synapse/storage/databases/main/deviceinbox.py46
-rw-r--r--synapse/storage/databases/main/events_worker.py101
-rw-r--r--synapse/storage/databases/main/presence.py27
-rw-r--r--synapse/storage/databases/main/receipts.py43
-rw-r--r--synapse/storage/databases/main/room.py34
6 files changed, 97 insertions, 201 deletions
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 563450a97e..9611a84932 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -43,11 +43,9 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict, JsonMapping
 from synapse.util import json_encoder
@@ -75,37 +73,20 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
 
         self._account_data_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._account_data_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="account_data",
-                instance_name=self._instance_name,
-                tables=[
-                    ("room_account_data", "instance_name", "stream_id"),
-                    ("room_tags_revisions", "instance_name", "stream_id"),
-                    ("account_data", "instance_name", "stream_id"),
-                ],
-                sequence_name="account_data_sequence",
-                writers=hs.config.worker.writers.account_data,
-            )
-        else:
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._account_data_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "room_account_data",
-                "stream_id",
-                extra_tables=[
-                    ("account_data", "stream_id"),
-                    ("room_tags_revisions", "stream_id"),
-                ],
-                is_writer=self._instance_name in hs.config.worker.writers.account_data,
-            )
+        self._account_data_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="account_data",
+            instance_name=self._instance_name,
+            tables=[
+                ("room_account_data", "instance_name", "stream_id"),
+                ("room_tags_revisions", "instance_name", "stream_id"),
+                ("account_data", "instance_name", "stream_id"),
+            ],
+            sequence_name="account_data_sequence",
+            writers=hs.config.worker.writers.account_data,
+        )
 
         account_max = self.get_max_account_data_stream_id()
         self._account_data_stream_cache = StreamChangeCache(
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index e17821ff6e..25023b5e7a 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -50,11 +50,9 @@ from synapse.storage.database import (
     LoggingTransaction,
     make_in_list_sql_clause,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict
 from synapse.util import json_encoder
@@ -89,35 +87,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             expiry_ms=30 * 60 * 1000,
         )
 
-        if isinstance(database.engine, PostgresEngine):
-            self._can_write_to_device = (
-                self._instance_name in hs.config.worker.writers.to_device
-            )
+        self._can_write_to_device = (
+            self._instance_name in hs.config.worker.writers.to_device
+        )
 
-            self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
-                MultiWriterIdGenerator(
-                    db_conn=db_conn,
-                    db=database,
-                    notifier=hs.get_replication_notifier(),
-                    stream_name="to_device",
-                    instance_name=self._instance_name,
-                    tables=[
-                        ("device_inbox", "instance_name", "stream_id"),
-                        ("device_federation_outbox", "instance_name", "stream_id"),
-                    ],
-                    sequence_name="device_inbox_sequence",
-                    writers=hs.config.worker.writers.to_device,
-                )
-            )
-        else:
-            self._can_write_to_device = True
-            self._to_device_msg_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "device_inbox",
-                "stream_id",
-                extra_tables=[("device_federation_outbox", "stream_id")],
-            )
+        self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="to_device",
+            instance_name=self._instance_name,
+            tables=[
+                ("device_inbox", "instance_name", "stream_id"),
+                ("device_federation_outbox", "instance_name", "stream_id"),
+            ],
+            sequence_name="device_inbox_sequence",
+            writers=hs.config.worker.writers.to_device,
+        )
 
         max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
         device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e39d4b9624..426df2a9d2 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -75,12 +75,10 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
@@ -195,51 +193,28 @@ class EventsWorkerStore(SQLBaseStore):
 
         self._stream_id_gen: AbstractStreamIdGenerator
         self._backfill_id_gen: AbstractStreamIdGenerator
-        if isinstance(database.engine, PostgresEngine):
-            # If we're using Postgres than we can use `MultiWriterIdGenerator`
-            # regardless of whether this process writes to the streams or not.
-            self._stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="events",
-                instance_name=hs.get_instance_name(),
-                tables=[("events", "instance_name", "stream_ordering")],
-                sequence_name="events_stream_seq",
-                writers=hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="backfill",
-                instance_name=hs.get_instance_name(),
-                tables=[("events", "instance_name", "stream_ordering")],
-                sequence_name="events_backfill_stream_seq",
-                positive=False,
-                writers=hs.config.worker.writers.events,
-            )
-        else:
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "events",
-                "stream_ordering",
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "events",
-                "stream_ordering",
-                step=-1,
-                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
+
+        self._stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="events",
+            instance_name=hs.get_instance_name(),
+            tables=[("events", "instance_name", "stream_ordering")],
+            sequence_name="events_stream_seq",
+            writers=hs.config.worker.writers.events,
+        )
+        self._backfill_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="backfill",
+            instance_name=hs.get_instance_name(),
+            tables=[("events", "instance_name", "stream_ordering")],
+            sequence_name="events_backfill_stream_seq",
+            positive=False,
+            writers=hs.config.worker.writers.events,
+        )
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
@@ -309,27 +284,17 @@ class EventsWorkerStore(SQLBaseStore):
 
         self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="un_partial_stated_event_stream",
-                instance_name=hs.get_instance_name(),
-                tables=[
-                    ("un_partial_stated_event_stream", "instance_name", "stream_id")
-                ],
-                sequence_name="un_partial_stated_event_stream_sequence",
-                # TODO(faster_joins, multiple writers) Support multiple writers.
-                writers=["master"],
-            )
-        else:
-            self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "un_partial_stated_event_stream",
-                "stream_id",
-            )
+        self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="un_partial_stated_event_stream",
+            instance_name=hs.get_instance_name(),
+            tables=[("un_partial_stated_event_stream", "instance_name", "stream_id")],
+            sequence_name="un_partial_stated_event_stream_sequence",
+            # TODO(faster_joins, multiple writers) Support multiple writers.
+            writers=["master"],
+        )
 
     def get_un_partial_stated_events_token(self, instance_name: str) -> int:
         return (
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 567c2d30bd..923e764491 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -40,13 +40,11 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.engines._base import IsolationLevel
 from synapse.storage.types import Connection
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -91,21 +89,16 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
             self._instance_name in hs.config.worker.writers.presence
         )
 
-        if isinstance(database.engine, PostgresEngine):
-            self._presence_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="presence_stream",
-                instance_name=self._instance_name,
-                tables=[("presence_stream", "instance_name", "stream_id")],
-                sequence_name="presence_stream_sequence",
-                writers=hs.config.worker.writers.presence,
-            )
-        else:
-            self._presence_id_gen = StreamIdGenerator(
-                db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id"
-            )
+        self._presence_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="presence_stream",
+            instance_name=self._instance_name,
+            tables=[("presence_stream", "instance_name", "stream_id")],
+            sequence_name="presence_stream_sequence",
+            writers=hs.config.worker.writers.presence,
+        )
 
         self.hs = hs
         self._presence_on_startup = self._get_active_presence(db_conn)
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 13387a3839..8432560a89 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -44,12 +44,10 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.engines._base import IsolationLevel
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import (
     JsonDict,
@@ -80,35 +78,20 @@ class ReceiptsWorkerStore(SQLBaseStore):
         # class below that is used on the main process.
         self._receipts_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._can_write_to_receipts = (
-                self._instance_name in hs.config.worker.writers.receipts
-            )
+        self._can_write_to_receipts = (
+            self._instance_name in hs.config.worker.writers.receipts
+        )
 
-            self._receipts_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="receipts",
-                instance_name=self._instance_name,
-                tables=[("receipts_linearized", "instance_name", "stream_id")],
-                sequence_name="receipts_sequence",
-                writers=hs.config.worker.writers.receipts,
-            )
-        else:
-            self._can_write_to_receipts = True
-
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._receipts_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "receipts_linearized",
-                "stream_id",
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
-            )
+        self._receipts_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="receipts",
+            instance_name=self._instance_name,
+            tables=[("receipts_linearized", "instance_name", "stream_id")],
+            sequence_name="receipts_sequence",
+            writers=hs.config.worker.writers.receipts,
+        )
 
         super().__init__(database, db_conn, hs)
 
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 8205109548..616c941687 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -58,13 +58,11 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     IdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
 from synapse.util import json_encoder
@@ -155,27 +153,17 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
         self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="un_partial_stated_room_stream",
-                instance_name=self._instance_name,
-                tables=[
-                    ("un_partial_stated_room_stream", "instance_name", "stream_id")
-                ],
-                sequence_name="un_partial_stated_room_stream_sequence",
-                # TODO(faster_joins, multiple writers) Support multiple writers.
-                writers=["master"],
-            )
-        else:
-            self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "un_partial_stated_room_stream",
-                "stream_id",
-            )
+        self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="un_partial_stated_room_stream",
+            instance_name=self._instance_name,
+            tables=[("un_partial_stated_room_stream", "instance_name", "stream_id")],
+            sequence_name="un_partial_stated_room_stream_sequence",
+            # TODO(faster_joins, multiple writers) Support multiple writers.
+            writers=["master"],
+        )
 
     def process_replication_position(
         self, stream_name: str, instance_name: str, token: int