diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 908f3f1db7..c20d9c7e9d 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -426,6 +426,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
code, response = await self.response_cache.wrap(
txn_id, self._handle_request, request, content, **kwargs
)
+ # Take a copy so we don't mutate things in the cache.
+ response = dict(response)
else:
# The `@cancellable` decorator may be applied to `_handle_request`. But we
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
diff --git a/synapse/replication/http/account_data.py b/synapse/replication/http/account_data.py
index 2374f810c9..111ec07e64 100644
--- a/synapse/replication/http/account_data.py
+++ b/synapse/replication/http/account_data.py
@@ -265,7 +265,6 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
@staticmethod
async def _serialize_payload(user_id: str, room_id: str, tag: str) -> JsonDict: # type: ignore[override]
-
return {}
async def _handle_request( # type: ignore[override]
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index ecea6fc915..cc3929dcf5 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -195,7 +195,6 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
async def _serialize_payload( # type: ignore[override]
user_id: str, device_id: str, keys: JsonDict
) -> JsonDict:
-
return {
"user_id": user_id,
"device_id": device_id,
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 9fa1060d48..67b01db67e 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -142,17 +142,12 @@ class ReplicationRemoteKnockRestServlet(ReplicationEndpoint):
}
async def _handle_request( # type: ignore[override]
- self,
- request: SynapseRequest,
- content: JsonDict,
- room_id: str,
- user_id: str,
+ self, request: SynapseRequest, content: JsonDict, room_id: str, user_id: str
) -> Tuple[int, JsonDict]:
remote_room_hosts = content["remote_room_hosts"]
event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"])
-
request.requester = requester
logger.debug("remote_knock: %s on room: %s", user_id, room_id)
@@ -277,16 +272,12 @@ class ReplicationRemoteRescindKnockRestServlet(ReplicationEndpoint):
}
async def _handle_request( # type: ignore[override]
- self,
- request: SynapseRequest,
- content: JsonDict,
- knock_event_id: str,
+ self, request: SynapseRequest, content: JsonDict, knock_event_id: str
) -> Tuple[int, JsonDict]:
txn_id = content["txn_id"]
event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"])
-
request.requester = requester
# hopefully we're now on the master, so this won't recurse!
@@ -363,3 +354,5 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationRemoteJoinRestServlet(hs).register(http_server)
ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
+ ReplicationRemoteKnockRestServlet(hs).register(http_server)
+ ReplicationRemoteRescindKnockRestServlet(hs).register(http_server)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index f1dc435f8d..2accffe18d 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -370,15 +370,23 @@ class ReplicationDataHandler:
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
logger.info(
- "Waiting for repl stream %r to reach %s (%s)",
+ "Waiting for repl stream %r to reach %s (%s); currently at: %s",
stream_name,
position,
instance_name,
+ current_position,
)
try:
await make_deferred_yieldable(deferred)
except defer.TimeoutError:
- logger.error("Timed out waiting for stream %s", stream_name)
+ logger.error(
+ "Timed out waiting for repl stream %r to reach %s (%s)"
+ "; currently at: %s",
+ stream_name,
+ position,
+ instance_name,
+ self._streams[stream_name].current_token(instance_name),
+ )
return
logger.info(
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index c5b0bb4e1f..2a6ad95986 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -328,7 +328,6 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
outbound_redis_connection: txredisapi.ConnectionHandler,
channel_names: List[str],
):
-
super().__init__(
hs,
uuid="subscriber",
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index ce95714ea0..f1c3e7595a 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -238,6 +238,24 @@ class ReplicationStreamer:
except Exception:
logger.exception("Failed to replicate")
+ # The last token we send may not match the current
+ # token, in which case we want to send out a `POSITION`
+ # to tell other workers the actual current position.
+ if updates[-1][0] < current_token:
+ logger.info(
+ "Sending position: %s -> %s",
+ stream.NAME,
+ current_token,
+ )
+ self.command_handler.send_command(
+ PositionCommand(
+ stream.NAME,
+ self._instance_name,
+ updates[-1][0],
+ current_token,
+ )
+ )
+
logger.debug("No more pending updates, breaking poke loop")
finally:
self.pending_updates = False
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 14b6705862..ad9b760713 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -139,7 +139,6 @@ class EventsStream(Stream):
current_token: Token,
target_row_count: int,
) -> StreamUpdateResult:
-
# the events stream merges together three separate sources:
# * new events
# * current_state changes
|