diff options
Diffstat (limited to 'synapse')
7 files changed, 203 insertions, 10 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 66aca2f864..31df7f55cc 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -610,6 +610,8 @@ class FederationEventHandler: self._state_storage_controller.notify_event_un_partial_stated( event.event_id ) + # Notify that there's a new row in the un_partial_stated_events stream. + self._notifier.notify_replication() @trace async def backfill( diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 8575666d9c..110f10aab9 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -42,7 +42,10 @@ from synapse.replication.tcp.streams._base import ( ) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.federation import FederationStream -from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream +from synapse.replication.tcp.streams.partial_state import ( + UnPartialStatedEventStream, + UnPartialStatedRoomStream, +) STREAMS_MAP = { stream.NAME: stream @@ -63,6 +66,7 @@ STREAMS_MAP = { AccountDataStream, UserSignatureStream, UnPartialStatedRoomStream, + UnPartialStatedEventStream, ) } @@ -83,4 +87,5 @@ __all__ = [ "AccountDataStream", "UserSignatureStream", "UnPartialStatedRoomStream", + "UnPartialStatedEventStream", ] diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index 18f087ffa2..b5a2ae74b6 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -46,3 +46,31 @@ class UnPartialStatedRoomStream(Stream): current_token_without_instance(store.get_un_partial_stated_rooms_token), store.get_un_partial_stated_rooms_from_stream, ) + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class UnPartialStatedEventStreamRow: + # ID of the event that has been un-partial-stated. + event_id: str + + # True iff the rejection status of the event changed as a result of being + # un-partial-stated. + rejection_status_changed: bool + + +class UnPartialStatedEventStream(Stream): + """ + Stream to notify about events becoming un-partial-stated. + """ + + NAME = "un_partial_stated_event" + ROW_TYPE = UnPartialStatedEventStreamRow + + def __init__(self, hs: "HomeServer"): + store = hs.get_datastores().main + super().__init__( + hs.get_instance_name(), + # TODO(faster_joins, multiple writers): we need to account for instance names + current_token_without_instance(store.get_un_partial_stated_events_token), + store.get_un_partial_stated_events_from_stream, + ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 318fd7dc71..e19b16064b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -70,6 +70,7 @@ from synapse.storage.database import ( from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, AbstractStreamIdTracker, MultiWriterIdGenerator, StreamIdGenerator, @@ -292,6 +293,93 @@ class EventsWorkerStore(SQLBaseStore): id_column="chain_id", ) + self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator + + if isinstance(database.engine, PostgresEngine): + self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="un_partial_stated_event_stream", + instance_name=hs.get_instance_name(), + tables=[ + ("un_partial_stated_event_stream", "instance_name", "stream_id") + ], + sequence_name="un_partial_stated_event_stream_sequence", + # TODO(faster_joins, multiple writers) Support multiple writers. + writers=["master"], + ) + else: + self._un_partial_stated_events_stream_id_gen = StreamIdGenerator( + db_conn, "un_partial_stated_event_stream", "stream_id" + ) + + def get_un_partial_stated_events_token(self) -> int: + # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple + # writers because workers that don't write often will hold all + # readers up. + return self._un_partial_stated_events_stream_id_gen.get_current_token() + + async def get_un_partial_stated_events_from_stream( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]: + """Get updates for the un-partial-stated events replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_un_partial_stated_events_from_stream_txn( + txn: LoggingTransaction, + ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]: + sql = """ + SELECT stream_id, event_id, rejection_status_changed + FROM un_partial_stated_event_stream + WHERE ? < stream_id AND stream_id <= ? AND instance_name = ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, instance_name, limit)) + updates = [ + ( + row[0], + ( + row[1], + bool(row[2]), + ), + ) + for row in txn + ] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db_pool.runInteraction( + "get_un_partial_stated_events_from_stream", + get_un_partial_stated_events_from_stream_txn, + ) + def process_replication_rows( self, stream_name: str, diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index c801a93b5b..f855903c39 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -80,6 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self._instance_name: str = hs.get_instance_name() async def get_room_version(self, room_id: str) -> RoomVersion: """Get the room_version of a given room @@ -404,18 +405,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): context: EventContext, ) -> None: """Update the state group for a partial state event""" - await self.db_pool.runInteraction( - "update_state_for_partial_state_event", - self._update_state_for_partial_state_event_txn, - event, - context, - ) + async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id: + await self.db_pool.runInteraction( + "update_state_for_partial_state_event", + self._update_state_for_partial_state_event_txn, + event, + context, + un_partial_state_event_stream_id, + ) def _update_state_for_partial_state_event_txn( self, txn: LoggingTransaction, event: EventBase, context: EventContext, + un_partial_state_event_stream_id: int, ) -> None: # we shouldn't have any outliers here assert not event.internal_metadata.is_outlier() @@ -436,7 +440,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # the event may now be rejected where it was not before, or vice versa, # in which case we need to update the rejected flags. - if bool(context.rejected) != (event.rejected_reason is not None): + rejection_status_changed = bool(context.rejected) != ( + event.rejected_reason is not None + ) + if rejection_status_changed: self.mark_event_rejected_txn(txn, event.event_id, context.rejected) self.db_pool.simple_delete_one_txn( @@ -445,8 +452,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): keyvalues={"event_id": event.event_id}, ) - # TODO(faster_joins): need to do something about workers here - # https://github.com/matrix-org/synapse/issues/12994 txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,)) txn.call_after( self._get_state_group_for_event.prefill, @@ -454,6 +459,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): state_group, ) + self.db_pool.simple_insert_txn( + txn, + "un_partial_stated_event_stream", + { + "stream_id": un_partial_state_event_stream_id, + "instance_name": self._instance_name, + "event_id": event.event_id, + "rejection_status_changed": rejection_status_changed, + }, + ) + class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): diff --git a/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql new file mode 100644 index 0000000000..0e571f78c3 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql @@ -0,0 +1,34 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Stream for notifying that an event has become un-partial-stated. +CREATE TABLE un_partial_stated_event_stream( + -- Position in the stream + stream_id BIGINT PRIMARY KEY NOT NULL, + + -- Which instance wrote this entry. + instance_name TEXT NOT NULL, + + -- Which event has been un-partial-stated. + event_id TEXT NOT NULL REFERENCES events(event_id) ON DELETE CASCADE, + + -- true iff the `rejected` status of the event changed when it became + -- un-partial-stated. + rejection_status_changed BOOLEAN NOT NULL +); + +-- We want an index here because of the foreign key constraint: +-- upon deleting an event, the database needs to be able to check here. +CREATE UNIQUE INDEX un_partial_stated_event_stream_room_id ON un_partial_stated_event_stream (event_id); diff --git a/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres new file mode 100644 index 0000000000..1ec24702f3 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres @@ -0,0 +1,20 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS un_partial_stated_event_stream_sequence; + +SELECT setval('un_partial_stated_event_stream_sequence', ( + SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_event_stream +)); |