summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/_base.py2
-rw-r--r--synapse/replication/slave/storage/client_ips.py59
-rw-r--r--synapse/replication/slave/storage/devices.py13
-rw-r--r--synapse/replication/tcp/commands.py8
-rw-r--r--synapse/replication/tcp/handler.py48
5 files changed, 55 insertions, 75 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py

index f1abb98653..2bd244ed79 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -275,7 +275,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS: raise - delay = 2 ** attempts + delay = 2**attempts logger.warning( "%s request connection failed; retrying in %ds: %r", cls.NAME, diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py deleted file mode 100644
index 14706a0817..0000000000 --- a/synapse/replication/slave/storage/client_ips.py +++ /dev/null
@@ -1,59 +0,0 @@ -# Copyright 2017 Vector Creations Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TYPE_CHECKING - -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection -from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY -from synapse.util.caches.lrucache import LruCache - -from ._base import BaseSlavedStore - -if TYPE_CHECKING: - from synapse.server import HomeServer - - -class SlavedClientIpStore(BaseSlavedStore): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - super().__init__(database, db_conn, hs) - - self.client_ip_last_seen: LruCache[tuple, int] = LruCache( - cache_name="client_ip_last_seen", max_size=50000 - ) - - async def insert_client_ip( - self, user_id: str, access_token: str, ip: str, user_agent: str, device_id: str - ) -> None: - now = int(self._clock.time_msec()) - key = (user_id, access_token, ip) - - try: - last_seen = self.client_ip_last_seen.get(key) - except KeyError: - last_seen = None - - # Rate-limited inserts - if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: - return - - self.client_ip_last_seen.set(key, now) - - self.hs.get_replication_command_handler().send_user_ip( - user_id, access_token, ip, user_agent, device_id, now - ) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 0ffd34f1da..00a634d3a9 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py
@@ -44,11 +44,22 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto extra_tables=[ ("user_signature_stream", "stream_id"), ("device_lists_outbound_pokes", "stream_id"), + ("device_lists_changes_in_room", "stream_id"), ], ) device_list_max = self._device_list_id_gen.get_current_token() + device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( + db_conn, + "device_lists_stream", + entity_column="user_id", + stream_column="stream_id", + max_value=device_list_max, + limit=1000, + ) self._device_list_stream_cache = StreamChangeCache( - "DeviceListStreamChangeCache", device_list_max + "DeviceListStreamChangeCache", + min_device_list_id, + prefilled_cache=device_list_prefill, ) self._user_signature_stream_cache = StreamChangeCache( "UserSignatureStreamChangeCache", device_list_max diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 3654f6c03c..fe34948168 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py
@@ -356,7 +356,7 @@ class UserIpCommand(Command): access_token: str, ip: str, user_agent: str, - device_id: str, + device_id: Optional[str], last_seen: int, ): self.user_id = user_id @@ -389,6 +389,12 @@ class UserIpCommand(Command): ) ) + def __repr__(self) -> str: + return ( + f"UserIpCommand({self.user_id!r}, .., {self.ip!r}, " + f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})" + ) + class RemoteServerUpCommand(_SimpleCommand): """Sent when a worker has detected that a remote server is no longer diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b217c35f99..615f1828dd 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -235,6 +235,14 @@ class ReplicationCommandHandler: if self._is_master: self._server_notices_sender = hs.get_server_notices_sender() + if hs.config.redis.redis_enabled: + # If we're using Redis, it's the background worker that should + # receive USER_IP commands and store the relevant client IPs. + self._should_insert_client_ips = hs.config.worker.run_background_tasks + else: + # If we're NOT using Redis, this must be handled by the master + self._should_insert_client_ips = hs.get_instance_name() == "master" + def _add_command_to_stream_queue( self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: @@ -401,23 +409,37 @@ class ReplicationCommandHandler: ) -> Optional[Awaitable[None]]: user_ip_cache_counter.inc() - if self._is_master: + if self._is_master or self._should_insert_client_ips: + # We make a point of only returning an awaitable if there's actually + # something to do; on_USER_IP is not an async function, but + # _handle_user_ip is. + # If on_USER_IP returns an awaitable, it gets scheduled as a + # background process (see `BaseReplicationStreamProtocol.handle_command`). return self._handle_user_ip(cmd) else: + # Returning None when this process definitely has nothing to do + # reduces the overhead of handling the USER_IP command, which is + # currently broadcast to all workers regardless of utility. return None async def _handle_user_ip(self, cmd: UserIpCommand) -> None: - await self._store.insert_client_ip( - cmd.user_id, - cmd.access_token, - cmd.ip, - cmd.user_agent, - cmd.device_id, - cmd.last_seen, - ) - - assert self._server_notices_sender is not None - await self._server_notices_sender.on_user_ip(cmd.user_id) + """ + Handles a User IP, branching depending on whether we are the main process + and/or the background worker. + """ + if self._is_master: + assert self._server_notices_sender is not None + await self._server_notices_sender.on_user_ip(cmd.user_id) + + if self._should_insert_client_ips: + await self._store.insert_client_ip( + cmd.user_id, + cmd.access_token, + cmd.ip, + cmd.user_agent, + cmd.device_id, + cmd.last_seen, + ) def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None: if cmd.instance_name == self._instance_name: @@ -698,7 +720,7 @@ class ReplicationCommandHandler: access_token: str, ip: str, user_agent: str, - device_id: str, + device_id: Optional[str], last_seen: int, ) -> None: """Tell the master that the user made a request."""