diff --git a/changelog.d/17658.misc b/changelog.d/17658.misc
new file mode 100644
index 0000000000..0bdbc1140d
--- /dev/null
+++ b/changelog.d/17658.misc
@@ -0,0 +1 @@
+Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512), which should be faster.
diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py
index 444cc32f36..7340c6ec05 100644
--- a/synapse/handlers/sliding_sync/__init__.py
+++ b/synapse/handlers/sliding_sync/__init__.py
@@ -1040,29 +1040,67 @@ class SlidingSyncHandler:
)
)
- # By default, just choose the membership event position
+ # Figure out the last bump event in the room
+ #
+ # By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
-
- # Figure out the last bump event in the room if we're in the room.
+ # If we're joined to the room, we need to find the last bump event before the
+ # `to_token`
if room_membership_for_user_at_to_token.membership == Membership.JOIN:
- last_bump_event_result = (
- await self.store.get_last_event_pos_in_room_before_stream_ordering(
- room_id,
- to_token.room_key,
- event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
- )
+ # We can quickly query for the latest bump event in the room using the
+ # sliding sync tables.
+ latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(
+ room_id
)
- # But if we found a bump event, use that instead
- if last_bump_event_result is not None:
- _, new_bump_event_pos = last_bump_event_result
+ min_to_token_position = to_token.room_key.stream
- # If we've just joined a remote room, then the last bump event may
- # have been backfilled (and so have a negative stream ordering).
- # These negative stream orderings can't sensibly be compared, so
- # instead we use the membership event position.
- if new_bump_event_pos.stream > 0:
- bump_stamp = new_bump_event_pos.stream
+ # If we can rely on the new sliding sync tables and the `bump_stamp` is
+            # `None`, just fall back to the membership event position. This can happen
+ # when we've just joined a remote room and all the events are backfilled.
+ if (
+ # FIXME: The background job check can be removed once we bump
+ # `SCHEMA_COMPAT_VERSION` and run the foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
+ # (tracked by https://github.com/element-hq/synapse/issues/17623)
+ await self.store.have_finished_sliding_sync_background_jobs()
+ and latest_room_bump_stamp is None
+ ):
+ pass
+
+            # The `bump_stamp` stored in the database might be ahead of our token. Since
+            # `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure
+            # that it's before the `to_token` in all scenarios. The only scenario we can
+            # be sure of is if the `bump_stamp` is strictly before the minimum position
+            # from the token.
+ #
+            # We don't need to check whether the background update has finished: if the
+            # returned bump stamp is not `None`, it must be up to date.
+ elif (
+ latest_room_bump_stamp is not None
+ and latest_room_bump_stamp < min_to_token_position
+ ):
+ bump_stamp = latest_room_bump_stamp
+
+ # Otherwise, if it's within or after the `to_token`, we need to find the
+ # last bump event before the `to_token`.
+ else:
+ last_bump_event_result = (
+ await self.store.get_last_event_pos_in_room_before_stream_ordering(
+ room_id,
+ to_token.room_key,
+ event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
+ )
+ )
+ if last_bump_event_result is not None:
+ _, new_bump_event_pos = last_bump_event_result
+
+ # If we've just joined a remote room, then the last bump event may
+ # have been backfilled (and so have a negative stream ordering).
+ # These negative stream orderings can't sensibly be compared, so
+ # instead we use the membership event position.
+ if new_bump_event_pos.stream > 0:
+ bump_stamp = new_bump_event_pos.stream
unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
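# A minimal sketch (not part of the patch above) of the `bump_stamp` fallback
# order that the handler comments describe, using plain ints in place of
# Synapse's token/position types. It simplifies the real logic: the
# background-update check is ignored and the "last bump event before the
# token" result is taken as an input rather than being queried lazily. All
# names here are illustrative, not real Synapse APIs.
from typing import Optional


def choose_bump_stamp(
    membership_event_pos: int,
    latest_table_bump_stamp: Optional[int],
    min_to_token_position: int,
    last_bump_event_pos_before_token: Optional[int],
) -> int:
    # Fast path: the `sliding_sync_joined_rooms` value is usable only if it is
    # strictly before the minimum position covered by the token.
    if (
        latest_table_bump_stamp is not None
        and latest_table_bump_stamp < min_to_token_position
    ):
        return latest_table_bump_stamp

    # Slow path: the last bump event found before the token. Backfilled events
    # (negative stream orderings) can't sensibly be compared, so they are
    # ignored.
    if (
        last_bump_event_pos_before_token is not None
        and last_bump_event_pos_before_token > 0
    ):
        return last_bump_event_pos_before_token

    # Final fallback: the user's membership event position.
    return membership_event_pos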
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index d423d80efa..e5f63019fd 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -327,6 +327,13 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
+ # XXX: We can't rely on `stream_ordering`/`instance_name` being correct
+ # at this point. We could be working with events that were previously
+ # persisted as an `outlier` with one `stream_ordering` but are now being
+ # persisted again and de-outliered and are being assigned a different
+ # `stream_ordering` here that won't end up being used.
+ # `_update_outliers_txn()` will fix this discrepancy (always use the
+ # `stream_ordering` from the first time it was persisted).
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
@@ -470,11 +477,11 @@ class PersistEventsStore:
membership_infos_to_insert_membership_snapshots.append(
# XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
# because we're sourcing the event from `events_and_contexts`, we
- # can't rely on `stream_ordering`/`instance_name` being correct. We
- # could be working with events that were previously persisted as an
- # `outlier` with one `stream_ordering` but are now being persisted
- # again and de-outliered and assigned a different `stream_ordering`
- # that won't end up being used. Since we call
+ # can't rely on `stream_ordering`/`instance_name` being correct at
+ # this point. We could be working with events that were previously
+ # persisted as an `outlier` with one `stream_ordering` but are now
+ # being persisted again and de-outliered and assigned a different
+ # `stream_ordering` that won't end up being used. Since we call
# `_calculate_sliding_sync_table_changes()` before
# `_update_outliers_txn()` which fixes this discrepancy (always use
# the `stream_ordering` from the first time it was persisted), we're
@@ -591,11 +598,17 @@ class PersistEventsStore:
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
- bump_stamp_to_fully_insert = (
- most_recent_bump_event_pos_results[1].stream
- if most_recent_bump_event_pos_results is not None
- else None
- )
+            bump_stamp_to_fully_insert: Optional[int] = None
+            if most_recent_bump_event_pos_results is not None:
+ _, new_bump_event_pos = most_recent_bump_event_pos_results
+
+ # If we've just joined a remote room, then the last bump event may
+ # have been backfilled (and so have a negative stream ordering).
+ # These negative stream orderings can't sensibly be compared, so
+                # instead just leave it as `None` in the table and we will use the user's
+ # membership event position as the bump event position in the
+ # Sliding Sync API.
+ if new_bump_event_pos.stream > 0:
+ bump_stamp_to_fully_insert = new_bump_event_pos.stream
current_state_ids_map = dict(
await self.store.get_partial_filtered_current_state_ids(
@@ -2123,31 +2136,26 @@ class PersistEventsStore:
if len(events_and_contexts) == 0:
return
- # We only update the sliding sync tables for non-backfilled events.
- #
- # Check if the first event is a backfilled event (with a negative
- # `stream_ordering`). If one event is backfilled, we assume this whole batch was
- # backfilled.
- first_event_stream_ordering = events_and_contexts[0][
- 0
- ].internal_metadata.stream_ordering
- # This should exist for persisted events
- assert first_event_stream_ordering is not None
- if first_event_stream_ordering < 0:
- return
-
# Since the list is sorted ascending by `stream_ordering`, the last event should
# have the highest `stream_ordering`.
max_stream_ordering = events_and_contexts[-1][
0
].internal_metadata.stream_ordering
+ # `stream_ordering` should be assigned for persisted events
+ assert max_stream_ordering is not None
+ # Check if the event is a backfilled event (with a negative `stream_ordering`).
+ # If one event is backfilled, we assume this whole batch was backfilled.
+ if max_stream_ordering < 0:
+ # We only update the sliding sync tables for non-backfilled events.
+ return
+
max_bump_stamp = None
for event, _ in reversed(events_and_contexts):
# Sanity check that all events belong to the same room
assert event.room_id == room_id
if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
- # This should exist for persisted events
+ # `stream_ordering` should be assigned for persisted events
assert event.internal_metadata.stream_ordering is not None
max_bump_stamp = event.internal_metadata.stream_ordering
@@ -2156,11 +2164,6 @@ class PersistEventsStore:
# matching bump event which should have the highest `stream_ordering`.
break
- # We should have exited earlier if there were no events
- assert (
- max_stream_ordering is not None
- ), "Expected to have a stream_ordering if we have events"
-
# Handle updating the `sliding_sync_joined_rooms` table.
#
txn.execute(
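# A small sketch (not part of the patch above) of how a batch's `bump_stamp`
# candidate can be derived while ignoring backfilled events, mirroring the
# comments in `events.py`. The event tuples and `bump_event_types` argument are
# illustrative stand-ins rather than Synapse's real structures.
from typing import Collection, Iterable, Optional, Tuple


def max_bump_stamp_for_batch(
    events: Iterable[Tuple[str, int]],  # (event_type, stream_ordering) pairs
    bump_event_types: Collection[str],
) -> Optional[int]:
    max_bump_stamp: Optional[int] = None
    for event_type, stream_ordering in events:
        # Backfilled events have negative stream orderings, which can't be
        # sensibly compared against live positions, so they never count.
        if stream_ordering < 0:
            continue
        if event_type in bump_event_types:
            if max_bump_stamp is None or stream_ordering > max_bump_stamp:
                max_bump_stamp = stream_ordering
    # `None` means "no eligible bump event"; the caller falls back to the
    # membership event position (or leaves the table column unset).
    return max_bump_stamp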
diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py
index dc747d7ac0..83939d10b0 100644
--- a/synapse/storage/databases/main/sliding_sync.py
+++ b/synapse/storage/databases/main/sliding_sync.py
@@ -41,6 +41,46 @@ logger = logging.getLogger(__name__)
class SlidingSyncStore(SQLBaseStore):
+ async def get_latest_bump_stamp_for_room(
+ self,
+ room_id: str,
+ ) -> Optional[int]:
+ """
+ Get the `bump_stamp` for the room.
+
+        The `bump_stamp` is the `stream_ordering` of the last event according to the
+        `bump_event_types`. This helps clients sort rooms more readily without needing
+        to pull in a bunch of the timeline to determine the last activity.
+        `bump_event_types` exists because, for example, we don't want display name
+        changes to mark the room as unread and bump it to the top. For encrypted rooms,
+        we just have to consider any activity as a bump because we can't see the content
+        and the client has to figure it out for itself.
+
+        This should only be called when the server is participating
+        in the room (someone local is joined).
+
+ Returns:
+ The `bump_stamp` for the room (which can be `None`).
+ """
+
+ return cast(
+ Optional[int],
+ await self.db_pool.simple_select_one_onecol(
+ table="sliding_sync_joined_rooms",
+ keyvalues={"room_id": room_id},
+ retcol="bump_stamp",
+ # FIXME: This should be `False` once we bump `SCHEMA_COMPAT_VERSION` and run the
+ # foreground update for
+ # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked
+ # by https://github.com/element-hq/synapse/issues/17623)
+ #
+            # This should be `allow_none=False` in the future because even though
+ # `bump_stamp` itself can be `None`, we should have a row in the
+ # `sliding_sync_joined_rooms` table for any joined room.
+ allow_none=True,
+ ),
+ )
+
async def persist_per_connection_state(
self,
user_id: str,
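# A usage sketch (not part of the patch above). With the arguments shown,
# `simple_select_one_onecol` should boil down to a single-row lookup roughly
# like:
#
#   SELECT bump_stamp FROM sliding_sync_joined_rooms WHERE room_id = ?
#
# A hypothetical caller might then combine it with the membership-position
# fallback; `store` and `membership_event_pos` are illustrative stand-ins.
async def bump_stamp_or_fallback(
    store: "SlidingSyncStore", room_id: str, membership_event_pos: int
) -> int:
    latest_bump_stamp = await store.get_latest_bump_stamp_for_room(room_id)
    if latest_bump_stamp is None:
        # Either the background update hasn't populated `sliding_sync_joined_rooms`
        # yet, or every bump event in the room was backfilled; fall back to the
        # user's membership event position.
        return membership_event_pos
    return latest_bump_stamp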
diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py
index 621f46fff8..de80ad53cd 100644
--- a/tests/storage/test_sliding_sync_tables.py
+++ b/tests/storage/test_sliding_sync_tables.py
@@ -106,6 +106,12 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
assert persist_events_store is not None
self.persist_events_store = persist_events_store
+ persist_controller = self.hs.get_storage_controllers().persistence
+ assert persist_controller is not None
+ self.persist_controller = persist_controller
+
+ self.state_handler = self.hs.get_state_handler()
+
def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]:
"""
Return the rows from the `sliding_sync_joined_rooms` table.
@@ -260,10 +266,8 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
)
)
context = EventContext.for_outlier(self.hs.get_storage_controllers())
- persist_controller = self.hs.get_storage_controllers().persistence
- assert persist_controller is not None
persisted_event, _, _ = self.get_success(
- persist_controller.persist_event(invite_event, context)
+ self.persist_controller.persist_event(invite_event, context)
)
self._remote_invite_count += 1
@@ -316,10 +320,8 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
)
)
context = EventContext.for_outlier(self.hs.get_storage_controllers())
- persist_controller = self.hs.get_storage_controllers().persistence
- assert persist_controller is not None
persisted_event, _, _ = self.get_success(
- persist_controller.persist_event(kick_event, context)
+ self.persist_controller.persist_event(kick_event, context)
)
return persisted_event
@@ -926,6 +928,201 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
user2_snapshot,
)
+ def test_joined_room_bump_stamp_backfill(self) -> None:
+ """
+ Test that `bump_stamp` ignores backfilled events, i.e. events with a
+ negative stream ordering.
+ """
+ user1_id = self.register_user("user1", "pass")
+ _user1_tok = self.login(user1_id, "pass")
+
+ # Create a remote room
+ creator = "@user:other"
+ room_id = "!foo:other"
+ room_version = RoomVersions.V10
+ shared_kwargs = {
+ "room_id": room_id,
+ "room_version": room_version.identifier,
+ }
+
+ create_tuple = self.get_success(
+ create_event(
+ self.hs,
+ prev_event_ids=[],
+ type=EventTypes.Create,
+ state_key="",
+ content={
+ # The `ROOM_CREATOR` field could be removed if we used a room
+ # version > 10 (in favor of relying on `sender`)
+ EventContentFields.ROOM_CREATOR: creator,
+ EventContentFields.ROOM_VERSION: room_version.identifier,
+ },
+ sender=creator,
+ **shared_kwargs,
+ )
+ )
+ creator_tuple = self.get_success(
+ create_event(
+ self.hs,
+ prev_event_ids=[create_tuple[0].event_id],
+ auth_event_ids=[create_tuple[0].event_id],
+ type=EventTypes.Member,
+ state_key=creator,
+ content={"membership": Membership.JOIN},
+ sender=creator,
+ **shared_kwargs,
+ )
+ )
+ room_name_tuple = self.get_success(
+ create_event(
+ self.hs,
+ prev_event_ids=[creator_tuple[0].event_id],
+ auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
+ type=EventTypes.Name,
+ state_key="",
+ content={
+ EventContentFields.ROOM_NAME: "my super duper room",
+ },
+ sender=creator,
+ **shared_kwargs,
+ )
+ )
+ # We add a message event as a valid "bump type"
+ msg_tuple = self.get_success(
+ create_event(
+ self.hs,
+ prev_event_ids=[room_name_tuple[0].event_id],
+ auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
+ type=EventTypes.Message,
+ content={"body": "foo", "msgtype": "m.text"},
+ sender=creator,
+ **shared_kwargs,
+ )
+ )
+ invite_tuple = self.get_success(
+ create_event(
+ self.hs,
+ prev_event_ids=[msg_tuple[0].event_id],
+ auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
+ type=EventTypes.Member,
+ state_key=user1_id,
+ content={"membership": Membership.INVITE},
+ sender=creator,
+ **shared_kwargs,
+ )
+ )
+
+ remote_events_and_contexts = [
+ create_tuple,
+ creator_tuple,
+ room_name_tuple,
+ msg_tuple,
+ invite_tuple,
+ ]
+
+ # Ensure the local HS knows the room version
+ self.get_success(self.store.store_room(room_id, creator, False, room_version))
+
+ # Persist these events as backfilled events.
+ for event, context in remote_events_and_contexts:
+ self.get_success(
+ self.persist_controller.persist_event(event, context, backfilled=True)
+ )
+
+ # Now we join the local user to the room. We want to make this feel as close to
+ # the real `process_remote_join()` as possible but we'd like to avoid some of
+ # the auth checks that would be done in the real code.
+ #
+ # FIXME: The test was originally written using this less-real
+ # `persist_event(...)` shortcut but it would be nice to use the real remote join
+ # process in a `FederatingHomeserverTestCase`.
+ flawed_join_tuple = self.get_success(
+ create_event(
+ self.hs,
+ prev_event_ids=[invite_tuple[0].event_id],
+                # This doesn't correctly create an `EventContext` that includes
+ # both of these state events. I assume it's because we're working on our
+ # local homeserver which has the remote state set as `outlier`. We have
+ # to create our own EventContext below to get this right.
+ auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
+ type=EventTypes.Member,
+ state_key=user1_id,
+ content={"membership": Membership.JOIN},
+ sender=user1_id,
+ **shared_kwargs,
+ )
+ )
+ # We have to create our own context to get the state set correctly. If we use
+ # the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
+ # table will only have the join event in it which should never happen in our
+ # real server.
+ join_event = flawed_join_tuple[0]
+ join_context = self.get_success(
+ self.state_handler.compute_event_context(
+ join_event,
+ state_ids_before_event={
+ (e.type, e.state_key): e.event_id
+ for e in [create_tuple[0], invite_tuple[0], room_name_tuple[0]]
+ },
+ partial_state=False,
+ )
+ )
+ join_event, _join_event_pos, _room_token = self.get_success(
+ self.persist_controller.persist_event(join_event, join_context)
+ )
+
+ # Make sure the tables are populated correctly
+ sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
+ self.assertIncludes(
+ set(sliding_sync_joined_rooms_results.keys()),
+ {room_id},
+ exact=True,
+ )
+ self.assertEqual(
+ sliding_sync_joined_rooms_results[room_id],
+ _SlidingSyncJoinedRoomResult(
+ room_id=room_id,
+ # This should be the last event in the room (the join membership)
+ event_stream_ordering=join_event.internal_metadata.stream_ordering,
+                # Since all of the bump events are backfilled, the `bump_stamp` should
+                # still be `None` (and we will fall back to the user's membership event
+                # position in the Sliding Sync API).
+ bump_stamp=None,
+ room_type=None,
+                # We still pick up the state of the room even if it's backfilled
+ room_name="my super duper room",
+ is_encrypted=False,
+ tombstone_successor_room_id=None,
+ ),
+ )
+
+ sliding_sync_membership_snapshots_results = (
+ self._get_sliding_sync_membership_snapshots()
+ )
+ self.assertIncludes(
+ set(sliding_sync_membership_snapshots_results.keys()),
+ {
+ (room_id, user1_id),
+ },
+ exact=True,
+ )
+ self.assertEqual(
+ sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
+ _SlidingSyncMembershipSnapshotResult(
+ room_id=room_id,
+ user_id=user1_id,
+ sender=user1_id,
+ membership_event_id=join_event.event_id,
+ membership=Membership.JOIN,
+ event_stream_ordering=join_event.internal_metadata.stream_ordering,
+ has_known_state=True,
+ room_type=None,
+ room_name="my super duper room",
+ is_encrypted=False,
+ tombstone_successor_room_id=None,
+ ),
+ )
+
@parameterized.expand(
# Test both an insert an upsert into the
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` to exercise
@@ -1036,11 +1233,9 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
context = self.get_success(unpersisted_context.persist(event))
events_to_persist.append((event, context))
- persist_controller = self.hs.get_storage_controllers().persistence
- assert persist_controller is not None
for event, context in events_to_persist:
self.get_success(
- persist_controller.persist_event(
+ self.persist_controller.persist_event(
event,
context,
)
|