diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b4dad47b45..658d89210d 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -36,6 +36,7 @@ from synapse.replication.tcp.streams import (
TagAccountDataStream,
ToDeviceStream,
TypingStream,
+ UnPartialStatedEventStream,
UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams.events import (
@@ -43,7 +44,10 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow,
EventsStreamRow,
)
-from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
+from synapse.replication.tcp.streams.partial_state import (
+ UnPartialStatedEventStreamRow,
+ UnPartialStatedRoomStreamRow,
+)
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
@@ -247,6 +251,14 @@ class ReplicationDataHandler:
self._state_storage_controller.notify_room_un_partial_stated(
row.room_id
)
+ elif stream_name == UnPartialStatedEventStream.NAME:
+ for row in rows:
+ assert isinstance(row, UnPartialStatedEventStreamRow)
+
+ # Wake up any tasks waiting for the event to be un-partial-stated.
+ self._state_storage_controller.notify_event_un_partial_stated(
+ row.event_id
+ )
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 8575666d9c..110f10aab9 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,7 +42,10 @@ from synapse.replication.tcp.streams._base import (
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
+from synapse.replication.tcp.streams.partial_state import (
+ UnPartialStatedEventStream,
+ UnPartialStatedRoomStream,
+)
STREAMS_MAP = {
stream.NAME: stream
@@ -63,6 +66,7 @@ STREAMS_MAP = {
AccountDataStream,
UserSignatureStream,
UnPartialStatedRoomStream,
+ UnPartialStatedEventStream,
)
}
@@ -83,4 +87,5 @@ __all__ = [
"AccountDataStream",
"UserSignatureStream",
"UnPartialStatedRoomStream",
+ "UnPartialStatedEventStream",
]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
index 18f087ffa2..b5a2ae74b6 100644
--- a/synapse/replication/tcp/streams/partial_state.py
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -46,3 +46,31 @@ class UnPartialStatedRoomStream(Stream):
current_token_without_instance(store.get_un_partial_stated_rooms_token),
store.get_un_partial_stated_rooms_from_stream,
)
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedEventStreamRow:
+ # ID of the event that has been un-partial-stated.
+ event_id: str
+
+ # True iff the rejection status of the event changed as a result of being
+ # un-partial-stated.
+ rejection_status_changed: bool
+
+
+class UnPartialStatedEventStream(Stream):
+ """
+ Stream to notify about events becoming un-partial-stated.
+ """
+
+ NAME = "un_partial_stated_event"
+ ROW_TYPE = UnPartialStatedEventStreamRow
+
+ def __init__(self, hs: "HomeServer"):
+ store = hs.get_datastores().main
+ super().__init__(
+ hs.get_instance_name(),
+ # TODO(faster_joins, multiple writers): we need to account for instance names
+ current_token_without_instance(store.get_un_partial_stated_events_token),
+ store.get_un_partial_stated_events_from_stream,
+ )
|