From f8712228801d9c4753e65819fa4241818b56953b Mon Sep 17 00:00:00 2001 From: reivilibre Date: Fri, 1 Apr 2022 13:08:55 +0100 Subject: Move `update_client_ip` background job from the main process to the background worker. (#12251) --- synapse/replication/tcp/handler.py | 48 +++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 13 deletions(-) (limited to 'synapse/replication/tcp/handler.py') diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b217c35f99..615f1828dd 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -235,6 +235,14 @@ class ReplicationCommandHandler: if self._is_master: self._server_notices_sender = hs.get_server_notices_sender() + if hs.config.redis.redis_enabled: + # If we're using Redis, it's the background worker that should + # receive USER_IP commands and store the relevant client IPs. + self._should_insert_client_ips = hs.config.worker.run_background_tasks + else: + # If we're NOT using Redis, this must be handled by the master + self._should_insert_client_ips = hs.get_instance_name() == "master" + def _add_command_to_stream_queue( self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: @@ -401,23 +409,37 @@ class ReplicationCommandHandler: ) -> Optional[Awaitable[None]]: user_ip_cache_counter.inc() - if self._is_master: + if self._is_master or self._should_insert_client_ips: + # We make a point of only returning an awaitable if there's actually + # something to do; on_USER_IP is not an async function, but + # _handle_user_ip is. + # If on_USER_IP returns an awaitable, it gets scheduled as a + # background process (see `BaseReplicationStreamProtocol.handle_command`). return self._handle_user_ip(cmd) else: + # Returning None when this process definitely has nothing to do + # reduces the overhead of handling the USER_IP command, which is + # currently broadcast to all workers regardless of utility. return None async def _handle_user_ip(self, cmd: UserIpCommand) -> None: - await self._store.insert_client_ip( - cmd.user_id, - cmd.access_token, - cmd.ip, - cmd.user_agent, - cmd.device_id, - cmd.last_seen, - ) - - assert self._server_notices_sender is not None - await self._server_notices_sender.on_user_ip(cmd.user_id) + """ + Handles a User IP, branching depending on whether we are the main process + and/or the background worker. + """ + if self._is_master: + assert self._server_notices_sender is not None + await self._server_notices_sender.on_user_ip(cmd.user_id) + + if self._should_insert_client_ips: + await self._store.insert_client_ip( + cmd.user_id, + cmd.access_token, + cmd.ip, + cmd.user_agent, + cmd.device_id, + cmd.last_seen, + ) def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None: if cmd.instance_name == self._instance_name: @@ -698,7 +720,7 @@ class ReplicationCommandHandler: access_token: str, ip: str, user_agent: str, - device_id: str, + device_id: Optional[str], last_seen: int, ) -> None: """Tell the master that the user made a request.""" -- cgit 1.4.1