diff options
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/streams/__init__.py | 3 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/partial_state.py | 48 |
2 files changed, 51 insertions, 0 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index b1cd55bf6f..8575666d9c 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -42,6 +42,7 @@ from synapse.replication.tcp.streams._base import ( ) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.federation import FederationStream +from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream STREAMS_MAP = { stream.NAME: stream @@ -61,6 +62,7 @@ STREAMS_MAP = { TagAccountDataStream, AccountDataStream, UserSignatureStream, + UnPartialStatedRoomStream, ) } @@ -80,4 +82,5 @@ __all__ = [ "TagAccountDataStream", "AccountDataStream", "UserSignatureStream", + "UnPartialStatedRoomStream", ] diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py new file mode 100644 index 0000000000..18f087ffa2 --- /dev/null +++ b/synapse/replication/tcp/streams/partial_state.py @@ -0,0 +1,48 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import TYPE_CHECKING + +import attr + +from synapse.replication.tcp.streams import Stream +from synapse.replication.tcp.streams._base import current_token_without_instance + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class UnPartialStatedRoomStreamRow: + # ID of the room that has been un-partial-stated. + room_id: str + + +class UnPartialStatedRoomStream(Stream): + """ + Stream to notify about rooms becoming un-partial-stated; + that is, when the background sync finishes such that we now have full state for + the room. + """ + + NAME = "un_partial_stated_room" + ROW_TYPE = UnPartialStatedRoomStreamRow + + def __init__(self, hs: "HomeServer"): + store = hs.get_datastores().main + super().__init__( + hs.get_instance_name(), + # TODO(faster_joins, multiple writers): we need to account for instance names + current_token_without_instance(store.get_un_partial_stated_rooms_token), + store.get_un_partial_stated_rooms_from_stream, + ) |