Diffstat (limited to 'synapse')
-rwxr-xr-x  synapse/app/homeserver.py   28
-rw-r--r--  synapse/crypto/keyring.py  102
-rw-r--r--  synapse/handlers/sync.py    14
-rw-r--r--  synapse/http/server.py     138
-rw-r--r--  synapse/notifier.py         30
-rw-r--r--  synapse/storage/events.py   82
-rw-r--r--  synapse/util/metrics.py      2
7 files changed, 212 insertions, 184 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 40e6f65236..54f35900f8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -51,7 +51,7 @@ from synapse.api.urls import (
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.util.logcontext import LoggingContext
-from synapse.metrics import register_memory_metrics
+from synapse.metrics import register_memory_metrics, get_metrics_for
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
 from synapse.federation.transport.server import TransportLayerServer
@@ -385,6 +385,8 @@ def run(hs):
 
     start_time = hs.get_clock().time()
 
+    stats = {}
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -393,7 +395,10 @@ def run(hs):
         if uptime < 0:
             uptime = 0
 
-        stats = {}
+        # If the stats dictionary is empty then this is the first time we've
+        # reported stats.
+        first_time = not stats
+
         stats["homeserver"] = hs.config.server_name
         stats["timestamp"] = now
         stats["uptime_seconds"] = uptime
@@ -406,6 +411,25 @@ def run(hs):
         daily_messages = yield hs.get_datastore().count_daily_messages()
         if daily_messages is not None:
             stats["daily_messages"] = daily_messages
+        else:
+            stats.pop("daily_messages", None)
+
+        if first_time:
+            # Add callbacks to report the synapse stats as metrics whenever
+            # Prometheus requests them, typically every 30s.
+            # As some of the stats are expensive to calculate, we only update
+            # them when synapse phones home to matrix.org every 24 hours.
+            metrics = get_metrics_for("synapse.usage")
+            metrics.add_callback("timestamp", lambda: stats["timestamp"])
+            metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"])
+            metrics.add_callback("total_users", lambda: stats["total_users"])
+            metrics.add_callback("total_room_count", lambda: stats["total_room_count"])
+            metrics.add_callback(
+                "daily_active_users", lambda: stats["daily_active_users"]
+            )
+            metrics.add_callback(
+                "daily_messages", lambda: stats.get("daily_messages", 0)
+            )
 
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
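
The new code above registers the Prometheus callbacks once, on the first phone-home, and each lambda closes over the long-lived stats dict, so a scrape simply reads whatever the most recent 24-hourly run stored. Below is a minimal sketch of that pattern; the Metrics class here is a hypothetical stand-in for the registry returned by get_metrics_for, not the real synapse.metrics API.

stats = {}

class Metrics:
    def __init__(self):
        self._callbacks = {}

    def add_callback(self, name, callback):
        # Invoked on every scrape, so callbacks must stay cheap.
        self._callbacks[name] = callback

    def render(self):
        return {name: cb() for name, cb in self._callbacks.items()}

metrics = Metrics()
metrics.add_callback("uptime_seconds", lambda: stats.get("uptime_seconds", 0))

stats["uptime_seconds"] = 42    # updated once per phone-home run
print(metrics.render())         # {'uptime_seconds': 42}
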
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 7cd11cfae7..1735ca9345 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -22,6 +22,7 @@ from synapse.util.logcontext import (
     preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext,
     preserve_fn
 )
+from synapse.util.metrics import Measure
 
 from twisted.internet import defer
 
@@ -243,59 +244,60 @@ class Keyring(object):
 
         @defer.inlineCallbacks
         def do_iterations():
-            merged_results = {}
+            with Measure(self.clock, "get_server_verify_keys"):
+                merged_results = {}
 
-            missing_keys = {}
-            for verify_request in verify_requests:
-                missing_keys.setdefault(verify_request.server_name, set()).update(
-                    verify_request.key_ids
-                )
-
-            for fn in key_fetch_fns:
-                results = yield fn(missing_keys.items())
-                merged_results.update(results)
-
-                # We now need to figure out which verify requests we have keys
-                # for and which we don't
                 missing_keys = {}
-                requests_missing_keys = []
                 for verify_request in verify_requests:
-                    server_name = verify_request.server_name
-                    result_keys = merged_results[server_name]
-
-                    if verify_request.deferred.called:
-                        # We've already called this deferred, which probably
-                        # means that we've already found a key for it.
-                        continue
-
-                    for key_id in verify_request.key_ids:
-                        if key_id in result_keys:
-                            with PreserveLoggingContext():
-                                verify_request.deferred.callback((
-                                    server_name,
-                                    key_id,
-                                    result_keys[key_id],
-                                ))
-                            break
-                    else:
-                        # The else block is only reached if the loop above
-                        # doesn't break.
-                        missing_keys.setdefault(server_name, set()).update(
-                            verify_request.key_ids
-                        )
-                        requests_missing_keys.append(verify_request)
-
-                if not missing_keys:
-                    break
-
-            for verify_request in requests_missing_keys.values():
-                verify_request.deferred.errback(SynapseError(
-                    401,
-                    "No key for %s with id %s" % (
-                        verify_request.server_name, verify_request.key_ids,
-                    ),
-                    Codes.UNAUTHORIZED,
-                ))
+                    missing_keys.setdefault(verify_request.server_name, set()).update(
+                        verify_request.key_ids
+                    )
+
+                for fn in key_fetch_fns:
+                    results = yield fn(missing_keys.items())
+                    merged_results.update(results)
+
+                    # We now need to figure out which verify requests we have keys
+                    # for and which we don't
+                    missing_keys = {}
+                    requests_missing_keys = []
+                    for verify_request in verify_requests:
+                        server_name = verify_request.server_name
+                        result_keys = merged_results[server_name]
+
+                        if verify_request.deferred.called:
+                            # We've already called this deferred, which probably
+                            # means that we've already found a key for it.
+                            continue
+
+                        for key_id in verify_request.key_ids:
+                            if key_id in result_keys:
+                                with PreserveLoggingContext():
+                                    verify_request.deferred.callback((
+                                        server_name,
+                                        key_id,
+                                        result_keys[key_id],
+                                    ))
+                                break
+                        else:
+                            # The else block is only reached if the loop above
+                            # doesn't break.
+                            missing_keys.setdefault(server_name, set()).update(
+                                verify_request.key_ids
+                            )
+                            requests_missing_keys.append(verify_request)
+
+                    if not missing_keys:
+                        break
+
+                for verify_request in requests_missing_keys:
+                    verify_request.deferred.errback(SynapseError(
+                        401,
+                        "No key for %s with id %s" % (
+                            verify_request.server_name, verify_request.key_ids,
+                        ),
+                        Codes.UNAUTHORIZED,
+                    ))
 
         def on_err(err):
             for verify_request in verify_requests:
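
The re-indented loop above preserves Python's for/else idiom: the else clause runs only when the loop finishes without break, which is how a verify request whose key ids all went unmatched lands back in missing_keys. A self-contained illustration of the idiom, with made-up key ids:

def resolve(wanted_ids, available, missing):
    for key_id in wanted_ids:
        if key_id in available:
            print("found", key_id)
            break
    else:
        # Reached only when the loop above doesn't break: none of the
        # requested ids were available.
        missing.update(wanted_ids)

missing = set()
resolve(["ed25519:a"], {"ed25519:a": "key"}, missing)   # found ed25519:a
resolve(["ed25519:b"], {"ed25519:a": "key"}, missing)
print(missing)                                          # {'ed25519:b'}
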
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0ee4ebe504..c8dfd02e7b 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -464,10 +464,10 @@ class SyncHandler(object):
             else:
                 state = {}
 
-            defer.returnValue({
-                (e.type, e.state_key): e
-                for e in sync_config.filter_collection.filter_room_state(state.values())
-            })
+        defer.returnValue({
+            (e.type, e.state_key): e
+            for e in sync_config.filter_collection.filter_room_state(state.values())
+        })
 
     @defer.inlineCallbacks
     def unread_notifs_for_room_id(self, room_id, sync_config):
@@ -485,9 +485,9 @@ class SyncHandler(object):
                 )
                 defer.returnValue(notifs)
 
-            # There is no new information in this period, so your notification
-            # count is whatever it was last time.
-            defer.returnValue(None)
+        # There is no new information in this period, so your notification
+        # count is whatever it was last time.
+        defer.returnValue(None)
 
     @defer.inlineCallbacks
     def generate_sync_result(self, sync_config, since_token=None, full_state=False):
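
Both dedents move defer.returnValue out of an enclosing with block (presumably a Measure block introduced alongside this change). That placement matters because returnValue returns from an @defer.inlineCallbacks generator by raising a control-flow exception, so any context manager wrapping the call sees that exception in its __exit__; the _get_event_from_row hunk below likewise keeps its defer.returnValue outside the new Measure block. A small demonstration, assuming Twisted is installed; Probe is an illustrative context manager, not Synapse code:

from twisted.internet import defer

class Probe:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Reports whatever exception type unwinds through the block.
        print("__exit__ saw:", exc_type)
        return False   # never suppress

@defer.inlineCallbacks
def f():
    with Probe():
        yield defer.succeed(None)
        defer.returnValue(42)   # raises internally to unwind the generator

f().addCallback(lambda result: print("result:", result))
# __exit__ saw: <class 'twisted.internet.defer._DefGen_Return'>
# result: 42
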
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2b3c05a740..168e53ce0c 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -19,6 +19,7 @@ from synapse.api.errors import (
 )
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.caches import intern_dict
+from synapse.util.metrics import Measure
 import synapse.metrics
 import synapse.events
 
@@ -74,12 +75,12 @@ response_db_txn_duration = metrics.register_distribution(
 _next_request_id = 0
 
 
-def request_handler(report_metrics=True):
+def request_handler(include_metrics=False):
     """Decorator for ``wrap_request_handler``"""
-    return lambda request_handler: wrap_request_handler(request_handler, report_metrics)
+    return lambda request_handler: wrap_request_handler(request_handler, include_metrics)
 
 
-def wrap_request_handler(request_handler, report_metrics):
+def wrap_request_handler(request_handler, include_metrics=False):
     """Wraps a method that acts as a request handler with the necessary logging
     and exception handling.
 
@@ -103,54 +104,56 @@ def wrap_request_handler(request_handler, report_metrics):
         _next_request_id += 1
 
         with LoggingContext(request_id) as request_context:
-            if report_metrics:
+            with Measure(self.clock, "wrapped_request_handler"):
                 request_metrics = RequestMetrics()
-                request_metrics.start(self.clock)
-
-            request_context.request = request_id
-            with request.processing():
-                try:
-                    with PreserveLoggingContext(request_context):
-                        yield request_handler(self, request)
-                except CodeMessageException as e:
-                    code = e.code
-                    if isinstance(e, SynapseError):
-                        logger.info(
-                            "%s SynapseError: %s - %s", request, code, e.msg
-                        )
-                    else:
-                        logger.exception(e)
-                    outgoing_responses_counter.inc(request.method, str(code))
-                    respond_with_json(
-                        request, code, cs_exception(e), send_cors=True,
-                        pretty_print=_request_user_agent_is_curl(request),
-                        version_string=self.version_string,
-                    )
-                except:
-                    logger.exception(
-                        "Failed handle request %s.%s on %r: %r",
-                        request_handler.__module__,
-                        request_handler.__name__,
-                        self,
-                        request
-                    )
-                    respond_with_json(
-                        request,
-                        500,
-                        {
-                            "error": "Internal server error",
-                            "errcode": Codes.UNKNOWN,
-                        },
-                        send_cors=True
-                    )
-                finally:
+                request_metrics.start(self.clock, name=self.__class__.__name__)
+
+                request_context.request = request_id
+                with request.processing():
                     try:
-                        if report_metrics:
-                            request_metrics.stop(
-                                self.clock, request, self.__class__.__name__
+                        with PreserveLoggingContext(request_context):
+                            if include_metrics:
+                                yield request_handler(self, request, request_metrics)
+                            else:
+                                yield request_handler(self, request)
+                    except CodeMessageException as e:
+                        code = e.code
+                        if isinstance(e, SynapseError):
+                            logger.info(
+                                "%s SynapseError: %s - %s", request, code, e.msg
                             )
+                        else:
+                            logger.exception(e)
+                        outgoing_responses_counter.inc(request.method, str(code))
+                        respond_with_json(
+                            request, code, cs_exception(e), send_cors=True,
+                            pretty_print=_request_user_agent_is_curl(request),
+                            version_string=self.version_string,
+                        )
                     except:
-                        pass
+                        logger.exception(
+                            "Failed to handle request %s.%s on %r: %r",
+                            request_handler.__module__,
+                            request_handler.__name__,
+                            self,
+                            request
+                        )
+                        respond_with_json(
+                            request,
+                            500,
+                            {
+                                "error": "Internal server error",
+                                "errcode": Codes.UNKNOWN,
+                            },
+                            send_cors=True
+                        )
+                    finally:
+                        try:
+                            request_metrics.stop(
+                                self.clock, request
+                            )
+                        except Exception as e:
+                            logger.warn("Failed to stop metrics: %r", e)
     return wrapped_request_handler
 
 
@@ -220,9 +223,9 @@ class JsonResource(HttpServer, resource.Resource):
     # It does its own metric reporting because _async_render dispatches to
     # a callback and it's the class name of that callback we want to report
     # against rather than the JsonResource itself.
-    @request_handler(report_metrics=False)
+    @request_handler(include_metrics=True)
     @defer.inlineCallbacks
-    def _async_render(self, request):
+    def _async_render(self, request, request_metrics):
         """ This gets called from render() every time someone sends us a request.
             This checks if anyone has registered a callback for that method and
             path.
@@ -231,9 +234,6 @@ class JsonResource(HttpServer, resource.Resource):
             self._send_response(request, 200, {})
             return
 
-        request_metrics = RequestMetrics()
-        request_metrics.start(self.clock)
-
         # Loop through all the registered callbacks to check if the method
         # and path regex match
         for path_entry in self.path_regexs.get(request.method, []):
@@ -247,12 +247,6 @@ class JsonResource(HttpServer, resource.Resource):
 
             callback = path_entry.callback
 
-            servlet_instance = getattr(callback, "__self__", None)
-            if servlet_instance is not None:
-                servlet_classname = servlet_instance.__class__.__name__
-            else:
-                servlet_classname = "%r" % callback
-
             kwargs = intern_dict({
                 name: urllib.unquote(value).decode("UTF-8") if value else value
                 for name, value in m.groupdict().items()
@@ -263,10 +257,13 @@ class JsonResource(HttpServer, resource.Resource):
                 code, response = callback_return
                 self._send_response(request, code, response)
 
-            try:
-                request_metrics.stop(self.clock, request, servlet_classname)
-            except:
-                pass
+            servlet_instance = getattr(callback, "__self__", None)
+            if servlet_instance is not None:
+                servlet_classname = servlet_instance.__class__.__name__
+            else:
+                servlet_classname = "%r" % callback
+
+            request_metrics.name = servlet_classname
 
             return
 
@@ -298,11 +295,12 @@ class JsonResource(HttpServer, resource.Resource):
 
 
 class RequestMetrics(object):
-    def start(self, clock):
+    def start(self, clock, name):
         self.start = clock.time_msec()
         self.start_context = LoggingContext.current_context()
+        self.name = name
 
-    def stop(self, clock, request, servlet_classname):
+    def stop(self, clock, request):
         context = LoggingContext.current_context()
 
         tag = ""
@@ -316,26 +314,26 @@ class RequestMetrics(object):
                 )
                 return
 
-        incoming_requests_counter.inc(request.method, servlet_classname, tag)
+        incoming_requests_counter.inc(request.method, self.name, tag)
 
         response_timer.inc_by(
             clock.time_msec() - self.start, request.method,
-            servlet_classname, tag
+            self.name, tag
         )
 
         ru_utime, ru_stime = context.get_resource_usage()
 
         response_ru_utime.inc_by(
-            ru_utime, request.method, servlet_classname, tag
+            ru_utime, request.method, self.name, tag
         )
         response_ru_stime.inc_by(
-            ru_stime, request.method, servlet_classname, tag
+            ru_stime, request.method, self.name, tag
         )
         response_db_txn_count.inc_by(
-            context.db_txn_count, request.method, servlet_classname, tag
+            context.db_txn_count, request.method, self.name, tag
         )
         response_db_txn_duration.inc_by(
-            context.db_txn_duration, request.method, servlet_classname, tag
+            context.db_txn_duration, request.method, self.name, tag
         )
 
 
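
request_handler above is a decorator factory: calling it returns the real decorator, which lets individual handlers opt in to receiving the RequestMetrics object, so JsonResource._async_render can fill in the servlet name once it knows which callback matched instead of reporting everything under JsonResource. A stripped-down sketch of that shape, with illustrative names throughout:

def request_handler(include_metrics=False):
    def decorate(handler):
        def wrapped(self, request):
            metrics = {"name": type(self).__name__}   # stands in for RequestMetrics
            try:
                if include_metrics:
                    return handler(self, request, metrics)
                return handler(self, request)
            finally:
                print("served by %(name)s" % metrics)
        return wrapped
    return decorate

class JsonResource:
    @request_handler(include_metrics=True)
    def _async_render(self, request, request_metrics):
        request_metrics["name"] = "SomeServlet"   # hypothetical servlet name
        return "ok"

print(JsonResource()._async_render("GET /path"))
# served by SomeServlet
# ok
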
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 40a148994f..c48024096d 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -20,6 +20,7 @@ from synapse.api.errors import AuthError
 from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.metrics import Measure
 from synapse.types import StreamToken
 from synapse.visibility import filter_events_for_client
 import synapse.metrics
@@ -231,24 +232,25 @@ class Notifier(object):
         Will wake up all listeners for the given users and rooms.
         """
         with PreserveLoggingContext():
-            user_streams = set()
+            with Measure(self.clock, "on_new_event"):
+                user_streams = set()
 
-            for user in users:
-                user_stream = self.user_to_user_stream.get(str(user))
-                if user_stream is not None:
-                    user_streams.add(user_stream)
+                for user in users:
+                    user_stream = self.user_to_user_stream.get(str(user))
+                    if user_stream is not None:
+                        user_streams.add(user_stream)
 
-            for room in rooms:
-                user_streams |= self.room_to_user_streams.get(room, set())
+                for room in rooms:
+                    user_streams |= self.room_to_user_streams.get(room, set())
 
-            time_now_ms = self.clock.time_msec()
-            for user_stream in user_streams:
-                try:
-                    user_stream.notify(stream_key, new_token, time_now_ms)
-                except:
-                    logger.exception("Failed to notify listener")
+                time_now_ms = self.clock.time_msec()
+                for user_stream in user_streams:
+                    try:
+                        user_stream.notify(stream_key, new_token, time_now_ms)
+                    except:
+                        logger.exception("Failed to notify listener")
 
-            self.notify_replication()
+                self.notify_replication()
 
     def on_new_replication_data(self):
         """Used to inform replication listeners that something has happend
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ad026b5e0b..97aef25321 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -22,6 +22,7 @@ from synapse.events.utils import prune_event
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
+from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 
@@ -1132,54 +1133,55 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def _get_event_from_row(self, internal_metadata, js, redacted,
                             rejected_reason=None):
-        d = json.loads(js)
-        internal_metadata = json.loads(internal_metadata)
-
-        if rejected_reason:
-            rejected_reason = yield self._simple_select_one_onecol(
-                table="rejections",
-                keyvalues={"event_id": rejected_reason},
-                retcol="reason",
-                desc="_get_event_from_row_rejected_reason",
-            )
+        with Measure(self._clock, "_get_event_from_row"):
+            d = json.loads(js)
+            internal_metadata = json.loads(internal_metadata)
+
+            if rejected_reason:
+                rejected_reason = yield self._simple_select_one_onecol(
+                    table="rejections",
+                    keyvalues={"event_id": rejected_reason},
+                    retcol="reason",
+                    desc="_get_event_from_row_rejected_reason",
+                )
 
-        original_ev = FrozenEvent(
-            d,
-            internal_metadata_dict=internal_metadata,
-            rejected_reason=rejected_reason,
-        )
+            original_ev = FrozenEvent(
+                d,
+                internal_metadata_dict=internal_metadata,
+                rejected_reason=rejected_reason,
+            )
 
-        redacted_event = None
-        if redacted:
-            redacted_event = prune_event(original_ev)
+            redacted_event = None
+            if redacted:
+                redacted_event = prune_event(original_ev)
 
-            redaction_id = yield self._simple_select_one_onecol(
-                table="redactions",
-                keyvalues={"redacts": redacted_event.event_id},
-                retcol="event_id",
-                desc="_get_event_from_row_redactions",
-            )
+                redaction_id = yield self._simple_select_one_onecol(
+                    table="redactions",
+                    keyvalues={"redacts": redacted_event.event_id},
+                    retcol="event_id",
+                    desc="_get_event_from_row_redactions",
+                )
 
-            redacted_event.unsigned["redacted_by"] = redaction_id
-            # Get the redaction event.
+                redacted_event.unsigned["redacted_by"] = redaction_id
+                # Get the redaction event.
 
-            because = yield self.get_event(
-                redaction_id,
-                check_redacted=False,
-                allow_none=True,
-            )
+                because = yield self.get_event(
+                    redaction_id,
+                    check_redacted=False,
+                    allow_none=True,
+                )
 
-            if because:
-                # It's fine to do add the event directly, since get_pdu_json
-                # will serialise this field correctly
-                redacted_event.unsigned["redacted_because"] = because
+                if because:
+                    # It's fine to add the event directly, since get_pdu_json
+                    # will serialise this field correctly
+                    redacted_event.unsigned["redacted_because"] = because
 
-        cache_entry = _EventCacheEntry(
-            event=original_ev,
-            redacted_event=redacted_event,
-        )
+            cache_entry = _EventCacheEntry(
+                event=original_ev,
+                redacted_event=redacted_event,
+            )
 
-        self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
+            self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
 
         defer.returnValue(cache_entry)
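
Aside from the Measure wrapper, the logic above is unchanged: the stored row is parsed as-is, a pruned copy is built only when the event was redacted, the copy records which event redacted it, and both versions are prefilled into the event cache. A rough, self-contained sketch of that redaction flow; the real pruning rules live in synapse.events.utils.prune_event:

def prune_event(event):
    # Stand-in: the real function strips non-essential keys per the
    # Matrix redaction algorithm.
    return {"event_id": event["event_id"], "content": {}}

original = {"event_id": "$1:hs", "content": {"body": "secret"}}

redacted = prune_event(original)
redacted["unsigned"] = {
    "redacted_by": "$2:hs",                      # id of the redaction event
    "redacted_because": {"event_id": "$2:hs"},   # the redaction event itself
}

cache_entry = {"event": original, "redacted_event": redacted}
print(cache_entry["redacted_event"]["content"])   # {}
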
 
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 76f301f549..4ea930d3e8 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -87,7 +87,7 @@ class Measure(object):
         self.db_txn_duration = self.start_context.db_txn_duration
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        if exc_type is not None or not self.start_context:
+        if (exc_type is not None and issubclass(exc_type, Exception)) or not self.start_context:
             return
 
         duration = self.clock.time_msec() - self.start
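
Because __exit__ receives the exception class (or None), never an instance, the guard above uses issubclass: an isinstance check against a class is always false. The intent is to skip reporting when the block failed with an ordinary Exception, while still reporting blocks unwound by control-flow BaseExceptions such as Twisted's _DefGen_Return. A minimal sketch of the shape; this Measure omits the logcontext bookkeeping of the real class:

import time

class Measure(object):
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.start = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is a class (or None), hence issubclass, not isinstance.
        if exc_type is not None and issubclass(exc_type, Exception):
            return   # block failed; don't record a misleading duration
        print("%s took %.6fs" % (self.name, time.time() - self.start))

with Measure("demo"):
    sum(range(10 ** 6))    # prints something like: demo took 0.012345s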