diff options
author | Erik Johnston <erikj@element.io> | 2024-05-29 13:19:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-29 12:19:10 +0000 |
commit | 466f344547fc6bea2c43257dd65286380fbb512d (patch) | |
tree | 40ba5abd666ac6584b7d7cbae3603a3898ce91c5 /synapse/storage/databases/main/deviceinbox.py | |
parent | Don't invalidate all `get_relations_for_event` on history purge (#17083) (diff) | |
download | synapse-466f344547fc6bea2c43257dd65286380fbb512d.tar.xz |
Move towards using `MultiWriterIdGenerator` everywhere (#17226)
There is a problem with `StreamIdGenerator` where it can go backwards over restarts when a stream ID is requested but then not inserted into the DB. This is problematic if we want to land #17215, and is generally a potential cause for all sorts of nastiness. Instead of trying to fix `StreamIdGenerator`, we may as well move to `MultiWriterIdGenerator` that does not suffer from this problem (the latest positions are stored in `stream_positions` table). This involves adding SQLite support to the class. This only changes id generators that were already using `MultiWriterIdGenerator` under postgres, a separate PR will move the rest of the uses of `StreamIdGenerator` over.
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 46 |
1 files changed, 16 insertions, 30 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index e17821ff6e..25023b5e7a 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -50,11 +50,9 @@ from synapse.storage.database import ( LoggingTransaction, make_in_list_sql_clause, ) -from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, MultiWriterIdGenerator, - StreamIdGenerator, ) from synapse.types import JsonDict from synapse.util import json_encoder @@ -89,35 +87,23 @@ class DeviceInboxWorkerStore(SQLBaseStore): expiry_ms=30 * 60 * 1000, ) - if isinstance(database.engine, PostgresEngine): - self._can_write_to_device = ( - self._instance_name in hs.config.worker.writers.to_device - ) + self._can_write_to_device = ( + self._instance_name in hs.config.worker.writers.to_device + ) - self._to_device_msg_id_gen: AbstractStreamIdGenerator = ( - MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - notifier=hs.get_replication_notifier(), - stream_name="to_device", - instance_name=self._instance_name, - tables=[ - ("device_inbox", "instance_name", "stream_id"), - ("device_federation_outbox", "instance_name", "stream_id"), - ], - sequence_name="device_inbox_sequence", - writers=hs.config.worker.writers.to_device, - ) - ) - else: - self._can_write_to_device = True - self._to_device_msg_id_gen = StreamIdGenerator( - db_conn, - hs.get_replication_notifier(), - "device_inbox", - "stream_id", - extra_tables=[("device_federation_outbox", "stream_id")], - ) + self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + notifier=hs.get_replication_notifier(), + stream_name="to_device", + instance_name=self._instance_name, + tables=[ + ("device_inbox", "instance_name", "stream_id"), + ("device_federation_outbox", "instance_name", "stream_id"), + ], + sequence_name="device_inbox_sequence", + writers=hs.config.worker.writers.to_device, + ) max_device_inbox_id = self._to_device_msg_id_gen.get_current_token() device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict( |