diff options
author | Eric Eastwood <eric.eastwood@beta.gouv.fr> | 2024-08-08 15:41:55 -0500 |
---|---|---|
committer | Eric Eastwood <eric.eastwood@beta.gouv.fr> | 2024-08-08 15:41:55 -0500 |
commit | cc2d2b6b9f8fd286725e010b30f1c3eae3ccaadf (patch) | |
tree | f69aaa73bf12786aae711b3494af0800694efc4a | |
parent | Fix some lints (diff) | |
download | synapse-cc2d2b6b9f8fd286725e010b30f1c3eae3ccaadf.tar.xz |
Fill in `stream_ordering`/`bump_stamp` when we add current state to the joined rooms table
-rw-r--r-- | synapse/handlers/sliding_sync.py | 23 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 53 | ||||
-rw-r--r-- | synapse/types/handlers/__init__.py | 13 | ||||
-rw-r--r-- | tests/storage/test_events.py | 11 |
4 files changed, 75 insertions, 25 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 8467766518..4253eebed2 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -74,7 +74,12 @@ from synapse.types import ( StreamToken, UserID, ) -from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult +from synapse.types.handlers import ( + SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES, + OperationType, + SlidingSyncConfig, + SlidingSyncResult, +) from synapse.types.state import StateFilter from synapse.util.async_helpers import concurrently_execute from synapse.visibility import filter_events_for_client @@ -91,18 +96,6 @@ class Sentinel(enum.Enum): UNSET_SENTINEL = object() -# The event types that clients should consider as new activity. -DEFAULT_BUMP_EVENT_TYPES = { - EventTypes.Create, - EventTypes.Message, - EventTypes.Encrypted, - EventTypes.Sticker, - EventTypes.CallInvite, - EventTypes.PollStart, - EventTypes.LiveLocationShareStart, -} - - @attr.s(slots=True, frozen=True, auto_attribs=True) class _RoomMembershipForUser: """ @@ -2174,7 +2167,9 @@ class SlidingSyncHandler: # Figure out the last bump event in the room last_bump_event_result = ( await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + room_id, + to_token.room_key, + event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES, ) ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c101820c1d..1dd114c601 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -73,6 +73,7 @@ from synapse.types import ( StrCollection, get_domain_from_id, ) +from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES from synapse.util import json_encoder from synapse.util.iterutils import batch_iter, sorted_topologically from synapse.util.stringutils import non_null_str_or_none @@ -1308,6 +1309,9 @@ class PersistEventsStore: ) # Update the `sliding_sync_non_join_memberships` table + # + # Pulling keys/values separately is safe and will produce congruent + # lists insert_keys = sliding_sync_non_joined_rooms_insert_map.keys() insert_values = sliding_sync_non_joined_rooms_insert_map.values() txn.execute_batch( @@ -1452,6 +1456,7 @@ class PersistEventsStore: create_event_id = None room_encryption_event_id = None room_name_event_id = None + bump_event_id = None for state_key, event_id in to_insert.items(): if state_key[0] == EventTypes.Create and state_key[1] == "": create_event_id = event_id @@ -1463,6 +1468,12 @@ class PersistEventsStore: room_name_event_id = event_id event_ids_to_fetch.append(event_id) + if ( + state_key[0] in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES + and state_key[1] == "" + ): + bump_event_id = event_id + # Map of values to insert/update in the `sliding_sync_joined_rooms` table sliding_sync_joined_rooms_insert_map: Dict[ str, Optional[Union[str, bool]] @@ -1515,23 +1526,57 @@ class PersistEventsStore: ) # Update the `sliding_sync_joined_rooms` table + args: List[Any] = [ + room_id, + # Even though `Mapping`/`Dict` have no guaranteed order, some + # implementations may preserve insertion order so we're just going to + # choose the best possible answer by using the "last" event ID which we + # will assume will have the greatest `stream_ordering`. We really just + # need *some* answer in case we are the first ones inserting into the + # table and this will resolve itself when we update this field in the + # persist events loop. + list(to_insert.values())[-1], + ] + # If we have a `bump_event_id`, let's update the `bump_stamp` column + bump_stamp_column = "" + bump_stamp_values_clause = "" + if bump_event_id is not None: + bump_stamp_column = "bump_stamp, " + bump_stamp_values_clause = ( + "(SELECT stream_ordering FROM events WHERE event_id = ?)," + ) + args.append(bump_event_id) + # Pulling keys/values separately is safe and will produce congruent lists insert_keys = sliding_sync_joined_rooms_insert_map.keys() insert_values = sliding_sync_joined_rooms_insert_map.values() + args.extend(iter(insert_values)) if len(insert_keys) > 0: - # TODO: Should we add `event_stream_ordering`, `bump_stamp` on insert? + # We don't update `bump_stamp` `ON CONFLICT` because we're dealing with + # state here and the only state event that is also a bump event type is + # `m.room.create`. Given the room creation event is the first one in the + # room, it's either going to be set on insert, or we've already moved on + # to other events with a greater `stream_ordering`/`bump_stamp` and we + # don't need to even try. txn.execute( f""" INSERT INTO sliding_sync_joined_rooms - (room_id, {", ".join(insert_keys)}) + (room_id, event_stream_ordering, {bump_stamp_column} {", ".join(insert_keys)}) VALUES ( ?, + (SELECT stream_ordering FROM events WHERE event_id = ?), + {bump_stamp_values_clause} {", ".join("?" for _ in insert_values)} ) ON CONFLICT (room_id) DO UPDATE SET - {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} + {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}, + event_stream_ordering = CASE + WHEN event_stream_ordering < EXCLUDED.event_stream_ordering + THEN EXCLUDED.event_stream_ordering + ELSE event_stream_ordering + END """, - [room_id] + list(insert_values), + args, ) # We now update `local_current_membership`. We do this regardless diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 363f060bef..62c86919c4 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -30,6 +30,7 @@ if TYPE_CHECKING or HAS_PYDANTIC_V2: else: from pydantic import Extra +from synapse.api.constants import EventTypes from synapse.events import EventBase from synapse.types import ( DeviceListUpdates, @@ -45,6 +46,18 @@ from synapse.types.rest.client import SlidingSyncBody if TYPE_CHECKING: from synapse.handlers.relations import BundledAggregations +# Sliding Sync: The event types that clients should consider as new activity and affect +# the `bump_stamp` +SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES = { + EventTypes.Create, + EventTypes.Message, + EventTypes.Encrypted, + EventTypes.Sticker, + EventTypes.CallInvite, + EventTypes.PollStart, + EventTypes.LiveLocationShareStart, +} + class ShutdownRoomParams(TypedDict): """ diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 2fb863a8c9..9543123473 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -523,10 +523,9 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): room.register_servlets, ] - def prepare( - self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer - ) -> None: - self.store = self.hs.get_datastores().main + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.storage_controllers = hs.get_storage_controllers() def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]: """ @@ -615,10 +614,8 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - user2_id = self.register_user("user2", "pass") - user2_tok = self.login(user2_id, "pass") - room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) # User1 joins the room self.helper.join(room_id1, user1_id, tok=user1_tok) |