diff --git a/changelog.d/17635.misc b/changelog.d/17635.misc
new file mode 100644
index 0000000000..756918e2b2
--- /dev/null
+++ b/changelog.d/17635.misc
@@ -0,0 +1 @@
+Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 8e3b404aed..8db302b3d8 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -230,6 +230,8 @@ class EventContentFields:
ROOM_NAME: Final = "name"
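+ # Used in m.room.member events.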
+ MEMBERSHIP: Final = "membership"
+
# Used in m.room.guest_access events.
GUEST_ACCESS: Final = "guest_access"
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 60c92e5804..f3dbe5bba7 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -163,6 +163,15 @@ class SlidingSyncMembershipInfo:
sender: str
membership_event_id: str
membership: str
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SlidingSyncMembershipInfoWithEventPos(SlidingSyncMembershipInfo):
+ """
+ SlidingSyncMembershipInfo + `stream_ordering`/`instance_name` of the membership
+ event
+ """
+
membership_event_stream_ordering: int
membership_event_instance_name: str
@@ -170,17 +179,6 @@ class SlidingSyncMembershipInfo:
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncTableChanges:
room_id: str
- # `stream_ordering` of the most recent event being persisted in the room. This doesn't
- # need to be perfect, we just need *some* answer that points to a real event in the
- # room in case we are the first ones inserting into the `sliding_sync_joined_rooms`
- # table because of the `NON NULL` constraint on `event_stream_ordering`. In reality,
- # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
- # `_update_current_state_txn()` whenever a new event is persisted to update it to the
- # correct latest value.
- #
- # This should be *some* value that points to a real event in the room if we are
- # still joined to the room and some state is changing (`to_insert` or `to_delete`).
- joined_room_best_effort_most_recent_stream_ordering: Optional[int]
# If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to
# fully-insert it which means we also need to include a `bump_stamp` value to use
# for the row. This should only be populated when we're trying to fully-insert a
@@ -401,6 +399,9 @@ class PersistEventsStore:
`stream_ordering`).
delta_state: Deltas that are going to be used to update the
`current_state_events` table. Changes to the current state of the room.
+
+ Returns:
+     The `SlidingSyncTableChanges` to apply to the `sliding_sync_joined_rooms`
+     and `sliding_sync_membership_snapshots` tables.
"""
to_insert = delta_state.to_insert
to_delete = delta_state.to_delete
@@ -410,7 +411,6 @@ class PersistEventsStore:
if not to_insert and not to_delete:
return SlidingSyncTableChanges(
room_id=room_id,
- joined_room_best_effort_most_recent_stream_ordering=None,
joined_room_bump_stamp_to_fully_insert=None,
joined_room_updates={},
membership_snapshot_shared_insert_values={},
@@ -469,24 +469,24 @@ class PersistEventsStore:
membership_event_id,
user_id,
) in membership_event_id_to_user_id_map.items():
- # We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point
- membership_event_stream_ordering = membership_event_map[
- membership_event_id
- ].internal_metadata.stream_ordering
- assert membership_event_stream_ordering is not None
- membership_event_instance_name = membership_event_map[
- membership_event_id
- ].internal_metadata.instance_name
- assert membership_event_instance_name is not None
-
membership_infos_to_insert_membership_snapshots.append(
+ # XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
+ # because we're sourcing the event from `events_and_contexts` and
+ # can't rely on its `stream_ordering`/`instance_name` being correct.
+ # We could be working with events that were previously persisted as
+ # an `outlier` with one `stream_ordering`, but are now being
+ # persisted again, de-outliered, and assigned a different
+ # `stream_ordering` that won't end up being used. Since we call
+ # `_calculate_sliding_sync_table_changes()` before
+ # `_update_outliers_txn()` (which fixes this discrepancy by always
+ # using the `stream_ordering` from the first time the event was
+ # persisted), we would be working with an unreliable
+ # `stream_ordering` value that may never make it into the `events` table.
SlidingSyncMembershipInfo(
user_id=user_id,
sender=membership_event_map[membership_event_id].sender,
membership_event_id=membership_event_id,
membership=membership_event_map[membership_event_id].membership,
- membership_event_stream_ordering=membership_event_stream_ordering,
- membership_event_instance_name=membership_event_instance_name,
)
)
@@ -568,7 +568,6 @@ class PersistEventsStore:
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
#
joined_room_updates: SlidingSyncStateInsertValues = {}
- best_effort_most_recent_stream_ordering: Optional[int] = None
bump_stamp_to_fully_insert: Optional[int] = None
if not delta_state.no_longer_in_room:
current_state_ids_map = {}
@@ -632,9 +631,7 @@ class PersistEventsStore:
# Otherwise, we need to find a couple events that we were reset to.
if missing_event_ids:
- remaining_events = await self.store.get_events(
- current_state_ids_map.values()
- )
+ remaining_events = await self.store.get_events(missing_event_ids)
# There shouldn't be any missing events
assert (
remaining_events.keys() == missing_event_ids
@@ -657,52 +654,9 @@ class PersistEventsStore:
elif state_key == (EventTypes.Name, ""):
joined_room_updates["room_name"] = None
- # Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to
- # be perfect, we just need *some* answer that points to a real event in the
- # room in case we are the first ones inserting into the
- # `sliding_sync_joined_rooms` table because of the `NON NULL` constraint on
- # `event_stream_ordering`. In reality,
- # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
- # `_update_current_state_txn()` whenever a new event is persisted to update
- # it to the correct latest value.
- #
- if len(events_and_contexts) > 0:
- # Since the list is sorted ascending by `stream_ordering`, the last event
- # should have the highest `stream_ordering`.
- best_effort_most_recent_stream_ordering = events_and_contexts[-1][
- 0
- ].internal_metadata.stream_ordering
- else:
- # If there are no `events_and_contexts`, we assume it's one of two scenarios:
- # 1. If there are new state `to_insert` but no `events_and_contexts`,
- # then it's a state reset.
- # 2. Otherwise, it's some partial-state room re-syncing the current state and
- # going through un-partial process.
- #
- # Either way, we assume no new events are being persisted and we can
- # find the latest already in the database. Since this is a best-effort
- # value, we don't need to be perfect although I think we're pretty close
- # here.
- most_recent_event_pos_results = (
- await self.store.get_last_event_pos_in_room(
- room_id, event_types=None
- )
- )
- assert most_recent_event_pos_results, (
- f"We should not be seeing `None` here because we are still in the room ({room_id}) and "
- + "it should at-least have a join membership event that's keeping us here."
- )
- best_effort_most_recent_stream_ordering = most_recent_event_pos_results[
- 1
- ].stream
-
- # We should have found a value if we are still in the room
- assert best_effort_most_recent_stream_ordering is not None
-
return SlidingSyncTableChanges(
room_id=room_id,
# For `sliding_sync_joined_rooms`
- joined_room_best_effort_most_recent_stream_ordering=best_effort_most_recent_stream_ordering,
joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert,
joined_room_updates=joined_room_updates,
# For `sliding_sync_membership_snapshots`
@@ -1773,31 +1727,53 @@ class PersistEventsStore:
#
# We only need to update when one of the relevant state values has changed
if sliding_sync_table_changes.joined_room_updates:
- # This should be *some* value that points to a real event in the room if
- # we are still joined to the room.
- assert (
- sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering
- is not None
+ sliding_sync_updates_keys = (
+ sliding_sync_table_changes.joined_room_updates.keys()
+ )
+ sliding_sync_updates_values = (
+ sliding_sync_table_changes.joined_room_updates.values()
)
- self.db_pool.simple_upsert_txn(
- txn,
- table="sliding_sync_joined_rooms",
- keyvalues={"room_id": room_id},
- values=sliding_sync_table_changes.joined_room_updates,
- insertion_values={
- # The reason we're only *inserting* (not *updating*)
- # `event_stream_ordering` here is because the column has a `NON
- # NULL` constraint and we need *some* answer. And if the row
- # already exists, it already has the correct value and it's
- # better to just rely on
- # `_update_sliding_sync_tables_with_new_persisted_events_txn()`
- # to do the right thing (same for `bump_stamp`).
- "event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering,
- # If we're trying to fully-insert a row, we need to provide a
- # value for `bump_stamp` if it exists for the room.
- "bump_stamp": sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert,
- },
+ args: List[Any] = [
+ room_id,
+ room_id,
+ sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert,
+ ]
+ args.extend(iter(sliding_sync_updates_values))
+
+ # XXX: We use a sub-query for `event_stream_ordering` because it's
+ # unreliable to pre-calculate it from `events_and_contexts` at the time
+ # `_calculate_sliding_sync_table_changes()` is run. We could be working
+ # with events that were previously persisted as an `outlier` with one
+ # `stream_ordering`, but are now being persisted again, de-outliered,
+ # and assigned a different `stream_ordering`. Since we call
+ # `_calculate_sliding_sync_table_changes()` before
+ # `_update_outliers_txn()` (which fixes this discrepancy by always using
+ # the `stream_ordering` from the first time the event was persisted),
+ # we would be working with an unreliable `stream_ordering` value that
+ # may never make it into the `events` table.
+ #
+ # We don't update `event_stream_ordering` `ON CONFLICT` because it's
+ # simpler to just rely on
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do
+ # the right thing (same for `bump_stamp`). The only reason we insert
+ # `event_stream_ordering` here at all is that the column has a
+ # `NOT NULL` constraint and we need *some* answer.
+ txn.execute(
+ f"""
+ INSERT INTO sliding_sync_joined_rooms
+ (room_id, event_stream_ordering, bump_stamp, {", ".join(sliding_sync_updates_keys)})
+ VALUES (
+ ?,
+ (SELECT stream_ordering FROM events WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT 1),
+ ?,
+ {", ".join("?" for _ in sliding_sync_updates_values)}
+ )
+ ON CONFLICT (room_id)
+ DO UPDATE SET
+ {", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_updates_keys)}
+ """,
+ args,
)
# We now update `local_current_membership`. We do this regardless
@@ -1854,38 +1830,63 @@ class PersistEventsStore:
if sliding_sync_table_changes.to_insert_membership_snapshots:
# Update the `sliding_sync_membership_snapshots` table
#
- # We need to insert/update regardless of whether we have `sliding_sync_snapshot_keys`
- # because there are other fields in the `ON CONFLICT` upsert to run (see
- # inherit case above for more context when this happens).
- self.db_pool.simple_upsert_many_txn(
- txn=txn,
- table="sliding_sync_membership_snapshots",
- key_names=("room_id", "user_id"),
- key_values=[
- (room_id, membership_info.user_id)
- for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
- ],
- value_names=[
- "sender",
- "membership_event_id",
- "membership",
- "event_stream_ordering",
- "event_instance_name",
- ]
- + list(
- sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys()
- ),
- value_values=[
+ sliding_sync_snapshot_keys = (
+ sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys()
+ )
+ sliding_sync_snapshot_values = (
+ sliding_sync_table_changes.membership_snapshot_shared_insert_values.values()
+ )
+ # We need to insert/update regardless of whether we have
+ # `sliding_sync_snapshot_keys` because there are other fields in the `ON
+ # CONFLICT` upsert to run (see the inherit case, explained in
+ # `_calculate_sliding_sync_table_changes()`, for more context on when
+ # this happens).
+ #
+ # XXX: We use sub-queries for `event_stream_ordering`/`event_instance_name`
+ # because it's unreliable to pre-calculate them from `events_and_contexts`
+ # at the time `_calculate_sliding_sync_table_changes()` is run. We could be
+ # working with events that were previously persisted as an `outlier` with
+ # one `stream_ordering`, but are now being persisted again, de-outliered,
+ # and assigned a different `stream_ordering` that won't end up being used.
+ # Since we call `_calculate_sliding_sync_table_changes()` before
+ # `_update_outliers_txn()` (which fixes this discrepancy by always using
+ # the `stream_ordering` from the first time the event was persisted), we
+ # would be working with unreliable `stream_ordering` values that may never
+ # make it into the `events` table.
+ txn.execute_batch(
+ f"""
+ INSERT INTO sliding_sync_membership_snapshots
+ (room_id, user_id, sender, membership_event_id, membership, event_stream_ordering, event_instance_name
+ {("," + ", ".join(sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""})
+ VALUES (
+ ?, ?, ?, ?, ?,
+ (SELECT stream_ordering FROM events WHERE event_id = ?),
+ (SELECT instance_name FROM events WHERE event_id = ?)
+ {("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""}
+ )
+ ON CONFLICT (room_id, user_id)
+ DO UPDATE SET
+ sender = EXCLUDED.sender,
+ membership_event_id = EXCLUDED.membership_event_id,
+ membership = EXCLUDED.membership,
+ event_stream_ordering = EXCLUDED.event_stream_ordering
+ {("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""}
+ """,
+ [
[
+ room_id,
+ membership_info.user_id,
membership_info.sender,
membership_info.membership_event_id,
membership_info.membership,
- membership_info.membership_event_stream_ordering,
- membership_info.membership_event_instance_name,
+ # XXX: We deliberately don't pass a pre-calculated `stream_ordering`
+ # here because it would be an unreliable value (see the XXX note
+ # above); instead we pass the event ID again so the `stream_ordering`
+ # sub-query can look it up.
+ membership_info.membership_event_id,
+ # XXX: Likewise for `instance_name`: we pass the event ID again so
+ # the sub-query can look it up.
+ membership_info.membership_event_id,
]
- + list(
- sliding_sync_table_changes.membership_snapshot_shared_insert_values.values()
- )
+ + list(sliding_sync_snapshot_values)
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
],
)
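For illustration only (not part of the patch): a minimal, self-contained sketch of roughly what SQL the `sliding_sync_joined_rooms` upsert above produces once the f-string is interpolated. The `joined_room_updates` keys, room ID, and `bump_stamp` below are hypothetical values chosen for the example.

    # Standalone sketch, not Synapse code. Mirrors the f-string construction above.
    joined_room_updates = {"room_name": "My Room", "room_type": None}

    keys = joined_room_updates.keys()
    values = joined_room_updates.values()

    sql = f"""
    INSERT INTO sliding_sync_joined_rooms
        (room_id, event_stream_ordering, bump_stamp, {", ".join(keys)})
    VALUES (
        ?,
        (SELECT stream_ordering FROM events WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT 1),
        ?,
        {", ".join("?" for _ in values)}
    )
    ON CONFLICT (room_id)
    DO UPDATE SET
        {", ".join(f"{key} = EXCLUDED.{key}" for key in keys)}
    """
    # Args line up with the placeholders: room_id, room_id (for the sub-query),
    # bump_stamp, then the update values.
    args = ["!room:example.com", "!room:example.com", 42, *values]
    print(sql)
    print(args)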
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index b86f873eba..49ca985c4d 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -37,7 +37,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events import (
SLIDING_SYNC_RELEVANT_STATE_SET,
PersistEventsStore,
- SlidingSyncMembershipInfo,
+ SlidingSyncMembershipInfoWithEventPos,
SlidingSyncMembershipSnapshotSharedInsertValues,
SlidingSyncStateInsertValues,
)
@@ -1994,9 +1994,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
to_insert_membership_snapshots: Dict[
Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues
] = {}
- to_insert_membership_infos: Dict[Tuple[str, str], SlidingSyncMembershipInfo] = (
- {}
- )
+ to_insert_membership_infos: Dict[
+ Tuple[str, str], SlidingSyncMembershipInfoWithEventPos
+ ] = {}
for (
room_id,
room_id_from_rooms_table,
@@ -2185,15 +2185,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
to_insert_membership_snapshots[(room_id, user_id)] = (
sliding_sync_membership_snapshots_insert_map
)
- to_insert_membership_infos[(room_id, user_id)] = SlidingSyncMembershipInfo(
- user_id=user_id,
- sender=sender,
- membership_event_id=membership_event_id,
- membership=membership,
- membership_event_stream_ordering=membership_event_stream_ordering,
- # If instance_name is null we default to "master"
- membership_event_instance_name=membership_event_instance_name
- or "master",
+ to_insert_membership_infos[(room_id, user_id)] = (
+ SlidingSyncMembershipInfoWithEventPos(
+ user_id=user_id,
+ sender=sender,
+ membership_event_id=membership_event_id,
+ membership=membership,
+ membership_event_stream_ordering=membership_event_stream_ordering,
+ # If instance_name is null we default to "master"
+ membership_event_instance_name=membership_event_instance_name
+ or "master",
+ )
)
def _fill_table_txn(txn: LoggingTransaction) -> None:
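A side note on the data classes involved (illustration only, not part of the patch): a minimal, standalone sketch of the attrs subclassing pattern that `SlidingSyncMembershipInfoWithEventPos` relies on. Subclass fields are appended after the parent's, so the background update can carry the extra position fields while the event-persistence path keeps using the plain parent class.

    # Standalone sketch, not Synapse code: attrs subclassing with slots.
    import attr


    @attr.s(slots=True, auto_attribs=True)
    class MembershipInfo:
        user_id: str
        sender: str
        membership_event_id: str
        membership: str


    @attr.s(slots=True, auto_attribs=True)
    class MembershipInfoWithEventPos(MembershipInfo):
        # Extra fields land after the parent's fields in the generated __init__.
        membership_event_stream_ordering: int
        membership_event_instance_name: str


    info = MembershipInfoWithEventPos(
        user_id="@user:example.com",
        sender="@user:example.com",
        membership_event_id="$event_id",
        membership="join",
        membership_event_stream_ordering=42,
        membership_event_instance_name="master",
    )
    # Field order: parent fields first, then the subclass's extras.
    print([f.name for f in attr.fields(MembershipInfoWithEventPos)])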
diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py
index d0bbc1c803..621f46fff8 100644
--- a/tests/storage/test_sliding_sync_tables.py
+++ b/tests/storage/test_sliding_sync_tables.py
@@ -38,6 +38,7 @@ from synapse.storage.databases.main.events_bg_updates import (
_resolve_stale_data_in_sliding_sync_joined_rooms_table,
_resolve_stale_data_in_sliding_sync_membership_snapshots_table,
)
+from synapse.types import create_requester
from synapse.util import Clock
from tests.test_utils.event_injection import create_event
@@ -925,6 +926,128 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
user2_snapshot,
)
+ @parameterized.expand(
+ # Test both an insert and an upsert into the
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables to
+ # exercise more possibilities of things going wrong.
+ [
+ ("insert", True),
+ ("upsert", False),
+ ]
+ )
+ def test_joined_room_outlier_and_deoutlier(
+ self, description: str, should_insert: bool
+ ) -> None:
+ """
+ This is a regression test.
+
+ This simulates the case where an event is first persisted as an outlier
+ (like a remote invite) and then later persisted again to de-outlier it. The
+ first time, the `outlier` is persisted with one `stream_ordering`, but when
+ it is persisted again and de-outliered, it is assigned a different
+ `stream_ordering` that won't end up being used. Since we call
+ `_calculate_sliding_sync_table_changes()` before `_update_outliers_txn()`
+ (which fixes this discrepancy by always using the `stream_ordering` from the
+ first time the event was persisted), make sure we're not using unreliable
+ `stream_ordering` values that would cause a `FOREIGN KEY constraint failed`
+ error in the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
+ tables.
+ """
+ user1_id = self.register_user("user1", "pass")
+ _user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_version = RoomVersions.V10
+ room_id = self.helper.create_room_as(
+ user2_id, tok=user2_tok, room_version=room_version.identifier
+ )
+
+ if should_insert:
+ # Clear these out so we always insert
+ self.get_success(
+ self.store.db_pool.simple_delete(
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ desc="TODO",
+ )
+ )
+ self.get_success(
+ self.store.db_pool.simple_delete(
+ table="sliding_sync_membership_snapshots",
+ keyvalues={"room_id": room_id},
+ desc="TODO",
+ )
+ )
+
+ # Create a membership event (which triggers an insert into
+ # `sliding_sync_membership_snapshots`)
+ membership_event_dict = {
+ "type": EventTypes.Member,
+ "state_key": user1_id,
+ "sender": user1_id,
+ "room_id": room_id,
+ "content": {EventContentFields.MEMBERSHIP: Membership.JOIN},
+ }
+ # Create a relevant state event (which triggers an insert into
+ # `sliding_sync_joined_rooms`)
+ state_event_dict = {
+ "type": EventTypes.Name,
+ "state_key": "",
+ "sender": user2_id,
+ "room_id": room_id,
+ "content": {EventContentFields.ROOM_NAME: "my super room"},
+ }
+ event_dicts_to_persist = [
+ membership_event_dict,
+ state_event_dict,
+ ]
+
+ for event_dict in event_dicts_to_persist:
+ events_to_persist = []
+
+ # Create the event as an outlier
+ (
+ event,
+ unpersisted_context,
+ ) = self.get_success(
+ self.hs.get_event_creation_handler().create_event(
+ requester=create_requester(user1_id),
+ event_dict=event_dict,
+ outlier=True,
+ )
+ )
+ # FIXME: Should we use an `EventContext.for_outlier(...)` here?
+ # Doesn't seem to matter for this test.
+ context = self.get_success(unpersisted_context.persist(event))
+ events_to_persist.append((event, context))
+
+ # Create the event again but as a non-outlier. This will de-outlier the event
+ # when we persist it.
+ (
+ event,
+ unpersisted_context,
+ ) = self.get_success(
+ self.hs.get_event_creation_handler().create_event(
+ requester=create_requester(user1_id),
+ event_dict=event_dict,
+ outlier=False,
+ )
+ )
+ context = self.get_success(unpersisted_context.persist(event))
+ events_to_persist.append((event, context))
+
+ persist_controller = self.hs.get_storage_controllers().persistence
+ assert persist_controller is not None
+ for event, context in events_to_persist:
+ self.get_success(
+ persist_controller.persist_event(
+ event,
+ context,
+ )
+ )
+
+ # We're just testing that it does not explode
+
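If stronger assertions were wanted later, a hedged sketch (not part of this change) might check the snapshot row directly; it assumes the `simple_select_one` helper and the `sliding_sync_membership_snapshots` columns used elsewhere in this file:

    # Hypothetical extra check, not part of this PR: verify the snapshot row
    # for user1 ended up with the membership we persisted.
    membership, event_stream_ordering = self.get_success(
        self.store.db_pool.simple_select_one(
            table="sliding_sync_membership_snapshots",
            keyvalues={"room_id": room_id, "user_id": user1_id},
            retcols=("membership", "event_stream_ordering"),
            desc="check_membership_snapshot",
        )
    )
    self.assertEqual(membership, Membership.JOIN)
    self.assertIsNotNone(event_stream_ordering)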
def test_joined_room_meta_state_reset(self) -> None:
"""
Test that a state reset on the room name is reflected in the