summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-15 16:43:59 +0100
committerGitHub <noreply@github.com>2020-05-15 16:43:59 +0100
commit1f36ff69e8c7b2853e906598898878742d39c42b (patch)
treeb0bfc248df8b260e6b569eb0538d54661ec8f262 /synapse/replication
parentAdd type hints to event_auth code. (#7505) (diff)
downloadsynapse-1f36ff69e8c7b2853e906598898878742d39c42b.tar.xz
Move event stream handling out of slave store. (#7491)
This allows us to have the logic on both master and workers, which is necessary to move event persistence off master.

We also combine the instantiation of ID generators from DataStore and slave stores to the base worker stores. This allows us to select which process writes events independently of the master/worker splits.
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/slave/storage/events.py89
-rw-r--r--synapse/replication/slave/storage/push_rule.py8
2 files changed, 0 insertions, 97 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b313720a4b..1a1a50a24f 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -15,11 +15,6 @@
 # limitations under the License.
 import logging
 
-from synapse.api.constants import EventTypes
-from synapse.replication.tcp.streams.events import (
-    EventsStreamCurrentStateRow,
-    EventsStreamEventRow,
-)
 from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore
 from synapse.storage.data_stores.main.event_push_actions import (
     EventPushActionsWorkerStore,
@@ -35,7 +30,6 @@ from synapse.storage.database import Database
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
 
 logger = logging.getLogger(__name__)
 
@@ -62,11 +56,6 @@ class SlavedEventStore(
     BaseSlavedStore,
 ):
     def __init__(self, database: Database, db_conn, hs):
-        self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
-        self._backfill_id_gen = SlavedIdTracker(
-            db_conn, "events", "stream_ordering", step=-1
-        )
-
         super(SlavedEventStore, self).__init__(database, db_conn, hs)
 
         events_max = self._stream_id_gen.get_current_token()
@@ -92,81 +81,3 @@ class SlavedEventStore(
 
     def get_room_min_stream_ordering(self):
         return self._backfill_id_gen.get_current_token()
-
-    def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "events":
-            self._stream_id_gen.advance(token)
-            for row in rows:
-                self._process_event_stream_row(token, row)
-        elif stream_name == "backfill":
-            self._backfill_id_gen.advance(-token)
-            for row in rows:
-                self.invalidate_caches_for_event(
-                    -token,
-                    row.event_id,
-                    row.room_id,
-                    row.type,
-                    row.state_key,
-                    row.redacts,
-                    row.relates_to,
-                    backfilled=True,
-                )
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
-
-    def _process_event_stream_row(self, token, row):
-        data = row.data
-
-        if row.type == EventsStreamEventRow.TypeId:
-            self.invalidate_caches_for_event(
-                token,
-                data.event_id,
-                data.room_id,
-                data.type,
-                data.state_key,
-                data.redacts,
-                data.relates_to,
-                backfilled=False,
-            )
-        elif row.type == EventsStreamCurrentStateRow.TypeId:
-            self._curr_state_delta_stream_cache.entity_has_changed(
-                row.data.room_id, token
-            )
-
-            if data.type == EventTypes.Member:
-                self.get_rooms_for_user_with_stream_ordering.invalidate(
-                    (data.state_key,)
-                )
-        else:
-            raise Exception("Unknown events stream row type %s" % (row.type,))
-
-    def invalidate_caches_for_event(
-        self,
-        stream_ordering,
-        event_id,
-        room_id,
-        etype,
-        state_key,
-        redacts,
-        relates_to,
-        backfilled,
-    ):
-        self._invalidate_get_event_cache(event_id)
-
-        self.get_latest_event_ids_in_room.invalidate((room_id,))
-
-        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
-
-        if not backfilled:
-            self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
-
-        if redacts:
-            self._invalidate_get_event_cache(redacts)
-
-        if etype == EventTypes.Member:
-            self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
-            self.get_invited_rooms_for_local_user.invalidate((state_key,))
-
-        if relates_to:
-            self.get_relations_for_event.invalidate_many((relates_to,))
-            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
-            self.get_applicable_edit.invalidate((relates_to,))
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 5d5816d7eb..6adb19463a 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -15,19 +15,11 @@
 # limitations under the License.
 
 from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
-from synapse.storage.database import Database
 
-from ._slaved_id_tracker import SlavedIdTracker
 from .events import SlavedEventStore
 
 
 class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
-    def __init__(self, database: Database, db_conn, hs):
-        self._push_rules_stream_id_gen = SlavedIdTracker(
-            db_conn, "push_rules_stream", "stream_id"
-        )
-        super(SlavedPushRuleStore, self).__init__(database, db_conn, hs)
-
     def get_push_rules_stream_token(self):
         return (
             self._push_rules_stream_id_gen.get_current_token(),