summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/streams/__init__.py3
-rw-r--r--synapse/replication/tcp/streams/partial_state.py48
2 files changed, 51 insertions, 0 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index b1cd55bf6f..8575666d9c 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams._base import (
 )
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.replication.tcp.streams.federation import FederationStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
 
 STREAMS_MAP = {
     stream.NAME: stream
@@ -61,6 +62,7 @@ STREAMS_MAP = {
         TagAccountDataStream,
         AccountDataStream,
         UserSignatureStream,
+        UnPartialStatedRoomStream,
     )
 }
 
@@ -80,4 +82,5 @@ __all__ = [
     "TagAccountDataStream",
     "AccountDataStream",
     "UserSignatureStream",
+    "UnPartialStatedRoomStream",
 ]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
new file mode 100644
index 0000000000..18f087ffa2
--- /dev/null
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -0,0 +1,48 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import TYPE_CHECKING
+
+import attr
+
+from synapse.replication.tcp.streams import Stream
+from synapse.replication.tcp.streams._base import current_token_without_instance
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedRoomStreamRow:
+    # ID of the room that has been un-partial-stated.
+    room_id: str
+
+
+class UnPartialStatedRoomStream(Stream):
+    """
+    Stream to notify about rooms becoming un-partial-stated;
+    that is, when the background sync finishes such that we now have full state for
+    the room.
+    """
+
+    NAME = "un_partial_stated_room"
+    ROW_TYPE = UnPartialStatedRoomStreamRow
+
+    def __init__(self, hs: "HomeServer"):
+        store = hs.get_datastores().main
+        super().__init__(
+            hs.get_instance_name(),
+            # TODO(faster_joins, multiple writers): we need to account for instance names
+            current_token_without_instance(store.get_un_partial_stated_rooms_token),
+            store.get_un_partial_stated_rooms_from_stream,
+        )