summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-08-08 15:41:55 -0500
committerEric Eastwood <eric.eastwood@beta.gouv.fr>2024-08-08 15:41:55 -0500
commitcc2d2b6b9f8fd286725e010b30f1c3eae3ccaadf (patch)
treef69aaa73bf12786aae711b3494af0800694efc4a
parentFix some lints (diff)
downloadsynapse-cc2d2b6b9f8fd286725e010b30f1c3eae3ccaadf.tar.xz
Fill in `stream_ordering`/`bump_stamp` when we add current state to the joined rooms table
-rw-r--r--synapse/handlers/sliding_sync.py23
-rw-r--r--synapse/storage/databases/main/events.py53
-rw-r--r--synapse/types/handlers/__init__.py13
-rw-r--r--tests/storage/test_events.py11
4 files changed, 75 insertions, 25 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8467766518..4253eebed2 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -74,7 +74,12 @@ from synapse.types import (
     StreamToken,
     UserID,
 )
-from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
+from synapse.types.handlers import (
+    SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
+    OperationType,
+    SlidingSyncConfig,
+    SlidingSyncResult,
+)
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
@@ -91,18 +96,6 @@ class Sentinel(enum.Enum):
     UNSET_SENTINEL = object()
 
 
-# The event types that clients should consider as new activity.
-DEFAULT_BUMP_EVENT_TYPES = {
-    EventTypes.Create,
-    EventTypes.Message,
-    EventTypes.Encrypted,
-    EventTypes.Sticker,
-    EventTypes.CallInvite,
-    EventTypes.PollStart,
-    EventTypes.LiveLocationShareStart,
-}
-
-
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _RoomMembershipForUser:
     """
@@ -2174,7 +2167,9 @@ class SlidingSyncHandler:
         # Figure out the last bump event in the room
         last_bump_event_result = (
             await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+                room_id,
+                to_token.room_key,
+                event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
             )
         )
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index c101820c1d..1dd114c601 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -73,6 +73,7 @@ from synapse.types import (
     StrCollection,
     get_domain_from_id,
 )
+from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
 from synapse.util import json_encoder
 from synapse.util.iterutils import batch_iter, sorted_topologically
 from synapse.util.stringutils import non_null_str_or_none
@@ -1308,6 +1309,9 @@ class PersistEventsStore:
                         )
 
                 # Update the `sliding_sync_non_join_memberships` table
+                #
+                # Pulling keys/values separately is safe and will produce congruent
+                # lists
                 insert_keys = sliding_sync_non_joined_rooms_insert_map.keys()
                 insert_values = sliding_sync_non_joined_rooms_insert_map.values()
                 txn.execute_batch(
@@ -1452,6 +1456,7 @@ class PersistEventsStore:
             create_event_id = None
             room_encryption_event_id = None
             room_name_event_id = None
+            bump_event_id = None
             for state_key, event_id in to_insert.items():
                 if state_key[0] == EventTypes.Create and state_key[1] == "":
                     create_event_id = event_id
@@ -1463,6 +1468,12 @@ class PersistEventsStore:
                     room_name_event_id = event_id
                     event_ids_to_fetch.append(event_id)
 
+                if (
+                    state_key[0] in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
+                    and state_key[1] == ""
+                ):
+                    bump_event_id = event_id
+
             # Map of values to insert/update in the `sliding_sync_joined_rooms` table
             sliding_sync_joined_rooms_insert_map: Dict[
                 str, Optional[Union[str, bool]]
@@ -1515,23 +1526,57 @@ class PersistEventsStore:
                     )
 
             # Update the `sliding_sync_joined_rooms` table
+            args: List[Any] = [
+                room_id,
+                # Even though `Mapping`/`Dict` have no guaranteed order, some
+                # implementations may preserve insertion order so we're just going to
+                # choose the best possible answer by using the "last" event ID which we
+                # will assume will have the greatest `stream_ordering`. We really just
+                # need *some* answer in case we are the first ones inserting into the
+                # table and this will resolve itself when we update this field in the
+                # persist events loop.
+                list(to_insert.values())[-1],
+            ]
+            # If we have a `bump_event_id`, let's update the `bump_stamp` column
+            bump_stamp_column = ""
+            bump_stamp_values_clause = ""
+            if bump_event_id is not None:
+                bump_stamp_column = "bump_stamp, "
+                bump_stamp_values_clause = (
+                    "(SELECT stream_ordering FROM events WHERE event_id = ?),"
+                )
+                args.append(bump_event_id)
+            # Pulling keys/values separately is safe and will produce congruent lists
             insert_keys = sliding_sync_joined_rooms_insert_map.keys()
             insert_values = sliding_sync_joined_rooms_insert_map.values()
+            args.extend(iter(insert_values))
             if len(insert_keys) > 0:
-                # TODO: Should we add `event_stream_ordering`, `bump_stamp` on insert?
+                # We don't update `bump_stamp` `ON CONFLICT` because we're dealing with
+                # state here and the only state event that is also a bump event type is
+                # `m.room.create`. Given the room creation event is the first one in the
+                # room, it's either going to be set on insert, or we've already moved on
+                # to other events with a greater `stream_ordering`/`bump_stamp` and we
+                # don't need to even try.
                 txn.execute(
                     f"""
                     INSERT INTO sliding_sync_joined_rooms
-                        (room_id, {", ".join(insert_keys)})
+                        (room_id, event_stream_ordering, {bump_stamp_column} {", ".join(insert_keys)})
                     VALUES (
                         ?,
+                        (SELECT stream_ordering FROM events WHERE event_id = ?),
+                        {bump_stamp_values_clause}
                         {", ".join("?" for _ in insert_values)}
                     )
                     ON CONFLICT (room_id)
                     DO UPDATE SET
-                        {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
+                        {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)},
+                        event_stream_ordering = CASE
+                            WHEN event_stream_ordering < EXCLUDED.event_stream_ordering
+                                THEN EXCLUDED.event_stream_ordering
+                            ELSE event_stream_ordering
+                        END
                     """,
-                    [room_id] + list(insert_values),
+                    args,
                 )
 
         # We now update `local_current_membership`. We do this regardless
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 363f060bef..62c86919c4 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -30,6 +30,7 @@ if TYPE_CHECKING or HAS_PYDANTIC_V2:
 else:
     from pydantic import Extra
 
+from synapse.api.constants import EventTypes
 from synapse.events import EventBase
 from synapse.types import (
     DeviceListUpdates,
@@ -45,6 +46,18 @@ from synapse.types.rest.client import SlidingSyncBody
 if TYPE_CHECKING:
     from synapse.handlers.relations import BundledAggregations
 
+# Sliding Sync: The event types that clients should consider as new activity and affect
+# the `bump_stamp`
+SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES = {
+    EventTypes.Create,
+    EventTypes.Message,
+    EventTypes.Encrypted,
+    EventTypes.Sticker,
+    EventTypes.CallInvite,
+    EventTypes.PollStart,
+    EventTypes.LiveLocationShareStart,
+}
+
 
 class ShutdownRoomParams(TypedDict):
     """
diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py
index 2fb863a8c9..9543123473 100644
--- a/tests/storage/test_events.py
+++ b/tests/storage/test_events.py
@@ -523,10 +523,9 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
         room.register_servlets,
     ]
 
-    def prepare(
-        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
-    ) -> None:
-        self.store = self.hs.get_datastores().main
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.storage_controllers = hs.get_storage_controllers()
 
     def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]:
         """
@@ -615,10 +614,8 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
-        user2_id = self.register_user("user2", "pass")
-        user2_tok = self.login(user2_id, "pass")
 
-        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
 
         # User1 joins the room
         self.helper.join(room_id1, user1_id, tok=user1_tok)