summary refs log tree commit diff
path: root/synapse/events/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/events/__init__.py')
-rw-r--r--synapse/events/__init__.py45
1 files changed, 45 insertions, 0 deletions
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index c238376caf..39ad2793d9 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import abc
+import collections.abc
 import os
 from typing import (
     TYPE_CHECKING,
@@ -32,9 +33,11 @@ from typing import (
     overload,
 )
 
+import attr
 from typing_extensions import Literal
 from unpaddedbase64 import encode_base64
 
+from synapse.api.constants import RelationTypes
 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
 from synapse.types import JsonDict, RoomStreamToken
 from synapse.util.caches import intern_dict
@@ -615,3 +618,45 @@ def make_event_from_dict(
     return event_type(
         event_dict, room_version, internal_metadata_dict or {}, rejected_reason
     )
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _EventRelation:
+    # The target event of the relation.
+    parent_id: str
+    # The relation type.
+    rel_type: str
+    # The aggregation key. Will be None if the rel_type is not m.annotation or is
+    # not a string.
+    aggregation_key: Optional[str]
+
+
+def relation_from_event(event: EventBase) -> Optional[_EventRelation]:
+    """
+    Attempt to parse relation information an event.
+
+    Returns:
+        The event relation information, if it is valid. None, otherwise.
+    """
+    relation = event.content.get("m.relates_to")
+    if not relation or not isinstance(relation, collections.abc.Mapping):
+        # No relation information.
+        return None
+
+    # Relations must have a type and parent event ID.
+    rel_type = relation.get("rel_type")
+    if not isinstance(rel_type, str):
+        return None
+
+    parent_id = relation.get("event_id")
+    if not isinstance(parent_id, str):
+        return None
+
+    # Annotations have a key field.
+    aggregation_key = None
+    if rel_type == RelationTypes.ANNOTATION:
+        aggregation_key = relation.get("key")
+        if not isinstance(aggregation_key, str):
+            aggregation_key = None
+
+    return _EventRelation(parent_id, rel_type, aggregation_key)