summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/federation.py3
-rw-r--r--synapse/replication/http/send_event.py4
-rw-r--r--synapse/replication/slave/storage/pushers.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py17
4 files changed, 17 insertions, 9 deletions
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 8af53b4f28..82ea3b895f 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -40,6 +40,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
                                     // containing the event
                 "event_format_version": .., // 1,2,3 etc: the event format version
                 "internal_metadata": { .. serialized internal_metadata .. },
+                "outlier": true|false,
                 "rejected_reason": ..,   // The event.rejected_reason field
                 "context": { .. serialized event context .. },
             }],
@@ -84,6 +85,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
                     "room_version": event.room_version.identifier,
                     "event_format_version": event.format_version,
                     "internal_metadata": event.internal_metadata.get_dict(),
+                    "outlier": event.internal_metadata.is_outlier(),
                     "rejected_reason": event.rejected_reason,
                     "context": serialized_context,
                 }
@@ -116,6 +118,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
                 event = make_event_from_dict(
                     event_dict, room_ver, internal_metadata, rejected_reason
                 )
+                event.internal_metadata.outlier = event_payload["outlier"]
 
                 context = EventContext.deserialize(
                     self.storage, event_payload["context"]
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 8fa104c8d3..a4c5b44292 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -40,6 +40,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
                                 // containing the event
             "event_format_version": .., // 1,2,3 etc: the event format version
             "internal_metadata": { .. serialized internal_metadata .. },
+            "outlier": true|false,
             "rejected_reason": ..,   // The event.rejected_reason field
             "context": { .. serialized event context .. },
             "requester": { .. serialized requester .. },
@@ -79,7 +80,6 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             ratelimit (bool)
             extra_users (list(UserID)): Any extra users to notify about event
         """
-
         serialized_context = await context.serialize(event, store)
 
         payload = {
@@ -87,6 +87,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             "room_version": event.room_version.identifier,
             "event_format_version": event.format_version,
             "internal_metadata": event.internal_metadata.get_dict(),
+            "outlier": event.internal_metadata.is_outlier(),
             "rejected_reason": event.rejected_reason,
             "context": serialized_context,
             "requester": requester.serialize(),
@@ -108,6 +109,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             event = make_event_from_dict(
                 event_dict, room_ver, internal_metadata, rejected_reason
             )
+            event.internal_metadata.outlier = content["outlier"]
 
             requester = Requester.deserialize(self.store, content["requester"])
             context = EventContext.deserialize(self.storage, content["context"])
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 045bd014da..93161c3dfb 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -24,7 +24,7 @@ from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 if TYPE_CHECKING:
-    from synapse.app.homeserver import HomeServer
+    from synapse.server import HomeServer
 
 
 class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index f45e7a8c89..3dfee76743 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -33,7 +33,7 @@ import attr
 from synapse.replication.http.streams import ReplicationGetStreamUpdates
 
 if TYPE_CHECKING:
-    import synapse.server
+    from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
 
@@ -299,20 +299,23 @@ class TypingStream(Stream):
     NAME = "typing"
     ROW_TYPE = TypingStreamRow
 
-    def __init__(self, hs):
-        typing_handler = hs.get_typing_handler()
-
+    def __init__(self, hs: "HomeServer"):
         writer_instance = hs.config.worker.writers.typing
         if writer_instance == hs.get_instance_name():
             # On the writer, query the typing handler
-            update_function = typing_handler.get_all_typing_updates
+            typing_writer_handler = hs.get_typing_writer_handler()
+            update_function = (
+                typing_writer_handler.get_all_typing_updates
+            )  # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
+            current_token_function = typing_writer_handler.get_current_token
         else:
             # Query the typing writer process
             update_function = make_http_update_function(hs, self.NAME)
+            current_token_function = hs.get_typing_handler().get_current_token
 
         super().__init__(
             hs.get_instance_name(),
-            current_token_without_instance(typing_handler.get_current_token),
+            current_token_without_instance(current_token_function),
             update_function,
         )
 
@@ -509,7 +512,7 @@ class AccountDataStream(Stream):
     NAME = "account_data"
     ROW_TYPE = AccountDataStreamRow
 
-    def __init__(self, hs: "synapse.server.HomeServer"):
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),