diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 0dd15f16ff..5503621ad6 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -52,7 +52,6 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
- AbstractStreamIdTracker,
StreamIdGenerator,
)
from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key
@@ -91,7 +90,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
- self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+ self._device_list_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_lists_stream",
@@ -712,9 +711,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
The new stream ID.
"""
- # TODO: this looks like it's _writing_. Should this be on DeviceStore rather
- # than DeviceWorkerStore?
- async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
+ async with self._device_list_id_gen.get_next() as stream_id:
await self.db_pool.runInteraction(
"add_user_sig_change_to_streams",
self._add_user_signature_change_txn,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index b7e7498125..20b7a68362 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -72,7 +72,6 @@ from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
- AbstractStreamIdTracker,
MultiWriterIdGenerator,
StreamIdGenerator,
)
@@ -187,8 +186,8 @@ class EventsWorkerStore(SQLBaseStore):
):
super().__init__(database, db_conn, hs)

- self._stream_id_gen: AbstractStreamIdTracker
- self._backfill_id_gen: AbstractStreamIdTracker
+ self._stream_id_gen: AbstractStreamIdGenerator
+ self._backfill_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine):
# If we're using Postgres than we can use `MultiWriterIdGenerator`
# regardless of whether this process writes to the streams or not.
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 9b2bbe060d..9f862f00c1 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -46,7 +46,6 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
- AbstractStreamIdTracker,
IdGenerator,
StreamIdGenerator,
)
@@ -118,7 +117,7 @@ class PushRulesWorkerStore(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
- self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+ self._push_rules_stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"push_rules_stream",
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index fddbc07afa..9a24f7a655 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -36,7 +36,6 @@ from synapse.storage.database import (
)
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
- AbstractStreamIdTracker,
StreamIdGenerator,
)
from synapse.types import JsonDict
@@ -60,7 +59,7 @@ class PusherWorkerStore(SQLBaseStore):

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
- self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+ self._pushers_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"pushers",
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 92a82240ab..074942b167 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -39,7 +39,7 @@ from synapse.storage.database import (
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import (
- AbstractStreamIdTracker,
+ AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
@@ -65,7 +65,7 @@ class ReceiptsWorkerStore(SQLBaseStore):

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
- self._receipts_id_gen: AbstractStreamIdTracker
+ self._receipts_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._can_write_to_receipts = (
@@ -768,7 +768,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"insert_receipt_conv", self._graph_to_linear, room_id, event_ids
)

- async with self._receipts_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
+ async with self._receipts_id_gen.get_next() as stream_id:
event_ts = await self.db_pool.runInteraction(
"insert_linearized_receipt",
self._insert_linearized_receipt_txn,
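For context, a minimal sketch of the typing pattern this diff relies on; the classes below are simplified stand-ins, not Synapse's real implementations. The removed `# type: ignore[attr-defined]` comments were needed because the attributes were annotated with the narrower tracker ABC, which (as those ignores suggest) did not declare `get_next()`; once the declared or inferred type is the generator, the `async with ... get_next()` call sites type-check cleanly.

```python
# Minimal, self-contained sketch (hypothetical stand-in classes) of why
# dropping the narrower annotation also drops the need for
# `# type: ignore[attr-defined]` at the get_next() call sites.
import abc
import asyncio
import contextlib
from typing import AsyncIterator


class AbstractStreamIdGenerator(abc.ABC):
    # Stand-in for the merged tracker/generator interface: it declares
    # get_next(), so annotating an attribute with it satisfies mypy.
    @abc.abstractmethod
    def get_current_token(self) -> int:
        ...

    @abc.abstractmethod
    def get_next(self) -> "contextlib.AbstractAsyncContextManager[int]":
        ...


class StreamIdGenerator(AbstractStreamIdGenerator):
    # Toy single-writer generator that hands out increasing stream IDs.
    def __init__(self, start: int = 0) -> None:
        self._current = start

    def get_current_token(self) -> int:
        return self._current

    def get_next(self) -> "contextlib.AbstractAsyncContextManager[int]":
        @contextlib.asynccontextmanager
        async def _ctx() -> AsyncIterator[int]:
            self._current += 1
            yield self._current

        return _ctx()


class WorkerStore:
    def __init__(self) -> None:
        # Previously this attribute would have been annotated with a tracker
        # base class lacking get_next(); declaring (or inferring) the
        # generator type lets the call below pass mypy without an ignore.
        self._stream_id_gen: AbstractStreamIdGenerator = StreamIdGenerator()

    async def add_thing(self) -> int:
        async with self._stream_id_gen.get_next() as stream_id:
            return stream_id


if __name__ == "__main__":
    print(asyncio.run(WorkerStore().add_thing()))  # -> 1
```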