summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2022-12-14 14:47:11 +0000
committerGitHub <noreply@github.com>2022-12-14 14:47:11 +0000
commitfb60cb16fe3cf26fbd947eec926cb4b24b8e9fc7 (patch)
treee7422eeab193b3d60b9d29676a0356669feb37fd /synapse/replication/tcp
parentDelete event_push_summary_unique_index again. (#14669) (diff)
downloadsynapse-fb60cb16fe3cf26fbd947eec926cb4b24b8e9fc7.tar.xz
Faster remote room joins: stream the un-partial-stating of events over replication. [rei:frrj/streams/unpsr] (#14545)
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/streams/__init__.py7
-rw-r--r--synapse/replication/tcp/streams/partial_state.py28
2 files changed, 34 insertions, 1 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 8575666d9c..110f10aab9 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,7 +42,10 @@ from synapse.replication.tcp.streams._base import (
 )
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
+from synapse.replication.tcp.streams.partial_state import (
+    UnPartialStatedEventStream,
+    UnPartialStatedRoomStream,
+)
 
 STREAMS_MAP = {
     stream.NAME: stream
@@ -63,6 +66,7 @@ STREAMS_MAP = {
         AccountDataStream,
         UserSignatureStream,
         UnPartialStatedRoomStream,
+        UnPartialStatedEventStream,
     )
 }
 
@@ -83,4 +87,5 @@ __all__ = [
     "AccountDataStream",
     "UserSignatureStream",
     "UnPartialStatedRoomStream",
+    "UnPartialStatedEventStream",
 ]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
index 18f087ffa2..b5a2ae74b6 100644
--- a/synapse/replication/tcp/streams/partial_state.py
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -46,3 +46,31 @@ class UnPartialStatedRoomStream(Stream):
             current_token_without_instance(store.get_un_partial_stated_rooms_token),
             store.get_un_partial_stated_rooms_from_stream,
         )
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedEventStreamRow:
+    # ID of the event that has been un-partial-stated.
+    event_id: str
+
+    # True iff the rejection status of the event changed as a result of being
+    # un-partial-stated.
+    rejection_status_changed: bool
+
+
+class UnPartialStatedEventStream(Stream):
+    """
+    Stream to notify about events becoming un-partial-stated.
+    """
+
+    NAME = "un_partial_stated_event"
+    ROW_TYPE = UnPartialStatedEventStreamRow
+
+    def __init__(self, hs: "HomeServer"):
+        store = hs.get_datastores().main
+        super().__init__(
+            hs.get_instance_name(),
+            # TODO(faster_joins, multiple writers): we need to account for instance names
+            current_token_without_instance(store.get_un_partial_stated_events_token),
+            store.get_un_partial_stated_events_from_stream,
+        )