diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 29d7296b43..b1f64ef0d8 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,13 +16,13 @@
import logging
from synapse.api.constants import EventTypes
-from synapse.storage import DataStore
-from synapse.storage.event_federation import EventFederationStore
-from synapse.storage.event_push_actions import EventPushActionsStore
-from synapse.storage.roommember import RoomMemberStore
-from synapse.storage.state import StateGroupReadStore
-from synapse.storage.stream import StreamStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.event_federation import EventFederationWorkerStore
+from synapse.storage.event_push_actions import EventPushActionsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
+from synapse.storage.stream import StreamWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -37,138 +38,33 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class.
-class SlavedEventStore(StateGroupReadStore, BaseSlavedStore):
+class SlavedEventStore(EventFederationWorkerStore,
+ RoomMemberWorkerStore,
+ EventPushActionsWorkerStore,
+ StreamWorkerStore,
+ EventsWorkerStore,
+ StateGroupWorkerStore,
+ SignatureWorkerStore,
+ BaseSlavedStore):
def __init__(self, db_conn, hs):
- super(SlavedEventStore, self).__init__(db_conn, hs)
self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering",
)
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)
- events_max = self._stream_id_gen.get_current_token()
- event_cache_prefill, min_event_val = self._get_cache_dict(
- db_conn, "events",
- entity_column="room_id",
- stream_column="stream_ordering",
- max_value=events_max,
- )
- self._events_stream_cache = StreamChangeCache(
- "EventsRoomStreamChangeCache", min_event_val,
- prefilled_cache=event_cache_prefill,
- )
- self._membership_stream_cache = StreamChangeCache(
- "MembershipStreamChangeCache", events_max,
- )
- self.stream_ordering_month_ago = 0
- self._stream_order_on_start = self.get_room_max_stream_ordering()
+ super(SlavedEventStore, self).__init__(db_conn, hs)
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
- get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
- get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
- get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
- get_users_who_share_room_with_user = (
- RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
- )
- get_latest_event_ids_in_room = EventFederationStore.__dict__[
- "get_latest_event_ids_in_room"
- ]
- get_invited_rooms_for_user = RoomMemberStore.__dict__[
- "get_invited_rooms_for_user"
- ]
- get_unread_event_push_actions_by_room_for_user = (
- EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
- )
- _get_unread_counts_by_receipt_txn = (
- DataStore._get_unread_counts_by_receipt_txn.__func__
- )
- _get_unread_counts_by_pos_txn = (
- DataStore._get_unread_counts_by_pos_txn.__func__
- )
- get_recent_event_ids_for_room = (
- StreamStore.__dict__["get_recent_event_ids_for_room"]
- )
- _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
- has_room_changed_since = DataStore.has_room_changed_since.__func__
-
- get_unread_push_actions_for_user_in_range_for_http = (
- DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
- )
- get_unread_push_actions_for_user_in_range_for_email = (
- DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
- )
- get_push_action_users_in_range = (
- DataStore.get_push_action_users_in_range.__func__
- )
- get_event = DataStore.get_event.__func__
- get_events = DataStore.get_events.__func__
- get_rooms_for_user_where_membership_is = (
- DataStore.get_rooms_for_user_where_membership_is.__func__
- )
- get_membership_changes_for_user = (
- DataStore.get_membership_changes_for_user.__func__
- )
- get_room_events_max_id = DataStore.get_room_events_max_id.__func__
- get_room_events_stream_for_room = (
- DataStore.get_room_events_stream_for_room.__func__
- )
- get_events_around = DataStore.get_events_around.__func__
- get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__
- get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__
- _get_joined_users_from_context = (
- RoomMemberStore.__dict__["_get_joined_users_from_context"]
- )
-
- get_joined_hosts = DataStore.get_joined_hosts.__func__
- _get_joined_hosts = RoomMemberStore.__dict__["_get_joined_hosts"]
-
- get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
- get_room_events_stream_for_rooms = (
- DataStore.get_room_events_stream_for_rooms.__func__
- )
- is_host_joined = RoomMemberStore.__dict__["is_host_joined"]
- get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
-
- _set_before_and_after = staticmethod(DataStore._set_before_and_after)
-
- _get_events = DataStore._get_events.__func__
- _get_events_from_cache = DataStore._get_events_from_cache.__func__
-
- _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
- _enqueue_events = DataStore._enqueue_events.__func__
- _do_fetch = DataStore._do_fetch.__func__
- _fetch_event_rows = DataStore._fetch_event_rows.__func__
- _get_event_from_row = DataStore._get_event_from_row.__func__
- _get_rooms_for_user_where_membership_is_txn = (
- DataStore._get_rooms_for_user_where_membership_is_txn.__func__
- )
- _get_events_around_txn = DataStore._get_events_around_txn.__func__
-
- get_backfill_events = DataStore.get_backfill_events.__func__
- _get_backfill_events = DataStore._get_backfill_events.__func__
- get_missing_events = DataStore.get_missing_events.__func__
- _get_missing_events = DataStore._get_missing_events.__func__
-
- get_auth_chain = DataStore.get_auth_chain.__func__
- get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
- _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
-
- get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
-
- get_forward_extremeties_for_room = (
- DataStore.get_forward_extremeties_for_room.__func__
- )
- _get_forward_extremeties_for_room = (
- EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
- )
-
- get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
-
- get_federation_out_pos = DataStore.get_federation_out_pos.__func__
- update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
|