diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 120e4807d1..06832221ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1,6 +1,6 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -1696,34 +1696,33 @@ class PersistEventsStore:
},
)
- def _handle_event_relations(self, txn, event):
- """Handles inserting relation data during peristence of events
+ def _handle_event_relations(
+ self, txn: LoggingTransaction, event: EventBase
+ ) -> None:
+ """Handles inserting relation data during persistence of events
Args:
- txn
- event (EventBase)
+ txn: The current database transaction.
+ event: The event which might have relations.
"""
relation = event.content.get("m.relates_to")
if not relation:
# No relations
return
+ # Relations must have a type and parent event ID.
rel_type = relation.get("rel_type")
- if rel_type not in (
- RelationTypes.ANNOTATION,
- RelationTypes.REFERENCE,
- RelationTypes.REPLACE,
- RelationTypes.THREAD,
- ):
- # Unknown relation type
+ if not isinstance(rel_type, str):
return
parent_id = relation.get("event_id")
- if not parent_id:
- # Invalid relation
+ if not isinstance(parent_id, str):
return
- aggregation_key = relation.get("key")
+ # Annotations have a key field.
+ aggregation_key = None
+ if rel_type == RelationTypes.ANNOTATION:
+ aggregation_key = relation.get("key")
self.db_pool.simple_insert_txn(
txn,
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index ae3a8a63e4..c88fd35e7f 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1,4 +1,4 @@
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -171,8 +171,14 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
self._purged_chain_cover_index,
)
+ # The event_thread_relation background update was replaced with the
+ # event_arbitrary_relations one, which handles any relation to avoid
+ # needed to potentially crawl the entire events table in the future.
+ self.db_pool.updates.register_noop_background_update("event_thread_relation")
+
self.db_pool.updates.register_background_update_handler(
- "event_thread_relation", self._event_thread_relation
+ "event_arbitrary_relations",
+ self._event_arbitrary_relations,
)
################################################################################
@@ -1099,23 +1105,27 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
return result
- async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int:
- """Background update handler which will store thread relations for existing events."""
+ async def _event_arbitrary_relations(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Background update handler which will store previously unknown relations for existing events."""
last_event_id = progress.get("last_event_id", "")
- def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
+ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
+ # Fetch events and then filter based on whether the event has a
+ # relation or not.
txn.execute(
"""
SELECT event_id, json FROM event_json
- LEFT JOIN event_relations USING (event_id)
- WHERE event_id > ? AND event_relations.event_id IS NULL
+ WHERE event_id > ?
ORDER BY event_id LIMIT ?
""",
(last_event_id, batch_size),
)
results = list(txn)
- missing_thread_relations = []
+ # (event_id, parent_id, rel_type) for each relation
+ relations_to_insert: List[Tuple[str, str, str]] = []
for (event_id, event_json_raw) in results:
try:
event_json = db_to_json(event_json_raw)
@@ -1127,48 +1137,70 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
continue
- # If there's no relation (or it is not a thread), skip!
+ # If there's no relation, skip!
relates_to = event_json["content"].get("m.relates_to")
if not relates_to or not isinstance(relates_to, dict):
continue
- if relates_to.get("rel_type") != RelationTypes.THREAD:
+
+ # If the relation type or parent event ID is not a string, skip it.
+ #
+ # Do not consider relation types that have existed for a long time,
+ # since they will already be listed in the `event_relations` table.
+ rel_type = relates_to.get("rel_type")
+ if not isinstance(rel_type, str) or rel_type in (
+ RelationTypes.ANNOTATION,
+ RelationTypes.REFERENCE,
+ RelationTypes.REPLACE,
+ ):
continue
- # Get the parent ID.
parent_id = relates_to.get("event_id")
if not isinstance(parent_id, str):
continue
- missing_thread_relations.append((event_id, parent_id))
+ relations_to_insert.append((event_id, parent_id, rel_type))
+
+ # Insert the missing data, note that we upsert here in case the event
+ # has already been processed.
+ if relations_to_insert:
+ self.db_pool.simple_upsert_many_txn(
+ txn=txn,
+ table="event_relations",
+ key_names=("event_id",),
+ key_values=[(r[0],) for r in relations_to_insert],
+ value_names=("relates_to_id", "relation_type"),
+ value_values=[r[1:] for r in relations_to_insert],
+ )
- # Insert the missing data.
- self.db_pool.simple_insert_many_txn(
- txn=txn,
- table="event_relations",
- values=[
- {
- "event_id": event_id,
- "relates_to_Id": parent_id,
- "relation_type": RelationTypes.THREAD,
- }
- for event_id, parent_id in missing_thread_relations
- ],
- )
+ # Iterate the parent IDs and invalidate caches.
+ for parent_id in {r[1] for r in relations_to_insert}:
+ cache_tuple = (parent_id,)
+ self._invalidate_cache_and_stream(
+ txn, self.get_relations_for_event, cache_tuple
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_aggregation_groups_for_event, cache_tuple
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_thread_summary, cache_tuple
+ )
if results:
latest_event_id = results[-1][0]
self.db_pool.updates._background_update_progress_txn(
- txn, "event_thread_relation", {"last_event_id": latest_event_id}
+ txn, "event_arbitrary_relations", {"last_event_id": latest_event_id}
)
return len(results)
num_rows = await self.db_pool.runInteraction(
- desc="event_thread_relation", func=_event_thread_relation_txn
+ desc="event_arbitrary_relations", func=_event_arbitrary_relations_txn
)
if not num_rows:
- await self.db_pool.updates._end_background_update("event_thread_relation")
+ await self.db_pool.updates._end_background_update(
+ "event_arbitrary_relations"
+ )
return num_rows
diff --git a/synapse/storage/schema/main/delta/65/02_thread_relations.sql b/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql
index d60517f7b4..267b2cb539 100644
--- a/synapse/storage/schema/main/delta/65/02_thread_relations.sql
+++ b/synapse/storage/schema/main/delta/65/07_arbitrary_relations.sql
@@ -15,4 +15,4 @@
-- Check old events for thread relations.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
- (6502, 'event_thread_relation', '{}');
+ (6507, 'event_arbitrary_relations', '{}');
|