1 files changed, 26 insertions, 0 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 28f0d4a25a..2b0e52f23c 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -314,6 +314,32 @@ class Notifier:
event_entries.append((entry, event.event_id))
await self.notify_new_room_events(event_entries, max_room_stream_token)
+ async def on_un_partial_stated_room(
+ self,
+ room_id: str,
+ new_token: int,
+ ) -> None:
+ """Used by the resync background processes to wake up all listeners
+ of this room when it is un-partial-stated.
+
+ It will also notify replication listeners of the change in stream.
+ """
+
+ # Wake up all related user stream notifiers
+ user_streams = self.room_to_user_streams.get(room_id, set())
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
+ try:
+ user_stream.notify(
+ StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+ )
+ except Exception:
+ logger.exception("Failed to notify listener")
+
+ # Poke the replication so that other workers also see the write to
+ # the un-partial-stated rooms stream.
+ self.notify_replication()
+
async def notify_new_room_events(
self,
event_entries: List[Tuple[_PendingRoomEventEntry, str]],
|