diff options
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/client.py | 7 | ||||
-rw-r--r-- | synapse/replication/tcp/commands.py | 32 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 6 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 10 |
4 files changed, 55 insertions, 0 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 90fb6c1336..6d2513c4e2 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory from .commands import ( FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand, + UserIpCommand, ) from .protocol import ClientReplicationStreamProtocol @@ -178,6 +179,12 @@ class ReplicationClientHandler(object): cmd = InvalidateCacheCommand(cache_func.__name__, keys) self.send_command(cmd) + def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): + """Tell the master that the user made a request. + """ + cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) + self.send_command(cmd) + def await_sync(self, data): """Returns a deferred that is resolved when we receive a SYNC command with given data. diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 84d2a2272a..a009214e43 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command): return " ".join((self.cache_func, json.dumps(self.keys))) +class UserIpCommand(Command): + """Sent periodically when a worker sees activity from a client. + + Format:: + + USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent> + """ + NAME = "USER_IP" + + def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen): + self.user_id = user_id + self.access_token = access_token + self.ip = ip + self.user_agent = user_agent + self.device_id = device_id + self.last_seen = last_seen + + @classmethod + def from_line(cls, line): + user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5) + + return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen)) + + def to_line(self): + return " ".join(( + self.user_id, self.access_token, self.ip, self.device_id, + str(self.last_seen), self.user_agent, + )) + + # Map of command name to command type. COMMAND_MAP = { cmd.NAME: cmd @@ -320,6 +350,7 @@ COMMAND_MAP = { SyncCommand, RemovePusherCommand, InvalidateCacheCommand, + UserIpCommand, ) } @@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = ( FederationAckCommand.NAME, RemovePusherCommand.NAME, InvalidateCacheCommand.NAME, + UserIpCommand.NAME, ErrorCommand.NAME, ) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 9fee2a484b..062272f8dd 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): def on_INVALIDATE_CACHE(self, cmd): self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + def on_USER_IP(self, cmd): + self.streamer.on_user_ip( + cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id, + cmd.last_seen, + ) + @defer.inlineCallbacks def subscribe_to_stream(self, stream_name, token): """Subscribe the remote to a streams. diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 69c46911ec..3ea3ca5a6f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync") federation_ack_counter = metrics.register_counter("federation_ack") remove_pusher_counter = metrics.register_counter("remove_pusher") invalidate_cache_counter = metrics.register_counter("invalidate_cache") +user_ip_cache_counter = metrics.register_counter("user_ip_cache") logger = logging.getLogger(__name__) @@ -238,6 +239,15 @@ class ReplicationStreamer(object): invalidate_cache_counter.inc() getattr(self.store, cache_func).invalidate(tuple(keys)) + @measure_func("repl.on_user_ip") + def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): + """The client saw a user request + """ + user_ip_cache_counter.inc() + self.store.insert_client_ip( + user_id, access_token, ip, user_agent, device_id, last_seen, + ) + def send_sync_to_all_connections(self, data): """Sends a SYNC command to all clients. |