summary refs log tree commit diff
path: root/synapse/storage/databases/main/events_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events_worker.py')
-rw-r--r--synapse/storage/databases/main/events_worker.py101
1 files changed, 33 insertions, 68 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e39d4b9624..426df2a9d2 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -75,12 +75,10 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
@@ -195,51 +193,28 @@ class EventsWorkerStore(SQLBaseStore):
 
         self._stream_id_gen: AbstractStreamIdGenerator
         self._backfill_id_gen: AbstractStreamIdGenerator
-        if isinstance(database.engine, PostgresEngine):
-            # If we're using Postgres than we can use `MultiWriterIdGenerator`
-            # regardless of whether this process writes to the streams or not.
-            self._stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="events",
-                instance_name=hs.get_instance_name(),
-                tables=[("events", "instance_name", "stream_ordering")],
-                sequence_name="events_stream_seq",
-                writers=hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="backfill",
-                instance_name=hs.get_instance_name(),
-                tables=[("events", "instance_name", "stream_ordering")],
-                sequence_name="events_backfill_stream_seq",
-                positive=False,
-                writers=hs.config.worker.writers.events,
-            )
-        else:
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "events",
-                "stream_ordering",
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "events",
-                "stream_ordering",
-                step=-1,
-                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
+
+        self._stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="events",
+            instance_name=hs.get_instance_name(),
+            tables=[("events", "instance_name", "stream_ordering")],
+            sequence_name="events_stream_seq",
+            writers=hs.config.worker.writers.events,
+        )
+        self._backfill_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="backfill",
+            instance_name=hs.get_instance_name(),
+            tables=[("events", "instance_name", "stream_ordering")],
+            sequence_name="events_backfill_stream_seq",
+            positive=False,
+            writers=hs.config.worker.writers.events,
+        )
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
@@ -309,27 +284,17 @@ class EventsWorkerStore(SQLBaseStore):
 
         self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
 
-        if isinstance(database.engine, PostgresEngine):
-            self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="un_partial_stated_event_stream",
-                instance_name=hs.get_instance_name(),
-                tables=[
-                    ("un_partial_stated_event_stream", "instance_name", "stream_id")
-                ],
-                sequence_name="un_partial_stated_event_stream_sequence",
-                # TODO(faster_joins, multiple writers) Support multiple writers.
-                writers=["master"],
-            )
-        else:
-            self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "un_partial_stated_event_stream",
-                "stream_id",
-            )
+        self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="un_partial_stated_event_stream",
+            instance_name=hs.get_instance_name(),
+            tables=[("un_partial_stated_event_stream", "instance_name", "stream_id")],
+            sequence_name="un_partial_stated_event_stream_sequence",
+            # TODO(faster_joins, multiple writers) Support multiple writers.
+            writers=["master"],
+        )
 
     def get_un_partial_stated_events_token(self, instance_name: str) -> int:
         return (