summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/federation.py11
-rw-r--r--synapse/notifier.py26
-rw-r--r--synapse/storage/databases/main/room.py10
-rw-r--r--synapse/types/__init__.py1
4 files changed, 38 insertions, 10 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eca75f1108..3fcd5f3db4 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1726,15 +1726,16 @@ class FederationHandler:
                 await self._device_handler.handle_room_un_partial_stated(room_id)
 
                 logger.info("Clearing partial-state flag for %s", room_id)
-                success = await self.store.clear_partial_state_room(room_id)
-                if success:
+                new_stream_id = await self.store.clear_partial_state_room(room_id)
+                if new_stream_id is not None:
                     logger.info("State resync complete for %s", room_id)
                     self._storage_controllers.state.notify_room_un_partial_stated(
                         room_id
                     )
-                    # Poke the notifier so that other workers see the write to
-                    # the un-partial-stated rooms stream.
-                    self._notifier.notify_replication()
+
+                    await self._notifier.on_un_partial_stated_room(
+                        room_id, new_stream_id
+                    )
 
                     # TODO(faster_joins) update room stats and user directory?
                     #   https://github.com/matrix-org/synapse/issues/12814
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 26b97cf766..258e60367e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -315,6 +315,32 @@ class Notifier:
             event_entries.append((entry, event.event_id))
         await self.notify_new_room_events(event_entries, max_room_stream_token)
 
+    async def on_un_partial_stated_room(
+        self,
+        room_id: str,
+        new_token: int,
+    ) -> None:
+        """Used by the resync background processes to wake up all listeners
+        of this room that it just got un-partial-stated.
+
+        It will also notify replication listeners of the change in stream.
+        """
+
+        # Wake up all related user stream notifiers
+        user_streams = self.room_to_user_streams.get(room_id, set())
+        time_now_ms = self.clock.time_msec()
+        for user_stream in user_streams:
+            try:
+                user_stream.notify(
+                    StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+                )
+            except Exception:
+                logger.exception("Failed to notify listener")
+
+        # Poke the replication so that other workers also see the write to
+        # the un-partial-stated rooms stream.
+        self.notify_replication()
+
     async def notify_new_room_events(
         self,
         event_entries: List[Tuple[_PendingRoomEventEntry, str]],
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 7a16d27627..09ba372f7a 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -2330,16 +2330,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             (room_id,),
         )
 
-    async def clear_partial_state_room(self, room_id: str) -> bool:
+    async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
         """Clears the partial state flag for a room.
 
         Args:
             room_id: The room whose partial state flag is to be cleared.
 
         Returns:
-            `True` if the partial state flag has been cleared successfully.
+            The corresponding stream id for the un-partial-stated rooms stream.
 
-            `False` if the partial state flag could not be cleared because the room
+            `None` if the partial state flag could not be cleared because the room
             still contains events with partial state.
         """
         try:
@@ -2350,7 +2350,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                     room_id,
                     un_partial_state_room_stream_id,
                 )
-                return True
+                return un_partial_state_room_stream_id
         except self.db_pool.engine.module.IntegrityError as e:
             # Assume that any `IntegrityError`s are due to partial state events.
             logger.info(
@@ -2358,7 +2358,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                 room_id,
                 e,
             )
-            return False
+            return None
 
     def _clear_partial_state_room_txn(
         self,
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index dfea825d98..1ae1e9e526 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -627,6 +627,7 @@ class StreamKeyType:
     PUSH_RULES: Final = "push_rules_key"
     TO_DEVICE: Final = "to_device_key"
     DEVICE_LIST: Final = "device_list_key"
+    UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)