summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-29 13:19:10 +0100
committerGitHub <noreply@github.com>2024-05-29 12:19:10 +0000
commit466f344547fc6bea2c43257dd65286380fbb512d (patch)
tree40ba5abd666ac6584b7d7cbae3603a3898ce91c5 /synapse/storage/databases/main/deviceinbox.py
parentDon't invalidate all `get_relations_for_event` on history purge (#17083) (diff)
downloadsynapse-466f344547fc6bea2c43257dd65286380fbb512d.tar.xz
Move towards using `MultiWriterIdGenerator` everywhere (#17226)
There is a problem with `StreamIdGenerator` where it can go backwards
over restarts when a stream ID is requested but then not inserted into
the DB. This is problematic if we want to land #17215, and is generally
a potential cause for all sorts of nastiness.

Instead of trying to fix `StreamIdGenerator`, we may as well move to
`MultiWriterIdGenerator` that does not suffer from this problem (the
latest positions are stored in `stream_positions` table). This involves
adding SQLite support to the class.

This only changes id generators that were already using
`MultiWriterIdGenerator` under postgres, a separate PR will move the
rest of the uses of `StreamIdGenerator` over.
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py46
1 files changed, 16 insertions, 30 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index e17821ff6e..25023b5e7a 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -50,11 +50,9 @@ from synapse.storage.database import (
     LoggingTransaction,
     make_in_list_sql_clause,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict
 from synapse.util import json_encoder
@@ -89,35 +87,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             expiry_ms=30 * 60 * 1000,
         )
 
-        if isinstance(database.engine, PostgresEngine):
-            self._can_write_to_device = (
-                self._instance_name in hs.config.worker.writers.to_device
-            )
+        self._can_write_to_device = (
+            self._instance_name in hs.config.worker.writers.to_device
+        )
 
-            self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
-                MultiWriterIdGenerator(
-                    db_conn=db_conn,
-                    db=database,
-                    notifier=hs.get_replication_notifier(),
-                    stream_name="to_device",
-                    instance_name=self._instance_name,
-                    tables=[
-                        ("device_inbox", "instance_name", "stream_id"),
-                        ("device_federation_outbox", "instance_name", "stream_id"),
-                    ],
-                    sequence_name="device_inbox_sequence",
-                    writers=hs.config.worker.writers.to_device,
-                )
-            )
-        else:
-            self._can_write_to_device = True
-            self._to_device_msg_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "device_inbox",
-                "stream_id",
-                extra_tables=[("device_federation_outbox", "stream_id")],
-            )
+        self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="to_device",
+            instance_name=self._instance_name,
+            tables=[
+                ("device_inbox", "instance_name", "stream_id"),
+                ("device_federation_outbox", "instance_name", "stream_id"),
+            ],
+            sequence_name="device_inbox_sequence",
+            writers=hs.config.worker.writers.to_device,
+        )
 
         max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
         device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(