summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-12-21 10:51:27 +0000
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-12-21 10:51:27 +0000
commit65981bcb27ee4317e2c3621ce77a2ea377a321e1 (patch)
tree2bfe9baebed9d4bcfad450ff77557655c17c6eca /synapse/storage/databases/main
parentMerge branch 'release-v1.74' into matrix-org-hotfixes (diff)
parentMitigate jump to date slowness by adding 30s timeout (diff)
downloadsynapse-65981bcb27ee4317e2c3621ce77a2ea377a321e1.tar.xz
Merge branch 'rei/jumptodate_statement_limit' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/event_push_actions.py9
-rw-r--r--synapse/storage/databases/main/events_worker.py119
-rw-r--r--synapse/storage/databases/main/state.py52
3 files changed, 149 insertions, 31 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py

index 7ebe34f773..3a0c370fde 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -275,15 +275,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas ) self.db_pool.updates.register_background_index_update( - "event_push_summary_unique_index", - index_name="event_push_summary_unique_index", - table="event_push_summary", - columns=["user_id", "room_id"], - unique=True, - replaces_index="event_push_summary_user_rm", - ) - - self.db_pool.updates.register_background_index_update( "event_push_summary_unique_index2", index_name="event_push_summary_unique_index2", table="event_push_summary", diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 318fd7dc71..f80b494edb 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -59,8 +59,9 @@ from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) -from synapse.replication.tcp.streams import BackfillStream +from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream from synapse.replication.tcp.streams.events import EventsStream +from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -70,6 +71,7 @@ from synapse.storage.database import ( from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, AbstractStreamIdTracker, MultiWriterIdGenerator, StreamIdGenerator, @@ -292,6 +294,93 @@ class EventsWorkerStore(SQLBaseStore): id_column="chain_id", ) + self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator + + if isinstance(database.engine, PostgresEngine): + self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="un_partial_stated_event_stream", + instance_name=hs.get_instance_name(), + tables=[ + ("un_partial_stated_event_stream", "instance_name", "stream_id") + ], + sequence_name="un_partial_stated_event_stream_sequence", + # TODO(faster_joins, multiple writers) Support multiple writers. + writers=["master"], + ) + else: + self._un_partial_stated_events_stream_id_gen = StreamIdGenerator( + db_conn, "un_partial_stated_event_stream", "stream_id" + ) + + def get_un_partial_stated_events_token(self) -> int: + # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple + # writers because workers that don't write often will hold all + # readers up. + return self._un_partial_stated_events_stream_id_gen.get_current_token() + + async def get_un_partial_stated_events_from_stream( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]: + """Get updates for the un-partial-stated events replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_un_partial_stated_events_from_stream_txn( + txn: LoggingTransaction, + ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]: + sql = """ + SELECT stream_id, event_id, rejection_status_changed + FROM un_partial_stated_event_stream + WHERE ? < stream_id AND stream_id <= ? AND instance_name = ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, instance_name, limit)) + updates = [ + ( + row[0], + ( + row[1], + bool(row[2]), + ), + ) + for row in txn + ] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db_pool.runInteraction( + "get_un_partial_stated_events_from_stream", + get_un_partial_stated_events_from_stream_txn, + ) + def process_replication_rows( self, stream_name: str, @@ -303,6 +392,16 @@ class EventsWorkerStore(SQLBaseStore): self._stream_id_gen.advance(instance_name, token) elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(instance_name, -token) + elif stream_name == UnPartialStatedEventStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedEventStreamRow) + + self.is_partial_state_event.invalidate((row.event_id,)) + + if row.rejection_status_changed: + # If the partial-stated event became rejected or unrejected + # when it wasn't before, we need to invalidate this cache. + self._invalidate_local_get_event_cache(row.event_id) super().process_replication_rows(stream_name, instance_name, token, rows) @@ -2177,6 +2276,10 @@ class EventsWorkerStore(SQLBaseStore): """ def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]: + if isinstance(self.database_engine, PostgresEngine): + # Temporary: make sure these queries can't last more than 30s + txn.execute("SET LOCAL statement_timeout = 30000") + txn.execute( sql_template, (room_id, timestamp), @@ -2292,6 +2395,9 @@ class EventsWorkerStore(SQLBaseStore): This can happen, for example, when resyncing state during a faster join. + It is the caller's responsibility to ensure that other workers are + sent a notification so that they call `_invalidate_local_get_event_cache()`. + Args: txn: event_id: ID of event to update @@ -2330,14 +2436,3 @@ class EventsWorkerStore(SQLBaseStore): ) self.invalidate_get_event_cache_after_txn(txn, event_id) - - # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just - # call '_send_invalidation_to_replication', but we actually need the other - # end to call _invalidate_local_get_event_cache() rather than (just) - # _get_event_cache.invalidate(). - # - # One solution might be to (somehow) get the workers to call - # _invalidate_caches_for_event() (though that will invalidate more than - # strictly necessary). - # - # https://github.com/matrix-org/synapse/issues/12994 diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index c801a93b5b..f32cbb2dec 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py
@@ -14,7 +14,7 @@ # limitations under the License. import collections.abc import logging -from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple import attr @@ -24,6 +24,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.opentracing import trace +from synapse.replication.tcp.streams import UnPartialStatedEventStream +from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -80,6 +82,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self._instance_name: str = hs.get_instance_name() + + def process_replication_rows( + self, + stream_name: str, + instance_name: str, + token: int, + rows: Iterable[Any], + ) -> None: + if stream_name == UnPartialStatedEventStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedEventStreamRow) + self._get_state_group_for_event.invalidate((row.event_id,)) + + super().process_replication_rows(stream_name, instance_name, token, rows) async def get_room_version(self, room_id: str) -> RoomVersion: """Get the room_version of a given room @@ -404,18 +421,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): context: EventContext, ) -> None: """Update the state group for a partial state event""" - await self.db_pool.runInteraction( - "update_state_for_partial_state_event", - self._update_state_for_partial_state_event_txn, - event, - context, - ) + async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id: + await self.db_pool.runInteraction( + "update_state_for_partial_state_event", + self._update_state_for_partial_state_event_txn, + event, + context, + un_partial_state_event_stream_id, + ) def _update_state_for_partial_state_event_txn( self, txn: LoggingTransaction, event: EventBase, context: EventContext, + un_partial_state_event_stream_id: int, ) -> None: # we shouldn't have any outliers here assert not event.internal_metadata.is_outlier() @@ -436,7 +456,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # the event may now be rejected where it was not before, or vice versa, # in which case we need to update the rejected flags. - if bool(context.rejected) != (event.rejected_reason is not None): + rejection_status_changed = bool(context.rejected) != ( + event.rejected_reason is not None + ) + if rejection_status_changed: self.mark_event_rejected_txn(txn, event.event_id, context.rejected) self.db_pool.simple_delete_one_txn( @@ -445,8 +468,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): keyvalues={"event_id": event.event_id}, ) - # TODO(faster_joins): need to do something about workers here - # https://github.com/matrix-org/synapse/issues/12994 txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,)) txn.call_after( self._get_state_group_for_event.prefill, @@ -454,6 +475,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): state_group, ) + self.db_pool.simple_insert_txn( + txn, + "un_partial_stated_event_stream", + { + "stream_id": un_partial_state_event_stream_id, + "instance_name": self._instance_name, + "event_id": event.event_id, + "rejection_status_changed": rejection_status_changed, + }, + ) + class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):