summary refs log tree commit diff
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2023-01-12 18:35:00 +0100
committerMathieu Velten <mathieuv@matrix.org>2023-01-17 09:32:45 +0100
commit3bad02fbfeb7019b14c3d41a52651659051768b2 (patch)
tree29c7365a0698c452afa37e581dd4d60495406be5
parentAdd an early return when handling no-op presence updates. (#14855) (diff)
downloadsynapse-3bad02fbfeb7019b14c3d41a52651659051768b2.tar.xz
Non lazy loading sync not blocking during fast join
Signed-off-by: Mathieu Velten <mathieuv@matrix.org>
-rw-r--r--changelog.d/14831.misc1
-rw-r--r--synapse/handlers/sync.py63
-rw-r--r--synapse/storage/databases/main/relations.py1
-rw-r--r--synapse/storage/databases/main/room.py32
-rw-r--r--synapse/streams/events.py3
-rw-r--r--synapse/types/__init__.py9
-rw-r--r--tests/rest/admin/test_room.py4
-rw-r--r--tests/rest/client/test_rooms.py10
8 files changed, 108 insertions, 15 deletions
diff --git a/changelog.d/14831.misc b/changelog.d/14831.misc
new file mode 100644
index 0000000000..72d6463f25
--- /dev/null
+++ b/changelog.d/14831.misc
@@ -0,0 +1 @@
+Faster joins: non-lazy-loading syncs no longer block during a fast join.
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 78d488f2b1..9cf1f29de1 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1817,11 +1817,34 @@ class SyncHandler:
             )
             sync_result_builder.now_token = now_token
 
+        # Retrieve rooms that got un partial stated in the meantime, only useful in case
+        # of a non lazy-loading-members sync.
+        un_partial_stated_rooms = set()
+        if not sync_result_builder.sync_config.filter_collection.lazy_load_members():
+            un_partial_state_rooms_since = 0
+            if sync_result_builder.since_token is not None:
+                un_partial_state_rooms_since = int(
+                    sync_result_builder.since_token.un_partial_state_rooms_key
+                )
+
+            un_partial_state_rooms_now = int(
+                sync_result_builder.now_token.un_partial_state_rooms_key
+            )
+            if un_partial_state_rooms_since != un_partial_state_rooms_now:
+                un_partial_stated_rooms = (
+                    await self.store.get_un_partial_stated_rooms_between(
+                        un_partial_state_rooms_since,
+                        un_partial_state_rooms_now,
+                    )
+                )
+
         # 2. We check up front if anything has changed, if it hasn't then there is
         # no point in going further.
         if not sync_result_builder.full_state:
             if since_token and not ephemeral_by_room and not account_data_by_room:
-                have_changed = await self._have_rooms_changed(sync_result_builder)
+                have_changed = await self._have_rooms_changed(
+                    sync_result_builder, un_partial_stated_rooms
+                )
                 log_kv({"rooms_have_changed": have_changed})
                 if not have_changed:
                     tags_by_room = await self.store.get_updated_tags(
@@ -1835,7 +1858,7 @@ class SyncHandler:
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
             room_changes = await self._get_rooms_changed(
-                sync_result_builder, ignored_users
+                sync_result_builder, ignored_users, un_partial_stated_rooms
             )
             tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
@@ -1888,7 +1911,9 @@ class SyncHandler:
         )
 
     async def _have_rooms_changed(
-        self, sync_result_builder: "SyncResultBuilder"
+        self,
+        sync_result_builder: "SyncResultBuilder",
+        un_partial_stated_rooms: Set[str],
     ) -> bool:
         """Returns whether there may be any new events that should be sent down
         the sync. Returns True if there are.
@@ -1905,6 +1930,11 @@ class SyncHandler:
 
         stream_id = since_token.room_key.stream
         for room_id in sync_result_builder.joined_room_ids:
+            # If a room has been un partial stated in the meantime,
+            # treat it as having changed; it is dealt with accordingly
+            # in _get_rooms_changed.
+            if room_id in un_partial_stated_rooms:
+                return True
             if self.store.has_room_changed_since(room_id, stream_id):
                 return True
         return False
@@ -1913,6 +1943,7 @@ class SyncHandler:
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
+        un_partial_stated_rooms: Set[str],
     ) -> _RoomChanges:
         """Determine the changes in rooms to report to the user.
 
@@ -2116,7 +2147,24 @@ class SyncHandler:
             room_entry = room_to_events.get(room_id, None)
 
             newly_joined = room_id in newly_joined_rooms
-            if room_entry:
+
+            # In case of a non lazy-loading-members sync we want to include
+            # rooms that got un partial stated in the meantime, and we need
+            # to include the full state of them.
+            if (
+                not sync_config.filter_collection.lazy_load_members()
+                and room_id in un_partial_stated_rooms
+            ):
+                entry = RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="joined",
+                    events=None,
+                    newly_joined=True,
+                    full_state=True,
+                    since_token=None,
+                    upto_token=now_token,
+                )
+            elif room_entry:
                 events, start_key = room_entry
 
                 prev_batch_token = now_token.copy_and_replace(
@@ -2186,6 +2234,13 @@ class SyncHandler:
         knocked = []
 
         for event in room_list:
+            # Do not include rooms for which we don't have the full state yet
+            # in case of a non lazy-loading-members sync.
+            if (
+                not sync_config.filter_collection.lazy_load_members()
+            ) and await self.store.is_partial_state_room(event.room_id):
+                continue
+
             if event.room_version_id not in KNOWN_ROOM_VERSIONS:
                 continue
 
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index aea96e9d24..95787c2cfd 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -292,6 +292,7 @@ class RelationsWorkerStore(SQLBaseStore):
                         to_device_key=0,
                         device_list_key=0,
                         groups_key=0,
+                        un_partial_state_rooms_key=0,
                     )
 
             return events[:limit], next_token
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 78906a5e1d..c614eda076 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -26,6 +26,7 @@ from typing import (
     Mapping,
     Optional,
     Sequence,
+    Set,
     Tuple,
     Union,
     cast,
@@ -1285,10 +1286,39 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         #      explanation.)
         return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
 
+    async def get_un_partial_stated_rooms_between(
+        self, last_id: int, current_id: int
+    ) -> Set[str]:
+        """Get all rooms that got un partial stated between `last_id` exclusive and
+        `current_id` inclusive.
+
+        Returns:
+            The set of IDs of those rooms.
+        """
+
+        if last_id == current_id:
+            return set()
+
+        def _get_un_partial_stated_rooms_between_txn(
+            txn: LoggingTransaction,
+        ) -> Set[str]:
+            sql = """
+                SELECT DISTINCT room_id FROM un_partial_stated_room_stream
+                WHERE ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (last_id, current_id))
+
+            return {r[0] for r in txn}
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_rooms_between",
+            _get_un_partial_stated_rooms_between_txn,
+        )
+
     async def get_un_partial_stated_rooms_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
-        """Get updates for caches replication stream.
+        """Get updates for un partial stated rooms replication stream.
 
         Args:
             instance_name: The writer we want to fetch updates from. Unused
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 619eb7f601..7e7bd160b5 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -58,6 +58,7 @@ class EventSources:
         push_rules_key = self.store.get_max_push_rules_stream_id()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
+        un_partial_state_rooms_key = self.store.get_un_partial_stated_rooms_token()
 
         token = StreamToken(
             room_key=self.sources.room.get_current_key(),
@@ -70,6 +71,7 @@ class EventSources:
             device_list_key=device_list_key,
             # Groups key is unused.
             groups_key=0,
+            un_partial_state_rooms_key=un_partial_state_rooms_key,
         )
         return token
 
@@ -107,5 +109,6 @@ class EventSources:
             to_device_key=0,
             device_list_key=0,
             groups_key=0,
+            un_partial_state_rooms_key=0,
         )
         return token
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 0c725eb967..d378c39ec2 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -646,12 +646,13 @@ class StreamToken:
         7. `to_device_key`: `274711`
         8. `device_list_key`: `265584`
         9. `groups_key`: `1` (note that this key is now unused)
+        10. `un_partial_state_rooms_key`: `379`
 
     You can see how many of these keys correspond to the various
     fields in a "/sync" response:
     ```json
     {
-        "next_batch": "s12_4_0_1_1_1_1_4_1",
+        "next_batch": "s12_4_0_1_1_1_1_4_1_1",
         "presence": {
             "events": []
         },
@@ -663,7 +664,7 @@ class StreamToken:
                 "!QrZlfIDQLNLdZHqTnt:hs1": {
                     "timeline": {
                         "events": [],
-                        "prev_batch": "s10_4_0_1_1_1_1_4_1",
+                        "prev_batch": "s10_4_0_1_1_1_1_4_1_1",
                         "limited": false
                     },
                     "state": {
@@ -699,6 +700,7 @@ class StreamToken:
     device_list_key: int
     # Note that the groups key is no longer used and may have bogus values.
     groups_key: int
+    un_partial_state_rooms_key: int
 
     _SEPARATOR = "_"
     START: ClassVar["StreamToken"]
@@ -737,6 +739,7 @@ class StreamToken:
                 # serialized so that there will not be confusion in the future
                 # if additional tokens are added.
                 str(self.groups_key),
+                str(self.un_partial_state_rooms_key),
             ]
         )
 
@@ -769,7 +772,7 @@ class StreamToken:
         return attr.evolve(self, **{key: new_value})
 
 
-StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
+StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index e0f5d54aba..453a6e979c 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -1831,7 +1831,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_topo_token_is_accepted(self) -> None:
         """Test Topo Token is accepted."""
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1845,7 +1845,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
         """Test that stream token is accepted for forward pagination."""
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index b4daace556..9222cab198 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -1987,7 +1987,7 @@ class RoomMessageListTestCase(RoomBase):
         self.room_id = self.helper.create_room_as(self.user_id)
 
     def test_topo_token_is_accepted(self) -> None:
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -1998,7 +1998,7 @@ class RoomMessageListTestCase(RoomBase):
         self.assertTrue("end" in channel.json_body)
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -2728,7 +2728,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2745,7 +2745,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by the absence of a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2768,7 +2768,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"