From bc3fc3927f9e457fece51bf5611e85ab785ce08b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 May 2020 11:25:51 +0100 Subject: Move events ID gens to EventWorkerStore --- synapse/replication/slave/storage/events.py | 20 -------------------- synapse/storage/data_stores/main/__init__.py | 13 ------------- synapse/storage/data_stores/main/events_worker.py | 22 ++++++++++++++++++++++ 3 files changed, 22 insertions(+), 33 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index fc04ad5fe0..1c3dd72b2d 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -58,26 +58,6 @@ class SlavedEventStore( BaseSlavedStore, ): def __init__(self, database: Database, db_conn, hs): - if hs.config.worker_app is None: - self._stream_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - extra_tables=[("local_invites", "stream_id")], - ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], - ) - else: - self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") - self._backfill_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering", step=-1 - ) - super(SlavedEventStore, self).__init__(database, db_conn, hs) events_max = self._stream_id_gen.get_current_token() diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index 5df9dce79d..fa63bad0a8 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -125,19 +125,6 @@ class DataStore( self._clock = hs.get_clock() self.database_engine = database.engine - self._stream_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - extra_tables=[("local_invites", "stream_id")], - ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], - ) self._presence_id_gen = StreamIdGenerator( db_conn, "presence_stream", "stream_id" ) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 970c31bd05..8e0f269721 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -37,8 +37,10 @@ from synapse.events import make_event_from_dict from synapse.events.utils import prune_event from synapse.logging.context import PreserveLoggingContext, current_context from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import Database +from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import get_domain_from_id from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks from synapse.util.iterutils import batch_iter @@ -74,6 +76,26 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) + if hs.config.worker_app is None: + self._stream_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + extra_tables=[("local_invites", "stream_id")], + ) + self._backfill_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + step=-1, + extra_tables=[("ex_outlier_stream", "event_stream_ordering")], + ) + else: + self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") + self._backfill_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", step=-1 + ) + self._get_event_cache = Cache( "*getEvent*", keylen=3, -- cgit 1.4.1