diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 790d058c43..7c34bde3e5 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -27,6 +27,7 @@ from typing import (
Optional,
Set,
Tuple,
+ Union,
cast,
)
@@ -78,7 +79,7 @@ class DeltaState:
Attributes:
to_delete: List of type/state_keys to delete from current state
to_insert: Map of state to upsert into current state
- no_longer_in_room: The server is not longer in the room, so the room
+ no_longer_in_room: The server is no longer in the room, so the room
should e.g. be removed from `current_state_events` table.
"""
@@ -130,22 +131,25 @@ class PersistEventsStore:
@trace
async def _persist_events_and_state_updates(
self,
+ room_id: str,
events_and_contexts: List[Tuple[EventBase, EventContext]],
*,
- state_delta_for_room: Dict[str, DeltaState],
- new_forward_extremities: Dict[str, Set[str]],
+ state_delta_for_room: Optional[DeltaState],
+ new_forward_extremities: Optional[Set[str]],
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
) -> None:
"""Persist a set of events alongside updates to the current state and
- forward extremities tables.
+ forward extremities tables.
+
+ Assumes that we are only persisting events for one room at a time.
Args:
+            room_id: The room the events are from
events_and_contexts:
- state_delta_for_room: Map from room_id to the delta to apply to
- room state
- new_forward_extremities: Map from room_id to set of event IDs
- that are the new forward extremities of the room.
+ state_delta_for_room: The delta to apply to the room state
+ new_forward_extremities: A set of event IDs that are the new forward
+ extremities of the room.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
@@ -195,6 +199,7 @@ class PersistEventsStore:
await self.db_pool.runInteraction(
"persist_events",
self._persist_events_txn,
+ room_id=room_id,
events_and_contexts=events_and_contexts,
inhibit_local_membership_updates=inhibit_local_membership_updates,
state_delta_for_room=state_delta_for_room,
@@ -220,9 +225,9 @@ class PersistEventsStore:
event_counter.labels(event.type, origin_type, origin_entity).inc()
- for room_id, latest_event_ids in new_forward_extremities.items():
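+        # Prefill the `get_latest_event_ids_in_room` cache with the new forward extremities.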
+ if new_forward_extremities:
self.store.get_latest_event_ids_in_room.prefill(
- (room_id,), frozenset(latest_event_ids)
+ (room_id,), frozenset(new_forward_extremities)
)
async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]:
@@ -335,10 +340,11 @@ class PersistEventsStore:
self,
txn: LoggingTransaction,
*,
+ room_id: str,
events_and_contexts: List[Tuple[EventBase, EventContext]],
inhibit_local_membership_updates: bool,
- state_delta_for_room: Dict[str, DeltaState],
- new_forward_extremities: Dict[str, Set[str]],
+ state_delta_for_room: Optional[DeltaState],
+ new_forward_extremities: Optional[Set[str]],
) -> None:
"""Insert some number of room events into the necessary database tables.
@@ -346,8 +352,11 @@ class PersistEventsStore:
and the rejections table. Things reading from those table will need to check
whether the event was rejected.
+ Assumes that we are only persisting events for one room at a time.
+
Args:
txn
+ room_id: The room the events are from
events_and_contexts: events to persist
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
@@ -356,10 +365,9 @@ class PersistEventsStore:
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room: The current-state delta for each room.
- new_forward_extremities: The new forward extremities for each room.
- For each room, a list of the event ids which are the forward
- extremities.
+ state_delta_for_room: The current-state delta for the room.
+            new_forward_extremities: A set of event IDs that are the new forward
+                extremities of the room.
Raises:
PartialStateConflictError: if attempting to persist a partial state event in
@@ -375,14 +383,13 @@ class PersistEventsStore:
#
# Annoyingly SQLite doesn't support row level locking.
if isinstance(self.database_engine, PostgresEngine):
- for room_id in {e.room_id for e, _ in events_and_contexts}:
- txn.execute(
- "SELECT room_version FROM rooms WHERE room_id = ? FOR SHARE",
- (room_id,),
- )
- row = txn.fetchone()
- if row is None:
- raise Exception(f"Room does not exist {room_id}")
+ txn.execute(
+ "SELECT room_version FROM rooms WHERE room_id = ? FOR SHARE",
+ (room_id,),
+ )
+ row = txn.fetchone()
+ if row is None:
+ raise Exception(f"Room does not exist {room_id}")
# stream orderings should have been assigned by now
assert min_stream_order
@@ -418,7 +425,9 @@ class PersistEventsStore:
events_and_contexts
)
- self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts)
+ self._update_room_depths_txn(
+ txn, room_id, events_and_contexts=events_and_contexts
+ )
# _update_outliers_txn filters out any events which have already been
# persisted, and returns the filtered list.
@@ -431,11 +440,13 @@ class PersistEventsStore:
self._store_event_txn(txn, events_and_contexts=events_and_contexts)
- self._update_forward_extremities_txn(
- txn,
- new_forward_extremities=new_forward_extremities,
- max_stream_order=max_stream_order,
- )
+ if new_forward_extremities:
+ self._update_forward_extremities_txn(
+ txn,
+ room_id,
+ new_forward_extremities=new_forward_extremities,
+ max_stream_order=max_stream_order,
+ )
self._persist_transaction_ids_txn(txn, events_and_contexts)
@@ -463,7 +474,10 @@ class PersistEventsStore:
# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
- self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
+ if state_delta_for_room:
+ self._update_current_state_txn(
+ txn, room_id, state_delta_for_room, min_stream_order
+ )
def _persist_event_auth_chain_txn(
self,
@@ -501,16 +515,19 @@ class PersistEventsStore:
# We ignore legacy rooms that we aren't filling the chain cover index
# for.
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="rooms",
- column="room_id",
- iterable={event.room_id for event in events if event.is_state()},
- keyvalues={},
- retcols=("room_id", "has_auth_chain_index"),
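+        # Cast the untyped rows to the expected (room_id, has_auth_chain_index) tuples.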
+ rows = cast(
+ List[Tuple[str, Optional[Union[int, bool]]]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="rooms",
+ column="room_id",
+ iterable={event.room_id for event in events if event.is_state()},
+ keyvalues={},
+ retcols=("room_id", "has_auth_chain_index"),
+ ),
)
rooms_using_chain_index = {
- row["room_id"] for row in rows if row["has_auth_chain_index"]
+ room_id for room_id, has_auth_chain_index in rows if has_auth_chain_index
}
state_events = {
@@ -571,19 +588,18 @@ class PersistEventsStore:
# We check if there are any events that need to be handled in the rooms
# we're looking at. These should just be out of band memberships, where
# we didn't have the auth chain when we first persisted.
- rows = db_pool.simple_select_many_txn(
- txn,
- table="event_auth_chain_to_calculate",
- keyvalues={},
- column="room_id",
- iterable=set(event_to_room_id.values()),
- retcols=("event_id", "type", "state_key"),
+ auth_chain_to_calc_rows = cast(
+ List[Tuple[str, str, str]],
+ db_pool.simple_select_many_txn(
+ txn,
+ table="event_auth_chain_to_calculate",
+ keyvalues={},
+ column="room_id",
+ iterable=set(event_to_room_id.values()),
+ retcols=("event_id", "type", "state_key"),
+ ),
)
- for row in rows:
- event_id = row["event_id"]
- event_type = row["type"]
- state_key = row["state_key"]
-
+ for event_id, event_type, state_key in auth_chain_to_calc_rows:
# (We could pull out the auth events for all rows at once using
# simple_select_many, but this case happens rarely and almost always
# with a single row.)
@@ -753,23 +769,31 @@ class PersistEventsStore:
# Step 1, fetch all existing links from all the chains we've seen
# referenced.
chain_links = _LinkMap()
- rows = db_pool.simple_select_many_txn(
- txn,
- table="event_auth_chain_links",
- column="origin_chain_id",
- iterable={chain_id for chain_id, _ in chain_map.values()},
- keyvalues={},
- retcols=(
- "origin_chain_id",
- "origin_sequence_number",
- "target_chain_id",
- "target_sequence_number",
+ auth_chain_rows = cast(
+ List[Tuple[int, int, int, int]],
+ db_pool.simple_select_many_txn(
+ txn,
+ table="event_auth_chain_links",
+ column="origin_chain_id",
+ iterable={chain_id for chain_id, _ in chain_map.values()},
+ keyvalues={},
+ retcols=(
+ "origin_chain_id",
+ "origin_sequence_number",
+ "target_chain_id",
+ "target_sequence_number",
+ ),
),
)
- for row in rows:
+ for (
+ origin_chain_id,
+ origin_sequence_number,
+ target_chain_id,
+ target_sequence_number,
+ ) in auth_chain_rows:
chain_links.add_link(
- (row["origin_chain_id"], row["origin_sequence_number"]),
- (row["target_chain_id"], row["target_sequence_number"]),
+ (origin_chain_id, origin_sequence_number),
+ (target_chain_id, target_sequence_number),
new=False,
)
@@ -1015,74 +1039,75 @@ class PersistEventsStore:
await self.db_pool.runInteraction(
"update_current_state",
self._update_current_state_txn,
- state_delta_by_room={room_id: state_delta},
+ room_id,
+ delta_state=state_delta,
stream_id=stream_ordering,
)
def _update_current_state_txn(
self,
txn: LoggingTransaction,
- state_delta_by_room: Dict[str, DeltaState],
+ room_id: str,
+ delta_state: DeltaState,
stream_id: int,
) -> None:
- for room_id, delta_state in state_delta_by_room.items():
- to_delete = delta_state.to_delete
- to_insert = delta_state.to_insert
-
- # Figure out the changes of membership to invalidate the
- # `get_rooms_for_user` cache.
- # We find out which membership events we may have deleted
- # and which we have added, then we invalidate the caches for all
- # those users.
- members_changed = {
- state_key
- for ev_type, state_key in itertools.chain(to_delete, to_insert)
- if ev_type == EventTypes.Member
- }
+ to_delete = delta_state.to_delete
+ to_insert = delta_state.to_insert
+
+ # Figure out the changes of membership to invalidate the
+ # `get_rooms_for_user` cache.
+ # We find out which membership events we may have deleted
+ # and which we have added, then we invalidate the caches for all
+ # those users.
+ members_changed = {
+ state_key
+ for ev_type, state_key in itertools.chain(to_delete, to_insert)
+ if ev_type == EventTypes.Member
+ }
- if delta_state.no_longer_in_room:
- # Server is no longer in the room so we delete the room from
- # current_state_events, being careful we've already updated the
- # rooms.room_version column (which gets populated in a
- # background task).
- self._upsert_room_version_txn(txn, room_id)
+ if delta_state.no_longer_in_room:
+ # Server is no longer in the room so we delete the room from
+ # current_state_events, being careful we've already updated the
+ # rooms.room_version column (which gets populated in a
+ # background task).
+ self._upsert_room_version_txn(txn, room_id)
- # Before deleting we populate the current_state_delta_stream
- # so that async background tasks get told what happened.
- sql = """
+ # Before deleting we populate the current_state_delta_stream
+ # so that async background tasks get told what happened.
+ sql = """
INSERT INTO current_state_delta_stream
(stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, room_id, type, state_key, null, event_id
FROM current_state_events
WHERE room_id = ?
"""
- txn.execute(sql, (stream_id, self._instance_name, room_id))
+ txn.execute(sql, (stream_id, self._instance_name, room_id))
- # We also want to invalidate the membership caches for users
- # that were in the room.
- users_in_room = self.store.get_users_in_room_txn(txn, room_id)
- members_changed.update(users_in_room)
+ # We also want to invalidate the membership caches for users
+ # that were in the room.
+ users_in_room = self.store.get_users_in_room_txn(txn, room_id)
+ members_changed.update(users_in_room)
- self.db_pool.simple_delete_txn(
- txn,
- table="current_state_events",
- keyvalues={"room_id": room_id},
- )
- else:
- # We're still in the room, so we update the current state as normal.
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="current_state_events",
+ keyvalues={"room_id": room_id},
+ )
+ else:
+ # We're still in the room, so we update the current state as normal.
- # First we add entries to the current_state_delta_stream. We
- # do this before updating the current_state_events table so
- # that we can use it to calculate the `prev_event_id`. (This
- # allows us to not have to pull out the existing state
- # unnecessarily).
- #
- # The stream_id for the update is chosen to be the minimum of the stream_ids
- # for the batch of the events that we are persisting; that means we do not
- # end up in a situation where workers see events before the
- # current_state_delta updates.
- #
- sql = """
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # allows us to not have to pull out the existing state
+ # unnecessarily).
+ #
+ # The stream_id for the update is chosen to be the minimum of the stream_ids
+ # for the batch of the events that we are persisting; that means we do not
+ # end up in a situation where workers see events before the
+ # current_state_delta updates.
+ #
+ sql = """
INSERT INTO current_state_delta_stream
(stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, ?, ?, ?, ?, (
@@ -1090,39 +1115,39 @@ class PersistEventsStore:
WHERE room_id = ? AND type = ? AND state_key = ?
)
"""
- txn.execute_batch(
- sql,
+ txn.execute_batch(
+ sql,
+ (
(
- (
- stream_id,
- self._instance_name,
- room_id,
- etype,
- state_key,
- to_insert.get((etype, state_key)),
- room_id,
- etype,
- state_key,
- )
- for etype, state_key in itertools.chain(to_delete, to_insert)
- ),
- )
- # Now we actually update the current_state_events table
+ stream_id,
+ self._instance_name,
+ room_id,
+ etype,
+ state_key,
+ to_insert.get((etype, state_key)),
+ room_id,
+ etype,
+ state_key,
+ )
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
+ )
+ # Now we actually update the current_state_events table
- txn.execute_batch(
- "DELETE FROM current_state_events"
- " WHERE room_id = ? AND type = ? AND state_key = ?",
- (
- (room_id, etype, state_key)
- for etype, state_key in itertools.chain(to_delete, to_insert)
- ),
- )
+ txn.execute_batch(
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
+ )
- # We include the membership in the current state table, hence we do
- # a lookup when we insert. This assumes that all events have already
- # been inserted into room_memberships.
- txn.execute_batch(
- """INSERT INTO current_state_events
+ # We include the membership in the current state table, hence we do
+ # a lookup when we insert. This assumes that all events have already
+ # been inserted into room_memberships.
+ txn.execute_batch(
+ """INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership, event_stream_ordering)
VALUES (
?, ?, ?, ?,
@@ -1130,34 +1155,34 @@ class PersistEventsStore:
(SELECT stream_ordering FROM events WHERE event_id = ?)
)
""",
- [
- (room_id, key[0], key[1], ev_id, ev_id, ev_id)
- for key, ev_id in to_insert.items()
- ],
- )
+ [
+ (room_id, key[0], key[1], ev_id, ev_id, ev_id)
+ for key, ev_id in to_insert.items()
+ ],
+ )
- # We now update `local_current_membership`. We do this regardless
- # of whether we're still in the room or not to handle the case where
- # e.g. we just got banned (where we need to record that fact here).
-
- # Note: Do we really want to delete rows here (that we do not
- # subsequently reinsert below)? While technically correct it means
- # we have no record of the fact the user *was* a member of the
- # room but got, say, state reset out of it.
- if to_delete or to_insert:
- txn.execute_batch(
- "DELETE FROM local_current_membership"
- " WHERE room_id = ? AND user_id = ?",
- (
- (room_id, state_key)
- for etype, state_key in itertools.chain(to_delete, to_insert)
- if etype == EventTypes.Member and self.is_mine_id(state_key)
- ),
- )
+ # We now update `local_current_membership`. We do this regardless
+ # of whether we're still in the room or not to handle the case where
+ # e.g. we just got banned (where we need to record that fact here).
- if to_insert:
- txn.execute_batch(
- """INSERT INTO local_current_membership
+ # Note: Do we really want to delete rows here (that we do not
+ # subsequently reinsert below)? While technically correct it means
+ # we have no record of the fact the user *was* a member of the
+ # room but got, say, state reset out of it.
+ if to_delete or to_insert:
+ txn.execute_batch(
+ "DELETE FROM local_current_membership"
+ " WHERE room_id = ? AND user_id = ?",
+ (
+ (room_id, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ if etype == EventTypes.Member and self.is_mine_id(state_key)
+ ),
+ )
+
+ if to_insert:
+ txn.execute_batch(
+ """INSERT INTO local_current_membership
(room_id, user_id, event_id, membership, event_stream_ordering)
VALUES (
?, ?, ?,
@@ -1165,29 +1190,27 @@ class PersistEventsStore:
(SELECT stream_ordering FROM events WHERE event_id = ?)
)
""",
- [
- (room_id, key[1], ev_id, ev_id, ev_id)
- for key, ev_id in to_insert.items()
- if key[0] == EventTypes.Member and self.is_mine_id(key[1])
- ],
- )
-
- txn.call_after(
- self.store._curr_state_delta_stream_cache.entity_has_changed,
- room_id,
- stream_id,
+ [
+ (room_id, key[1], ev_id, ev_id, ev_id)
+ for key, ev_id in to_insert.items()
+ if key[0] == EventTypes.Member and self.is_mine_id(key[1])
+ ],
)
- # Invalidate the various caches
- self.store._invalidate_state_caches_and_stream(
- txn, room_id, members_changed
- )
+ txn.call_after(
+ self.store._curr_state_delta_stream_cache.entity_has_changed,
+ room_id,
+ stream_id,
+ )
- # Check if any of the remote membership changes requires us to
- # unsubscribe from their device lists.
- self.store.handle_potentially_left_users_txn(
- txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
- )
+ # Invalidate the various caches
+ self.store._invalidate_state_caches_and_stream(txn, room_id, members_changed)
+
+ # Check if any of the remote membership changes requires us to
+ # unsubscribe from their device lists.
+ self.store.handle_potentially_left_users_txn(
+ txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
+ )
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
"""Update the room version in the database based off current state
@@ -1221,23 +1244,19 @@ class PersistEventsStore:
def _update_forward_extremities_txn(
self,
txn: LoggingTransaction,
- new_forward_extremities: Dict[str, Set[str]],
+ room_id: str,
+ new_forward_extremities: Set[str],
max_stream_order: int,
) -> None:
- for room_id in new_forward_extremities.keys():
- self.db_pool.simple_delete_txn(
- txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
- )
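+        # Clear the room's existing forward extremities before writing the new set.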
+ self.db_pool.simple_delete_txn(
+ txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
+ )
self.db_pool.simple_insert_many_txn(
txn,
table="event_forward_extremities",
keys=("event_id", "room_id"),
- values=[
- (ev_id, room_id)
- for room_id, new_extrem in new_forward_extremities.items()
- for ev_id in new_extrem
- ],
+ values=[(ev_id, room_id) for ev_id in new_forward_extremities],
)
# We now insert into stream_ordering_to_exterm a mapping from room_id,
# new stream_ordering to new forward extremeties in the room.
@@ -1249,8 +1268,7 @@ class PersistEventsStore:
keys=("room_id", "event_id", "stream_ordering"),
values=[
(room_id, event_id, max_stream_order)
- for room_id, new_extrem in new_forward_extremities.items()
- for event_id in new_extrem
+ for event_id in new_forward_extremities
],
)
@@ -1287,36 +1305,45 @@ class PersistEventsStore:
def _update_room_depths_txn(
self,
txn: LoggingTransaction,
+ room_id: str,
events_and_contexts: List[Tuple[EventBase, EventContext]],
) -> None:
"""Update min_depth for each room
Args:
txn: db connection
+ room_id: The room ID
events_and_contexts: events we are persisting
"""
- depth_updates: Dict[str, int] = {}
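+        # Track the largest stream ordering and depth seen across this batch of events.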
+ stream_ordering: Optional[int] = None
+ depth_update = 0
for event, context in events_and_contexts:
- # Then update the `stream_ordering` position to mark the latest
- # event as the front of the room. This should not be done for
- # backfilled events because backfilled events have negative
- # stream_ordering and happened in the past so we know that we don't
- # need to update the stream_ordering tip/front for the room.
+ # Don't update the stream ordering for backfilled events because
+ # backfilled events have negative stream_ordering and happened in the
+ # past, so we know that we don't need to update the stream_ordering
+ # tip/front for the room.
assert event.internal_metadata.stream_ordering is not None
if event.internal_metadata.stream_ordering >= 0:
- txn.call_after(
- self.store._events_stream_cache.entity_has_changed,
- event.room_id,
- event.internal_metadata.stream_ordering,
- )
+ if stream_ordering is None:
+ stream_ordering = event.internal_metadata.stream_ordering
+ else:
+ stream_ordering = max(
+ stream_ordering, event.internal_metadata.stream_ordering
+ )
if not event.internal_metadata.is_outlier() and not context.rejected:
- depth_updates[event.room_id] = max(
- event.depth, depth_updates.get(event.room_id, event.depth)
- )
+ depth_update = max(event.depth, depth_update)
- for room_id, depth in depth_updates.items():
- self._update_min_depth_for_room_txn(txn, room_id, depth)
+ # Then update the `stream_ordering` position to mark the latest event as
+ # the front of the room.
+ if stream_ordering is not None:
+ txn.call_after(
+ self.store._events_stream_cache.entity_has_changed,
+ room_id,
+ stream_ordering,
+ )
+
+ self._update_min_depth_for_room_txn(txn, room_id, depth_update)
def _update_outliers_txn(
self,
@@ -1339,13 +1366,19 @@ class PersistEventsStore:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
- txn.execute(
- "SELECT event_id, outlier FROM events WHERE event_id in (%s)"
- % (",".join(["?"] * len(events_and_contexts)),),
- [event.event_id for event, _ in events_and_contexts],
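+        # Fetch the events we have already persisted, along with whether they were outliers.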
+ rows = cast(
+ List[Tuple[str, bool]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ "events",
+ "event_id",
+ [event.event_id for event, _ in events_and_contexts],
+ keyvalues={},
+ retcols=("event_id", "outlier"),
+ ),
)
- have_persisted = dict(cast(Iterable[Tuple[str, bool]], txn))
+ have_persisted = dict(rows)
logger.debug(
"_update_outliers_txn: events=%s have_persisted=%s",
@@ -1443,7 +1476,7 @@ class PersistEventsStore:
txn,
table="event_json",
keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
- values=(
+ values=[
(
event.event_id,
event.room_id,
@@ -1452,7 +1485,7 @@ class PersistEventsStore:
event.format_version,
)
for event, _ in events_and_contexts
- ),
+ ],
)
self.db_pool.simple_insert_many_txn(
@@ -1475,7 +1508,7 @@ class PersistEventsStore:
"state_key",
"rejection_reason",
),
- values=(
+ values=[
(
self._instance_name,
event.internal_metadata.stream_ordering,
@@ -1494,7 +1527,7 @@ class PersistEventsStore:
context.rejected,
)
for event, context in events_and_contexts
- ),
+ ],
)
# If we're persisting an unredacted event we go and ensure
@@ -1517,11 +1550,11 @@ class PersistEventsStore:
txn,
table="state_events",
keys=("event_id", "room_id", "type", "state_key"),
- values=(
+ values=[
(event.event_id, event.room_id, event.type, event.state_key)
for event, _ in events_and_contexts
if event.is_state()
- ),
+ ],
)
def _store_rejected_events_txn(
@@ -1654,8 +1687,6 @@ class PersistEventsStore:
) -> None:
to_prefill = []
- rows = []
-
ev_map = {e.event_id: e for e, _ in events_and_contexts}
if not ev_map:
return
@@ -1676,10 +1707,9 @@ class PersistEventsStore:
)
txn.execute(sql + clause, args)
- rows = self.db_pool.cursor_to_dict(txn)
- for row in rows:
- event = ev_map[row["event_id"]]
- if not row["rejects"] and not row["redacts"]:
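+        # Only prefill the cache for events that were neither rejected nor redacted.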
+ for event_id, redacts, rejects in txn:
+ event = ev_map[event_id]
+ if not rejects and not redacts:
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
async def external_prefill() -> None:
@@ -2259,35 +2289,59 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
- # From the events passed in, add all of the prev events as backwards extremities.
- # Ignore any events that are already backwards extrems or outliers.
- query = (
- "INSERT INTO event_backward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- " )"
- # 1. Don't add an event as a extremity again if we already persisted it
- # as a non-outlier.
- # 2. Don't add an outlier as an extremity if it has no prev_events
- " AND NOT EXISTS ("
- " SELECT 1 FROM events"
- " LEFT JOIN event_edges edge"
- " ON edge.event_id = events.event_id"
- " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
- " )"
+
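+        # We assume we are only persisting events for one room at a time, so every
+        # event shares this room ID.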
+        room_id = events[0].room_id
+
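+        # The prev_events of the non-outlier events we are persisting are the
+        # candidate backwards extremities.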
+        potential_backwards_extremities = {
+            e_id
+            for ev in events
+            for e_id in ev.prev_event_ids()
+            if not ev.internal_metadata.is_outlier()
+        }
+
+        if not potential_backwards_extremities:
+            return
+
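+        # Find the candidates that we have already persisted as non-outlier events.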
+        existing_events_outliers = self.db_pool.simple_select_many_txn(
+            txn,
+            table="events",
+            column="event_id",
+            iterable=potential_backwards_extremities,
+            keyvalues={"outlier": False},
+            retcols=("event_id",),
        )
- txn.execute_batch(
- query,
- [
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
- for ev in events
- for e_id in ev.prev_event_ids()
- if not ev.internal_metadata.is_outlier()
- ],
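+        # Events that already exist as non-outliers are not backwards extremities.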
+        potential_backwards_extremities.difference_update(
+            e for e, in existing_events_outliers
        )
+        if potential_backwards_extremities:
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="event_backward_extremities",
+                key_names=("room_id", "event_id"),
+                key_values=[(room_id, ev) for ev in potential_backwards_extremities],
+                value_names=(),
+                value_values=(),
+            )
+
+        # Record the stream orderings where we have new gaps.
+        gap_events = [
+            (room_id, self._instance_name, ev.internal_metadata.stream_ordering)
+            for ev in events
+            if any(
+                e_id in potential_backwards_extremities
+                for e_id in ev.prev_event_ids()
+            )
+        ]
+
+        self.db_pool.simple_insert_many_txn(
+            txn,
+            table="timeline_gaps",
+            keys=("room_id", "instance_name", "stream_ordering"),
+            values=gap_events,
+        )
+
# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremeties.
query = (
|