diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 7cd11cfae7..1735ca9345 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -22,6 +22,7 @@ from synapse.util.logcontext import (
preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext,
preserve_fn
)
+from synapse.util.metrics import Measure
from twisted.internet import defer
@@ -243,59 +244,60 @@ class Keyring(object):
@defer.inlineCallbacks
def do_iterations():
- merged_results = {}
+ with Measure(self.clock, "get_server_verify_keys"):
+ merged_results = {}
- missing_keys = {}
- for verify_request in verify_requests:
- missing_keys.setdefault(verify_request.server_name, set()).update(
- verify_request.key_ids
- )
-
- for fn in key_fetch_fns:
- results = yield fn(missing_keys.items())
- merged_results.update(results)
-
- # We now need to figure out which verify requests we have keys
- # for and which we don't
missing_keys = {}
- requests_missing_keys = []
for verify_request in verify_requests:
- server_name = verify_request.server_name
- result_keys = merged_results[server_name]
-
- if verify_request.deferred.called:
- # We've already called this deferred, which probably
- # means that we've already found a key for it.
- continue
-
- for key_id in verify_request.key_ids:
- if key_id in result_keys:
- with PreserveLoggingContext():
- verify_request.deferred.callback((
- server_name,
- key_id,
- result_keys[key_id],
- ))
- break
- else:
- # The else block is only reached if the loop above
- # doesn't break.
- missing_keys.setdefault(server_name, set()).update(
- verify_request.key_ids
- )
- requests_missing_keys.append(verify_request)
-
- if not missing_keys:
- break
-
- for verify_request in requests_missing_keys.values():
- verify_request.deferred.errback(SynapseError(
- 401,
- "No key for %s with id %s" % (
- verify_request.server_name, verify_request.key_ids,
- ),
- Codes.UNAUTHORIZED,
- ))
+ missing_keys.setdefault(verify_request.server_name, set()).update(
+ verify_request.key_ids
+ )
+
+ for fn in key_fetch_fns:
+ results = yield fn(missing_keys.items())
+ merged_results.update(results)
+
+ # We now need to figure out which verify requests we have keys
+ # for and which we don't
+ missing_keys = {}
+ requests_missing_keys = []
+ for verify_request in verify_requests:
+ server_name = verify_request.server_name
+ result_keys = merged_results[server_name]
+
+ if verify_request.deferred.called:
+ # We've already called this deferred, which probably
+ # means that we've already found a key for it.
+ continue
+
+ for key_id in verify_request.key_ids:
+ if key_id in result_keys:
+ with PreserveLoggingContext():
+ verify_request.deferred.callback((
+ server_name,
+ key_id,
+ result_keys[key_id],
+ ))
+ break
+ else:
+ # The else block is only reached if the loop above
+ # doesn't break.
+ missing_keys.setdefault(server_name, set()).update(
+ verify_request.key_ids
+ )
+ requests_missing_keys.append(verify_request)
+
+ if not missing_keys:
+ break
+
+ for verify_request in requests_missing_keys.values():
+ verify_request.deferred.errback(SynapseError(
+ 401,
+ "No key for %s with id %s" % (
+ verify_request.server_name, verify_request.key_ids,
+ ),
+ Codes.UNAUTHORIZED,
+ ))
def on_err(err):
for verify_request in verify_requests:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0ee4ebe504..c8dfd02e7b 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -464,10 +464,10 @@ class SyncHandler(object):
else:
state = {}
- defer.returnValue({
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(state.values())
- })
+ defer.returnValue({
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(state.values())
+ })
@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config):
@@ -485,9 +485,9 @@ class SyncHandler(object):
)
defer.returnValue(notifs)
- # There is no new information in this period, so your notification
- # count is whatever it was last time.
- defer.returnValue(None)
+ # There is no new information in this period, so your notification
+ # count is whatever it was last time.
+ defer.returnValue(None)
@defer.inlineCallbacks
def generate_sync_result(self, sync_config, since_token=None, full_state=False):
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2b3c05a740..168e53ce0c 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -19,6 +19,7 @@ from synapse.api.errors import (
)
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches import intern_dict
+from synapse.util.metrics import Measure
import synapse.metrics
import synapse.events
@@ -74,12 +75,12 @@ response_db_txn_duration = metrics.register_distribution(
_next_request_id = 0
-def request_handler(report_metrics=True):
+def request_handler(include_metrics=False):
"""Decorator for ``wrap_request_handler``"""
- return lambda request_handler: wrap_request_handler(request_handler, report_metrics)
+ return lambda request_handler: wrap_request_handler(request_handler, include_metrics)
-def wrap_request_handler(request_handler, report_metrics):
+def wrap_request_handler(request_handler, include_metrics=False):
"""Wraps a method that acts as a request handler with the necessary logging
and exception handling.
@@ -103,54 +104,56 @@ def wrap_request_handler(request_handler, report_metrics):
_next_request_id += 1
with LoggingContext(request_id) as request_context:
- if report_metrics:
+ with Measure(self.clock, "wrapped_request_handler"):
request_metrics = RequestMetrics()
- request_metrics.start(self.clock)
-
- request_context.request = request_id
- with request.processing():
- try:
- with PreserveLoggingContext(request_context):
- yield request_handler(self, request)
- except CodeMessageException as e:
- code = e.code
- if isinstance(e, SynapseError):
- logger.info(
- "%s SynapseError: %s - %s", request, code, e.msg
- )
- else:
- logger.exception(e)
- outgoing_responses_counter.inc(request.method, str(code))
- respond_with_json(
- request, code, cs_exception(e), send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- version_string=self.version_string,
- )
- except:
- logger.exception(
- "Failed handle request %s.%s on %r: %r",
- request_handler.__module__,
- request_handler.__name__,
- self,
- request
- )
- respond_with_json(
- request,
- 500,
- {
- "error": "Internal server error",
- "errcode": Codes.UNKNOWN,
- },
- send_cors=True
- )
- finally:
+ request_metrics.start(self.clock, name=self.__class__.__name__)
+
+ request_context.request = request_id
+ with request.processing():
try:
- if report_metrics:
- request_metrics.stop(
- self.clock, request, self.__class__.__name__
+ with PreserveLoggingContext(request_context):
+ if include_metrics:
+ yield request_handler(self, request, request_metrics)
+ else:
+ yield request_handler(self, request)
+ except CodeMessageException as e:
+ code = e.code
+ if isinstance(e, SynapseError):
+ logger.info(
+ "%s SynapseError: %s - %s", request, code, e.msg
)
+ else:
+ logger.exception(e)
+ outgoing_responses_counter.inc(request.method, str(code))
+ respond_with_json(
+ request, code, cs_exception(e), send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ version_string=self.version_string,
+ )
except:
- pass
+ logger.exception(
+ "Failed handle request %s.%s on %r: %r",
+ request_handler.__module__,
+ request_handler.__name__,
+ self,
+ request
+ )
+ respond_with_json(
+ request,
+ 500,
+ {
+ "error": "Internal server error",
+ "errcode": Codes.UNKNOWN,
+ },
+ send_cors=True
+ )
+ finally:
+ try:
+ request_metrics.stop(
+ self.clock, request
+ )
+ except Exception as e:
+ logger.warn("Failed to stop metrics: %r", e)
return wrapped_request_handler
@@ -220,9 +223,9 @@ class JsonResource(HttpServer, resource.Resource):
# It does its own metric reporting because _async_render dispatches to
# a callback and it's the class name of that callback we want to report
# against rather than the JsonResource itself.
- @request_handler(report_metrics=False)
+ @request_handler(include_metrics=True)
@defer.inlineCallbacks
- def _async_render(self, request):
+ def _async_render(self, request, request_metrics):
""" This gets called from render() every time someone sends us a request.
This checks if anyone has registered a callback for that method and
path.
@@ -231,9 +234,6 @@ class JsonResource(HttpServer, resource.Resource):
self._send_response(request, 200, {})
return
- request_metrics = RequestMetrics()
- request_metrics.start(self.clock)
-
# Loop through all the registered callbacks to check if the method
# and path regex match
for path_entry in self.path_regexs.get(request.method, []):
@@ -247,12 +247,6 @@ class JsonResource(HttpServer, resource.Resource):
callback = path_entry.callback
- servlet_instance = getattr(callback, "__self__", None)
- if servlet_instance is not None:
- servlet_classname = servlet_instance.__class__.__name__
- else:
- servlet_classname = "%r" % callback
-
kwargs = intern_dict({
name: urllib.unquote(value).decode("UTF-8") if value else value
for name, value in m.groupdict().items()
@@ -263,10 +257,13 @@ class JsonResource(HttpServer, resource.Resource):
code, response = callback_return
self._send_response(request, code, response)
- try:
- request_metrics.stop(self.clock, request, servlet_classname)
- except:
- pass
+ servlet_instance = getattr(callback, "__self__", None)
+ if servlet_instance is not None:
+ servlet_classname = servlet_instance.__class__.__name__
+ else:
+ servlet_classname = "%r" % callback
+
+ request_metrics.name = servlet_classname
return
@@ -298,11 +295,12 @@ class JsonResource(HttpServer, resource.Resource):
class RequestMetrics(object):
- def start(self, clock):
+ def start(self, clock, name):
self.start = clock.time_msec()
self.start_context = LoggingContext.current_context()
+ self.name = name
- def stop(self, clock, request, servlet_classname):
+ def stop(self, clock, request):
context = LoggingContext.current_context()
tag = ""
@@ -316,26 +314,26 @@ class RequestMetrics(object):
)
return
- incoming_requests_counter.inc(request.method, servlet_classname, tag)
+ incoming_requests_counter.inc(request.method, self.name, tag)
response_timer.inc_by(
clock.time_msec() - self.start, request.method,
- servlet_classname, tag
+ self.name, tag
)
ru_utime, ru_stime = context.get_resource_usage()
response_ru_utime.inc_by(
- ru_utime, request.method, servlet_classname, tag
+ ru_utime, request.method, self.name, tag
)
response_ru_stime.inc_by(
- ru_stime, request.method, servlet_classname, tag
+ ru_stime, request.method, self.name, tag
)
response_db_txn_count.inc_by(
- context.db_txn_count, request.method, servlet_classname, tag
+ context.db_txn_count, request.method, self.name, tag
)
response_db_txn_duration.inc_by(
- context.db_txn_duration, request.method, servlet_classname, tag
+ context.db_txn_duration, request.method, self.name, tag
)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 40a148994f..c48024096d 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -20,6 +20,7 @@ from synapse.api.errors import AuthError
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.metrics import Measure
from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
import synapse.metrics
@@ -231,24 +232,25 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms.
"""
with PreserveLoggingContext():
- user_streams = set()
+ with Measure(self.clock, "on_new_event"):
+ user_streams = set()
- for user in users:
- user_stream = self.user_to_user_stream.get(str(user))
- if user_stream is not None:
- user_streams.add(user_stream)
+ for user in users:
+ user_stream = self.user_to_user_stream.get(str(user))
+ if user_stream is not None:
+ user_streams.add(user_stream)
- for room in rooms:
- user_streams |= self.room_to_user_streams.get(room, set())
+ for room in rooms:
+ user_streams |= self.room_to_user_streams.get(room, set())
- time_now_ms = self.clock.time_msec()
- for user_stream in user_streams:
- try:
- user_stream.notify(stream_key, new_token, time_now_ms)
- except:
- logger.exception("Failed to notify listener")
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
+ try:
+ user_stream.notify(stream_key, new_token, time_now_ms)
+ except:
+ logger.exception("Failed to notify listener")
- self.notify_replication()
+ self.notify_replication()
def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ad026b5e0b..97aef25321 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -22,6 +22,7 @@ from synapse.events.utils import prune_event
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
+from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
@@ -1132,54 +1133,55 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
rejected_reason=None):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = yield self._simple_select_one_onecol(
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- desc="_get_event_from_row_rejected_reason",
- )
+ with Measure(self._clock, "_get_event_from_row"):
+ d = json.loads(js)
+ internal_metadata = json.loads(internal_metadata)
+
+ if rejected_reason:
+ rejected_reason = yield self._simple_select_one_onecol(
+ table="rejections",
+ keyvalues={"event_id": rejected_reason},
+ retcol="reason",
+ desc="_get_event_from_row_rejected_reason",
+ )
- original_ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
+ original_ev = FrozenEvent(
+ d,
+ internal_metadata_dict=internal_metadata,
+ rejected_reason=rejected_reason,
+ )
- redacted_event = None
- if redacted:
- redacted_event = prune_event(original_ev)
+ redacted_event = None
+ if redacted:
+ redacted_event = prune_event(original_ev)
- redaction_id = yield self._simple_select_one_onecol(
- table="redactions",
- keyvalues={"redacts": redacted_event.event_id},
- retcol="event_id",
- desc="_get_event_from_row_redactions",
- )
+ redaction_id = yield self._simple_select_one_onecol(
+ table="redactions",
+ keyvalues={"redacts": redacted_event.event_id},
+ retcol="event_id",
+ desc="_get_event_from_row_redactions",
+ )
- redacted_event.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
+ redacted_event.unsigned["redacted_by"] = redaction_id
+ # Get the redaction event.
- because = yield self.get_event(
- redaction_id,
- check_redacted=False,
- allow_none=True,
- )
+ because = yield self.get_event(
+ redaction_id,
+ check_redacted=False,
+ allow_none=True,
+ )
- if because:
- # It's fine to do add the event directly, since get_pdu_json
- # will serialise this field correctly
- redacted_event.unsigned["redacted_because"] = because
+ if because:
+ # It's fine to do add the event directly, since get_pdu_json
+ # will serialise this field correctly
+ redacted_event.unsigned["redacted_because"] = because
- cache_entry = _EventCacheEntry(
- event=original_ev,
- redacted_event=redacted_event,
- )
+ cache_entry = _EventCacheEntry(
+ event=original_ev,
+ redacted_event=redacted_event,
+ )
- self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
+ self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
defer.returnValue(cache_entry)
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 76f301f549..4ea930d3e8 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -87,7 +87,7 @@ class Measure(object):
self.db_txn_duration = self.start_context.db_txn_duration
def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_type is not None or not self.start_context:
+ if isinstance(exc_type, Exception) or not self.start_context:
return
duration = self.clock.time_msec() - self.start
|