diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index f707f74b50..4804f0f25a 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -35,7 +35,7 @@ import attr
from prometheus_client import Counter
import synapse.metrics
-from synapse.api.constants import EventContentFields, EventTypes
+from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
@@ -1616,7 +1616,7 @@ class PersistEventsStore:
)
# Remove from relations table.
- self._handle_redact_relations(txn, event.redacts)
+ self._handle_redact_relations(txn, event.room_id, event.redacts)
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
@@ -1866,6 +1866,34 @@ class PersistEventsStore:
},
)
+ if relation.rel_type == RelationTypes.THREAD:
+ # Upsert into the threads table, but only overwrite the value if the
+ # new event is of a later topological order OR if the topological
+ # ordering is equal, but the stream ordering is later.
+ sql = """
+ INSERT INTO threads (room_id, thread_id, latest_event_id, topological_ordering, stream_ordering)
+ VALUES (?, ?, ?, ?, ?)
+ ON CONFLICT (room_id, thread_id)
+ DO UPDATE SET
+ latest_event_id = excluded.latest_event_id,
+ topological_ordering = excluded.topological_ordering,
+ stream_ordering = excluded.stream_ordering
+ WHERE
+ threads.topological_ordering <= excluded.topological_ordering AND
+ threads.stream_ordering < excluded.stream_ordering
+ """
+
+ txn.execute(
+ sql,
+ (
+ event.room_id,
+ relation.parent_id,
+ event.event_id,
+ event.depth,
+ event.internal_metadata.stream_ordering,
+ ),
+ )
+
def _handle_insertion_event(
self, txn: LoggingTransaction, event: EventBase
) -> None:
@@ -1989,13 +2017,14 @@ class PersistEventsStore:
txn.execute(sql, (batch_id,))
def _handle_redact_relations(
- self, txn: LoggingTransaction, redacted_event_id: str
+ self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
) -> None:
"""Handles receiving a redaction and checking whether the redacted event
has any relations which must be removed from the database.
Args:
txn
+ room_id: The room ID of the event that was redacted.
redacted_event_id: The event that was redacted.
"""
@@ -2025,9 +2054,7 @@ class PersistEventsStore:
txn, self.store.get_thread_participated, (redacted_relates_to,)
)
self.store._invalidate_cache_and_stream(
- txn,
- self.store.get_mutual_event_relations_for_rel_type,
- (redacted_relates_to,),
+ txn, self.store.get_threads, (room_id,)
)
self.db_pool.simple_delete_txn(
@@ -2174,7 +2201,7 @@ class PersistEventsStore:
(
(event.event_id,)
for event, _ in all_events_and_contexts
- if not event.internal_metadata.is_outlier()
+ if event.internal_metadata.is_notifiable()
),
)
|