diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1dd114c601..2b91f1f67c 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1530,12 +1530,13 @@ class PersistEventsStore:
room_id,
# Even though `Mapping`/`Dict` have no guaranteed order, some
# implementations may preserve insertion order so we're just going to
- # choose the best possible answer by using the "last" event ID which we
+ # choose the best possible answer by using the "first" event ID which we
# will assume will have the greatest `stream_ordering`. We really just
# need *some* answer in case we are the first ones inserting into the
- # table and this will resolve itself when we update this field in the
- # persist events loop.
- list(to_insert.values())[-1],
+ # table and in reality, `_store_event_txn()` is run before this function
+ # so it will already have the correct value. This is just to account for
+ # things changing in the future.
+ next(iter(to_insert.values())),
]
# If we have a `bump_event_id`, let's update the `bump_stamp` column
bump_stamp_column = ""
@@ -1977,6 +1978,64 @@ class PersistEventsStore:
],
)
+ # Handle updating `sliding_sync_joined_rooms`
+ room_id_to_stream_ordering_map: Dict[str, int] = {}
+ room_id_to_bump_stamp_map: Dict[str, int] = {}
+ for event, _ in events_and_contexts:
+ existing_stream_ordering = room_id_to_stream_ordering_map.get(event.room_id)
+ # This should exist at this point because we're inserting events here which require it
+ assert event.internal_metadata.stream_ordering is not None
+ if (
+ existing_stream_ordering is None
+ or existing_stream_ordering < event.internal_metadata.stream_ordering
+ ):
+ room_id_to_stream_ordering_map[event.room_id] = (
+ event.internal_metadata.stream_ordering
+ )
+
+ if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
+ existing_bump_stamp = room_id_to_bump_stamp_map.get(event.room_id)
+ # This should exist at this point because we're inserting events here which require it
+ assert event.internal_metadata.stream_ordering is not None
+ if (
+ existing_bump_stamp is None
+ or existing_bump_stamp < event.internal_metadata.stream_ordering
+ ):
+ room_id_to_bump_stamp_map[event.room_id] = (
+ event.internal_metadata.stream_ordering
+ )
+
+ # `_store_event_txn` is run before `_update_current_state_txn` which handles
+ # deleting the rows if we are no longer in the room so we don't need to worry
+ # about inserting something that will be orphaned.
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ key_names=("room_id",),
+ key_values=[
+ (room_id,) for room_id in room_id_to_stream_ordering_map.keys()
+ ],
+ value_names=("event_stream_ordering",),
+ value_values=[
+ (room_id_to_stream_ordering_map[room_id],)
+ for room_id in room_id_to_stream_ordering_map.keys()
+ ],
+ )
+ # This has to be separate from the upsert above because we won't have a
+ # `bump_stamp` for every event and we don't want to overwrite the existing value
+ # with `None`.
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="sliding_sync_joined_rooms",
+ key_names=("room_id",),
+ key_values=[(room_id,) for room_id in room_id_to_bump_stamp_map.keys()],
+ value_names=("bump_stamp",),
+ value_values=[
+ (room_id_to_bump_stamp_map[room_id],)
+ for room_id in room_id_to_bump_stamp_map.keys()
+ ],
+ )
+
def _store_rejected_events_txn(
self,
txn: LoggingTransaction,
diff --git a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
index 61ea65aba2..a60fabb586 100644
--- a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
+++ b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
@@ -31,7 +31,16 @@ CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
PRIMARY KEY (room_id)
);
-
+-- Store the user's non-join room memberships. Only stores the latest membership event
+-- for a given user which matches `local_current_membership` (except we don't store
+-- joins).
+--
+-- FIXME: It might be easier to just store any membership here but just indicate that
+-- the state is a snapshot of the current state at the time of the membership event.
+-- That way we don't have to worry about clearing out
+-- `sliding_sync_non_join_memberships` when the the user joins a room. And we always
+-- have the full picture. Perhaps we can call it `sliding_sync_membership_snapshots`.
+--
-- We don't include `bump_stamp` here because we can just use the `stream_ordering` from
-- the membership event itself as the `bump_stamp`.
CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships(
@@ -50,6 +59,8 @@ CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships(
-- `m.room.encryption` -> `content.algorithm` (according to the current state at the
-- time of the membership)
is_encrypted BOOLEAN DEFAULT 0 NOT NULL,
+ -- FIXME: Maybe we want to add `tombstone_successor_room_id` here to help with `include_old_rooms`
+ -- (tracked by https://github.com/element-hq/synapse/issues/17540)
PRIMARY KEY (room_id, user_id)
);
diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py
index 9543123473..b8302a7928 100644
--- a/tests/storage/test_events.py
+++ b/tests/storage/test_events.py
@@ -617,8 +617,9 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
- # User1 joins the room
- self.helper.join(room_id1, user1_id, tok=user1_tok)
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(room_id1)
+ )
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
@@ -630,15 +631,20 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
sliding_sync_joined_rooms_results[room_id1],
_SlidingSyncJoinedRoomResult(
room_id=room_id1,
- # TODO
- event_stream_ordering=None,
- bump_stamp=None,
+ # History visibility just happens to be the last event sent in the room
+ event_stream_ordering=state_map[
+ (EventTypes.RoomHistoryVisibility, "")
+ ].internal_metadata.stream_ordering,
+ bump_stamp=state_map[
+ (EventTypes.Create, "")
+ ].internal_metadata.stream_ordering,
room_type=None,
room_name=None,
is_encrypted=False,
),
)
+ # No one is non-joined to this room so we shouldn't see anything
sliding_sync_non_join_memberships_results = (
self._get_sliding_sync_non_join_memberships()
)
@@ -674,7 +680,14 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
)
# User1 joins the room
- self.helper.join(room_id1, user1_id, tok=user1_tok)
+ user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ user1_join_event_pos = self.get_success(
+ self.store.get_position_for_event(user1_join_response["event_id"])
+ )
+
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(room_id1)
+ )
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
@@ -686,9 +699,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
sliding_sync_joined_rooms_results[room_id1],
_SlidingSyncJoinedRoomResult(
room_id=room_id1,
- # TODO
- event_stream_ordering=None,
- bump_stamp=None,
+ event_stream_ordering=user1_join_event_pos.stream,
+ bump_stamp=state_map[
+ (EventTypes.Create, "")
+ ].internal_metadata.stream_ordering,
room_type=None,
room_name="my super duper room",
is_encrypted=True,
@@ -704,10 +718,73 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
exact=True,
)
- def test_joined_room_with_info_updated(self) -> None:
+ def test_joined_space_room_with_info(self) -> None:
"""
- Test info in `sliding_sync_joined_rooms` is updated when the current state is
- updated.
+ Test joined space room with name shows up in `sliding_sync_joined_rooms`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ space_room_id = self.helper.create_room_as(
+ user2_id,
+ tok=user2_tok,
+ extra_content={
+ "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
+ },
+ )
+ # Add a room name
+ self.helper.send_state(
+ space_room_id,
+ EventTypes.Name,
+ {"name": "my super duper space"},
+ tok=user2_tok,
+ )
+
+ # User1 joins the room
+ user1_join_response = self.helper.join(space_room_id, user1_id, tok=user1_tok)
+ user1_join_event_pos = self.get_success(
+ self.store.get_position_for_event(user1_join_response["event_id"])
+ )
+
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(space_room_id)
+ )
+
+ sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
+ self.assertIncludes(
+ set(sliding_sync_joined_rooms_results.keys()),
+ {space_room_id},
+ exact=True,
+ )
+ self.assertEqual(
+ sliding_sync_joined_rooms_results[space_room_id],
+ _SlidingSyncJoinedRoomResult(
+ room_id=space_room_id,
+ event_stream_ordering=user1_join_event_pos.stream,
+ bump_stamp=state_map[
+ (EventTypes.Create, "")
+ ].internal_metadata.stream_ordering,
+ room_type=RoomTypes.SPACE,
+ room_name="my super duper space",
+ is_encrypted=False,
+ ),
+ )
+
+ sliding_sync_non_join_memberships_results = (
+ self._get_sliding_sync_non_join_memberships()
+ )
+ self.assertIncludes(
+ set(sliding_sync_non_join_memberships_results.keys()),
+ set(),
+ exact=True,
+ )
+
+ def test_joined_room_with_state_updated(self) -> None:
+ """
+ Test state derived info in `sliding_sync_joined_rooms` is updated when the
+ current state is updated.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
@@ -724,7 +801,14 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
)
# User1 joins the room
- self.helper.join(room_id1, user1_id, tok=user1_tok)
+ user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ user1_join_event_pos = self.get_success(
+ self.store.get_position_for_event(user1_join_response["event_id"])
+ )
+
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(room_id1)
+ )
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
@@ -736,9 +820,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
sliding_sync_joined_rooms_results[room_id1],
_SlidingSyncJoinedRoomResult(
room_id=room_id1,
- # TODO
- event_stream_ordering=None,
- bump_stamp=None,
+ event_stream_ordering=user1_join_event_pos.stream,
+ bump_stamp=state_map[
+ (EventTypes.Create, "")
+ ].internal_metadata.stream_ordering,
room_type=None,
room_name="my super duper room",
is_encrypted=False,
@@ -762,12 +847,15 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
tok=user2_tok,
)
# Encrypt the room
- self.helper.send_state(
+ encrypt_room_response = self.helper.send_state(
room_id1,
EventTypes.RoomEncryption,
{EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
tok=user2_tok,
)
+ encrypt_room_event_pos = self.get_success(
+ self.store.get_position_for_event(encrypt_room_response["event_id"])
+ )
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
@@ -780,9 +868,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
sliding_sync_joined_rooms_results[room_id1],
_SlidingSyncJoinedRoomResult(
room_id=room_id1,
- # TODO
- event_stream_ordering=None,
- bump_stamp=None,
+ event_stream_ordering=encrypt_room_event_pos.stream,
+ bump_stamp=state_map[
+ (EventTypes.Create, "")
+ ].internal_metadata.stream_ordering,
room_type=None,
room_name="my super duper room was renamed",
is_encrypted=True,
@@ -798,48 +887,88 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
exact=True,
)
- def test_joined_space_room_with_info(self) -> None:
+ def test_joined_room_is_bumped(self) -> None:
"""
- Test joined space room with name shows up in `sliding_sync_joined_rooms`.
+ Test that `event_stream_ordering` and `bump_stamp` is updated when a new bump
+ event is sent (`sliding_sync_joined_rooms`).
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
- space_room_id = self.helper.create_room_as(
- user2_id,
- tok=user2_tok,
- extra_content={
- "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
- },
- )
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
# Add a room name
self.helper.send_state(
- space_room_id,
+ room_id1,
EventTypes.Name,
- {"name": "my super duper space"},
+ {"name": "my super duper room"},
tok=user2_tok,
)
# User1 joins the room
- self.helper.join(space_room_id, user1_id, tok=user1_tok)
+ user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ user1_join_event_pos = self.get_success(
+ self.store.get_position_for_event(user1_join_response["event_id"])
+ )
+
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(room_id1)
+ )
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
set(sliding_sync_joined_rooms_results.keys()),
- {space_room_id},
+ {room_id1},
exact=True,
)
self.assertEqual(
- sliding_sync_joined_rooms_results[space_room_id],
+ sliding_sync_joined_rooms_results[room_id1],
_SlidingSyncJoinedRoomResult(
- room_id=space_room_id,
- # TODO
- event_stream_ordering=None,
- bump_stamp=None,
- room_type=RoomTypes.SPACE,
- room_name="my super duper space",
+ room_id=room_id1,
+ event_stream_ordering=user1_join_event_pos.stream,
+ bump_stamp=state_map[
+ (EventTypes.Create, "")
+ ].internal_metadata.stream_ordering,
+ room_type=None,
+ room_name="my super duper room",
+ is_encrypted=False,
+ ),
+ )
+
+ sliding_sync_non_join_memberships_results = (
+ self._get_sliding_sync_non_join_memberships()
+ )
+ self.assertIncludes(
+ set(sliding_sync_non_join_memberships_results.keys()),
+ set(),
+ exact=True,
+ )
+
+ # Send a new message to bump the room
+ event_response = self.helper.send(room_id1, "some message", tok=user1_tok)
+ event_pos = self.get_success(
+ self.store.get_position_for_event(event_response["event_id"])
+ )
+
+ sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
+ self.assertIncludes(
+ set(sliding_sync_joined_rooms_results.keys()),
+ {room_id1},
+ exact=True,
+ )
+ # Make sure we see the new room name
+ self.assertEqual(
+ sliding_sync_joined_rooms_results[room_id1],
+ _SlidingSyncJoinedRoomResult(
+ room_id=room_id1,
+ # Updated `event_stream_ordering`
+ event_stream_ordering=event_pos.stream,
+ # And since the event was a bump event, the `bump_stamp` should be updated
+ bump_stamp=event_pos.stream,
+ # The state is still the same (it didn't change)
+ room_type=None,
+ room_name="my super duper room",
is_encrypted=False,
),
)
@@ -896,12 +1025,19 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
# Update the room name after we are invited just to make sure
# we don't update non-join memberships when the room name changes.
- self.helper.send_state(
+ rename_response = self.helper.send_state(
space_room_id,
EventTypes.Name,
{"name": "my super duper space was renamed"},
tok=user2_tok,
)
+ rename_event_pos = self.get_success(
+ self.store.get_position_for_event(rename_response["event_id"])
+ )
+
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(space_room_id)
+ )
# User2 is still joined to the room so we should still have an entry in the
# `sliding_sync_joined_rooms` table.
@@ -915,9 +1051,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
sliding_sync_joined_rooms_results[space_room_id],
_SlidingSyncJoinedRoomResult(
room_id=space_room_id,
- # TODO
- event_stream_ordering=None,
- bump_stamp=None,
+ event_stream_ordering=rename_event_pos.stream,
+ bump_stamp=state_map[
+ (EventTypes.Create, "")
+ ].internal_metadata.stream_ordering,
room_type=RoomTypes.SPACE,
room_name="my super duper space was renamed",
is_encrypted=True,
@@ -981,6 +1118,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
self.store.get_position_for_event(user3_ban_response["event_id"])
)
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(room_id1)
+ )
+
# User2 is still joined to the room so we should still have an entry
# in the `sliding_sync_joined_rooms` table.
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
@@ -993,9 +1134,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
sliding_sync_joined_rooms_results[room_id1],
_SlidingSyncJoinedRoomResult(
room_id=room_id1,
- # TODO
- event_stream_ordering=None,
- bump_stamp=None,
+ event_stream_ordering=user3_ban_event_pos.stream,
+ bump_stamp=state_map[
+ (EventTypes.Create, "")
+ ].internal_metadata.stream_ordering,
room_type=None,
room_name=None,
is_encrypted=False,
@@ -1042,6 +1184,8 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
# TODO: Test remote invite
+ # TODO Test for non-join membership changing
+
def test_non_join_server_left_room(self) -> None:
"""
Test everyone local leaves the room but their leave membership still shows up in
|