summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
author: reivilibre <oliverw@matrix.org> 2022-04-01 13:08:55 +0100
committer: GitHub <noreply@github.com> 2022-04-01 13:08:55 +0100
commit: f8712228801d9c4753e65819fa4241818b56953b (patch)
tree: fffb61404df1fe81c8598a1ac4b7d37c62747638 /synapse/replication
parent: Raise an exception when getting state at an outlier (#12191) (diff)
download: synapse-f8712228801d9c4753e65819fa4241818b56953b.tar.xz
Move `update_client_ip` background job from the main process to the background worker. (#12251)
Diffstat (limited to 'synapse/replication')
-rw-r--r-- synapse/replication/slave/storage/client_ips.py | 59
-rw-r--r-- synapse/replication/tcp/commands.py | 8
-rw-r--r-- synapse/replication/tcp/handler.py | 48
3 files changed, 42 insertions, 73 deletions
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
deleted file mode 100644
index 14706a0817..0000000000
--- a/synapse/replication/slave/storage/client_ips.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright 2017 Vector Creations Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import TYPE_CHECKING
-
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
-from synapse.util.caches.lrucache import LruCache
-
-from ._base import BaseSlavedStore
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-
-class SlavedClientIpStore(BaseSlavedStore):
-    def __init__(
-        self,
-        database: DatabasePool,
-        db_conn: LoggingDatabaseConnection,
-        hs: "HomeServer",
-    ):
-        super().__init__(database, db_conn, hs)
-
-        self.client_ip_last_seen: LruCache[tuple, int] = LruCache(
-            cache_name="client_ip_last_seen", max_size=50000
-        )
-
-    async def insert_client_ip(
-        self, user_id: str, access_token: str, ip: str, user_agent: str, device_id: str
-    ) -> None:
-        now = int(self._clock.time_msec())
-        key = (user_id, access_token, ip)
-
-        try:
-            last_seen = self.client_ip_last_seen.get(key)
-        except KeyError:
-            last_seen = None
-
-        # Rate-limited inserts
-        if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
-            return
-
-        self.client_ip_last_seen.set(key, now)
-
-        self.hs.get_replication_command_handler().send_user_ip(
-            user_id, access_token, ip, user_agent, device_id, now
-        )
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 3654f6c03c..fe34948168 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -356,7 +356,7 @@ class UserIpCommand(Command):
         access_token: str,
         ip: str,
         user_agent: str,
-        device_id: str,
+        device_id: Optional[str],
         last_seen: int,
     ):
         self.user_id = user_id
@@ -389,6 +389,12 @@ class UserIpCommand(Command):
             )
         )
 
+    def __repr__(self) -> str:
+        return (
+            f"UserIpCommand({self.user_id!r}, .., {self.ip!r}, "
+            f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
+        )
+
 
 class RemoteServerUpCommand(_SimpleCommand):
     """Sent when a worker has detected that a remote server is no longer
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b217c35f99..615f1828dd 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -235,6 +235,14 @@ class ReplicationCommandHandler:
         if self._is_master:
             self._server_notices_sender = hs.get_server_notices_sender()
 
+        if hs.config.redis.redis_enabled:
+            # If we're using Redis, it's the background worker that should
+            # receive USER_IP commands and store the relevant client IPs.
+            self._should_insert_client_ips = hs.config.worker.run_background_tasks
+        else:
+            # If we're NOT using Redis, this must be handled by the master
+            self._should_insert_client_ips = hs.get_instance_name() == "master"
+
     def _add_command_to_stream_queue(
         self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
     ) -> None:
@@ -401,23 +409,37 @@ class ReplicationCommandHandler:
     ) -> Optional[Awaitable[None]]:
         user_ip_cache_counter.inc()
 
-        if self._is_master:
+        if self._is_master or self._should_insert_client_ips:
+            # We make a point of only returning an awaitable if there's actually
+            # something to do; on_USER_IP is not an async function, but
+            # _handle_user_ip is.
+            # If on_USER_IP returns an awaitable, it gets scheduled as a
+            # background process (see `BaseReplicationStreamProtocol.handle_command`).
             return self._handle_user_ip(cmd)
         else:
+            # Returning None when this process definitely has nothing to do
+            # reduces the overhead of handling the USER_IP command, which is
+            # currently broadcast to all workers regardless of utility.
             return None
 
     async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
-        await self._store.insert_client_ip(
-            cmd.user_id,
-            cmd.access_token,
-            cmd.ip,
-            cmd.user_agent,
-            cmd.device_id,
-            cmd.last_seen,
-        )
-
-        assert self._server_notices_sender is not None
-        await self._server_notices_sender.on_user_ip(cmd.user_id)
+        """
+        Handles a User IP, branching depending on whether we are the main process
+        and/or the background worker.
+        """
+        if self._is_master:
+            assert self._server_notices_sender is not None
+            await self._server_notices_sender.on_user_ip(cmd.user_id)
+
+        if self._should_insert_client_ips:
+            await self._store.insert_client_ip(
+                cmd.user_id,
+                cmd.access_token,
+                cmd.ip,
+                cmd.user_agent,
+                cmd.device_id,
+                cmd.last_seen,
+            )
 
     def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
         if cmd.instance_name == self._instance_name:
@@ -698,7 +720,7 @@ class ReplicationCommandHandler:
         access_token: str,
         ip: str,
         user_agent: str,
-        device_id: str,
+        device_id: Optional[str],
         last_seen: int,
     ) -> None:
         """Tell the master that the user made a request."""