diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 8af53b4f28..82ea3b895f 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -40,6 +40,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
// containing the event
"event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
+ "outlier": true|false,
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
@@ -84,6 +85,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
+ "outlier": event.internal_metadata.is_outlier(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
}
@@ -116,6 +118,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event = make_event_from_dict(
event_dict, room_ver, internal_metadata, rejected_reason
)
+ event.internal_metadata.outlier = event_payload["outlier"]
context = EventContext.deserialize(
self.storage, event_payload["context"]
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 8fa104c8d3..a4c5b44292 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -40,6 +40,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
// containing the event
"event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
+ "outlier": true|false,
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
"requester": { .. serialized requester .. },
@@ -79,7 +80,6 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
-
serialized_context = await context.serialize(event, store)
payload = {
@@ -87,6 +87,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
+ "outlier": event.internal_metadata.is_outlier(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
@@ -108,6 +109,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
event = make_event_from_dict(
event_dict, room_ver, internal_metadata, rejected_reason
)
+ event.internal_metadata.outlier = content["outlier"]
requester = Requester.deserialize(self.store, content["requester"])
context = EventContext.deserialize(self.storage, content["context"])
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 045bd014da..93161c3dfb 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -24,7 +24,7 @@ from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index f45e7a8c89..3dfee76743 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -33,7 +33,7 @@ import attr
from synapse.replication.http.streams import ReplicationGetStreamUpdates
if TYPE_CHECKING:
- import synapse.server
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -299,20 +299,23 @@ class TypingStream(Stream):
NAME = "typing"
ROW_TYPE = TypingStreamRow
- def __init__(self, hs):
- typing_handler = hs.get_typing_handler()
-
+ def __init__(self, hs: "HomeServer"):
writer_instance = hs.config.worker.writers.typing
if writer_instance == hs.get_instance_name():
# On the writer, query the typing handler
- update_function = typing_handler.get_all_typing_updates
+ typing_writer_handler = hs.get_typing_writer_handler()
+ update_function = (
+ typing_writer_handler.get_all_typing_updates
+ ) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
+ current_token_function = typing_writer_handler.get_current_token
else:
# Query the typing writer process
update_function = make_http_update_function(hs, self.NAME)
+ current_token_function = hs.get_typing_handler().get_current_token
super().__init__(
hs.get_instance_name(),
- current_token_without_instance(typing_handler.get_current_token),
+ current_token_without_instance(current_token_function),
update_function,
)
@@ -509,7 +512,7 @@ class AccountDataStream(Stream):
NAME = "account_data"
ROW_TYPE = AccountDataStreamRow
- def __init__(self, hs: "synapse.server.HomeServer"):
+ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
|