diff options
-rw-r--r-- | synapse/replication/tcp/client.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 48 |
2 files changed, 41 insertions, 9 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 251d3afcf4..90fb6c1336 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -175,7 +175,7 @@ class ReplicationClientHandler(object): def send_invalidate_cache(self, cache_func, keys): """Poke the master to invalidate a cache. """ - cmd = InvalidateCacheCommand(cache_func, keys) + cmd = InvalidateCacheCommand(cache_func.__name__, keys) self.send_command(cmd) def await_sync(self, data): diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 6864204616..19b1ce504f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -51,6 +51,7 @@ indicate which side is sending, these are *not* included on the wire:: from twisted.internet import defer from twisted.protocols.basic import LineOnlyReceiver +from twisted.python.failure import Failure from commands import ( COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS, @@ -60,6 +61,7 @@ from commands import ( from streams import STREAMS_MAP from synapse.util.stringutils import random_string +from synapse.metrics.metric import CounterMetric import logging import synapse.metrics @@ -69,11 +71,8 @@ import fcntl metrics = synapse.metrics.get_metrics_for(__name__) -inbound_commands_counter = metrics.register_counter( - "inbound_commands", labels=["command", "name", "conn_id"], -) -outbound_commands_counter = metrics.register_counter( - "outbound_commands", labels=["command", "name", "conn_id"], +connection_close_counter = metrics.register_counter( + "close_reason", labels=["reason_type"], ) @@ -135,6 +134,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # The LoopingCall for sending pings. self._send_ping_loop = None + self.inbound_commands_counter = CounterMetric( + "inbound_commands", labels=["command"], + ) + self.outbound_commands_counter = CounterMetric( + "outbound_commands", labels=["command"], + ) + def connectionMade(self): logger.info("[%s] Connection established", self.id()) @@ -193,7 +199,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_received_command = self.clock.time_msec() - inbound_commands_counter.inc(cmd_name, self.name, self.conn_id) + self.inbound_commands_counter.inc(cmd_name) cmd_cls = COMMAND_MAP[cmd_name] try: @@ -214,6 +220,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): logger.exception("[%s] Failed to handle line: %r", self.id(), line) def close(self): + logger.warn("[%s] Closing connection", self.id()) self.time_we_closed = self.clock.time_msec() self.transport.loseConnection() self.on_connection_closed() @@ -242,7 +249,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._queue_command(cmd) return - outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id) + self.outbound_commands_counter.inc(cmd.NAME) string = "%s %s" % (cmd.NAME, cmd.to_line(),) if "\n" in string: @@ -307,6 +314,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def connectionLost(self, reason): logger.info("[%s] Replication connection closed: %r", self.id(), reason) + if isinstance(reason, Failure): + connection_close_counter.inc(reason.type.__name__) + else: + connection_close_counter.inc(reason.__class__.__name__) try: # Remove us from list of connections to be monitored @@ -495,7 +506,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) - self.transport.abortConnection() + self.send_error("Wrong remote") def on_RDATA(self, cmd): try: @@ -604,3 +615,24 @@ metrics.register_callback( }, labels=["name", "conn_id"], ) + + +metrics.register_callback( + "inbound_commands", + lambda: { + (k[0], p.name, p.conn_id): count + for p in connected_connections + for k, count in p.inbound_commands_counter.counts.iteritems() + }, + labels=["command", "name", "conn_id"], +) + +metrics.register_callback( + "outbound_commands", + lambda: { + (k[0], p.name, p.conn_id): count + for p in connected_connections + for k, count in p.outbound_commands_counter.counts.iteritems() + }, + labels=["command", "name", "conn_id"], +) |