diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f600f1190..0f097a2927 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
+from synapse.logging.opentracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -145,6 +146,7 @@ class PersistEventsStore:
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
+ @trace
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
@@ -353,9 +355,9 @@ class PersistEventsStore:
txn: LoggingTransaction,
*,
events_and_contexts: List[Tuple[EventBase, EventContext]],
- inhibit_local_membership_updates: bool = False,
- state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
- new_forward_extremities: Optional[Dict[str, Set[str]]] = None,
+ inhibit_local_membership_updates: bool,
+ state_delta_for_room: Dict[str, DeltaState],
+ new_forward_extremities: Dict[str, Set[str]],
) -> None:
"""Insert some number of room events into the necessary database tables.
@@ -382,9 +384,6 @@ class PersistEventsStore:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
- state_delta_for_room = state_delta_for_room or {}
- new_forward_extremities = new_forward_extremities or {}
-
all_events_and_contexts = events_and_contexts
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
@@ -408,6 +407,31 @@ class PersistEventsStore:
assert min_stream_order
assert max_stream_order
+        # Once the txn completes, invalidate all of the relevant caches. Note
+        # that we do this up front, while events_and_contexts still contains
+        # every event, before any are filtered out below.
+ for event, _ in events_and_contexts:
+ self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
+ if event.redacts:
+ self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
+
+ relates_to = None
+ relation = relation_from_event(event)
+ if relation:
+ relates_to = relation.parent_id
+
+ assert event.internal_metadata.stream_ordering is not None
+ txn.call_after(
+ self.store._invalidate_caches_for_event,
+ event.internal_metadata.stream_ordering,
+ event.event_id,
+ event.room_id,
+ event.type,
+ getattr(event, "state_key", None),
+ event.redacts,
+ relates_to,
+ backfilled=False,
+ )
+
self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremities,
@@ -457,6 +481,7 @@ class PersistEventsStore:
# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
+        # NB: This function invalidates all state-related caches.
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
def _persist_event_auth_chain_txn(
@@ -1170,17 +1195,16 @@ class PersistEventsStore:
)
# Invalidate the various caches
-
- for member in members_changed:
- txn.call_after(
- self.store.get_rooms_for_user_with_stream_ordering.invalidate,
- (member,),
- )
-
self.store._invalidate_state_caches_and_stream(
txn, room_id, members_changed
)
+            # Check if any of the remote membership changes require us to
+            # unsubscribe from the affected users' device lists.
+ self.store.handle_potentially_left_users_txn(
+ txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
+ )
+
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
"""Update the room version in the database based off current state
events.
@@ -1220,9 +1244,6 @@ class PersistEventsStore:
self.db_pool.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
- txn.call_after(
- self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
- )
self.db_pool.simple_insert_many_txn(
txn,
@@ -1258,9 +1279,10 @@ class PersistEventsStore:
Pick the earliest non-outlier if there is one, else the earliest one.
Args:
- events_and_contexts (list[(EventBase, EventContext)]):
+            events_and_contexts: The events and contexts to deduplicate.
+
Returns:
- list[(EventBase, EventContext)]: filtered list
+            The filtered list, with duplicate events removed.
"""
new_events_and_contexts: OrderedDict[
str, Tuple[EventBase, EventContext]
@@ -1286,14 +1308,11 @@ class PersistEventsStore:
"""Update min_depth for each room
Args:
- txn (twisted.enterprise.adbapi.Connection): db connection
- events_and_contexts (list[(EventBase, EventContext)]): events
- we are persisting
+ txn: db connection
+ events_and_contexts: events we are persisting
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
- # Remove the any existing cache entries for the event_ids
- self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
@@ -1490,7 +1509,7 @@ class PersistEventsStore:
event.sender,
"url" in event.content and isinstance(event.content["url"], str),
event.get_state_key(),
- context.rejected or None,
+ context.rejected,
)
for event, context in events_and_contexts
),
@@ -1561,13 +1580,11 @@ class PersistEventsStore:
"""Update all the miscellaneous tables for new events
Args:
- txn (twisted.enterprise.adbapi.Connection): db connection
- events_and_contexts (list[(EventBase, EventContext)]): events
- we are persisting
- all_events_and_contexts (list[(EventBase, EventContext)]): all
- events that we were going to persist. This includes events
- we've already persisted, etc, that wouldn't appear in
- events_and_context.
+ txn: db connection
+ events_and_contexts: events we are persisting
+ all_events_and_contexts: all events that we were going to persist.
+ This includes events we've already persisted, etc, that wouldn't
+ appear in events_and_context.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
@@ -1594,7 +1611,7 @@ class PersistEventsStore:
)
# Remove from relations table.
- self._handle_redact_relations(txn, event.redacts)
+ self._handle_redact_relations(txn, event.room_id, event.redacts)
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
@@ -1695,16 +1712,7 @@ class PersistEventsStore:
txn.async_call_after(prefill)
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
- """Invalidate the caches for the redacted event.
-
- Note that these caches are also cleared as part of event replication in
- _invalidate_caches_for_event.
- """
assert event.redacts is not None
- self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
- txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
- txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
-
self.db_pool.simple_upsert_txn(
txn,
table="redactions",
@@ -1805,34 +1813,6 @@ class PersistEventsStore:
for event in events:
assert event.internal_metadata.stream_ordering is not None
- txn.call_after(
- self.store._membership_stream_cache.entity_has_changed,
- event.state_key,
- event.internal_metadata.stream_ordering,
- )
- txn.call_after(
- self.store.get_invited_rooms_for_local_user.invalidate,
- (event.state_key,),
- )
- txn.call_after(
- self.store.get_local_users_in_room.invalidate,
- (event.room_id,),
- )
- txn.call_after(
- self.store.get_number_joined_users_in_room.invalidate,
- (event.room_id,),
- )
- txn.call_after(
- self.store.get_user_in_room_with_profile.invalidate,
- (event.room_id, event.state_key),
- )
-
- # The `_get_membership_from_event_id` is immutable, except for the
- # case where we look up an event *before* persisting it.
- txn.call_after(
- self.store._get_membership_from_event_id.invalidate,
- (event.event_id,),
- )
# We update the local_current_membership table only if the event is
# "current", i.e., its something that has just happened.
@@ -1881,33 +1861,32 @@ class PersistEventsStore:
},
)
- txn.call_after(
- self.store.get_relations_for_event.invalidate, (relation.parent_id,)
- )
- txn.call_after(
- self.store.get_aggregation_groups_for_event.invalidate,
- (relation.parent_id,),
- )
- txn.call_after(
- self.store.get_mutual_event_relations_for_rel_type.invalidate,
- (relation.parent_id,),
- )
-
- if relation.rel_type == RelationTypes.REPLACE:
- txn.call_after(
- self.store.get_applicable_edit.invalidate, (relation.parent_id,)
- )
-
if relation.rel_type == RelationTypes.THREAD:
- txn.call_after(
- self.store.get_thread_summary.invalidate, (relation.parent_id,)
- )
- # It should be safe to only invalidate the cache if the user has not
- # previously participated in the thread, but that's difficult (and
- # potentially error-prone) so it is always invalidated.
- txn.call_after(
- self.store.get_thread_participated.invalidate,
- (relation.parent_id, event.sender),
+                # Upsert into the threads table, but only overwrite an existing
+                # row if the new event has a later topological ordering, OR if
+                # the topological orderings match but the stream ordering is
+                # later.
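+                # This stops a thread's latest event from moving backwards when
+                # events are persisted out of order, e.g. during backfill.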
+ sql = """
+ INSERT INTO threads (room_id, thread_id, latest_event_id, topological_ordering, stream_ordering)
+ VALUES (?, ?, ?, ?, ?)
+ ON CONFLICT (room_id, thread_id)
+ DO UPDATE SET
+ latest_event_id = excluded.latest_event_id,
+ topological_ordering = excluded.topological_ordering,
+ stream_ordering = excluded.stream_ordering
+ WHERE
+ threads.topological_ordering <= excluded.topological_ordering AND
+ threads.stream_ordering < excluded.stream_ordering
+ """
+
+ txn.execute(
+ sql,
+ (
+ event.room_id,
+ relation.parent_id,
+ event.event_id,
+ event.depth,
+ event.internal_metadata.stream_ordering,
+ ),
)
def _handle_insertion_event(
@@ -2033,35 +2012,52 @@ class PersistEventsStore:
txn.execute(sql, (batch_id,))
def _handle_redact_relations(
- self, txn: LoggingTransaction, redacted_event_id: str
+ self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
) -> None:
"""Handles receiving a redaction and checking whether the redacted event
has any relations which must be removed from the database.
Args:
txn
+ room_id: The room ID of the event that was redacted.
redacted_event_id: The event that was redacted.
"""
- # Fetch the current relation of the event being redacted.
- redacted_relates_to = self.db_pool.simple_select_one_onecol_txn(
+ # Fetch the relation of the event being redacted.
+ row = self.db_pool.simple_select_one_txn(
txn,
table="event_relations",
keyvalues={"event_id": redacted_event_id},
- retcol="relates_to_id",
+ retcols=("relates_to_id", "relation_type"),
allow_none=True,
)
+ # Nothing to do if no relation is found.
+ if row is None:
+ return
+
+ redacted_relates_to = row["relates_to_id"]
+ rel_type = row["relation_type"]
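+        # Delete the relation row before recomputing the thread's latest event
+        # below, so that the redacted event cannot be picked again.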
+ self.db_pool.simple_delete_txn(
+ txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
+ )
+
# Any relation information for the related event must be cleared.
- if redacted_relates_to is not None:
+ self.store._invalidate_cache_and_stream(
+ txn, self.store.get_relations_for_event, (redacted_relates_to,)
+ )
+ if rel_type == RelationTypes.ANNOTATION:
self.store._invalidate_cache_and_stream(
- txn, self.store.get_relations_for_event, (redacted_relates_to,)
+ txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,)
)
+ if rel_type == RelationTypes.REFERENCE:
self.store._invalidate_cache_and_stream(
- txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,)
+ txn, self.store.get_references_for_event, (redacted_relates_to,)
)
+ if rel_type == RelationTypes.REPLACE:
self.store._invalidate_cache_and_stream(
txn, self.store.get_applicable_edit, (redacted_relates_to,)
)
+ if rel_type == RelationTypes.THREAD:
self.store._invalidate_cache_and_stream(
txn, self.store.get_thread_summary, (redacted_relates_to,)
)
@@ -2069,14 +2065,41 @@ class PersistEventsStore:
txn, self.store.get_thread_participated, (redacted_relates_to,)
)
self.store._invalidate_cache_and_stream(
- txn,
- self.store.get_mutual_event_relations_for_rel_type,
- (redacted_relates_to,),
+ txn, self.store.get_threads, (room_id,)
)
- self.db_pool.simple_delete_txn(
- txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
- )
+ # Find the new latest event in the thread.
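+        # The ordering mirrors the conditional upsert performed when a thread
+        # event is persisted: highest topological ordering wins, with ties
+        # broken by stream ordering.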
+ sql = """
+ SELECT event_id, topological_ordering, stream_ordering
+ FROM event_relations
+ INNER JOIN events USING (event_id)
+ WHERE relates_to_id = ? AND relation_type = ?
+ ORDER BY topological_ordering DESC, stream_ordering DESC
+ LIMIT 1
+ """
+ txn.execute(sql, (redacted_relates_to, RelationTypes.THREAD))
+
+        # If a latest event is found, update the threads table; this might
+        # be the same as the current latest event (if an earlier event in
+        # the thread was redacted).
+ latest_event_row = txn.fetchone()
+ if latest_event_row:
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="threads",
+ keyvalues={"room_id": room_id, "thread_id": redacted_relates_to},
+ values={
+ "latest_event_id": latest_event_row[0],
+ "topological_ordering": latest_event_row[1],
+ "stream_ordering": latest_event_row[2],
+ },
+ )
+
+        else:
+            # Otherwise, delete the thread: it no longer exists.
+            self.db_pool.simple_delete_one_txn(
+                txn, table="threads", keyvalues={"thread_id": redacted_relates_to}
+            )
def _store_room_topic_txn(self, txn: LoggingTransaction, event: EventBase) -> None:
if isinstance(event.content.get("topic"), str):
@@ -2178,26 +2201,26 @@ class PersistEventsStore:
appear in events_and_context.
"""
- # Only non outlier events will have push actions associated with them,
+ # Only notifiable events will have push actions associated with them,
# so let's filter them out. (This makes joining large rooms faster, as
# these queries took seconds to process all the state events).
- non_outlier_events = [
+ notifiable_events = [
event
for event, _ in events_and_contexts
- if not event.internal_metadata.is_outlier()
+ if event.internal_metadata.is_notifiable()
]
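+    # `thread_id` is carried over from the staging table so that each push
+    # action stays associated with its thread.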
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
- topological_ordering, notif, highlight, unread
+ topological_ordering, notif, highlight, unread, thread_id
)
- SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
+ SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
FROM event_push_actions_staging
WHERE event_id = ?
"""
- if non_outlier_events:
+ if notifiable_events:
txn.execute_batch(
sql,
(
@@ -2207,32 +2230,10 @@ class PersistEventsStore:
event.depth,
event.event_id,
)
- for event in non_outlier_events
+ for event in notifiable_events
),
)
- room_to_event_ids: Dict[str, List[str]] = {}
- for e in non_outlier_events:
- room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
-
- for room_id, event_ids in room_to_event_ids.items():
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="event_push_actions_staging",
- column="event_id",
- iterable=event_ids,
- keyvalues={},
- retcols=("user_id",),
- )
-
- user_ids = {row["user_id"] for row in rows}
-
- for user_id in user_ids:
- txn.call_after(
- self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
- (room_id, user_id),
- )
-
# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
@@ -2240,18 +2241,13 @@ class PersistEventsStore:
(
(event.event_id,)
for event, _ in all_events_and_contexts
- if not event.internal_metadata.is_outlier()
+ if event.internal_metadata.is_notifiable()
),
)
def _remove_push_actions_for_event_id_txn(
self, txn: LoggingTransaction, room_id: str, event_id: str
) -> None:
- # Sad that we have to blow away the cache for the whole room here
- txn.call_after(
- self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
- (room_id,),
- )
txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id),
@@ -2433,17 +2429,31 @@ class PersistEventsStore:
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
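+    # Collect the tuples up front, since the same events also need their
+    # failed backfill attempts cleared out below.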
+ backward_extremity_tuples_to_remove = [
+ (ev.event_id, ev.room_id)
+ for ev in events
+ if not ev.internal_metadata.is_outlier()
+ # If we encountered an event with no prev_events, then we might
+ # as well remove it now because it won't ever have anything else
+ # to backfill from.
+ or len(ev.prev_event_ids()) == 0
+ ]
txn.execute_batch(
query,
- [
- (ev.event_id, ev.room_id)
- for ev in events
- if not ev.internal_metadata.is_outlier()
- # If we encountered an event with no prev_events, then we might
- # as well remove it now because it won't ever have anything else
- # to backfill from.
- or len(ev.prev_event_ids()) == 0
- ],
+ backward_extremity_tuples_to_remove,
+ )
+
+    # Clear out the failed backfill attempts after we successfully pulled
+    # the event. Since we no longer need these events as backward
+    # extremities, they won't be backfilled from again, so we no longer
+    # need to store the backfill attempts for them.
+ query = """
+ DELETE FROM event_failed_pull_attempts
+        WHERE event_id = ? AND room_id = ?
+ """
+ txn.execute_batch(
+ query,
+ backward_extremity_tuples_to_remove,
)
|