diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..60c92e5804 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -32,6 +32,7 @@ from typing import (
Iterable,
List,
Optional,
+ Sequence,
Set,
Tuple,
cast,
@@ -39,19 +40,27 @@ from typing import (
import attr
from prometheus_client import Counter
+from typing_extensions import TypedDict
import synapse.metrics
-from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ Membership,
+ RelationTypes,
+)
from synapse.api.errors import PartialStateConflictError
from synapse.api.room_versions import RoomVersions
-from synapse.events import EventBase, relation_from_event
+from synapse.events import EventBase, StrippedStateEvent, relation_from_event
from synapse.events.snapshot import EventContext
+from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
+ make_tuple_in_list_sql_clause,
)
from synapse.storage.databases.main.event_federation import EventFederationStore
from synapse.storage.databases.main.events_worker import EventCacheEntry
@@ -59,7 +68,15 @@ from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.storage.util.sequence import SequenceGenerator
-from synapse.types import JsonDict, StateMap, StrCollection, get_domain_from_id
+from synapse.types import (
+ JsonDict,
+ MutableStateMap,
+ StateMap,
+ StrCollection,
+ get_domain_from_id,
+)
+from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
+from synapse.types.state import StateFilter
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter, sorted_topologically
from synapse.util.stringutils import non_null_str_or_none
@@ -78,6 +95,19 @@ event_counter = Counter(
["type", "origin_type", "origin_entity"],
)
+# State event type/key pairs that we need to gather to fill in the
+# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+SLIDING_SYNC_RELEVANT_STATE_SET = (
+ # So we can fill in the `room_type` column
+ (EventTypes.Create, ""),
+ # So we can fill in the `is_encrypted` column
+ (EventTypes.RoomEncryption, ""),
+ # So we can fill in the `room_name` column
+ (EventTypes.Name, ""),
+ # So we can fill in the `tombstone_successor_room_id` column
+ (EventTypes.Tombstone, ""),
+)
+
@attr.s(slots=True, auto_attribs=True)
class DeltaState:
@@ -99,6 +129,82 @@ class DeltaState:
return not self.to_delete and not self.to_insert and not self.no_longer_in_room
+# We want `total=False` because we want to allow values to be unset.
+class SlidingSyncStateInsertValues(TypedDict, total=False):
+ """
+ Insert values relevant for the `sliding_sync_joined_rooms` and
+ `sliding_sync_membership_snapshots` database tables.
+ """
+
+ room_type: Optional[str]
+ is_encrypted: Optional[bool]
+ room_name: Optional[str]
+ tombstone_successor_room_id: Optional[str]
+
+
+class SlidingSyncMembershipSnapshotSharedInsertValues(
+ SlidingSyncStateInsertValues, total=False
+):
+ """
+ Insert values for `sliding_sync_membership_snapshots` that we can share across
+ multiple memberships
+ """
+
+ has_known_state: Optional[bool]
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SlidingSyncMembershipInfo:
+ """
+ Values unique to each membership row in `sliding_sync_membership_snapshots`
+ """
+
+ user_id: str
+ sender: str
+ membership_event_id: str
+ membership: str
+ membership_event_stream_ordering: int
+ membership_event_instance_name: str
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SlidingSyncTableChanges:
+ room_id: str
+ # `stream_ordering` of the most recent event being persisted in the room. This doesn't
+ # need to be perfect, we just need *some* answer that points to a real event in the
+ # room in case we are the first ones inserting into the `sliding_sync_joined_rooms`
+ # table because of the `NOT NULL` constraint on `event_stream_ordering`. In reality,
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
+ # `_update_current_state_txn()` whenever a new event is persisted to update it to the
+ # correct latest value.
+ #
+ # This should be *some* value that points to a real event in the room if we are
+ # still joined to the room and some state is changing (`to_insert` or `to_delete`).
+ joined_room_best_effort_most_recent_stream_ordering: Optional[int]
+ # If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to
+ # fully-insert it which means we also need to include a `bump_stamp` value to use
+ # for the row. This should only be populated when we're trying to fully-insert a
+ # row.
+ #
+ # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+ # foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+ # https://github.com/element-hq/synapse/issues/17623)
+ joined_room_bump_stamp_to_fully_insert: Optional[int]
+ # Values to upsert into `sliding_sync_joined_rooms`
+ joined_room_updates: SlidingSyncStateInsertValues
+
+ # Shared values to upsert into `sliding_sync_membership_snapshots` for each
+ # `to_insert_membership_snapshots`
+ membership_snapshot_shared_insert_values: (
+ SlidingSyncMembershipSnapshotSharedInsertValues
+ )
+ # List of memberships to insert into `sliding_sync_membership_snapshots`
+ to_insert_membership_snapshots: List[SlidingSyncMembershipInfo]
+ # List of user_ids to delete from `sliding_sync_membership_snapshots`
+ to_delete_membership_snapshots: List[str]
+
+
@attr.s(slots=True, auto_attribs=True)
class NewEventChainLinks:
"""Information about new auth chain links that need to be added to the DB.
@@ -226,6 +332,14 @@ class PersistEventsStore:
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
+ sliding_sync_table_changes = None
+ if state_delta_for_room is not None:
+ sliding_sync_table_changes = (
+ await self._calculate_sliding_sync_table_changes(
+ room_id, events_and_contexts, state_delta_for_room
+ )
+ )
+
await self.db_pool.runInteraction(
"persist_events",
self._persist_events_txn,
@@ -235,6 +349,7 @@ class PersistEventsStore:
state_delta_for_room=state_delta_for_room,
new_forward_extremities=new_forward_extremities,
new_event_links=new_event_links,
+ sliding_sync_table_changes=sliding_sync_table_changes,
)
persist_event_counter.inc(len(events_and_contexts))
@@ -261,6 +376,341 @@ class PersistEventsStore:
(room_id,), frozenset(new_forward_extremities)
)
+ async def _calculate_sliding_sync_table_changes(
+ self,
+ room_id: str,
+ events_and_contexts: Sequence[Tuple[EventBase, EventContext]],
+ delta_state: DeltaState,
+ ) -> SlidingSyncTableChanges:
+ """
+ Calculate the changes to the `sliding_sync_membership_snapshots` and
+ `sliding_sync_joined_rooms` tables given the deltas that are going to be used to
+ update the `current_state_events` table.
+
+ Just a bunch of pre-processing so we don't need to spend time in the
+ transaction itself gathering all of this info. It's also easier to deal with
+ redactions outside of a transaction.
+
+ Args:
+ room_id: The room ID currently being processed.
+ events_and_contexts: List of tuples of (event, context) being persisted.
+ This is completely optional (you can pass an empty list) and will just
+ save us from fetching the events from the database if we already have
+ them. We assume the list is sorted ascending by `stream_ordering`. We
+ don't care about the sort when the events are backfilled (with negative
+ `stream_ordering`).
+ delta_state: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+ """
+ to_insert = delta_state.to_insert
+ to_delete = delta_state.to_delete
+
+ # If no state is changing, we don't need to do anything. This can happen when a
+ # partial-stated room is re-syncing the current state.
+ if not to_insert and not to_delete:
+ return SlidingSyncTableChanges(
+ room_id=room_id,
+ joined_room_best_effort_most_recent_stream_ordering=None,
+ joined_room_bump_stamp_to_fully_insert=None,
+ joined_room_updates={},
+ membership_snapshot_shared_insert_values={},
+ to_insert_membership_snapshots=[],
+ to_delete_membership_snapshots=[],
+ )
+
+ event_map = {event.event_id: event for event, _ in events_and_contexts}
+
+ # Handle gathering info for the `sliding_sync_membership_snapshots` table
+ #
+ # This would only happen if someone was state reset out of the room
+ user_ids_to_delete_membership_snapshots = [
+ state_key
+ for event_type, state_key in to_delete
+ if event_type == EventTypes.Member and self.is_mine_id(state_key)
+ ]
+
+ membership_snapshot_shared_insert_values: (
+ SlidingSyncMembershipSnapshotSharedInsertValues
+ ) = {}
+ membership_infos_to_insert_membership_snapshots: List[
+ SlidingSyncMembershipInfo
+ ] = []
+ if to_insert:
+ membership_event_id_to_user_id_map: Dict[str, str] = {}
+ for state_key, event_id in to_insert.items():
+ if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
+ membership_event_id_to_user_id_map[event_id] = state_key[1]
+
+ membership_event_map: Dict[str, EventBase] = {}
+ # In normal event persist scenarios, we should be able to find the
+ # membership events in the `events_and_contexts` given to us but it's
+ # possible a state reset happened which added us to the room without a
+ # corresponding new membership event (reset back to a previous membership).
+ missing_membership_event_ids: Set[str] = set()
+ for membership_event_id in membership_event_id_to_user_id_map.keys():
+ membership_event = event_map.get(membership_event_id)
+ if membership_event:
+ membership_event_map[membership_event_id] = membership_event
+ else:
+ missing_membership_event_ids.add(membership_event_id)
+
+ # Otherwise, we need to find a couple events that we were reset to.
+ if missing_membership_event_ids:
+ remaining_events = await self.store.get_events(
+ missing_membership_event_ids
+ )
+ # There shouldn't be any missing events
+ assert (
+ remaining_events.keys() == missing_membership_event_ids
+ ), missing_membership_event_ids.difference(remaining_events.keys())
+ membership_event_map.update(remaining_events)
+
+ for (
+ membership_event_id,
+ user_id,
+ ) in membership_event_id_to_user_id_map.items():
+ # We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point
+ membership_event_stream_ordering = membership_event_map[
+ membership_event_id
+ ].internal_metadata.stream_ordering
+ assert membership_event_stream_ordering is not None
+ membership_event_instance_name = membership_event_map[
+ membership_event_id
+ ].internal_metadata.instance_name
+ assert membership_event_instance_name is not None
+
+ membership_infos_to_insert_membership_snapshots.append(
+ SlidingSyncMembershipInfo(
+ user_id=user_id,
+ sender=membership_event_map[membership_event_id].sender,
+ membership_event_id=membership_event_id,
+ membership=membership_event_map[membership_event_id].membership,
+ membership_event_stream_ordering=membership_event_stream_ordering,
+ membership_event_instance_name=membership_event_instance_name,
+ )
+ )
+
+ if membership_infos_to_insert_membership_snapshots:
+ current_state_ids_map: MutableStateMap[str] = dict(
+ await self.store.get_partial_filtered_current_state_ids(
+ room_id,
+ state_filter=StateFilter.from_types(
+ SLIDING_SYNC_RELEVANT_STATE_SET
+ ),
+ )
+ )
+ # Since we fetched the current state before we took `to_insert`/`to_delete`
+ # into account, we need to do a couple fixups.
+ #
+ # Update the current_state_map with what we have `to_delete`
+ for state_key in to_delete:
+ current_state_ids_map.pop(state_key, None)
+ # Update the current_state_map with what we have `to_insert`
+ for state_key, event_id in to_insert.items():
+ if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
+ current_state_ids_map[state_key] = event_id
+
+ current_state_map: MutableStateMap[EventBase] = {}
+ # In normal event persist scenarios, we probably won't be able to find
+ # these state events in `events_and_contexts` since we don't generally
+ # batch up local membership changes with other events, but it can
+ # happen.
+ missing_state_event_ids: Set[str] = set()
+ for state_key, event_id in current_state_ids_map.items():
+ event = event_map.get(event_id)
+ if event:
+ current_state_map[state_key] = event
+ else:
+ missing_state_event_ids.add(event_id)
+
+ # Otherwise, we need to find a couple events
+ if missing_state_event_ids:
+ remaining_events = await self.store.get_events(
+ missing_state_event_ids
+ )
+ # There shouldn't be any missing events
+ assert (
+ remaining_events.keys() == missing_state_event_ids
+ ), missing_state_event_ids.difference(remaining_events.keys())
+ for event in remaining_events.values():
+ current_state_map[(event.type, event.state_key)] = event
+
+ if current_state_map:
+ state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ current_state_map
+ )
+ membership_snapshot_shared_insert_values.update(state_insert_values)
+ # We have current state to work from
+ membership_snapshot_shared_insert_values["has_known_state"] = True
+ else:
+ # We don't have any `current_state_events` anymore (previously
+ # cleared out because of `no_longer_in_room`). This can happen if
+ # one user is joined and another is invited (some non-join
+ # membership). If the joined user leaves, we are `no_longer_in_room`
+ # and `current_state_events` is cleared out. When the invited user
+ # rejects the invite (leaves the room), we will end up here.
+ #
+ # In these cases, we should inherit the meta data from the previous
+ # snapshot so we shouldn't update any of the state values. When
+ # using sliding sync filters, this will prevent the room from
+ # disappearing/appearing just because you left the room.
+ #
+ # Ideally, we could additionally assert that we're only here for
+ # valid non-join membership transitions.
+ assert delta_state.no_longer_in_room
+
+ # Handle gathering info for the `sliding_sync_joined_rooms` table
+ #
+ # We only deal with updating the state related columns. The
+ # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event
+ # persisting stack (see
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
+ #
+ joined_room_updates: SlidingSyncStateInsertValues = {}
+ best_effort_most_recent_stream_ordering: Optional[int] = None
+ bump_stamp_to_fully_insert: Optional[int] = None
+ if not delta_state.no_longer_in_room:
+ current_state_ids_map = {}
+
+ # Always fully-insert rows if they don't already exist in the
+ # `sliding_sync_joined_rooms` table. This way we can rely on a row if it
+ # exists in the table.
+ #
+ # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+ # foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+ # https://github.com/element-hq/synapse/issues/17623)
+ existing_row_in_table = await self.store.db_pool.simple_select_one_onecol(
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ retcol="room_id",
+ allow_none=True,
+ )
+ if not existing_row_in_table:
+ most_recent_bump_event_pos_results = (
+ await self.store.get_last_event_pos_in_room(
+ room_id,
+ event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
+ )
+ )
+ bump_stamp_to_fully_insert = (
+ most_recent_bump_event_pos_results[1].stream
+ if most_recent_bump_event_pos_results is not None
+ else None
+ )
+
+ current_state_ids_map = dict(
+ await self.store.get_partial_filtered_current_state_ids(
+ room_id,
+ state_filter=StateFilter.from_types(
+ SLIDING_SYNC_RELEVANT_STATE_SET
+ ),
+ )
+ )
+
+ # Look through the items we're going to insert into the current state to see
+ # if there is anything that we care about and should also update in the
+ # `sliding_sync_joined_rooms` table.
+ for state_key, event_id in to_insert.items():
+ if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
+ current_state_ids_map[state_key] = event_id
+
+ # Get the full event objects for the current state events
+ #
+ # In normal event persist scenarios, we should be able to find the state
+ # events in the `events_and_contexts` given to us but it's possible a state
+ # reset happened that reset us back to a previous state.
+ current_state_map = {}
+ missing_event_ids: Set[str] = set()
+ for state_key, event_id in current_state_ids_map.items():
+ event = event_map.get(event_id)
+ if event:
+ current_state_map[state_key] = event
+ else:
+ missing_event_ids.add(event_id)
+
+ # Otherwise, we need to find a couple events that we were reset to.
+ if missing_event_ids:
+ # Only fetch the events missing from `event_map` (matches the assert below)
+ remaining_events = await self.store.get_events(
+ missing_event_ids
+ )
+ # There shouldn't be any missing events
+ assert (
+ remaining_events.keys() == missing_event_ids
+ ), missing_event_ids.difference(remaining_events.keys())
+ for event in remaining_events.values():
+ current_state_map[(event.type, event.state_key)] = event
+
+ joined_room_updates = (
+ PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ current_state_map
+ )
+ )
+
+ # If something is being deleted from the state, we need to clear it out
+ for state_key in to_delete:
+ if state_key == (EventTypes.Create, ""):
+ joined_room_updates["room_type"] = None
+ elif state_key == (EventTypes.RoomEncryption, ""):
+ joined_room_updates["is_encrypted"] = False
+ elif state_key == (EventTypes.Name, ""):
+ joined_room_updates["room_name"] = None
+
+ # Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to
+ # be perfect, we just need *some* answer that points to a real event in the
+ # room in case we are the first ones inserting into the
+ # `sliding_sync_joined_rooms` table because of the `NOT NULL` constraint on
+ # `event_stream_ordering`. In reality,
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
+ # `_update_current_state_txn()` whenever a new event is persisted to update
+ # it to the correct latest value.
+ #
+ if len(events_and_contexts) > 0:
+ # Since the list is sorted ascending by `stream_ordering`, the last event
+ # should have the highest `stream_ordering`.
+ best_effort_most_recent_stream_ordering = events_and_contexts[-1][
+ 0
+ ].internal_metadata.stream_ordering
+ else:
+ # If there are no `events_and_contexts`, we assume it's one of two scenarios:
+ # 1. If there are new state `to_insert` but no `events_and_contexts`,
+ # then it's a state reset.
+ # 2. Otherwise, it's some partial-state room re-syncing the current state and
+ # going through un-partial process.
+ #
+ # Either way, we assume no new events are being persisted and we can
+ # find the latest already in the database. Since this is a best-effort
+ # value, we don't need to be perfect although I think we're pretty close
+ # here.
+ most_recent_event_pos_results = (
+ await self.store.get_last_event_pos_in_room(
+ room_id, event_types=None
+ )
+ )
+ assert most_recent_event_pos_results, (
+ f"We should not be seeing `None` here because we are still in the room ({room_id}) and "
+ + "it should at-least have a join membership event that's keeping us here."
+ )
+ best_effort_most_recent_stream_ordering = most_recent_event_pos_results[
+ 1
+ ].stream
+
+ # We should have found a value if we are still in the room
+ assert best_effort_most_recent_stream_ordering is not None
+
+ return SlidingSyncTableChanges(
+ room_id=room_id,
+ # For `sliding_sync_joined_rooms`
+ joined_room_best_effort_most_recent_stream_ordering=best_effort_most_recent_stream_ordering,
+ joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert,
+ joined_room_updates=joined_room_updates,
+ # For `sliding_sync_membership_snapshots`
+ membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values,
+ to_insert_membership_snapshots=membership_infos_to_insert_membership_snapshots,
+ to_delete_membership_snapshots=user_ids_to_delete_membership_snapshots,
+ )
+
async def calculate_chain_cover_index_for_events(
self, room_id: str, events: Collection[EventBase]
) -> Dict[str, NewEventChainLinks]:
@@ -458,6 +908,7 @@ class PersistEventsStore:
state_delta_for_room: Optional[DeltaState],
new_forward_extremities: Optional[Set[str]],
new_event_links: Dict[str, NewEventChainLinks],
+ sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
) -> None:
"""Insert some number of room events into the necessary database tables.
@@ -478,9 +929,14 @@ class PersistEventsStore:
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room: The current-state delta for the room.
+ state_delta_for_room: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
new_forward_extremities: The new forward extremities for the room:
a set of the event ids which are the forward extremities.
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+ derived from the given `state_delta_for_room` (see
+ `_calculate_sliding_sync_table_changes(...)`)
Raises:
PartialStateConflictError: if attempting to persist a partial state event in
@@ -590,10 +1046,22 @@ class PersistEventsStore:
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
if state_delta_for_room:
+ # If the state delta exists, the sliding sync table changes should also exist
+ assert sliding_sync_table_changes is not None
+
self._update_current_state_txn(
- txn, room_id, state_delta_for_room, min_stream_order
+ txn,
+ room_id,
+ state_delta_for_room,
+ min_stream_order,
+ sliding_sync_table_changes,
)
+ # We only update the sliding sync tables for non-backfilled events.
+ self._update_sliding_sync_tables_with_new_persisted_events_txn(
+ txn, room_id, events_and_contexts
+ )
+
def _persist_event_auth_chain_txn(
self,
txn: LoggingTransaction,
@@ -1128,8 +1596,20 @@ class PersistEventsStore:
self,
room_id: str,
state_delta: DeltaState,
+ sliding_sync_table_changes: SlidingSyncTableChanges,
) -> None:
- """Update the current state stored in the datatabase for the given room"""
+ """
+ Update the current state stored in the database for the given room
+
+ Args:
+ room_id
+ state_delta: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+ derived from the given `state_delta` (see
+ `_calculate_sliding_sync_table_changes(...)`)
+ """
if state_delta.is_noop():
return
@@ -1141,6 +1621,7 @@ class PersistEventsStore:
room_id,
delta_state=state_delta,
stream_id=stream_ordering,
+ sliding_sync_table_changes=sliding_sync_table_changes,
)
def _update_current_state_txn(
@@ -1149,16 +1630,34 @@ class PersistEventsStore:
room_id: str,
delta_state: DeltaState,
stream_id: int,
+ sliding_sync_table_changes: SlidingSyncTableChanges,
) -> None:
+ """
+ Handles updating tables that track the current state of a room.
+
+ Args:
+ txn
+ room_id
+ delta_state: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+ stream_id: TODO
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+ derived from the given `delta_state` (see
+ `_calculate_sliding_sync_table_changes(...)`)
+ """
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
+ # Sanity check we're processing the same thing
+ assert room_id == sliding_sync_table_changes.room_id
+
# Figure out the changes of membership to invalidate the
# `get_rooms_for_user` cache.
# We find out which membership events we may have deleted
# and which we have added, then we invalidate the caches for all
# those users.
- members_changed = {
+ members_to_cache_bust = {
state_key
for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
@@ -1182,16 +1681,22 @@ class PersistEventsStore:
"""
txn.execute(sql, (stream_id, self._instance_name, room_id))
+ # Grab the list of users before we clear out the current state
+ users_in_room = self.store.get_users_in_room_txn(txn, room_id)
# We also want to invalidate the membership caches for users
# that were in the room.
- users_in_room = self.store.get_users_in_room_txn(txn, room_id)
- members_changed.update(users_in_room)
+ members_to_cache_bust.update(users_in_room)
self.db_pool.simple_delete_txn(
txn,
table="current_state_events",
keyvalues={"room_id": room_id},
)
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ )
else:
# We're still in the room, so we update the current state as normal.
@@ -1260,6 +1765,41 @@ class PersistEventsStore:
],
)
+ # Handle updating the `sliding_sync_joined_rooms` table. We only deal with
+ # updating the state related columns. The
+ # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event
+ # persisting stack (see
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
+ #
+ # We only need to update when one of the relevant state values has changed
+ if sliding_sync_table_changes.joined_room_updates:
+ # This should be *some* value that points to a real event in the room if
+ # we are still joined to the room.
+ assert (
+ sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering
+ is not None
+ )
+
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ values=sliding_sync_table_changes.joined_room_updates,
+ insertion_values={
+ # The reason we're only *inserting* (not *updating*)
+ # `event_stream_ordering` here is because the column has a `NOT
+ # NULL` constraint and we need *some* answer. And if the row
+ # already exists, it already has the correct value and it's
+ # better to just rely on
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()`
+ # to do the right thing (same for `bump_stamp`).
+ "event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering,
+ # If we're trying to fully-insert a row, we need to provide a
+ # value for `bump_stamp` if it exists for the room.
+ "bump_stamp": sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert,
+ },
+ )
+
# We now update `local_current_membership`. We do this regardless
# of whether we're still in the room or not to handle the case where
# e.g. we just got banned (where we need to record that fact here).
@@ -1296,6 +1836,60 @@ class PersistEventsStore:
],
)
+ # Handle updating the `sliding_sync_membership_snapshots` table
+ #
+ # This would only happen if someone was state reset out of the room
+ if sliding_sync_table_changes.to_delete_membership_snapshots:
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ column="user_id",
+ values=sliding_sync_table_changes.to_delete_membership_snapshots,
+ keyvalues={"room_id": room_id},
+ )
+
+ # We do this regardless of whether the server is `no_longer_in_room` or not
+ # because we still want a row if a local user was just left/kicked or got banned
+ # from the room.
+ if sliding_sync_table_changes.to_insert_membership_snapshots:
+ # Update the `sliding_sync_membership_snapshots` table
+ #
+ # We need to insert/update regardless of whether we have any
+ # `membership_snapshot_shared_insert_values` because the per-membership fields still
+ # need to be upserted (see the inherit case in `_calculate_sliding_sync_table_changes()`).
+ self.db_pool.simple_upsert_many_txn(
+ txn=txn,
+ table="sliding_sync_membership_snapshots",
+ key_names=("room_id", "user_id"),
+ key_values=[
+ (room_id, membership_info.user_id)
+ for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
+ ],
+ value_names=[
+ "sender",
+ "membership_event_id",
+ "membership",
+ "event_stream_ordering",
+ "event_instance_name",
+ ]
+ + list(
+ sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys()
+ ),
+ value_values=[
+ [
+ membership_info.sender,
+ membership_info.membership_event_id,
+ membership_info.membership,
+ membership_info.membership_event_stream_ordering,
+ membership_info.membership_event_instance_name,
+ ]
+ + list(
+ sliding_sync_table_changes.membership_snapshot_shared_insert_values.values()
+ )
+ for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
+ ],
+ )
+
txn.call_after(
self.store._curr_state_delta_stream_cache.entity_has_changed,
room_id,
@@ -1303,13 +1897,302 @@ class PersistEventsStore:
)
# Invalidate the various caches
- self.store._invalidate_state_caches_and_stream(txn, room_id, members_changed)
+ self.store._invalidate_state_caches_and_stream(
+ txn, room_id, members_to_cache_bust
+ )
# Check if any of the remote membership changes requires us to
# unsubscribe from their device lists.
self.store.handle_potentially_left_users_txn(
- txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
+ txn, {m for m in members_to_cache_bust if not self.hs.is_mine_id(m)}
+ )
+
+ @classmethod
+ def _get_relevant_sliding_sync_current_state_event_ids_txn(
+ cls, txn: LoggingTransaction, room_id: str
+ ) -> MutableStateMap[str]:
+ """
+ Fetch the current state event IDs for the relevant (to the
+ `sliding_sync_joined_rooms` table) state types for the given room.
+
+ Args:
+ txn: The transaction used to query the `current_state_events` table.
+ room_id: The room to fetch the current state event IDs for.
+
+ Returns:
+ A mutable StateMap of `(type, state_key)` -> `event_id`, restricted to the
+ entries listed in `SLIDING_SYNC_RELEVANT_STATE_SET`. These are the event IDs
+ necessary to fetch the relevant state values needed to insert into the
+ `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. The
+ map is empty if no matching rows exist in `current_state_events` for the
+ room.
+ """
+ # Fetch the current state event IDs from the database
+ (
+ event_type_and_state_key_in_list_clause,
+ event_type_and_state_key_args,
+ ) = make_tuple_in_list_sql_clause(
+ txn.database_engine,
+ ("type", "state_key"),
+ SLIDING_SYNC_RELEVANT_STATE_SET,
+ )
+ txn.execute(
+ f"""
+ SELECT c.event_id, c.type, c.state_key
+ FROM current_state_events AS c
+ WHERE
+ c.room_id = ?
+ AND {event_type_and_state_key_in_list_clause}
+ """,
+ [room_id] + event_type_and_state_key_args,
+ )
+ # Iterating over `txn` yields the `(event_id, type, state_key)` rows selected
+ # above; re-key them as a StateMap.
+ current_state_map: MutableStateMap[str] = {
+ (event_type, state_key): event_id for event_id, event_type, state_key in txn
+ }
+
+ return current_state_map
+
+ @classmethod
+ def _get_sliding_sync_insert_values_from_state_map(
+ cls, state_map: StateMap[EventBase]
+ ) -> SlidingSyncStateInsertValues:
+ """
+ Extract the relevant state values from the `state_map` needed to insert into the
+ `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+
+ Args:
+ state_map: Map of `(type, state_key)` -> event. Only the create, encryption,
+ name and tombstone events (each with an empty `state_key`) are accepted.
+
+ Returns:
+ Map from column names (`room_type`, `is_encrypted`, `room_name`,
+ `tombstone_successor_room_id`) to relevant state values needed to insert into
+ the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+ A column is only present if the corresponding event is in `state_map`; the
+ string-valued columns are additionally skipped when the content value is
+ neither `None` nor a `str`.
+
+ Raises:
+ AssertionError: If `state_map` contains an entry for any other
+ `(type, state_key)` pair.
+ """
+ # Map of values to insert/update in the sliding sync tables
+ sliding_sync_insert_map: SlidingSyncStateInsertValues = {}
+
+ # Parse the raw event JSON
+ #
+ # Note: `state_key` here is the full `(type, state_key)` tuple key of the map.
+ for state_key, event in state_map.items():
+ if state_key == (EventTypes.Create, ""):
+ room_type = event.content.get(EventContentFields.ROOM_TYPE)
+ # Scrutinize JSON values
+ if room_type is None or isinstance(room_type, str):
+ sliding_sync_insert_map["room_type"] = room_type
+ elif state_key == (EventTypes.RoomEncryption, ""):
+ encryption_algorithm = event.content.get(
+ EventContentFields.ENCRYPTION_ALGORITHM
+ )
+ is_encrypted = encryption_algorithm is not None
+ sliding_sync_insert_map["is_encrypted"] = is_encrypted
+ elif state_key == (EventTypes.Name, ""):
+ room_name = event.content.get(EventContentFields.ROOM_NAME)
+ # Scrutinize JSON values
+ if room_name is None or isinstance(room_name, str):
+ sliding_sync_insert_map["room_name"] = room_name
+ elif state_key == (EventTypes.Tombstone, ""):
+ successor_room_id = event.content.get(
+ EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
+ )
+ # Scrutinize JSON values
+ if successor_room_id is None or isinstance(successor_room_id, str):
+ sliding_sync_insert_map["tombstone_successor_room_id"] = (
+ successor_room_id
+ )
+ else:
+ # We only expect to see events according to the
+ # `SLIDING_SYNC_RELEVANT_STATE_SET`.
+ #
+ # The last segment must be an f-string so that the offending key and
+ # event ID are actually interpolated into the message.
+ raise AssertionError(
+ "Unexpected event (we should not be fetching extra events or this "
+ + "piece of code needs to be updated to handle a new event type added "
+ + f"to `SLIDING_SYNC_RELEVANT_STATE_SET`): {state_key} {event.event_id}"
+ )
+
+ return sliding_sync_insert_map
+
+ @classmethod
+ def _get_sliding_sync_insert_values_from_stripped_state(
+ cls, unsigned_stripped_state_events: Any
+ ) -> SlidingSyncMembershipSnapshotSharedInsertValues:
+ """
+ Pull out the relevant state values from the stripped state on an invite or knock
+ membership event needed to insert into the `sliding_sync_membership_snapshots`
+ tables.
+
+ Args:
+ unsigned_stripped_state_events: The raw (unvalidated) stripped state value
+ taken from the membership event. May be `None` or something other than
+ a list, in which case it is treated as "no stripped state provided".
+
+ Returns:
+ Map from column names (`has_known_state`, `room_type`, `is_encrypted`,
+ `room_name`) to relevant state values needed to insert into the
+ `sliding_sync_membership_snapshots` tables. `tombstone_successor_room_id` is
+ only included when the stripped state contains a create event.
+ """
+ # Map of values to insert/update in the `sliding_sync_membership_snapshots` table
+ sliding_sync_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {}
+
+ if unsigned_stripped_state_events is not None:
+ stripped_state_map: MutableStateMap[StrippedStateEvent] = {}
+ # Anything that isn't a list is ignored, leaving `stripped_state_map` empty
+ if isinstance(unsigned_stripped_state_events, list):
+ for raw_stripped_event in unsigned_stripped_state_events:
+ stripped_state_event = parse_stripped_state_event(
+ raw_stripped_event
+ )
+ # Entries for which the parser returns `None` are skipped
+ if stripped_state_event is not None:
+ stripped_state_map[
+ (
+ stripped_state_event.type,
+ stripped_state_event.state_key,
+ )
+ ] = stripped_state_event
+
+ # If there is some stripped state, we assume the remote server passed *all*
+ # of the potential stripped state events for the room.
+ create_stripped_event = stripped_state_map.get((EventTypes.Create, ""))
+ # Sanity check that we at-least have the create event
+ if create_stripped_event is not None:
+ sliding_sync_insert_map["has_known_state"] = True
+
+ # XXX: Keep this up-to-date with `SLIDING_SYNC_RELEVANT_STATE_SET`
+
+ # Find the room_type
+ #
+ # (the `is not None` check below is always true inside this branch)
+ sliding_sync_insert_map["room_type"] = (
+ create_stripped_event.content.get(EventContentFields.ROOM_TYPE)
+ if create_stripped_event is not None
+ else None
+ )
+
+ # Find whether the room is_encrypted
+ encryption_stripped_event = stripped_state_map.get(
+ (EventTypes.RoomEncryption, "")
+ )
+ encryption = (
+ encryption_stripped_event.content.get(
+ EventContentFields.ENCRYPTION_ALGORITHM
+ )
+ if encryption_stripped_event is not None
+ else None
+ )
+ sliding_sync_insert_map["is_encrypted"] = encryption is not None
+
+ # Find the room_name
+ room_name_stripped_event = stripped_state_map.get((EventTypes.Name, ""))
+ sliding_sync_insert_map["room_name"] = (
+ room_name_stripped_event.content.get(EventContentFields.ROOM_NAME)
+ if room_name_stripped_event is not None
+ else None
+ )
+
+ # Find the tombstone_successor_room_id
+ # Note: This isn't one of the stripped state events according to the spec
+ # but seems like there is no reason not to support this kind of thing.
+ tombstone_stripped_event = stripped_state_map.get(
+ (EventTypes.Tombstone, "")
+ )
+ sliding_sync_insert_map["tombstone_successor_room_id"] = (
+ tombstone_stripped_event.content.get(
+ EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
+ )
+ if tombstone_stripped_event is not None
+ else None
+ )
+
+ else:
+ # No usable stripped state provided (no create event was found)
+ sliding_sync_insert_map["has_known_state"] = False
+ sliding_sync_insert_map["room_type"] = None
+ sliding_sync_insert_map["room_name"] = None
+ sliding_sync_insert_map["is_encrypted"] = False
+ else:
+ # No stripped state provided
+ sliding_sync_insert_map["has_known_state"] = False
+ sliding_sync_insert_map["room_type"] = None
+ sliding_sync_insert_map["room_name"] = None
+ sliding_sync_insert_map["is_encrypted"] = False
+
+ return sliding_sync_insert_map
+
+ def _update_sliding_sync_tables_with_new_persisted_events_txn(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ events_and_contexts: List[Tuple[EventBase, EventContext]],
+ ) -> None:
+ """
+ Update the latest `event_stream_ordering`/`bump_stamp` columns in the
+ `sliding_sync_joined_rooms` table for the room with new events.
+
+ This function assumes that `_store_event_txn()` (to persist the event) and
+ `_update_current_state_txn(...)` (so that `sliding_sync_joined_rooms` table has
+ been updated with rooms that were joined) have already been run.
+
+ Args:
+ txn
+ room_id: The room that all of the events belong to
+ events_and_contexts: The events being persisted. We assume the list is
+ sorted ascending by `stream_ordering`. We don't care about the sort when the
+ events are backfilled (with negative `stream_ordering`).
+
+ Returns:
+ Nothing. Only an `UPDATE` is issued, so a room without an existing row in
+ `sliding_sync_joined_rooms` is left untouched.
+ """
+
+ # Nothing to do if there are no events
+ if len(events_and_contexts) == 0:
+ return
+
+ # We only update the sliding sync tables for non-backfilled events.
+ #
+ # Check if the first event is a backfilled event (with a negative
+ # `stream_ordering`). If one event is backfilled, we assume this whole batch was
+ # backfilled.
+ first_event_stream_ordering = events_and_contexts[0][
+ 0
+ ].internal_metadata.stream_ordering
+ # This should exist for persisted events
+ assert first_event_stream_ordering is not None
+ if first_event_stream_ordering < 0:
+ return
+
+ # Since the list is sorted ascending by `stream_ordering`, the last event should
+ # have the highest `stream_ordering`.
+ max_stream_ordering = events_and_contexts[-1][
+ 0
+ ].internal_metadata.stream_ordering
+ # Stays `None` if none of the events is one of the bump event types
+ max_bump_stamp = None
+ for event, _ in reversed(events_and_contexts):
+ # Sanity check that all events belong to the same room
+ assert event.room_id == room_id
+
+ if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
+ # This should exist for persisted events
+ assert event.internal_metadata.stream_ordering is not None
+
+ max_bump_stamp = event.internal_metadata.stream_ordering
+
+ # Since we're iterating in reverse, we can break as soon as we find a
+ # matching bump event which should have the highest `stream_ordering`.
+ break
+
+ # We should have exited earlier if there were no events
+ assert (
+ max_stream_ordering is not None
+ ), "Expected to have a stream_ordering if we have events"
+
+ # Handle updating the `sliding_sync_joined_rooms` table.
+ #
+ # Both columns only ever move forwards: the `CASE` expressions keep the stored
+ # value unless it is NULL or smaller than the new one. When `max_bump_stamp` is
+ # `None` it is bound as NULL, so the `bump_stamp < ?` comparison is never true
+ # and an existing `bump_stamp` is kept as-is.
+ txn.execute(
+ """
+ UPDATE sliding_sync_joined_rooms
+ SET
+ event_stream_ordering = CASE
+ WHEN event_stream_ordering IS NULL OR event_stream_ordering < ?
+ THEN ?
+ ELSE event_stream_ordering
+ END,
+ bump_stamp = CASE
+ WHEN bump_stamp IS NULL OR bump_stamp < ?
+ THEN ?
+ ELSE bump_stamp
+ END
+ WHERE room_id = ?
+ """,
+ (
+ max_stream_ordering,
+ max_stream_ordering,
+ max_bump_stamp,
+ max_bump_stamp,
+ room_id,
+ ),
)
+ # This may or may not update any rows depending if we are `no_longer_in_room`
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
"""Update the room version in the database based off current state
@@ -1931,7 +2814,9 @@ class PersistEventsStore:
)
for event in events:
+ # Sanity check that we're working with persisted events
assert event.internal_metadata.stream_ordering is not None
+ assert event.internal_metadata.instance_name is not None
# We update the local_current_membership table only if the event is
# "current", i.e., its something that has just happened.
@@ -1945,6 +2830,16 @@ class PersistEventsStore:
and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership()
):
+ # The only sort of out-of-band-membership events we expect to see here
+ # are remote invites/knocks and LEAVE events corresponding to
+ # rejected/retracted invites and rescinded knocks.
+ assert event.type == EventTypes.Member
+ assert event.membership in (
+ Membership.INVITE,
+ Membership.KNOCK,
+ Membership.LEAVE,
+ )
+
self.db_pool.simple_upsert_txn(
txn,
table="local_current_membership",
@@ -1956,6 +2851,56 @@ class PersistEventsStore:
},
)
+ # Handle updating the `sliding_sync_membership_snapshots` table
+ # (out-of-band membership events only)
+ #
+ raw_stripped_state_events = None
+ if event.membership == Membership.INVITE:
+ invite_room_state = event.unsigned.get("invite_room_state")
+ raw_stripped_state_events = invite_room_state
+ elif event.membership == Membership.KNOCK:
+ knock_room_state = event.unsigned.get("knock_room_state")
+ raw_stripped_state_events = knock_room_state
+
+ insert_values = {
+ "sender": event.sender,
+ "membership_event_id": event.event_id,
+ "membership": event.membership,
+ "event_stream_ordering": event.internal_metadata.stream_ordering,
+ "event_instance_name": event.internal_metadata.instance_name,
+ }
+ if event.membership == Membership.LEAVE:
+ # Inherit the meta data from the remote invite/knock. When using
+ # sliding sync filters, this will prevent the room from
+ # disappearing/appearing just because you left the room.
+ pass
+ elif event.membership in (Membership.INVITE, Membership.KNOCK):
+ extra_insert_values = (
+ self._get_sliding_sync_insert_values_from_stripped_state(
+ raw_stripped_state_events
+ )
+ )
+ insert_values.update(extra_insert_values)
+ else:
+ # We don't know how to handle this type of membership yet
+ #
+ # FIXME: We should use `assert_never` here but for some reason
+ # the exhaustive matching doesn't recognize the `Never` here.
+ # assert_never(event.membership)
+ raise AssertionError(
+ f"Unexpected out-of-band membership {event.membership} ({event.event_id}) that we don't know how to handle yet"
+ )
+
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ keyvalues={
+ "room_id": event.room_id,
+ "user_id": event.state_key,
+ },
+ values=insert_values,
+ )
+
def _handle_event_relations(
self, txn: LoggingTransaction, event: EventBase
) -> None:
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 64d303e330..88ff5aa2df 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -24,9 +24,9 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
import attr
-from synapse.api.constants import EventContentFields, RelationTypes
+from synapse.api.constants import EventContentFields, Membership, RelationTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import make_event_from_dict
+from synapse.events import EventBase, make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -34,9 +34,21 @@ from synapse.storage.database import (
LoggingTransaction,
make_tuple_comparison_clause,
)
-from synapse.storage.databases.main.events import PersistEventsStore
+from synapse.storage.databases.main.events import (
+ SLIDING_SYNC_RELEVANT_STATE_SET,
+ PersistEventsStore,
+ SlidingSyncMembershipInfo,
+ SlidingSyncMembershipSnapshotSharedInsertValues,
+ SlidingSyncStateInsertValues,
+)
+from synapse.storage.databases.main.state_deltas import StateDeltasStore
+from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.types import Cursor
-from synapse.types import JsonDict, StrCollection
+from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
+from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
+from synapse.types.state import StateFilter
+from synapse.util import json_encoder
+from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -78,6 +90,14 @@ class _BackgroundUpdates:
EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
+ SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = (
+ "sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update"
+ )
+ SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
+ SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
+ "sliding_sync_membership_snapshots_bg_update"
+ )
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -97,7 +117,19 @@ class _CalculateChainCover:
finished_room_map: Dict[str, Tuple[int, int]]
-class EventsBackgroundUpdatesStore(SQLBaseStore):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _JoinedRoomStreamOrderingUpdate:
+ """
+ Intermediate container class used in `SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE`
+ to carry the per-room ordering values that get inserted alongside the state
+ values into `sliding_sync_joined_rooms`.
+ """
+
+ # The most recent event stream_ordering for the room
+ most_recent_event_stream_ordering: int
+ # The most recent event `bump_stamp` for the room. `None` when no bump event
+ # with a positive `stream_ordering` was found for the room.
+ most_recent_bump_stamp: Optional[int]
+
+
+class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
def __init__(
self,
database: DatabasePool,
@@ -279,6 +311,34 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
where_clause="NOT outlier",
)
+ # Handle background updates for Sliding Sync tables
+ #
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+ self._sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update,
+ )
+ # Add some background updates to populate the sliding sync tables
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
+ self._sliding_sync_joined_rooms_bg_update,
+ )
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
+ self._sliding_sync_membership_snapshots_bg_update,
+ )
+
+ # We want this to run on the main database at startup before we start processing
+ # events.
+ #
+ # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+ # foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+ # https://github.com/element-hq/synapse/issues/17623)
+ with db_conn.cursor(txn_name="resolve_sliding_sync") as txn:
+ _resolve_stale_data_in_sliding_sync_tables(
+ txn=txn,
+ )
+
async def _background_reindex_fields_sender(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -1073,7 +1133,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
PersistEventsStore._add_chain_cover_index(
txn,
self.db_pool,
- self.event_chain_id_gen, # type: ignore[attr-defined]
+ self.event_chain_id_gen,
event_to_room_id,
event_to_types,
cast(Dict[str, StrCollection], event_to_auth_chain),
@@ -1516,3 +1576,961 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
return batch_size
+
+ async def _sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update(
+ self, progress: JsonDict, _batch_size: int
+ ) -> int:
+ """
+ Prefill `sliding_sync_joined_rooms_to_recalculate` table with all rooms we know about already.
+
+ Args:
+ progress: Background update progress (not used; everything is done in one go).
+ _batch_size: Requested batch size (not used; everything is done in one go).
+
+ Returns:
+ Always 0. The background update is marked as finished before returning.
+ """
+
+ def _txn(txn: LoggingTransaction) -> None:
+ # We do this as one big bulk insert. This has been tested on a bigger
+ # homeserver with ~10M rooms and took 60s. There is potential for this to
+ # hog disk I/O while this goes on.
+ #
+ # We upsert in case we have to run this multiple times.
+ #
+ # The `WHERE ?` clause (bound to `True`, i.e. `WHERE TRUE`) is to avoid the
+ # "Parsing Ambiguity" between the `SELECT` and the `ON CONFLICT` clause.
+ txn.execute(
+ """
+ INSERT INTO sliding_sync_joined_rooms_to_recalculate
+ (room_id)
+ SELECT room_id FROM rooms WHERE ?
+ ON CONFLICT (room_id)
+ DO NOTHING;
+ """,
+ (True,),
+ )
+
+ await self.db_pool.runInteraction(
+ "_sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update",
+ _txn,
+ )
+
+ # Background update is done.
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
+ )
+ return 0
+
+ async def _sliding_sync_joined_rooms_bg_update(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """
+ Background update to populate the `sliding_sync_joined_rooms` table.
+
+ Args:
+ progress: Background update progress (not used, see below).
+ batch_size: Maximum number of rooms to pull off the queue in this run.
+
+ Returns:
+ The number of rooms pulled off the queue in this run, or 0 once the queue
+ is empty and the background update has been marked as finished.
+ """
+ # We don't need to fetch any progress state because we just grab the next N
+ # rooms in `sliding_sync_joined_rooms_to_recalculate`
+
+ def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[Tuple[str]]:
+ """
+ Returns:
+ A list of single-element `(room_id,)` rows for the next batch of rooms
+ waiting in the `sliding_sync_joined_rooms_to_recalculate` queue (at most
+ `batch_size` of them).
+ """
+ # Fetch the set of room IDs that we want to update
+ #
+ # Whether the server is still participating in each room is checked
+ # further down using the `current_state_events` table as the barometer,
+ # because if we're `no_longer_in_room`, that table would be cleared out
+ # for the given `room_id`.
+ txn.execute(
+ """
+ SELECT room_id
+ FROM sliding_sync_joined_rooms_to_recalculate
+ LIMIT ?
+ """,
+ (batch_size,),
+ )
+
+ rooms_to_update_rows = cast(List[Tuple[str]], txn.fetchall())
+
+ return rooms_to_update_rows
+
+ rooms_to_update = await self.db_pool.runInteraction(
+ "_sliding_sync_joined_rooms_bg_update._get_rooms_to_update_txn",
+ _get_rooms_to_update_txn,
+ )
+
+ # An empty queue means there is nothing left to do
+ if not rooms_to_update:
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE
+ )
+ return 0
+
+ # Map from room_id to insert/update state values in the `sliding_sync_joined_rooms` table.
+ joined_room_updates: Dict[str, SlidingSyncStateInsertValues] = {}
+ # Map from room_id to stream_ordering/bump_stamp, etc values
+ joined_room_stream_ordering_updates: Dict[
+ str, _JoinedRoomStreamOrderingUpdate
+ ] = {}
+ # As long as we get this value before we fetch the current state, we can use it
+ # to check if something has changed since that point.
+ most_recent_current_state_delta_stream_id = (
+ await self.get_max_stream_id_in_current_state_deltas()
+ )
+ for (room_id,) in rooms_to_update:
+ current_state_ids_map = await self.db_pool.runInteraction(
+ "_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn",
+ PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn,
+ room_id,
+ )
+
+ # If we're not joined to the room a) it doesn't belong in the
+ # `sliding_sync_joined_rooms` table so we should skip and b) we won't have
+ # any `current_state_events` for the room.
+ if not current_state_ids_map:
+ continue
+
+ fetched_events = await self.get_events(current_state_ids_map.values())
+
+ current_state_map: StateMap[EventBase] = {
+ state_key: fetched_events[event_id]
+ for state_key, event_id in current_state_ids_map.items()
+ # `get_events(...)` will filter out events for unknown room versions
+ if event_id in fetched_events
+ }
+
+ # Even if we are joined to the room, this can happen for unknown room
+ # versions (old room versions that aren't known anymore) since
+ # `get_events(...)` will filter out events for unknown room versions
+ if not current_state_map:
+ continue
+
+ state_insert_values = (
+ PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ current_state_map
+ )
+ )
+ # We should have some insert values for each room, even if they are `None`
+ assert state_insert_values
+ joined_room_updates[room_id] = state_insert_values
+
+ # Figure out the stream_ordering of the latest event in the room
+ most_recent_event_pos_results = await self.get_last_event_pos_in_room(
+ room_id, event_types=None
+ )
+ assert most_recent_event_pos_results is not None, (
+ f"We should not be seeing `None` here because the room ({room_id}) should at-least have a create event "
+ + "given we pulled the room out of `current_state_events`"
+ )
+ most_recent_event_stream_ordering = most_recent_event_pos_results[1].stream
+ assert most_recent_event_stream_ordering > 0, (
+ "We should have at-least one event in the room (our own join membership event for example) "
+ + "that isn't backfilled (negative `stream_ordering`) if we are joined to the room."
+ )
+ # Figure out the latest `bump_stamp` in the room. This could be `None` for a
+ # federated room you just joined where all of events are still `outliers` or
+ # backfilled history. In the Sliding Sync API, we default to the user's
+ # membership event `stream_ordering` if we don't have a `bump_stamp` so
+ # having it as `None` in this table is fine.
+ bump_stamp_event_pos_results = await self.get_last_event_pos_in_room(
+ room_id, event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
+ )
+ most_recent_bump_stamp = None
+ if (
+ bump_stamp_event_pos_results is not None
+ and bump_stamp_event_pos_results[1].stream > 0
+ ):
+ most_recent_bump_stamp = bump_stamp_event_pos_results[1].stream
+
+ joined_room_stream_ordering_updates[room_id] = (
+ _JoinedRoomStreamOrderingUpdate(
+ most_recent_event_stream_ordering=most_recent_event_stream_ordering,
+ most_recent_bump_stamp=most_recent_bump_stamp,
+ )
+ )
+
+ def _fill_table_txn(txn: LoggingTransaction) -> None:
+ # Handle updating the `sliding_sync_joined_rooms` table
+ #
+ for (
+ room_id,
+ update_map,
+ ) in joined_room_updates.items():
+ joined_room_stream_ordering_update = (
+ joined_room_stream_ordering_updates[room_id]
+ )
+ event_stream_ordering = (
+ joined_room_stream_ordering_update.most_recent_event_stream_ordering
+ )
+ bump_stamp = joined_room_stream_ordering_update.most_recent_bump_stamp
+
+ # Check if the current state has been updated since we gathered it.
+ # We're being careful not to insert/overwrite with stale data.
+ state_deltas_since_we_gathered_current_state = (
+ self.get_current_state_deltas_for_room_txn(
+ txn,
+ room_id,
+ from_token=RoomStreamToken(
+ stream=most_recent_current_state_delta_stream_id
+ ),
+ to_token=None,
+ )
+ )
+ for state_delta in state_deltas_since_we_gathered_current_state:
+ # We only need to check for the state is relevant to the
+ # `sliding_sync_joined_rooms` table.
+ if (
+ state_delta.event_type,
+ state_delta.state_key,
+ ) in SLIDING_SYNC_RELEVANT_STATE_SET:
+ # Raising exception so we can just exit and try again. It would
+ # be hard to resolve this within the transaction because we need
+ # to get full events out that take redactions into account. We
+ # could add some retry logic here, but it's easier to just let
+ # the background update try again.
+ raise Exception(
+ "Current state was updated after we gathered it to update "
+ + "`sliding_sync_joined_rooms` in the background update. "
+ + "Raising exception so we can just try again."
+ )
+
+ # Since we fully insert rows into `sliding_sync_joined_rooms`, we can
+ # just do everything on insert and `ON CONFLICT DO NOTHING`.
+ #
+ # (`values` is empty, so everything goes through `insertion_values`.)
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ values={},
+ insertion_values={
+ **update_map,
+ # The reason we're only *inserting* (not *updating*) `event_stream_ordering`
+ # and `bump_stamp` is because if they are present, that means they are already
+ # up-to-date.
+ "event_stream_ordering": event_stream_ordering,
+ "bump_stamp": bump_stamp,
+ },
+ )
+
+ # Now that we've processed all the room, we can remove them from the
+ # queue.
+ #
+ # Note: we need to remove all the rooms from the queue we pulled out
+ # from the DB, not just the ones we've processed above. Otherwise
+ # we'll simply keep pulling out the same rooms over and over again.
+ self.db_pool.simple_delete_many_batch_txn(
+ txn,
+ table="sliding_sync_joined_rooms_to_recalculate",
+ keys=("room_id",),
+ values=rooms_to_update,
+ )
+
+ await self.db_pool.runInteraction(
+ "sliding_sync_joined_rooms_bg_update", _fill_table_txn
+ )
+
+ return len(rooms_to_update)
+
+ async def _sliding_sync_membership_snapshots_bg_update(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """
+ Background update to populate the `sliding_sync_membership_snapshots` table.
+ """
+ # We do this in two phases: a) the initial phase where we go through all
+ # room memberships, and then b) a second phase where we look at new
+ # memberships (this is to handle the case where we downgrade and then
+ # upgrade again).
+ #
+ # We have to do this as two phases (rather than just the second phase
+ # where we iterate on event_stream_ordering), as the
+ # `event_stream_ordering` column may have null values for old rows.
+ # Therefore we first do the set of historic rooms and *then* look at any
+ # new rows (which will have a non-null `event_stream_ordering`).
+ initial_phase = progress.get("initial_phase")
+ if initial_phase is None:
+ # If this is the first run, store the current max stream position.
+ # We know we will go through all memberships less than the current
+ # max in the initial phase.
+ progress = {
+ "initial_phase": True,
+ "last_event_stream_ordering": self.get_room_max_stream_ordering(),
+ }
+ await self.db_pool.updates._background_update_progress(
+ _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
+ progress,
+ )
+ initial_phase = True
+
+ last_room_id = progress.get("last_room_id", "")
+ last_event_stream_ordering = progress["last_event_stream_ordering"]
+
+ def _find_memberships_to_update_txn(
+ txn: LoggingTransaction,
+ ) -> List[
+ Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
+ ]:
+ # Fetch the set of event IDs that we want to update
+
+ if initial_phase:
+ # There are some old out-of-band memberships (before
+ # https://github.com/matrix-org/synapse/issues/6983) where we don't have
+ # the corresponding room stored in the `rooms` table`. We use `LEFT JOIN
+ # rooms AS r USING (room_id)` to find the rooms missing from `rooms` and
+ # insert a row for them below.
+ txn.execute(
+ """
+ SELECT
+ c.room_id,
+ r.room_id,
+ c.user_id,
+ e.sender,
+ c.event_id,
+ c.membership,
+ e.stream_ordering,
+ e.instance_name,
+ e.outlier
+ FROM local_current_membership AS c
+ INNER JOIN events AS e USING (event_id)
+ LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
+ WHERE c.room_id > ?
+ ORDER BY c.room_id ASC
+ LIMIT ?
+ """,
+ (last_room_id, batch_size),
+ )
+ elif last_event_stream_ordering is not None:
+ # It's important to sort by `event_stream_ordering` *ascending* (oldest to
+ # newest) so that if we see that this background update in progress and want
+ # to start the catch-up process, we can safely assume that it will
+ # eventually get to the rooms we want to catch-up on anyway (see
+ # `_resolve_stale_data_in_sliding_sync_tables()`).
+ #
+ # `c.room_id` is duplicated to make it match what we're doing in the
+ # `initial_phase`. But we can avoid doing the extra `rooms` table join
+ # because we can assume all of these new events won't have this problem.
+ txn.execute(
+ """
+ SELECT
+ c.room_id,
+ c.room_id,
+ c.user_id,
+ e.sender,
+ c.event_id,
+ c.membership,
+ c.event_stream_ordering,
+ e.instance_name,
+ e.outlier
+ FROM local_current_membership AS c
+ INNER JOIN events AS e USING (event_id)
+ WHERE event_stream_ordering > ?
+ ORDER BY event_stream_ordering ASC
+ LIMIT ?
+ """,
+ (last_event_stream_ordering, batch_size),
+ )
+ else:
+ raise Exception("last_event_stream_ordering should not be None")
+
+ memberships_to_update_rows = cast(
+ List[
+ Tuple[
+ str, Optional[str], str, str, str, str, int, Optional[str], bool
+ ]
+ ],
+ txn.fetchall(),
+ )
+
+ return memberships_to_update_rows
+
+ memberships_to_update_rows = await self.db_pool.runInteraction(
+ "sliding_sync_membership_snapshots_bg_update._find_memberships_to_update_txn",
+ _find_memberships_to_update_txn,
+ )
+
+ if not memberships_to_update_rows:
+ if initial_phase:
+ # Move onto the next phase.
+ await self.db_pool.updates._background_update_progress(
+ _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
+ {
+ "initial_phase": False,
+ "last_event_stream_ordering": last_event_stream_ordering,
+ },
+ )
+ return 0
+ else:
+ # We've finished both phases, we're done.
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
+ )
+ return 0
+
+ def _find_previous_membership_txn(
+ txn: LoggingTransaction, event_id: str, user_id: str
+ ) -> Tuple[str, str]:
+ # Find the previous invite/knock event before the leave event. This
+ # is done by looking at the auth events of the invite/knock and
+ # finding the corresponding membership event.
+ txn.execute(
+ """
+ SELECT m.event_id, m.membership
+ FROM event_auth AS a
+ INNER JOIN room_memberships AS m ON (a.auth_id = m.event_id)
+ WHERE a.event_id = ? AND m.user_id = ?
+ """,
+ (event_id, user_id),
+ )
+ row = txn.fetchone()
+
+ # We should see a corresponding previous invite/knock event
+ assert row is not None
+ previous_event_id, membership = row
+
+ return previous_event_id, membership
+
+ # Map from (room_id, user_id) to ...
+ to_insert_membership_snapshots: Dict[
+ Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues
+ ] = {}
+ to_insert_membership_infos: Dict[Tuple[str, str], SlidingSyncMembershipInfo] = (
+ {}
+ )
+ for (
+ room_id,
+ room_id_from_rooms_table,
+ user_id,
+ sender,
+ membership_event_id,
+ membership,
+ membership_event_stream_ordering,
+ membership_event_instance_name,
+ is_outlier,
+ ) in memberships_to_update_rows:
+ # We don't know how to handle `membership` values other than these. The
+ # code below would need to be updated.
+ assert membership in (
+ Membership.JOIN,
+ Membership.INVITE,
+ Membership.KNOCK,
+ Membership.LEAVE,
+ Membership.BAN,
+ )
+
+ # There are some old out-of-band memberships (before
+ # https://github.com/matrix-org/synapse/issues/6983) where we don't have the
+ # corresponding room stored in the `rooms` table. We have a `FOREIGN KEY`
+ # constraint on the `sliding_sync_membership_snapshots` table so we have to
+ # fix-up these memberships by adding the room to the `rooms` table.
+ if room_id_from_rooms_table is None:
+ await self.db_pool.simple_insert(
+ table="rooms",
+ values={
+ "room_id": room_id,
+ # Only out-of-band memberships are missing from the `rooms`
+ # table so that is the only type of membership we're dealing
+ # with here. Since we don't calculate the "chain cover" for
+ # out-of-band memberships, we can just set this to `True` as if
+ # the user ever joins the room, we will end up calculating the
+ # "chain cover" anyway.
+ "has_auth_chain_index": True,
+ },
+ )
+
+ # Map of values to insert/update in the `sliding_sync_membership_snapshots` table
+ sliding_sync_membership_snapshots_insert_map: (
+ SlidingSyncMembershipSnapshotSharedInsertValues
+ ) = {}
+ if membership == Membership.JOIN:
+ # If we're still joined, we can pull from current state.
+ current_state_ids_map: StateMap[
+ str
+ ] = await self.hs.get_storage_controllers().state.get_current_state_ids(
+ room_id,
+ state_filter=StateFilter.from_types(
+ SLIDING_SYNC_RELEVANT_STATE_SET
+ ),
+ # Partially-stated rooms should have all state events except for
+ # remote membership events so we don't need to wait at all because
+ # we only want some non-membership state
+ await_full_state=False,
+ )
+ # We're iterating over rooms that we are joined to so they should
+ # have `current_state_events` and we should have some current state
+ # for each room
+ assert current_state_ids_map
+
+ fetched_events = await self.get_events(current_state_ids_map.values())
+
+ current_state_map: StateMap[EventBase] = {
+ state_key: fetched_events[event_id]
+ for state_key, event_id in current_state_ids_map.items()
+ # `get_events(...)` will filter out events for unknown room versions
+ if event_id in fetched_events
+ }
+
+ # Can happen for unknown room versions (old room versions that aren't known
+ # anymore) since `get_events(...)` will filter out events for unknown room
+ # versions
+ if not current_state_map:
+ continue
+
+ state_insert_values = (
+ PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ current_state_map
+ )
+ )
+ sliding_sync_membership_snapshots_insert_map.update(state_insert_values)
+ # We should have some insert values for each room, even if they are `None`
+ assert sliding_sync_membership_snapshots_insert_map
+
+ # We have current state to work from
+ sliding_sync_membership_snapshots_insert_map["has_known_state"] = True
+ elif membership in (Membership.INVITE, Membership.KNOCK) or (
+ membership == Membership.LEAVE and is_outlier
+ ):
+ invite_or_knock_event_id = membership_event_id
+ invite_or_knock_membership = membership
+
+ # If the event is an `out_of_band_membership` (special case of
+ # `outlier`), we never had historical state so we have to pull from
+ # the stripped state on the previous invite/knock event. This gives
+ # us a consistent view of the room state regardless of your
+ # membership (i.e. the room shouldn't disappear if you're using the
+ # `is_encrypted` filter and you leave).
+ if membership == Membership.LEAVE and is_outlier:
+ invite_or_knock_event_id, invite_or_knock_membership = (
+ await self.db_pool.runInteraction(
+ "sliding_sync_membership_snapshots_bg_update._find_previous_membership",
+ _find_previous_membership_txn,
+ membership_event_id,
+ user_id,
+ )
+ )
+
+ # Pull from the stripped state on the invite/knock event
+ invite_or_knock_event = await self.get_event(invite_or_knock_event_id)
+
+ raw_stripped_state_events = None
+ if invite_or_knock_membership == Membership.INVITE:
+ invite_room_state = invite_or_knock_event.unsigned.get(
+ "invite_room_state"
+ )
+ raw_stripped_state_events = invite_room_state
+ elif invite_or_knock_membership == Membership.KNOCK:
+ knock_room_state = invite_or_knock_event.unsigned.get(
+ "knock_room_state"
+ )
+ raw_stripped_state_events = knock_room_state
+
+ sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
+ raw_stripped_state_events
+ )
+
+ # We should have some insert values for each room, even if no
+ # stripped state is on the event because we still want to record
+ # that we have no known state
+ assert sliding_sync_membership_snapshots_insert_map
+ elif membership in (Membership.LEAVE, Membership.BAN):
+ # Pull from historical state
+ state_ids_map = await self.hs.get_storage_controllers().state.get_state_ids_for_event(
+ membership_event_id,
+ state_filter=StateFilter.from_types(
+ SLIDING_SYNC_RELEVANT_STATE_SET
+ ),
+ # Partially-stated rooms should have all state events except for
+ # remote membership events so we don't need to wait at all because
+ # we only want some non-membership state
+ await_full_state=False,
+ )
+
+ fetched_events = await self.get_events(state_ids_map.values())
+
+ state_map: StateMap[EventBase] = {
+ state_key: fetched_events[event_id]
+ for state_key, event_id in state_ids_map.items()
+ # `get_events(...)` will filter out events for unknown room versions
+ if event_id in fetched_events
+ }
+
+ # Can happen for unknown room versions (old room versions that aren't known
+ # anymore) since `get_events(...)` will filter out events for unknown room
+ # versions
+ if not state_map:
+ continue
+
+ state_insert_values = (
+ PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ state_map
+ )
+ )
+ sliding_sync_membership_snapshots_insert_map.update(state_insert_values)
+ # We should have some insert values for each room, even if they are `None`
+ assert sliding_sync_membership_snapshots_insert_map
+
+ # We have historical state to work from
+ sliding_sync_membership_snapshots_insert_map["has_known_state"] = True
+ else:
+ # We don't know how to handle this type of membership yet
+ #
+ # FIXME: We should use `assert_never` here but for some reason
+ # the exhaustive matching doesn't recognize the `Never` here.
+ # assert_never(membership)
+ raise AssertionError(
+ f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet"
+ )
+
+ to_insert_membership_snapshots[(room_id, user_id)] = (
+ sliding_sync_membership_snapshots_insert_map
+ )
+ to_insert_membership_infos[(room_id, user_id)] = SlidingSyncMembershipInfo(
+ user_id=user_id,
+ sender=sender,
+ membership_event_id=membership_event_id,
+ membership=membership,
+ membership_event_stream_ordering=membership_event_stream_ordering,
+ # If instance_name is null we default to "master"
+ membership_event_instance_name=membership_event_instance_name
+ or "master",
+ )
+
+ def _fill_table_txn(txn: LoggingTransaction) -> None:
+ # Handle updating the `sliding_sync_membership_snapshots` table
+ #
+ for key, insert_map in to_insert_membership_snapshots.items():
+ room_id, user_id = key
+ membership_info = to_insert_membership_infos[key]
+ sender = membership_info.sender
+ membership_event_id = membership_info.membership_event_id
+ membership = membership_info.membership
+ membership_event_stream_ordering = (
+ membership_info.membership_event_stream_ordering
+ )
+ membership_event_instance_name = (
+ membership_info.membership_event_instance_name
+ )
+
+ # We don't need to upsert the state because we never partially
+ # insert/update the snapshots and anything already there is up-to-date
+ # EXCEPT for the `forgotten` field since that is updated out-of-band
+ # from the membership changes.
+ #
+ # Even though we're only doing insertions, we're using
+ # `simple_upsert_txn()` here to avoid unique violation errors that would
+ # happen from `simple_insert_txn()`
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ keyvalues={"room_id": room_id, "user_id": user_id},
+ values={},
+ insertion_values={
+ **insert_map,
+ "sender": sender,
+ "membership_event_id": membership_event_id,
+ "membership": membership,
+ "event_stream_ordering": membership_event_stream_ordering,
+ "event_instance_name": membership_event_instance_name,
+ },
+ )
+ # We need to find the `forgotten` value during the transaction because
+ # we can't risk inserting stale data.
+ txn.execute(
+ """
+ UPDATE sliding_sync_membership_snapshots
+ SET
+ forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
+ WHERE room_id = ? and user_id = ?
+ """,
+ (
+ membership_event_id,
+ room_id,
+ user_id,
+ ),
+ )
+
+ await self.db_pool.runInteraction(
+ "sliding_sync_membership_snapshots_bg_update", _fill_table_txn
+ )
+
+ # Update the progress
+ (
+ room_id,
+ _room_id_from_rooms_table,
+ _user_id,
+ _sender,
+ _membership_event_id,
+ _membership,
+ membership_event_stream_ordering,
+ _membership_event_instance_name,
+ _is_outlier,
+ ) = memberships_to_update_rows[-1]
+
+ progress = {
+ "initial_phase": initial_phase,
+ "last_room_id": room_id,
+ "last_event_stream_ordering": membership_event_stream_ordering,
+ }
+
+ await self.db_pool.updates._background_update_progress(
+ _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
+ progress,
+ )
+
+ return len(memberships_to_update_rows)
+
+
+def _resolve_stale_data_in_sliding_sync_tables(
+ txn: LoggingTransaction,
+) -> None:
+ """
+ Clears stale/out-of-date entries from the
+ `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+
+ This accounts for when someone downgrades their Synapse version and then upgrades it
+ again. This will ensure that we don't have any stale/out-of-date data in the
+ `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables since any new
+ events sent in rooms would have also needed to be written to the sliding sync
+ tables. For example a new event needs to bump `event_stream_ordering` in
+ `sliding_sync_joined_rooms` table or some state in the room changing (like the room
+ name). Or another example of someone's membership changing in a room affecting
+ `sliding_sync_membership_snapshots`.
+
+ This way, if a row exists in the sliding sync tables, we are able to rely on it
+ (accurate data). And if a row doesn't exist, we use a fallback to get the same info
+ until the background updates fill in the rows or a new event comes in triggering it
+ to be fully inserted.
+
+ FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+ foreground update for
+ `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+ https://github.com/element-hq/synapse/issues/17623)
+ """
+
+ _resolve_stale_data_in_sliding_sync_joined_rooms_table(txn)
+ _resolve_stale_data_in_sliding_sync_membership_snapshots_table(txn)
+
+
+def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
+ txn: LoggingTransaction,
+) -> None:
+ """
+ Clears stale/out-of-date entries from the `sliding_sync_joined_rooms` table and
+ kicks-off the background update to catch-up with what we missed while Synapse was
+ downgraded.
+
+ See `_resolve_stale_data_in_sliding_sync_tables()` description above for more
+ context.
+ """
+
+ # Find the point when we stopped writing to the `sliding_sync_joined_rooms` table
+ txn.execute(
+ """
+ SELECT event_stream_ordering
+ FROM sliding_sync_joined_rooms
+ ORDER BY event_stream_ordering DESC
+ LIMIT 1
+ """,
+ )
+
+ # If we have nothing written to the `sliding_sync_joined_rooms` table, there is
+ # nothing to clean up
+ row = cast(Optional[Tuple[int]], txn.fetchone())
+ max_stream_ordering_sliding_sync_joined_rooms_table = None
+ depends_on = None
+ if row is not None:
+ (max_stream_ordering_sliding_sync_joined_rooms_table,) = row
+
+ txn.execute(
+ """
+ SELECT room_id
+ FROM events
+ WHERE stream_ordering > ?
+ GROUP BY room_id
+ ORDER BY MAX(stream_ordering) ASC
+ """,
+ (max_stream_ordering_sliding_sync_joined_rooms_table,),
+ )
+
+ room_rows = txn.fetchall()
+ # No new events have been written to the `events` table since the last time we wrote
+ # to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is
+ # the expected normal scenario for people who have not downgraded their Synapse
+ # version.
+ if not room_rows:
+ return
+
+ # 1000 is an arbitrary batch size with no testing
+ for chunk in batch_iter(room_rows, 1000):
+ # Handle updating the `sliding_sync_joined_rooms` table
+ #
+ # Clear out the stale data
+ DatabasePool.simple_delete_many_batch_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ keys=("room_id",),
+ values=chunk,
+ )
+
+ # Update the `sliding_sync_joined_rooms_to_recalculate` table with the rooms
+ # that went stale and now need to be recalculated.
+ DatabasePool.simple_upsert_many_txn_native_upsert(
+ txn,
+ table="sliding_sync_joined_rooms_to_recalculate",
+ key_names=("room_id",),
+ key_values=chunk,
+ value_names=(),
+ # No value columns, therefore make a blank list so that the following
+ # zip() works correctly.
+ value_values=[() for x in range(len(chunk))],
+ )
+ else:
+ # Avoid adding the background updates when there is no data to run them on (if
+ # the homeserver has no rooms). The portdb script refuses to run with pending
+ # background updates and since we potentially add them every time the server
+ # starts, we add this check to allow the script to breathe.
+ txn.execute("SELECT 1 FROM local_current_membership LIMIT 1")
+ row = txn.fetchone()
+ if row is None:
+ # There are no rooms, so don't schedule the bg update.
+ return
+
+ # Re-run the `sliding_sync_joined_rooms_to_recalculate` prefill if there is
+ # nothing in the `sliding_sync_joined_rooms` table
+ DatabasePool.simple_upsert_txn_native_upsert(
+ txn,
+ table="background_updates",
+ keyvalues={
+ "update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
+ },
+ values={},
+ # Only insert the row if it doesn't already exist. If it already exists,
+ # we're already working on it
+ insertion_values={
+ "progress_json": "{}",
+ },
+ )
+ depends_on = (
+ _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
+ )
+
+ # Now kick-off the background update to catch-up with what we missed while Synapse
+ # was downgraded.
+ #
+ # We may need to catch-up on everything if we have nothing written to the
+ # `sliding_sync_joined_rooms` table yet. This could happen if someone had zero rooms
+ # on their server (so the normal background update completes), downgraded Synapse
+ # versions, joined and created some new rooms, and upgraded again.
+ DatabasePool.simple_upsert_txn_native_upsert(
+ txn,
+ table="background_updates",
+ keyvalues={
+ "update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE
+ },
+ values={},
+ # Only insert the row if it doesn't already exist. If it already exists, we will
+ # eventually fill in the rows we're trying to populate.
+ insertion_values={
+ # Empty progress is expected since it's not used for this background update.
+ "progress_json": "{}",
+ # Wait for the prefill to finish
+ "depends_on": depends_on,
+ },
+ )
+
+
+def _resolve_stale_data_in_sliding_sync_membership_snapshots_table(
+ txn: LoggingTransaction,
+) -> None:
+ """
+ Clears stale/out-of-date entries from the `sliding_sync_membership_snapshots` table
+ and kicks-off the background update to catch-up with what we missed while Synapse
+ was downgraded.
+
+ See `_resolve_stale_data_in_sliding_sync_tables()` description above for more
+ context.
+ """
+
+ # Find the point when we stopped writing to the `sliding_sync_membership_snapshots` table
+ txn.execute(
+ """
+ SELECT event_stream_ordering
+ FROM sliding_sync_membership_snapshots
+ ORDER BY event_stream_ordering DESC
+ LIMIT 1
+ """,
+ )
+
+ # If we have nothing written to the `sliding_sync_membership_snapshots` table,
+ # there is nothing to clean up
+ row = cast(Optional[Tuple[int]], txn.fetchone())
+ max_stream_ordering_sliding_sync_membership_snapshots_table = None
+ if row is not None:
+ (max_stream_ordering_sliding_sync_membership_snapshots_table,) = row
+
+ # XXX: Since `forgotten` is simply a flag on the `room_memberships` table that is
+ # set out-of-band, there is no way to tell whether it was set while Synapse was
+ # downgraded. The only thing the user can do is `/forget` again if they run into
+ # this.
+ #
+ # This only picks up changes to memberships.
+ txn.execute(
+ """
+ SELECT user_id, room_id
+ FROM local_current_membership
+ WHERE event_stream_ordering > ?
+ ORDER BY event_stream_ordering ASC
+ """,
+ (max_stream_ordering_sliding_sync_membership_snapshots_table,),
+ )
+
+ membership_rows = txn.fetchall()
+ # No memberships have changed in `local_current_membership` since the last time we
+ # wrote to the `sliding_sync_membership_snapshots` table so there is nothing to clean
+ # up. This is the expected normal scenario for people who have not downgraded their
+ # Synapse version.
+ if not membership_rows:
+ return
+
+ # 1000 is an arbitrary batch size with no testing
+ for chunk in batch_iter(membership_rows, 1000):
+ # Handle updating the `sliding_sync_membership_snapshots` table
+ #
+ DatabasePool.simple_delete_many_batch_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ keys=("user_id", "room_id"),
+ values=chunk,
+ )
+ else:
+ # Avoid adding the background updates when there is no data to run them on (if
+ # the homeserver has no rooms). The portdb script refuses to run with pending
+ # background updates and since we potentially add them every time the server
+ # starts, we add this check to allow the script to breathe.
+ txn.execute("SELECT 1 FROM local_current_membership LIMIT 1")
+ row = txn.fetchone()
+ if row is None:
+ # There are no rooms, so don't schedule the bg update.
+ return
+
+ # Now kick-off the background update to catch-up with what we missed while Synapse
+ # was downgraded.
+ #
+ # We may need to catch-up on everything if we have nothing written to the
+ # `sliding_sync_membership_snapshots` table yet. This could happen if someone had
+ # zero rooms on their server (so the normal background update completes), downgraded
+ # Synapse versions, joined and created some new rooms, and upgraded again.
+ #
+ progress_json: JsonDict = {}
+ if max_stream_ordering_sliding_sync_membership_snapshots_table is not None:
+ progress_json["initial_phase"] = False
+ progress_json["last_event_stream_ordering"] = (
+ max_stream_ordering_sliding_sync_membership_snapshots_table
+ )
+
+ DatabasePool.simple_upsert_txn_native_upsert(
+ txn,
+ table="background_updates",
+ keyvalues={
+ "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
+ },
+ values={},
+ # Only insert the row if it doesn't already exist. If it already exists, we will
+ # eventually fill in the rows we're trying to populate.
+ insertion_values={
+ "progress_json": json_encoder.encode(progress_json),
+ },
+ )
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 4d4877c4c3..6079cc4a52 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -457,6 +457,8 @@ class EventsWorkerStore(SQLBaseStore):
) -> Optional[EventBase]:
"""Get an event from the database by event_id.
+ Events for unknown room versions will also be filtered out.
+
Args:
event_id: The event_id of the event to fetch
@@ -511,6 +513,10 @@ class EventsWorkerStore(SQLBaseStore):
) -> Dict[str, EventBase]:
"""Get events from the database
+ Unknown events will be omitted from the response.
+
+ Events for unknown room versions will also be filtered out.
+
Args:
event_ids: The event_ids of the events to fetch
@@ -553,6 +559,8 @@ class EventsWorkerStore(SQLBaseStore):
Unknown events will be omitted from the response.
+ Events for unknown room versions will also be filtered out.
+
Args:
event_ids: The event_ids of the events to fetch
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 3b81ed943c..fc4c286595 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -454,6 +454,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# so must be deleted first.
"local_current_membership",
"room_memberships",
+ # Note: the sliding_sync_ tables have foreign keys to the `events` table
+ # so must be deleted first.
+ "sliding_sync_joined_rooms",
+ "sliding_sync_membership_snapshots",
"events",
"federation_inbound_events_staging",
"receipts_graph",
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 71baf57663..722686d4b8 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -1337,6 +1337,12 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={"forgotten": 1},
)
+ self.db_pool.simple_update_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ updatevalues={"forgotten": 1},
+ )
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
self._invalidate_cache_and_stream(
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index eaa13da368..ba52fff652 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -161,45 +161,80 @@ class StateDeltasStore(SQLBaseStore):
self._get_max_stream_id_in_current_state_deltas_txn,
)
- @trace
- async def get_current_state_deltas_for_room(
- self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
+ def get_current_state_deltas_for_room_txn(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ *,
+ from_token: Optional[RoomStreamToken],
+ to_token: Optional[RoomStreamToken],
) -> List[StateDelta]:
- """Get the state deltas between two tokens."""
-
- if not self._curr_state_delta_stream_cache.has_entity_changed(
- room_id, from_token.stream
- ):
- return []
+ """
+ Get the state deltas between two tokens.
- def get_current_state_deltas_for_room_txn(
- txn: LoggingTransaction,
- ) -> List[StateDelta]:
- sql = """
+ (> `from_token` and <= `to_token`)
+ """
+ from_clause = ""
+ from_args = []
+ if from_token is not None:
+ from_clause = "AND ? < stream_id"
+ from_args = [from_token.stream]
+
+ to_clause = ""
+ to_args = []
+ if to_token is not None:
+ to_clause = "AND stream_id <= ?"
+ to_args = [to_token.get_max_stream_pos()]
+
+ sql = f"""
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
FROM current_state_delta_stream
- WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
+ WHERE room_id = ? {from_clause} {to_clause}
ORDER BY stream_id ASC
"""
- txn.execute(
- sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
+ txn.execute(sql, [room_id] + from_args + to_args)
+
+ return [
+ StateDelta(
+ stream_id=row[1],
+ room_id=room_id,
+ event_type=row[2],
+ state_key=row[3],
+ event_id=row[4],
+ prev_event_id=row[5],
)
+ for row in txn
+ if _filter_results_by_stream(from_token, to_token, row[0], row[1])
+ ]
- return [
- StateDelta(
- stream_id=row[1],
- room_id=room_id,
- event_type=row[2],
- state_key=row[3],
- event_id=row[4],
- prev_event_id=row[5],
- )
- for row in txn
- if _filter_results_by_stream(from_token, to_token, row[0], row[1])
- ]
+ @trace
+ async def get_current_state_deltas_for_room(
+ self,
+ room_id: str,
+ *,
+ from_token: Optional[RoomStreamToken],
+ to_token: Optional[RoomStreamToken],
+ ) -> List[StateDelta]:
+ """
+ Get the state deltas between two tokens.
+
+ (> `from_token` and <= `to_token`)
+ """
+
+ if (
+ from_token is not None
+ and not self._curr_state_delta_stream_cache.has_entity_changed(
+ room_id, from_token.stream
+ )
+ ):
+ return []
return await self.db_pool.runInteraction(
- "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
+ "get_current_state_deltas_for_room",
+ self.get_current_state_deltas_for_room_txn,
+ room_id,
+ from_token=from_token,
+ to_token=to_token,
)
@trace
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e33a8cfe97..1a59e0b5a8 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1264,12 +1264,76 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
+ async def get_last_event_pos_in_room(
+ self,
+ room_id: str,
+ event_types: Optional[StrCollection] = None,
+ ) -> Optional[Tuple[str, PersistedEventPosition]]:
+ """
+ Returns the ID and event position of the last event in a room.
+
+ Based on `get_last_event_pos_in_room_before_stream_ordering(...)`
+
+ Args:
+ room_id
+ event_types: Optional allowlist of event types to filter by
+
+ Returns:
+ The ID of the most recent event and its position, or None if there are no
+ events in the room that match the given event types.
+ """
+
+ def _get_last_event_pos_in_room_txn(
+ txn: LoggingTransaction,
+ ) -> Optional[Tuple[str, PersistedEventPosition]]:
+ event_type_clause = ""
+ event_type_args: List[str] = []
+ if event_types is not None and len(event_types) > 0:
+ event_type_clause, event_type_args = make_in_list_sql_clause(
+ txn.database_engine, "type", event_types
+ )
+ event_type_clause = f"AND {event_type_clause}"
+
+ sql = f"""
+ SELECT event_id, stream_ordering, instance_name
+ FROM events
+ LEFT JOIN rejections USING (event_id)
+ WHERE room_id = ?
+ {event_type_clause}
+ AND NOT outlier
+ AND rejections.event_id IS NULL
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ """
+
+ txn.execute(
+ sql,
+ [room_id] + event_type_args,
+ )
+
+ row = cast(Optional[Tuple[str, int, str]], txn.fetchone())
+ if row is not None:
+ event_id, stream_ordering, instance_name = row
+
+ return event_id, PersistedEventPosition(
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
+ )
+
+ return None
+
+ return await self.db_pool.runInteraction(
+ "get_last_event_pos_in_room",
+ _get_last_event_pos_in_room_txn,
+ )
+
@trace
async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
- event_types: Optional[Collection[str]] = None,
+ event_types: Optional[StrCollection] = None,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
|