diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b217c35f99..ae878724ff 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -235,6 +235,8 @@ class ReplicationCommandHandler:
if self._is_master:
self._server_notices_sender = hs.get_server_notices_sender()
+ self._is_background_worker = hs.config.worker.run_background_tasks
+
def _add_command_to_stream_queue(
self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
) -> None:
@@ -401,12 +403,27 @@ class ReplicationCommandHandler:
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()
- if self._is_master:
+ if self._is_master or self._is_background_worker:
return self._handle_user_ip(cmd)
else:
return None
async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
+ """
+ Handles a User IP, branching depending on whether we are the main process
+ and/or the background worker.
+ """
+ if self._is_master:
+ await self._handle_user_ip_as_master(cmd)
+
+ if self._is_background_worker:
+ await self._handle_user_ip_as_background_worker(cmd)
+
+ async def _handle_user_ip_as_master(self, cmd: UserIpCommand) -> None:
+ assert self._server_notices_sender is not None
+ await self._server_notices_sender.on_user_ip(cmd.user_id)
+
+ async def _handle_user_ip_as_background_worker(self, cmd: UserIpCommand) -> None:
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
@@ -416,9 +433,6 @@ class ReplicationCommandHandler:
cmd.last_seen,
)
- assert self._server_notices_sender is not None
- await self._server_notices_sender.on_user_ip(cmd.user_id)
-
def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
if cmd.instance_name == self._instance_name:
# Ignore RDATA that are just our own echoes
|