diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 36ace7c613..b0c08a074d 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -178,7 +178,9 @@ class RelationTypes:
ANNOTATION: Final = "m.annotation"
REPLACE: Final = "m.replace"
REFERENCE: Final = "m.reference"
- THREAD: Final = "io.element.thread"
+ THREAD: Final = "m.thread"
+ # TODO Remove this in Synapse >= v1.57.0.
+ UNSTABLE_THREAD: Final = "io.element.thread"
class LimitBlockingTypes:
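
The constants change above promotes the thread relation to its stable identifier, m.thread, while keeping the old unstable prefix around as UNSTABLE_THREAD until it can be dropped. A minimal sketch (not part of the patch; the helper name is invented) of how call sites can treat the two identifiers as equivalent during the migration:

# Sketch only: hypothetical helper showing the stable and unstable thread
# identifiers being treated as equivalent during the transition period.
from typing import Final

THREAD: Final = "m.thread"
UNSTABLE_THREAD: Final = "io.element.thread"


def is_thread_relation(rel_type: str) -> bool:
    return rel_type in (THREAD, UNSTABLE_THREAD)
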
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index cb532d7238..27e97d6f37 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -88,7 +88,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
"org.matrix.labels": {"type": "array", "items": {"type": "string"}},
"org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
# MSC3440, filtering by event relations.
+ "related_by_senders": {"type": "array", "items": {"type": "string"}},
"io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
+ "related_by_rel_types": {"type": "array", "items": {"type": "string"}},
"io.element.relation_types": {"type": "array", "items": {"type": "string"}},
},
}
@@ -318,19 +320,18 @@ class Filter:
self.labels = filter_json.get("org.matrix.labels", None)
self.not_labels = filter_json.get("org.matrix.not_labels", [])
- # Ideally these would be rejected at the endpoint if they were provided
- # and not supported, but that would involve modifying the JSON schema
- # based on the homeserver configuration.
+ self.related_by_senders = self.filter_json.get("related_by_senders", None)
+ self.related_by_rel_types = self.filter_json.get("related_by_rel_types", None)
+
+        # Fall back to the unstable prefix if the stable version is not given.
if hs.config.experimental.msc3440_enabled:
- self.relation_senders = self.filter_json.get(
+ self.related_by_senders = self.related_by_senders or self.filter_json.get(
"io.element.relation_senders", None
)
- self.relation_types = self.filter_json.get(
- "io.element.relation_types", None
+ self.related_by_rel_types = (
+ self.related_by_rel_types
+ or self.filter_json.get("io.element.relation_types", None)
)
- else:
- self.relation_senders = None
- self.relation_types = None
def filters_all_types(self) -> bool:
return "*" in self.not_types
@@ -461,7 +462,7 @@ class Filter:
event_ids = [event.event_id for event in events if isinstance(event, EventBase)] # type: ignore[attr-defined]
event_ids_to_keep = set(
await self._store.events_have_relations(
- event_ids, self.relation_senders, self.relation_types
+ event_ids, self.related_by_senders, self.related_by_rel_types
)
)
@@ -474,7 +475,7 @@ class Filter:
async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
result = [event for event in events if self._check(event)]
- if self.relation_senders or self.relation_types:
+ if self.related_by_senders or self.related_by_rel_types:
return await self._check_event_relations(result)
return result
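
In Filter, the stable keys related_by_senders and related_by_rel_types are now read unconditionally, and the unstable io.element.* spellings are consulted only as a fallback when MSC3440 support is enabled. An illustrative filter body a client might upload (sender and rel_type values are assumed):

# Illustrative room event filter: the stable keys take precedence; the
# io.element.* spellings only apply when msc3440_enabled is set and the
# stable key is absent.
room_event_filter = {
    "related_by_senders": ["@alice:example.org"],
    "related_by_rel_types": ["m.thread"],
    # Older clients may still send the unstable spellings instead:
    # "io.element.relation_senders": [...],
    # "io.element.relation_types": [...],
}
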
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index ee34cb46e4..b2a237c1e0 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -38,6 +38,7 @@ from synapse.util.frozenutils import unfreeze
from . import EventBase
if TYPE_CHECKING:
+ from synapse.server import HomeServer
from synapse.storage.databases.main.relations import BundledAggregations
@@ -395,6 +396,9 @@ class EventClientSerializer:
clients.
"""
+ def __init__(self, hs: "HomeServer"):
+ self._msc3440_enabled = hs.config.experimental.msc3440_enabled
+
def serialize_event(
self,
event: Union[JsonDict, EventBase],
@@ -515,11 +519,14 @@ class EventClientSerializer:
thread.latest_event, serialized_latest_event, thread.latest_edit
)
- serialized_aggregations[RelationTypes.THREAD] = {
+ thread_summary = {
"latest_event": serialized_latest_event,
"count": thread.count,
"current_user_participated": thread.current_user_participated,
}
+ serialized_aggregations[RelationTypes.THREAD] = thread_summary
+ if self._msc3440_enabled:
+ serialized_aggregations[RelationTypes.UNSTABLE_THREAD] = thread_summary
# Include the bundled aggregations in the event.
if serialized_aggregations:
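
EventClientSerializer now takes the homeserver so it can read the experimental flag: the thread summary is always emitted under the stable m.thread key and only duplicated under io.element.thread while MSC3440 is enabled. A rough sketch of the resulting relations block in the event's unsigned data (field values illustrative):

# Rough shape of unsigned["m.relations"] on a thread root after serialization
# (values illustrative).
thread_summary = {
    "latest_event": {"type": "m.room.message", "content": {"body": "latest reply"}},
    "count": 7,
    "current_user_participated": True,
}
serialized_aggregations = {
    "m.thread": thread_summary,
    # Duplicated only while the msc3440 experimental flag is enabled:
    "io.element.thread": thread_summary,
}
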
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0799ec9a84..f9544fe7fb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1079,7 +1079,10 @@ class EventCreationHandler:
raise SynapseError(400, "Can't send same reaction twice")
# Don't attempt to start a thread if the parent event is a relation.
- elif relation_type == RelationTypes.THREAD:
+ elif (
+ relation_type == RelationTypes.THREAD
+ or relation_type == RelationTypes.UNSTABLE_THREAD
+ ):
if await self.store.event_includes_relation(relates_to):
raise SynapseError(
400, "Cannot start threads from an event with a relation"
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 2e5d0e4e22..9a65aa4843 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -101,6 +101,7 @@ class VersionsRestServlet(RestServlet):
"org.matrix.msc3030": self.config.experimental.msc3030_enabled,
# Adds support for thread relations, per MSC3440.
"org.matrix.msc3440": self.config.experimental.msc3440_enabled,
+ "org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above
},
},
)
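
/versions now advertises a temporary org.matrix.msc3440.stable flag so clients can detect the stable identifiers before a spec version containing MSC3440 (v1.3) appears in the versions list. An excerpt of what the response might look like (version list illustrative):

# Excerpt of a /_matrix/client/versions response body after this change
# (version list illustrative).
versions_response = {
    "versions": ["r0.6.1", "v1.1", "v1.2"],
    "unstable_features": {
        "org.matrix.msc3440": False,        # mirrors the experimental config flag
        "org.matrix.msc3440.stable": True,  # advertised until v1.3 is listed above
    },
}
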
diff --git a/synapse/server.py b/synapse/server.py
index 1270abb5a3..7741ff29dc 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -754,7 +754,7 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_event_client_serializer(self) -> EventClientSerializer:
- return EventClientSerializer()
+ return EventClientSerializer(self)
@cache_in_self
def get_password_policy_handler(self) -> PasswordPolicyHandler:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1a322882bf..1f60aef180 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1814,7 +1814,10 @@ class PersistEventsStore:
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
- if rel_type == RelationTypes.THREAD:
+ if (
+ rel_type == RelationTypes.THREAD
+ or rel_type == RelationTypes.UNSTABLE_THREAD
+ ):
txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,))
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index be1500092b..c4869d64e6 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -508,7 +508,7 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC
"""
else:
@@ -523,16 +523,22 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
- args.append(RelationTypes.THREAD)
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ txn.execute(sql % (clause, relations_clause), args)
latest_event_ids = {}
for parent_event_id, child_event_id in txn:
# Only consider the latest threaded reply (by topological ordering).
@@ -552,7 +558,7 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
GROUP BY parent.event_id
"""
@@ -561,9 +567,15 @@ class RelationsWorkerStore(SQLBaseStore):
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", latest_event_ids.keys()
)
- args.append(RelationTypes.THREAD)
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ txn.execute(sql % (clause, relations_clause), args)
counts = dict(cast(List[Tuple[str, int]], txn.fetchall()))
return counts, latest_event_ids
@@ -626,16 +638,24 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
AND child.sender = ?
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
- args.extend((RelationTypes.THREAD, user_id))
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ args.append(user_id)
+
+ txn.execute(sql % (clause, relations_clause), args)
return {row[0] for row in txn.fetchall()}
participated_threads = await self.db_pool.runInteraction(
@@ -834,26 +854,23 @@ class RelationsWorkerStore(SQLBaseStore):
results.setdefault(event_id, BundledAggregations()).replace = edit
# Fetch thread summaries.
- if self._msc3440_enabled:
- summaries = await self._get_thread_summaries(events_by_id.keys())
- # Only fetch participated for a limited selection based on what had
- # summaries.
- participated = await self._get_threads_participated(
- summaries.keys(), user_id
- )
- for event_id, summary in summaries.items():
- if summary:
- thread_count, latest_thread_event, edit = summary
- results.setdefault(
- event_id, BundledAggregations()
- ).thread = _ThreadAggregation(
- latest_event=latest_thread_event,
- latest_edit=edit,
- count=thread_count,
- # If there's a thread summary it must also exist in the
- # participated dictionary.
- current_user_participated=participated[event_id],
- )
+ summaries = await self._get_thread_summaries(events_by_id.keys())
+ # Only fetch participated for a limited selection based on what had
+ # summaries.
+ participated = await self._get_threads_participated(summaries.keys(), user_id)
+ for event_id, summary in summaries.items():
+ if summary:
+ thread_count, latest_thread_event, edit = summary
+ results.setdefault(
+ event_id, BundledAggregations()
+ ).thread = _ThreadAggregation(
+ latest_event=latest_thread_event,
+ latest_edit=edit,
+ count=thread_count,
+ # If there's a thread summary it must also exist in the
+ # participated dictionary.
+ current_user_participated=participated[event_id],
+ )
return results
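
In the relations store, the three thread queries (latest events, counts, and participation) now substitute a relation-type clause that matches both identifiers when the experimental flag is on and only the stable one otherwise, and the bundled-aggregations path no longer gates thread summaries behind the flag. A sketch of the clause-selection pattern repeated above (helper name invented):

# Sketch of the relation-type SQL fragment selection used by the three queries.
from typing import List, Tuple


def thread_relation_clause(msc3440_enabled: bool) -> Tuple[str, List[str]]:
    if msc3440_enabled:
        return "(relation_type = ? OR relation_type = ?)", ["m.thread", "io.element.thread"]
    return "relation_type = ?", ["m.thread"]
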
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index a898f847e7..39e1efe373 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -325,21 +325,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
args.extend(event_filter.labels)
# Filter on relation_senders / relation types from the joined tables.
- if event_filter.relation_senders:
+ if event_filter.related_by_senders:
clauses.append(
"(%s)"
% " OR ".join(
- "related_event.sender = ?" for _ in event_filter.relation_senders
+ "related_event.sender = ?" for _ in event_filter.related_by_senders
)
)
- args.extend(event_filter.relation_senders)
+ args.extend(event_filter.related_by_senders)
- if event_filter.relation_types:
+ if event_filter.related_by_rel_types:
clauses.append(
"(%s)"
- % " OR ".join("relation_type = ?" for _ in event_filter.relation_types)
+ % " OR ".join(
+ "relation_type = ?" for _ in event_filter.related_by_rel_types
+ )
)
- args.extend(event_filter.relation_types)
+ args.extend(event_filter.related_by_rel_types)
return " AND ".join(clauses), args
@@ -1203,7 +1205,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# If there is a filter on relation_senders and relation_types join to the
# relations table.
if event_filter and (
- event_filter.relation_senders or event_filter.relation_types
+ event_filter.related_by_senders or event_filter.related_by_rel_types
):
# Filtering by relations could cause the same event to appear multiple
# times (since there's no limit on the number of relations to an event).
@@ -1211,7 +1213,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
join_clause += """
LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
"""
- if event_filter.relation_senders:
+ if event_filter.related_by_senders:
join_clause += """
LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
"""
|