diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..b7cc0433e7 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -32,8 +32,10 @@ from typing import (
Iterable,
List,
Optional,
+ Sequence,
Set,
Tuple,
+ TypedDict,
cast,
)
@@ -41,17 +43,24 @@ import attr
from prometheus_client import Counter
import synapse.metrics
-from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ Membership,
+ RelationTypes,
+)
from synapse.api.errors import PartialStateConflictError
from synapse.api.room_versions import RoomVersions
-from synapse.events import EventBase, relation_from_event
+from synapse.events import EventBase, StrippedStateEvent, relation_from_event
from synapse.events.snapshot import EventContext
+from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
+ make_tuple_in_list_sql_clause,
)
from synapse.storage.databases.main.event_federation import EventFederationStore
from synapse.storage.databases.main.events_worker import EventCacheEntry
@@ -59,7 +68,15 @@ from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.storage.util.sequence import SequenceGenerator
-from synapse.types import JsonDict, StateMap, StrCollection, get_domain_from_id
+from synapse.types import (
+ JsonDict,
+ MutableStateMap,
+ StateMap,
+ StrCollection,
+ get_domain_from_id,
+)
+from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
+from synapse.types.state import StateFilter
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter, sorted_topologically
from synapse.util.stringutils import non_null_str_or_none
@@ -78,6 +95,19 @@ event_counter = Counter(
["type", "origin_type", "origin_entity"],
)
+# State event type/key pairs that we need to gather to fill in the
+# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+SLIDING_SYNC_RELEVANT_STATE_SET = (
+ # So we can fill in the `room_type` column
+ (EventTypes.Create, ""),
+ # So we can fill in the `is_encrypted` column
+ (EventTypes.RoomEncryption, ""),
+ # So we can fill in the `room_name` column
+ (EventTypes.Name, ""),
+ # So we can fill in the `tombstone_successor_room_id` column
+ (EventTypes.Tombstone, ""),
+)
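+# For example, these pairs are fed to
+# `StateFilter.from_types(SLIDING_SYNC_RELEVANT_STATE_SET)` below so that we only
+# fetch the handful of current state entries we actually need.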
+
@attr.s(slots=True, auto_attribs=True)
class DeltaState:
@@ -99,6 +129,80 @@ class DeltaState:
return not self.to_delete and not self.to_insert and not self.no_longer_in_room
+# We use `total=False` so that individual values can be left unset.
+class SlidingSyncStateInsertValues(TypedDict, total=False):
+ """
+ Insert values relevant for the `sliding_sync_joined_rooms` and
+ `sliding_sync_membership_snapshots` database tables.
+ """
+
+ room_type: Optional[str]
+ is_encrypted: Optional[bool]
+ room_name: Optional[str]
+ tombstone_successor_room_id: Optional[str]
+
+
+class SlidingSyncMembershipSnapshotSharedInsertValues(
+ SlidingSyncStateInsertValues, total=False
+):
+ """
+ Insert values for `sliding_sync_membership_snapshots` that we can share across
+ multiple memberships
+ """
+
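+    # Whether we have any state at all to derive the other values from. For
+    # example, a remote invite that arrives with no stripped state gives us
+    # nothing to work from, in which case this is recorded as `False` (see
+    # `_get_sliding_sync_insert_values_from_stripped_state()`).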
+ has_known_state: Optional[bool]
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SlidingSyncMembershipInfo:
+ """
+ Values unique to each membership
+ """
+
+ user_id: str
+ sender: str
+ membership_event_id: str
+ membership: str
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SlidingSyncMembershipInfoWithEventPos(SlidingSyncMembershipInfo):
+ """
+ SlidingSyncMembershipInfo + `stream_ordering`/`instance_name` of the membership
+ event
+ """
+
+ membership_event_stream_ordering: int
+ membership_event_instance_name: str
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SlidingSyncTableChanges:
+ room_id: str
+ # If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to
+ # fully-insert it which means we also need to include a `bump_stamp` value to use
+ # for the row. This should only be populated when we're trying to fully-insert a
+ # row.
+ #
+ # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+ # foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+ # https://github.com/element-hq/synapse/issues/17623)
+ joined_room_bump_stamp_to_fully_insert: Optional[int]
+ # Values to upsert into `sliding_sync_joined_rooms`
+ joined_room_updates: SlidingSyncStateInsertValues
+
+ # Shared values to upsert into `sliding_sync_membership_snapshots` for each
+ # `to_insert_membership_snapshots`
+ membership_snapshot_shared_insert_values: (
+ SlidingSyncMembershipSnapshotSharedInsertValues
+ )
+    # List of memberships to insert into `sliding_sync_membership_snapshots`
+ to_insert_membership_snapshots: List[SlidingSyncMembershipInfo]
+    # List of user IDs to delete from `sliding_sync_membership_snapshots`
+ to_delete_membership_snapshots: List[str]
+
+
@attr.s(slots=True, auto_attribs=True)
class NewEventChainLinks:
"""Information about new auth chain links that need to be added to the DB.
@@ -142,9 +246,9 @@ class PersistEventsStore:
self.is_mine_id = hs.is_mine_id
# This should only exist on instances that are configured to write
- assert (
- hs.get_instance_name() in hs.config.worker.writers.events
- ), "Can only instantiate EventsStore on master"
+ assert hs.get_instance_name() in hs.config.worker.writers.events, (
+ "Can only instantiate EventsStore on master"
+ )
# Since we have been configured to write, we ought to have id generators,
# rather than id trackers.
@@ -223,9 +327,24 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
+ # XXX: We can't rely on `stream_ordering`/`instance_name` being correct
+ # at this point. We could be working with events that were previously
+ # persisted as an `outlier` with one `stream_ordering` but are now being
+ # persisted again and de-outliered and are being assigned a different
+ # `stream_ordering` here that won't end up being used.
+ # `_update_outliers_txn()` will fix this discrepancy (always use the
+ # `stream_ordering` from the first time it was persisted).
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
+ sliding_sync_table_changes = None
+ if state_delta_for_room is not None:
+ sliding_sync_table_changes = (
+ await self._calculate_sliding_sync_table_changes(
+ room_id, events_and_contexts, state_delta_for_room
+ )
+ )
+
await self.db_pool.runInteraction(
"persist_events",
self._persist_events_txn,
@@ -235,6 +354,7 @@ class PersistEventsStore:
state_delta_for_room=state_delta_for_room,
new_forward_extremities=new_forward_extremities,
new_event_links=new_event_links,
+ sliding_sync_table_changes=sliding_sync_table_changes,
)
persist_event_counter.inc(len(events_and_contexts))
@@ -261,6 +381,301 @@ class PersistEventsStore:
(room_id,), frozenset(new_forward_extremities)
)
+ async def _calculate_sliding_sync_table_changes(
+ self,
+ room_id: str,
+ events_and_contexts: Sequence[Tuple[EventBase, EventContext]],
+ delta_state: DeltaState,
+ ) -> SlidingSyncTableChanges:
+ """
+ Calculate the changes to the `sliding_sync_membership_snapshots` and
+ `sliding_sync_joined_rooms` tables given the deltas that are going to be used to
+ update the `current_state_events` table.
+
+        Just a bunch of pre-processing so we don't need to spend time in the
+ transaction itself gathering all of this info. It's also easier to deal with
+ redactions outside of a transaction.
+
+ Args:
+ room_id: The room ID currently being processed.
+ events_and_contexts: List of tuples of (event, context) being persisted.
+ This is completely optional (you can pass an empty list) and will just
+ save us from fetching the events from the database if we already have
+ them. We assume the list is sorted ascending by `stream_ordering`. We
+ don't care about the sort when the events are backfilled (with negative
+ `stream_ordering`).
+ delta_state: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+
+ Returns:
+ SlidingSyncTableChanges
+ """
+ to_insert = delta_state.to_insert
+ to_delete = delta_state.to_delete
+
+ # If no state is changing, we don't need to do anything. This can happen when a
+ # partial-stated room is re-syncing the current state.
+ if not to_insert and not to_delete:
+ return SlidingSyncTableChanges(
+ room_id=room_id,
+ joined_room_bump_stamp_to_fully_insert=None,
+ joined_room_updates={},
+ membership_snapshot_shared_insert_values={},
+ to_insert_membership_snapshots=[],
+ to_delete_membership_snapshots=[],
+ )
+
+ event_map = {event.event_id: event for event, _ in events_and_contexts}
+
+ # Handle gathering info for the `sliding_sync_membership_snapshots` table
+ #
+ # This would only happen if someone was state reset out of the room
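+        # (entries in `to_delete` are `(event_type, state_key)` tuples, so for
+        # membership events the `state_key` is the affected user's ID, e.g. a
+        # hypothetical `(EventTypes.Member, "@alice:example.com")`)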
+ user_ids_to_delete_membership_snapshots = [
+ state_key
+ for event_type, state_key in to_delete
+ if event_type == EventTypes.Member and self.is_mine_id(state_key)
+ ]
+
+ membership_snapshot_shared_insert_values: SlidingSyncMembershipSnapshotSharedInsertValues = {}
+ membership_infos_to_insert_membership_snapshots: List[
+ SlidingSyncMembershipInfo
+ ] = []
+ if to_insert:
+ membership_event_id_to_user_id_map: Dict[str, str] = {}
+ for state_key, event_id in to_insert.items():
+ if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
+ membership_event_id_to_user_id_map[event_id] = state_key[1]
+
+ membership_event_map: Dict[str, EventBase] = {}
+ # In normal event persist scenarios, we should be able to find the
+ # membership events in the `events_and_contexts` given to us but it's
+ # possible a state reset happened which added us to the room without a
+ # corresponding new membership event (reset back to a previous membership).
+ missing_membership_event_ids: Set[str] = set()
+ for membership_event_id in membership_event_id_to_user_id_map.keys():
+ membership_event = event_map.get(membership_event_id)
+ if membership_event:
+ membership_event_map[membership_event_id] = membership_event
+ else:
+ missing_membership_event_ids.add(membership_event_id)
+
+            # Otherwise, we need to find a couple of events that we were reset to.
+ if missing_membership_event_ids:
+ remaining_events = await self.store.get_events(
+ missing_membership_event_ids
+ )
+ # There shouldn't be any missing events
+ assert remaining_events.keys() == missing_membership_event_ids, (
+ missing_membership_event_ids.difference(remaining_events.keys())
+ )
+ membership_event_map.update(remaining_events)
+
+ for (
+ membership_event_id,
+ user_id,
+ ) in membership_event_id_to_user_id_map.items():
+ membership_infos_to_insert_membership_snapshots.append(
+ # XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
+ # because we're sourcing the event from `events_and_contexts`, we
+ # can't rely on `stream_ordering`/`instance_name` being correct at
+ # this point. We could be working with events that were previously
+ # persisted as an `outlier` with one `stream_ordering` but are now
+ # being persisted again and de-outliered and assigned a different
+ # `stream_ordering` that won't end up being used. Since we call
+ # `_calculate_sliding_sync_table_changes()` before
+ # `_update_outliers_txn()` which fixes this discrepancy (always use
+ # the `stream_ordering` from the first time it was persisted), we're
+ # working with an unreliable `stream_ordering` value that will
+ # possibly be unused and not make it into the `events` table.
+ SlidingSyncMembershipInfo(
+ user_id=user_id,
+ sender=membership_event_map[membership_event_id].sender,
+ membership_event_id=membership_event_id,
+ membership=membership_event_map[membership_event_id].membership,
+ )
+ )
+
+ if membership_infos_to_insert_membership_snapshots:
+ current_state_ids_map: MutableStateMap[str] = dict(
+ await self.store.get_partial_filtered_current_state_ids(
+ room_id,
+ state_filter=StateFilter.from_types(
+ SLIDING_SYNC_RELEVANT_STATE_SET
+ ),
+ )
+ )
+ # Since we fetched the current state before we took `to_insert`/`to_delete`
+ # into account, we need to do a couple fixups.
+ #
+ # Update the current_state_map with what we have `to_delete`
+ for state_key in to_delete:
+ current_state_ids_map.pop(state_key, None)
+ # Update the current_state_map with what we have `to_insert`
+ for state_key, event_id in to_insert.items():
+ if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
+ current_state_ids_map[state_key] = event_id
+
+ current_state_map: MutableStateMap[EventBase] = {}
+ # In normal event persist scenarios, we probably won't be able to find
+ # these state events in `events_and_contexts` since we don't generally
+ # batch up local membership changes with other events, but it can
+ # happen.
+ missing_state_event_ids: Set[str] = set()
+ for state_key, event_id in current_state_ids_map.items():
+ event = event_map.get(event_id)
+ if event:
+ current_state_map[state_key] = event
+ else:
+ missing_state_event_ids.add(event_id)
+
+            # Otherwise, we need to find a couple of events
+ if missing_state_event_ids:
+ remaining_events = await self.store.get_events(
+ missing_state_event_ids
+ )
+ # There shouldn't be any missing events
+ assert remaining_events.keys() == missing_state_event_ids, (
+ missing_state_event_ids.difference(remaining_events.keys())
+ )
+ for event in remaining_events.values():
+ current_state_map[(event.type, event.state_key)] = event
+
+ if current_state_map:
+ state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ current_state_map
+ )
+ membership_snapshot_shared_insert_values.update(state_insert_values)
+ # We have current state to work from
+ membership_snapshot_shared_insert_values["has_known_state"] = True
+ else:
+ # We don't have any `current_state_events` anymore (previously
+ # cleared out because of `no_longer_in_room`). This can happen if
+ # one user is joined and another is invited (some non-join
+ # membership). If the joined user leaves, we are `no_longer_in_room`
+ # and `current_state_events` is cleared out. When the invited user
+ # rejects the invite (leaves the room), we will end up here.
+ #
+                # In these cases, we should inherit the metadata from the previous
+ # snapshot so we shouldn't update any of the state values. When
+ # using sliding sync filters, this will prevent the room from
+ # disappearing/appearing just because you left the room.
+ #
+ # Ideally, we could additionally assert that we're only here for
+ # valid non-join membership transitions.
+ assert delta_state.no_longer_in_room
+
+ # Handle gathering info for the `sliding_sync_joined_rooms` table
+ #
+        # We only deal with updating the state related columns. The
+ # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event
+ # persisting stack (see
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
+ #
+ joined_room_updates: SlidingSyncStateInsertValues = {}
+ bump_stamp_to_fully_insert: Optional[int] = None
+ if not delta_state.no_longer_in_room:
+ current_state_ids_map = {}
+
+ # Always fully-insert rows if they don't already exist in the
+ # `sliding_sync_joined_rooms` table. This way we can rely on a row if it
+ # exists in the table.
+ #
+ # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
+ # foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
+ # https://github.com/element-hq/synapse/issues/17623)
+ existing_row_in_table = await self.store.db_pool.simple_select_one_onecol(
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ retcol="room_id",
+ allow_none=True,
+ )
+ if not existing_row_in_table:
+ most_recent_bump_event_pos_results = (
+ await self.store.get_last_event_pos_in_room(
+ room_id,
+ event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
+ )
+ )
+ if most_recent_bump_event_pos_results is not None:
+ _, new_bump_event_pos = most_recent_bump_event_pos_results
+
+ # If we've just joined a remote room, then the last bump event may
+ # have been backfilled (and so have a negative stream ordering).
+ # These negative stream orderings can't sensibly be compared, so
+ # instead just leave it as `None` in the table and we will use their
+ # membership event position as the bump event position in the
+ # Sliding Sync API.
+ if new_bump_event_pos.stream > 0:
+ bump_stamp_to_fully_insert = new_bump_event_pos.stream
+
+ current_state_ids_map = dict(
+ await self.store.get_partial_filtered_current_state_ids(
+ room_id,
+ state_filter=StateFilter.from_types(
+ SLIDING_SYNC_RELEVANT_STATE_SET
+ ),
+ )
+ )
+
+ # Look through the items we're going to insert into the current state to see
+ # if there is anything that we care about and should also update in the
+ # `sliding_sync_joined_rooms` table.
+ for state_key, event_id in to_insert.items():
+ if state_key in SLIDING_SYNC_RELEVANT_STATE_SET:
+ current_state_ids_map[state_key] = event_id
+
+ # Get the full event objects for the current state events
+ #
+ # In normal event persist scenarios, we should be able to find the state
+ # events in the `events_and_contexts` given to us but it's possible a state
+        # reset happened which reset us back to a previous state.
+ current_state_map = {}
+ missing_event_ids: Set[str] = set()
+ for state_key, event_id in current_state_ids_map.items():
+ event = event_map.get(event_id)
+ if event:
+ current_state_map[state_key] = event
+ else:
+ missing_event_ids.add(event_id)
+
+        # Otherwise, we need to find a couple of events that we were reset to.
+ if missing_event_ids:
+ remaining_events = await self.store.get_events(missing_event_ids)
+ # There shouldn't be any missing events
+ assert remaining_events.keys() == missing_event_ids, (
+ missing_event_ids.difference(remaining_events.keys())
+ )
+ for event in remaining_events.values():
+ current_state_map[(event.type, event.state_key)] = event
+
+ joined_room_updates = (
+ PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
+ current_state_map
+ )
+ )
+
+ # If something is being deleted from the state, we need to clear it out
+ for state_key in to_delete:
+ if state_key == (EventTypes.Create, ""):
+ joined_room_updates["room_type"] = None
+ elif state_key == (EventTypes.RoomEncryption, ""):
+ joined_room_updates["is_encrypted"] = False
+ elif state_key == (EventTypes.Name, ""):
+ joined_room_updates["room_name"] = None
+
+ return SlidingSyncTableChanges(
+ room_id=room_id,
+ # For `sliding_sync_joined_rooms`
+ joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert,
+ joined_room_updates=joined_room_updates,
+ # For `sliding_sync_membership_snapshots`
+ membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values,
+ to_insert_membership_snapshots=membership_infos_to_insert_membership_snapshots,
+ to_delete_membership_snapshots=user_ids_to_delete_membership_snapshots,
+ )
+
async def calculate_chain_cover_index_for_events(
self, room_id: str, events: Collection[EventBase]
) -> Dict[str, NewEventChainLinks]:
@@ -315,7 +730,7 @@ class PersistEventsStore:
keyvalues={},
retcols=("event_id",),
)
- already_persisted_events = {event_id for event_id, in rows}
+ already_persisted_events = {event_id for (event_id,) in rows}
state_events = [
event
for event in state_events
@@ -458,6 +873,7 @@ class PersistEventsStore:
state_delta_for_room: Optional[DeltaState],
new_forward_extremities: Optional[Set[str]],
new_event_links: Dict[str, NewEventChainLinks],
+ sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
) -> None:
"""Insert some number of room events into the necessary database tables.
@@ -478,9 +894,14 @@ class PersistEventsStore:
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room: The current-state delta for the room.
+ state_delta_for_room: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
new_forward_extremities: The new forward extremities for the room:
a set of the event ids which are the forward extremities.
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+            derived from the given `state_delta_for_room` (see
+ `_calculate_sliding_sync_table_changes(...)`)
Raises:
PartialStateConflictError: if attempting to persist a partial state event in
@@ -590,10 +1011,22 @@ class PersistEventsStore:
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
if state_delta_for_room:
+ # If the state delta exists, the sliding sync table changes should also exist
+ assert sliding_sync_table_changes is not None
+
self._update_current_state_txn(
- txn, room_id, state_delta_for_room, min_stream_order
+ txn,
+ room_id,
+ state_delta_for_room,
+ min_stream_order,
+ sliding_sync_table_changes,
)
+ # We only update the sliding sync tables for non-backfilled events.
+ self._update_sliding_sync_tables_with_new_persisted_events_txn(
+ txn, room_id, events_and_contexts
+ )
+
def _persist_event_auth_chain_txn(
self,
txn: LoggingTransaction,
@@ -1128,8 +1561,20 @@ class PersistEventsStore:
self,
room_id: str,
state_delta: DeltaState,
+ sliding_sync_table_changes: SlidingSyncTableChanges,
) -> None:
- """Update the current state stored in the datatabase for the given room"""
+ """
+        Update the current state stored in the database for the given room
+
+ Args:
+ room_id
+ state_delta: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+            derived from the given `state_delta` (see
+ `_calculate_sliding_sync_table_changes(...)`)
+ """
if state_delta.is_noop():
return
@@ -1141,6 +1586,7 @@ class PersistEventsStore:
room_id,
delta_state=state_delta,
stream_id=stream_ordering,
+ sliding_sync_table_changes=sliding_sync_table_changes,
)
def _update_current_state_txn(
@@ -1149,16 +1595,40 @@ class PersistEventsStore:
room_id: str,
delta_state: DeltaState,
stream_id: int,
+ sliding_sync_table_changes: SlidingSyncTableChanges,
) -> None:
+ """
+ Handles updating tables that track the current state of a room.
+
+ Args:
+ txn
+ room_id
+ delta_state: Deltas that are going to be used to update the
+ `current_state_events` table. Changes to the current state of the room.
+ stream_id: This is expected to be the minimum `stream_ordering` for the
+ batch of events that we are persisting; which means we do not end up in a
+ situation where workers see events before the `current_state_delta` updates.
+ FIXME: However, this function also gets called with next upcoming
+ `stream_ordering` when we re-sync the state of a partial stated room (see
+ `update_current_state(...)`) which may be "correct" but it would be good to
+ nail down what exactly is the expected value here.
+ sliding_sync_table_changes: Changes to the
+ `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
+ derived from the given `delta_state` (see
+ `_calculate_sliding_sync_table_changes(...)`)
+ """
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
+        # Sanity check that we're processing the same room
+ assert room_id == sliding_sync_table_changes.room_id
+
# Figure out the changes of membership to invalidate the
# `get_rooms_for_user` cache.
# We find out which membership events we may have deleted
# and which we have added, then we invalidate the caches for all
# those users.
- members_changed = {
+ members_to_cache_bust = {
state_key
for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
@@ -1182,16 +1652,22 @@ class PersistEventsStore:
"""
txn.execute(sql, (stream_id, self._instance_name, room_id))
+ # Grab the list of users before we clear out the current state
+ users_in_room = self.store.get_users_in_room_txn(txn, room_id)
# We also want to invalidate the membership caches for users
# that were in the room.
- users_in_room = self.store.get_users_in_room_txn(txn, room_id)
- members_changed.update(users_in_room)
+ members_to_cache_bust.update(users_in_room)
self.db_pool.simple_delete_txn(
txn,
table="current_state_events",
keyvalues={"room_id": room_id},
)
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ )
else:
# We're still in the room, so we update the current state as normal.
@@ -1216,7 +1692,7 @@ class PersistEventsStore:
"""
txn.execute_batch(
sql,
- (
+ [
(
stream_id,
self._instance_name,
@@ -1229,17 +1705,17 @@ class PersistEventsStore:
state_key,
)
for etype, state_key in itertools.chain(to_delete, to_insert)
- ),
+ ],
)
# Now we actually update the current_state_events table
txn.execute_batch(
"DELETE FROM current_state_events"
" WHERE room_id = ? AND type = ? AND state_key = ?",
- (
+ [
(room_id, etype, state_key)
for etype, state_key in itertools.chain(to_delete, to_insert)
- ),
+ ],
)
# We include the membership in the current state table, hence we do
@@ -1260,6 +1736,63 @@ class PersistEventsStore:
],
)
+ # Handle updating the `sliding_sync_joined_rooms` table. We only deal with
+ # updating the state related columns. The
+ # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event
+ # persisting stack (see
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
+ #
+ # We only need to update when one of the relevant state values has changed
+ if sliding_sync_table_changes.joined_room_updates:
+ sliding_sync_updates_keys = (
+ sliding_sync_table_changes.joined_room_updates.keys()
+ )
+ sliding_sync_updates_values = (
+ sliding_sync_table_changes.joined_room_updates.values()
+ )
+
+ args: List[Any] = [
+ room_id,
+ room_id,
+ sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert,
+ ]
+ args.extend(iter(sliding_sync_updates_values))
+
+ # XXX: We use a sub-query for `stream_ordering` because it's unreliable to
+ # pre-calculate from `events_and_contexts` at the time when
+ # `_calculate_sliding_sync_table_changes()` is ran. We could be working
+ # with events that were previously persisted as an `outlier` with one
+ # `stream_ordering` but are now being persisted again and de-outliered
+ # and assigned a different `stream_ordering`. Since we call
+ # `_calculate_sliding_sync_table_changes()` before
+ # `_update_outliers_txn()` which fixes this discrepancy (always use the
+ # `stream_ordering` from the first time it was persisted), we're working
+ # with an unreliable `stream_ordering` value that will possibly be
+ # unused and not make it into the `events` table.
+ #
+ # We don't update `event_stream_ordering` `ON CONFLICT` because it's
+ # simpler and we can just rely on
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do
+ # the right thing (same for `bump_stamp`). The only reason we're
+ # inserting `event_stream_ordering` here is because the column has a
+            # `NOT NULL` constraint and we need some answer.
+ txn.execute(
+ f"""
+ INSERT INTO sliding_sync_joined_rooms
+ (room_id, event_stream_ordering, bump_stamp, {", ".join(sliding_sync_updates_keys)})
+ VALUES (
+ ?,
+ (SELECT stream_ordering FROM events WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT 1),
+ ?,
+ {", ".join("?" for _ in sliding_sync_updates_values)}
+ )
+ ON CONFLICT (room_id)
+ DO UPDATE SET
+ {", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_updates_keys)}
+ """,
+ args,
+ )
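+            # For example, with hypothetical `joined_room_updates` of
+            # `{"room_name": ..., "is_encrypted": ...}`, the rendered statement
+            # looks roughly like:
+            #
+            #   INSERT INTO sliding_sync_joined_rooms
+            #       (room_id, event_stream_ordering, bump_stamp, room_name, is_encrypted)
+            #   VALUES (?, (SELECT stream_ordering FROM events ...), ?, ?, ?)
+            #   ON CONFLICT (room_id)
+            #   DO UPDATE SET
+            #       room_name = EXCLUDED.room_name, is_encrypted = EXCLUDED.is_encrypted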
+
# We now update `local_current_membership`. We do this regardless
# of whether we're still in the room or not to handle the case where
# e.g. we just got banned (where we need to record that fact here).
@@ -1272,11 +1805,11 @@ class PersistEventsStore:
txn.execute_batch(
"DELETE FROM local_current_membership"
" WHERE room_id = ? AND user_id = ?",
- (
+ [
(room_id, state_key)
for etype, state_key in itertools.chain(to_delete, to_insert)
if etype == EventTypes.Member and self.is_mine_id(state_key)
- ),
+ ],
)
if to_insert:
@@ -1296,20 +1829,422 @@ class PersistEventsStore:
],
)
+ # Handle updating the `sliding_sync_membership_snapshots` table
+ #
+ # This would only happen if someone was state reset out of the room
+ if sliding_sync_table_changes.to_delete_membership_snapshots:
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ column="user_id",
+ values=sliding_sync_table_changes.to_delete_membership_snapshots,
+ keyvalues={"room_id": room_id},
+ )
+
+ # We do this regardless of whether the server is `no_longer_in_room` or not
+ # because we still want a row if a local user was just left/kicked or got banned
+ # from the room.
+ if sliding_sync_table_changes.to_insert_membership_snapshots:
+ # Update the `sliding_sync_membership_snapshots` table
+ #
+ sliding_sync_snapshot_keys = sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys()
+ sliding_sync_snapshot_values = sliding_sync_table_changes.membership_snapshot_shared_insert_values.values()
+ # We need to insert/update regardless of whether we have
+ # `sliding_sync_snapshot_keys` because there are other fields in the `ON
+ # CONFLICT` upsert to run (see inherit case (explained in
+ # `_calculate_sliding_sync_table_changes()`) for more context when this
+ # happens).
+ #
+ # XXX: We use a sub-query for `stream_ordering` because it's unreliable to
+ # pre-calculate from `events_and_contexts` at the time when
+ # `_calculate_sliding_sync_table_changes()` is ran. We could be working with
+ # events that were previously persisted as an `outlier` with one
+ # `stream_ordering` but are now being persisted again and de-outliered and
+ # assigned a different `stream_ordering` that won't end up being used. Since
+ # we call `_calculate_sliding_sync_table_changes()` before
+ # `_update_outliers_txn()` which fixes this discrepancy (always use the
+ # `stream_ordering` from the first time it was persisted), we're working
+ # with an unreliable `stream_ordering` value that will possibly be unused
+ # and not make it into the `events` table.
+ txn.execute_batch(
+ f"""
+ INSERT INTO sliding_sync_membership_snapshots
+ (room_id, user_id, sender, membership_event_id, membership, forgotten, event_stream_ordering, event_instance_name
+ {("," + ", ".join(sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""})
+ VALUES (
+ ?, ?, ?, ?, ?, ?,
+ (SELECT stream_ordering FROM events WHERE event_id = ?),
+ (SELECT COALESCE(instance_name, 'master') FROM events WHERE event_id = ?)
+ {("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""}
+ )
+ ON CONFLICT (room_id, user_id)
+ DO UPDATE SET
+ sender = EXCLUDED.sender,
+ membership_event_id = EXCLUDED.membership_event_id,
+ membership = EXCLUDED.membership,
+ forgotten = EXCLUDED.forgotten,
+ event_stream_ordering = EXCLUDED.event_stream_ordering
+ {("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""}
+ """,
+ [
+ [
+ room_id,
+ membership_info.user_id,
+ membership_info.sender,
+ membership_info.membership_event_id,
+ membership_info.membership,
+ # Since this is a new membership, it isn't forgotten anymore (which
+ # matches how Synapse currently thinks about the forgotten status)
+ 0,
+ # XXX: We do not use `membership_info.membership_event_stream_ordering` here
+ # because it is an unreliable value. See XXX note above.
+ membership_info.membership_event_id,
+ # XXX: We do not use `membership_info.membership_event_instance_name` here
+ # because it is an unreliable value. See XXX note above.
+ membership_info.membership_event_id,
+ ]
+ + list(sliding_sync_snapshot_values)
+ for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
+ ],
+ )
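+            # (same dynamic-SQL pattern as the `sliding_sync_joined_rooms` upsert
+            # above: the optional snapshot columns and their `EXCLUDED.*` updates
+            # are only spliced in when we have shared values to write)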
+
txn.call_after(
self.store._curr_state_delta_stream_cache.entity_has_changed,
room_id,
stream_id,
)
+ for user_id in members_to_cache_bust:
+ txn.call_after(
+ self.store._membership_stream_cache.entity_has_changed,
+ user_id,
+ stream_id,
+ )
+
# Invalidate the various caches
- self.store._invalidate_state_caches_and_stream(txn, room_id, members_changed)
+ self.store._invalidate_state_caches_and_stream(
+ txn, room_id, members_to_cache_bust
+ )
# Check if any of the remote membership changes requires us to
# unsubscribe from their device lists.
self.store.handle_potentially_left_users_txn(
- txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
+ txn, {m for m in members_to_cache_bust if not self.hs.is_mine_id(m)}
+ )
+
+ @classmethod
+ def _get_relevant_sliding_sync_current_state_event_ids_txn(
+ cls, txn: LoggingTransaction, room_id: str
+ ) -> MutableStateMap[str]:
+ """
+ Fetch the current state event IDs for the relevant (to the
+ `sliding_sync_joined_rooms` table) state types for the given room.
+
+ Returns:
+            A StateMap of the event IDs necessary to fetch the relevant state values
+            needed to insert into the
+            `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+ """
+ # Fetch the current state event IDs from the database
+ (
+ event_type_and_state_key_in_list_clause,
+ event_type_and_state_key_args,
+ ) = make_tuple_in_list_sql_clause(
+ txn.database_engine,
+ ("type", "state_key"),
+ SLIDING_SYNC_RELEVANT_STATE_SET,
+ )
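+        # (`make_tuple_in_list_sql_clause` renders something along the lines of
+        # `(type, state_key) IN ((?, ?), (?, ?), ...)` plus the flattened args;
+        # the exact SQL depends on the database engine)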
+ txn.execute(
+ f"""
+ SELECT c.event_id, c.type, c.state_key
+ FROM current_state_events AS c
+ WHERE
+ c.room_id = ?
+ AND {event_type_and_state_key_in_list_clause}
+ """,
+ [room_id] + event_type_and_state_key_args,
)
+ current_state_map: MutableStateMap[str] = {
+ (event_type, state_key): event_id for event_id, event_type, state_key in txn
+ }
+
+ return current_state_map
+
+ @classmethod
+ def _get_sliding_sync_insert_values_from_state_map(
+ cls, state_map: StateMap[EventBase]
+ ) -> SlidingSyncStateInsertValues:
+ """
+ Extract the relevant state values from the `state_map` needed to insert into the
+ `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+
+ Returns:
+            Map from column names (`room_type`, `is_encrypted`, `room_name`,
+            `tombstone_successor_room_id`) to the relevant state values needed to
+            insert into the
+            `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
+ """
+ # Map of values to insert/update in the `sliding_sync_membership_snapshots` table
+ sliding_sync_insert_map: SlidingSyncStateInsertValues = {}
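+        # For example, a state map containing a create event with hypothetical
+        # content `{"type": "m.space"}` and a name event with hypothetical content
+        # `{"name": "My room"}` yields
+        # `{"room_type": "m.space", "room_name": "My room"}`.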
+
+ # Parse the raw event JSON
+ for state_key, event in state_map.items():
+ if state_key == (EventTypes.Create, ""):
+ room_type = event.content.get(EventContentFields.ROOM_TYPE)
+ # Scrutinize JSON values
+ if room_type is None or (
+ isinstance(room_type, str)
+ # We ignore values with null bytes as Postgres doesn't allow them in
+ # text columns.
+ and "\0" not in room_type
+ ):
+ sliding_sync_insert_map["room_type"] = room_type
+ elif state_key == (EventTypes.RoomEncryption, ""):
+ encryption_algorithm = event.content.get(
+ EventContentFields.ENCRYPTION_ALGORITHM
+ )
+ is_encrypted = encryption_algorithm is not None
+ sliding_sync_insert_map["is_encrypted"] = is_encrypted
+ elif state_key == (EventTypes.Name, ""):
+ room_name = event.content.get(EventContentFields.ROOM_NAME)
+                # Scrutinize JSON values
+ if room_name is None or (
+ isinstance(room_name, str)
+ # We ignore values with null bytes as Postgres doesn't allow them in
+ # text columns.
+ and "\0" not in room_name
+ ):
+ sliding_sync_insert_map["room_name"] = room_name
+ elif state_key == (EventTypes.Tombstone, ""):
+ successor_room_id = event.content.get(
+ EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
+ )
+ # Scrutinize JSON values
+ if successor_room_id is None or (
+ isinstance(successor_room_id, str)
+ # We ignore values with null bytes as Postgres doesn't allow them in
+ # text columns.
+ and "\0" not in successor_room_id
+ ):
+ sliding_sync_insert_map["tombstone_successor_room_id"] = (
+ successor_room_id
+ )
+ else:
+ # We only expect to see events according to the
+ # `SLIDING_SYNC_RELEVANT_STATE_SET`.
+ raise AssertionError(
+ "Unexpected event (we should not be fetching extra events or this "
+ + "piece of code needs to be updated to handle a new event type added "
+ + "to `SLIDING_SYNC_RELEVANT_STATE_SET`): {state_key} {event.event_id}"
+ )
+
+ return sliding_sync_insert_map
+
+ @classmethod
+ def _get_sliding_sync_insert_values_from_stripped_state(
+ cls, unsigned_stripped_state_events: Any
+ ) -> SlidingSyncMembershipSnapshotSharedInsertValues:
+ """
+ Pull out the relevant state values from the stripped state on an invite or knock
+ membership event needed to insert into the `sliding_sync_membership_snapshots`
+        table.
+
+ Returns:
+            Map from column names (`room_type`, `is_encrypted`, `room_name`,
+            `tombstone_successor_room_id`) to the relevant state values needed to
+            insert into the `sliding_sync_membership_snapshots` table.
+ """
+ # Map of values to insert/update in the `sliding_sync_membership_snapshots` table
+ sliding_sync_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {}
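+        # `unsigned_stripped_state_events` is expected to be the raw
+        # `unsigned.invite_room_state`/`unsigned.knock_room_state` value from the
+        # membership event: a list of stripped state dicts like
+        # `{"type": "m.room.create", "state_key": "", "sender": ..., "content": {...}}`.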
+
+ if unsigned_stripped_state_events is not None:
+ stripped_state_map: MutableStateMap[StrippedStateEvent] = {}
+ if isinstance(unsigned_stripped_state_events, list):
+ for raw_stripped_event in unsigned_stripped_state_events:
+ stripped_state_event = parse_stripped_state_event(
+ raw_stripped_event
+ )
+ if stripped_state_event is not None:
+ stripped_state_map[
+ (
+ stripped_state_event.type,
+ stripped_state_event.state_key,
+ )
+ ] = stripped_state_event
+
+ # If there is some stripped state, we assume the remote server passed *all*
+ # of the potential stripped state events for the room.
+ create_stripped_event = stripped_state_map.get((EventTypes.Create, ""))
+            # Sanity check that we at least have the create event
+ if create_stripped_event is not None:
+ sliding_sync_insert_map["has_known_state"] = True
+
+ # XXX: Keep this up-to-date with `SLIDING_SYNC_RELEVANT_STATE_SET`
+
+ # Find the room_type
+ sliding_sync_insert_map["room_type"] = (
+ create_stripped_event.content.get(EventContentFields.ROOM_TYPE)
+ if create_stripped_event is not None
+ else None
+ )
+
+ # Find whether the room is_encrypted
+ encryption_stripped_event = stripped_state_map.get(
+ (EventTypes.RoomEncryption, "")
+ )
+ encryption = (
+ encryption_stripped_event.content.get(
+ EventContentFields.ENCRYPTION_ALGORITHM
+ )
+ if encryption_stripped_event is not None
+ else None
+ )
+ sliding_sync_insert_map["is_encrypted"] = encryption is not None
+
+ # Find the room_name
+ room_name_stripped_event = stripped_state_map.get((EventTypes.Name, ""))
+ sliding_sync_insert_map["room_name"] = (
+ room_name_stripped_event.content.get(EventContentFields.ROOM_NAME)
+ if room_name_stripped_event is not None
+ else None
+ )
+
+ # Check for null bytes in the room name and type. We have to
+ # ignore values with null bytes as Postgres doesn't allow them
+ # in text columns.
+ if (
+ sliding_sync_insert_map["room_name"] is not None
+ and "\0" in sliding_sync_insert_map["room_name"]
+ ):
+ sliding_sync_insert_map.pop("room_name")
+
+ if (
+ sliding_sync_insert_map["room_type"] is not None
+ and "\0" in sliding_sync_insert_map["room_type"]
+ ):
+ sliding_sync_insert_map.pop("room_type")
+
+ # Find the tombstone_successor_room_id
+ # Note: This isn't one of the stripped state events according to the spec
+            # but there seems to be no reason not to support it.
+ tombstone_stripped_event = stripped_state_map.get(
+ (EventTypes.Tombstone, "")
+ )
+ sliding_sync_insert_map["tombstone_successor_room_id"] = (
+ tombstone_stripped_event.content.get(
+ EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
+ )
+ if tombstone_stripped_event is not None
+ else None
+ )
+
+ if (
+ sliding_sync_insert_map["tombstone_successor_room_id"] is not None
+ and "\0" in sliding_sync_insert_map["tombstone_successor_room_id"]
+ ):
+ sliding_sync_insert_map.pop("tombstone_successor_room_id")
+
+ else:
+                # The stripped state was provided but didn't include the create
+                # event, so we don't have any state we can derive values from
+ sliding_sync_insert_map["has_known_state"] = False
+ sliding_sync_insert_map["room_type"] = None
+ sliding_sync_insert_map["room_name"] = None
+ sliding_sync_insert_map["is_encrypted"] = False
+ else:
+ # No stripped state provided
+ sliding_sync_insert_map["has_known_state"] = False
+ sliding_sync_insert_map["room_type"] = None
+ sliding_sync_insert_map["room_name"] = None
+ sliding_sync_insert_map["is_encrypted"] = False
+
+ return sliding_sync_insert_map
+
+ def _update_sliding_sync_tables_with_new_persisted_events_txn(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ events_and_contexts: List[Tuple[EventBase, EventContext]],
+ ) -> None:
+ """
+ Update the latest `event_stream_ordering`/`bump_stamp` columns in the
+ `sliding_sync_joined_rooms` table for the room with new events.
+
+ This function assumes that `_store_event_txn()` (to persist the event) and
+        `_update_current_state_txn(...)` (so that the `sliding_sync_joined_rooms` table has
+ been updated with rooms that were joined) have already been run.
+
+ Args:
+ txn
+ room_id: The room that all of the events belong to
+ events_and_contexts: The events being persisted. We assume the list is
+ sorted ascending by `stream_ordering`. We don't care about the sort when the
+ events are backfilled (with negative `stream_ordering`).
+ """
+
+ # Nothing to do if there are no events
+ if len(events_and_contexts) == 0:
+ return
+
+ # Since the list is sorted ascending by `stream_ordering`, the last event should
+ # have the highest `stream_ordering`.
+ max_stream_ordering = events_and_contexts[-1][
+ 0
+ ].internal_metadata.stream_ordering
+ # `stream_ordering` should be assigned for persisted events
+ assert max_stream_ordering is not None
+ # Check if the event is a backfilled event (with a negative `stream_ordering`).
+ # If one event is backfilled, we assume this whole batch was backfilled.
+ if max_stream_ordering < 0:
+ # We only update the sliding sync tables for non-backfilled events.
+ return
+
+ max_bump_stamp = None
+ for event, _ in reversed(events_and_contexts):
+ # Sanity check that all events belong to the same room
+ assert event.room_id == room_id
+
+ if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
+ # `stream_ordering` should be assigned for persisted events
+ assert event.internal_metadata.stream_ordering is not None
+
+ max_bump_stamp = event.internal_metadata.stream_ordering
+
+ # Since we're iterating in reverse, we can break as soon as we find a
+ # matching bump event which should have the highest `stream_ordering`.
+ break
+
+ # Handle updating the `sliding_sync_joined_rooms` table.
+ #
+ txn.execute(
+ """
+ UPDATE sliding_sync_joined_rooms
+ SET
+ event_stream_ordering = CASE
+ WHEN event_stream_ordering IS NULL OR event_stream_ordering < ?
+ THEN ?
+ ELSE event_stream_ordering
+ END,
+ bump_stamp = CASE
+ WHEN bump_stamp IS NULL OR bump_stamp < ?
+ THEN ?
+ ELSE bump_stamp
+ END
+ WHERE room_id = ?
+ """,
+ (
+ max_stream_ordering,
+ max_stream_ordering,
+ max_bump_stamp,
+ max_bump_stamp,
+ room_id,
+ ),
+ )
+        # This may or may not update any rows, depending on whether we are `no_longer_in_room`
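+        # (the `CASE` expressions act as a monotonic max: each column only ever
+        # moves forward to the larger value, with `NULL` treated as the lowest)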
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
"""Update the room version in the database based off current state
@@ -1931,7 +2866,9 @@ class PersistEventsStore:
)
for event in events:
+ # Sanity check that we're working with persisted events
assert event.internal_metadata.stream_ordering is not None
+ assert event.internal_metadata.instance_name is not None
# We update the local_current_membership table only if the event is
# "current", i.e., its something that has just happened.
@@ -1945,6 +2882,16 @@ class PersistEventsStore:
and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership()
):
+ # The only sort of out-of-band-membership events we expect to see here
+ # are remote invites/knocks and LEAVE events corresponding to
+ # rejected/retracted invites and rescinded knocks.
+ assert event.type == EventTypes.Member
+ assert event.membership in (
+ Membership.INVITE,
+ Membership.KNOCK,
+ Membership.LEAVE,
+ )
+
self.db_pool.simple_upsert_txn(
txn,
table="local_current_membership",
@@ -1956,6 +2903,59 @@ class PersistEventsStore:
},
)
+ # Handle updating the `sliding_sync_membership_snapshots` table
+ # (out-of-band membership events only)
+ #
+ raw_stripped_state_events = None
+ if event.membership == Membership.INVITE:
+ invite_room_state = event.unsigned.get("invite_room_state")
+ raw_stripped_state_events = invite_room_state
+ elif event.membership == Membership.KNOCK:
+ knock_room_state = event.unsigned.get("knock_room_state")
+ raw_stripped_state_events = knock_room_state
+
+ insert_values = {
+ "sender": event.sender,
+ "membership_event_id": event.event_id,
+ "membership": event.membership,
+ # Since this is a new membership, it isn't forgotten anymore (which
+ # matches how Synapse currently thinks about the forgotten status)
+ "forgotten": 0,
+ "event_stream_ordering": event.internal_metadata.stream_ordering,
+ "event_instance_name": event.internal_metadata.instance_name,
+ }
+ if event.membership == Membership.LEAVE:
+                    # Inherit the metadata from the remote invite/knock. When using
+ # sliding sync filters, this will prevent the room from
+ # disappearing/appearing just because you left the room.
+ pass
+ elif event.membership in (Membership.INVITE, Membership.KNOCK):
+ extra_insert_values = (
+ self._get_sliding_sync_insert_values_from_stripped_state(
+ raw_stripped_state_events
+ )
+ )
+ insert_values.update(extra_insert_values)
+ else:
+ # We don't know how to handle this type of membership yet
+ #
+ # FIXME: We should use `assert_never` here but for some reason
+ # the exhaustive matching doesn't recognize the `Never` here.
+ # assert_never(event.membership)
+ raise AssertionError(
+ f"Unexpected out-of-band membership {event.membership} ({event.event_id}) that we don't know how to handle yet"
+ )
+
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_membership_snapshots",
+ keyvalues={
+ "room_id": event.room_id,
+ "user_id": event.state_key,
+ },
+ values=insert_values,
+ )
+
def _handle_event_relations(
self, txn: LoggingTransaction, event: EventBase
) -> None:
@@ -2221,7 +3221,7 @@ class PersistEventsStore:
if notifiable_events:
txn.execute_batch(
sql,
- (
+ [
(
event.room_id,
event.internal_metadata.stream_ordering,
@@ -2229,18 +3229,18 @@ class PersistEventsStore:
event.event_id,
)
for event in notifiable_events
- ),
+ ],
)
# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
"DELETE FROM event_push_actions_staging WHERE event_id = ?",
- (
+ [
(event.event_id,)
for event, _ in all_events_and_contexts
if event.internal_metadata.is_notifiable()
- ),
+ ],
)
def _remove_push_actions_for_event_id_txn(
@@ -2415,7 +3415,7 @@ class PersistEventsStore:
)
potential_backwards_extremities.difference_update(
- e for e, in existing_events_outliers
+ e for (e,) in existing_events_outliers
)
if potential_backwards_extremities:
@@ -2448,8 +3448,7 @@ class PersistEventsStore:
# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremeties.
query = (
- "DELETE FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
+ "DELETE FROM event_backward_extremities WHERE event_id = ? AND room_id = ?"
)
backward_extremity_tuples_to_remove = [
(ev.event_id, ev.room_id)
|