diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index bbe2f967b7..a9baa2c1c3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@@ -115,20 +114,15 @@ class ReplicationSendEventRestServlet(RestServlet):
self.clock = hs.get_clock()
# The responses are tiny, so we may as well cache them for a while
- self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
+ self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
def on_PUT(self, request, event_id):
- result = self.response_cache.get(event_id)
- if not result:
- result = self.response_cache.set(
- event_id,
- self._handle_request(request)
- )
- else:
- logger.warn("Returning cached response")
- return make_deferred_yieldable(result)
-
- @preserve_fn
+ return self.response_cache.wrap(
+ event_id,
+ self._handle_request,
+ request
+ )
+
@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"):
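
The `on_PUT` change above folds the old get-then-set dance on the response cache into a single `ResponseCache.wrap` call. As a rough mental model only, here is a minimal sketch of the get-or-compute pattern that `wrap` encapsulates, using a plain dict in place of Synapse's timed, logcontext-aware cache (the class and attribute names below are illustrative, not Synapse's implementation):

```python
# Illustrative sketch only: a dict-backed stand-in for ResponseCache.wrap.
# The real class also expires entries after timeout_ms and manages Twisted
# logcontexts, which is why the servlet can drop make_deferred_yieldable.
from twisted.internet import defer


class SketchResponseCache(object):
    def __init__(self):
        self._results = {}  # key -> Deferred of the in-flight/cached result

    def wrap(self, key, callback, *args, **kwargs):
        result = self._results.get(key)
        if result is None:
            # First caller for this key: start the computation and cache the
            # Deferred, so concurrent and later callers share one result.
            result = defer.maybeDeferred(callback, *args, **kwargs)
            self._results[key] = result
        return result
```
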
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 171227cce2..12aac3cc6b 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -19,11 +19,13 @@ allowed to be sent by which side.
"""
import logging
-import ujson as json
+import simplejson
logger = logging.getLogger(__name__)
+_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
+
class Command(object):
"""The base command class.
@@ -100,14 +102,14 @@ class RdataCommand(Command):
return cls(
stream_name,
None if token == "batch" else int(token),
- json.loads(row_json)
+ simplejson.loads(row_json)
)
def to_line(self):
return " ".join((
self.stream_name,
str(self.token) if self.token is not None else "batch",
- json.dumps(self.row),
+ _json_encoder.encode(self.row),
))
@@ -298,10 +300,12 @@ class InvalidateCacheCommand(Command):
def from_line(cls, line):
cache_func, keys_json = line.split(" ", 1)
- return cls(cache_func, json.loads(keys_json))
+ return cls(cache_func, simplejson.loads(keys_json))
def to_line(self):
- return " ".join((self.cache_func, json.dumps(self.keys)))
+ return " ".join((
+ self.cache_func, _json_encoder.encode(self.keys),
+ ))
class UserIpCommand(Command):
@@ -325,14 +329,14 @@ class UserIpCommand(Command):
def from_line(cls, line):
user_id, jsn = line.split(" ", 1)
- access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+ access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
return cls(
user_id, access_token, ip, user_agent, device_id, last_seen
)
def to_line(self):
- return self.user_id + " " + json.dumps((
+ return self.user_id + " " + _json_encoder.encode((
self.access_token, self.ip, self.user_agent, self.device_id,
self.last_seen,
))
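
The move from ujson to simplejson needs the module-level `_json_encoder` because simplejson serialises namedtuples as JSON objects by default, while the replication rows are namedtuples that must stay positional: the receiving side rebuilds them with `ROW_TYPE(*cmd.row)` (see `on_RDATA` in protocol.py below). A short demonstration, where `Row` is a made-up stand-in for a stream row type:

```python
# Why commands.py builds a JSONEncoder with namedtuple_as_object=False.
# Row is a made-up stand-in for a replication stream row type.
from collections import namedtuple

import simplejson

Row = namedtuple("Row", ("token", "user_id"))
row = Row(42, "@alice:example.com")

# simplejson's default turns namedtuples into objects:
print(simplejson.dumps(row))
# -> {"token": 42, "user_id": "@alice:example.com"}

# The explicit encoder keeps them positional, so the reader can rebuild the
# row with Row(*simplejson.loads(line)):
encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
print(encoder.encode(row))
# -> [42, "@alice:example.com"]
```
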
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 0a9a290af4..a6280aae70 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -53,28 +53,28 @@ from twisted.internet import defer
from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure
-from commands import (
+from .commands import (
COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
)
-from streams import STREAMS_MAP
+from .streams import STREAMS_MAP
+from synapse.metrics import LaterGauge
from synapse.util.stringutils import random_string
-from synapse.metrics.metric import CounterMetric
-import logging
-import synapse.metrics
-import struct
-import fcntl
+from prometheus_client import Counter
+from collections import defaultdict
-metrics = synapse.metrics.get_metrics_for(__name__)
+from six import iterkeys, iteritems
-connection_close_counter = metrics.register_counter(
- "close_reason", labels=["reason_type"],
-)
+import logging
+import struct
+import fcntl
+connection_close_counter = Counter(
+ "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"])
# A list of all connected protocols. This allows us to send metrics about the
# connections.
@@ -136,12 +136,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings.
self._send_ping_loop = None
- self.inbound_commands_counter = CounterMetric(
- "inbound_commands", labels=["command"],
- )
- self.outbound_commands_counter = CounterMetric(
- "outbound_commands", labels=["command"],
- )
+ self.inbound_commands_counter = defaultdict(int)
+ self.outbound_commands_counter = defaultdict(int)
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@@ -201,7 +197,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec()
- self.inbound_commands_counter.inc(cmd_name)
+        self.inbound_commands_counter[cmd_name] += 1
cmd_cls = COMMAND_MAP[cmd_name]
try:
@@ -251,8 +248,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd)
return
- self.outbound_commands_counter.inc(cmd.NAME)
-
+        self.outbound_commands_counter[cmd.NAME] += 1
string = "%s %s" % (cmd.NAME, cmd.to_line(),)
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
@@ -317,9 +314,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def connectionLost(self, reason):
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
if isinstance(reason, Failure):
- connection_close_counter.inc(reason.type.__name__)
+ connection_close_counter.labels(reason.type.__name__).inc()
else:
- connection_close_counter.inc(reason.__class__.__name__)
+ connection_close_counter.labels(reason.__class__.__name__).inc()
try:
# Remove us from list of connections to be monitored
@@ -392,7 +389,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
if stream_name == "ALL":
# Subscribe to all streams we're publishing to.
- for stream in self.streamer.streams_by_name.iterkeys():
+ for stream in iterkeys(self.streamer.streams_by_name):
self.subscribe_to_stream(stream, token)
else:
self.subscribe_to_stream(stream_name, token)
@@ -498,7 +495,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
BaseReplicationStreamProtocol.connectionMade(self)
# Once we've connected subscribe to the necessary streams
- for stream_name, token in self.handler.get_streams_to_replicate().iteritems():
+ for stream_name, token in iteritems(self.handler.get_streams_to_replicate()):
self.replicate(stream_name, token)
# Tell the server if we have any users currently syncing (should only
@@ -518,7 +515,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
def on_RDATA(self, cmd):
stream_name = cmd.stream_name
- inbound_rdata_count.inc(stream_name)
+ inbound_rdata_count.labels(stream_name).inc()
try:
row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
@@ -566,14 +563,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
-metrics.register_callback(
- "pending_commands",
+pending_commands = LaterGauge(
+    "synapse_replication_tcp_protocol_pending_commands", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): len(p.pending_commands)
for p in connected_connections
- },
- labels=["name", "conn_id"],
-)
+ })
def transport_buffer_size(protocol):
@@ -583,14 +578,12 @@ def transport_buffer_size(protocol):
return 0
-metrics.register_callback(
- "transport_send_buffer",
+transport_send_buffer = LaterGauge(
+ "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_buffer_size(p)
for p in connected_connections
- },
- labels=["name", "conn_id"],
-)
+ })
def transport_kernel_read_buffer_size(protocol, read=True):
@@ -608,48 +601,38 @@ def transport_kernel_read_buffer_size(protocol, read=True):
return 0
-metrics.register_callback(
- "transport_kernel_send_buffer",
+tcp_transport_kernel_send_buffer = LaterGauge(
+ "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
- },
- labels=["name", "conn_id"],
-)
+ })
-metrics.register_callback(
- "transport_kernel_read_buffer",
+tcp_transport_kernel_read_buffer = LaterGauge(
+ "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
- },
- labels=["name", "conn_id"],
-)
+ })
-metrics.register_callback(
- "inbound_commands",
+tcp_inbound_commands = LaterGauge(
+ "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
lambda: {
-        (k[0], p.name, p.conn_id): count
+        (k, p.name, p.conn_id): count
         for p in connected_connections
-        for k, count in p.inbound_commands_counter.counts.iteritems()
-    },
-    labels=["command", "name", "conn_id"],
-)
+        for k, count in iteritems(p.inbound_commands_counter)
+    })
-metrics.register_callback(
- "outbound_commands",
+tcp_outbound_commands = LaterGauge(
+ "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
lambda: {
-        (k[0], p.name, p.conn_id): count
+        (k, p.name, p.conn_id): count
         for p in connected_connections
-        for k, count in p.outbound_commands_counter.counts.iteritems()
-    },
-    labels=["command", "name", "conn_id"],
-)
+        for k, count in iteritems(p.outbound_commands_counter)
+    })
# number of updates received for each RDATA stream
-inbound_rdata_count = metrics.register_counter(
- "inbound_rdata_count",
- labels=["stream_name"],
-)
+inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
+ ["stream_name"])
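
Most of this file is a mechanical port from the old `synapse.metrics` registry to `prometheus_client`: counters are incremented per label set via `.labels(...).inc()`, and the callback-style metrics become `LaterGauge`s, a Synapse wrapper that evaluates a callable at scrape time. As a sketch of how such a wrapper can sit on `prometheus_client`'s custom-collector API, reconstructed from the call sites in this diff rather than from Synapse's actual source:

```python
# Sketch of a LaterGauge-style collector, inferred from the call sites in
# this diff (name, description, label names, callback). Not Synapse's code.
from prometheus_client.core import REGISTRY, GaugeMetricFamily


class LaterGaugeSketch(object):
    def __init__(self, name, desc, labels, caller):
        self.name = name
        self.desc = desc
        self.labels = labels
        self.caller = caller  # re-evaluated on every scrape
        REGISTRY.register(self)

    def collect(self):
        gauge = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
        value = self.caller()
        if isinstance(value, dict):
            # Labelled form: the callback returns {label_tuple: value}, as
            # the pending_commands and transport buffer gauges above do.
            for key, v in value.items():
                gauge.add_metric(list(key), v)
        else:
            # Unlabelled form, e.g. lambda: len(self.connections).
            gauge.add_metric([], value)
        yield gauge
```
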
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 786c3fe864..63bd6d2652 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -18,24 +18,25 @@
from twisted.internet import defer, reactor
from twisted.internet.protocol import Factory
-from streams import STREAMS_MAP, FederationStream
-from protocol import ServerReplicationStreamProtocol
+from .streams import STREAMS_MAP, FederationStream
+from .protocol import ServerReplicationStreamProtocol
from synapse.util.metrics import Measure, measure_func
+from synapse.metrics import LaterGauge
import logging
-import synapse.metrics
+from prometheus_client import Counter
+from six import itervalues
-metrics = synapse.metrics.get_metrics_for(__name__)
-stream_updates_counter = metrics.register_counter(
- "stream_updates", labels=["stream_name"]
-)
-user_sync_counter = metrics.register_counter("user_sync")
-federation_ack_counter = metrics.register_counter("federation_ack")
-remove_pusher_counter = metrics.register_counter("remove_pusher")
-invalidate_cache_counter = metrics.register_counter("invalidate_cache")
-user_ip_cache_counter = metrics.register_counter("user_ip_cache")
+stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
+ "", ["stream_name"])
+user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
+federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
+remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
+invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache",
+ "")
+user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
logger = logging.getLogger(__name__)
@@ -69,33 +70,34 @@ class ReplicationStreamer(object):
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self._server_notices_sender = hs.get_server_notices_sender()
# Current connections.
self.connections = []
- metrics.register_callback("total_connections", lambda: len(self.connections))
+ LaterGauge("synapse_replication_tcp_resource_total_connections", "", [],
+ lambda: len(self.connections))
# List of streams that clients can subscribe to.
        # We only support the federation stream if federation sending has been
# disabled on the master.
self.streams = [
- stream(hs) for stream in STREAMS_MAP.itervalues()
+ stream(hs) for stream in itervalues(STREAMS_MAP)
if stream != FederationStream or not hs.config.send_federation
]
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
- metrics.register_callback(
- "connections_per_stream",
+ LaterGauge(
+ "synapse_replication_tcp_resource_connections_per_stream", "",
+ ["stream_name"],
lambda: {
(stream_name,): len([
conn for conn in self.connections
if stream_name in conn.replication_streams
])
for stream_name in self.streams_by_name
- },
- labels=["stream_name"],
- )
+ })
self.federation_sender = None
if not hs.config.send_federation:
@@ -175,7 +177,7 @@ class ReplicationStreamer(object):
logger.info(
"Streaming: %s -> %s", stream.NAME, updates[-1][0]
)
- stream_updates_counter.inc_by(len(updates), stream.NAME)
+ stream_updates_counter.labels(stream.NAME).inc(len(updates))
# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
@@ -253,6 +255,7 @@ class ReplicationStreamer(object):
yield self.store.insert_client_ip(
user_id, access_token, ip, user_agent, device_id, last_seen,
)
+ yield self._server_notices_sender.on_user_ip(user_id)
def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients.
|