diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/databases/main/events.py | 29 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_bg_updates.py | 88 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql (renamed from synapse/storage/schema/main/delta/65/02_thread_relations.sql) | 2 |
3 files changed, 75 insertions, 44 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 120e4807d1..06832221ad 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1,6 +1,6 @@ # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -1696,34 +1696,33 @@ class PersistEventsStore: }, ) - def _handle_event_relations(self, txn, event): - """Handles inserting relation data during peristence of events + def _handle_event_relations( + self, txn: LoggingTransaction, event: EventBase + ) -> None: + """Handles inserting relation data during persistence of events Args: - txn - event (EventBase) + txn: The current database transaction. + event: The event which might have relations. """ relation = event.content.get("m.relates_to") if not relation: # No relations return + # Relations must have a type and parent event ID. rel_type = relation.get("rel_type") - if rel_type not in ( - RelationTypes.ANNOTATION, - RelationTypes.REFERENCE, - RelationTypes.REPLACE, - RelationTypes.THREAD, - ): - # Unknown relation type + if not isinstance(rel_type, str): return parent_id = relation.get("event_id") - if not parent_id: - # Invalid relation + if not isinstance(parent_id, str): return - aggregation_key = relation.get("key") + # Annotations have a key field. + aggregation_key = None + if rel_type == RelationTypes.ANNOTATION: + aggregation_key = relation.get("key") self.db_pool.simple_insert_txn( txn, diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index ae3a8a63e4..c88fd35e7f 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1,4 +1,4 @@ -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -171,8 +171,14 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + # The event_thread_relation background update was replaced with the + # event_arbitrary_relations one, which handles any relation to avoid + # needed to potentially crawl the entire events table in the future. + self.db_pool.updates.register_noop_background_update("event_thread_relation") + self.db_pool.updates.register_background_update_handler( - "event_thread_relation", self._event_thread_relation + "event_arbitrary_relations", + self._event_arbitrary_relations, ) ################################################################################ @@ -1099,23 +1105,27 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): return result - async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int: - """Background update handler which will store thread relations for existing events.""" + async def _event_arbitrary_relations( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update handler which will store previously unknown relations for existing events.""" last_event_id = progress.get("last_event_id", "") - def _event_thread_relation_txn(txn: LoggingTransaction) -> int: + def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int: + # Fetch events and then filter based on whether the event has a + # relation or not. txn.execute( """ SELECT event_id, json FROM event_json - LEFT JOIN event_relations USING (event_id) - WHERE event_id > ? AND event_relations.event_id IS NULL + WHERE event_id > ? ORDER BY event_id LIMIT ? """, (last_event_id, batch_size), ) results = list(txn) - missing_thread_relations = [] + # (event_id, parent_id, rel_type) for each relation + relations_to_insert: List[Tuple[str, str, str]] = [] for (event_id, event_json_raw) in results: try: event_json = db_to_json(event_json_raw) @@ -1127,48 +1137,70 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) continue - # If there's no relation (or it is not a thread), skip! + # If there's no relation, skip! relates_to = event_json["content"].get("m.relates_to") if not relates_to or not isinstance(relates_to, dict): continue - if relates_to.get("rel_type") != RelationTypes.THREAD: + + # If the relation type or parent event ID is not a string, skip it. + # + # Do not consider relation types that have existed for a long time, + # since they will already be listed in the `event_relations` table. + rel_type = relates_to.get("rel_type") + if not isinstance(rel_type, str) or rel_type in ( + RelationTypes.ANNOTATION, + RelationTypes.REFERENCE, + RelationTypes.REPLACE, + ): continue - # Get the parent ID. parent_id = relates_to.get("event_id") if not isinstance(parent_id, str): continue - missing_thread_relations.append((event_id, parent_id)) + relations_to_insert.append((event_id, parent_id, rel_type)) + + # Insert the missing data, note that we upsert here in case the event + # has already been processed. + if relations_to_insert: + self.db_pool.simple_upsert_many_txn( + txn=txn, + table="event_relations", + key_names=("event_id",), + key_values=[(r[0],) for r in relations_to_insert], + value_names=("relates_to_id", "relation_type"), + value_values=[r[1:] for r in relations_to_insert], + ) - # Insert the missing data. - self.db_pool.simple_insert_many_txn( - txn=txn, - table="event_relations", - values=[ - { - "event_id": event_id, - "relates_to_Id": parent_id, - "relation_type": RelationTypes.THREAD, - } - for event_id, parent_id in missing_thread_relations - ], - ) + # Iterate the parent IDs and invalidate caches. + for parent_id in {r[1] for r in relations_to_insert}: + cache_tuple = (parent_id,) + self._invalidate_cache_and_stream( + txn, self.get_relations_for_event, cache_tuple + ) + self._invalidate_cache_and_stream( + txn, self.get_aggregation_groups_for_event, cache_tuple + ) + self._invalidate_cache_and_stream( + txn, self.get_thread_summary, cache_tuple + ) if results: latest_event_id = results[-1][0] self.db_pool.updates._background_update_progress_txn( - txn, "event_thread_relation", {"last_event_id": latest_event_id} + txn, "event_arbitrary_relations", {"last_event_id": latest_event_id} ) return len(results) num_rows = await self.db_pool.runInteraction( - desc="event_thread_relation", func=_event_thread_relation_txn + desc="event_arbitrary_relations", func=_event_arbitrary_relations_txn ) if not num_rows: - await self.db_pool.updates._end_background_update("event_thread_relation") + await self.db_pool.updates._end_background_update( + "event_arbitrary_relations" + ) return num_rows diff --git a/synapse/storage/schema/main/delta/65/02_thread_relations.sql b/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql index d60517f7b4..267b2cb539 100644 --- a/synapse/storage/schema/main/delta/65/02_thread_relations.sql +++ b/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql @@ -15,4 +15,4 @@ -- Check old events for thread relations. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (6502, 'event_thread_relation', '{}'); + (6507, 'event_arbitrary_relations', '{}'); |