summary refs log tree commit diff
path: root/synapse/storage/databases/main/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r--synapse/storage/databases/main/events.py967
1 files changed, 956 insertions, 11 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py

index 1f7acdb859..60c92e5804 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -32,6 +32,7 @@ from typing import ( Iterable, List, Optional, + Sequence, Set, Tuple, cast, @@ -39,19 +40,27 @@ from typing import ( import attr from prometheus_client import Counter +from typing_extensions import TypedDict import synapse.metrics -from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.constants import ( + EventContentFields, + EventTypes, + Membership, + RelationTypes, +) from synapse.api.errors import PartialStateConflictError from synapse.api.room_versions import RoomVersions -from synapse.events import EventBase, relation_from_event +from synapse.events import EventBase, StrippedStateEvent, relation_from_event from synapse.events.snapshot import EventContext +from synapse.events.utils import parse_stripped_state_event from synapse.logging.opentracing import trace from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + make_tuple_in_list_sql_clause, ) from synapse.storage.databases.main.event_federation import EventFederationStore from synapse.storage.databases.main.events_worker import EventCacheEntry @@ -59,7 +68,15 @@ from synapse.storage.databases.main.search import SearchEntry from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import AbstractStreamIdGenerator from synapse.storage.util.sequence import SequenceGenerator -from synapse.types import JsonDict, StateMap, StrCollection, get_domain_from_id +from synapse.types import ( + JsonDict, + MutableStateMap, + StateMap, + StrCollection, + get_domain_from_id, +) +from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES +from synapse.types.state import StateFilter from synapse.util import json_encoder from synapse.util.iterutils import batch_iter, sorted_topologically from synapse.util.stringutils import non_null_str_or_none @@ -78,6 +95,19 @@ event_counter = Counter( ["type", "origin_type", "origin_entity"], ) +# State event type/key pairs that we need to gather to fill in the +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. +SLIDING_SYNC_RELEVANT_STATE_SET = ( + # So we can fill in the `room_type` column + (EventTypes.Create, ""), + # So we can fill in the `is_encrypted` column + (EventTypes.RoomEncryption, ""), + # So we can fill in the `room_name` column + (EventTypes.Name, ""), + # So we can fill in the `tombstone_successor_room_id` column + (EventTypes.Tombstone, ""), +) + @attr.s(slots=True, auto_attribs=True) class DeltaState: @@ -99,6 +129,82 @@ class DeltaState: return not self.to_delete and not self.to_insert and not self.no_longer_in_room +# We want `total=False` because we want to allow values to be unset. +class SlidingSyncStateInsertValues(TypedDict, total=False): + """ + Insert values relevant for the `sliding_sync_joined_rooms` and + `sliding_sync_membership_snapshots` database tables. + """ + + room_type: Optional[str] + is_encrypted: Optional[bool] + room_name: Optional[str] + tombstone_successor_room_id: Optional[str] + + +class SlidingSyncMembershipSnapshotSharedInsertValues( + SlidingSyncStateInsertValues, total=False +): + """ + Insert values for `sliding_sync_membership_snapshots` that we can share across + multiple memberships + """ + + has_known_state: Optional[bool] + + +@attr.s(slots=True, auto_attribs=True) +class SlidingSyncMembershipInfo: + """ + Values unique to each membership + """ + + user_id: str + sender: str + membership_event_id: str + membership: str + membership_event_stream_ordering: int + membership_event_instance_name: str + + +@attr.s(slots=True, auto_attribs=True) +class SlidingSyncTableChanges: + room_id: str + # `stream_ordering` of the most recent event being persisted in the room. This doesn't + # need to be perfect, we just need *some* answer that points to a real event in the + # room in case we are the first ones inserting into the `sliding_sync_joined_rooms` + # table because of the `NON NULL` constraint on `event_stream_ordering`. In reality, + # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after + # `_update_current_state_txn()` whenever a new event is persisted to update it to the + # correct latest value. + # + # This should be *some* value that points to a real event in the room if we are + # still joined to the room and some state is changing (`to_insert` or `to_delete`). + joined_room_best_effort_most_recent_stream_ordering: Optional[int] + # If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to + # fully-insert it which means we also need to include a `bump_stamp` value to use + # for the row. This should only be populated when we're trying to fully-insert a + # row. + # + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/17623) + joined_room_bump_stamp_to_fully_insert: Optional[int] + # Values to upsert into `sliding_sync_joined_rooms` + joined_room_updates: SlidingSyncStateInsertValues + + # Shared values to upsert into `sliding_sync_membership_snapshots` for each + # `to_insert_membership_snapshots` + membership_snapshot_shared_insert_values: ( + SlidingSyncMembershipSnapshotSharedInsertValues + ) + # List of membership to insert into `sliding_sync_membership_snapshots` + to_insert_membership_snapshots: List[SlidingSyncMembershipInfo] + # List of user_id to delete from `sliding_sync_membership_snapshots` + to_delete_membership_snapshots: List[str] + + @attr.s(slots=True, auto_attribs=True) class NewEventChainLinks: """Information about new auth chain links that need to be added to the DB. @@ -226,6 +332,14 @@ class PersistEventsStore: event.internal_metadata.stream_ordering = stream event.internal_metadata.instance_name = self._instance_name + sliding_sync_table_changes = None + if state_delta_for_room is not None: + sliding_sync_table_changes = ( + await self._calculate_sliding_sync_table_changes( + room_id, events_and_contexts, state_delta_for_room + ) + ) + await self.db_pool.runInteraction( "persist_events", self._persist_events_txn, @@ -235,6 +349,7 @@ class PersistEventsStore: state_delta_for_room=state_delta_for_room, new_forward_extremities=new_forward_extremities, new_event_links=new_event_links, + sliding_sync_table_changes=sliding_sync_table_changes, ) persist_event_counter.inc(len(events_and_contexts)) @@ -261,6 +376,341 @@ class PersistEventsStore: (room_id,), frozenset(new_forward_extremities) ) + async def _calculate_sliding_sync_table_changes( + self, + room_id: str, + events_and_contexts: Sequence[Tuple[EventBase, EventContext]], + delta_state: DeltaState, + ) -> SlidingSyncTableChanges: + """ + Calculate the changes to the `sliding_sync_membership_snapshots` and + `sliding_sync_joined_rooms` tables given the deltas that are going to be used to + update the `current_state_events` table. + + Just a bunch of pre-processing so we so we don't need to spend time in the + transaction itself gathering all of this info. It's also easier to deal with + redactions outside of a transaction. + + Args: + room_id: The room ID currently being processed. + events_and_contexts: List of tuples of (event, context) being persisted. + This is completely optional (you can pass an empty list) and will just + save us from fetching the events from the database if we already have + them. We assume the list is sorted ascending by `stream_ordering`. We + don't care about the sort when the events are backfilled (with negative + `stream_ordering`). + delta_state: Deltas that are going to be used to update the + `current_state_events` table. Changes to the current state of the room. + """ + to_insert = delta_state.to_insert + to_delete = delta_state.to_delete + + # If no state is changing, we don't need to do anything. This can happen when a + # partial-stated room is re-syncing the current state. + if not to_insert and not to_delete: + return SlidingSyncTableChanges( + room_id=room_id, + joined_room_best_effort_most_recent_stream_ordering=None, + joined_room_bump_stamp_to_fully_insert=None, + joined_room_updates={}, + membership_snapshot_shared_insert_values={}, + to_insert_membership_snapshots=[], + to_delete_membership_snapshots=[], + ) + + event_map = {event.event_id: event for event, _ in events_and_contexts} + + # Handle gathering info for the `sliding_sync_membership_snapshots` table + # + # This would only happen if someone was state reset out of the room + user_ids_to_delete_membership_snapshots = [ + state_key + for event_type, state_key in to_delete + if event_type == EventTypes.Member and self.is_mine_id(state_key) + ] + + membership_snapshot_shared_insert_values: ( + SlidingSyncMembershipSnapshotSharedInsertValues + ) = {} + membership_infos_to_insert_membership_snapshots: List[ + SlidingSyncMembershipInfo + ] = [] + if to_insert: + membership_event_id_to_user_id_map: Dict[str, str] = {} + for state_key, event_id in to_insert.items(): + if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]): + membership_event_id_to_user_id_map[event_id] = state_key[1] + + membership_event_map: Dict[str, EventBase] = {} + # In normal event persist scenarios, we should be able to find the + # membership events in the `events_and_contexts` given to us but it's + # possible a state reset happened which added us to the room without a + # corresponding new membership event (reset back to a previous membership). + missing_membership_event_ids: Set[str] = set() + for membership_event_id in membership_event_id_to_user_id_map.keys(): + membership_event = event_map.get(membership_event_id) + if membership_event: + membership_event_map[membership_event_id] = membership_event + else: + missing_membership_event_ids.add(membership_event_id) + + # Otherwise, we need to find a couple events that we were reset to. + if missing_membership_event_ids: + remaining_events = await self.store.get_events( + missing_membership_event_ids + ) + # There shouldn't be any missing events + assert ( + remaining_events.keys() == missing_membership_event_ids + ), missing_membership_event_ids.difference(remaining_events.keys()) + membership_event_map.update(remaining_events) + + for ( + membership_event_id, + user_id, + ) in membership_event_id_to_user_id_map.items(): + # We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point + membership_event_stream_ordering = membership_event_map[ + membership_event_id + ].internal_metadata.stream_ordering + assert membership_event_stream_ordering is not None + membership_event_instance_name = membership_event_map[ + membership_event_id + ].internal_metadata.instance_name + assert membership_event_instance_name is not None + + membership_infos_to_insert_membership_snapshots.append( + SlidingSyncMembershipInfo( + user_id=user_id, + sender=membership_event_map[membership_event_id].sender, + membership_event_id=membership_event_id, + membership=membership_event_map[membership_event_id].membership, + membership_event_stream_ordering=membership_event_stream_ordering, + membership_event_instance_name=membership_event_instance_name, + ) + ) + + if membership_infos_to_insert_membership_snapshots: + current_state_ids_map: MutableStateMap[str] = dict( + await self.store.get_partial_filtered_current_state_ids( + room_id, + state_filter=StateFilter.from_types( + SLIDING_SYNC_RELEVANT_STATE_SET + ), + ) + ) + # Since we fetched the current state before we took `to_insert`/`to_delete` + # into account, we need to do a couple fixups. + # + # Update the current_state_map with what we have `to_delete` + for state_key in to_delete: + current_state_ids_map.pop(state_key, None) + # Update the current_state_map with what we have `to_insert` + for state_key, event_id in to_insert.items(): + if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: + current_state_ids_map[state_key] = event_id + + current_state_map: MutableStateMap[EventBase] = {} + # In normal event persist scenarios, we probably won't be able to find + # these state events in `events_and_contexts` since we don't generally + # batch up local membership changes with other events, but it can + # happen. + missing_state_event_ids: Set[str] = set() + for state_key, event_id in current_state_ids_map.items(): + event = event_map.get(event_id) + if event: + current_state_map[state_key] = event + else: + missing_state_event_ids.add(event_id) + + # Otherwise, we need to find a couple events + if missing_state_event_ids: + remaining_events = await self.store.get_events( + missing_state_event_ids + ) + # There shouldn't be any missing events + assert ( + remaining_events.keys() == missing_state_event_ids + ), missing_state_event_ids.difference(remaining_events.keys()) + for event in remaining_events.values(): + current_state_map[(event.type, event.state_key)] = event + + if current_state_map: + state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map( + current_state_map + ) + membership_snapshot_shared_insert_values.update(state_insert_values) + # We have current state to work from + membership_snapshot_shared_insert_values["has_known_state"] = True + else: + # We don't have any `current_state_events` anymore (previously + # cleared out because of `no_longer_in_room`). This can happen if + # one user is joined and another is invited (some non-join + # membership). If the joined user leaves, we are `no_longer_in_room` + # and `current_state_events` is cleared out. When the invited user + # rejects the invite (leaves the room), we will end up here. + # + # In these cases, we should inherit the meta data from the previous + # snapshot so we shouldn't update any of the state values. When + # using sliding sync filters, this will prevent the room from + # disappearing/appearing just because you left the room. + # + # Ideally, we could additionally assert that we're only here for + # valid non-join membership transitions. + assert delta_state.no_longer_in_room + + # Handle gathering info for the `sliding_sync_joined_rooms` table + # + # We only deal with + # updating the state related columns. The + # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event + # persisting stack (see + # `_update_sliding_sync_tables_with_new_persisted_events_txn()`) + # + joined_room_updates: SlidingSyncStateInsertValues = {} + best_effort_most_recent_stream_ordering: Optional[int] = None + bump_stamp_to_fully_insert: Optional[int] = None + if not delta_state.no_longer_in_room: + current_state_ids_map = {} + + # Always fully-insert rows if they don't already exist in the + # `sliding_sync_joined_rooms` table. This way we can rely on a row if it + # exists in the table. + # + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/17623) + existing_row_in_table = await self.store.db_pool.simple_select_one_onecol( + table="sliding_sync_joined_rooms", + keyvalues={"room_id": room_id}, + retcol="room_id", + allow_none=True, + ) + if not existing_row_in_table: + most_recent_bump_event_pos_results = ( + await self.store.get_last_event_pos_in_room( + room_id, + event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES, + ) + ) + bump_stamp_to_fully_insert = ( + most_recent_bump_event_pos_results[1].stream + if most_recent_bump_event_pos_results is not None + else None + ) + + current_state_ids_map = dict( + await self.store.get_partial_filtered_current_state_ids( + room_id, + state_filter=StateFilter.from_types( + SLIDING_SYNC_RELEVANT_STATE_SET + ), + ) + ) + + # Look through the items we're going to insert into the current state to see + # if there is anything that we care about and should also update in the + # `sliding_sync_joined_rooms` table. + for state_key, event_id in to_insert.items(): + if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: + current_state_ids_map[state_key] = event_id + + # Get the full event objects for the current state events + # + # In normal event persist scenarios, we should be able to find the state + # events in the `events_and_contexts` given to us but it's possible a state + # reset happened which that reset back to a previous state. + current_state_map = {} + missing_event_ids: Set[str] = set() + for state_key, event_id in current_state_ids_map.items(): + event = event_map.get(event_id) + if event: + current_state_map[state_key] = event + else: + missing_event_ids.add(event_id) + + # Otherwise, we need to find a couple events that we were reset to. + if missing_event_ids: + remaining_events = await self.store.get_events( + current_state_ids_map.values() + ) + # There shouldn't be any missing events + assert ( + remaining_events.keys() == missing_event_ids + ), missing_event_ids.difference(remaining_events.keys()) + for event in remaining_events.values(): + current_state_map[(event.type, event.state_key)] = event + + joined_room_updates = ( + PersistEventsStore._get_sliding_sync_insert_values_from_state_map( + current_state_map + ) + ) + + # If something is being deleted from the state, we need to clear it out + for state_key in to_delete: + if state_key == (EventTypes.Create, ""): + joined_room_updates["room_type"] = None + elif state_key == (EventTypes.RoomEncryption, ""): + joined_room_updates["is_encrypted"] = False + elif state_key == (EventTypes.Name, ""): + joined_room_updates["room_name"] = None + + # Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to + # be perfect, we just need *some* answer that points to a real event in the + # room in case we are the first ones inserting into the + # `sliding_sync_joined_rooms` table because of the `NON NULL` constraint on + # `event_stream_ordering`. In reality, + # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after + # `_update_current_state_txn()` whenever a new event is persisted to update + # it to the correct latest value. + # + if len(events_and_contexts) > 0: + # Since the list is sorted ascending by `stream_ordering`, the last event + # should have the highest `stream_ordering`. + best_effort_most_recent_stream_ordering = events_and_contexts[-1][ + 0 + ].internal_metadata.stream_ordering + else: + # If there are no `events_and_contexts`, we assume it's one of two scenarios: + # 1. If there are new state `to_insert` but no `events_and_contexts`, + # then it's a state reset. + # 2. Otherwise, it's some partial-state room re-syncing the current state and + # going through un-partial process. + # + # Either way, we assume no new events are being persisted and we can + # find the latest already in the database. Since this is a best-effort + # value, we don't need to be perfect although I think we're pretty close + # here. + most_recent_event_pos_results = ( + await self.store.get_last_event_pos_in_room( + room_id, event_types=None + ) + ) + assert most_recent_event_pos_results, ( + f"We should not be seeing `None` here because we are still in the room ({room_id}) and " + + "it should at-least have a join membership event that's keeping us here." + ) + best_effort_most_recent_stream_ordering = most_recent_event_pos_results[ + 1 + ].stream + + # We should have found a value if we are still in the room + assert best_effort_most_recent_stream_ordering is not None + + return SlidingSyncTableChanges( + room_id=room_id, + # For `sliding_sync_joined_rooms` + joined_room_best_effort_most_recent_stream_ordering=best_effort_most_recent_stream_ordering, + joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert, + joined_room_updates=joined_room_updates, + # For `sliding_sync_membership_snapshots` + membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values, + to_insert_membership_snapshots=membership_infos_to_insert_membership_snapshots, + to_delete_membership_snapshots=user_ids_to_delete_membership_snapshots, + ) + async def calculate_chain_cover_index_for_events( self, room_id: str, events: Collection[EventBase] ) -> Dict[str, NewEventChainLinks]: @@ -458,6 +908,7 @@ class PersistEventsStore: state_delta_for_room: Optional[DeltaState], new_forward_extremities: Optional[Set[str]], new_event_links: Dict[str, NewEventChainLinks], + sliding_sync_table_changes: Optional[SlidingSyncTableChanges], ) -> None: """Insert some number of room events into the necessary database tables. @@ -478,9 +929,14 @@ class PersistEventsStore: delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. - state_delta_for_room: The current-state delta for the room. + state_delta_for_room: Deltas that are going to be used to update the + `current_state_events` table. Changes to the current state of the room. new_forward_extremities: The new forward extremities for the room: a set of the event ids which are the forward extremities. + sliding_sync_table_changes: Changes to the + `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables + derived from the given `delta_state` (see + `_calculate_sliding_sync_table_changes(...)`) Raises: PartialStateConflictError: if attempting to persist a partial state event in @@ -590,10 +1046,22 @@ class PersistEventsStore: # room_memberships, where applicable. # NB: This function invalidates all state related caches if state_delta_for_room: + # If the state delta exists, the sliding sync table changes should also exist + assert sliding_sync_table_changes is not None + self._update_current_state_txn( - txn, room_id, state_delta_for_room, min_stream_order + txn, + room_id, + state_delta_for_room, + min_stream_order, + sliding_sync_table_changes, ) + # We only update the sliding sync tables for non-backfilled events. + self._update_sliding_sync_tables_with_new_persisted_events_txn( + txn, room_id, events_and_contexts + ) + def _persist_event_auth_chain_txn( self, txn: LoggingTransaction, @@ -1128,8 +1596,20 @@ class PersistEventsStore: self, room_id: str, state_delta: DeltaState, + sliding_sync_table_changes: SlidingSyncTableChanges, ) -> None: - """Update the current state stored in the datatabase for the given room""" + """ + Update the current state stored in the datatabase for the given room + + Args: + room_id + state_delta: Deltas that are going to be used to update the + `current_state_events` table. Changes to the current state of the room. + sliding_sync_table_changes: Changes to the + `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables + derived from the given `delta_state` (see + `_calculate_sliding_sync_table_changes(...)`) + """ if state_delta.is_noop(): return @@ -1141,6 +1621,7 @@ class PersistEventsStore: room_id, delta_state=state_delta, stream_id=stream_ordering, + sliding_sync_table_changes=sliding_sync_table_changes, ) def _update_current_state_txn( @@ -1149,16 +1630,34 @@ class PersistEventsStore: room_id: str, delta_state: DeltaState, stream_id: int, + sliding_sync_table_changes: SlidingSyncTableChanges, ) -> None: + """ + Handles updating tables that track the current state of a room. + + Args: + txn + room_id + delta_state: Deltas that are going to be used to update the + `current_state_events` table. Changes to the current state of the room. + stream_id: TODO + sliding_sync_table_changes: Changes to the + `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables + derived from the given `delta_state` (see + `_calculate_sliding_sync_table_changes(...)`) + """ to_delete = delta_state.to_delete to_insert = delta_state.to_insert + # Sanity check we're processing the same thing + assert room_id == sliding_sync_table_changes.room_id + # Figure out the changes of membership to invalidate the # `get_rooms_for_user` cache. # We find out which membership events we may have deleted # and which we have added, then we invalidate the caches for all # those users. - members_changed = { + members_to_cache_bust = { state_key for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member @@ -1182,16 +1681,22 @@ class PersistEventsStore: """ txn.execute(sql, (stream_id, self._instance_name, room_id)) + # Grab the list of users before we clear out the current state + users_in_room = self.store.get_users_in_room_txn(txn, room_id) # We also want to invalidate the membership caches for users # that were in the room. - users_in_room = self.store.get_users_in_room_txn(txn, room_id) - members_changed.update(users_in_room) + members_to_cache_bust.update(users_in_room) self.db_pool.simple_delete_txn( txn, table="current_state_events", keyvalues={"room_id": room_id}, ) + self.db_pool.simple_delete_txn( + txn, + table="sliding_sync_joined_rooms", + keyvalues={"room_id": room_id}, + ) else: # We're still in the room, so we update the current state as normal. @@ -1260,6 +1765,41 @@ class PersistEventsStore: ], ) + # Handle updating the `sliding_sync_joined_rooms` table. We only deal with + # updating the state related columns. The + # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event + # persisting stack (see + # `_update_sliding_sync_tables_with_new_persisted_events_txn()`) + # + # We only need to update when one of the relevant state values has changed + if sliding_sync_table_changes.joined_room_updates: + # This should be *some* value that points to a real event in the room if + # we are still joined to the room. + assert ( + sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering + is not None + ) + + self.db_pool.simple_upsert_txn( + txn, + table="sliding_sync_joined_rooms", + keyvalues={"room_id": room_id}, + values=sliding_sync_table_changes.joined_room_updates, + insertion_values={ + # The reason we're only *inserting* (not *updating*) + # `event_stream_ordering` here is because the column has a `NON + # NULL` constraint and we need *some* answer. And if the row + # already exists, it already has the correct value and it's + # better to just rely on + # `_update_sliding_sync_tables_with_new_persisted_events_txn()` + # to do the right thing (same for `bump_stamp`). + "event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering, + # If we're trying to fully-insert a row, we need to provide a + # value for `bump_stamp` if it exists for the room. + "bump_stamp": sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert, + }, + ) + # We now update `local_current_membership`. We do this regardless # of whether we're still in the room or not to handle the case where # e.g. we just got banned (where we need to record that fact here). @@ -1296,6 +1836,60 @@ class PersistEventsStore: ], ) + # Handle updating the `sliding_sync_membership_snapshots` table + # + # This would only happen if someone was state reset out of the room + if sliding_sync_table_changes.to_delete_membership_snapshots: + self.db_pool.simple_delete_many_txn( + txn, + table="sliding_sync_membership_snapshots", + column="user_id", + values=sliding_sync_table_changes.to_delete_membership_snapshots, + keyvalues={"room_id": room_id}, + ) + + # We do this regardless of whether the server is `no_longer_in_room` or not + # because we still want a row if a local user was just left/kicked or got banned + # from the room. + if sliding_sync_table_changes.to_insert_membership_snapshots: + # Update the `sliding_sync_membership_snapshots` table + # + # We need to insert/update regardless of whether we have `sliding_sync_snapshot_keys` + # because there are other fields in the `ON CONFLICT` upsert to run (see + # inherit case above for more context when this happens). + self.db_pool.simple_upsert_many_txn( + txn=txn, + table="sliding_sync_membership_snapshots", + key_names=("room_id", "user_id"), + key_values=[ + (room_id, membership_info.user_id) + for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots + ], + value_names=[ + "sender", + "membership_event_id", + "membership", + "event_stream_ordering", + "event_instance_name", + ] + + list( + sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys() + ), + value_values=[ + [ + membership_info.sender, + membership_info.membership_event_id, + membership_info.membership, + membership_info.membership_event_stream_ordering, + membership_info.membership_event_instance_name, + ] + + list( + sliding_sync_table_changes.membership_snapshot_shared_insert_values.values() + ) + for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots + ], + ) + txn.call_after( self.store._curr_state_delta_stream_cache.entity_has_changed, room_id, @@ -1303,13 +1897,302 @@ class PersistEventsStore: ) # Invalidate the various caches - self.store._invalidate_state_caches_and_stream(txn, room_id, members_changed) + self.store._invalidate_state_caches_and_stream( + txn, room_id, members_to_cache_bust + ) # Check if any of the remote membership changes requires us to # unsubscribe from their device lists. self.store.handle_potentially_left_users_txn( - txn, {m for m in members_changed if not self.hs.is_mine_id(m)} + txn, {m for m in members_to_cache_bust if not self.hs.is_mine_id(m)} + ) + + @classmethod + def _get_relevant_sliding_sync_current_state_event_ids_txn( + cls, txn: LoggingTransaction, room_id: str + ) -> MutableStateMap[str]: + """ + Fetch the current state event IDs for the relevant (to the + `sliding_sync_joined_rooms` table) state types for the given room. + + Returns: + A tuple of: + 1. StateMap of event IDs necessary to to fetch the relevant state values + needed to insert into the + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`. + 2. The corresponding latest `stream_id` in the + `current_state_delta_stream` table. This is useful to compare against + the `current_state_delta_stream` table later so you can check whether + the current state has changed since you last fetched the current + state. + """ + # Fetch the current state event IDs from the database + ( + event_type_and_state_key_in_list_clause, + event_type_and_state_key_args, + ) = make_tuple_in_list_sql_clause( + txn.database_engine, + ("type", "state_key"), + SLIDING_SYNC_RELEVANT_STATE_SET, + ) + txn.execute( + f""" + SELECT c.event_id, c.type, c.state_key + FROM current_state_events AS c + WHERE + c.room_id = ? + AND {event_type_and_state_key_in_list_clause} + """, + [room_id] + event_type_and_state_key_args, + ) + current_state_map: MutableStateMap[str] = { + (event_type, state_key): event_id for event_id, event_type, state_key in txn + } + + return current_state_map + + @classmethod + def _get_sliding_sync_insert_values_from_state_map( + cls, state_map: StateMap[EventBase] + ) -> SlidingSyncStateInsertValues: + """ + Extract the relevant state values from the `state_map` needed to insert into the + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. + + Returns: + Map from column names (`room_type`, `is_encrypted`, `room_name`) to relevant + state values needed to insert into + the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. + """ + # Map of values to insert/update in the `sliding_sync_membership_snapshots` table + sliding_sync_insert_map: SlidingSyncStateInsertValues = {} + + # Parse the raw event JSON + for state_key, event in state_map.items(): + if state_key == (EventTypes.Create, ""): + room_type = event.content.get(EventContentFields.ROOM_TYPE) + # Scrutinize JSON values + if room_type is None or isinstance(room_type, str): + sliding_sync_insert_map["room_type"] = room_type + elif state_key == (EventTypes.RoomEncryption, ""): + encryption_algorithm = event.content.get( + EventContentFields.ENCRYPTION_ALGORITHM + ) + is_encrypted = encryption_algorithm is not None + sliding_sync_insert_map["is_encrypted"] = is_encrypted + elif state_key == (EventTypes.Name, ""): + room_name = event.content.get(EventContentFields.ROOM_NAME) + # Scrutinize JSON values + if room_name is None or isinstance(room_name, str): + sliding_sync_insert_map["room_name"] = room_name + elif state_key == (EventTypes.Tombstone, ""): + successor_room_id = event.content.get( + EventContentFields.TOMBSTONE_SUCCESSOR_ROOM + ) + # Scrutinize JSON values + if successor_room_id is None or isinstance(successor_room_id, str): + sliding_sync_insert_map["tombstone_successor_room_id"] = ( + successor_room_id + ) + else: + # We only expect to see events according to the + # `SLIDING_SYNC_RELEVANT_STATE_SET`. + raise AssertionError( + "Unexpected event (we should not be fetching extra events or this " + + "piece of code needs to be updated to handle a new event type added " + + "to `SLIDING_SYNC_RELEVANT_STATE_SET`): {state_key} {event.event_id}" + ) + + return sliding_sync_insert_map + + @classmethod + def _get_sliding_sync_insert_values_from_stripped_state( + cls, unsigned_stripped_state_events: Any + ) -> SlidingSyncMembershipSnapshotSharedInsertValues: + """ + Pull out the relevant state values from the stripped state on an invite or knock + membership event needed to insert into the `sliding_sync_membership_snapshots` + tables. + + Returns: + Map from column names (`room_type`, `is_encrypted`, `room_name`) to relevant + state values needed to insert into the `sliding_sync_membership_snapshots` tables. + """ + # Map of values to insert/update in the `sliding_sync_membership_snapshots` table + sliding_sync_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {} + + if unsigned_stripped_state_events is not None: + stripped_state_map: MutableStateMap[StrippedStateEvent] = {} + if isinstance(unsigned_stripped_state_events, list): + for raw_stripped_event in unsigned_stripped_state_events: + stripped_state_event = parse_stripped_state_event( + raw_stripped_event + ) + if stripped_state_event is not None: + stripped_state_map[ + ( + stripped_state_event.type, + stripped_state_event.state_key, + ) + ] = stripped_state_event + + # If there is some stripped state, we assume the remote server passed *all* + # of the potential stripped state events for the room. + create_stripped_event = stripped_state_map.get((EventTypes.Create, "")) + # Sanity check that we at-least have the create event + if create_stripped_event is not None: + sliding_sync_insert_map["has_known_state"] = True + + # XXX: Keep this up-to-date with `SLIDING_SYNC_RELEVANT_STATE_SET` + + # Find the room_type + sliding_sync_insert_map["room_type"] = ( + create_stripped_event.content.get(EventContentFields.ROOM_TYPE) + if create_stripped_event is not None + else None + ) + + # Find whether the room is_encrypted + encryption_stripped_event = stripped_state_map.get( + (EventTypes.RoomEncryption, "") + ) + encryption = ( + encryption_stripped_event.content.get( + EventContentFields.ENCRYPTION_ALGORITHM + ) + if encryption_stripped_event is not None + else None + ) + sliding_sync_insert_map["is_encrypted"] = encryption is not None + + # Find the room_name + room_name_stripped_event = stripped_state_map.get((EventTypes.Name, "")) + sliding_sync_insert_map["room_name"] = ( + room_name_stripped_event.content.get(EventContentFields.ROOM_NAME) + if room_name_stripped_event is not None + else None + ) + + # Find the tombstone_successor_room_id + # Note: This isn't one of the stripped state events according to the spec + # but seems like there is no reason not to support this kind of thing. + tombstone_stripped_event = stripped_state_map.get( + (EventTypes.Tombstone, "") + ) + sliding_sync_insert_map["tombstone_successor_room_id"] = ( + tombstone_stripped_event.content.get( + EventContentFields.TOMBSTONE_SUCCESSOR_ROOM + ) + if tombstone_stripped_event is not None + else None + ) + + else: + # No stripped state provided + sliding_sync_insert_map["has_known_state"] = False + sliding_sync_insert_map["room_type"] = None + sliding_sync_insert_map["room_name"] = None + sliding_sync_insert_map["is_encrypted"] = False + else: + # No stripped state provided + sliding_sync_insert_map["has_known_state"] = False + sliding_sync_insert_map["room_type"] = None + sliding_sync_insert_map["room_name"] = None + sliding_sync_insert_map["is_encrypted"] = False + + return sliding_sync_insert_map + + def _update_sliding_sync_tables_with_new_persisted_events_txn( + self, + txn: LoggingTransaction, + room_id: str, + events_and_contexts: List[Tuple[EventBase, EventContext]], + ) -> None: + """ + Update the latest `event_stream_ordering`/`bump_stamp` columns in the + `sliding_sync_joined_rooms` table for the room with new events. + + This function assumes that `_store_event_txn()` (to persist the event) and + `_update_current_state_txn(...)` (so that `sliding_sync_joined_rooms` table has + been updated with rooms that were joined) have already been run. + + Args: + txn + room_id: The room that all of the events belong to + events_and_contexts: The events being persisted. We assume the list is + sorted ascending by `stream_ordering`. We don't care about the sort when the + events are backfilled (with negative `stream_ordering`). + """ + + # Nothing to do if there are no events + if len(events_and_contexts) == 0: + return + + # We only update the sliding sync tables for non-backfilled events. + # + # Check if the first event is a backfilled event (with a negative + # `stream_ordering`). If one event is backfilled, we assume this whole batch was + # backfilled. + first_event_stream_ordering = events_and_contexts[0][ + 0 + ].internal_metadata.stream_ordering + # This should exist for persisted events + assert first_event_stream_ordering is not None + if first_event_stream_ordering < 0: + return + + # Since the list is sorted ascending by `stream_ordering`, the last event should + # have the highest `stream_ordering`. + max_stream_ordering = events_and_contexts[-1][ + 0 + ].internal_metadata.stream_ordering + max_bump_stamp = None + for event, _ in reversed(events_and_contexts): + # Sanity check that all events belong to the same room + assert event.room_id == room_id + + if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES: + # This should exist for persisted events + assert event.internal_metadata.stream_ordering is not None + + max_bump_stamp = event.internal_metadata.stream_ordering + + # Since we're iterating in reverse, we can break as soon as we find a + # matching bump event which should have the highest `stream_ordering`. + break + + # We should have exited earlier if there were no events + assert ( + max_stream_ordering is not None + ), "Expected to have a stream_ordering if we have events" + + # Handle updating the `sliding_sync_joined_rooms` table. + # + txn.execute( + """ + UPDATE sliding_sync_joined_rooms + SET + event_stream_ordering = CASE + WHEN event_stream_ordering IS NULL OR event_stream_ordering < ? + THEN ? + ELSE event_stream_ordering + END, + bump_stamp = CASE + WHEN bump_stamp IS NULL OR bump_stamp < ? + THEN ? + ELSE bump_stamp + END + WHERE room_id = ? + """, + ( + max_stream_ordering, + max_stream_ordering, + max_bump_stamp, + max_bump_stamp, + room_id, + ), ) + # This may or may not update any rows depending if we are `no_longer_in_room` def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None: """Update the room version in the database based off current state @@ -1931,7 +2814,9 @@ class PersistEventsStore: ) for event in events: + # Sanity check that we're working with persisted events assert event.internal_metadata.stream_ordering is not None + assert event.internal_metadata.instance_name is not None # We update the local_current_membership table only if the event is # "current", i.e., its something that has just happened. @@ -1945,6 +2830,16 @@ class PersistEventsStore: and event.internal_metadata.is_outlier() and event.internal_metadata.is_out_of_band_membership() ): + # The only sort of out-of-band-membership events we expect to see here + # are remote invites/knocks and LEAVE events corresponding to + # rejected/retracted invites and rescinded knocks. + assert event.type == EventTypes.Member + assert event.membership in ( + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + ) + self.db_pool.simple_upsert_txn( txn, table="local_current_membership", @@ -1956,6 +2851,56 @@ class PersistEventsStore: }, ) + # Handle updating the `sliding_sync_membership_snapshots` table + # (out-of-band membership events only) + # + raw_stripped_state_events = None + if event.membership == Membership.INVITE: + invite_room_state = event.unsigned.get("invite_room_state") + raw_stripped_state_events = invite_room_state + elif event.membership == Membership.KNOCK: + knock_room_state = event.unsigned.get("knock_room_state") + raw_stripped_state_events = knock_room_state + + insert_values = { + "sender": event.sender, + "membership_event_id": event.event_id, + "membership": event.membership, + "event_stream_ordering": event.internal_metadata.stream_ordering, + "event_instance_name": event.internal_metadata.instance_name, + } + if event.membership == Membership.LEAVE: + # Inherit the meta data from the remote invite/knock. When using + # sliding sync filters, this will prevent the room from + # disappearing/appearing just because you left the room. + pass + elif event.membership in (Membership.INVITE, Membership.KNOCK): + extra_insert_values = ( + self._get_sliding_sync_insert_values_from_stripped_state( + raw_stripped_state_events + ) + ) + insert_values.update(extra_insert_values) + else: + # We don't know how to handle this type of membership yet + # + # FIXME: We should use `assert_never` here but for some reason + # the exhaustive matching doesn't recognize the `Never` here. + # assert_never(event.membership) + raise AssertionError( + f"Unexpected out-of-band membership {event.membership} ({event.event_id}) that we don't know how to handle yet" + ) + + self.db_pool.simple_upsert_txn( + txn, + table="sliding_sync_membership_snapshots", + keyvalues={ + "room_id": event.room_id, + "user_id": event.state_key, + }, + values=insert_values, + ) + def _handle_event_relations( self, txn: LoggingTransaction, event: EventBase ) -> None: