diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index de0b26f437..d2495ff994 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -17,13 +17,13 @@ import logging
from synapse.api.constants import EventTypes
from synapse.storage import DataStore
-from synapse.storage.event_federation import EventFederationStore
+from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamStore
-from synapse.storage.signatures import SignatureStore
+from synapse.storage.signatures import SignatureWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -40,8 +40,12 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class.
-class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
- EventsWorkerStore, StateGroupWorkerStore,
+class SlavedEventStore(EventFederationWorkerStore,
+ RoomMemberWorkerStore,
+ EventPushActionsWorkerStore,
+ EventsWorkerStore,
+ StateGroupWorkerStore,
+ SignatureWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs):
@@ -72,9 +76,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
- get_latest_event_ids_in_room = EventFederationStore.__dict__[
- "get_latest_event_ids_in_room"
- ]
get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"]
@@ -100,48 +101,13 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
_get_events_around_txn = DataStore._get_events_around_txn.__func__
- get_backfill_events = DataStore.get_backfill_events.__func__
- _get_backfill_events = DataStore._get_backfill_events.__func__
- get_missing_events = DataStore.get_missing_events.__func__
- _get_missing_events = DataStore._get_missing_events.__func__
-
- get_auth_chain = DataStore.get_auth_chain.__func__
- get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
- _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
-
get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
- get_forward_extremeties_for_room = (
- DataStore.get_forward_extremeties_for_room.__func__
- )
- _get_forward_extremeties_for_room = (
- EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
- )
-
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
get_federation_out_pos = DataStore.get_federation_out_pos.__func__
update_federation_out_pos = DataStore.update_federation_out_pos.__func__
- get_latest_event_ids_and_hashes_in_room = (
- DataStore.get_latest_event_ids_and_hashes_in_room.__func__
- )
- _get_latest_event_ids_and_hashes_in_room = (
- DataStore._get_latest_event_ids_and_hashes_in_room.__func__
- )
- _get_event_reference_hashes_txn = (
- DataStore._get_event_reference_hashes_txn.__func__
- )
- add_event_hashes = (
- DataStore.add_event_hashes.__func__
- )
- get_event_reference_hashes = (
- SignatureStore.__dict__["get_event_reference_hashes"]
- )
- get_event_reference_hash = (
- SignatureStore.__dict__["get_event_reference_hash"]
- )
-
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 55a05c59d5..00ee82d300 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -15,7 +15,10 @@
from twisted.internet import defer
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
+
from synapse.api.errors import StoreError
from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
@@ -27,30 +30,8 @@ from Queue import PriorityQueue, Empty
logger = logging.getLogger(__name__)
-class EventFederationStore(SQLBaseStore):
- """ Responsible for storing and serving up the various graphs associated
- with an event. Including the main event graph and the auth chains for an
- event.
-
- Also has methods for getting the front (latest) and back (oldest) edges
- of the event graphs. These are used to generate the parents for new events
- and backfilling from another server respectively.
- """
-
- EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
-
- def __init__(self, db_conn, hs):
- super(EventFederationStore, self).__init__(db_conn, hs)
-
- self.register_background_update_handler(
- self.EVENT_AUTH_STATE_ONLY,
- self._background_delete_non_state_event_auth,
- )
-
- hs.get_clock().looping_call(
- self._delete_old_forward_extrem_cache, 60 * 60 * 1000
- )
-
+class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
+ SQLBaseStore):
def get_auth_chain(self, event_ids, include_given=False):
"""Get auth events for given event_ids. The events *must* be state events.
@@ -228,88 +209,6 @@ class EventFederationStore(SQLBaseStore):
return int(min_depth) if min_depth is not None else None
- def _update_min_depth_for_room_txn(self, txn, room_id, depth):
- min_depth = self._get_min_depth_interaction(txn, room_id)
-
- if min_depth and depth >= min_depth:
- return
-
- self._simple_upsert_txn(
- txn,
- table="room_depth",
- keyvalues={
- "room_id": room_id,
- },
- values={
- "min_depth": depth,
- },
- )
-
- def _handle_mult_prev_events(self, txn, events):
- """
- For the given event, update the event edges table and forward and
- backward extremities tables.
- """
- self._simple_insert_many_txn(
- txn,
- table="event_edges",
- values=[
- {
- "event_id": ev.event_id,
- "prev_event_id": e_id,
- "room_id": ev.room_id,
- "is_state": False,
- }
- for ev in events
- for e_id, _ in ev.prev_events
- ],
- )
-
- self._update_backward_extremeties(txn, events)
-
- def _update_backward_extremeties(self, txn, events):
- """Updates the event_backward_extremities tables based on the new/updated
- events being persisted.
-
- This is called for new events *and* for events that were outliers, but
- are now being persisted as non-outliers.
-
- Forward extremities are handled when we first start persisting the events.
- """
- events_by_room = {}
- for ev in events:
- events_by_room.setdefault(ev.room_id, []).append(ev)
-
- query = (
- "INSERT INTO event_backward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- " )"
- " AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
- " AND outlier = ?"
- " )"
- )
-
- txn.executemany(query, [
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
- for ev in events for e_id, _ in ev.prev_events
- if not ev.internal_metadata.is_outlier()
- ])
-
- query = (
- "DELETE FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- )
- txn.executemany(
- query,
- [
- (ev.event_id, ev.room_id) for ev in events
- if not ev.internal_metadata.is_outlier()
- ]
- )
-
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
"""For a given room_id and stream_ordering, return the forward
extremeties of the room at that point in "time".
@@ -371,28 +270,6 @@ class EventFederationStore(SQLBaseStore):
get_forward_extremeties_for_room_txn
)
- def _delete_old_forward_extrem_cache(self):
- def _delete_old_forward_extrem_cache_txn(txn):
- # Delete entries older than a month, while making sure we don't delete
- # the only entries for a room.
- sql = ("""
- DELETE FROM stream_ordering_to_exterm
- WHERE
- room_id IN (
- SELECT room_id
- FROM stream_ordering_to_exterm
- WHERE stream_ordering > ?
- ) AND stream_ordering < ?
- """)
- txn.execute(
- sql,
- (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
- )
- return self.runInteraction(
- "_delete_old_forward_extrem_cache",
- _delete_old_forward_extrem_cache_txn
- )
-
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`
@@ -522,6 +399,135 @@ class EventFederationStore(SQLBaseStore):
return event_results
+
+class EventFederationStore(EventFederationWorkerStore):
+ """ Responsible for storing and serving up the various graphs associated
+ with an event. Including the main event graph and the auth chains for an
+ event.
+
+ Also has methods for getting the front (latest) and back (oldest) edges
+ of the event graphs. These are used to generate the parents for new events
+ and backfilling from another server respectively.
+ """
+
+ EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
+
+ def __init__(self, db_conn, hs):
+ super(EventFederationStore, self).__init__(db_conn, hs)
+
+ self.register_background_update_handler(
+ self.EVENT_AUTH_STATE_ONLY,
+ self._background_delete_non_state_event_auth,
+ )
+
+ hs.get_clock().looping_call(
+ self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+ )
+
+ def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+ min_depth = self._get_min_depth_interaction(txn, room_id)
+
+ if min_depth and depth >= min_depth:
+ return
+
+ self._simple_upsert_txn(
+ txn,
+ table="room_depth",
+ keyvalues={
+ "room_id": room_id,
+ },
+ values={
+ "min_depth": depth,
+ },
+ )
+
+ def _handle_mult_prev_events(self, txn, events):
+ """
+ For the given event, update the event edges table and forward and
+ backward extremities tables.
+ """
+ self._simple_insert_many_txn(
+ txn,
+ table="event_edges",
+ values=[
+ {
+ "event_id": ev.event_id,
+ "prev_event_id": e_id,
+ "room_id": ev.room_id,
+ "is_state": False,
+ }
+ for ev in events
+ for e_id, _ in ev.prev_events
+ ],
+ )
+
+ self._update_backward_extremeties(txn, events)
+
+ def _update_backward_extremeties(self, txn, events):
+ """Updates the event_backward_extremities tables based on the new/updated
+ events being persisted.
+
+ This is called for new events *and* for events that were outliers, but
+ are now being persisted as non-outliers.
+
+ Forward extremities are handled when we first start persisting the events.
+ """
+ events_by_room = {}
+ for ev in events:
+ events_by_room.setdefault(ev.room_id, []).append(ev)
+
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
+ " )"
+ )
+
+ txn.executemany(query, [
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ for ev in events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ])
+
+ query = (
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ )
+ txn.executemany(
+ query,
+ [
+ (ev.event_id, ev.room_id) for ev in events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ )
+
+ def _delete_old_forward_extrem_cache(self):
+ def _delete_old_forward_extrem_cache_txn(txn):
+ # Delete entries older than a month, while making sure we don't delete
+ # the only entries for a room.
+ sql = ("""
+ DELETE FROM stream_ordering_to_exterm
+ WHERE
+ room_id IN (
+ SELECT room_id
+ FROM stream_ordering_to_exterm
+ WHERE stream_ordering > ?
+ ) AND stream_ordering < ?
+ """)
+ txn.execute(
+ sql,
+ (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
+ )
+ return self.runInteraction(
+ "_delete_old_forward_extrem_cache",
+ _delete_old_forward_extrem_cache_txn
+ )
+
def clean_room_for_join(self, room_id):
return self.runInteraction(
"clean_room_for_join",
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 67d5d9969a..e6eeb1b641 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -22,9 +22,7 @@ from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.util.caches.descriptors import cached, cachedList
-class SignatureStore(SQLBaseStore):
- """Persistence for event signatures and hashes"""
-
+class SignatureWorkerStore(SQLBaseStore):
@cached()
def get_event_reference_hash(self, event_id):
return self._get_event_reference_hashes_txn(event_id)
@@ -74,6 +72,10 @@ class SignatureStore(SQLBaseStore):
txn.execute(query, (event_id, ))
return {k: v for k, v in txn}
+
+class SignatureStore(SignatureWorkerStore):
+ """Persistence for event signatures and hashes"""
+
def _store_event_reference_hashes_txn(self, txn, events):
"""Store a hash for a PDU
Args:
|