diff options
author | reivilibre <oliverw@matrix.org> | 2022-12-14 14:47:11 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-14 14:47:11 +0000 |
commit | fb60cb16fe3cf26fbd947eec926cb4b24b8e9fc7 (patch) | |
tree | e7422eeab193b3d60b9d29676a0356669feb37fd /synapse/replication/tcp/streams | |
parent | Delete event_push_summary_unique_index again. (#14669) (diff) | |
download | synapse-fb60cb16fe3cf26fbd947eec926cb4b24b8e9fc7.tar.xz |
Faster remote room joins: stream the un-partial-stating of events over replication. [rei:frrj/streams/unpsr] (#14545)
Diffstat (limited to 'synapse/replication/tcp/streams')
-rw-r--r-- | synapse/replication/tcp/streams/__init__.py | 7 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/partial_state.py | 28 |
2 files changed, 34 insertions, 1 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 8575666d9c..110f10aab9 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -42,7 +42,10 @@ from synapse.replication.tcp.streams._base import ( ) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.federation import FederationStream -from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream +from synapse.replication.tcp.streams.partial_state import ( + UnPartialStatedEventStream, + UnPartialStatedRoomStream, +) STREAMS_MAP = { stream.NAME: stream @@ -63,6 +66,7 @@ STREAMS_MAP = { AccountDataStream, UserSignatureStream, UnPartialStatedRoomStream, + UnPartialStatedEventStream, ) } @@ -83,4 +87,5 @@ __all__ = [ "AccountDataStream", "UserSignatureStream", "UnPartialStatedRoomStream", + "UnPartialStatedEventStream", ] diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index 18f087ffa2..b5a2ae74b6 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -46,3 +46,31 @@ class UnPartialStatedRoomStream(Stream): current_token_without_instance(store.get_un_partial_stated_rooms_token), store.get_un_partial_stated_rooms_from_stream, ) + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class UnPartialStatedEventStreamRow: + # ID of the event that has been un-partial-stated. + event_id: str + + # True iff the rejection status of the event changed as a result of being + # un-partial-stated. + rejection_status_changed: bool + + +class UnPartialStatedEventStream(Stream): + """ + Stream to notify about events becoming un-partial-stated. + """ + + NAME = "un_partial_stated_event" + ROW_TYPE = UnPartialStatedEventStreamRow + + def __init__(self, hs: "HomeServer"): + store = hs.get_datastores().main + super().__init__( + hs.get_instance_name(), + # TODO(faster_joins, multiple writers): we need to account for instance names + current_token_without_instance(store.get_un_partial_stated_events_token), + store.get_un_partial_stated_events_from_stream, + ) |