summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2021-11-22 12:01:47 -0500
committerGitHub <noreply@github.com>2021-11-22 12:01:47 -0500
commit3d893b8cf2358f947678dfb995b73f426200b099 (patch)
treed342b9adf7244ea139924f854ce873d1513ae47b /synapse/storage
parentUpdate README.md (diff)
downloadsynapse-3d893b8cf2358f947678dfb995b73f426200b099.tar.xz
Store arbitrary relations from events. (#11391)
Instead of only known relation types. This also reworks the background
update for thread relations to crawl events and search for any relation
type, not just threaded relations.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events.py29
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py88
-rw-r--r--synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql (renamed from synapse/storage/schema/main/delta/65/02_thread_relations.sql)2
3 files changed, 75 insertions, 44 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 120e4807d1..06832221ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1,6 +1,6 @@
 # Copyright 2014-2016 OpenMarket Ltd
 # Copyright 2018-2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -1696,34 +1696,33 @@ class PersistEventsStore:
                     },
                 )
 
-    def _handle_event_relations(self, txn, event):
-        """Handles inserting relation data during peristence of events
+    def _handle_event_relations(
+        self, txn: LoggingTransaction, event: EventBase
+    ) -> None:
+        """Handles inserting relation data during persistence of events
 
         Args:
-            txn
-            event (EventBase)
+            txn: The current database transaction.
+            event: The event which might have relations.
         """
         relation = event.content.get("m.relates_to")
         if not relation:
             # No relations
             return
 
+        # Relations must have a type and parent event ID.
         rel_type = relation.get("rel_type")
-        if rel_type not in (
-            RelationTypes.ANNOTATION,
-            RelationTypes.REFERENCE,
-            RelationTypes.REPLACE,
-            RelationTypes.THREAD,
-        ):
-            # Unknown relation type
+        if not isinstance(rel_type, str):
             return
 
         parent_id = relation.get("event_id")
-        if not parent_id:
-            # Invalid relation
+        if not isinstance(parent_id, str):
             return
 
-        aggregation_key = relation.get("key")
+        # Annotations have a key field.
+        aggregation_key = None
+        if rel_type == RelationTypes.ANNOTATION:
+            aggregation_key = relation.get("key")
 
         self.db_pool.simple_insert_txn(
             txn,
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index ae3a8a63e4..c88fd35e7f 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1,4 +1,4 @@
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -171,8 +171,14 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._purged_chain_cover_index,
         )
 
+        # The event_thread_relation background update was replaced with the
+        # event_arbitrary_relations one, which handles any relation to avoid
+        # needed to potentially crawl the entire events table in the future.
+        self.db_pool.updates.register_noop_background_update("event_thread_relation")
+
         self.db_pool.updates.register_background_update_handler(
-            "event_thread_relation", self._event_thread_relation
+            "event_arbitrary_relations",
+            self._event_arbitrary_relations,
         )
 
         ################################################################################
@@ -1099,23 +1105,27 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
         return result
 
-    async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int:
-        """Background update handler which will store thread relations for existing events."""
+    async def _event_arbitrary_relations(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Background update handler which will store previously unknown relations for existing events."""
         last_event_id = progress.get("last_event_id", "")
 
-        def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
+        def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
+            # Fetch events and then filter based on whether the event has a
+            # relation or not.
             txn.execute(
                 """
                 SELECT event_id, json FROM event_json
-                LEFT JOIN event_relations USING (event_id)
-                WHERE event_id > ? AND event_relations.event_id IS NULL
+                WHERE event_id > ?
                 ORDER BY event_id LIMIT ?
                 """,
                 (last_event_id, batch_size),
             )
 
             results = list(txn)
-            missing_thread_relations = []
+            # (event_id, parent_id, rel_type) for each relation
+            relations_to_insert: List[Tuple[str, str, str]] = []
             for (event_id, event_json_raw) in results:
                 try:
                     event_json = db_to_json(event_json_raw)
@@ -1127,48 +1137,70 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
                     )
                     continue
 
-                # If there's no relation (or it is not a thread), skip!
+                # If there's no relation, skip!
                 relates_to = event_json["content"].get("m.relates_to")
                 if not relates_to or not isinstance(relates_to, dict):
                     continue
-                if relates_to.get("rel_type") != RelationTypes.THREAD:
+
+                # If the relation type or parent event ID is not a string, skip it.
+                #
+                # Do not consider relation types that have existed for a long time,
+                # since they will already be listed in the `event_relations` table.
+                rel_type = relates_to.get("rel_type")
+                if not isinstance(rel_type, str) or rel_type in (
+                    RelationTypes.ANNOTATION,
+                    RelationTypes.REFERENCE,
+                    RelationTypes.REPLACE,
+                ):
                     continue
 
-                # Get the parent ID.
                 parent_id = relates_to.get("event_id")
                 if not isinstance(parent_id, str):
                     continue
 
-                missing_thread_relations.append((event_id, parent_id))
+                relations_to_insert.append((event_id, parent_id, rel_type))
+
+            # Insert the missing data, note that we upsert here in case the event
+            # has already been processed.
+            if relations_to_insert:
+                self.db_pool.simple_upsert_many_txn(
+                    txn=txn,
+                    table="event_relations",
+                    key_names=("event_id",),
+                    key_values=[(r[0],) for r in relations_to_insert],
+                    value_names=("relates_to_id", "relation_type"),
+                    value_values=[r[1:] for r in relations_to_insert],
+                )
 
-            # Insert the missing data.
-            self.db_pool.simple_insert_many_txn(
-                txn=txn,
-                table="event_relations",
-                values=[
-                    {
-                        "event_id": event_id,
-                        "relates_to_Id": parent_id,
-                        "relation_type": RelationTypes.THREAD,
-                    }
-                    for event_id, parent_id in missing_thread_relations
-                ],
-            )
+                # Iterate the parent IDs and invalidate caches.
+                for parent_id in {r[1] for r in relations_to_insert}:
+                    cache_tuple = (parent_id,)
+                    self._invalidate_cache_and_stream(
+                        txn, self.get_relations_for_event, cache_tuple
+                    )
+                    self._invalidate_cache_and_stream(
+                        txn, self.get_aggregation_groups_for_event, cache_tuple
+                    )
+                    self._invalidate_cache_and_stream(
+                        txn, self.get_thread_summary, cache_tuple
+                    )
 
             if results:
                 latest_event_id = results[-1][0]
                 self.db_pool.updates._background_update_progress_txn(
-                    txn, "event_thread_relation", {"last_event_id": latest_event_id}
+                    txn, "event_arbitrary_relations", {"last_event_id": latest_event_id}
                 )
 
             return len(results)
 
         num_rows = await self.db_pool.runInteraction(
-            desc="event_thread_relation", func=_event_thread_relation_txn
+            desc="event_arbitrary_relations", func=_event_arbitrary_relations_txn
         )
 
         if not num_rows:
-            await self.db_pool.updates._end_background_update("event_thread_relation")
+            await self.db_pool.updates._end_background_update(
+                "event_arbitrary_relations"
+            )
 
         return num_rows
 
diff --git a/synapse/storage/schema/main/delta/65/02_thread_relations.sql b/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql
index d60517f7b4..267b2cb539 100644
--- a/synapse/storage/schema/main/delta/65/02_thread_relations.sql
+++ b/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql
@@ -15,4 +15,4 @@
 
 -- Check old events for thread relations.
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
-  (6502, 'event_thread_relation', '{}');
+  (6507, 'event_arbitrary_relations', '{}');