diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index be1500092b..c4869d64e6 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -508,7 +508,7 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC
"""
else:
@@ -523,16 +523,22 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
- args.append(RelationTypes.THREAD)
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ txn.execute(sql % (clause, relations_clause), args)
latest_event_ids = {}
for parent_event_id, child_event_id in txn:
# Only consider the latest threaded reply (by topological ordering).
@@ -552,7 +558,7 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
GROUP BY parent.event_id
"""
@@ -561,9 +567,15 @@ class RelationsWorkerStore(SQLBaseStore):
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", latest_event_ids.keys()
)
- args.append(RelationTypes.THREAD)
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ txn.execute(sql % (clause, relations_clause), args)
counts = dict(cast(List[Tuple[str, int]], txn.fetchall()))
return counts, latest_event_ids
@@ -626,16 +638,24 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
AND child.sender = ?
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
- args.extend((RelationTypes.THREAD, user_id))
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ args.append(user_id)
+
+ txn.execute(sql % (clause, relations_clause), args)
return {row[0] for row in txn.fetchall()}
participated_threads = await self.db_pool.runInteraction(
@@ -834,26 +854,23 @@ class RelationsWorkerStore(SQLBaseStore):
results.setdefault(event_id, BundledAggregations()).replace = edit
# Fetch thread summaries.
- if self._msc3440_enabled:
- summaries = await self._get_thread_summaries(events_by_id.keys())
- # Only fetch participated for a limited selection based on what had
- # summaries.
- participated = await self._get_threads_participated(
- summaries.keys(), user_id
- )
- for event_id, summary in summaries.items():
- if summary:
- thread_count, latest_thread_event, edit = summary
- results.setdefault(
- event_id, BundledAggregations()
- ).thread = _ThreadAggregation(
- latest_event=latest_thread_event,
- latest_edit=edit,
- count=thread_count,
- # If there's a thread summary it must also exist in the
- # participated dictionary.
- current_user_participated=participated[event_id],
- )
+ summaries = await self._get_thread_summaries(events_by_id.keys())
+ # Only fetch participated for a limited selection based on what had
+ # summaries.
+ participated = await self._get_threads_participated(summaries.keys(), user_id)
+ for event_id, summary in summaries.items():
+ if summary:
+ thread_count, latest_thread_event, edit = summary
+ results.setdefault(
+ event_id, BundledAggregations()
+ ).thread = _ThreadAggregation(
+ latest_event=latest_thread_event,
+ latest_edit=edit,
+ count=thread_count,
+ # If there's a thread summary it must also exist in the
+ # participated dictionary.
+ current_user_participated=participated[event_id],
+ )
return results
|