diff options
Diffstat (limited to 'synapse/storage/databases/main/events_worker.py')
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 101 |
1 files changed, 33 insertions, 68 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e39d4b9624..426df2a9d2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -75,12 +75,10 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, MultiWriterIdGenerator, - StreamIdGenerator, ) from synapse.storage.util.sequence import build_sequence_generator from synapse.types import JsonDict, get_domain_from_id @@ -195,51 +193,28 @@ class EventsWorkerStore(SQLBaseStore): self._stream_id_gen: AbstractStreamIdGenerator self._backfill_id_gen: AbstractStreamIdGenerator - if isinstance(database.engine, PostgresEngine): - # If we're using Postgres than we can use `MultiWriterIdGenerator` - # regardless of whether this process writes to the streams or not. - self._stream_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - notifier=hs.get_replication_notifier(), - stream_name="events", - instance_name=hs.get_instance_name(), - tables=[("events", "instance_name", "stream_ordering")], - sequence_name="events_stream_seq", - writers=hs.config.worker.writers.events, - ) - self._backfill_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - notifier=hs.get_replication_notifier(), - stream_name="backfill", - instance_name=hs.get_instance_name(), - tables=[("events", "instance_name", "stream_ordering")], - sequence_name="events_backfill_stream_seq", - positive=False, - writers=hs.config.worker.writers.events, - ) - else: - # Multiple writers are not supported for SQLite. - # - # We shouldn't be running in worker mode with SQLite, but its useful - # to support it for unit tests. - self._stream_id_gen = StreamIdGenerator( - db_conn, - hs.get_replication_notifier(), - "events", - "stream_ordering", - is_writer=hs.get_instance_name() in hs.config.worker.writers.events, - ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - hs.get_replication_notifier(), - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], - is_writer=hs.get_instance_name() in hs.config.worker.writers.events, - ) + + self._stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + notifier=hs.get_replication_notifier(), + stream_name="events", + instance_name=hs.get_instance_name(), + tables=[("events", "instance_name", "stream_ordering")], + sequence_name="events_stream_seq", + writers=hs.config.worker.writers.events, + ) + self._backfill_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + notifier=hs.get_replication_notifier(), + stream_name="backfill", + instance_name=hs.get_instance_name(), + tables=[("events", "instance_name", "stream_ordering")], + sequence_name="events_backfill_stream_seq", + positive=False, + writers=hs.config.worker.writers.events, + ) events_max = self._stream_id_gen.get_current_token() curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( @@ -309,27 +284,17 @@ class EventsWorkerStore(SQLBaseStore): self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator - if isinstance(database.engine, PostgresEngine): - self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - notifier=hs.get_replication_notifier(), - stream_name="un_partial_stated_event_stream", - instance_name=hs.get_instance_name(), - tables=[ - ("un_partial_stated_event_stream", "instance_name", "stream_id") - ], - sequence_name="un_partial_stated_event_stream_sequence", - # TODO(faster_joins, multiple writers) Support multiple writers. - writers=["master"], - ) - else: - self._un_partial_stated_events_stream_id_gen = StreamIdGenerator( - db_conn, - hs.get_replication_notifier(), - "un_partial_stated_event_stream", - "stream_id", - ) + self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + notifier=hs.get_replication_notifier(), + stream_name="un_partial_stated_event_stream", + instance_name=hs.get_instance_name(), + tables=[("un_partial_stated_event_stream", "instance_name", "stream_id")], + sequence_name="un_partial_stated_event_stream_sequence", + # TODO(faster_joins, multiple writers) Support multiple writers. + writers=["master"], + ) def get_un_partial_stated_events_token(self, instance_name: str) -> int: return ( |