From a901ed16b5805adf04b5b8b1b99c14720e5abb3d Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 5 Mar 2015 19:10:57 +0000 Subject: Move federation API responding code out of weird mix of lambdas into Servlet-style methods on instances --- synapse/federation/transport/server.py | 300 ++++++++++++++------------------- 1 file changed, 130 insertions(+), 170 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ece6dbcf62..eb3e30a189 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -122,14 +122,9 @@ class TransportLayerServer(object): Args: handler (TransportReceivedHandler) """ - self.received_handler = handler - - # This is when someone is trying to send us a bunch of data. - self.server.register_path( - "PUT", - re.compile("^" + PREFIX + "/send/([^/]*)/$"), - self._with_authentication(self._on_send_request) - ) + FederationSendServlet( + handler, self._with_authentication, self.server_name + ).register(self.server) @log_function def register_request_handler(self, handler): @@ -138,136 +133,48 @@ class TransportLayerServer(object): Args: handler (TransportRequestHandler) """ - self.request_handler = handler - - # This is for when someone asks us for everything since version X - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/pull/$"), - self._with_authentication( - lambda origin, content, query: - handler.on_pull_request(query["origin"][0], query["v"]) - ) - ) - - # This is when someone asks for a data item for a given server - # data_id pair. - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/event/([^/]*)/$"), - self._with_authentication( - lambda origin, content, query, event_id: - handler.on_pdu_request(origin, event_id) - ) - ) - - # This is when someone asks for all data for a given context. - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/state/([^/]*)/$"), - self._with_authentication( - lambda origin, content, query, context: - handler.on_context_state_request( - origin, - context, - query.get("event_id", [None])[0], - ) - ) - ) - - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/backfill/([^/]*)/$"), - self._with_authentication( - lambda origin, content, query, context: - self._on_backfill_request( - origin, context, query["v"], query["limit"] - ) - ) - ) - - # This is when we receive a server-server Query - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/query/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, query_type: - handler.on_query_request( - query_type, - {k: v[0].decode("utf-8") for k, v in query.items()} - ) - ) - ) - - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, user_id: - self._on_make_join_request( - origin, content, query, context, user_id - ) - ) - ) - - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/event_auth/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, event_id: - handler.on_event_auth( - origin, context, event_id, - ) - ) - ) - - self.server.register_path( - "PUT", - re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, event_id: - self._on_send_join_request( - origin, content, query, - ) - ) - ) - - self.server.register_path( - "PUT", - re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, event_id: - self._on_invite_request( - origin, content, query, - ) - ) - ) - - self.server.register_path( - "POST", - re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, event_id: - self._on_query_auth_request( - origin, content, event_id, - ) - ) - ) - - self.server.register_path( - "POST", - re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"), - self._with_authentication( - lambda origin, content, query, room_id: - self._get_missing_events( - origin, content, room_id, - ) - ) - ) - + for servletclass in ( + FederationPullServlet, + FederationEventServlet, + FederationStateServlet, + FederationBackfillServlet, + FederationQueryServlet, + FederationMakeJoinServlet, + FederationEventServlet, + FederationSendJoinServlet, + FederationInviteServlet, + FederationQueryAuthServlet, + FederationGetMissingEventsServlet, + ): + servletclass(handler, self._with_authentication).register(self.server) + + +class BaseFederationServlet(object): + def __init__(self, handler, wrapper): + self.handler = handler + self.wrapper = wrapper + + def register(self, server): + pattern = re.compile("^" + PREFIX + self.PATH) + + for method in ("GET", "PUT", "POST"): + code = getattr(self, "on_%s" % (method), None) + if code is None: + continue + + server.register_path(method, pattern, self.wrapper(code)) + + +class FederationSendServlet(BaseFederationServlet): + PATH = "/send/([^/]*)/$" + + def __init__(self, handler, wrapper, server_name): + super(FederationSendServlet, self).__init__(handler, wrapper) + self.server_name = server_name + + # This is when someone is trying to send us a bunch of data. @defer.inlineCallbacks - @log_function - def _on_send_request(self, origin, content, query, transaction_id): + def on_PUT(self, origin, content, query, transaction_id): """ Called on PUT /send// Args: @@ -305,8 +212,7 @@ class TransportLayerServer(object): return try: - handler = self.received_handler - code, response = yield handler.on_incoming_transaction( + code, response = yield self.handler.on_incoming_transaction( transaction_data ) except: @@ -315,65 +221,119 @@ class TransportLayerServer(object): defer.returnValue((code, response)) - @log_function - def _on_backfill_request(self, origin, context, v_list, limits): + +class FederationPullServlet(BaseFederationServlet): + PATH = "/pull/$" + + # This is for when someone asks us for everything since version X + def on_GET(self, origin, content, query): + return self.handler.on_pull_request(query["origin"][0], query["v"]) + + +class FederationEventServlet(BaseFederationServlet): + PATH = "/event/([^/]*)/$" + + # This is when someone asks for a data item for a given server data_id pair. + def on_GET(self, origin, content, query, event_id): + return self.handler.on_pdu_request(origin, event_id) + + +class FederationStateServlet(BaseFederationServlet): + PATH = "/state/([^/]*)/$" + + # This is when someone asks for all data for a given context. + def on_GET(self, origin, content, query, context): + return self.handler.on_context_state_request(origin, context, + query.get("event_id", [None])[0], + ) + + +class FederationBackfillServlet(BaseFederationServlet): + PATH = "/backfill/([^/]*)/$" + + def on_GET(self, origin, content, query, context): + versions = query["v"] + limits = query["limit"] + if not limits: - return defer.succeed( - (400, {"error": "Did not include limit param"}) - ) + return defer.succeed((400, {"error": "Did not include limit param"})) limit = int(limits[-1]) - versions = v_list + return self.handler.on_backfill_request(origin, context, versions, limit) + + +class FederationQueryServlet(BaseFederationServlet): + PATH = "/query/([^/]*)$" - return self.request_handler.on_backfill_request( - origin, context, versions, limit + # This is when we receive a server-server Query + def on_GET(self, origin, content, query, query_type): + return self.handler.on_query_request(query_type, + {k: v[0].decode("utf-8") for k, v in query.items()} ) + +class FederationMakeJoinServlet(BaseFederationServlet): + PATH = "/make_join/([^/]*)/([^/]*)$" + @defer.inlineCallbacks - @log_function - def _on_make_join_request(self, origin, content, query, context, user_id): - content = yield self.request_handler.on_make_join_request( - context, user_id, - ) + def on_GET(self, origin, content, query, context, user_id): + content = yield self.handler.on_make_join_request(context, user_id) defer.returnValue((200, content)) - @defer.inlineCallbacks - @log_function - def _on_send_join_request(self, origin, content, query): - content = yield self.request_handler.on_send_join_request( - origin, content, - ) - defer.returnValue((200, content)) +class FederationEventAuthServlet(BaseFederationServlet): + PATH = "/event_auth/([^/]*)/([^/]*)$" + + def on_GET(self, origin, content, query, context, event_id): + return self.handler.on_event_auth(origin, context, event_id) + + +class FederationSendJoinServlet(BaseFederationServlet): + PATH = "/send_join/([^/]*)/([^/]*)$" @defer.inlineCallbacks - @log_function - def _on_invite_request(self, origin, content, query): - content = yield self.request_handler.on_invite_request( - origin, content, - ) + def on_PUT(self, origin, content, query, context, event_id): + # TODO(paul): assert that context/event_id parsed from path actually + # match those given in content + content = yield self.handler.on_send_join_request(origin, content) + defer.returnValue((200, content)) + + +class FederationInviteServlet(BaseFederationServlet): + PATH = "/invite/([^/]*)/([^/]*)$" + @defer.inlineCallbacks + def on_PUT(self, origin, content, query, context, event_id): + # TODO(paul): assert that context/event_id parsed from path actually + # match those given in content + content = yield self.handler.on_invite_request(origin, content) defer.returnValue((200, content)) + +class FederationQueryAuthServlet(BaseFederationServlet): + PATH = "/query_auth/([^/]*)/([^/]*)$" + @defer.inlineCallbacks - @log_function - def _on_query_auth_request(self, origin, content, event_id): - new_content = yield self.request_handler.on_query_auth_request( + def on_POST(self, origin, content, query, context, event_id): + new_content = yield self.handler.on_query_auth_request( origin, content, event_id ) defer.returnValue((200, new_content)) + +class FederationGetMissingEventsServlet(BaseFederationServlet): + PATH = "/get_missing_events/([^/]*)/?$" + @defer.inlineCallbacks - @log_function - def _get_missing_events(self, origin, content, room_id): + def on_POST(self, origin, content, query, room_id): limit = int(content.get("limit", 10)) min_depth = int(content.get("min_depth", 0)) earliest_events = content.get("earliest_events", []) latest_events = content.get("latest_events", []) - content = yield self.request_handler.on_get_missing_events( + content = yield self.handler.on_get_missing_events( origin, room_id=room_id, earliest_events=earliest_events, -- cgit 1.5.1 From ba8ac996f951c872c8815f09a4ffd3a508da6863 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 5 Mar 2015 19:43:17 +0000 Subject: Remove the dead 'rate_limit_origin' method from TransportLayerServer --- synapse/federation/transport/server.py | 6 ------ 1 file changed, 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index eb3e30a189..dc9f1e082b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -109,12 +109,6 @@ class TransportLayerServer(object): defer.returnValue(response) return new_handler - def rate_limit_origin(self, handler): - def new_handler(origin, *args, **kwargs): - response = yield handler(origin, *args, **kwargs) - defer.returnValue(response) - return new_handler() - @log_function def register_received_handler(self, handler): """ Register a handler that will be fired when we receive data. -- cgit 1.5.1 From 7644cb79b222207ef739a9ca29699f32aa3cee0b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 5 Mar 2015 20:33:16 +0000 Subject: Slightly neater(?) arrangement of authentication wrapper for HTTP servlet methods --- synapse/federation/transport/server.py | 62 ++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 25 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index dc9f1e082b..39b18ae303 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -19,6 +19,7 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.util.logutils import log_function +import functools import logging import simplejson as json import re @@ -30,8 +31,9 @@ logger = logging.getLogger(__name__) class TransportLayerServer(object): """Handles incoming federation HTTP requests""" + # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks - def _authenticate_request(self, request): + def authenticate_request(self, request): json_request = { "method": request.method, "uri": request.uri, @@ -93,22 +95,6 @@ class TransportLayerServer(object): defer.returnValue((origin, content)) - def _with_authentication(self, handler): - @defer.inlineCallbacks - def new_handler(request, *args, **kwargs): - try: - (origin, content) = yield self._authenticate_request(request) - with self.ratelimiter.ratelimit(origin) as d: - yield d - response = yield handler( - origin, content, request.args, *args, **kwargs - ) - except: - logger.exception("_authenticate_request failed") - raise - defer.returnValue(response) - return new_handler - @log_function def register_received_handler(self, handler): """ Register a handler that will be fired when we receive data. @@ -116,8 +102,10 @@ class TransportLayerServer(object): Args: handler (TransportReceivedHandler) """ - FederationSendServlet( - handler, self._with_authentication, self.server_name + FederationSendServlet(handler, + authenticator=self, + ratelimiter=self.ratelimiter, + server_name=self.server_name, ).register(self.server) @log_function @@ -140,13 +128,37 @@ class TransportLayerServer(object): FederationQueryAuthServlet, FederationGetMissingEventsServlet, ): - servletclass(handler, self._with_authentication).register(self.server) + servletclass(handler, + authenticator=self, + ratelimiter=self.ratelimiter, + ).register(self.server) class BaseFederationServlet(object): - def __init__(self, handler, wrapper): + def __init__(self, handler, authenticator, ratelimiter): self.handler = handler - self.wrapper = wrapper + self.authenticator = authenticator + self.ratelimiter = ratelimiter + + def _wrap(self, code): + authenticator = self.authenticator + ratelimiter = self.ratelimiter + + @defer.inlineCallbacks + @functools.wraps(code) + def new_code(request, *args, **kwargs): + try: + (origin, content) = yield authenticator.authenticate_request(request) + with ratelimiter.ratelimit(origin) as d: + yield d + response = yield code( + origin, content, request.args, *args, **kwargs + ) + except: + logger.exception("authenticate_request failed") + raise + defer.returnValue(response) + return new_code def register(self, server): pattern = re.compile("^" + PREFIX + self.PATH) @@ -156,14 +168,14 @@ class BaseFederationServlet(object): if code is None: continue - server.register_path(method, pattern, self.wrapper(code)) + server.register_path(method, pattern, self._wrap(code)) class FederationSendServlet(BaseFederationServlet): PATH = "/send/([^/]*)/$" - def __init__(self, handler, wrapper, server_name): - super(FederationSendServlet, self).__init__(handler, wrapper) + def __init__(self, handler, server_name, **kwargs): + super(FederationSendServlet, self).__init__(handler, **kwargs) self.server_name = server_name # This is when someone is trying to send us a bunch of data. -- cgit 1.5.1 From 5eab2549ab13c14535de266cc153dc6d5b479590 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 5 Mar 2015 20:36:05 +0000 Subject: Append a $ on PATH at registration time, meaning each PATH attribute doesn't need it --- synapse/federation/transport/server.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 39b18ae303..8f985f8fe3 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -161,7 +161,7 @@ class BaseFederationServlet(object): return new_code def register(self, server): - pattern = re.compile("^" + PREFIX + self.PATH) + pattern = re.compile("^" + PREFIX + self.PATH + "$") for method in ("GET", "PUT", "POST"): code = getattr(self, "on_%s" % (method), None) @@ -172,7 +172,7 @@ class BaseFederationServlet(object): class FederationSendServlet(BaseFederationServlet): - PATH = "/send/([^/]*)/$" + PATH = "/send/([^/]*)/" def __init__(self, handler, server_name, **kwargs): super(FederationSendServlet, self).__init__(handler, **kwargs) @@ -229,7 +229,7 @@ class FederationSendServlet(BaseFederationServlet): class FederationPullServlet(BaseFederationServlet): - PATH = "/pull/$" + PATH = "/pull/" # This is for when someone asks us for everything since version X def on_GET(self, origin, content, query): @@ -237,7 +237,7 @@ class FederationPullServlet(BaseFederationServlet): class FederationEventServlet(BaseFederationServlet): - PATH = "/event/([^/]*)/$" + PATH = "/event/([^/]*)/" # This is when someone asks for a data item for a given server data_id pair. def on_GET(self, origin, content, query, event_id): @@ -245,7 +245,7 @@ class FederationEventServlet(BaseFederationServlet): class FederationStateServlet(BaseFederationServlet): - PATH = "/state/([^/]*)/$" + PATH = "/state/([^/]*)/" # This is when someone asks for all data for a given context. def on_GET(self, origin, content, query, context): @@ -255,7 +255,7 @@ class FederationStateServlet(BaseFederationServlet): class FederationBackfillServlet(BaseFederationServlet): - PATH = "/backfill/([^/]*)/$" + PATH = "/backfill/([^/]*)/" def on_GET(self, origin, content, query, context): versions = query["v"] @@ -270,7 +270,7 @@ class FederationBackfillServlet(BaseFederationServlet): class FederationQueryServlet(BaseFederationServlet): - PATH = "/query/([^/]*)$" + PATH = "/query/([^/]*)" # This is when we receive a server-server Query def on_GET(self, origin, content, query, query_type): @@ -280,7 +280,7 @@ class FederationQueryServlet(BaseFederationServlet): class FederationMakeJoinServlet(BaseFederationServlet): - PATH = "/make_join/([^/]*)/([^/]*)$" + PATH = "/make_join/([^/]*)/([^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -289,14 +289,14 @@ class FederationMakeJoinServlet(BaseFederationServlet): class FederationEventAuthServlet(BaseFederationServlet): - PATH = "/event_auth/([^/]*)/([^/]*)$" + PATH = "/event_auth/([^/]*)/([^/]*)" def on_GET(self, origin, content, query, context, event_id): return self.handler.on_event_auth(origin, context, event_id) class FederationSendJoinServlet(BaseFederationServlet): - PATH = "/send_join/([^/]*)/([^/]*)$" + PATH = "/send_join/([^/]*)/([^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -307,7 +307,7 @@ class FederationSendJoinServlet(BaseFederationServlet): class FederationInviteServlet(BaseFederationServlet): - PATH = "/invite/([^/]*)/([^/]*)$" + PATH = "/invite/([^/]*)/([^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -318,7 +318,7 @@ class FederationInviteServlet(BaseFederationServlet): class FederationQueryAuthServlet(BaseFederationServlet): - PATH = "/query_auth/([^/]*)/([^/]*)$" + PATH = "/query_auth/([^/]*)/([^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, context, event_id): @@ -330,7 +330,8 @@ class FederationQueryAuthServlet(BaseFederationServlet): class FederationGetMissingEventsServlet(BaseFederationServlet): - PATH = "/get_missing_events/([^/]*)/?$" + # TODO(paul): Why does this path alone end with "/?" optional? + PATH = "/get_missing_events/([^/]*)/?" @defer.inlineCallbacks def on_POST(self, origin, content, query, room_id): -- cgit 1.5.1 From d79d91a4a7bdd42bc6c4d0324623e11c8bd3c5ef Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 5 Mar 2015 20:53:33 +0000 Subject: Appease pep8 --- synapse/federation/transport/server.py | 46 ++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 19 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 8f985f8fe3..6c624977d7 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -102,7 +102,8 @@ class TransportLayerServer(object): Args: handler (TransportReceivedHandler) """ - FederationSendServlet(handler, + FederationSendServlet( + handler, authenticator=self, ratelimiter=self.ratelimiter, server_name=self.server_name, @@ -115,20 +116,9 @@ class TransportLayerServer(object): Args: handler (TransportRequestHandler) """ - for servletclass in ( - FederationPullServlet, - FederationEventServlet, - FederationStateServlet, - FederationBackfillServlet, - FederationQueryServlet, - FederationMakeJoinServlet, - FederationEventServlet, - FederationSendJoinServlet, - FederationInviteServlet, - FederationQueryAuthServlet, - FederationGetMissingEventsServlet, - ): - servletclass(handler, + for servletclass in SERVLET_CLASSES: + servletclass( + handler, authenticator=self, ratelimiter=self.ratelimiter, ).register(self.server) @@ -138,11 +128,11 @@ class BaseFederationServlet(object): def __init__(self, handler, authenticator, ratelimiter): self.handler = handler self.authenticator = authenticator - self.ratelimiter = ratelimiter + self.ratelimiter = ratelimiter def _wrap(self, code): authenticator = self.authenticator - ratelimiter = self.ratelimiter + ratelimiter = self.ratelimiter @defer.inlineCallbacks @functools.wraps(code) @@ -249,7 +239,9 @@ class FederationStateServlet(BaseFederationServlet): # This is when someone asks for all data for a given context. def on_GET(self, origin, content, query, context): - return self.handler.on_context_state_request(origin, context, + return self.handler.on_context_state_request( + origin, + context, query.get("event_id", [None])[0], ) @@ -274,7 +266,8 @@ class FederationQueryServlet(BaseFederationServlet): # This is when we receive a server-server Query def on_GET(self, origin, content, query, query_type): - return self.handler.on_query_request(query_type, + return self.handler.on_query_request( + query_type, {k: v[0].decode("utf-8") for k, v in query.items()} ) @@ -350,3 +343,18 @@ class FederationGetMissingEventsServlet(BaseFederationServlet): ) defer.returnValue((200, content)) + + +SERVLET_CLASSES = ( + FederationPullServlet, + FederationEventServlet, + FederationStateServlet, + FederationBackfillServlet, + FederationQueryServlet, + FederationMakeJoinServlet, + FederationEventServlet, + FederationSendJoinServlet, + FederationInviteServlet, + FederationQueryAuthServlet, + FederationGetMissingEventsServlet, +) -- cgit 1.5.1 From 98b867f7b7f6b82eafa8d7694ed4703b7ce47b19 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Mar 2015 10:15:58 +0000 Subject: Fix bug in logging. --- synapse/federation/transaction_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 741a4e7a1a..e6eb85501a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -144,7 +144,7 @@ class TransactionQueue(object): deferred.errback(failure) def log_failure(failure): - logger.warn("Failed to send pdu", failure.value) + logger.warn("Failed to send pdu: %s", failure.value) deferred.addErrback(log_failure) -- cgit 1.5.1 From abaf47bbb6b9be0b493d07ca8a9efe4cdf0aee01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Mar 2015 10:28:24 +0000 Subject: Fix bug in logging. --- synapse/federation/transaction_queue.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index e6eb85501a..9dc7849b17 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -115,8 +115,8 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) - def log_failure(failure): - logger.warn("Failed to send pdu", failure.value) + def log_failure(f): + logger.warn("Failed to send pdu to %s: %s", destination, f.value) deferred.addErrback(log_failure) @@ -143,8 +143,8 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) - def log_failure(failure): - logger.warn("Failed to send pdu: %s", failure.value) + def log_failure(f): + logger.warn("Failed to send edu to %s: %s", destination, f.value) deferred.addErrback(log_failure) @@ -174,7 +174,7 @@ class TransactionQueue(object): deferred.errback(f) def log_failure(f): - logger.warn("Failed to send pdu", f.value) + logger.warn("Failed to send failure to %s: %s", destination, f.value) deferred.addErrback(log_failure) -- cgit 1.5.1 From 120b6892840bae0e791348da4b1b35761e841b55 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 24 Feb 2015 17:20:14 +0000 Subject: Delete pointless (and unreachable) __init__ method from FederationClient --- synapse/federation/federation_client.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f131941f45..2284fc1d99 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -37,8 +37,6 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): - def __init__(self): - self._get_pdu_cache = None def start_get_pdu_cache(self): self._get_pdu_cache = ExpiringCache( -- cgit 1.5.1 From 9470412316dee5c782b0815383fff1ba10002f15 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 24 Feb 2015 18:10:44 +0000 Subject: Initial attempt at sprinkling some @metrics.counted decorations around the federation code --- synapse/federation/federation_client.py | 14 ++++++++++++++ synapse/federation/federation_server.py | 14 ++++++++++++++ 2 files changed, 28 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 2284fc1d99..ef177b79cc 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,6 +25,7 @@ from synapse.api.errors import ( from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent +import synapse.metrics from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination @@ -35,6 +36,8 @@ import random logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + class FederationClient(FederationBase): @@ -50,6 +53,7 @@ class FederationClient(FederationBase): self._get_pdu_cache.start() @log_function + @metrics.counted def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the home server that should be transmitted to others. @@ -77,6 +81,7 @@ class FederationClient(FederationBase): ) @log_function + @metrics.counted def send_edu(self, destination, edu_type, content): edu = Edu( origin=self.server_name, @@ -90,11 +95,13 @@ class FederationClient(FederationBase): return defer.succeed(None) @log_function + @metrics.counted def send_failure(self, failure, destination): self._transaction_queue.enqueue_failure(failure, destination) return defer.succeed(None) @log_function + @metrics.counted def make_query(self, destination, query_type, args, retry_on_dns_fail=True): """Sends a federation Query to a remote homeserver of the given type @@ -156,6 +163,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function + @metrics.counted def get_pdu(self, destinations, event_id, outlier=False): """Requests the PDU with given origin and ID from the remote home servers. @@ -245,6 +253,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function + @metrics.counted def get_state_for_room(self, destination, room_id, event_id): """Requests all of the `current` state PDUs for a given room from a remote home server. @@ -285,6 +294,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function + @metrics.counted def get_event_auth(self, destination, room_id, event_id): res = yield self.transport_layer.get_event_auth( destination, room_id, event_id, @@ -304,6 +314,7 @@ class FederationClient(FederationBase): defer.returnValue(signed_auth) @defer.inlineCallbacks + @metrics.counted def make_join(self, destinations, room_id, user_id): for destination in destinations: try: @@ -330,6 +341,7 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks + @metrics.counted def send_join(self, destinations, pdu): for destination in destinations: try: @@ -379,6 +391,7 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks + @metrics.counted def send_invite(self, destination, room_id, event_id, pdu): time_now = self._clock.time_msec() code, content = yield self.transport_layer.send_invite( @@ -402,6 +415,7 @@ class FederationClient(FederationBase): defer.returnValue(pdu) @defer.inlineCallbacks + @metrics.counted def query_auth(self, destination, room_id, event_id, local_auth): """ Params: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9c7dcdba96..3216fca95f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -22,6 +22,7 @@ from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent +import synapse.metrics from synapse.api.errors import FederationError, SynapseError @@ -32,6 +33,8 @@ import logging logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + class FederationServer(FederationBase): def set_handler(self, handler): @@ -72,6 +75,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function + @metrics.counted def on_backfill_request(self, origin, room_id, versions, limit): pdus = yield self.handler.on_backfill_request( origin, room_id, versions, limit @@ -81,6 +85,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function + @metrics.counted def on_incoming_transaction(self, transaction_data): transaction = Transaction(**transaction_data) @@ -160,6 +165,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function + @metrics.counted def on_context_state_request(self, origin, room_id, event_id): if event_id: pdus = yield self.handler.get_state_for_pdu( @@ -187,6 +193,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function + @metrics.counted def on_pdu_request(self, origin, event_id): pdu = yield self._get_persisted_pdu(origin, event_id) @@ -199,10 +206,12 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function + @metrics.counted def on_pull_request(self, origin, versions): raise NotImplementedError("Pull transactions not implemented") @defer.inlineCallbacks + @metrics.counted def on_query_request(self, query_type, args): if query_type in self.query_handlers: response = yield self.query_handlers[query_type](args) @@ -213,12 +222,14 @@ class FederationServer(FederationBase): ) @defer.inlineCallbacks + @metrics.counted def on_make_join_request(self, room_id, user_id): pdu = yield self.handler.on_make_join_request(room_id, user_id) time_now = self._clock.time_msec() defer.returnValue({"event": pdu.get_pdu_json(time_now)}) @defer.inlineCallbacks + @metrics.counted def on_invite_request(self, origin, content): pdu = self.event_from_pdu_json(content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) @@ -226,6 +237,7 @@ class FederationServer(FederationBase): defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) @defer.inlineCallbacks + @metrics.counted def on_send_join_request(self, origin, content): logger.debug("on_send_join_request: content: %s", content) pdu = self.event_from_pdu_json(content) @@ -240,6 +252,7 @@ class FederationServer(FederationBase): })) @defer.inlineCallbacks + @metrics.counted def on_event_auth(self, origin, room_id, event_id): time_now = self._clock.time_msec() auth_pdus = yield self.handler.on_event_auth(event_id) @@ -248,6 +261,7 @@ class FederationServer(FederationBase): })) @defer.inlineCallbacks + @metrics.counted def on_query_auth_request(self, origin, content, event_id): """ Content is a dict with keys:: -- cgit 1.5.1 From 094803cf82cc748dd32fe4b03c6db016aeb90075 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 4 Mar 2015 18:15:34 +0000 Subject: Put vector gauges on transaction queue pending PDU and EDU dicts --- synapse/federation/transaction_queue.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9dc7849b17..b9d3f89324 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -25,12 +25,15 @@ from synapse.util.logcontext import PreserveLoggingContext from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) +import synapse.metrics import logging logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -56,9 +59,9 @@ class TransactionQueue(object): # Is a mapping from destination -> list of # tuple(pending pdus, deferred, order) - self.pending_pdus_by_dest = {} + self.pending_pdus_by_dest = pdus = {} # destination -> list of tuple(edu, deferred) - self.pending_edus_by_dest = {} + self.pending_edus_by_dest = edus = {} # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} @@ -66,6 +69,15 @@ class TransactionQueue(object): # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) + metrics.register_callback("pending_pdus", + lambda: {(dest,): len(pdus[dest]) for dest in pdus.keys()}, + keys=["dest"], + ) + metrics.register_callback("pending_edus", + lambda: {(dest,): len(edus[dest]) for dest in edus.keys()}, + keys=["dest"], + ) + def can_send_to(self, destination): """Can we send messages to the given server? -- cgit 1.5.1 From f9478e475bf645038b4f1f163240d7fd0ec02af0 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 6 Mar 2015 15:28:06 +0000 Subject: Rename Metrics' "keys" to "labels" --- synapse/federation/transaction_queue.py | 4 ++-- synapse/http/client.py | 4 ++-- synapse/http/matrixfederationclient.py | 4 ++-- synapse/http/server.py | 4 ++-- synapse/metrics/metric.py | 24 ++++++++++++------------ synapse/storage/_base.py | 6 +++--- tests/metrics/test_metric.py | 6 +++--- 7 files changed, 26 insertions(+), 26 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index b9d3f89324..ae62c69fc3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -71,11 +71,11 @@ class TransactionQueue(object): metrics.register_callback("pending_pdus", lambda: {(dest,): len(pdus[dest]) for dest in pdus.keys()}, - keys=["dest"], + labels=["dest"], ) metrics.register_callback("pending_edus", lambda: {(dest,): len(edus[dest]) for dest in edus.keys()}, - keys=["dest"], + labels=["dest"], ) def can_send_to(self, destination): diff --git a/synapse/http/client.py b/synapse/http/client.py index e40e82e80b..ad2c9c05ec 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -35,10 +35,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) outgoing_requests_counter = metrics.register_counter("outgoing_requests", - keys=["method"], + labels=["method"], ) incoming_responses_counter = metrics.register_counter("incoming_responses", - keys=["method","code"], + labels=["method","code"], ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 0091527693..6b6d79a044 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -44,10 +44,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) outgoing_requests_counter = metrics.register_counter("outgoing_requests", - keys=["method"], + labels=["method"], ) incoming_responses_counter = metrics.register_counter("incoming_responses", - keys=["method","code"], + labels=["method","code"], ) diff --git a/synapse/http/server.py b/synapse/http/server.py index ac893bb40c..35bd3a00ba 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -38,10 +38,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) incoming_requests_counter = metrics.register_counter("incoming_requests", - keys=["method"], + labels=["method"], ) outgoing_responses_counter = metrics.register_counter("outgoing_responses", - keys=["method","code"], + labels=["method","code"], ) diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 4a6ab9cd74..8ba13075f7 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -25,22 +25,22 @@ def map_concat(func, items): class BaseMetric(object): - def __init__(self, name, keys=[]): + def __init__(self, name, labels=[]): self.name = name - self.keys = keys # OK not to clone as we never write it + self.labels = labels # OK not to clone as we never write it def dimension(self): - return len(self.keys) + return len(self.labels) def is_scalar(self): - return not len(self.keys) + return not len(self.labels) def _render_key(self, values): if self.is_scalar(): return "" # TODO: some kind of value escape return "{%s}" % ( - ",".join(["%s=%s" % kv for kv in zip(self.keys, values)]) + ",".join(["%s=%s" % kv for kv in zip(self.labels, values)]) ) def render(self): @@ -62,7 +62,7 @@ class CounterMetric(BaseMetric): def inc(self, *values): if len(values) != self.dimension(): - raise ValueError("Expected as many values to inc() as keys (%d)" % + raise ValueError("Expected as many values to inc() as labels (%d)" % (self.dimension()) ) @@ -85,8 +85,8 @@ class CallbackMetric(BaseMetric): it is rendered. Typically this is used to implement gauges that yield the size or other state of some in-memory object by actively querying it.""" - def __init__(self, name, callback, keys=[]): - super(CallbackMetric, self).__init__(name, keys=keys) + def __init__(self, name, callback, labels=[]): + super(CallbackMetric, self).__init__(name, labels=labels) self.callback = callback @@ -139,15 +139,15 @@ class CacheMetric(object): This metric generates standard metric name pairs, so that monitoring rules can easily be applied to measure hit ratio.""" - def __init__(self, name, size_callback, keys=[]): + def __init__(self, name, size_callback, labels=[]): self.name = name - self.hits = CounterMetric(name + ":hits", keys=keys) - self.misses = CounterMetric(name + ":misses", keys=keys) + self.hits = CounterMetric(name + ":hits", labels=labels) + self.misses = CounterMetric(name + ":misses", labels=labels) self.size = CallbackMetric(name + ":size", callback=size_callback, - keys=keys, + labels=labels, ) def inc_hits(self, *values): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d8c5a60c71..a38b603584 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -38,9 +38,9 @@ transaction_logger = logging.getLogger("synapse.storage.txn") metrics = synapse.metrics.get_metrics_for("synapse.storage") -sql_query_timer = metrics.register_timer("queries", keys=["verb"]) -sql_txn_timer = metrics.register_timer("transactions", keys=["desc"]) -sql_getevents_timer = metrics.register_timer("get_events", keys=["desc"]) +sql_query_timer = metrics.register_timer("queries", labels=["verb"]) +sql_txn_timer = metrics.register_timer("transactions", labels=["desc"]) +sql_getevents_timer = metrics.register_timer("get_events", labels=["desc"]) # TODO(paul): diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py index b25520821d..fefe1a5867 100644 --- a/tests/metrics/test_metric.py +++ b/tests/metrics/test_metric.py @@ -43,7 +43,7 @@ class CounterMetricTestCase(unittest.TestCase): ]) def test_vector(self): - counter = CounterMetric("vector", keys=["method"]) + counter = CounterMetric("vector", labels=["method"]) # Empty counter doesn't yet know what values it has self.assertEquals(counter.render(), []) @@ -83,7 +83,7 @@ class CallbackMetricTestCase(unittest.TestCase): def test_vector(self): vals = dict() - metric = CallbackMetric("values", lambda: vals, keys=["type"]) + metric = CallbackMetric("values", lambda: vals, labels=["type"]) self.assertEquals(metric.render(), []) @@ -115,7 +115,7 @@ class TimerMetricTestCase(unittest.TestCase): ]) def test_vector(self): - metric = TimerMetric("queries", keys=["verb"]) + metric = TimerMetric("queries", labels=["verb"]) self.assertEquals(metric.render(), []) -- cgit 1.5.1 From b0cf86731957876ca877c35bf30c6f695f1a544c Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 6 Mar 2015 16:18:21 +0000 Subject: Use _ instead of . as a metric namespacing separator, for Prometheus --- synapse/federation/transaction_queue.py | 4 ++-- synapse/handlers/presence.py | 2 +- synapse/http/client.py | 4 ++-- synapse/http/matrixfederationclient.py | 4 ++-- synapse/http/server.py | 4 ++-- synapse/metrics/__init__.py | 14 +++++++++++--- synapse/notifier.py | 2 +- synapse/storage/_base.py | 18 +++++++++++++----- 8 files changed, 34 insertions(+), 18 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ae62c69fc3..ca5bcf21cf 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -69,11 +69,11 @@ class TransactionQueue(object): # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) - metrics.register_callback("pending_pdus", + metrics.register_callback("pendingPdus", lambda: {(dest,): len(pdus[dest]) for dest in pdus.keys()}, labels=["dest"], ) - metrics.register_callback("pending_edus", + metrics.register_callback("pendingEdus", lambda: {(dest,): len(edus[dest]) for dest in edus.keys()}, labels=["dest"], ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 698946a48b..c6d6aef53b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -136,7 +136,7 @@ class PresenceHandler(BaseHandler): self._user_cachemap = {} self._user_cachemap_latest_serial = 0 - metrics.register_callback("user_cachemap:size", + metrics.register_callback("userCachemap:size", lambda: len(self._user_cachemap) ) diff --git a/synapse/http/client.py b/synapse/http/client.py index ad2c9c05ec..01737a7188 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -34,10 +34,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -outgoing_requests_counter = metrics.register_counter("outgoing_requests", +outgoing_requests_counter = metrics.register_counter("requests", labels=["method"], ) -incoming_responses_counter = metrics.register_counter("incoming_responses", +incoming_responses_counter = metrics.register_counter("responses", labels=["method","code"], ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 6b6d79a044..11883d3852 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -43,10 +43,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -outgoing_requests_counter = metrics.register_counter("outgoing_requests", +outgoing_requests_counter = metrics.register_counter("requests", labels=["method"], ) -incoming_responses_counter = metrics.register_counter("incoming_responses", +incoming_responses_counter = metrics.register_counter("responses", labels=["method","code"], ) diff --git a/synapse/http/server.py b/synapse/http/server.py index 35bd3a00ba..23708c08c9 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -37,10 +37,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -incoming_requests_counter = metrics.register_counter("incoming_requests", +incoming_requests_counter = metrics.register_counter("requests", labels=["method"], ) -outgoing_responses_counter = metrics.register_counter("outgoing_responses", +outgoing_responses_counter = metrics.register_counter("responses", labels=["method","code"], ) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 443d67f41c..47e475acd2 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -41,7 +41,12 @@ class Metrics(object): self.name_prefix = name def _register(self, metric_class, name, *args, **kwargs): - full_name = "%s.%s" % (self.name_prefix, name) + if "_" in name: + raise ValueError("Metric names %s is invalid as it cannot contain an underscore" + % (name) + ) + + full_name = "%s_%s" % (self.name_prefix, name) metric = metric_class(full_name, *args, **kwargs) @@ -78,10 +83,13 @@ class Metrics(object): return wrapped -def get_metrics_for(name): +def get_metrics_for(pkg_name): """ Returns a Metrics instance for conveniently creating metrics namespaced with the given name prefix. """ - return Metrics(name) + + # Convert a "package.name" to "package_name" because Prometheus doesn't + # let us use . in metric names + return Metrics(pkg_name.replace(".", "_")) def render_all(): diff --git a/synapse/notifier.py b/synapse/notifier.py index 1f7cad624e..75e8152d03 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -122,7 +122,7 @@ class Notifier(object): all_listeners |= x return len(all_listeners) - metrics.register_callback("all_listeners", count_listeners) + metrics.register_callback("listeners", count_listeners) metrics.register_callback("rooms", lambda: count(bool, self.room_to_listeners.values()) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a38b603584..35d118c586 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -40,7 +40,14 @@ metrics = synapse.metrics.get_metrics_for("synapse.storage") sql_query_timer = metrics.register_timer("queries", labels=["verb"]) sql_txn_timer = metrics.register_timer("transactions", labels=["desc"]) -sql_getevents_timer = metrics.register_timer("get_events", labels=["desc"]) +sql_getevents_timer = metrics.register_timer("getEvents", labels=["desc"]) + +caches_by_name = {} +cache_counter = metrics.register_cache( + "cache", + lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, + labels=["name"], +) # TODO(paul): @@ -62,8 +69,9 @@ def cached(max_entries=1000): """ def wrap(orig): cache = OrderedDict() + name = orig.__name__ - counter = metrics.register_cache(orig.__name__, lambda: len(cache)) + caches_by_name[name] = cache def prefill(key, value): while len(cache) > max_entries: @@ -74,10 +82,10 @@ def cached(max_entries=1000): @defer.inlineCallbacks def wrapped(self, key): if key in cache: - counter.inc_hits() + cache_counter.inc_hits(name) defer.returnValue(cache[key]) - counter.inc_misses() + cache_counter.inc_misses(name) ret = yield orig(self, key) prefill(key, ret) defer.returnValue(ret) @@ -195,7 +203,7 @@ class SQLBaseStore(object): self._get_event_cache = LruCache(hs.config.event_cache_size) self._get_event_cache_counter = metrics.register_cache( - "get_event_cache", + "getEventCache", size_callback=lambda: len(self._get_event_cache), ) -- cgit 1.5.1 From 1748605c5d69cb93cbe6bb4d93060124cdc9282f Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 9 Mar 2015 18:34:20 +0000 Subject: Count incoming HTTP requests per servlet that responds --- synapse/federation/transport/server.py | 4 ++++ synapse/http/server.py | 18 +++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 6c624977d7..7838a81362 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -148,6 +148,10 @@ class BaseFederationServlet(object): logger.exception("authenticate_request failed") raise defer.returnValue(response) + + # Extra logic that functools.wraps() doesn't finish + new_code.__self__ = code.__self__ + return new_code def register(self, server): diff --git a/synapse/http/server.py b/synapse/http/server.py index 23708c08c9..a0d190ff78 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -38,7 +38,7 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) incoming_requests_counter = metrics.register_counter("requests", - labels=["method"], + labels=["method", "servlet"], ) outgoing_responses_counter = metrics.register_counter("responses", labels=["method","code"], @@ -122,8 +122,6 @@ class JsonResource(HttpServer, resource.Resource): This checks if anyone has registered a callback for that method and path. """ - incoming_requests_counter.inc(request.method) - code = None start = self.clock.time_msec() try: @@ -143,6 +141,15 @@ class JsonResource(HttpServer, resource.Resource): # returned response. We pass both the request and any # matched groups from the regex to the callback. + callback = path_entry.callback + + servlet_instance = getattr(callback, "__self__", None) + if servlet_instance is not None: + servlet_classname = servlet_instance.__class__.__name__ + else: + servlet_classname = "%r" % callback + incoming_requests_counter.inc(request.method, servlet_classname) + args = [ urllib.unquote(u).decode("UTF-8") for u in m.groups() ] @@ -152,10 +159,7 @@ class JsonResource(HttpServer, resource.Resource): request.method, request.path ) - code, response = yield path_entry.callback( - request, - *args - ) + code, response = yield callback(request, *args) self._send_response(request, code, response) return -- cgit 1.5.1 From 2e4f0b2bd736fd70040d936145948b65b4e00b12 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 10 Mar 2015 15:29:22 +0000 Subject: Replace the @metrics.counted annotations in federation with specifically-written counters and distributions --- synapse/federation/federation_client.py | 27 +++++++++++++++------------ synapse/federation/federation_server.py | 26 ++++++++++++++------------ synapse/metrics/__init__.py | 17 ----------------- 3 files changed, 29 insertions(+), 41 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ef177b79cc..6811a0e3d1 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -36,7 +36,15 @@ import random logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) + +# synapse.federation.federation_client is a silly name +metrics = synapse.metrics.get_metrics_for("synapse.federation.client") + +sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations") + +sent_edus_counter = metrics.register_counter("sent_edus") + +sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) class FederationClient(FederationBase): @@ -53,7 +61,6 @@ class FederationClient(FederationBase): self._get_pdu_cache.start() @log_function - @metrics.counted def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the home server that should be transmitted to others. @@ -70,6 +77,8 @@ class FederationClient(FederationBase): order = self._order self._order += 1 + sent_pdus_destination_dist.inc_by(len(destinations)) + logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) # TODO, add errback, etc. @@ -81,7 +90,6 @@ class FederationClient(FederationBase): ) @log_function - @metrics.counted def send_edu(self, destination, edu_type, content): edu = Edu( origin=self.server_name, @@ -90,18 +98,18 @@ class FederationClient(FederationBase): content=content, ) + sent_edus_counter.inc() + # TODO, add errback, etc. self._transaction_queue.enqueue_edu(edu) return defer.succeed(None) @log_function - @metrics.counted def send_failure(self, failure, destination): self._transaction_queue.enqueue_failure(failure, destination) return defer.succeed(None) @log_function - @metrics.counted def make_query(self, destination, query_type, args, retry_on_dns_fail=True): """Sends a federation Query to a remote homeserver of the given type @@ -118,6 +126,8 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ + sent_queries_counter.inc(query_type) + return self.transport_layer.make_query( destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail ) @@ -163,7 +173,6 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - @metrics.counted def get_pdu(self, destinations, event_id, outlier=False): """Requests the PDU with given origin and ID from the remote home servers. @@ -253,7 +262,6 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - @metrics.counted def get_state_for_room(self, destination, room_id, event_id): """Requests all of the `current` state PDUs for a given room from a remote home server. @@ -294,7 +302,6 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - @metrics.counted def get_event_auth(self, destination, room_id, event_id): res = yield self.transport_layer.get_event_auth( destination, room_id, event_id, @@ -314,7 +321,6 @@ class FederationClient(FederationBase): defer.returnValue(signed_auth) @defer.inlineCallbacks - @metrics.counted def make_join(self, destinations, room_id, user_id): for destination in destinations: try: @@ -341,7 +347,6 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks - @metrics.counted def send_join(self, destinations, pdu): for destination in destinations: try: @@ -391,7 +396,6 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks - @metrics.counted def send_invite(self, destination, room_id, event_id, pdu): time_now = self._clock.time_msec() code, content = yield self.transport_layer.send_invite( @@ -415,7 +419,6 @@ class FederationClient(FederationBase): defer.returnValue(pdu) @defer.inlineCallbacks - @metrics.counted def query_auth(self, destination, room_id, event_id, local_auth): """ Params: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3216fca95f..25c0014f97 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -33,7 +33,14 @@ import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) +# synapse.federation.federation_server is a silly name +metrics = synapse.metrics.get_metrics_for("synapse.federation.server") + +received_pdus_counter = metrics.register_counter("received_pdus") + +received_edus_counter = metrics.register_counter("received_edus") + +received_queries_counter = metrics.register_counter("received_queries", labels=["type"]) class FederationServer(FederationBase): @@ -75,7 +82,6 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - @metrics.counted def on_backfill_request(self, origin, room_id, versions, limit): pdus = yield self.handler.on_backfill_request( origin, room_id, versions, limit @@ -85,10 +91,11 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - @metrics.counted def on_incoming_transaction(self, transaction_data): transaction = Transaction(**transaction_data) + received_pdus_counter.inc_by(len(transaction.pdus)) + for p in transaction.pdus: if "unsigned" in p: unsigned = p["unsigned"] @@ -158,6 +165,8 @@ class FederationServer(FederationBase): defer.returnValue((200, response)) def received_edu(self, origin, edu_type, content): + received_edus_counter.inc() + if edu_type in self.edu_handlers: self.edu_handlers[edu_type](origin, content) else: @@ -165,7 +174,6 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - @metrics.counted def on_context_state_request(self, origin, room_id, event_id): if event_id: pdus = yield self.handler.get_state_for_pdu( @@ -193,7 +201,6 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - @metrics.counted def on_pdu_request(self, origin, event_id): pdu = yield self._get_persisted_pdu(origin, event_id) @@ -206,13 +213,13 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - @metrics.counted def on_pull_request(self, origin, versions): raise NotImplementedError("Pull transactions not implemented") @defer.inlineCallbacks - @metrics.counted def on_query_request(self, query_type, args): + received_queries_counter.inc(query_type) + if query_type in self.query_handlers: response = yield self.query_handlers[query_type](args) defer.returnValue((200, response)) @@ -222,14 +229,12 @@ class FederationServer(FederationBase): ) @defer.inlineCallbacks - @metrics.counted def on_make_join_request(self, room_id, user_id): pdu = yield self.handler.on_make_join_request(room_id, user_id) time_now = self._clock.time_msec() defer.returnValue({"event": pdu.get_pdu_json(time_now)}) @defer.inlineCallbacks - @metrics.counted def on_invite_request(self, origin, content): pdu = self.event_from_pdu_json(content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) @@ -237,7 +242,6 @@ class FederationServer(FederationBase): defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) @defer.inlineCallbacks - @metrics.counted def on_send_join_request(self, origin, content): logger.debug("on_send_join_request: content: %s", content) pdu = self.event_from_pdu_json(content) @@ -252,7 +256,6 @@ class FederationServer(FederationBase): })) @defer.inlineCallbacks - @metrics.counted def on_event_auth(self, origin, room_id, event_id): time_now = self._clock.time_msec() auth_pdus = yield self.handler.on_event_auth(event_id) @@ -261,7 +264,6 @@ class FederationServer(FederationBase): })) @defer.inlineCallbacks - @metrics.counted def on_query_auth_request(self, origin, content, event_id): """ Content is a dict with keys:: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index f85c6418e5..94164974fc 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -62,23 +62,6 @@ class Metrics(object): def register_cache(self, *args, **kwargs): return self._register(CacheMetric, *args, **kwargs) - def counted(self, func): - """ A method decorator that registers a counter, to count invocations - of this method. """ - if not hasattr(self, "method_counter"): - self.method_counter = self.register_counter( - "calls", - labels=["method"] - ) - - counter = self.method_counter - name = func.__name__ - - def wrapped(*args, **kwargs): - counter.inc(name) - return func(*args, **kwargs) - return wrapped - def get_metrics_for(pkg_name): """ Returns a Metrics instance for conveniently creating metrics -- cgit 1.5.1 From c782e893ec5ceaf7e8136f45c9e6cfa8b11ec653 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 10 Mar 2015 18:24:52 +0000 Subject: Neater metrics from TransactionQueue --- synapse/federation/transaction_queue.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ca5bcf21cf..99e386fa52 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -57,27 +57,29 @@ class TransactionQueue(object): # done self.pending_transactions = {} + metrics.register_callback("pending_destinations", + lambda: len(self.pending_transactions), + ) + # Is a mapping from destination -> list of # tuple(pending pdus, deferred, order) self.pending_pdus_by_dest = pdus = {} # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} + metrics.register_callback("pending_pdus", + lambda: sum(map(len, pdus.values())), + ) + metrics.register_callback("pending_edus", + lambda: sum(map(len, edus.values())), + ) + # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) - metrics.register_callback("pendingPdus", - lambda: {(dest,): len(pdus[dest]) for dest in pdus.keys()}, - labels=["dest"], - ) - metrics.register_callback("pendingEdus", - lambda: {(dest,): len(edus[dest]) for dest in edus.keys()}, - labels=["dest"], - ) - def can_send_to(self, destination): """Can we send messages to the given server? -- cgit 1.5.1 From 128cf2daf76e5b05a4e577b60ea406fdbb6986bf Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 12 Mar 2015 16:24:38 +0000 Subject: Appease pep8 --- synapse/federation/transaction_queue.py | 9 ++++++--- synapse/handlers/presence.py | 5 +++-- synapse/http/client.py | 10 +++++++--- synapse/http/matrixfederationclient.py | 10 +++++++--- synapse/http/server.py | 8 +++++--- synapse/metrics/__init__.py | 3 ++- synapse/metrics/metric.py | 11 ++++++----- synapse/metrics/resource.py | 2 +- synapse/notifier.py | 15 +++++++++------ 9 files changed, 46 insertions(+), 27 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 99e386fa52..4dccd93d0e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -57,7 +57,8 @@ class TransactionQueue(object): # done self.pending_transactions = {} - metrics.register_callback("pending_destinations", + metrics.register_callback( + "pending_destinations", lambda: len(self.pending_transactions), ) @@ -67,10 +68,12 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} - metrics.register_callback("pending_pdus", + metrics.register_callback( + "pending_pdus", lambda: sum(map(len, pdus.values())), ) - metrics.register_callback("pending_edus", + metrics.register_callback( + "pending_edus", lambda: sum(map(len, edus.values())), ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c6d6aef53b..731df00648 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -136,8 +136,9 @@ class PresenceHandler(BaseHandler): self._user_cachemap = {} self._user_cachemap_latest_serial = 0 - metrics.register_callback("userCachemap:size", - lambda: len(self._user_cachemap) + metrics.register_callback( + "userCachemap:size", + lambda: len(self._user_cachemap), ) def _get_or_make_usercache(self, user): diff --git a/synapse/http/client.py b/synapse/http/client.py index 01737a7188..2ae1c4d3a4 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -34,11 +34,13 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -outgoing_requests_counter = metrics.register_counter("requests", +outgoing_requests_counter = metrics.register_counter( + "requests", labels=["method"], ) -incoming_responses_counter = metrics.register_counter("responses", - labels=["method","code"], +incoming_responses_counter = metrics.register_counter( + "responses", + labels=["method", "code"], ) @@ -64,9 +66,11 @@ class SimpleHttpClient(object): def _cb(response): incoming_responses_counter.inc(method, response.code) return response + def _eb(failure): incoming_responses_counter.inc(method, "ERR") return failure + d.addCallbacks(_cb, _eb) return d diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 11883d3852..7fa295cad5 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -43,11 +43,13 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -outgoing_requests_counter = metrics.register_counter("requests", +outgoing_requests_counter = metrics.register_counter( + "requests", labels=["method"], ) -incoming_responses_counter = metrics.register_counter("responses", - labels=["method","code"], +incoming_responses_counter = metrics.register_counter( + "responses", + labels=["method", "code"], ) @@ -78,9 +80,11 @@ class MatrixFederationHttpAgent(_AgentBase): def _cb(response): incoming_responses_counter.inc(method, response.code) return response + def _eb(failure): incoming_responses_counter.inc(method, "ERR") return failure + d.addCallbacks(_cb, _eb) return d diff --git a/synapse/http/server.py b/synapse/http/server.py index a0d190ff78..d77cb77799 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -37,11 +37,13 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -incoming_requests_counter = metrics.register_counter("requests", +incoming_requests_counter = metrics.register_counter( + "requests", labels=["method", "servlet"], ) -outgoing_responses_counter = metrics.register_counter("responses", - labels=["method","code"], +outgoing_responses_counter = metrics.register_counter( + "responses", + labels=["method", "code"], ) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 94164974fc..7b9c9c8bab 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -85,7 +85,7 @@ def render_all(): strs += ["# FAILED to render %s" % name] logger.exception("Failed to render %s metric", name) - strs.append("") # to generate a final CRLF + strs.append("") # to generate a final CRLF return "\n".join(strs) @@ -96,6 +96,7 @@ def render_all(): rusage = None PAGE_SIZE = getpagesize() + def update_resource_metrics(): global rusage rusage = getrusage(RUSAGE_SELF) diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 12460c99c3..21b37748f6 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -27,7 +27,7 @@ class BaseMetric(object): def __init__(self, name, labels=[]): self.name = name - self.labels = labels # OK not to clone as we never write it + self.labels = labels # OK not to clone as we never write it def dimension(self): return len(self.labels) @@ -66,8 +66,8 @@ class CounterMetric(BaseMetric): def inc_by(self, incr, *values): if len(values) != self.dimension(): - raise ValueError("Expected as many values to inc() as labels (%d)" % - (self.dimension()) + raise ValueError( + "Expected as many values to inc() as labels (%d)" % (self.dimension()) ) # TODO: should assert that the tag values are all strings @@ -135,10 +135,11 @@ class CacheMetric(object): def __init__(self, name, size_callback, labels=[]): self.name = name - self.hits = CounterMetric(name + ":hits", labels=labels) + self.hits = CounterMetric(name + ":hits", labels=labels) self.total = CounterMetric(name + ":total", labels=labels) - self.size = CallbackMetric(name + ":size", + self.size = CallbackMetric( + name + ":size", callback=size_callback, labels=labels, ) diff --git a/synapse/metrics/resource.py b/synapse/metrics/resource.py index ff7baab018..97ea797bf5 100644 --- a/synapse/metrics/resource.py +++ b/synapse/metrics/resource.py @@ -26,7 +26,7 @@ class MetricsResource(Resource): isLeaf = True def __init__(self, hs): - Resource.__init__(self) # Resource is old-style, so no super() + Resource.__init__(self) # Resource is old-style, so no super() self.hs = hs diff --git a/synapse/notifier.py b/synapse/notifier.py index 88873d4534..7121d659d0 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -127,14 +127,17 @@ class Notifier(object): return len(all_listeners) metrics.register_callback("listeners", count_listeners) - metrics.register_callback("rooms", - lambda: count(bool, self.room_to_listeners.values()) + metrics.register_callback( + "rooms", + lambda: count(bool, self.room_to_listeners.values()), ) - metrics.register_callback("users", - lambda: count(bool, self.user_to_listeners.values()) + metrics.register_callback( + "users", + lambda: count(bool, self.user_to_listeners.values()), ) - metrics.register_callback("appservices", - lambda: count(bool, self.appservice_to_listeners.values()) + metrics.register_callback( + "appservices", + lambda: count(bool, self.appservice_to_listeners.values()), ) @log_function -- cgit 1.5.1