summary refs log tree commit diff
path: root/synapse/storage/databases/main/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r--synapse/storage/databases/main/events.py314
1 files changed, 162 insertions, 152 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py

index 1f600f1190..0f097a2927 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext +from synapse.logging.opentracing import trace from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -145,6 +146,7 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen + @trace async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], @@ -353,9 +355,9 @@ class PersistEventsStore: txn: LoggingTransaction, *, events_and_contexts: List[Tuple[EventBase, EventContext]], - inhibit_local_membership_updates: bool = False, - state_delta_for_room: Optional[Dict[str, DeltaState]] = None, - new_forward_extremities: Optional[Dict[str, Set[str]]] = None, + inhibit_local_membership_updates: bool, + state_delta_for_room: Dict[str, DeltaState], + new_forward_extremities: Dict[str, Set[str]], ) -> None: """Insert some number of room events into the necessary database tables. @@ -382,9 +384,6 @@ class PersistEventsStore: PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ - state_delta_for_room = state_delta_for_room or {} - new_forward_extremities = new_forward_extremities or {} - all_events_and_contexts = events_and_contexts min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering @@ -408,6 +407,31 @@ class PersistEventsStore: assert min_stream_order assert max_stream_order + # Once the txn completes, invalidate all of the relevant caches. Note that we do this + # up here because it captures all the events_and_contexts before any are removed. + for event, _ in events_and_contexts: + self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) + if event.redacts: + self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) + + relates_to = None + relation = relation_from_event(event) + if relation: + relates_to = relation.parent_id + + assert event.internal_metadata.stream_ordering is not None + txn.call_after( + self.store._invalidate_caches_for_event, + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.type, + getattr(event, "state_key", None), + event.redacts, + relates_to, + backfilled=False, + ) + self._update_forward_extremities_txn( txn, new_forward_extremities=new_forward_extremities, @@ -457,6 +481,7 @@ class PersistEventsStore: # We call this last as it assumes we've inserted the events into # room_memberships, where applicable. + # NB: This function invalidates all state related caches self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) def _persist_event_auth_chain_txn( @@ -1170,17 +1195,16 @@ class PersistEventsStore: ) # Invalidate the various caches - - for member in members_changed: - txn.call_after( - self.store.get_rooms_for_user_with_stream_ordering.invalidate, - (member,), - ) - self.store._invalidate_state_caches_and_stream( txn, room_id, members_changed ) + # Check if any of the remote membership changes requires us to + # unsubscribe from their device lists. + self.store.handle_potentially_left_users_txn( + txn, {m for m in members_changed if not self.hs.is_mine_id(m)} + ) + def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None: """Update the room version in the database based off current state events. @@ -1220,9 +1244,6 @@ class PersistEventsStore: self.db_pool.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) - txn.call_after( - self.store.get_latest_event_ids_in_room.invalidate, (room_id,) - ) self.db_pool.simple_insert_many_txn( txn, @@ -1258,9 +1279,10 @@ class PersistEventsStore: Pick the earliest non-outlier if there is one, else the earliest one. Args: - events_and_contexts (list[(EventBase, EventContext)]): + events_and_contexts: + Returns: - list[(EventBase, EventContext)]: filtered list + filtered list """ new_events_and_contexts: OrderedDict[ str, Tuple[EventBase, EventContext] @@ -1286,14 +1308,11 @@ class PersistEventsStore: """Update min_depth for each room Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting + txn: db connection + events_and_contexts: events we are persisting """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: - # Remove the any existing cache entries for the event_ids - self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) # Then update the `stream_ordering` position to mark the latest # event as the front of the room. This should not be done for # backfilled events because backfilled events have negative @@ -1490,7 +1509,7 @@ class PersistEventsStore: event.sender, "url" in event.content and isinstance(event.content["url"], str), event.get_state_key(), - context.rejected or None, + context.rejected, ) for event, context in events_and_contexts ), @@ -1561,13 +1580,11 @@ class PersistEventsStore: """Update all the miscellaneous tables for new events Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting - all_events_and_contexts (list[(EventBase, EventContext)]): all - events that we were going to persist. This includes events - we've already persisted, etc, that wouldn't appear in - events_and_context. + txn: db connection + events_and_contexts: events we are persisting + all_events_and_contexts: all events that we were going to persist. + This includes events we've already persisted, etc, that wouldn't + appear in events_and_context. inhibit_local_membership_updates: Stop the local_current_membership from being updated by these events. This should be set to True for backfilled events because backfilled events in the past do @@ -1594,7 +1611,7 @@ class PersistEventsStore: ) # Remove from relations table. - self._handle_redact_relations(txn, event.redacts) + self._handle_redact_relations(txn, event.room_id, event.redacts) # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. @@ -1695,16 +1712,7 @@ class PersistEventsStore: txn.async_call_after(prefill) def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None: - """Invalidate the caches for the redacted event. - - Note that these caches are also cleared as part of event replication in - _invalidate_caches_for_event. - """ assert event.redacts is not None - self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) - txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,)) - txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,)) - self.db_pool.simple_upsert_txn( txn, table="redactions", @@ -1805,34 +1813,6 @@ class PersistEventsStore: for event in events: assert event.internal_metadata.stream_ordering is not None - txn.call_after( - self.store._membership_stream_cache.entity_has_changed, - event.state_key, - event.internal_metadata.stream_ordering, - ) - txn.call_after( - self.store.get_invited_rooms_for_local_user.invalidate, - (event.state_key,), - ) - txn.call_after( - self.store.get_local_users_in_room.invalidate, - (event.room_id,), - ) - txn.call_after( - self.store.get_number_joined_users_in_room.invalidate, - (event.room_id,), - ) - txn.call_after( - self.store.get_user_in_room_with_profile.invalidate, - (event.room_id, event.state_key), - ) - - # The `_get_membership_from_event_id` is immutable, except for the - # case where we look up an event *before* persisting it. - txn.call_after( - self.store._get_membership_from_event_id.invalidate, - (event.event_id,), - ) # We update the local_current_membership table only if the event is # "current", i.e., its something that has just happened. @@ -1881,33 +1861,32 @@ class PersistEventsStore: }, ) - txn.call_after( - self.store.get_relations_for_event.invalidate, (relation.parent_id,) - ) - txn.call_after( - self.store.get_aggregation_groups_for_event.invalidate, - (relation.parent_id,), - ) - txn.call_after( - self.store.get_mutual_event_relations_for_rel_type.invalidate, - (relation.parent_id,), - ) - - if relation.rel_type == RelationTypes.REPLACE: - txn.call_after( - self.store.get_applicable_edit.invalidate, (relation.parent_id,) - ) - if relation.rel_type == RelationTypes.THREAD: - txn.call_after( - self.store.get_thread_summary.invalidate, (relation.parent_id,) - ) - # It should be safe to only invalidate the cache if the user has not - # previously participated in the thread, but that's difficult (and - # potentially error-prone) so it is always invalidated. - txn.call_after( - self.store.get_thread_participated.invalidate, - (relation.parent_id, event.sender), + # Upsert into the threads table, but only overwrite the value if the + # new event is of a later topological order OR if the topological + # ordering is equal, but the stream ordering is later. + sql = """ + INSERT INTO threads (room_id, thread_id, latest_event_id, topological_ordering, stream_ordering) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (room_id, thread_id) + DO UPDATE SET + latest_event_id = excluded.latest_event_id, + topological_ordering = excluded.topological_ordering, + stream_ordering = excluded.stream_ordering + WHERE + threads.topological_ordering <= excluded.topological_ordering AND + threads.stream_ordering < excluded.stream_ordering + """ + + txn.execute( + sql, + ( + event.room_id, + relation.parent_id, + event.event_id, + event.depth, + event.internal_metadata.stream_ordering, + ), ) def _handle_insertion_event( @@ -2033,35 +2012,52 @@ class PersistEventsStore: txn.execute(sql, (batch_id,)) def _handle_redact_relations( - self, txn: LoggingTransaction, redacted_event_id: str + self, txn: LoggingTransaction, room_id: str, redacted_event_id: str ) -> None: """Handles receiving a redaction and checking whether the redacted event has any relations which must be removed from the database. Args: txn + room_id: The room ID of the event that was redacted. redacted_event_id: The event that was redacted. """ - # Fetch the current relation of the event being redacted. - redacted_relates_to = self.db_pool.simple_select_one_onecol_txn( + # Fetch the relation of the event being redacted. + row = self.db_pool.simple_select_one_txn( txn, table="event_relations", keyvalues={"event_id": redacted_event_id}, - retcol="relates_to_id", + retcols=("relates_to_id", "relation_type"), allow_none=True, ) + # Nothing to do if no relation is found. + if row is None: + return + + redacted_relates_to = row["relates_to_id"] + rel_type = row["relation_type"] + self.db_pool.simple_delete_txn( + txn, table="event_relations", keyvalues={"event_id": redacted_event_id} + ) + # Any relation information for the related event must be cleared. - if redacted_relates_to is not None: + self.store._invalidate_cache_and_stream( + txn, self.store.get_relations_for_event, (redacted_relates_to,) + ) + if rel_type == RelationTypes.ANNOTATION: self.store._invalidate_cache_and_stream( - txn, self.store.get_relations_for_event, (redacted_relates_to,) + txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,) ) + if rel_type == RelationTypes.REFERENCE: self.store._invalidate_cache_and_stream( - txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,) + txn, self.store.get_references_for_event, (redacted_relates_to,) ) + if rel_type == RelationTypes.REPLACE: self.store._invalidate_cache_and_stream( txn, self.store.get_applicable_edit, (redacted_relates_to,) ) + if rel_type == RelationTypes.THREAD: self.store._invalidate_cache_and_stream( txn, self.store.get_thread_summary, (redacted_relates_to,) ) @@ -2069,14 +2065,41 @@ class PersistEventsStore: txn, self.store.get_thread_participated, (redacted_relates_to,) ) self.store._invalidate_cache_and_stream( - txn, - self.store.get_mutual_event_relations_for_rel_type, - (redacted_relates_to,), + txn, self.store.get_threads, (room_id,) ) - self.db_pool.simple_delete_txn( - txn, table="event_relations", keyvalues={"event_id": redacted_event_id} - ) + # Find the new latest event in the thread. + sql = """ + SELECT event_id, topological_ordering, stream_ordering + FROM event_relations + INNER JOIN events USING (event_id) + WHERE relates_to_id = ? AND relation_type = ? + ORDER BY topological_ordering DESC, stream_ordering DESC + LIMIT 1 + """ + txn.execute(sql, (redacted_relates_to, RelationTypes.THREAD)) + + # If a latest event is found, update the threads table, this might + # be the same current latest event (if an earlier event in the thread + # was redacted). + latest_event_row = txn.fetchone() + if latest_event_row: + self.db_pool.simple_upsert_txn( + txn, + table="threads", + keyvalues={"room_id": room_id, "thread_id": redacted_relates_to}, + values={ + "latest_event_id": latest_event_row[0], + "topological_ordering": latest_event_row[1], + "stream_ordering": latest_event_row[2], + }, + ) + + # Otherwise, delete the thread: it no longer exists. + else: + self.db_pool.simple_delete_one_txn( + txn, table="threads", keyvalues={"thread_id": redacted_relates_to} + ) def _store_room_topic_txn(self, txn: LoggingTransaction, event: EventBase) -> None: if isinstance(event.content.get("topic"), str): @@ -2178,26 +2201,26 @@ class PersistEventsStore: appear in events_and_context. """ - # Only non outlier events will have push actions associated with them, + # Only notifiable events will have push actions associated with them, # so let's filter them out. (This makes joining large rooms faster, as # these queries took seconds to process all the state events). - non_outlier_events = [ + notifiable_events = [ event for event, _ in events_and_contexts - if not event.internal_metadata.is_outlier() + if event.internal_metadata.is_notifiable() ] sql = """ INSERT INTO event_push_actions ( room_id, event_id, user_id, actions, stream_ordering, - topological_ordering, notif, highlight, unread + topological_ordering, notif, highlight, unread, thread_id ) - SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread + SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id FROM event_push_actions_staging WHERE event_id = ? """ - if non_outlier_events: + if notifiable_events: txn.execute_batch( sql, ( @@ -2207,32 +2230,10 @@ class PersistEventsStore: event.depth, event.event_id, ) - for event in non_outlier_events + for event in notifiable_events ), ) - room_to_event_ids: Dict[str, List[str]] = {} - for e in non_outlier_events: - room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) - - for room_id, event_ids in room_to_event_ids.items(): - rows = self.db_pool.simple_select_many_txn( - txn, - table="event_push_actions_staging", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=("user_id",), - ) - - user_ids = {row["user_id"] for row in rows} - - for user_id in user_ids: - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate, - (room_id, user_id), - ) - # Now we delete the staging area for *all* events that were being # persisted. txn.execute_batch( @@ -2240,18 +2241,13 @@ class PersistEventsStore: ( (event.event_id,) for event, _ in all_events_and_contexts - if not event.internal_metadata.is_outlier() + if event.internal_metadata.is_notifiable() ), ) def _remove_push_actions_for_event_id_txn( self, txn: LoggingTransaction, room_id: str, event_id: str ) -> None: - # Sad that we have to blow away the cache for the whole room here - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate, - (room_id,), - ) txn.execute( "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", (room_id, event_id), @@ -2433,17 +2429,31 @@ class PersistEventsStore: "DELETE FROM event_backward_extremities" " WHERE event_id = ? AND room_id = ?" ) + backward_extremity_tuples_to_remove = [ + (ev.event_id, ev.room_id) + for ev in events + if not ev.internal_metadata.is_outlier() + # If we encountered an event with no prev_events, then we might + # as well remove it now because it won't ever have anything else + # to backfill from. + or len(ev.prev_event_ids()) == 0 + ] txn.execute_batch( query, - [ - (ev.event_id, ev.room_id) - for ev in events - if not ev.internal_metadata.is_outlier() - # If we encountered an event with no prev_events, then we might - # as well remove it now because it won't ever have anything else - # to backfill from. - or len(ev.prev_event_ids()) == 0 - ], + backward_extremity_tuples_to_remove, + ) + + # Clear out the failed backfill attempts after we successfully pulled + # the event. Since we no longer need these events as backward + # extremities, it also means that they won't be backfilled from again so + # we no longer need to store the backfill attempts around it. + query = """ + DELETE FROM event_failed_pull_attempts + WHERE event_id = ? and room_id = ? + """ + txn.execute_batch( + query, + backward_extremity_tuples_to_remove, )