From ba00e20234eadae66f105f8bda64e39beed9a92d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 21 Oct 2021 14:39:16 -0400 Subject: Add a thread relation type per MSC3440. (#11088) Adds experimental support for MSC3440's `io.element.thread` relation type (and the aggregation for it). --- synapse/api/constants.py | 1 + synapse/config/experimental.py | 2 + synapse/events/utils.py | 17 +++++++++ synapse/rest/client/relations.py | 3 +- synapse/storage/databases/main/events.py | 4 ++ synapse/storage/databases/main/relations.py | 59 ++++++++++++++++++++++++++++- 6 files changed, 84 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index a31f037748..a33ac34161 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -176,6 +176,7 @@ class RelationTypes: ANNOTATION = "m.annotation" REPLACE = "m.replace" REFERENCE = "m.reference" + THREAD = "io.element.thread" class LimitBlockingTypes: diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index b013a3918c..8b098ad48d 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -26,6 +26,8 @@ class ExperimentalConfig(Config): # Whether to enable experimental MSC1849 (aka relations) support self.msc1849_enabled = config.get("experimental_msc1849_support_enabled", True) + # MSC3440 (thread relation) + self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False) # MSC3026 (busy presence state) self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 3f3eba86a8..6fa631aa1d 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -386,6 +386,7 @@ class EventClientSerializer: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self._msc1849_enabled = hs.config.experimental.msc1849_enabled + self._msc3440_enabled = hs.config.experimental.msc3440_enabled async def serialize_event( self, @@ -462,6 +463,22 @@ class EventClientSerializer: "sender": edit.sender, } + # If this event is the start of a thread, include a summary of the replies. + if self._msc3440_enabled: + ( + thread_count, + latest_thread_event, + ) = await self.store.get_thread_summary(event_id) + if latest_thread_event: + r = serialized_event["unsigned"].setdefault("m.relations", {}) + r[RelationTypes.THREAD] = { + # Don't bundle aggregations as this could recurse forever. + "latest_event": await self.serialize_event( + latest_thread_event, time_now, bundle_aggregations=False + ), + "count": thread_count, + } + return serialized_event async def serialize_events( diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index d695c18be2..58f6699073 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -128,9 +128,10 @@ class RelationSendServlet(RestServlet): content["m.relates_to"] = { "event_id": parent_id, - "key": aggregation_key, "rel_type": relation_type, } + if aggregation_key is not None: + content["m.relates_to"]["key"] = aggregation_key event_dict = { "type": event_type, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 37439f8562..8d9086ecf0 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1710,6 +1710,7 @@ class PersistEventsStore: RelationTypes.ANNOTATION, RelationTypes.REFERENCE, RelationTypes.REPLACE, + RelationTypes.THREAD, ): # Unknown relation type return @@ -1740,6 +1741,9 @@ class PersistEventsStore: if rel_type == RelationTypes.REPLACE: txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) + if rel_type == RelationTypes.THREAD: + txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,)) + def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): """Handles keeping track of insertion events and edges/connections. Part of MSC2716. diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 2bbf6d6a95..40760fbd1b 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Optional +from typing import Optional, Tuple import attr @@ -269,6 +269,63 @@ class RelationsWorkerStore(SQLBaseStore): return await self.get_event(edit_id, allow_none=True) + @cached() + async def get_thread_summary( + self, event_id: str + ) -> Tuple[int, Optional[EventBase]]: + """Get the number of threaded replies, the senders of those replies, and + the latest reply (if any) for the given event. + + Args: + event_id: The original event ID + + Returns: + The number of items in the thread and the most recent response, if any. + """ + + def _get_thread_summary_txn(txn) -> Tuple[int, Optional[str]]: + # Fetch the count of threaded events and the latest event ID. + # TODO Should this only allow m.room.message events. + sql = """ + SELECT event_id + FROM event_relations + INNER JOIN events USING (event_id) + WHERE + relates_to_id = ? + AND relation_type = ? + ORDER BY topological_ordering DESC, stream_ordering DESC + LIMIT 1 + """ + + txn.execute(sql, (event_id, RelationTypes.THREAD)) + row = txn.fetchone() + if row is None: + return 0, None + + latest_event_id = row[0] + + sql = """ + SELECT COALESCE(COUNT(event_id), 0) + FROM event_relations + WHERE + relates_to_id = ? + AND relation_type = ? + """ + txn.execute(sql, (event_id, RelationTypes.THREAD)) + count = txn.fetchone()[0] + + return count, latest_event_id + + count, latest_event_id = await self.db_pool.runInteraction( + "get_thread_summary", _get_thread_summary_txn + ) + + latest_event = None + if latest_event_id: + latest_event = await self.get_event(latest_event_id, allow_none=True) + + return count, latest_event + async def has_user_annotated_event( self, parent_id: str, event_type: str, aggregation_key: str, sender: str ) -> bool: -- cgit 1.4.1