diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
deleted file mode 100644
index 14706a0817..0000000000
--- a/synapse/replication/slave/storage/client_ips.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright 2017 Vector Creations Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import TYPE_CHECKING
-
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
-from synapse.util.caches.lrucache import LruCache
-
-from ._base import BaseSlavedStore
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class SlavedClientIpStore(BaseSlavedStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- self.client_ip_last_seen: LruCache[tuple, int] = LruCache(
- cache_name="client_ip_last_seen", max_size=50000
- )
-
- async def insert_client_ip(
- self, user_id: str, access_token: str, ip: str, user_agent: str, device_id: str
- ) -> None:
- now = int(self._clock.time_msec())
- key = (user_id, access_token, ip)
-
- try:
- last_seen = self.client_ip_last_seen.get(key)
- except KeyError:
- last_seen = None
-
- # Rate-limited inserts
- if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
- return
-
- self.client_ip_last_seen.set(key, now)
-
- self.hs.get_replication_command_handler().send_user_ip(
- user_id, access_token, ip, user_agent, device_id, now
- )
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 3654f6c03c..fe34948168 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -356,7 +356,7 @@ class UserIpCommand(Command):
access_token: str,
ip: str,
user_agent: str,
- device_id: str,
+ device_id: Optional[str],
last_seen: int,
):
self.user_id = user_id
@@ -389,6 +389,12 @@ class UserIpCommand(Command):
)
)
+ def __repr__(self) -> str:
+ return (
+ f"UserIpCommand({self.user_id!r}, .., {self.ip!r}, "
+ f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
+ )
+
class RemoteServerUpCommand(_SimpleCommand):
"""Sent when a worker has detected that a remote server is no longer
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b217c35f99..615f1828dd 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -235,6 +235,14 @@ class ReplicationCommandHandler:
if self._is_master:
self._server_notices_sender = hs.get_server_notices_sender()
+ if hs.config.redis.redis_enabled:
+ # If we're using Redis, it's the background worker that should
+ # receive USER_IP commands and store the relevant client IPs.
+ self._should_insert_client_ips = hs.config.worker.run_background_tasks
+ else:
+ # If we're NOT using Redis, this must be handled by the master
+ self._should_insert_client_ips = hs.get_instance_name() == "master"
+
def _add_command_to_stream_queue(
self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
) -> None:
@@ -401,23 +409,37 @@ class ReplicationCommandHandler:
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()
- if self._is_master:
+ if self._is_master or self._should_insert_client_ips:
+ # We make a point of only returning an awaitable if there's actually
+ # something to do; on_USER_IP is not an async function, but
+ # _handle_user_ip is.
+ # If on_USER_IP returns an awaitable, it gets scheduled as a
+ # background process (see `BaseReplicationStreamProtocol.handle_command`).
return self._handle_user_ip(cmd)
else:
+ # Returning None when this process definitely has nothing to do
+ # reduces the overhead of handling the USER_IP command, which is
+ # currently broadcast to all workers regardless of utility.
return None
async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
- await self._store.insert_client_ip(
- cmd.user_id,
- cmd.access_token,
- cmd.ip,
- cmd.user_agent,
- cmd.device_id,
- cmd.last_seen,
- )
-
- assert self._server_notices_sender is not None
- await self._server_notices_sender.on_user_ip(cmd.user_id)
+ """
+ Handles a User IP, branching depending on whether we are the main process
+ and/or the background worker.
+ """
+ if self._is_master:
+ assert self._server_notices_sender is not None
+ await self._server_notices_sender.on_user_ip(cmd.user_id)
+
+ if self._should_insert_client_ips:
+ await self._store.insert_client_ip(
+ cmd.user_id,
+ cmd.access_token,
+ cmd.ip,
+ cmd.user_agent,
+ cmd.device_id,
+ cmd.last_seen,
+ )
def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
if cmd.instance_name == self._instance_name:
@@ -698,7 +720,7 @@ class ReplicationCommandHandler:
access_token: str,
ip: str,
user_agent: str,
- device_id: str,
+ device_id: Optional[str],
last_seen: int,
) -> None:
"""Tell the master that the user made a request."""
|