diff options
author | Erik Johnston <erik@matrix.org> | 2020-05-15 16:43:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-15 16:43:59 +0100 |
commit | 1f36ff69e8c7b2853e906598898878742d39c42b (patch) | |
tree | b0bfc248df8b260e6b569eb0538d54661ec8f262 | |
parent | Add type hints to event_auth code. (#7505) (diff) | |
download | synapse-1f36ff69e8c7b2853e906598898878742d39c42b.tar.xz |
Move event stream handling out of slave store. (#7491)
This allows us to have the logic on both master and workers, which is necessary to move event persistence off master. We also combine the instantiation of ID generators from DataStore and slave stores to the base worker stores. This allows us to select which process writes events independently of the master/worker splits.
-rw-r--r-- | changelog.d/7491.misc | 1 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 89 | ||||
-rw-r--r-- | synapse/replication/slave/storage/push_rule.py | 8 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/__init__.py | 17 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/cache.py | 102 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/events_worker.py | 35 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/push_rule.py | 14 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 11 |
8 files changed, 161 insertions, 116 deletions
diff --git a/changelog.d/7491.misc b/changelog.d/7491.misc new file mode 100644 index 0000000000..50eb226db7 --- /dev/null +++ b/changelog.d/7491.misc @@ -0,0 +1 @@ +Move event stream handling out of slave store. diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index b313720a4b..1a1a50a24f 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -15,11 +15,6 @@ # limitations under the License. import logging -from synapse.api.constants import EventTypes -from synapse.replication.tcp.streams.events import ( - EventsStreamCurrentStateRow, - EventsStreamEventRow, -) from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore from synapse.storage.data_stores.main.event_push_actions import ( EventPushActionsWorkerStore, @@ -35,7 +30,6 @@ from synapse.storage.database import Database from synapse.util.caches.stream_change_cache import StreamChangeCache from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker logger = logging.getLogger(__name__) @@ -62,11 +56,6 @@ class SlavedEventStore( BaseSlavedStore, ): def __init__(self, database: Database, db_conn, hs): - self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") - self._backfill_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering", step=-1 - ) - super(SlavedEventStore, self).__init__(database, db_conn, hs) events_max = self._stream_id_gen.get_current_token() @@ -92,81 +81,3 @@ class SlavedEventStore( def get_room_min_stream_ordering(self): return self._backfill_id_gen.get_current_token() - - def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "events": - self._stream_id_gen.advance(token) - for row in rows: - self._process_event_stream_row(token, row) - elif stream_name == "backfill": - self._backfill_id_gen.advance(-token) - for row in rows: - self.invalidate_caches_for_event( - -token, - row.event_id, - row.room_id, - row.type, - row.state_key, - row.redacts, - row.relates_to, - backfilled=True, - ) - return super().process_replication_rows(stream_name, instance_name, token, rows) - - def _process_event_stream_row(self, token, row): - data = row.data - - if row.type == EventsStreamEventRow.TypeId: - self.invalidate_caches_for_event( - token, - data.event_id, - data.room_id, - data.type, - data.state_key, - data.redacts, - data.relates_to, - backfilled=False, - ) - elif row.type == EventsStreamCurrentStateRow.TypeId: - self._curr_state_delta_stream_cache.entity_has_changed( - row.data.room_id, token - ) - - if data.type == EventTypes.Member: - self.get_rooms_for_user_with_stream_ordering.invalidate( - (data.state_key,) - ) - else: - raise Exception("Unknown events stream row type %s" % (row.type,)) - - def invalidate_caches_for_event( - self, - stream_ordering, - event_id, - room_id, - etype, - state_key, - redacts, - relates_to, - backfilled, - ): - self._invalidate_get_event_cache(event_id) - - self.get_latest_event_ids_in_room.invalidate((room_id,)) - - self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,)) - - if not backfilled: - self._events_stream_cache.entity_has_changed(room_id, stream_ordering) - - if redacts: - self._invalidate_get_event_cache(redacts) - - if etype == EventTypes.Member: - self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) - self.get_invited_rooms_for_local_user.invalidate((state_key,)) - - if relates_to: - self.get_relations_for_event.invalidate_many((relates_to,)) - self.get_aggregation_groups_for_event.invalidate_many((relates_to,)) - self.get_applicable_edit.invalidate((relates_to,)) diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 5d5816d7eb..6adb19463a 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -15,19 +15,11 @@ # limitations under the License. from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore -from synapse.storage.database import Database -from ._slaved_id_tracker import SlavedIdTracker from .events import SlavedEventStore class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): - def __init__(self, database: Database, db_conn, hs): - self._push_rules_stream_id_gen = SlavedIdTracker( - db_conn, "push_rules_stream", "stream_id" - ) - super(SlavedPushRuleStore, self).__init__(database, db_conn, hs) - def get_push_rules_stream_token(self): return ( self._push_rules_stream_id_gen.get_current_token(), diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index 5df9dce79d..4b4763c701 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -24,7 +24,6 @@ from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import Database from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import ( - ChainedIdGenerator, IdGenerator, MultiWriterIdGenerator, StreamIdGenerator, @@ -125,19 +124,6 @@ class DataStore( self._clock = hs.get_clock() self.database_engine = database.engine - self._stream_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - extra_tables=[("local_invites", "stream_id")], - ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], - ) self._presence_id_gen = StreamIdGenerator( db_conn, "presence_stream", "stream_id" ) @@ -164,9 +150,6 @@ class DataStore( self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") - self._push_rules_stream_id_gen = ChainedIdGenerator( - self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" - ) self._pushers_id_gen = StreamIdGenerator( db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] ) diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index 342a87a46b..eac5a4e55b 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -16,8 +16,13 @@ import itertools import logging -from typing import Any, Iterable, Optional +from typing import Any, Iterable, Optional, Tuple +from synapse.api.constants import EventTypes +from synapse.replication.tcp.streams.events import ( + EventsStreamCurrentStateRow, + EventsStreamEventRow, +) from synapse.storage._base import SQLBaseStore from synapse.storage.database import Database from synapse.storage.engines import PostgresEngine @@ -66,7 +71,22 @@ class CacheInvalidationWorkerStore(SQLBaseStore): ) def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "caches": + if stream_name == "events": + for row in rows: + self._process_event_stream_row(token, row) + elif stream_name == "backfill": + for row in rows: + self._invalidate_caches_for_event( + -token, + row.event_id, + row.room_id, + row.type, + row.state_key, + row.redacts, + row.relates_to, + backfilled=True, + ) + elif stream_name == "caches": if self._cache_id_gen: self._cache_id_gen.advance(instance_name, token) @@ -85,6 +105,84 @@ class CacheInvalidationWorkerStore(SQLBaseStore): super().process_replication_rows(stream_name, instance_name, token, rows) + def _process_event_stream_row(self, token, row): + data = row.data + + if row.type == EventsStreamEventRow.TypeId: + self._invalidate_caches_for_event( + token, + data.event_id, + data.room_id, + data.type, + data.state_key, + data.redacts, + data.relates_to, + backfilled=False, + ) + elif row.type == EventsStreamCurrentStateRow.TypeId: + self._curr_state_delta_stream_cache.entity_has_changed( + row.data.room_id, token + ) + + if data.type == EventTypes.Member: + self.get_rooms_for_user_with_stream_ordering.invalidate( + (data.state_key,) + ) + else: + raise Exception("Unknown events stream row type %s" % (row.type,)) + + def _invalidate_caches_for_event( + self, + stream_ordering, + event_id, + room_id, + etype, + state_key, + redacts, + relates_to, + backfilled, + ): + self._invalidate_get_event_cache(event_id) + + self.get_latest_event_ids_in_room.invalidate((room_id,)) + + self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,)) + + if not backfilled: + self._events_stream_cache.entity_has_changed(room_id, stream_ordering) + + if redacts: + self._invalidate_get_event_cache(redacts) + + if etype == EventTypes.Member: + self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) + self.get_invited_rooms_for_local_user.invalidate((state_key,)) + + if relates_to: + self.get_relations_for_event.invalidate_many((relates_to,)) + self.get_aggregation_groups_for_event.invalidate_many((relates_to,)) + self.get_applicable_edit.invalidate((relates_to,)) + + async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]): + """Invalidates the cache and adds it to the cache stream so slaves + will know to invalidate their caches. + + This should only be used to invalidate caches where slaves won't + otherwise know from other replication streams that the cache should + be invalidated. + """ + cache_func = getattr(self, cache_name, None) + if not cache_func: + return + + cache_func.invalidate(keys) + await self.db.runInteraction( + "invalidate_cache_and_stream", + self._send_invalidation_to_replication, + cache_func.__name__, + keys, + ) + def _invalidate_cache_and_stream(self, txn, cache_func, keys): """Invalidates the cache and adds it to the cache stream so slaves will know to invalidate their caches. diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 970c31bd05..9130b74eb5 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -37,8 +37,10 @@ from synapse.events import make_event_from_dict from synapse.events.utils import prune_event from synapse.logging.context import PreserveLoggingContext, current_context from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import Database +from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import get_domain_from_id from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks from synapse.util.iterutils import batch_iter @@ -74,6 +76,31 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) + if hs.config.worker_app is None: + # We are the process in charge of generating stream ids for events, + # so instantiate ID generators based on the database + self._stream_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + extra_tables=[("local_invites", "stream_id")], + ) + self._backfill_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + step=-1, + extra_tables=[("ex_outlier_stream", "event_stream_ordering")], + ) + else: + # Another process is in charge of persisting events and generating + # stream IDs: rely on the replication streams to let us know which + # IDs we can process. + self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") + self._backfill_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", step=-1 + ) + self._get_event_cache = Cache( "*getEvent*", keylen=3, @@ -85,6 +112,14 @@ class EventsWorkerStore(SQLBaseStore): self._event_fetch_list = [] self._event_fetch_ongoing = 0 + def process_replication_rows(self, stream_name, instance_name, token, rows): + if stream_name == "events": + self._stream_id_gen.advance(token) + elif stream_name == "backfill": + self._backfill_id_gen.advance(-token) + + super().process_replication_rows(stream_name, instance_name, token, rows) + def get_received_ts(self, event_id): """Get received_ts (when it was persisted) for the event. diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index b3faafa0a4..ef8f40959f 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -16,19 +16,23 @@ import abc import logging +from typing import Union from canonicaljson import json from twisted.internet import defer from synapse.push.baserules import list_with_base_rules +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.storage._base import SQLBaseStore from synapse.storage.data_stores.main.appservice import ApplicationServiceWorkerStore +from synapse.storage.data_stores.main.events_worker import EventsWorkerStore from synapse.storage.data_stores.main.pusher import PusherWorkerStore from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore from synapse.storage.database import Database from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException +from synapse.storage.util.id_generators import ChainedIdGenerator from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -64,6 +68,7 @@ class PushRulesWorkerStore( ReceiptsWorkerStore, PusherWorkerStore, RoomMemberWorkerStore, + EventsWorkerStore, SQLBaseStore, ): """This is an abstract base class where subclasses must implement @@ -77,6 +82,15 @@ class PushRulesWorkerStore( def __init__(self, database: Database, db_conn, hs): super(PushRulesWorkerStore, self).__init__(database, db_conn, hs) + if hs.config.worker.worker_app is None: + self._push_rules_stream_id_gen = ChainedIdGenerator( + self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" + ) # type: Union[ChainedIdGenerator, SlavedIdTracker] + else: + self._push_rules_stream_id_gen = SlavedIdTracker( + db_conn, "push_rules_stream", "stream_id" + ) + push_rules_prefill, push_rules_id = self.db.get_cache_dict( db_conn, "push_rules_stream", diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 86d04ea9ac..f89ce0bed2 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -166,6 +166,7 @@ class ChainedIdGenerator(object): def __init__(self, chained_generator, db_conn, table, column): self.chained_generator = chained_generator + self._table = table self._lock = threading.Lock() self._current_max = _load_current_id(db_conn, table, column) self._unfinished_ids = deque() # type: Deque[Tuple[int, int]] @@ -204,6 +205,16 @@ class ChainedIdGenerator(object): return self._current_max, self.chained_generator.get_current_token() + def advance(self, token: int): + """Stub implementation for advancing the token when receiving updates + over replication; raises an exception as this instance should be the + only source of updates. + """ + + raise Exception( + "Attempted to advance token on source for table %r", self._table + ) + class MultiWriterIdGenerator: """An ID generator that tracks a stream that can have multiple writers. |