diff options
author | Erik Johnston <erikj@element.io> | 2024-06-19 10:33:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-19 10:33:53 +0100 |
commit | bdf82efea505c488953b46eb681b5a63c4e9655d (patch) | |
tree | 676904f520ff38397529441a08c967b58c24d95b /synapse/storage/databases/main | |
parent | Require the 'from' parameter for `/notifications` be an integer (#17283) (diff) | |
download | synapse-bdf82efea505c488953b46eb681b5a63c4e9655d.tar.xz |
Handle large chain calc better (#17291)
We calculate the auth chain links outside of the main persist event transaction to ensure that we do not block other event sending during the calculation.
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r-- | synapse/storage/databases/main/events.py | 249 |
1 files changed, 179 insertions, 70 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 66428e6c8e..c6df13c064 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -34,7 +34,6 @@ from typing import ( Optional, Set, Tuple, - Union, cast, ) @@ -100,6 +99,23 @@ class DeltaState: return not self.to_delete and not self.to_insert and not self.no_longer_in_room +@attr.s(slots=True, auto_attribs=True) +class NewEventChainLinks: + """Information about new auth chain links that need to be added to the DB. + + Attributes: + chain_id, sequence_number: the IDs corresponding to the event being + inserted, and the starting point of the links + links: Lists the links that need to be added, 2-tuple of the chain + ID/sequence number of the end point of the link. + """ + + chain_id: int + sequence_number: int + + links: List[Tuple[int, int]] = attr.Factory(list) + + class PersistEventsStore: """Contains all the functions for writing events to the database. @@ -148,6 +164,7 @@ class PersistEventsStore: *, state_delta_for_room: Optional[DeltaState], new_forward_extremities: Optional[Set[str]], + new_event_links: Dict[str, NewEventChainLinks], use_negative_stream_ordering: bool = False, inhibit_local_membership_updates: bool = False, ) -> None: @@ -217,6 +234,7 @@ class PersistEventsStore: inhibit_local_membership_updates=inhibit_local_membership_updates, state_delta_for_room=state_delta_for_room, new_forward_extremities=new_forward_extremities, + new_event_links=new_event_links, ) persist_event_counter.inc(len(events_and_contexts)) @@ -243,6 +261,87 @@ class PersistEventsStore: (room_id,), frozenset(new_forward_extremities) ) + async def calculate_chain_cover_index_for_events( + self, room_id: str, events: Collection[EventBase] + ) -> Dict[str, NewEventChainLinks]: + # Filter to state events, and ensure there are no duplicates. + state_events = [] + seen_events = set() + for event in events: + if not event.is_state() or event.event_id in seen_events: + continue + + state_events.append(event) + seen_events.add(event.event_id) + + if not state_events: + return {} + + return await self.db_pool.runInteraction( + "_calculate_chain_cover_index_for_events", + self.calculate_chain_cover_index_for_events_txn, + room_id, + state_events, + ) + + def calculate_chain_cover_index_for_events_txn( + self, txn: LoggingTransaction, room_id: str, state_events: Collection[EventBase] + ) -> Dict[str, NewEventChainLinks]: + # We now calculate chain ID/sequence numbers for any state events we're + # persisting. We ignore out of band memberships as we're not in the room + # and won't have their auth chain (we'll fix it up later if we join the + # room). + # + # See: docs/auth_chain_difference_algorithm.md + + # We ignore legacy rooms that we aren't filling the chain cover index + # for. + row = self.db_pool.simple_select_one_txn( + txn, + table="rooms", + keyvalues={"room_id": room_id}, + retcols=("room_id", "has_auth_chain_index"), + allow_none=True, + ) + if row is None: + return {} + + # Filter out already persisted events. + rows = self.db_pool.simple_select_many_txn( + txn, + table="events", + column="event_id", + iterable=[e.event_id for e in state_events], + keyvalues={}, + retcols=("event_id",), + ) + already_persisted_events = {event_id for event_id, in rows} + state_events = [ + event + for event in state_events + if event.event_id in already_persisted_events + ] + + if not state_events: + return {} + + # We need to know the type/state_key and auth events of the events we're + # calculating chain IDs for. We don't rely on having the full Event + # instances as we'll potentially be pulling more events from the DB and + # we don't need the overhead of fetching/parsing the full event JSON. + event_to_types = {e.event_id: (e.type, e.state_key) for e in state_events} + event_to_auth_chain = {e.event_id: e.auth_event_ids() for e in state_events} + event_to_room_id = {e.event_id: e.room_id for e in state_events} + + return self._calculate_chain_cover_index( + txn, + self.db_pool, + self.store.event_chain_id_gen, + event_to_room_id, + event_to_types, + event_to_auth_chain, + ) + async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]: """Filter the supplied list of event_ids to get those which are prev_events of existing (non-outlier/rejected) events. @@ -358,6 +457,7 @@ class PersistEventsStore: inhibit_local_membership_updates: bool, state_delta_for_room: Optional[DeltaState], new_forward_extremities: Optional[Set[str]], + new_event_links: Dict[str, NewEventChainLinks], ) -> None: """Insert some number of room events into the necessary database tables. @@ -466,7 +566,9 @@ class PersistEventsStore: # Insert into event_to_state_groups. self._store_event_state_mappings_txn(txn, events_and_contexts) - self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts]) + self._persist_event_auth_chain_txn( + txn, [e for e, _ in events_and_contexts], new_event_links + ) # _store_rejected_events_txn filters out any events which were # rejected, and returns the filtered list. @@ -496,6 +598,7 @@ class PersistEventsStore: self, txn: LoggingTransaction, events: List[EventBase], + new_event_links: Dict[str, NewEventChainLinks], ) -> None: # We only care about state events, so this if there are no state events. if not any(e.is_state() for e in events): @@ -519,62 +622,40 @@ class PersistEventsStore: ], ) - # We now calculate chain ID/sequence numbers for any state events we're - # persisting. We ignore out of band memberships as we're not in the room - # and won't have their auth chain (we'll fix it up later if we join the - # room). - # - # See: docs/auth_chain_difference_algorithm.md + if new_event_links: + self._persist_chain_cover_index(txn, self.db_pool, new_event_links) - # We ignore legacy rooms that we aren't filling the chain cover index - # for. - rows = cast( - List[Tuple[str, Optional[Union[int, bool]]]], - self.db_pool.simple_select_many_txn( - txn, - table="rooms", - column="room_id", - iterable={event.room_id for event in events if event.is_state()}, - keyvalues={}, - retcols=("room_id", "has_auth_chain_index"), - ), - ) - rooms_using_chain_index = { - room_id for room_id, has_auth_chain_index in rows if has_auth_chain_index - } - - state_events = { - event.event_id: event - for event in events - if event.is_state() and event.room_id in rooms_using_chain_index - } - - if not state_events: - return + @classmethod + def _add_chain_cover_index( + cls, + txn: LoggingTransaction, + db_pool: DatabasePool, + event_chain_id_gen: SequenceGenerator, + event_to_room_id: Dict[str, str], + event_to_types: Dict[str, Tuple[str, str]], + event_to_auth_chain: Dict[str, StrCollection], + ) -> None: + """Calculate and persist the chain cover index for the given events. - # We need to know the type/state_key and auth events of the events we're - # calculating chain IDs for. We don't rely on having the full Event - # instances as we'll potentially be pulling more events from the DB and - # we don't need the overhead of fetching/parsing the full event JSON. - event_to_types = { - e.event_id: (e.type, e.state_key) for e in state_events.values() - } - event_to_auth_chain = { - e.event_id: e.auth_event_ids() for e in state_events.values() - } - event_to_room_id = {e.event_id: e.room_id for e in state_events.values()} + Args: + event_to_room_id: Event ID to the room ID of the event + event_to_types: Event ID to type and state_key of the event + event_to_auth_chain: Event ID to list of auth event IDs of the + event (events with no auth events can be excluded). + """ - self._add_chain_cover_index( + new_event_links = cls._calculate_chain_cover_index( txn, - self.db_pool, - self.store.event_chain_id_gen, + db_pool, + event_chain_id_gen, event_to_room_id, event_to_types, event_to_auth_chain, ) + cls._persist_chain_cover_index(txn, db_pool, new_event_links) @classmethod - def _add_chain_cover_index( + def _calculate_chain_cover_index( cls, txn: LoggingTransaction, db_pool: DatabasePool, @@ -582,7 +663,7 @@ class PersistEventsStore: event_to_room_id: Dict[str, str], event_to_types: Dict[str, Tuple[str, str]], event_to_auth_chain: Dict[str, StrCollection], - ) -> None: + ) -> Dict[str, NewEventChainLinks]: """Calculate the chain cover index for the given events. Args: @@ -590,6 +671,10 @@ class PersistEventsStore: event_to_types: Event ID to type and state_key of the event event_to_auth_chain: Event ID to list of auth event IDs of the event (events with no auth events can be excluded). + + Returns: + A mapping with any new auth chain links we need to add, keyed by + event ID. """ # Map from event ID to chain ID/sequence number. @@ -708,11 +793,11 @@ class PersistEventsStore: room_id = event_to_room_id.get(event_id) if room_id: e_type, state_key = event_to_types[event_id] - db_pool.simple_insert_txn( + db_pool.simple_upsert_txn( txn, table="event_auth_chain_to_calculate", + keyvalues={"event_id": event_id}, values={ - "event_id": event_id, "room_id": room_id, "type": e_type, "state_key": state_key, @@ -724,7 +809,7 @@ class PersistEventsStore: break if not events_to_calc_chain_id_for: - return + return {} # Allocate chain ID/sequence numbers to each new event. new_chain_tuples = cls._allocate_chain_ids( @@ -739,23 +824,10 @@ class PersistEventsStore: ) chain_map.update(new_chain_tuples) - db_pool.simple_insert_many_txn( - txn, - table="event_auth_chains", - keys=("event_id", "chain_id", "sequence_number"), - values=[ - (event_id, c_id, seq) - for event_id, (c_id, seq) in new_chain_tuples.items() - ], - ) - - db_pool.simple_delete_many_txn( - txn, - table="event_auth_chain_to_calculate", - keyvalues={}, - column="event_id", - values=new_chain_tuples, - ) + to_return = { + event_id: NewEventChainLinks(chain_id, sequence_number) + for event_id, (chain_id, sequence_number) in new_chain_tuples.items() + } # Now we need to calculate any new links between chains caused by # the new events. @@ -825,10 +897,38 @@ class PersistEventsStore: auth_chain_id, auth_sequence_number = chain_map[auth_id] # Step 2a, add link between the event and auth event + to_return[event_id].links.append((auth_chain_id, auth_sequence_number)) chain_links.add_link( (chain_id, sequence_number), (auth_chain_id, auth_sequence_number) ) + return to_return + + @classmethod + def _persist_chain_cover_index( + cls, + txn: LoggingTransaction, + db_pool: DatabasePool, + new_event_links: Dict[str, NewEventChainLinks], + ) -> None: + db_pool.simple_insert_many_txn( + txn, + table="event_auth_chains", + keys=("event_id", "chain_id", "sequence_number"), + values=[ + (event_id, new_links.chain_id, new_links.sequence_number) + for event_id, new_links in new_event_links.items() + ], + ) + + db_pool.simple_delete_many_txn( + txn, + table="event_auth_chain_to_calculate", + keyvalues={}, + column="event_id", + values=new_event_links, + ) + db_pool.simple_insert_many_txn( txn, table="event_auth_chain_links", @@ -838,7 +938,16 @@ class PersistEventsStore: "target_chain_id", "target_sequence_number", ), - values=list(chain_links.get_additions()), + values=[ + ( + new_links.chain_id, + new_links.sequence_number, + target_chain_id, + target_sequence_number, + ) + for new_links in new_event_links.values() + for (target_chain_id, target_sequence_number) in new_links.links + ], ) @staticmethod |