summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-03-18 13:59:52 +0000
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-03-18 13:59:52 +0000
commit7973fe82cea0e94bbc589f3c251a4f7f42894365 (patch)
tree611b98e7fd5b4b471ba17b58d23c67c1453405cc
parentFold ClientIpStore into ClientIpWorkerStore (diff)
downloadsynapse-7973fe82cea0e94bbc589f3c251a4f7f42894365.tar.xz
Make the background worker handle USER_IP replication commands
-rw-r--r--synapse/replication/tcp/handler.py22
1 files changed, 18 insertions, 4 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py

index b217c35f99..ae878724ff 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -235,6 +235,8 @@ class ReplicationCommandHandler: if self._is_master: self._server_notices_sender = hs.get_server_notices_sender() + self._is_background_worker = hs.config.worker.run_background_tasks + def _add_command_to_stream_queue( self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: @@ -401,12 +403,27 @@ class ReplicationCommandHandler: ) -> Optional[Awaitable[None]]: user_ip_cache_counter.inc() - if self._is_master: + if self._is_master or self._is_background_worker: return self._handle_user_ip(cmd) else: return None async def _handle_user_ip(self, cmd: UserIpCommand) -> None: + """ + Handles a User IP, branching depending on whether we are the main process + and/or the background worker. + """ + if self._is_master: + await self._handle_user_ip_as_master(cmd) + + if self._is_background_worker: + await self._handle_user_ip_as_background_worker(cmd) + + async def _handle_user_ip_as_master(self, cmd: UserIpCommand) -> None: + assert self._server_notices_sender is not None + await self._server_notices_sender.on_user_ip(cmd.user_id) + + async def _handle_user_ip_as_background_worker(self, cmd: UserIpCommand) -> None: await self._store.insert_client_ip( cmd.user_id, cmd.access_token, @@ -416,9 +433,6 @@ class ReplicationCommandHandler: cmd.last_seen, ) - assert self._server_notices_sender is not None - await self._server_notices_sender.on_user_ip(cmd.user_id) - def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None: if cmd.instance_name == self._instance_name: # Ignore RDATA that are just our own echoes