summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py14
1 files changed, 13 insertions, 1 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b4dad47b45..658d89210d 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -36,6 +36,7 @@ from synapse.replication.tcp.streams import (
     TagAccountDataStream,
     ToDeviceStream,
     TypingStream,
+    UnPartialStatedEventStream,
     UnPartialStatedRoomStream,
 )
 from synapse.replication.tcp.streams.events import (
@@ -43,7 +44,10 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
     EventsStreamRow,
 )
-from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
+from synapse.replication.tcp.streams.partial_state import (
+    UnPartialStatedEventStreamRow,
+    UnPartialStatedRoomStreamRow,
+)
 from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
 from synapse.util.async_helpers import Linearizer, timeout_deferred
 from synapse.util.metrics import Measure
@@ -247,6 +251,14 @@ class ReplicationDataHandler:
                 self._state_storage_controller.notify_room_un_partial_stated(
                     row.room_id
                 )
+        elif stream_name == UnPartialStatedEventStream.NAME:
+            for row in rows:
+                assert isinstance(row, UnPartialStatedEventStreamRow)
+
+                # Wake up any tasks waiting for the event to be un-partial-stated.
+                self._state_storage_controller.notify_event_un_partial_stated(
+                    row.event_id
+                )
 
         await self._presence_handler.process_replication_rows(
             stream_name, instance_name, token, rows