diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..60c92e5804 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -32,6 +32,7 @@ from typing import (
Iterable,
List,
Optional,
+ Sequence,
Set,
Tuple,
cast,
@@ -39,19 +40,27 @@ from typing import (
import attr
from prometheus_client import Counter
+from typing_extensions import TypedDict
import synapse.metrics
-from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ Membership,
+ RelationTypes,
+)
from synapse.api.errors import PartialStateConflictError
from synapse.api.room_versions import RoomVersions
-from synapse.events import EventBase, relation_from_event
+from synapse.events import EventBase, StrippedStateEvent, relation_from_event
from synapse.events.snapshot import EventContext
+from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
+ make_tuple_in_list_sql_clause,
)
from synapse.storage.databases.main.event_federation import EventFederationStore
from synapse.storage.databases.main.events_worker import EventCacheEntry
@@ -59,7 +68,15 @@ from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.storage.util.sequence import SequenceGenerator
-from synapse.types import JsonDict, StateMap, StrCollection, get_domain_from_id
+from synapse.types import (
+ JsonDict,
+ MutableStateMap,
+ StateMap,
+ StrCollection,
+ get_domain_from_id,
+)
+from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
+from synapse.types.state import StateFilter
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter, sorted_topologically
from synapse.util.stringutils import non_null_str_or_none
@@ -78,6 +95,19 @@ event_counter = Counter(
["type", "origin_type", "origin_entity"],
)
+# State event type/key pairs that we need to gather to fill in the
+# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+SLIDING_SYNC_RELEVANT_STATE_SET = (
+ # So we can fill in the `room_type` column
+ (EventTypes.Create, ""),
+ # So we can fill in the `is_encrypted` column
+ (EventTypes.RoomEncryption, ""),
+ # So we can fill in the `room_name` column
+ (EventTypes.Name, ""),
+ # So we can fill in the `tombstone_successor_room_id` column
+ (EventTypes.Tombstone, ""),
+)
+
@attr.s(slots=True, auto_attribs=True)
class DeltaState:
@@ -99,6 +129,82 @@ class DeltaState:
return not self.to_delete and not self.to_insert and not self.no_longer_in_room
+# We want `total=False` because we want to allow values to be unset.
+class SlidingSyncStateInsertValues(TypedDict, total=False):
+ """
+ Insert values relevant for the `sliding_sync_joined_rooms` and
+ `sliding_sync_membership_snapshots` database tables.
+ """
+
+ room_type: Optional[str]
+ is_encrypted: Optional[bool]
+ room_name: Optional[str]
+ tombstone_successor_room_id: Optional[str]
+
+
+class SlidingSyncMembershipSnapshotSharedInsertValues(
+ SlidingSyncStateInsertValues, total=False
+):
+ """
+ Insert values for `sliding_sync_membership_snapshots` that we can share across
+ multiple memberships
+ """
+
+ has_known_state: Optional[bool]
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SlidingSyncMembershipInfo:
+ """
+ Values unique to each membership
+ """
+
+ user_id: str
+ sender: str
+ membership_event_id: str
+ membership: str
+ membership_event_stream_ordering: int
+ membership_event_instance_name: str
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SlidingSyncTableChanges:
+ room_id: str
+ # `stream_ordering` of the most recent event being persisted in the room. This doesn't
+ # need to be perfect, we just need *some* answer that points to a real event in the
+ # room in case we are the first ones inserting into the `sliding_sync_joined_rooms`
+ # table because of the `NON NULL` constraint on `event_stream_ordering`. In reality,
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
+ # `_update_current_state_txn()` whenever a new event is persisted to update it to the
+ # correct latest value.
+ #
+ # This should be *some* value that points to a real event in the room if we are
+ # still joined to the room and some state is changing (`to_insert` or `to_delete`).
+ joined_room_best_effort_most_recent_stream_ordering: Optional[int]
+ # If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to
+ # fully-insert it which means we also need to include a `bump_stamp` value to use
+ # for the row. This should only be populated when we're trying to fully-insert a
+ # row.
+ #
+ # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+ # foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+ # https://github.com/element-hq/synapse/issues/17623)
+ joined_room_bump_stamp_to_fully_insert: Optional[int]
+ # Values to upsert into `sliding_sync_joined_rooms`
+ joined_room_updates: SlidingSyncStateInsertValues
+
+ # Shared values to upsert into `sliding_sync_membership_snapshots` for each
+ # `to_insert_membership_snapshots`
+ membership_snapshot_shared_insert_values: (
+ SlidingSyncMembershipSnapshotSharedInsertValues
+ )
+ # List of membership to insert into `sliding_sync_membership_snapshots`
+ to_insert_membership_snapshots: List[SlidingSyncMembershipInfo]
+ # List of user_id to delete from `sliding_sync_membership_snapshots`
+ to_delete_membership_snapshots: List[str]
+
+
@attr.s(slots=True, auto_attribs=True)
class NewEventChainLinks:
"""Information about new auth chain links that need to be added to the DB.
@@ -226,6 +332,14 @@ class PersistEventsStore:
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
+ sliding_sync_table_changes = None
+ if state_delta_for_room is not None:
+ sliding_sync_table_changes = (
+ await self._calculate_sliding_sync_table_changes(
+ room_id, events_and_contexts, state_delta_for_room
+ )
+ )
+
await self.db_pool.runInteraction(
"persist_events",
self._persist_events_txn,
@@ -235,6 +349,7 @@ class PersistEventsStore:
state_delta_for_room=state_delta_for_room,
new_forward_extremities=new_forward_extremities,
new_event_links=new_event_links,
+ sliding_sync_table_changes=sliding_sync_table_changes,
)
persist_event_counter.inc(len(events_and_contexts))
@@ -261,6 +376,341 @@ class PersistEventsStore:
(room_id,), frozenset(new_forward_extremities)
)
+ async def _calculate_sliding_sync_table_changes(
+ self,
+ room_id: str,
+ events_and_contexts: Sequence[Tuple[EventBase, EventContext]],
+ delta_state: DeltaState,
+ ) -> SlidingSyncTableChanges:
+ """
+ Calculate the changes to the `sliding_sync_membership_snapshots` and
+ `sliding_sync_joined_rooms` tables given the deltas that are going to be used to
+ update the `current_state_events` table.
+
+        Just a bunch of pre-processing so we don't need to spend time in the
+ transaction itself gathering all of this info. It's also easier to deal with
+ redactions outside of a transaction.
+
+ Args:
+ room_id: The room ID currently being processed.
+ events_and_contexts: List of tuples of (event, context) being persisted.
+ This is completely optional (you can pass an empty list) and will just
+ save us from fetching the events from the database if we already have
+ them. We assume the list is sorted ascending by `stream_ordering`. We
+ don't care about the sort when the events are backfilled (with negative
+ `stream_ordering`).
+ delta_state: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+ """
+ to_insert = delta_state.to_insert
+ to_delete = delta_state.to_delete
+
+ # If no state is changing, we don't need to do anything. This can happen when a
+ # partial-stated room is re-syncing the current state.
+ if not to_insert and not to_delete:
+ return SlidingSyncTableChanges(
+ room_id=room_id,
+ joined_room_best_effort_most_recent_stream_ordering=None,
+ joined_room_bump_stamp_to_fully_insert=None,
+ joined_room_updates={},
+ membership_snapshot_shared_insert_values={},
+ to_insert_membership_snapshots=[],
+ to_delete_membership_snapshots=[],
+ )
+
+ event_map = {event.event_id: event for event, _ in events_and_contexts}
+
+ # Handle gathering info for the `sliding_sync_membership_snapshots` table
+ #
+ # This would only happen if someone was state reset out of the room
+ user_ids_to_delete_membership_snapshots = [
+ state_key
+ for event_type, state_key in to_delete
+ if event_type == EventTypes.Member and self.is_mine_id(state_key)
+ ]
+
+ membership_snapshot_shared_insert_values: (
+ SlidingSyncMembershipSnapshotSharedInsertValues
+ ) = {}
+ membership_infos_to_insert_membership_snapshots: List[
+ SlidingSyncMembershipInfo
+ ] = []
+ if to_insert:
+ membership_event_id_to_user_id_map: Dict[str, str] = {}
+ for state_key, event_id in to_insert.items():
+ if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
+ membership_event_id_to_user_id_map[event_id] = state_key[1]
+
+ membership_event_map: Dict[str, EventBase] = {}
+ # In normal event persist scenarios, we should be able to find the
+ # membership events in the `events_and_contexts` given to us but it's
+ # possible a state reset happened which added us to the room without a
+ # corresponding new membership event (reset back to a previous membership).
+ missing_membership_event_ids: Set[str] = set()
+ for membership_event_id in membership_event_id_to_user_id_map.keys():
+ membership_event = event_map.get(membership_event_id)
+ if membership_event:
+ membership_event_map[membership_event_id] = membership_event
+ else:
+ missing_membership_event_ids.add(membership_event_id)
+
+ # Otherwise, we need to find a couple events that we were reset to.
+ if missing_membership_event_ids:
+ remaining_events = await self.store.get_events(
+ missing_membership_event_ids
+ )
+ # There shouldn't be any missing events
+ assert (
+ remaining_events.keys() == missing_membership_event_ids
+ ), missing_membership_event_ids.difference(remaining_events.keys())
+ membership_event_map.update(remaining_events)
+
+ for (
+ membership_event_id,
+ user_id,
+ ) in membership_event_id_to_user_id_map.items():
+ # We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point
+ membership_event_stream_ordering = membership_event_map[
+ membership_event_id
+ ].internal_metadata.stream_ordering
+ assert membership_event_stream_ordering is not None
+ membership_event_instance_name = membership_event_map[
+ membership_event_id
+ ].internal_metadata.instance_name
+ assert membership_event_instance_name is not None
+
+ membership_infos_to_insert_membership_snapshots.append(
+ SlidingSyncMembershipInfo(
+ user_id=user_id,
+ sender=membership_event_map[membership_event_id].sender,
+ membership_event_id=membership_event_id,
+ membership=membership_event_map[membership_event_id].membership,
+ membership_event_stream_ordering=membership_event_stream_ordering,
+ membership_event_instance_name=membership_event_instance_name,
+ )
+ )
+
+ if membership_infos_to_insert_membership_snapshots:
+ current_state_ids_map: MutableStateMap[str] = dict(
+ await self.store.get_partial_filtered_current_state_ids(
+ room_id,
+ state_filter=StateFilter.from_types(
+ SLIDING_SYNC_RELEVANT_STATE_SET
+ ),
+ )
+ )
+ # Since we fetched the current state before we took `to_insert`/`to_delete`
+ # into account, we need to do a couple fixups.
+ #
+ # Update the current_state_map with what we have `to_delete`
+ for state_key in to_delete:
+ current_state_ids_map.pop(state_key, None)
+ # Update the current_state_map with what we have `to_insert`
+ for state_key, event_id in to_insert.items():
+ if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
+ current_state_ids_map[state_key] = event_id
+
+ current_state_map: MutableStateMap[EventBase] = {}
+ # In normal event persist scenarios, we probably won't be able to find
+ # these state events in `events_and_contexts` since we don't generally
+ # batch up local membership changes with other events, but it can
+ # happen.
+ missing_state_event_ids: Set[str] = set()
+ for state_key, event_id in current_state_ids_map.items():
+ event = event_map.get(event_id)
+ if event:
+ current_state_map[state_key] = event
+ else:
+ missing_state_event_ids.add(event_id)
+
+ # Otherwise, we need to find a couple events
+ if missing_state_event_ids:
+ remaining_events = await self.store.get_events(
+ missing_state_event_ids
+ )
+ # There shouldn't be any missing events
+ assert (
+ remaining_events.keys() == missing_state_event_ids
+ ), missing_state_event_ids.difference(remaining_events.keys())
+ for event in remaining_events.values():
+ current_state_map[(event.type, event.state_key)] = event
+
+ if current_state_map:
+ state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ current_state_map
+ )
+ membership_snapshot_shared_insert_values.update(state_insert_values)
+ # We have current state to work from
+ membership_snapshot_shared_insert_values["has_known_state"] = True
+ else:
+ # We don't have any `current_state_events` anymore (previously
+ # cleared out because of `no_longer_in_room`). This can happen if
+ # one user is joined and another is invited (some non-join
+ # membership). If the joined user leaves, we are `no_longer_in_room`
+ # and `current_state_events` is cleared out. When the invited user
+ # rejects the invite (leaves the room), we will end up here.
+ #
+ # In these cases, we should inherit the meta data from the previous
+ # snapshot so we shouldn't update any of the state values. When
+ # using sliding sync filters, this will prevent the room from
+ # disappearing/appearing just because you left the room.
+ #
+ # Ideally, we could additionally assert that we're only here for
+ # valid non-join membership transitions.
+ assert delta_state.no_longer_in_room
+
+ # Handle gathering info for the `sliding_sync_joined_rooms` table
+ #
+ # We only deal with
+ # updating the state related columns. The
+ # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event
+ # persisting stack (see
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
+ #
+ joined_room_updates: SlidingSyncStateInsertValues = {}
+ best_effort_most_recent_stream_ordering: Optional[int] = None
+ bump_stamp_to_fully_insert: Optional[int] = None
+ if not delta_state.no_longer_in_room:
+ current_state_ids_map = {}
+
+ # Always fully-insert rows if they don't already exist in the
+ # `sliding_sync_joined_rooms` table. This way we can rely on a row if it
+ # exists in the table.
+ #
+ # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+ # foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+ # https://github.com/element-hq/synapse/issues/17623)
+ existing_row_in_table = await self.store.db_pool.simple_select_one_onecol(
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ retcol="room_id",
+ allow_none=True,
+ )
+ if not existing_row_in_table:
+ most_recent_bump_event_pos_results = (
+ await self.store.get_last_event_pos_in_room(
+ room_id,
+ event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
+ )
+ )
+ bump_stamp_to_fully_insert = (
+ most_recent_bump_event_pos_results[1].stream
+ if most_recent_bump_event_pos_results is not None
+ else None
+ )
+
+ current_state_ids_map = dict(
+ await self.store.get_partial_filtered_current_state_ids(
+ room_id,
+ state_filter=StateFilter.from_types(
+ SLIDING_SYNC_RELEVANT_STATE_SET
+ ),
+ )
+ )
+
+ # Look through the items we're going to insert into the current state to see
+ # if there is anything that we care about and should also update in the
+ # `sliding_sync_joined_rooms` table.
+ for state_key, event_id in to_insert.items():
+ if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
+ current_state_ids_map[state_key] = event_id
+
+ # Get the full event objects for the current state events
+ #
+ # In normal event persist scenarios, we should be able to find the state
+ # events in the `events_and_contexts` given to us but it's possible a state
+            # reset happened which reset us back to a previous state.
+            current_state_map = {}
+            missing_event_ids: Set[str] = set()
+            for state_key, event_id in current_state_ids_map.items():
+                event = event_map.get(event_id)
+                if event:
+                    current_state_map[state_key] = event
+                else:
+                    missing_event_ids.add(event_id)
+
+            # Otherwise, we need to find a couple events that we were reset to.
+            if missing_event_ids:
+                remaining_events = await self.store.get_events(
+                    missing_event_ids
+                )
+                # There shouldn't be any missing events
+                assert (
+                    remaining_events.keys() == missing_event_ids
+                ), missing_event_ids.difference(remaining_events.keys())
+                for event in remaining_events.values():
+                    current_state_map[(event.type, event.state_key)] = event
+
+ joined_room_updates = (
+ PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ current_state_map
+ )
+ )
+
+ # If something is being deleted from the state, we need to clear it out
+ for state_key in to_delete:
+ if state_key == (EventTypes.Create, ""):
+ joined_room_updates["room_type"] = None
+ elif state_key == (EventTypes.RoomEncryption, ""):
+ joined_room_updates["is_encrypted"] = False
+ elif state_key == (EventTypes.Name, ""):
+ joined_room_updates["room_name"] = None
+
+ # Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to
+ # be perfect, we just need *some* answer that points to a real event in the
+ # room in case we are the first ones inserting into the
+ # `sliding_sync_joined_rooms` table because of the `NON NULL` constraint on
+ # `event_stream_ordering`. In reality,
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
+ # `_update_current_state_txn()` whenever a new event is persisted to update
+ # it to the correct latest value.
+ #
+ if len(events_and_contexts) > 0:
+ # Since the list is sorted ascending by `stream_ordering`, the last event
+ # should have the highest `stream_ordering`.
+ best_effort_most_recent_stream_ordering = events_and_contexts[-1][
+ 0
+ ].internal_metadata.stream_ordering
+ else:
+ # If there are no `events_and_contexts`, we assume it's one of two scenarios:
+ # 1. If there are new state `to_insert` but no `events_and_contexts`,
+ # then it's a state reset.
+ # 2. Otherwise, it's some partial-state room re-syncing the current state and
+ # going through un-partial process.
+ #
+ # Either way, we assume no new events are being persisted and we can
+ # find the latest already in the database. Since this is a best-effort
+ # value, we don't need to be perfect although I think we're pretty close
+ # here.
+ most_recent_event_pos_results = (
+ await self.store.get_last_event_pos_in_room(
+ room_id, event_types=None
+ )
+ )
+ assert most_recent_event_pos_results, (
+ f"We should not be seeing `None` here because we are still in the room ({room_id}) and "
+ + "it should at-least have a join membership event that's keeping us here."
+ )
+ best_effort_most_recent_stream_ordering = most_recent_event_pos_results[
+ 1
+ ].stream
+
+ # We should have found a value if we are still in the room
+ assert best_effort_most_recent_stream_ordering is not None
+
+ return SlidingSyncTableChanges(
+ room_id=room_id,
+ # For `sliding_sync_joined_rooms`
+ joined_room_best_effort_most_recent_stream_ordering=best_effort_most_recent_stream_ordering,
+ joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert,
+ joined_room_updates=joined_room_updates,
+ # For `sliding_sync_membership_snapshots`
+ membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values,
+ to_insert_membership_snapshots=membership_infos_to_insert_membership_snapshots,
+ to_delete_membership_snapshots=user_ids_to_delete_membership_snapshots,
+ )
+
async def calculate_chain_cover_index_for_events(
self, room_id: str, events: Collection[EventBase]
) -> Dict[str, NewEventChainLinks]:
@@ -458,6 +908,7 @@ class PersistEventsStore:
state_delta_for_room: Optional[DeltaState],
new_forward_extremities: Optional[Set[str]],
new_event_links: Dict[str, NewEventChainLinks],
+ sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
) -> None:
"""Insert some number of room events into the necessary database tables.
@@ -478,9 +929,14 @@ class PersistEventsStore:
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room: The current-state delta for the room.
+ state_delta_for_room: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
new_forward_extremities: The new forward extremities for the room:
a set of the event ids which are the forward extremities.
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+ derived from the given `delta_state` (see
+ `_calculate_sliding_sync_table_changes(...)`)
Raises:
PartialStateConflictError: if attempting to persist a partial state event in
@@ -590,10 +1046,22 @@ class PersistEventsStore:
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
if state_delta_for_room:
+ # If the state delta exists, the sliding sync table changes should also exist
+ assert sliding_sync_table_changes is not None
+
self._update_current_state_txn(
- txn, room_id, state_delta_for_room, min_stream_order
+ txn,
+ room_id,
+ state_delta_for_room,
+ min_stream_order,
+ sliding_sync_table_changes,
)
+ # We only update the sliding sync tables for non-backfilled events.
+ self._update_sliding_sync_tables_with_new_persisted_events_txn(
+ txn, room_id, events_and_contexts
+ )
+
def _persist_event_auth_chain_txn(
self,
txn: LoggingTransaction,
@@ -1128,8 +1596,20 @@ class PersistEventsStore:
self,
room_id: str,
state_delta: DeltaState,
+ sliding_sync_table_changes: SlidingSyncTableChanges,
) -> None:
- """Update the current state stored in the datatabase for the given room"""
+ """
+        Update the current state stored in the database for the given room
+
+ Args:
+ room_id
+ state_delta: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+ derived from the given `delta_state` (see
+ `_calculate_sliding_sync_table_changes(...)`)
+ """
if state_delta.is_noop():
return
@@ -1141,6 +1621,7 @@ class PersistEventsStore:
room_id,
delta_state=state_delta,
stream_id=stream_ordering,
+ sliding_sync_table_changes=sliding_sync_table_changes,
)
def _update_current_state_txn(
@@ -1149,16 +1630,34 @@ class PersistEventsStore:
room_id: str,
delta_state: DeltaState,
stream_id: int,
+ sliding_sync_table_changes: SlidingSyncTableChanges,
) -> None:
+ """
+ Handles updating tables that track the current state of a room.
+
+ Args:
+ txn
+ room_id
+ delta_state: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+ stream_id: TODO
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+ derived from the given `delta_state` (see
+ `_calculate_sliding_sync_table_changes(...)`)
+ """
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
+ # Sanity check we're processing the same thing
+ assert room_id == sliding_sync_table_changes.room_id
+
# Figure out the changes of membership to invalidate the
# `get_rooms_for_user` cache.
# We find out which membership events we may have deleted
# and which we have added, then we invalidate the caches for all
# those users.
- members_changed = {
+ members_to_cache_bust = {
state_key
for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
@@ -1182,16 +1681,22 @@ class PersistEventsStore:
"""
txn.execute(sql, (stream_id, self._instance_name, room_id))
+ # Grab the list of users before we clear out the current state
+ users_in_room = self.store.get_users_in_room_txn(txn, room_id)
# We also want to invalidate the membership caches for users
# that were in the room.
- users_in_room = self.store.get_users_in_room_txn(txn, room_id)
- members_changed.update(users_in_room)
+ members_to_cache_bust.update(users_in_room)
self.db_pool.simple_delete_txn(
txn,
table="current_state_events",
keyvalues={"room_id": room_id},
)
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ )
else:
# We're still in the room, so we update the current state as normal.
@@ -1260,6 +1765,41 @@ class PersistEventsStore:
],
)
+ # Handle updating the `sliding_sync_joined_rooms` table. We only deal with
+ # updating the state related columns. The
+ # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event
+ # persisting stack (see
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
+ #
+ # We only need to update when one of the relevant state values has changed
+ if sliding_sync_table_changes.joined_room_updates:
+ # This should be *some* value that points to a real event in the room if
+ # we are still joined to the room.
+ assert (
+ sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering
+ is not None
+ )
+
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ values=sliding_sync_table_changes.joined_room_updates,
+ insertion_values={
+ # The reason we're only *inserting* (not *updating*)
+ # `event_stream_ordering` here is because the column has a `NON
+ # NULL` constraint and we need *some* answer. And if the row
+ # already exists, it already has the correct value and it's
+ # better to just rely on
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()`
+ # to do the right thing (same for `bump_stamp`).
+ "event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering,
+ # If we're trying to fully-insert a row, we need to provide a
+ # value for `bump_stamp` if it exists for the room.
+ "bump_stamp": sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert,
+ },
+ )
+
# We now update `local_current_membership`. We do this regardless
# of whether we're still in the room or not to handle the case where
# e.g. we just got banned (where we need to record that fact here).
@@ -1296,6 +1836,60 @@ class PersistEventsStore:
],
)
+ # Handle updating the `sliding_sync_membership_snapshots` table
+ #
+ # This would only happen if someone was state reset out of the room
+ if sliding_sync_table_changes.to_delete_membership_snapshots:
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ column="user_id",
+ values=sliding_sync_table_changes.to_delete_membership_snapshots,
+ keyvalues={"room_id": room_id},
+ )
+
+ # We do this regardless of whether the server is `no_longer_in_room` or not
+ # because we still want a row if a local user was just left/kicked or got banned
+ # from the room.
+ if sliding_sync_table_changes.to_insert_membership_snapshots:
+ # Update the `sliding_sync_membership_snapshots` table
+ #
+ # We need to insert/update regardless of whether we have `sliding_sync_snapshot_keys`
+ # because there are other fields in the `ON CONFLICT` upsert to run (see
+ # inherit case above for more context when this happens).
+ self.db_pool.simple_upsert_many_txn(
+ txn=txn,
+ table="sliding_sync_membership_snapshots",
+ key_names=("room_id", "user_id"),
+ key_values=[
+ (room_id, membership_info.user_id)
+ for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
+ ],
+ value_names=[
+ "sender",
+ "membership_event_id",
+ "membership",
+ "event_stream_ordering",
+ "event_instance_name",
+ ]
+ + list(
+ sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys()
+ ),
+ value_values=[
+ [
+ membership_info.sender,
+ membership_info.membership_event_id,
+ membership_info.membership,
+ membership_info.membership_event_stream_ordering,
+ membership_info.membership_event_instance_name,
+ ]
+ + list(
+ sliding_sync_table_changes.membership_snapshot_shared_insert_values.values()
+ )
+ for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
+ ],
+ )
+
txn.call_after(
self.store._curr_state_delta_stream_cache.entity_has_changed,
room_id,
@@ -1303,13 +1897,302 @@ class PersistEventsStore:
)
# Invalidate the various caches
- self.store._invalidate_state_caches_and_stream(txn, room_id, members_changed)
+ self.store._invalidate_state_caches_and_stream(
+ txn, room_id, members_to_cache_bust
+ )
# Check if any of the remote membership changes requires us to
# unsubscribe from their device lists.
self.store.handle_potentially_left_users_txn(
- txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
+ txn, {m for m in members_to_cache_bust if not self.hs.is_mine_id(m)}
+ )
+
+ @classmethod
+ def _get_relevant_sliding_sync_current_state_event_ids_txn(
+ cls, txn: LoggingTransaction, room_id: str
+ ) -> MutableStateMap[str]:
+ """
+ Fetch the current state event IDs for the relevant (to the
+ `sliding_sync_joined_rooms` table) state types for the given room.
+
+ Returns:
+ A tuple of:
+ 1. StateMap of event IDs necessary to to fetch the relevant state values
+ needed to insert into the
+ `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`.
+ 2. The corresponding latest `stream_id` in the
+ `current_state_delta_stream` table. This is useful to compare against
+ the `current_state_delta_stream` table later so you can check whether
+ the current state has changed since you last fetched the current
+ state.
+ """
+ # Fetch the current state event IDs from the database
+ (
+ event_type_and_state_key_in_list_clause,
+ event_type_and_state_key_args,
+ ) = make_tuple_in_list_sql_clause(
+ txn.database_engine,
+ ("type", "state_key"),
+ SLIDING_SYNC_RELEVANT_STATE_SET,
+ )
+ txn.execute(
+ f"""
+ SELECT c.event_id, c.type, c.state_key
+ FROM current_state_events AS c
+ WHERE
+ c.room_id = ?
+ AND {event_type_and_state_key_in_list_clause}
+ """,
+ [room_id] + event_type_and_state_key_args,
+ )
+ current_state_map: MutableStateMap[str] = {
+ (event_type, state_key): event_id for event_id, event_type, state_key in txn
+ }
+
+ return current_state_map
+
+ @classmethod
+ def _get_sliding_sync_insert_values_from_state_map(
+ cls, state_map: StateMap[EventBase]
+ ) -> SlidingSyncStateInsertValues:
+ """
+ Extract the relevant state values from the `state_map` needed to insert into the
+ `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+
+ Returns:
+ Map from column names (`room_type`, `is_encrypted`, `room_name`) to relevant
+ state values needed to insert into
+ the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+ """
+ # Map of values to insert/update in the `sliding_sync_membership_snapshots` table
+ sliding_sync_insert_map: SlidingSyncStateInsertValues = {}
+
+ # Parse the raw event JSON
+ for state_key, event in state_map.items():
+ if state_key == (EventTypes.Create, ""):
+ room_type = event.content.get(EventContentFields.ROOM_TYPE)
+ # Scrutinize JSON values
+ if room_type is None or isinstance(room_type, str):
+ sliding_sync_insert_map["room_type"] = room_type
+ elif state_key == (EventTypes.RoomEncryption, ""):
+ encryption_algorithm = event.content.get(
+ EventContentFields.ENCRYPTION_ALGORITHM
+ )
+ is_encrypted = encryption_algorithm is not None
+ sliding_sync_insert_map["is_encrypted"] = is_encrypted
+ elif state_key == (EventTypes.Name, ""):
+ room_name = event.content.get(EventContentFields.ROOM_NAME)
+ # Scrutinize JSON values
+ if room_name is None or isinstance(room_name, str):
+ sliding_sync_insert_map["room_name"] = room_name
+ elif state_key == (EventTypes.Tombstone, ""):
+ successor_room_id = event.content.get(
+ EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
+ )
+ # Scrutinize JSON values
+ if successor_room_id is None or isinstance(successor_room_id, str):
+ sliding_sync_insert_map["tombstone_successor_room_id"] = (
+ successor_room_id
+ )
+ else:
+ # We only expect to see events according to the
+ # `SLIDING_SYNC_RELEVANT_STATE_SET`.
+ raise AssertionError(
+ "Unexpected event (we should not be fetching extra events or this "
+ + "piece of code needs to be updated to handle a new event type added "
+ + "to `SLIDING_SYNC_RELEVANT_STATE_SET`): {state_key} {event.event_id}"
+ )
+
+ return sliding_sync_insert_map
+
+ @classmethod
+ def _get_sliding_sync_insert_values_from_stripped_state(
+ cls, unsigned_stripped_state_events: Any
+ ) -> SlidingSyncMembershipSnapshotSharedInsertValues:
+ """
+ Pull out the relevant state values from the stripped state on an invite or knock
+ membership event needed to insert into the `sliding_sync_membership_snapshots`
+ tables.
+
+ Returns:
+ Map from column names (`room_type`, `is_encrypted`, `room_name`) to relevant
+ state values needed to insert into the `sliding_sync_membership_snapshots` tables.
+ """
+ # Map of values to insert/update in the `sliding_sync_membership_snapshots` table
+ sliding_sync_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {}
+
+ if unsigned_stripped_state_events is not None:
+ stripped_state_map: MutableStateMap[StrippedStateEvent] = {}
+ if isinstance(unsigned_stripped_state_events, list):
+ for raw_stripped_event in unsigned_stripped_state_events:
+ stripped_state_event = parse_stripped_state_event(
+ raw_stripped_event
+ )
+ if stripped_state_event is not None:
+ stripped_state_map[
+ (
+ stripped_state_event.type,
+ stripped_state_event.state_key,
+ )
+ ] = stripped_state_event
+
+ # If there is some stripped state, we assume the remote server passed *all*
+ # of the potential stripped state events for the room.
+ create_stripped_event = stripped_state_map.get((EventTypes.Create, ""))
+ # Sanity check that we at-least have the create event
+ if create_stripped_event is not None:
+ sliding_sync_insert_map["has_known_state"] = True
+
+ # XXX: Keep this up-to-date with `SLIDING_SYNC_RELEVANT_STATE_SET`
+
+ # Find the room_type
+ sliding_sync_insert_map["room_type"] = (
+ create_stripped_event.content.get(EventContentFields.ROOM_TYPE)
+ if create_stripped_event is not None
+ else None
+ )
+
+ # Find whether the room is_encrypted
+ encryption_stripped_event = stripped_state_map.get(
+ (EventTypes.RoomEncryption, "")
+ )
+ encryption = (
+ encryption_stripped_event.content.get(
+ EventContentFields.ENCRYPTION_ALGORITHM
+ )
+ if encryption_stripped_event is not None
+ else None
+ )
+ sliding_sync_insert_map["is_encrypted"] = encryption is not None
+
+ # Find the room_name
+ room_name_stripped_event = stripped_state_map.get((EventTypes.Name, ""))
+ sliding_sync_insert_map["room_name"] = (
+ room_name_stripped_event.content.get(EventContentFields.ROOM_NAME)
+ if room_name_stripped_event is not None
+ else None
+ )
+
+ # Find the tombstone_successor_room_id
+ # Note: This isn't one of the stripped state events according to the spec
+ # but seems like there is no reason not to support this kind of thing.
+ tombstone_stripped_event = stripped_state_map.get(
+ (EventTypes.Tombstone, "")
+ )
+ sliding_sync_insert_map["tombstone_successor_room_id"] = (
+ tombstone_stripped_event.content.get(
+ EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
+ )
+ if tombstone_stripped_event is not None
+ else None
+ )
+
+ else:
+ # No stripped state provided
+ sliding_sync_insert_map["has_known_state"] = False
+ sliding_sync_insert_map["room_type"] = None
+ sliding_sync_insert_map["room_name"] = None
+ sliding_sync_insert_map["is_encrypted"] = False
+ else:
+ # No stripped state provided
+ sliding_sync_insert_map["has_known_state"] = False
+ sliding_sync_insert_map["room_type"] = None
+ sliding_sync_insert_map["room_name"] = None
+ sliding_sync_insert_map["is_encrypted"] = False
+
+ return sliding_sync_insert_map
+
    def _update_sliding_sync_tables_with_new_persisted_events_txn(
        self,
        txn: LoggingTransaction,
        room_id: str,
        events_and_contexts: List[Tuple[EventBase, EventContext]],
    ) -> None:
        """
        Update the latest `event_stream_ordering`/`bump_stamp` columns in the
        `sliding_sync_joined_rooms` table for the room with new events.

        This function assumes that `_store_event_txn()` (to persist the event) and
        `_update_current_state_txn(...)` (so that `sliding_sync_joined_rooms` table has
        been updated with rooms that were joined) have already been run.

        Args:
            txn: The database transaction to run the update in.
            room_id: The room that all of the events belong to
            events_and_contexts: The events being persisted. We assume the list is
                sorted ascending by `stream_ordering`. We don't care about the sort when the
                events are backfilled (with negative `stream_ordering`).
        """

        # Nothing to do if there are no events
        if len(events_and_contexts) == 0:
            return

        # We only update the sliding sync tables for non-backfilled events.
        #
        # Check if the first event is a backfilled event (with a negative
        # `stream_ordering`). If one event is backfilled, we assume this whole batch was
        # backfilled.
        first_event_stream_ordering = events_and_contexts[0][
            0
        ].internal_metadata.stream_ordering
        # This should exist for persisted events
        assert first_event_stream_ordering is not None
        if first_event_stream_ordering < 0:
            return

        # Since the list is sorted ascending by `stream_ordering`, the last event should
        # have the highest `stream_ordering`.
        max_stream_ordering = events_and_contexts[-1][
            0
        ].internal_metadata.stream_ordering
        # Track the highest `stream_ordering` among events whose type counts as a
        # "bump" for sliding sync ordering purposes (may remain None if the batch
        # contains no such events).
        max_bump_stamp = None
        for event, _ in reversed(events_and_contexts):
            # Sanity check that all events belong to the same room
            assert event.room_id == room_id

            if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
                # This should exist for persisted events
                assert event.internal_metadata.stream_ordering is not None

                max_bump_stamp = event.internal_metadata.stream_ordering

                # Since we're iterating in reverse, we can break as soon as we find a
                # matching bump event which should have the highest `stream_ordering`.
                break

        # We should have exited earlier if there were no events
        assert (
            max_stream_ordering is not None
        ), "Expected to have a stream_ordering if we have events"

        # Handle updating the `sliding_sync_joined_rooms` table.
        #
        # This may or may not update any rows depending on whether we are
        # `no_longer_in_room`. The `CASE` expressions only ever move the columns
        # forward (or fill in NULLs), never backwards; when `max_bump_stamp` is
        # None (NULL), the `bump_stamp < ?` comparison is NULL-y and the existing
        # value is kept unless it was already NULL.
        txn.execute(
            """
            UPDATE sliding_sync_joined_rooms
            SET
                event_stream_ordering = CASE
                    WHEN event_stream_ordering IS NULL OR event_stream_ordering < ?
                    THEN ?
                    ELSE event_stream_ordering
                END,
                bump_stamp = CASE
                    WHEN bump_stamp IS NULL OR bump_stamp < ?
                    THEN ?
                    ELSE bump_stamp
                END
            WHERE room_id = ?
            """,
            (
                max_stream_ordering,
                max_stream_ordering,
                max_bump_stamp,
                max_bump_stamp,
                room_id,
            ),
        )
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
"""Update the room version in the database based off current state
@@ -1931,7 +2814,9 @@ class PersistEventsStore:
)
for event in events:
+ # Sanity check that we're working with persisted events
assert event.internal_metadata.stream_ordering is not None
+ assert event.internal_metadata.instance_name is not None
# We update the local_current_membership table only if the event is
# "current", i.e., its something that has just happened.
@@ -1945,6 +2830,16 @@ class PersistEventsStore:
and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership()
):
+ # The only sort of out-of-band-membership events we expect to see here
+ # are remote invites/knocks and LEAVE events corresponding to
+ # rejected/retracted invites and rescinded knocks.
+ assert event.type == EventTypes.Member
+ assert event.membership in (
+ Membership.INVITE,
+ Membership.KNOCK,
+ Membership.LEAVE,
+ )
+
self.db_pool.simple_upsert_txn(
txn,
table="local_current_membership",
@@ -1956,6 +2851,56 @@ class PersistEventsStore:
},
)
+ # Handle updating the `sliding_sync_membership_snapshots` table
+ # (out-of-band membership events only)
+ #
+ raw_stripped_state_events = None
+ if event.membership == Membership.INVITE:
+ invite_room_state = event.unsigned.get("invite_room_state")
+ raw_stripped_state_events = invite_room_state
+ elif event.membership == Membership.KNOCK:
+ knock_room_state = event.unsigned.get("knock_room_state")
+ raw_stripped_state_events = knock_room_state
+
+ insert_values = {
+ "sender": event.sender,
+ "membership_event_id": event.event_id,
+ "membership": event.membership,
+ "event_stream_ordering": event.internal_metadata.stream_ordering,
+ "event_instance_name": event.internal_metadata.instance_name,
+ }
+ if event.membership == Membership.LEAVE:
+ # Inherit the meta data from the remote invite/knock. When using
+ # sliding sync filters, this will prevent the room from
+ # disappearing/appearing just because you left the room.
+ pass
+ elif event.membership in (Membership.INVITE, Membership.KNOCK):
+ extra_insert_values = (
+ self._get_sliding_sync_insert_values_from_stripped_state(
+ raw_stripped_state_events
+ )
+ )
+ insert_values.update(extra_insert_values)
+ else:
+ # We don't know how to handle this type of membership yet
+ #
+ # FIXME: We should use `assert_never` here but for some reason
+ # the exhaustive matching doesn't recognize the `Never` here.
+ # assert_never(event.membership)
+ raise AssertionError(
+ f"Unexpected out-of-band membership {event.membership} ({event.event_id}) that we don't know how to handle yet"
+ )
+
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ keyvalues={
+ "room_id": event.room_id,
+ "user_id": event.state_key,
+ },
+ values=insert_values,
+ )
+
def _handle_event_relations(
self, txn: LoggingTransaction, event: EventBase
) -> None:
|