diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 209833d287..b8198e059c 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -20,7 +20,7 @@ from twisted.web.server import Request
from synapse.http.server import HttpServer
from synapse.logging.opentracing import active_span
from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import JsonDict
+from synapse.types import JsonDict, JsonMapping
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -82,7 +82,7 @@ class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
- ) -> Tuple[int, Dict[str, Optional[JsonDict]]]:
+ ) -> Tuple[int, Dict[str, Optional[JsonMapping]]]:
user_ids: List[str] = content["user_ids"]
logger.info("Resync for %r", user_ids)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index d9045d7b73..5642666411 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -644,7 +644,7 @@ class ReplicationCommandHandler:
[stream.parse_row(row) for row in rows],
)
- logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
+ logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
# We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position(
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 347467d863..1d9a29d22e 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -191,7 +191,12 @@ class ReplicationStreamer:
if updates:
logger.info(
- "Streaming: %s -> %s", stream.NAME, updates[-1][0]
+ "Streaming: %s -> %s (limited: %s, updates: %s, max token: %s)",
+ stream.NAME,
+ updates[-1][0],
+ limited,
+ len(updates),
+ current_token,
)
stream_updates_counter.labels(stream.NAME).inc(len(updates))
|