diff options
Diffstat (limited to 'synapse/replication/slave/storage/events.py')
-rw-r--r-- | synapse/replication/slave/storage/events.py | 130 |
1 files changed, 28 insertions, 102 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index ab5937e638..1a1a50a24f 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -15,23 +15,21 @@ # limitations under the License. import logging -from synapse.api.constants import EventTypes -from synapse.replication.tcp.streams.events import ( - EventsStreamCurrentStateRow, - EventsStreamEventRow, +from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore +from synapse.storage.data_stores.main.event_push_actions import ( + EventPushActionsWorkerStore, ) -from synapse.storage.event_federation import EventFederationWorkerStore -from synapse.storage.event_push_actions import EventPushActionsWorkerStore -from synapse.storage.events_worker import EventsWorkerStore -from synapse.storage.relations import RelationsWorkerStore -from synapse.storage.roommember import RoomMemberWorkerStore -from synapse.storage.signatures import SignatureWorkerStore -from synapse.storage.state import StateGroupWorkerStore -from synapse.storage.stream import StreamWorkerStore -from synapse.storage.user_erasure_store import UserErasureWorkerStore +from synapse.storage.data_stores.main.events_worker import EventsWorkerStore +from synapse.storage.data_stores.main.relations import RelationsWorkerStore +from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore +from synapse.storage.data_stores.main.signatures import SignatureWorkerStore +from synapse.storage.data_stores.main.state import StateGroupWorkerStore +from synapse.storage.data_stores.main.stream import StreamWorkerStore +from synapse.storage.data_stores.main.user_erasure_store import UserErasureWorkerStore +from synapse.storage.database import Database +from synapse.util.caches.stream_change_cache import StreamChangeCache from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker logger = logging.getLogger(__name__) @@ -57,13 +55,23 @@ class SlavedEventStore( RelationsWorkerStore, BaseSlavedStore, ): - def __init__(self, db_conn, hs): - self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") - self._backfill_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering", step=-1 + def __init__(self, database: Database, db_conn, hs): + super(SlavedEventStore, self).__init__(database, db_conn, hs) + + events_max = self._stream_id_gen.get_current_token() + curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict( + db_conn, + "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", + min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, ) - - super(SlavedEventStore, self).__init__(db_conn, hs) # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. @@ -73,85 +81,3 @@ class SlavedEventStore( def get_room_min_stream_ordering(self): return self._backfill_id_gen.get_current_token() - - def stream_positions(self): - result = super(SlavedEventStore, self).stream_positions() - result["events"] = self._stream_id_gen.get_current_token() - result["backfill"] = -self._backfill_id_gen.get_current_token() - return result - - def process_replication_rows(self, stream_name, token, rows): - if stream_name == "events": - self._stream_id_gen.advance(token) - for row in rows: - self._process_event_stream_row(token, row) - elif stream_name == "backfill": - self._backfill_id_gen.advance(-token) - for row in rows: - self.invalidate_caches_for_event( - -token, - row.event_id, - row.room_id, - row.type, - row.state_key, - row.redacts, - row.relates_to, - backfilled=True, - ) - return super(SlavedEventStore, self).process_replication_rows( - stream_name, token, rows - ) - - def _process_event_stream_row(self, token, row): - data = row.data - - if row.type == EventsStreamEventRow.TypeId: - self.invalidate_caches_for_event( - token, - data.event_id, - data.room_id, - data.type, - data.state_key, - data.redacts, - data.relates_to, - backfilled=False, - ) - elif row.type == EventsStreamCurrentStateRow.TypeId: - if data.type == EventTypes.Member: - self.get_rooms_for_user_with_stream_ordering.invalidate( - (data.state_key,) - ) - else: - raise Exception("Unknown events stream row type %s" % (row.type,)) - - def invalidate_caches_for_event( - self, - stream_ordering, - event_id, - room_id, - etype, - state_key, - redacts, - relates_to, - backfilled, - ): - self._invalidate_get_event_cache(event_id) - - self.get_latest_event_ids_in_room.invalidate((room_id,)) - - self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,)) - - if not backfilled: - self._events_stream_cache.entity_has_changed(room_id, stream_ordering) - - if redacts: - self._invalidate_get_event_cache(redacts) - - if etype == EventTypes.Member: - self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) - self.get_invited_rooms_for_user.invalidate((state_key,)) - - if relates_to: - self.get_relations_for_event.invalidate_many((relates_to,)) - self.get_aggregation_groups_for_event.invalidate_many((relates_to,)) - self.get_applicable_edit.invalidate((relates_to,)) |