diff options
-rwxr-xr-x | synapse/app/homeserver.py | 28 | ||||
-rw-r--r-- | synapse/crypto/keyring.py | 102 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 14 | ||||
-rw-r--r-- | synapse/http/server.py | 138 | ||||
-rw-r--r-- | synapse/notifier.py | 30 | ||||
-rw-r--r-- | synapse/storage/events.py | 82 | ||||
-rw-r--r-- | synapse/util/metrics.py | 2 |
7 files changed, 212 insertions, 184 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 40e6f65236..54f35900f8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -51,7 +51,7 @@ from synapse.api.urls import ( from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext -from synapse.metrics import register_memory_metrics +from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX from synapse.federation.transport.server import TransportLayerServer @@ -385,6 +385,8 @@ def run(hs): start_time = hs.get_clock().time() + stats = {} + @defer.inlineCallbacks def phone_stats_home(): logger.info("Gathering stats for reporting") @@ -393,7 +395,10 @@ def run(hs): if uptime < 0: uptime = 0 - stats = {} + # If the stats directory is empty then this is the first time we've + # reported stats. + first_time = not stats + stats["homeserver"] = hs.config.server_name stats["timestamp"] = now stats["uptime_seconds"] = uptime @@ -406,6 +411,25 @@ def run(hs): daily_messages = yield hs.get_datastore().count_daily_messages() if daily_messages is not None: stats["daily_messages"] = daily_messages + else: + stats.pop("daily_messages", None) + + if first_time: + # Add callbacks to report the synapse stats as metrics whenever + # prometheus requests them, typically every 30s. + # As some of the stats are expensive to calculate we only update + # them when synapse phones home to matrix.org every 24 hours. + metrics = get_metrics_for("synapse.usage") + metrics.add_callback("timestamp", lambda: stats["timestamp"]) + metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"]) + metrics.add_callback("total_users", lambda: stats["total_users"]) + metrics.add_callback("total_room_count", lambda: stats["total_room_count"]) + metrics.add_callback( + "daily_active_users", lambda: stats["daily_active_users"] + ) + metrics.add_callback( + "daily_messages", lambda: stats.get("daily_messages", 0) + ) logger.info("Reporting stats to matrix.org: %s" % (stats,)) try: diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 7cd11cfae7..1735ca9345 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -22,6 +22,7 @@ from synapse.util.logcontext import ( preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext, preserve_fn ) +from synapse.util.metrics import Measure from twisted.internet import defer @@ -243,59 +244,60 @@ class Keyring(object): @defer.inlineCallbacks def do_iterations(): - merged_results = {} + with Measure(self.clock, "get_server_verify_keys"): + merged_results = {} - missing_keys = {} - for verify_request in verify_requests: - missing_keys.setdefault(verify_request.server_name, set()).update( - verify_request.key_ids - ) - - for fn in key_fetch_fns: - results = yield fn(missing_keys.items()) - merged_results.update(results) - - # We now need to figure out which verify requests we have keys - # for and which we don't missing_keys = {} - requests_missing_keys = [] for verify_request in verify_requests: - server_name = verify_request.server_name - result_keys = merged_results[server_name] - - if verify_request.deferred.called: - # We've already called this deferred, which probably - # means that we've already found a key for it. - continue - - for key_id in verify_request.key_ids: - if key_id in result_keys: - with PreserveLoggingContext(): - verify_request.deferred.callback(( - server_name, - key_id, - result_keys[key_id], - )) - break - else: - # The else block is only reached if the loop above - # doesn't break. - missing_keys.setdefault(server_name, set()).update( - verify_request.key_ids - ) - requests_missing_keys.append(verify_request) - - if not missing_keys: - break - - for verify_request in requests_missing_keys.values(): - verify_request.deferred.errback(SynapseError( - 401, - "No key for %s with id %s" % ( - verify_request.server_name, verify_request.key_ids, - ), - Codes.UNAUTHORIZED, - )) + missing_keys.setdefault(verify_request.server_name, set()).update( + verify_request.key_ids + ) + + for fn in key_fetch_fns: + results = yield fn(missing_keys.items()) + merged_results.update(results) + + # We now need to figure out which verify requests we have keys + # for and which we don't + missing_keys = {} + requests_missing_keys = [] + for verify_request in verify_requests: + server_name = verify_request.server_name + result_keys = merged_results[server_name] + + if verify_request.deferred.called: + # We've already called this deferred, which probably + # means that we've already found a key for it. + continue + + for key_id in verify_request.key_ids: + if key_id in result_keys: + with PreserveLoggingContext(): + verify_request.deferred.callback(( + server_name, + key_id, + result_keys[key_id], + )) + break + else: + # The else block is only reached if the loop above + # doesn't break. + missing_keys.setdefault(server_name, set()).update( + verify_request.key_ids + ) + requests_missing_keys.append(verify_request) + + if not missing_keys: + break + + for verify_request in requests_missing_keys.values(): + verify_request.deferred.errback(SynapseError( + 401, + "No key for %s with id %s" % ( + verify_request.server_name, verify_request.key_ids, + ), + Codes.UNAUTHORIZED, + )) def on_err(err): for verify_request in verify_requests: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0ee4ebe504..c8dfd02e7b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -464,10 +464,10 @@ class SyncHandler(object): else: state = {} - defer.returnValue({ - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state(state.values()) - }) + defer.returnValue({ + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state(state.values()) + }) @defer.inlineCallbacks def unread_notifs_for_room_id(self, room_id, sync_config): @@ -485,9 +485,9 @@ class SyncHandler(object): ) defer.returnValue(notifs) - # There is no new information in this period, so your notification - # count is whatever it was last time. - defer.returnValue(None) + # There is no new information in this period, so your notification + # count is whatever it was last time. + defer.returnValue(None) @defer.inlineCallbacks def generate_sync_result(self, sync_config, since_token=None, full_state=False): diff --git a/synapse/http/server.py b/synapse/http/server.py index 2b3c05a740..168e53ce0c 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -19,6 +19,7 @@ from synapse.api.errors import ( ) from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches import intern_dict +from synapse.util.metrics import Measure import synapse.metrics import synapse.events @@ -74,12 +75,12 @@ response_db_txn_duration = metrics.register_distribution( _next_request_id = 0 -def request_handler(report_metrics=True): +def request_handler(include_metrics=False): """Decorator for ``wrap_request_handler``""" - return lambda request_handler: wrap_request_handler(request_handler, report_metrics) + return lambda request_handler: wrap_request_handler(request_handler, include_metrics) -def wrap_request_handler(request_handler, report_metrics): +def wrap_request_handler(request_handler, include_metrics=False): """Wraps a method that acts as a request handler with the necessary logging and exception handling. @@ -103,54 +104,56 @@ def wrap_request_handler(request_handler, report_metrics): _next_request_id += 1 with LoggingContext(request_id) as request_context: - if report_metrics: + with Measure(self.clock, "wrapped_request_handler"): request_metrics = RequestMetrics() - request_metrics.start(self.clock) - - request_context.request = request_id - with request.processing(): - try: - with PreserveLoggingContext(request_context): - yield request_handler(self, request) - except CodeMessageException as e: - code = e.code - if isinstance(e, SynapseError): - logger.info( - "%s SynapseError: %s - %s", request, code, e.msg - ) - else: - logger.exception(e) - outgoing_responses_counter.inc(request.method, str(code)) - respond_with_json( - request, code, cs_exception(e), send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, - ) - except: - logger.exception( - "Failed handle request %s.%s on %r: %r", - request_handler.__module__, - request_handler.__name__, - self, - request - ) - respond_with_json( - request, - 500, - { - "error": "Internal server error", - "errcode": Codes.UNKNOWN, - }, - send_cors=True - ) - finally: + request_metrics.start(self.clock, name=self.__class__.__name__) + + request_context.request = request_id + with request.processing(): try: - if report_metrics: - request_metrics.stop( - self.clock, request, self.__class__.__name__ + with PreserveLoggingContext(request_context): + if include_metrics: + yield request_handler(self, request, request_metrics) + else: + yield request_handler(self, request) + except CodeMessageException as e: + code = e.code + if isinstance(e, SynapseError): + logger.info( + "%s SynapseError: %s - %s", request, code, e.msg ) + else: + logger.exception(e) + outgoing_responses_counter.inc(request.method, str(code)) + respond_with_json( + request, code, cs_exception(e), send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + version_string=self.version_string, + ) except: - pass + logger.exception( + "Failed handle request %s.%s on %r: %r", + request_handler.__module__, + request_handler.__name__, + self, + request + ) + respond_with_json( + request, + 500, + { + "error": "Internal server error", + "errcode": Codes.UNKNOWN, + }, + send_cors=True + ) + finally: + try: + request_metrics.stop( + self.clock, request + ) + except Exception as e: + logger.warn("Failed to stop metrics: %r", e) return wrapped_request_handler @@ -220,9 +223,9 @@ class JsonResource(HttpServer, resource.Resource): # It does its own metric reporting because _async_render dispatches to # a callback and it's the class name of that callback we want to report # against rather than the JsonResource itself. - @request_handler(report_metrics=False) + @request_handler(include_metrics=True) @defer.inlineCallbacks - def _async_render(self, request): + def _async_render(self, request, request_metrics): """ This gets called from render() every time someone sends us a request. This checks if anyone has registered a callback for that method and path. @@ -231,9 +234,6 @@ class JsonResource(HttpServer, resource.Resource): self._send_response(request, 200, {}) return - request_metrics = RequestMetrics() - request_metrics.start(self.clock) - # Loop through all the registered callbacks to check if the method # and path regex match for path_entry in self.path_regexs.get(request.method, []): @@ -247,12 +247,6 @@ class JsonResource(HttpServer, resource.Resource): callback = path_entry.callback - servlet_instance = getattr(callback, "__self__", None) - if servlet_instance is not None: - servlet_classname = servlet_instance.__class__.__name__ - else: - servlet_classname = "%r" % callback - kwargs = intern_dict({ name: urllib.unquote(value).decode("UTF-8") if value else value for name, value in m.groupdict().items() @@ -263,10 +257,13 @@ class JsonResource(HttpServer, resource.Resource): code, response = callback_return self._send_response(request, code, response) - try: - request_metrics.stop(self.clock, request, servlet_classname) - except: - pass + servlet_instance = getattr(callback, "__self__", None) + if servlet_instance is not None: + servlet_classname = servlet_instance.__class__.__name__ + else: + servlet_classname = "%r" % callback + + request_metrics.name = servlet_classname return @@ -298,11 +295,12 @@ class JsonResource(HttpServer, resource.Resource): class RequestMetrics(object): - def start(self, clock): + def start(self, clock, name): self.start = clock.time_msec() self.start_context = LoggingContext.current_context() + self.name = name - def stop(self, clock, request, servlet_classname): + def stop(self, clock, request): context = LoggingContext.current_context() tag = "" @@ -316,26 +314,26 @@ class RequestMetrics(object): ) return - incoming_requests_counter.inc(request.method, servlet_classname, tag) + incoming_requests_counter.inc(request.method, self.name, tag) response_timer.inc_by( clock.time_msec() - self.start, request.method, - servlet_classname, tag + self.name, tag ) ru_utime, ru_stime = context.get_resource_usage() response_ru_utime.inc_by( - ru_utime, request.method, servlet_classname, tag + ru_utime, request.method, self.name, tag ) response_ru_stime.inc_by( - ru_stime, request.method, servlet_classname, tag + ru_stime, request.method, self.name, tag ) response_db_txn_count.inc_by( - context.db_txn_count, request.method, servlet_classname, tag + context.db_txn_count, request.method, self.name, tag ) response_db_txn_duration.inc_by( - context.db_txn_duration, request.method, servlet_classname, tag + context.db_txn_duration, request.method, self.name, tag ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 40a148994f..c48024096d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -20,6 +20,7 @@ from synapse.api.errors import AuthError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client import synapse.metrics @@ -231,24 +232,25 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ with PreserveLoggingContext(): - user_streams = set() + with Measure(self.clock, "on_new_event"): + user_streams = set() - for user in users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + for user in users: + user_stream = self.user_to_user_stream.get(str(user)) + if user_stream is not None: + user_streams.add(user_stream) - for room in rooms: - user_streams |= self.room_to_user_streams.get(room, set()) + for room in rooms: + user_streams |= self.room_to_user_streams.get(room, set()) - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify(stream_key, new_token, time_now_ms) - except: - logger.exception("Failed to notify listener") + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify(stream_key, new_token, time_now_ms) + except: + logger.exception("Failed to notify listener") - self.notify_replication() + self.notify_replication() def on_new_replication_data(self): """Used to inform replication listeners that something has happend diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ad026b5e0b..97aef25321 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -22,6 +22,7 @@ from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function +from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError @@ -1132,54 +1133,55 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - desc="_get_event_from_row_rejected_reason", - ) + with Measure(self._clock, "_get_event_from_row"): + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) + + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + desc="_get_event_from_row_rejected_reason", + ) - original_ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) + original_ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) - redacted_event = None - if redacted: - redacted_event = prune_event(original_ev) + redacted_event = None + if redacted: + redacted_event = prune_event(original_ev) - redaction_id = yield self._simple_select_one_onecol( - table="redactions", - keyvalues={"redacts": redacted_event.event_id}, - retcol="event_id", - desc="_get_event_from_row_redactions", - ) + redaction_id = yield self._simple_select_one_onecol( + table="redactions", + keyvalues={"redacts": redacted_event.event_id}, + retcol="event_id", + desc="_get_event_from_row_redactions", + ) - redacted_event.unsigned["redacted_by"] = redaction_id - # Get the redaction event. + redacted_event.unsigned["redacted_by"] = redaction_id + # Get the redaction event. - because = yield self.get_event( - redaction_id, - check_redacted=False, - allow_none=True, - ) + because = yield self.get_event( + redaction_id, + check_redacted=False, + allow_none=True, + ) - if because: - # It's fine to do add the event directly, since get_pdu_json - # will serialise this field correctly - redacted_event.unsigned["redacted_because"] = because + if because: + # It's fine to do add the event directly, since get_pdu_json + # will serialise this field correctly + redacted_event.unsigned["redacted_because"] = because - cache_entry = _EventCacheEntry( - event=original_ev, - redacted_event=redacted_event, - ) + cache_entry = _EventCacheEntry( + event=original_ev, + redacted_event=redacted_event, + ) - self._get_event_cache.prefill((original_ev.event_id,), cache_entry) + self._get_event_cache.prefill((original_ev.event_id,), cache_entry) defer.returnValue(cache_entry) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 76f301f549..4ea930d3e8 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -87,7 +87,7 @@ class Measure(object): self.db_txn_duration = self.start_context.db_txn_duration def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type is not None or not self.start_context: + if isinstance(exc_type, Exception) or not self.start_context: return duration = self.clock.time_msec() - self.start |