From 7c7706f42b56dd61f5eb17679aa12247f7058ed5 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 15 Mar 2018 15:40:13 +0000
Subject: Fix bug where state cache used lots of memory

The state cache bases its size on the sum of the size of entries. The
size of the entry is calculated once on insertion, so it is important
that the size of entries does not change.

The DictionaryCache modified the entries size, which caused the state
cache to incorrectly think it was smaller than it actually was.
---
 synapse/util/caches/dictionary_cache.py | 6 +++++-
 synapse/util/caches/lrucache.py         | 8 ++++----
 2 files changed, 9 insertions(+), 5 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index d4105822b3..1709e8b429 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -132,9 +132,13 @@ class DictionaryCache(object):
             self._update_or_insert(key, value, known_absent)
 
     def _update_or_insert(self, key, value, known_absent):
-        entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {}))
+        # We pop and reinsert as we need to tell the cache the size may have
+        # changed
+
+        entry = self.cache.pop(key, DictionaryEntry(False, set(), {}))
         entry.value.update(value)
         entry.known_absent.update(known_absent)
+        self.cache[key] = entry
 
     def _insert(self, key, value, known_absent):
         self.cache[key] = DictionaryEntry(True, known_absent, value)
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index f088dd430e..a4bf8fa6ae 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -154,14 +154,14 @@ class LruCache(object):
         def cache_set(key, value, callbacks=[]):
             node = cache.get(key, None)
             if node is not None:
-                if value != node.value:
+                if node.callbacks and value != node.value:
                     for cb in node.callbacks:
                         cb()
                     node.callbacks.clear()
 
-                    if size_callback:
-                        cached_cache_len[0] -= size_callback(node.value)
-                        cached_cache_len[0] += size_callback(value)
+                if size_callback:
+                    cached_cache_len[0] -= size_callback(node.value)
+                    cached_cache_len[0] += size_callback(value)
 
                 node.callbacks.update(callbacks)
--
cgit 1.5.1
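A note on the fix above: any cache that bills entries by a size sampled at
insertion time has this hazard — mutate the stored value in place and the
accounting silently drifts. A minimal sketch of the failure mode and of the
pop/reinsert remedy (toy code, not the real LruCache/DictionaryCache classes):

    # A toy size-tracked cache in the spirit of LruCache's size_callback:
    # total_size is only adjusted on insert and removal, never on mutation.
    class SizeTrackedCache(object):
        def __init__(self, size_callback):
            self.size_callback = size_callback
            self.data = {}    # key -> value
            self.sizes = {}   # key -> size charged at insertion time
            self.total_size = 0

        def __setitem__(self, key, value):
            self.pop(key)
            self.sizes[key] = self.size_callback(value)
            self.total_size += self.sizes[key]
            self.data[key] = value

        def __getitem__(self, key):
            return self.data[key]

        def pop(self, key, default=None):
            if key in self.data:
                self.total_size -= self.sizes.pop(key)
                return self.data.pop(key)
            return default

    cache = SizeTrackedCache(size_callback=len)
    cache["state"] = {"a": 1}                # charged as size 1
    cache["state"].update({"b": 2, "c": 3})  # BUG: still charged as 1
    assert cache.total_size == 1             # accounting has drifted

    # The pop/reinsert idiom from _update_or_insert above re-measures the
    # entry on the way back in, keeping the accounting honest:
    entry = cache.pop("state", {})
    entry.update({"d": 4})
    cache["state"] = entry
    assert cache.total_size == 4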
---
 synapse/util/logcontext.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d660ec785b..fdd4a93d07 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -163,7 +163,7 @@ class LoggingContext(object):
         current = self.set_current_context(self.previous_context)
         if current is not self:
             if current is self.sentinel:
-                logger.debug("Expected logging context %s has been lost", self)
+                logger.warn("Expected logging context %s has been lost", self)
             else:
                 logger.warn(
                     "Current logging context %s is not expected context %s",
@@ -278,7 +278,7 @@ class PreserveLoggingContext(object):
         context = LoggingContext.set_current_context(self.current_context)
 
         if context != self.new_context:
-            logger.debug(
+            logger.warn(
                 "Unexpected logging context: %s is not %s",
                 context, self.new_context,
             )
--
cgit 1.5.1

From 9a0d783c113ae74c55e409d33219cd77f3662b9f Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 19 Mar 2018 11:35:53 +0000
Subject: Add comments
---
 synapse/util/caches/lrucache.py | 7 +++++++
 1 file changed, 7 insertions(+)

(limited to 'synapse/util')

diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index a4bf8fa6ae..1c5a982094 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -154,11 +154,18 @@ class LruCache(object):
         def cache_set(key, value, callbacks=[]):
             node = cache.get(key, None)
             if node is not None:
+                # We sometimes store large objects, e.g. dicts, which cause
+                # the inequality check to take a long time. So let's only do
+                # the check if we have some callbacks to call.
                 if node.callbacks and value != node.value:
                     for cb in node.callbacks:
                         cb()
                     node.callbacks.clear()
 
+                # We don't bother to protect this by value != node.value as
+                # generally size_callback will be cheap compared with equality
+                # checks. (For example, taking the size of two dicts is quicker
+                # than comparing them for equality.)
                 if size_callback:
                     cached_cache_len[0] -= size_callback(node.value)
                     cached_cache_len[0] += size_callback(value)
--
cgit 1.5.1
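The cost asymmetry those comments describe is easy to verify: comparing two
dicts for equality is linear in their size, while a typical size_callback such
as len() is constant time. A rough, hypothetical demonstration:

    import timeit

    big_a = {i: i for i in range(100000)}
    big_b = dict(big_a)

    # Equal dicts force a walk over every entry before returning True...
    print(timeit.timeit(lambda: big_a == big_b, number=100))

    # ...whereas measuring their size is effectively free.
    print(timeit.timeit(lambda: len(big_a), number=100))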
From 8cbbfaefc1fc597cbbef52a10dbfb8ecd4d8a8cd Mon Sep 17 00:00:00 2001
From: Matthew Hodgson
Date: Fri, 23 Mar 2018 10:32:50 +0000
Subject: 404 correctly on missing paths via NoResource

fixes https://github.com/matrix-org/synapse/issues/2043 and
https://github.com/matrix-org/synapse/issues/2029
---
 synapse/app/appservice.py        | 4 ++--
 synapse/app/client_reader.py     | 4 ++--
 synapse/app/event_creator.py     | 4 ++--
 synapse/app/federation_reader.py | 4 ++--
 synapse/app/federation_sender.py | 4 ++--
 synapse/app/frontend_proxy.py    | 4 ++--
 synapse/app/homeserver.py        | 4 ++--
 synapse/app/media_repository.py  | 4 ++--
 synapse/app/pusher.py            | 4 ++--
 synapse/app/synchrotron.py       | 4 ++--
 synapse/app/user_dir.py          | 4 ++--
 synapse/util/httpresourcetree.py | 4 ++--
 12 files changed, 24 insertions(+), 24 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index c6fe4516d1..f2540023a7 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -36,7 +36,7 @@ from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.appservice")
 
@@ -64,7 +64,7 @@ class AppserviceServer(HomeServer):
             if name == "metrics":
                 resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 0a8ce9bc66..267d34c881 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -44,7 +44,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.client_reader")
 
@@ -88,7 +88,7 @@ class ClientReaderServer(HomeServer):
                     "/_matrix/client/api/v1": resource,
                 })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 172e989b54..b915d12d53 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -52,7 +52,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.event_creator")
 
@@ -104,7 +104,7 @@ class EventCreatorServer(HomeServer):
                     "/_matrix/client/api/v1": resource,
                 })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
            bind_addresses,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 20d157911b..c1dc66dd17 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -41,7 +41,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.federation_reader")
 
@@ -77,7 +77,7 @@ class FederationReaderServer(HomeServer):
                     FEDERATION_PREFIX: TransportLayerServer(self),
                 })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index f760826d27..0cc3331519 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -42,7 +42,7 @@ from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.federation_sender")
 
@@ -91,7 +91,7 @@ class FederationSenderServer(HomeServer):
             if name == "metrics":
                 resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 816c080d18..de889357c3 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -44,7 +44,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
@@ -142,7 +142,7 @@ class FrontendProxyServer(HomeServer):
                     "/_matrix/client/api/v1": resource,
                 })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e477c7ced6..c00afbba28 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -56,7 +56,7 @@ from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 from twisted.application import service
 from twisted.internet import defer, reactor
-from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.resource import EncodingResourceWrapper, NoResource
 from twisted.web.server import GzipEncoderFactory
 from twisted.web.static import File
 
@@ -126,7 +126,7 @@ class SynapseHomeServer(HomeServer):
         if WEB_CLIENT_PREFIX in resources:
             root_resource = RootRedirect(WEB_CLIENT_PREFIX)
         else:
-            root_resource = Resource()
+            root_resource = NoResource()
 
         root_resource = create_resource_tree(resources, root_resource)
 
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 84c5791b3b..fc8282bbc1 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -43,7 +43,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.media_repository")
 
@@ -84,7 +84,7 @@ class MediaRepositoryServer(HomeServer):
                     ),
                 })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 98a4a7c62c..d5c3a85195 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -37,7 +37,7 @@ from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.pusher")
 
@@ -94,7 +94,7 @@ class PusherServer(HomeServer):
             if name == "metrics":
                 resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index abe91dcfbd..508b66613d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -56,7 +56,7 @@ from synapse.util.manhole import manhole
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
@@ -269,7 +269,7 @@ class SynchrotronServer(HomeServer):
                     "/_matrix/client/api/v1": resource,
                 })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 494ccb702c..5f845e80d1 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -43,7 +43,7 @@ from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.user_dir")
 
@@ -116,7 +116,7 @@ class UserDirectoryServer(HomeServer):
                     "/_matrix/client/api/v1": resource,
                 })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index 45be47159a..d747849553 100644
--- a/synapse/util/httpresourcetree.py
+++ b/synapse/util/httpresourcetree.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 import logging
 
@@ -45,7 +45,7 @@ def create_resource_tree(desired_tree, root_resource):
         for path_seg in full_path.split('/')[1:-1]:
             if path_seg not in last_resource.listNames():
                 # resource doesn't exist, so make a "dummy resource"
-                child_resource = Resource()
+                child_resource = NoResource()
                 last_resource.putChild(path_seg, child_resource)
                 res_id = _resource_id(last_resource, path_seg)
                 resource_mappings[res_id] = child_resource
--
cgit 1.5.1
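For context, twisted.web's NoResource is a stock Resource subclass whose
render() produces a proper 404 error page, and whose getChild() returns itself
so that unmatched paths below it 404 too; a bare Resource() placeholder, by
contrast, has nothing sensible to render. A standalone sketch of the behaviour
being relied on here (illustrative, using the twisted.web API of this era):

    from twisted.internet import reactor
    from twisted.web.resource import NoResource, Resource
    from twisted.web.server import Site

    class HelloResource(Resource):
        isLeaf = True

        def render_GET(self, request):
            return b"hello"

    # With NoResource as the fallback root, GET /hello is served normally
    # and any other path renders a well-formed 404 page.
    root = NoResource()
    root.putChild(b"hello", HelloResource())

    reactor.listenTCP(8080, Site(root))
    reactor.run()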
From 05630758f25d958bf60fde4df5f80a89e4a9a0ac Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 29 Mar 2018 22:57:28 +0100
Subject: Use static JSONEncoders

using json.dumps with custom options requires us to create a new JSONEncoder
on each call. It's more efficient to create one upfront and reuse it.
---
 synapse/handlers/message.py         |  4 ++--
 synapse/replication/tcp/commands.py |  8 +++++---
 synapse/storage/events.py           | 23 ++++++++---------------
 synapse/util/frozenutils.py         | 19 +++++++++++++++++++
 4 files changed, 34 insertions(+), 20 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5a8ddc253e..6de6e13b7b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -27,7 +27,7 @@ from synapse.types import (
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
 from synapse.util.logcontext import preserve_fn, run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import unfreeze
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 from synapse.replication.http.send_event import send_event_to_master
@@ -678,7 +678,7 @@ class EventCreationHandler(object):
 
         # Ensure that we can round trip before trying to persist in db
         try:
-            dump = simplejson.dumps(unfreeze(event.content))
+            dump = frozendict_json_encoder.encode(event.content)
             simplejson.loads(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 0005ad5879..34bcf903a3 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -24,6 +24,8 @@ import simplejson
 
 logger = logging.getLogger(__name__)
 
+_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
+
 
 class Command(object):
     """The base command class.
@@ -107,7 +109,7 @@ class RdataCommand(Command):
         return " ".join((
             self.stream_name,
             str(self.token) if self.token is not None else "batch",
-            simplejson.dumps(self.row, namedtuple_as_object=False),
+            _json_encoder.dumps(self.row),
         ))
 
 
@@ -302,7 +304,7 @@ class InvalidateCacheCommand(Command):
 
     def to_line(self):
         return " ".join((
-            self.cache_func, simplejson.dumps(self.keys, namedtuple_as_object=False)
+            self.cache_func, _json_encoder.encode(self.keys),
        ))
 
 
@@ -334,7 +336,7 @@ class UserIpCommand(Command):
         )
 
     def to_line(self):
-        return self.user_id + " " + simplejson.dumps((
+        return self.user_id + " " + _json_encoder.encode((
             self.access_token, self.ip, self.user_agent, self.device_id,
             self.last_seen,
         ))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f3d65f4338..ece5e6c41f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,15 +14,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.events_worker import EventsWorkerStore
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+import logging
+
+import simplejson as json
 
 from twisted.internet import defer
 
-from synapse.events import USE_FROZEN_DICTS
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.util.async import ObservableDeferred
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable
+    PreserveLoggingContext, make_deferred_yieldable,
 )
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -30,16 +34,8 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.types import get_domain_from_id
-
-from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple, OrderedDict
-from functools import wraps
-
 import synapse.metrics
 
-import logging
-import simplejson as json
-
 # these are only included to make the type annotations work
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
@@ -71,10 +67,7 @@ state_delta_reuse_delta_counter = metrics.register_counter(
 
 
 def encode_json(json_object):
-    if USE_FROZEN_DICTS:
-        return encode_canonical_json(json_object)
-    else:
-        return json.dumps(json_object, ensure_ascii=False)
+    return frozendict_json_encoder.encode(json_object)
 
 
 class _EventPeristenceQueue(object):
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 6322f0f55c..f497b51f4a 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from frozendict import frozendict
+import simplejson as json
 
 
 def freeze(o):
@@ -49,3 +50,21 @@ def unfreeze(o):
         pass
 
     return o
+
+
+def _handle_frozendict(obj):
+    """Helper for EventEncoder. Makes frozendicts serializable by returning
+    the underlying dict
+    """
+    if type(obj) is frozendict:
+        # fishing the protected dict out of the object is a bit nasty,
+        # but we don't really want the overhead of copying the dict.
+        return obj._dict
+    raise TypeError('Object of type %s is not JSON serializable' %
+                    obj.__class__.__name__)
+
+
+# A JSONEncoder which is capable of encoding frozendicts without barfing
+frozendict_json_encoder = json.JSONEncoder(
+    default=_handle_frozendict,
+)
--
cgit 1.5.1
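Two notes on the pattern above. First, the speed-up: simplejson.dumps(obj,
**options) constructs a throwaway JSONEncoder whenever non-default options are
supplied, so hoisting a configured encoder to module level pays off on hot
paths. A small sketch of the idiom (illustrative names, not Synapse code):

    import simplejson as json

    # built once at import time...
    _json_encoder = json.JSONEncoder(ensure_ascii=False)

    def encode(obj):
        # ...and reused on every call, rather than json.dumps(obj,
        # ensure_ascii=False), which would rebuild the encoder each time
        return _json_encoder.encode(obj)

Second, a caution: JSONEncoder exposes encode(), not dumps(). The
RdataCommand.to_line hunk above calls _json_encoder.dumps(self.row), which
would raise AttributeError when an RDATA row is serialised; the other call
sites correctly use .encode().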
From a9a74101a4925bd208db682952b5dadf4b157a8d Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Wed, 4 Apr 2018 08:58:53 +0100
Subject: Document the behaviour of ResponseCache

it looks like everything that uses ResponseCache expects to have to
`make_deferred_yieldable` its results. It's debatable whether that is the best
approach, but let's document it for now to avoid further confusion.
---
 synapse/util/caches/response_cache.py | 32 ++++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

(limited to 'synapse/util')

diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af539880..4ecd91deb5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -31,6 +31,18 @@ class ResponseCache(object):
         self.timeout_sec = timeout_ms / 1000.
 
     def get(self, key):
+        """Look up the given key.
+
+        Returns a deferred which doesn't follow the synapse logcontext rules,
+        so you'll probably want to make_deferred_yieldable it.
+
+        Args:
+            key (str):
+
+        Returns:
+            twisted.internet.defer.Deferred|None: None if there is no entry
+            for this key; otherwise a deferred result.
+        """
         result = self.pending_result_cache.get(key)
         if result is not None:
             return result.observe()
@@ -38,6 +50,26 @@ class ResponseCache(object):
         return None
 
     def set(self, key, deferred):
+        """Set the entry for the given key to the given deferred.
+
+        *deferred* should run its callbacks in the sentinel logcontext (ie,
+        you should wrap normal synapse deferreds with
+        logcontext.run_in_background).
+
+        Returns a new Deferred which also doesn't follow the synapse logcontext
+        rules, so you will want to make_deferred_yieldable it
+
+        (TODO: before using this more widely, it might make sense to refactor
+        it and get() so that they do the necessary wrapping rather than having
+        to do it everywhere ResponseCache is used.)
+
+        Args:
+            key (str):
+            deferred (twisted.internet.defer.Deferred):
+
+        Returns:
+            twisted.internet.defer.Deferred
+        """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
--
cgit 1.5.1
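Until the wrapping is refactored as that TODO suggests, the calling pattern
these docstrings imply looks like the following sketch (where _compute stands
in for the caller's real handler — this mirrors the call sites that a later
commit in this log replaces with .wrap):

    from synapse.util.logcontext import make_deferred_yieldable, preserve_fn

    def lookup(self, key):
        result = self.response_cache.get(key)
        if not result:
            # hand the cache a deferred that runs its callbacks in the
            # sentinel logcontext, per the set() docstring above
            result = self.response_cache.set(
                key, preserve_fn(self._compute)(key),
            )
        return make_deferred_yieldable(result)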
From 518f6de0881378b1fa356e21256436491d43c93c Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Wed, 4 Apr 2018 19:46:28 +0100
Subject: Remove redundant metrics which were deprecated in 0.27.0.
---
 CHANGES.rst             |  9 +++++++++
 UPGRADE.rst             |  9 ++++++++-
 docs/metrics-howto.rst  | 11 +++++++++++
 synapse/http/server.py  | 26 --------------------------
 synapse/util/metrics.py | 25 -------------------------
 5 files changed, 28 insertions(+), 52 deletions(-)

(limited to 'synapse/util')

diff --git a/CHANGES.rst b/CHANGES.rst
index 38372381ac..5fbad54427 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,12 @@
+Changes in synapse v0.28.0 (2018-xx-xx)
+=======================================
+
+As previously advised, this release removes a number of redundant Prometheus
+metrics. Administrators may need to update their dashboards and alerting rules
+to use the updated metric names, if they have not already done so. See
+`docs/metrics-howto.rst `_
+for more details.
+
 Changes in synapse v0.27.2 (2018-03-26)
 =======================================
 
diff --git a/UPGRADE.rst b/UPGRADE.rst
index f6bb1070b1..39a16b1c0c 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -52,7 +52,7 @@ Upgrading to $NEXT_VERSION
 ====================
 
 This release expands the anonymous usage stats sent if the opt-in
-``report_stats`` configuration is set to ``true``. We now capture RSS memory
+``report_stats`` configuration is set to ``true``. We now capture RSS memory
 and cpu use at a very coarse level. This requires administrators to install
 the optional ``psutil`` python module.
 
@@ -60,6 +60,13 @@ We would appreciate it if you could assist by ensuring this module is
 available and ``report_stats`` is enabled. This will let us see if
 performance changes to synapse are having an impact to the general community.
 
+This release also removes a number of redundant Prometheus metrics.
+Administrators may need to update their dashboards and alerting rules to use
+the updated metric names, if they have not already done so. See
+`docs/metrics-howto.rst `_
+for more details.
+
+
 Upgrading to v0.15.0
 ====================
 
diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst
index 8acc479bc3..5e2d7c52ec 100644
--- a/docs/metrics-howto.rst
+++ b/docs/metrics-howto.rst
@@ -34,6 +34,17 @@ How to monitor Synapse metrics using Prometheus
 
     Restart prometheus.
 
+Deprecated metrics removed in 0.28.0
+------------------------------------
+
+Synapse 0.28.0 removes all of the metrics deprecated by 0.27.0, which are those
+listed under "Old name" below. This has been done to reduce the bandwidth used
+by gathering metrics and the storage requirements for the Prometheus server, as
+well as reducing CPU overhead for both Synapse and Prometheus.
+
+Administrators should update any alerts or monitoring dashboards to use the
+"New name" listed below.
+
 Block and response metrics renamed for 0.27.0
 ---------------------------------------------
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index f19c068ef6..02c7e46f08 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -47,17 +47,6 @@ metrics = synapse.metrics.get_metrics_for(__name__)
 response_count = metrics.register_counter(
     "response_count",
     labels=["method", "servlet", "tag"],
-    alternative_names=(
-        # the following are all deprecated aliases for the same metric
-        metrics.name_prefix + x for x in (
-            "_requests",
-            "_response_time:count",
-            "_response_ru_utime:count",
-            "_response_ru_stime:count",
-            "_response_db_txn_count:count",
-            "_response_db_txn_duration:count",
-        )
-    )
 )
 
 requests_counter = metrics.register_counter(
@@ -73,39 +62,24 @@ outgoing_responses_counter = metrics.register_counter(
 response_timer = metrics.register_counter(
     "response_time_seconds",
     labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_time:total",
-    ),
 )
 
 response_ru_utime = metrics.register_counter(
     "response_ru_utime_seconds",
     labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_ru_utime:total",
-    ),
 )
 
 response_ru_stime = metrics.register_counter(
     "response_ru_stime_seconds",
     labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_ru_stime:total",
-    ),
 )
 
 response_db_txn_count = metrics.register_counter(
     "response_db_txn_count",
     labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_db_txn_count:total",
-    ),
 )
 
 # seconds spent waiting for db txns, excluding scheduling time, when processing
 # this request
 response_db_txn_duration = metrics.register_counter(
     "response_db_txn_duration_seconds",
     labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_db_txn_duration:total",
-    ),
 )
 
 # seconds spent waiting for a db connection, when processing this request
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index e4b5687a4b..c3d8237e8f 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -31,53 +31,28 @@ metrics = synapse.metrics.get_metrics_for(__name__)
 block_counter = metrics.register_counter(
     "block_count",
     labels=["block_name"],
-    alternative_names=(
-        # the following are all deprecated aliases for the same metric
-        metrics.name_prefix + x for x in (
-            "_block_timer:count",
-            "_block_ru_utime:count",
-            "_block_ru_stime:count",
-            "_block_db_txn_count:count",
-            "_block_db_txn_duration:count",
-        )
-    )
 )
 
 block_timer = metrics.register_counter(
     "block_time_seconds",
     labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_timer:total",
-    ),
 )
 
 block_ru_utime = metrics.register_counter(
     "block_ru_utime_seconds",
     labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_ru_utime:total",
-    ),
 )
 
 block_ru_stime = metrics.register_counter(
     "block_ru_stime_seconds",
     labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_ru_stime:total",
-    ),
 )
 
 block_db_txn_count = metrics.register_counter(
     "block_db_txn_count",
     labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_db_txn_count:total",
-    ),
 )
 
 # seconds spent waiting for db txns, excluding scheduling time, in this block
 block_db_txn_duration = metrics.register_counter(
     "block_db_txn_duration_seconds",
     labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_db_txn_duration:total",
-    ),
 )
 
 # seconds spent waiting for a db connection, in this block
--
cgit 1.5.1
From 01afc563c39006c21bb7752831cd62c146edc135 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 5 Apr 2018 16:24:04 +0100
Subject: Fix overzealous cache invalidation

Fixes an issue where a cache invalidation would invalidate *all* pending
entries, rather than just the entry that we intended to invalidate.
---
 synapse/util/caches/descriptors.py    | 64 +++++++++++++++++++++--------------
 tests/util/caches/test_descriptors.py | 46 +++++++++++++++++++++++++
 2 files changed, 84 insertions(+), 26 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index bf3a66eae4..68285a7594 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -39,12 +40,11 @@ _CacheSentinel = object()
 
 class CacheEntry(object):
     __slots__ = [
-        "deferred", "sequence", "callbacks", "invalidated"
+        "deferred", "callbacks", "invalidated"
     ]
 
-    def __init__(self, deferred, sequence, callbacks):
+    def __init__(self, deferred, callbacks):
         self.deferred = deferred
-        self.sequence = sequence
         self.callbacks = set(callbacks)
         self.invalidated = False
@@ -62,7 +62,6 @@ class Cache(object):
         "max_entries",
         "name",
         "keylen",
-        "sequence",
         "thread",
         "metrics",
         "_pending_deferred_cache",
@@ -80,7 +79,6 @@ class Cache(object):
 
         self.name = name
         self.keylen = keylen
-        self.sequence = 0
         self.thread = None
         self.metrics = register_cache(name, self.cache)
@@ -113,11 +111,10 @@ class Cache(object):
         callbacks = [callback] if callback else []
         val = self._pending_deferred_cache.get(key, _CacheSentinel)
         if val is not _CacheSentinel:
-            if val.sequence == self.sequence:
-                val.callbacks.update(callbacks)
-                if update_metrics:
-                    self.metrics.inc_hits()
-                return val.deferred
+            val.callbacks.update(callbacks)
+            if update_metrics:
+                self.metrics.inc_hits()
+            return val.deferred
 
         val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
         if val is not _CacheSentinel:
@@ -137,12 +134,9 @@ class Cache(object):
         self.check_thread()
         entry = CacheEntry(
             deferred=value,
-            sequence=self.sequence,
             callbacks=callbacks,
         )
 
-        entry.callbacks.update(callbacks)
-
         existing_entry = self._pending_deferred_cache.pop(key, None)
         if existing_entry:
             existing_entry.invalidate()
@@ -150,13 +144,25 @@ class Cache(object):
         self._pending_deferred_cache[key] = entry
 
         def shuffle(result):
-            if self.sequence == entry.sequence:
-                existing_entry = self._pending_deferred_cache.pop(key, None)
-                if existing_entry is entry:
-                    self.cache.set(key, result, entry.callbacks)
-                else:
-                    entry.invalidate()
+            existing_entry = self._pending_deferred_cache.pop(key, None)
+            if existing_entry is entry:
+                self.cache.set(key, result, entry.callbacks)
             else:
+                # oops, the _pending_deferred_cache has been updated since
+                # we started our query, so we are out of date.
+                #
+                # Better put back whatever we took out. (We do it this way
+                # round, rather than peeking into the _pending_deferred_cache
+                # and then removing on a match, to make the common case faster)
+                if existing_entry is not None:
+                    self._pending_deferred_cache[key] = existing_entry
+
+                # we're not going to put this entry into the cache, so need
+                # to make sure that the invalidation callbacks are called.
+                # That was probably done when _pending_deferred_cache was
+                # updated, but it's possible that `set` was called without
+                # `invalidate` being previously called, in which case it may
+                # not have been. Either way, let's double-check now.
                 entry.invalidate()
             return result
 
@@ -168,25 +174,29 @@ class Cache(object):
 
     def invalidate(self, key):
         self.check_thread()
+        self.cache.pop(key, None)
 
-        # Increment the sequence number so that any SELECT statements that
-        # raced with the INSERT don't update the cache (SYN-369)
-        self.sequence += 1
+        # if we have a pending lookup for this key, remove it from the
+        # _pending_deferred_cache, which will (a) stop it being returned
+        # for future queries and (b) stop it being persisted as a proper entry
+        # in self.cache.
         entry = self._pending_deferred_cache.pop(key, None)
+
+        # run the invalidation callbacks now, rather than waiting for the
+        # deferred to resolve.
         if entry:
             entry.invalidate()
 
-        self.cache.pop(key, None)
-
     def invalidate_many(self, key):
         self.check_thread()
         if not isinstance(key, tuple):
             raise TypeError(
                 "The cache key must be a tuple not %r" % (type(key),)
             )
-        self.sequence += 1
         self.cache.del_multi(key)
 
+        # if we have a pending lookup for this key, remove it from the
+        # _pending_deferred_cache, as above
         entry_dict = self._pending_deferred_cache.pop(key, None)
         if entry_dict is not None:
             for entry in iterate_tree_cache_entry(entry_dict):
                 entry.invalidate()
 
     def invalidate_all(self):
         self.check_thread()
-        self.sequence += 1
         self.cache.clear()
+        for entry in self._pending_deferred_cache.itervalues():
+            entry.invalidate()
+        self._pending_deferred_cache.clear()
 
 
 class _CacheDescriptorBase(object):
diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py
index 3f14ab503f..2516fe40f4 100644
--- a/tests/util/caches/test_descriptors.py
+++ b/tests/util/caches/test_descriptors.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from functools import partial
 import logging
 
 import mock
@@ -25,6 +27,50 @@ from tests import unittest
 logger = logging.getLogger(__name__)
 
 
+class CacheTestCase(unittest.TestCase):
+    def test_invalidate_all(self):
+        cache = descriptors.Cache("testcache")
+
+        callback_record = [False, False]
+
+        def record_callback(idx):
+            callback_record[idx] = True
+
+        # add a couple of pending entries
+        d1 = defer.Deferred()
+        cache.set("key1", d1, partial(record_callback, 0))
+
+        d2 = defer.Deferred()
+        cache.set("key2", d2, partial(record_callback, 1))
+
+        # lookup should return the deferreds
+        self.assertIs(cache.get("key1"), d1)
+        self.assertIs(cache.get("key2"), d2)
+
+        # let one of the lookups complete
+        d2.callback("result2")
+        self.assertEqual(cache.get("key2"), "result2")
+
+        # now do the invalidation
+        cache.invalidate_all()
+
+        # lookup should return none
+        self.assertIsNone(cache.get("key1", None))
+        self.assertIsNone(cache.get("key2", None))
+
+        # both callbacks should have been callbacked
+        self.assertTrue(
+            callback_record[0], "Invalidation callback for key1 not called",
+        )
+        self.assertTrue(
+            callback_record[1], "Invalidation callback for key2 not called",
+        )
+
+        # letting the other lookup complete should do nothing
+        d1.callback("result1")
+        self.assertIsNone(cache.get("key1", None))
+
+
 class DescriptorTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_cache(self):
--
cgit 1.5.1
From 13decdbf96981782616a3ee1826fce1213a1bc89 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 9 Apr 2018 12:58:37 +0100
Subject: Revert "Merge pull request #3066 from matrix-org/rav/remove_redundant_metrics"

We aren't ready to release this yet, so I'm reverting it for now.

This reverts commit d1679a4ed7947b0814e0f2af9b888a16c588f1a1, reversing
changes made to e089100c6231541c446e37e157dec8feed02d283.
---
 CHANGES.rst             |  9 ---------
 UPGRADE.rst             |  9 +--------
 docs/metrics-howto.rst  | 11 -----------
 synapse/http/server.py  | 26 ++++++++++++++++++++++++++
 synapse/util/metrics.py | 25 +++++++++++++++++++++++++
 5 files changed, 52 insertions(+), 28 deletions(-)

(limited to 'synapse/util')

diff --git a/CHANGES.rst b/CHANGES.rst
index 5fbad54427..38372381ac 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,12 +1,3 @@
-Changes in synapse v0.28.0 (2018-xx-xx)
-=======================================
-
-As previously advised, this release removes a number of redundant Prometheus
-metrics. Administrators may need to update their dashboards and alerting rules
-to use the updated metric names, if they have not already done so. See
-`docs/metrics-howto.rst `_
-for more details.
-
 Changes in synapse v0.27.2 (2018-03-26)
 =======================================
 
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 39a16b1c0c..f6bb1070b1 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -52,7 +52,7 @@ Upgrading to $NEXT_VERSION
 ====================
 
 This release expands the anonymous usage stats sent if the opt-in
-``report_stats`` configuration is set to ``true``. We now capture RSS memory
+``report_stats`` configuration is set to ``true``. We now capture RSS memory
 and cpu use at a very coarse level. This requires administrators to install
 the optional ``psutil`` python module.
 
@@ -60,13 +60,6 @@ We would appreciate it if you could assist by ensuring this module is
 available and ``report_stats`` is enabled. This will let us see if
 performance changes to synapse are having an impact to the general community.
 
-This release also removes a number of redundant Prometheus metrics.
-Administrators may need to update their dashboards and alerting rules to use
-the updated metric names, if they have not already done so. See
-`docs/metrics-howto.rst `_
-for more details.
-
-
 Upgrading to v0.15.0
 ====================
 
diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst
index 5e2d7c52ec..8acc479bc3 100644
--- a/docs/metrics-howto.rst
+++ b/docs/metrics-howto.rst
@@ -34,17 +34,6 @@ How to monitor Synapse metrics using Prometheus
 
     Restart prometheus.
 
-Deprecated metrics removed in 0.28.0
-------------------------------------
-
-Synapse 0.28.0 removes all of the metrics deprecated by 0.27.0, which are those
-listed under "Old name" below. This has been done to reduce the bandwidth used
-by gathering metrics and the storage requirements for the Prometheus server, as
-well as reducing CPU overhead for both Synapse and Prometheus.
-
-Administrators should update any alerts or monitoring dashboards to use the
-"New name" listed below.
-
 Block and response metrics renamed for 0.27.0
 ---------------------------------------------
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index ac75206ef5..64e083ebfc 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -47,6 +47,17 @@ metrics = synapse.metrics.get_metrics_for(__name__)
 response_count = metrics.register_counter(
     "response_count",
     labels=["method", "servlet", "tag"],
+    alternative_names=(
+        # the following are all deprecated aliases for the same metric
+        metrics.name_prefix + x for x in (
+            "_requests",
+            "_response_time:count",
+            "_response_ru_utime:count",
+            "_response_ru_stime:count",
+            "_response_db_txn_count:count",
+            "_response_db_txn_duration:count",
+        )
+    )
 )
 
 requests_counter = metrics.register_counter(
@@ -62,24 +73,39 @@ outgoing_responses_counter = metrics.register_counter(
 response_timer = metrics.register_counter(
     "response_time_seconds",
     labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_time:total",
+    ),
 )
 
 response_ru_utime = metrics.register_counter(
     "response_ru_utime_seconds",
     labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_ru_utime:total",
+    ),
 )
 
 response_ru_stime = metrics.register_counter(
     "response_ru_stime_seconds",
     labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_ru_stime:total",
+    ),
 )
 
 response_db_txn_count = metrics.register_counter(
     "response_db_txn_count",
     labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_db_txn_count:total",
+    ),
 )
 
 # seconds spent waiting for db txns, excluding scheduling time, when processing
 # this request
 response_db_txn_duration = metrics.register_counter(
     "response_db_txn_duration_seconds",
     labels=["method", "servlet", "tag"],
+    alternative_names=(
+        metrics.name_prefix + "_response_db_txn_duration:total",
+    ),
 )
 
 # seconds spent waiting for a db connection, when processing this request
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index c3d8237e8f..e4b5687a4b 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -31,28 +31,53 @@ metrics = synapse.metrics.get_metrics_for(__name__)
 block_counter = metrics.register_counter(
     "block_count",
     labels=["block_name"],
+    alternative_names=(
+        # the following are all deprecated aliases for the same metric
+        metrics.name_prefix + x for x in (
+            "_block_timer:count",
+            "_block_ru_utime:count",
+            "_block_ru_stime:count",
+            "_block_db_txn_count:count",
+            "_block_db_txn_duration:count",
+        )
+    )
 )
 
 block_timer = metrics.register_counter(
     "block_time_seconds",
     labels=["block_name"],
+    alternative_names=(
+        metrics.name_prefix + "_block_timer:total",
+    ),
 )
 
 block_ru_utime = metrics.register_counter(
     "block_ru_utime_seconds",
     labels=["block_name"],
+    alternative_names=(
+        metrics.name_prefix + "_block_ru_utime:total",
+    ),
 )
 
 block_ru_stime = metrics.register_counter(
     "block_ru_stime_seconds",
     labels=["block_name"],
+    alternative_names=(
+        metrics.name_prefix + "_block_ru_stime:total",
+    ),
 )
 
 block_db_txn_count = metrics.register_counter(
     "block_db_txn_count",
     labels=["block_name"],
+    alternative_names=(
+        metrics.name_prefix + "_block_db_txn_count:total",
+    ),
 )
 
 # seconds spent waiting for db txns, excluding scheduling time, in this block
 block_db_txn_duration = metrics.register_counter(
     "block_db_txn_duration_seconds",
     labels=["block_name"],
+    alternative_names=(
+        metrics.name_prefix + "_block_db_txn_duration:total",
+    ),
 )
 
 # seconds spent waiting for a db connection, in this block
--
cgit 1.5.1
From 9fbe70a7dc3afabfdac176ba1f4be32dd44602aa Mon Sep 17 00:00:00 2001
From: Vincent Breitmoser
Date: Sat, 6 Jan 2018 18:11:02 +0100
Subject: Use sortedcontainers instead of blist

This commit drop-in replaces blist with SortedContainers. They are written in
pure python so work with pypy, but perform as good as native implementations,
at least in a couple benchmarks:
http://www.grantjenks.com/docs/sortedcontainers/performance.html
---
 synapse/federation/send_queue.py           | 14 +++++++-------
 synapse/python_dependencies.py             |  2 +-
 synapse/util/caches/stream_change_cache.py |  4 ++--
 3 files changed, 10 insertions(+), 10 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5acebc1..945832283f 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 import synapse.metrics
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 from collections import namedtuple
 
 import logging
@@ -56,19 +56,19 @@ class FederationRemoteSendQueue(object):
         self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = sorteddict()  # Stream position -> user_id
+        self.presence_changed = SortedDict()  # Stream position -> user_id
 
         self.keyed_edu = {}  # (destination, key) -> EDU
-        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
+        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()  # stream position -> Edu
+        self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = sorteddict()  # stream position -> (destination, Failure)
+        self.failures = SortedDict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()  # stream position -> destination
+        self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
-        self.pos_time = sorteddict()
+        self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 40eedb63cb..f9596bddaf 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -34,8 +34,8 @@ REQUIREMENTS = {
     "bcrypt": ["bcrypt>=3.1.0"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
-    "blist": ["blist"],
     "pysaml2>=3.0.0": ["saml2>=3.0.0"],
+    "sortedcontainers": ["sortedcontainers"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 941d873ab8..2ff46090a6 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -16,7 +16,7 @@
 
 from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 
 import logging
 
@@ -35,7 +35,7 @@ class StreamChangeCache(object):
     def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
         self._max_size = int(max_size * CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = sorteddict()
+        self._cache = SortedDict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
         self.metrics = register_cache(self.name, self._cache)
--
cgit 1.5.1
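Both blist.sorteddict and sortedcontainers.SortedDict are dict-like mappings
that iterate in key order, which is what the stream-position-keyed maps above
depend on for replaying changes after a given position. A quick illustration
of the shared behaviour (hypothetical data):

    from sortedcontainers import SortedDict

    pos_to_user = SortedDict()
    pos_to_user[5] = "@c:example.com"
    pos_to_user[1] = "@a:example.com"
    pos_to_user[3] = "@b:example.com"

    # iteration is in key order, regardless of insertion order
    assert list(pos_to_user) == [1, 3, 5]

    # everything after a given stream position is a cheap ordered scan
    assert [pos_to_user[k] for k in pos_to_user.irange(minimum=2)] == [
        "@b:example.com",
        "@c:example.com",
    ]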
From b3384232a031cc209fb5f0e085bc073a220448be Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Tue, 10 Apr 2018 23:14:47 +0100
Subject: Add metrics for ResponseCache
---
 synapse/appservice/api.py               |  3 ++-
 synapse/federation/federation_server.py |  2 +-
 synapse/handlers/room_list.py           |  5 +++--
 synapse/handlers/sync.py                |  2 +-
 synapse/replication/http/send_event.py  |  2 +-
 synapse/util/caches/response_cache.py   | 14 +++++++++++++-
 6 files changed, 21 insertions(+), 7 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 40c433d7ae..11e9c37c63 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -73,7 +73,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         super(ApplicationServiceApi, self).__init__(hs)
         self.clock = hs.get_clock()
 
-        self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
+        self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
+                                                 timeout_ms=HOUR_IN_MS)
 
     @defer.inlineCallbacks
     def query_user(self, service, user_id):
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bea7fd0b71..e4ce037acf 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -65,7 +65,7 @@ class FederationServer(FederationBase):
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
-        self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
+        self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
 
     @defer.inlineCallbacks
     @log_function
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 5d81f59b44..8028d793c2 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,8 +44,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs)
-        self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(hs, "room_list")
+        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
+                                                   timeout_ms=30 * 1000)
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0f713ce038..06d17ab20c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -169,7 +169,7 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs)
+        self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index bbe2f967b7..c6a6551d24 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -115,7 +115,7 @@ class ReplicationSendEventRestServlet(RestServlet):
         self.clock = hs.get_clock()
 
         # The responses are tiny, so we may as well cache them for a while
-        self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
+        self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
 
     def on_PUT(self, request, event_id):
         result = self.response_cache.get(event_id)
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af539880..7f68289723 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from synapse.util.async import ObservableDeferred
+from synapse.util.caches import metrics as cache_metrics
 
 
 class ResponseCache(object):
@@ -24,17 +25,28 @@ class ResponseCache(object):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self, hs, timeout_ms=0):
+    def __init__(self, hs, name, timeout_ms=0):
         self.pending_result_cache = {}  # Requests that haven't finished yet.
 
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._metrics = cache_metrics.register_cache(
+            "response_cache",
+            size_callback=lambda: self.size(),
+            cache_name=name,
+        )
+
+    def size(self):
+        return len(self.pending_result_cache)
+
     def get(self, key):
         result = self.pending_result_cache.get(key)
         if result is not None:
+            self._metrics.inc_hits()
             return result.observe()
         else:
+            self._metrics.inc_misses()
             return None
 
     def set(self, key, deferred):
--
cgit 1.5.1

From b78395b7fe449d59a5c46c81a869f9f191cd934f Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 12 Apr 2018 12:08:59 +0100
Subject: Refactor ResponseCache usage

Adds a `.wrap` method to ResponseCache which wraps up the boilerplate of a
(get, set) pair, and then use it throughout the codebase.

This will be largely non-functional, but does include the following
functional changes:

 * federation_server.on_context_state_request: drops use of
   _server_linearizer which looked redundant and could cause incorrect
   cache misses by yielding between the get and the set.

 * RoomListHandler.get_remote_public_room_list(): fixes logcontext leaks

 * the wrap function includes some logging. I'm hoping this won't be too
   noisy on production.
---
 synapse/appservice/api.py               |  8 +----
 synapse/federation/federation_server.py | 16 +++------
 synapse/handlers/room_list.py           | 38 ++++++++-------------
 synapse/handlers/sync.py                | 16 ++++-----
 synapse/replication/http/send_event.py  | 18 ++++------
 synapse/util/caches/response_cache.py   | 58 +++++++++++++++++++++++++++++++--
 6 files changed, 87 insertions(+), 67 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 11e9c37c63..00efff1464 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
 from synapse.http.client import SimpleHttpClient
 from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -194,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
             defer.returnValue(None)
 
         key = (service.id, protocol)
-        result = self.protocol_meta_cache.get(key)
-        if not result:
-            result = self.protocol_meta_cache.set(
-                key, preserve_fn(_get)()
-            )
-        return make_deferred_yieldable(result)
+        return self.protocol_meta_cache.wrap(key, _get)
 
     @defer.inlineCallbacks
     def push_bulk(self, service, events, txn_id=None):
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e4ce037acf..d1611f39a9 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -30,7 +30,6 @@ import synapse.metrics
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.logutils import log_function
 
 # when processing incoming transactions, we try to handle multiple rooms in
@@ -212,16 +211,11 @@ class FederationServer(FederationBase):
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
-        result = self._state_resp_cache.get((room_id, event_id))
-        if not result:
-            with (yield self._server_linearizer.queue((origin, room_id))):
-                d = self._state_resp_cache.set(
-                    (room_id, event_id),
-                    preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
-                )
-                resp = yield make_deferred_yieldable(d)
-        else:
-            resp = yield make_deferred_yieldable(result)
+        resp = yield self._state_resp_cache.wrap(
+            (room_id, event_id),
+            self._on_context_state_request_compute,
+            room_id, event_id,
+        )
 
         defer.returnValue((200, resp))
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 8028d793c2..add3f9b009 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -20,7 +20,6 @@ from ._base import BaseHandler
 from synapse.api.constants import (
     EventTypes, JoinRules,
 )
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -78,18 +77,11 @@ class RoomListHandler(BaseHandler):
         )
 
         key = (limit, since_token, network_tuple)
-        result = self.response_cache.get(key)
-        if not result:
-            logger.info("No cached result, calculating one.")
-            result = self.response_cache.set(
-                key,
-                preserve_fn(self._get_public_room_list)(
-                    limit, since_token, network_tuple=network_tuple
-                )
-            )
-        else:
-            logger.info("Using cached deferred result.")
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            key,
+            self._get_public_room_list,
+            limit, since_token, network_tuple=network_tuple,
+        )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
@@ -423,18 +415,14 @@ class RoomListHandler(BaseHandler):
             server_name, limit, since_token, include_all_networks,
             third_party_instance_id,
         )
-        result = self.remote_response_cache.get(key)
-        if not result:
-            result = self.remote_response_cache.set(
-                key,
-                repl_layer.get_public_rooms(
-                    server_name, limit=limit, since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            )
-        return result
+        return self.remote_response_cache.wrap(
+            key,
+            repl_layer.get_public_rooms,
+            server_name, limit=limit, since_token=since_token,
+            search_filter=search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
+        )
 
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 06d17ab20c..c6946831ab 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@
 
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -180,15 +180,11 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        result = self.response_cache.get(sync_config.request_key)
-        if not result:
-            result = self.response_cache.set(
-                sync_config.request_key,
-                preserve_fn(self._wait_for_sync_for_user)(
-                    sync_config, since_token, timeout, full_state
-                )
-            )
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            sync_config.request_key,
+            self._wait_for_sync_for_user,
+            sync_config, since_token, timeout, full_state,
+        )
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index c6a6551d24..a9baa2c1c3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.util.async import sleep
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import Requester, UserID
 
@@ -118,17 +117,12 @@ class ReplicationSendEventRestServlet(RestServlet):
         self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
 
     def on_PUT(self, request, event_id):
-        result = self.response_cache.get(event_id)
-        if not result:
-            result = self.response_cache.set(
-                event_id,
-                self._handle_request(request)
-            )
-        else:
-            logger.warn("Returning cached response")
-        return make_deferred_yieldable(result)
-
-    @preserve_fn
+        return self.response_cache.wrap(
+            event_id,
+            self._handle_request,
+            request
+        )
+
     @defer.inlineCallbacks
     def _handle_request(self, request):
         with Measure(self.clock, "repl_send_event_parse"):
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 066fa423fd..0c2c347953 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,9 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches import metrics as cache_metrics
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
 
 
 class ResponseCache(object):
@@ -31,6 +35,7 @@ class ResponseCache(object):
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._name = name
         self._metrics = cache_metrics.register_cache(
             "response_cache",
             size_callback=lambda: self.size(),
@@ -47,7 +52,7 @@ class ResponseCache(object):
         so you'll probably want to make_deferred_yieldable it.
 
         Args:
-            key (str):
+            key (hashable):
 
         Returns:
             twisted.internet.defer.Deferred|None: None if there is no entry
             for this key; otherwise a deferred result.
@@ -76,7 +81,7 @@ class ResponseCache(object):
         to do it everywhere ResponseCache is used.)
 
         Args:
-            key (str):
+            key (hashable):
             deferred (twisted.internet.defer.Deferred):
 
         Returns:
@@ -97,3 +102,52 @@ class ResponseCache(object):
             result.addBoth(remove)
 
         return result.observe()
+
+    def wrap(self, key, callback, *args, **kwargs):
+        """Wrap together a *get* and *set* call, taking care of logcontexts
+
+        First looks up the key in the cache, and if it is present makes it
+        follow the synapse logcontext rules and returns it.
+ + Otherwise, makes a call to *callback(*args, **kwargs)*, which should + follow the synapse logcontext rules, and adds the result to the cache. + + Example usage: + + @defer.inlineCallbacks + def handle_request(request): + # etc + defer.returnValue(result) + + result = yield response_cache.wrap( + key, + handle_request, + request, + ) + + Args: + key (hashable): key to get/set in the cache + + callback (callable): function to call if the key is not found in + the cache + + *args: positional parameters to pass to the callback, if it is used + + **kwargs: named parameters to pass to the callback, if it is used + + Returns: + twisted.internet.defer.Deferred: yieldable result + """ + result = self.get(key) + if not result: + logger.info("[%s]: no cached result for [%s], calculating new one", + self._name, key) + d = run_in_background(callback, *args, **kwargs) + result = self.set(key, d) + elif result.called: + logger.info("[%s]: using completed cached result for [%s]", + self._name, key) + else: + logger.info("[%s]: using incomplete cached result for [%s]", + self._name, key) + return make_deferred_yieldable(result) -- cgit 1.5.1 From 60f6014bb7912cf5629ae7d4ab2452ed67e5304a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 13 Apr 2018 07:32:29 +0100 Subject: ResponseCache: fix handling of completed results Turns out that ObservableDeferred.observe doesn't return a deferred if the result is already completed. Fix handling and improve documentation. --- synapse/util/caches/response_cache.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 0c2c347953..7f79333e96 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -14,6 +14,8 @@ # limitations under the License. import logging +from twisted.internet import defer + from synapse.util.async import ObservableDeferred from synapse.util.caches import metrics as cache_metrics from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -48,15 +50,21 @@ class ResponseCache(object): def get(self, key): """Look up the given key. - Returns a deferred which doesn't follow the synapse logcontext rules, - so you'll probably want to make_deferred_yieldable it. + Can return either a new Deferred (which also doesn't follow the synapse + logcontext rules), or, if the request has completed, the actual + result. You will probably want to make_deferred_yieldable the result. + + If there is no entry for the key, returns None. It is worth noting that + this means there is no way to distinguish a completed result of None + from an absent cache entry. Args: key (hashable): Returns: - twisted.internet.defer.Deferred|None: None if there is no entry - for this key; otherwise a deferred result. + twisted.internet.defer.Deferred|None|E: None if there is no entry + for this key; otherwise either a deferred result or the result + itself. """ result = self.pending_result_cache.get(key) if result is not None: @@ -73,19 +81,17 @@ class ResponseCache(object): you should wrap normal synapse deferreds with logcontext.run_in_background).
- Returns a new Deferred which also doesn't follow the synapse logcontext - rules, so you will want to make_deferred_yieldable it - - (TODO: before using this more widely, it might make sense to refactor - it and get() so that they do the necessary wrapping rather than having - to do it everywhere ResponseCache is used.) + Can return either a new Deferred (which also doesn't follow the synapse + logcontext rules), or, if *deferred* was already complete, the actual + result. You will probably want to make_deferred_yieldable the result. Args: key (hashable): - deferred (twisted.internet.defer.Deferred): + deferred (twisted.internet.defer.Deferred[T]): Returns: - twisted.internet.defer.Deferred + twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual + result. """ result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result @@ -144,7 +150,7 @@ class ResponseCache(object): self._name, key) d = run_in_background(callback, *args, **kwargs) result = self.set(key, d) - elif result.called: + elif not isinstance(result, defer.Deferred) or result.called: logger.info("[%s]: using completed cached result for [%s]", self._name, key) else: -- cgit 1.5.1 From d3347ad48553bd678fca7e3259d0824225cc6af2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 13 Apr 2018 11:16:43 +0100 Subject: Revert "Use sortedcontainers instead of blist" This reverts commit 9fbe70a7dc3afabfdac176ba1f4be32dd44602aa. It turns out that sortedcontainers.SortedDict is not an exact match for blist.sorteddict; in particular, `popitem()` removes things from the opposite end of the dict. This is trivial to fix, but I want to add some unit tests, and potentially give it some more thought, before we do so. --- synapse/federation/send_queue.py | 14 +++++++------- synapse/python_dependencies.py | 2 +- synapse/util/caches/stream_change_cache.py | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) (limited to 'synapse/util') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 945832283f..93e5acebc1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure import synapse.metrics -from sortedcontainers import SortedDict +from blist import sorteddict from collections import namedtuple import logging @@ -56,19 +56,19 @@ class FederationRemoteSendQueue(object): self.is_mine_id = hs.is_mine_id self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = SortedDict() # Stream position -> user_id + self.presence_changed = sorteddict() # Stream position -> user_id self.keyed_edu = {} # (destination, key) -> EDU - self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) + self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) - self.edus = SortedDict() # stream position -> Edu + self.edus = sorteddict() # stream position -> Edu - self.failures = SortedDict() # stream position -> (destination, Failure) + self.failures = sorteddict() # stream position -> (destination, Failure) - self.device_messages = SortedDict() # stream position -> destination + self.device_messages = sorteddict() # stream position -> destination self.pos = 1 - self.pos_time = SortedDict() + self.pos_time = sorteddict() # EVERYTHING IS SAD.
In particular, python only makes new scopes when # we make a new function, so we need to make a new function so the inner diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index f9596bddaf..40eedb63cb 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -34,8 +34,8 @@ REQUIREMENTS = { "bcrypt": ["bcrypt>=3.1.0"], "pillow": ["PIL"], "pydenticon": ["pydenticon"], + "blist": ["blist"], "pysaml2>=3.0.0": ["saml2>=3.0.0"], - "sortedcontainers": ["sortedcontainers"], "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 2ff46090a6..941d873ab8 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -16,7 +16,7 @@ from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR -from sortedcontainers import SortedDict +from blist import sorteddict import logging @@ -35,7 +35,7 @@ class StreamChangeCache(object): def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): self._max_size = int(max_size * CACHE_SIZE_FACTOR) self._entity_to_key = {} - self._cache = SortedDict() + self._cache = sorteddict() self._earliest_known_stream_pos = current_stream_pos self.name = name self.metrics = register_cache(self.name, self._cache) -- cgit 1.5.1 From f63ff73c7fc9e27fa42ac73bf520796ff37bfcc2 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sun, 15 Apr 2018 16:39:30 +0200 Subject: add __bool__ alias to __nonzero__ methods Signed-off-by: Adrian Tschira --- synapse/handlers/sync.py | 7 +++++++ synapse/notifier.py | 1 + synapse/util/logcontext.py | 1 + 3 files changed, 9 insertions(+) (limited to 'synapse/util') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 06d17ab20c..fe790b4c06 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ to tell if room needs to be part of the sync result. """ return bool(self.events) + __bool__ = __nonzero__ # python3 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ @@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ # nb the notification count does not, er, count: if there's nothing # else in the result, we don't need to send it. 
) + __bool__ = __nonzero__ # python3 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ @@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ or self.state or self.account_data ) + __bool__ = __nonzero__ # python3 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ @@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ def __nonzero__(self): """Invited rooms should always be reported to the client""" return True + __bool__ = __nonzero__ # python3 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ @@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ def __nonzero__(self): return bool(self.join or self.invite or self.leave) + __bool__ = __nonzero__ # python3 class DeviceLists(collections.namedtuple("DeviceLists", [ @@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [ def __nonzero__(self): return bool(self.changed or self.left) + __bool__ = __nonzero__ # python3 class SyncResult(collections.namedtuple("SyncResult", [ @@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.device_lists or self.groups ) + __bool__ = __nonzero__ # python3 class SyncHandler(object): diff --git a/synapse/notifier.py b/synapse/notifier.py index ef042681bc..0e40a4aad6 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -144,6 +144,7 @@ class _NotifierUserStream(object): class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): def __nonzero__(self): return bool(self.events) + __bool__ = __nonzero__ # python3 class Notifier(object): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d660ec785b..d59adc236e 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -92,6 +92,7 @@ class LoggingContext(object): def __nonzero__(self): return False + __bool__ = __nonzero__ # python3 sentinel = Sentinel() -- cgit 1.5.1 From 878995e660e46c5dedb56cde8bd7a3d46838cdd7 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sun, 15 Apr 2018 21:37:58 +0200 Subject: Replace Queue with six.moves.queue and a six.range change which I missed the last time Signed-off-by: Adrian Tschira --- synapse/storage/event_federation.py | 6 ++++-- synapse/util/file_consumer.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/util') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 00ee82d300..e828328243 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -24,7 +24,9 @@ from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 import logging -from Queue import PriorityQueue, Empty +from six.moves.queue import PriorityQueue, Empty + +from six.moves import range logger = logging.getLogger(__name__) @@ -78,7 +80,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, front_list = list(front) chunks = [ front_list[x:x + 100] - for x in xrange(0, len(front), 100) + for x in range(0, len(front), 100) ] for chunk in chunks: txn.execute( diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 90a2608d6f..3c8a165331 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -17,7 +17,7 @@ from twisted.internet import threads, reactor from synapse.util.logcontext import make_deferred_yieldable, preserve_fn -import Queue +from six.moves import queue class 
BackgroundFileConsumer(object): @@ -49,7 +49,7 @@ class BackgroundFileConsumer(object): # Queue of slices of bytes to be written. When producer calls # unregister a final None is sent. - self._bytes_queue = Queue.Queue() + self._bytes_queue = queue.Queue() # Deferred that is resolved when finished writing self._finished_deferred = None -- cgit 1.5.1 From 1ea904b9f09ea6a9df7792f61029d0250602d46b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Apr 2018 00:53:18 +0100 Subject: Use deferred.addTimeout instead of time_bound_deferred This doesn't feel like a wheel we need to reinvent. --- synapse/http/__init__.py | 22 +++++++++++++ synapse/http/client.py | 20 +++++------- synapse/http/matrixfederationclient.py | 35 ++++++++++----------- synapse/notifier.py | 23 +++++++------- synapse/util/__init__.py | 56 ---------------------------------- tests/util/test_clock.py | 33 -------------------- 6 files changed, 59 insertions(+), 130 deletions(-) delete mode 100644 tests/util/test_clock.py (limited to 'synapse/util') diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index bfebb0f644..20e568bc43 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,3 +13,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet.defer import CancelledError +from twisted.python import failure + +from synapse.api.errors import SynapseError + + +class RequestTimedOutError(SynapseError): + """Exception representing timeout of an outbound request""" + def __init__(self): + super(RequestTimedOutError, self).__init__(504, "Timed out") + + +def cancelled_to_request_timed_out_error(value): + """Turns CancelledErrors into RequestTimedOutErrors. + + For use with deferred.addTimeout() + """ + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise RequestTimedOutError() + return value diff --git a/synapse/http/client.py b/synapse/http/client.py index f3e4973c2e..35c8d51e71 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -18,9 +19,9 @@ from OpenSSL.SSL import VERIFY_NONE from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) +from synapse.http import cancelled_to_request_timed_out_error from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable -from synapse.util import logcontext import synapse.metrics from synapse.http.endpoint import SpiderEndpoint @@ -95,21 +96,16 @@ class SimpleHttpClient(object): # counters to it outgoing_requests_counter.inc(method) - def send_request(): + logger.info("Sending request %s %s", method, uri) + + try: request_deferred = self.agent.request( method, uri, *args, **kwargs ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=60, + request_deferred.addTimeout( + 60, reactor, cancelled_to_request_timed_out_error, ) - - logger.info("Sending request %s %s", method, uri) - - try: - with logcontext.PreserveLoggingContext(): - response = yield send_request() + response = yield make_deferred_yieldable(request_deferred) incoming_responses_counter.inc(method, response.code) logger.info( diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 60a29081e8..fe4b1636a6 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,17 +13,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.retryutils from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, HTTPConnectionPool, Agent from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone +from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint +import synapse.metrics from synapse.util.async import sleep from synapse.util import logcontext -import synapse.metrics +from synapse.util.logcontext import make_deferred_yieldable +import synapse.util.retryutils from canonicaljson import encode_canonical_json @@ -184,21 +187,19 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, http_url_bytes, headers_dict) try: - def send_request(): - request_deferred = self.agent.request( - method, - url_bytes, - Headers(headers_dict), - producer - ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=timeout / 1000. if timeout else 60, - ) - - with logcontext.PreserveLoggingContext(): - response = yield send_request() + request_deferred = self.agent.request( + method, + url_bytes, + Headers(headers_dict), + producer + ) + request_deferred.addTimeout( + timeout / 1000. if timeout else 60, + reactor, cancelled_to_request_timed_out_error, + ) + response = yield make_deferred_yieldable( + request_deferred, + ) log_result = "%d %s" % (response.code, response.phrase,) break diff --git a/synapse/notifier.py b/synapse/notifier.py index 0e40a4aad6..1e4f78b993 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.internet.defer import TimeoutError + from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state -from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext, preserve_fn @@ -331,11 +332,11 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) + listener.deferred.addTimeout( + (end_time - now) / 1000., reactor, + ) with PreserveLoggingContext(): - yield self.clock.time_bound_deferred( - listener.deferred, - time_out=(end_time - now) / 1000. - ) + yield listener.deferred current_token = user_stream.current_token @@ -346,7 +347,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except DeferredTimedOutError: + except TimeoutError: break except defer.CancelledError: break @@ -551,13 +552,11 @@ class Notifier(object): if end_time <= now: break + listener.deferred.addTimeout((end_time - now) / 1000., reactor) try: with PreserveLoggingContext(): - yield self.clock.time_bound_deferred( - listener.deferred, - time_out=(end_time - now) / 1000. - ) - except DeferredTimedOutError: + yield listener.deferred + except TimeoutError: break except defer.CancelledError: break diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 756d8ffa32..814a7bf71b 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError from synapse.util.logcontext import PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -24,11 +23,6 @@ import logging logger = logging.getLogger(__name__) -class DeferredTimedOutError(SynapseError): - def __init__(self): - super(DeferredTimedOutError, self).__init__(504, "Timed out") - - def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. 
failure.trap(defer.FirstError) @@ -85,53 +79,3 @@ class Clock(object): except Exception: if not ignore_errs: raise - - def time_bound_deferred(self, given_deferred, time_out): - if given_deferred.called: - return given_deferred - - ret_deferred = defer.Deferred() - - def timed_out_fn(): - e = DeferredTimedOutError() - - try: - ret_deferred.errback(e) - except Exception: - pass - - try: - given_deferred.cancel() - except Exception: - pass - - timer = None - - def cancel(res): - try: - self.cancel_call_later(timer) - except Exception: - pass - return res - - ret_deferred.addBoth(cancel) - - def success(res): - try: - ret_deferred.callback(res) - except Exception: - pass - - return res - - def err(res): - try: - ret_deferred.errback(res) - except Exception: - pass - - given_deferred.addCallbacks(callback=success, errback=err) - - timer = self.call_later(time_out, timed_out_fn) - - return ret_deferred diff --git a/tests/util/test_clock.py b/tests/util/test_clock.py deleted file mode 100644 index 9672603579..0000000000 --- a/tests/util/test_clock.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2017 Vector Creations Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from synapse import util -from twisted.internet import defer -from tests import unittest - - -class ClockTestCase(unittest.TestCase): - @defer.inlineCallbacks - def test_time_bound_deferred(self): - # just a deferred which never resolves - slow_deferred = defer.Deferred() - - clock = util.Clock() - time_bound = clock.time_bound_deferred(slow_deferred, 0.001) - - try: - yield time_bound - self.fail("Expected timedout error, but got nothing") - except util.DeferredTimedOutError: - pass -- cgit 1.5.1 From 9255a6cb17716c022ebae1dbe9c142b78ca86ea7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:07:40 +0100 Subject: Improve exception handling for background processes There were a bunch of places where we fire off a process to happen in the background, but don't have any exception handling on it - instead relying on the unhandled error being logged when the relevant deferred gets garbage-collected. This is unsatisfactory for a number of reasons: - logging on garbage collection is best-effort and may happen some time after the error, if at all - it can be hard to figure out where the error actually happened. - it is logged as a scary CRITICAL error which (a) I always forget to grep for and (b) it's not really CRITICAL if a background process we don't care about fails. So this is an attempt to add exception handling to everything we fire off into the background.
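The pattern applied below is roughly the following sketch (an editor's illustration with made-up names; run_in_background, defer.inlineCallbacks and logger.exception are the real helpers used in the patch, _do_thing is hypothetical):

    @defer.inlineCallbacks
    def _do_thing_in_background(self):
        try:
            yield self._do_thing()
        except Exception:
            logger.exception("Error doing thing")

    # fire and forget: any failure is now logged at the point it happens,
    # rather than as a CRITICAL unhandled-deferred message at
    # garbage-collection time
    run_in_background(self._do_thing_in_background)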
--- synapse/app/appservice.py | 15 +++-- synapse/app/federation_sender.py | 27 +++++---- synapse/app/pusher.py | 31 +++++----- synapse/app/synchrotron.py | 95 ++++++++++++++++--------------- synapse/app/user_dir.py | 13 ++++- synapse/appservice/scheduler.py | 25 ++++---- synapse/crypto/keyring.py | 93 +++++++++++++++--------------- synapse/federation/transaction_queue.py | 2 + synapse/federation/transport/server.py | 13 ++++- synapse/groups/attestations.py | 44 +++++++------- synapse/handlers/message.py | 22 +++++-- synapse/handlers/presence.py | 19 +++++-- synapse/handlers/receipts.py | 61 ++++++++++---------- synapse/handlers/typing.py | 43 +++++++------- synapse/notifier.py | 13 +++-- synapse/push/emailpusher.py | 11 ++-- synapse/push/httppusher.py | 5 +- synapse/rest/media/v1/storage_provider.py | 9 ++- synapse/storage/event_push_actions.py | 24 +++++--- synapse/util/logcontext.py | 7 ++- 20 files changed, 335 insertions(+), 237 deletions(-) (limited to 'synapse/util') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index f2540023a7..58f2c9d68c 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.appservice") @@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler): if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() - preserve_fn( - self.appservice_handler.notify_interested_services - )(max_stream_id) + run_in_background(self._notify_app_services, max_stream_id) + + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") def start(config_options): diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..4f2a9ca21a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -237,19 +237,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): - self.federation_position = token - - # We linearize here to ensure we don't have races updating the token - with (yield self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + try: + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (yield self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) - self._last_ack = self.federation_position + # We ACK this token over 
replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position + except Exception: + logger.exception("Error updating federation stream position") if __name__ == '__main__': diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..739d113ad5 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -144,20 +144,23 @@ class PusherReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) - ) + try: + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, + ) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) + ) + except Exception: + logger.exception("Error poking pushers") def stop_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..777da564d7 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -340,55 +340,58 @@ class SyncReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def process_and_notify(self, stream_name, token, rows): - if stream_name == "events": - # We shouldn't get multiple rows per token for events stream, so - # we don't need to optimise this for multiple rows. - for row in rows: - event = yield self.store.get_event(row.event_id) - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event( - event, token, max_token, extra_users + try: + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. 
+ for row in rows: + event = yield self.store.get_event(row.event_id) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "push_rules": - self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], - ) - elif stream_name in ("account_data", "tag_account_data",): - self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], - ) - elif stream_name == "receipts": - self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "to_device": - entities = [row.entity for row in rows if row.entity.startswith("@")] - if entities: + elif stream_name in ("account_data", "tag_account_data",): self.notifier.on_new_event( - "to_device_key", token, users=entities, + "account_data_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "device_lists": - all_room_ids = set() - for row in rows: - room_ids = yield self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) - elif stream_name == "presence": - yield self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": - self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows], - ) + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event( + "to_device_key", token, users=entities, + ) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = yield self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event( + "device_list_key", token, rooms=all_room_ids, + ) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) + except Exception: + logger.exception("Error processing replication") def start(config_options): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 5f845e80d1..5ba7e9b416 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from 
synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.user_dir") @@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): stream_name, token, rows ) if stream_name == "current_state_deltas": - preserve_fn(self.user_directory.notify_new_event)() + run_in_background(self._notify_directory) + + @defer.inlineCallbacks + def _notify_directory(self): + try: + yield self.user_directory.notify_new_event() + except Exception: + logger.exception("Error notifying user directory of state update") def start(config_options): diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..dfc8d1b42e 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -176,17 +176,20 @@ class _TransactionController(object): @defer.inlineCallbacks def _start_recoverer(self, service): - yield self.store.set_appservice_state( - service, - ApplicationServiceState.DOWN - ) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + try: + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + except Exception: + logger.exception("Error starting AS recoverer") @defer.inlineCallbacks def _is_service_up(self, service): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..32cbddbc53 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -146,53 +146,56 @@ class Keyring(object): verify_requests (List[VerifyKeyRequest]): """ - # create a deferred for each server we're going to look up the keys - # for; we'll resolve them once we have completed our lookups. - # These will be passed into wait_for_previous_lookups to block - # any other lookups until we have finished. - # The deferreds are called with no logcontext. - server_to_deferred = { - rq.server_name: defer.Deferred() - for rq in verify_requests - } - - # We want to wait for any previous lookups to complete before - # proceeding. - yield self.wait_for_previous_lookups( - [rq.server_name for rq in verify_requests], - server_to_deferred, - ) - - # Actually start fetching keys. - self._get_server_verify_keys(verify_requests) - - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed.
- # - # map from server name to a set of request ids - server_to_request_ids = {} - - for verify_request in verify_requests: - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids.setdefault(server_name, set()).add(request_id) - - def remove_deferreds(res, verify_request): - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids[server_name].discard(request_id) - if not server_to_request_ids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) - return res - - for verify_request in verify_requests: - verify_request.deferred.addBoth( - remove_deferreds, verify_request, + try: + # create a deferred for each server we're going to look up the keys + # for; we'll resolve them once we have completed our lookups. + # These will be passed into wait_for_previous_lookups to block + # any other lookups until we have finished. + # The deferreds are called with no logcontext. + server_to_deferred = { + rq.server_name: defer.Deferred() + for rq in verify_requests + } + + # We want to wait for any previous lookups to complete before + # proceeding. + yield self.wait_for_previous_lookups( + [rq.server_name for rq in verify_requests], + server_to_deferred, ) + # Actually start fetching keys. + self._get_server_verify_keys(verify_requests) + + # When we've finished fetching all the keys for a given server_name, + # resolve the deferred passed to `wait_for_previous_lookups` so that + # any lookups waiting will proceed. + # + # map from server name to a set of request ids + server_to_request_ids = {} + + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + + def remove_deferreds(res, verify_request): + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids[server_name].discard(request_id) + if not server_to_request_ids[server_name]: + d = server_to_deferred.pop(server_name, None) + if d: + d.callback(None) + return res + + for verify_request in verify_requests: + verify_request.deferred.addBoth( + remove_deferreds, verify_request, + ) + except Exception: + logger.exception("Error starting key lookups") + @defer.inlineCallbacks def wait_for_previous_lookups(self, server_names, server_to_deferred): """Waits for any previous key lookups for the given servers to finish. 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 963d938edd..ded2b1871a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -323,6 +323,8 @@ class TransactionQueue(object): break yield self._process_presence_inner(states_map.values()) + except Exception: + logger.exception("Error sending presence states to servers") finally: self._processing_pending_presence = False diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ff0656df3e..19d09f5422 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -25,7 +25,7 @@ from synapse.http.servlet import ( ) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.types import ThirdPartyInstanceID, get_domain_from_id import functools @@ -152,11 +152,18 @@ class Authenticator(object): # alive retry_timings = yield self.store.get_destination_retry_timings(origin) if retry_timings and retry_timings["retry_last_ts"]: - logger.info("Marking origin %r as up", origin) - preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0) + run_in_background(self._reset_retry_timings, origin) defer.returnValue(origin) + @defer.inlineCallbacks + def _reset_retry_timings(self, origin): + try: + logger.info("Marking origin %r as up", origin) + yield self.store.set_destination_retry_timings(origin, 0, 0) + except Exception: + logger.exception("Error resetting retry timings on %s", origin) + class BaseFederationServlet(object): REQUIRE_AUTH = True diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..7187df2508 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -165,28 +165,32 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def _renew_attestation(group_id, user_id): - if not self.is_mine_id(group_id): - destination = get_domain_from_id(group_id) - elif not self.is_mine_id(user_id): - destination = get_domain_from_id(user_id) - else: - logger.warn( - "Incorrectly trying to do attestations for user: %r in %r", - user_id, group_id, + try: + if not self.is_mine_id(group_id): + destination = get_domain_from_id(group_id) + elif not self.is_mine_id(user_id): + destination = get_domain_from_id(user_id) + else: + logger.warn( + "Incorrectly trying to do attestations for user: %r in %r", + user_id, group_id, + ) + yield self.store.remove_attestation_renewal(group_id, user_id) + return + + attestation = self.attestations.create_attestation(group_id, user_id) + + yield self.transport_client.renew_group_attestation( + destination, group_id, user_id, + content={"attestation": attestation}, ) - yield self.store.remove_attestation_renewal(group_id, user_id) - return - - attestation = self.attestations.create_attestation(group_id, user_id) - yield self.transport_client.renew_group_attestation( - destination, group_id, user_id, - content={"attestation": attestation}, - ) - - yield self.store.update_attestation_renewal( - group_id, user_id, attestation - ) + yield self.store.update_attestation_renewal( + group_id, user_id, attestation + ) + except Exception: + logger.exception("Error renewing attestation of %r in %r", + user_id, group_id) for row in rows: group_id = row["group_id"] diff --git a/synapse/handlers/message.py 
b/synapse/handlers/message.py index 21628a8540..d168ff5b86 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -857,15 +857,25 @@ class EventCreationHandler(object): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + try: + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + except Exception: + logger.exception("Error notifying about new room event") preserve_fn(_notify)() if event.type == EventTypes.Message: - presence = self.hs.get_presence_handler() # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. - preserve_fn(presence.bump_presence_active_time)(requester.user) + run_in_background(self._bump_active_time, requester.user) + + @defer.inlineCallbacks + def _bump_active_time(self, user): + try: + presence = self.hs.get_presence_handler() + yield presence.bump_presence_active_time(user) + except Exception: + logger.exception("Error bumping presence active time") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a5e501897c..585f3e4da2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.async import Linearizer -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -254,6 +254,14 @@ class PresenceHandler(object): logger.info("Finished _persist_unpersisted_changes") + @defer.inlineCallbacks + def _update_states_and_catch_exception(self, new_states): + try: + res = yield self._update_states(new_states) + defer.returnValue(res) + except Exception: + logger.exception("Error updating presence") + @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes @@ -364,7 +372,7 @@ class PresenceHandler(object): now=now, ) - preserve_fn(self._update_states)(changes) + run_in_background(self._update_states_and_catch_exception, changes) except Exception: logger.exception("Exception in _handle_timeouts loop") @@ -422,20 +430,23 @@ class PresenceHandler(object): @defer.inlineCallbacks def _end(): - if affect_presence: + try: self.user_to_num_current_syncs[user_id] -= 1 prev_state = yield self.current_state_for_user(user_id) yield self._update_states([prev_state.copy_and_replace( last_user_sync_ts=self.clock.time_msec(), )]) + except Exception: + logger.exception("Error updating presence after sync") @contextmanager def _user_syncing(): try: yield finally: - preserve_fn(_end)() + if affect_presence: + run_in_background(_end) defer.returnValue(_user_syncing()) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 3f215c2b4e..2e0672161c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler): """Given a list of receipts, works out which remote servers should be poked and pokes them. """ - # TODO: Some of this stuff should be coallesced. 
- for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, + try: + # TODO: Some of this stuff should be coalesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } } - } + }, }, - }, - key=(room_id, receipt_type, user_id), - ) + key=(room_id, receipt_type, user_id), + ) + except Exception: + logger.exception("Error pushing receipts to remote servers") @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..823e2e27e1 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -205,28 +205,31 @@ class TypingHandler(object): @defer.inlineCallbacks def _push_remote(self, member, typing): - users = yield self.state.get_current_user_in_room(member.room_id) - self._member_last_federation_poke[member] = self.clock.time_msec() + try: + users = yield self.state.get_current_user_in_room(member.room_id) + self._member_last_federation_poke[member] = self.clock.time_msec() - now = self.clock.time_msec() - self.wheel_timer.insert( - now=now, - obj=member, - then=now + FEDERATION_PING_INTERVAL, - ) + now = self.clock.time_msec() + self.wheel_timer.insert( + now=now, + obj=member, + then=now + FEDERATION_PING_INTERVAL, + ) - for domain in set(get_domain_from_id(u) for u in users): - if domain != self.server_name: - self.federation.send_edu( - destination=domain, - edu_type="m.typing", - content={ - "room_id": member.room_id, - "user_id": member.user_id, - "typing": typing, - }, - key=member, - ) + for domain in set(get_domain_from_id(u) for u in users): + if domain != self.server_name: + self.federation.send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": member.room_id, + "user_id": member.user_id, + "typing": typing, + }, + key=member, + ) + except Exception: + logger.exception("Error pushing typing notif to remotes") @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/notifier.py b/synapse/notifier.py index 0e40a4aad6..939723a404 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -21,7 +21,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext
import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client @@ -251,9 +251,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - preserve_fn(self.appservice_handler.notify_interested_services)( - room_stream_id - ) + run_in_background(self._notify_app_services, room_stream_id) if self.federation_sender: self.federation_sender.notify_new_events(room_stream_id) @@ -267,6 +265,13 @@ class Notifier(object): rooms=[event.room_id], ) + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") + def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 58df98a793..ba7286cb72 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -77,10 +77,13 @@ class EmailPusher(object): @defer.inlineCallbacks def on_started(self): if self.mailer is not None: - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() + try: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() + except Exception: + logger.exception("Error starting email pusher") def on_stop(self): if self.timed_call: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 2cbac571b8..1420d378ef 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -94,7 +94,10 @@ class HttpPusher(object): @defer.inlineCallbacks def on_started(self): - yield self._process() + try: + yield self._process() + except Exception: + logger.exception("Error starting http pusher") @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index c188192f2b..0252afd9d3 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -18,7 +18,7 @@ from twisted.internet import defer, threads from .media_storage import FileResponder from synapse.config._base import Config -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import logging import os @@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider): return self.backend.store_file(path, file_info) else: # TODO: Handle errors. 
- preserve_fn(self.backend.store_file)(path, file_info) + def store(): + try: + return self.backend.store_file(path, file_info) + except Exception: + logger.exception("Error storing file") + run_in_background(store) return defer.succeed(None) def fetch(self, path, file_info): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index e78f8d0114..c22762eb5c 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): "add_push_actions_to_staging", _add_push_actions_to_staging_txn ) + @defer.inlineCallbacks def remove_push_actions_from_staging(self, event_id): """Called if we failed to persist the event to ensure that stale push actions don't build up in the DB @@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore): event_id (str) """ - return self._simple_delete( - table="event_push_actions_staging", - keyvalues={ - "event_id": event_id, - }, - desc="remove_push_actions_from_staging", - ) + try: + res = yield self._simple_delete( + table="event_push_actions_staging", + keyvalues={ + "event_id": event_id, + }, + desc="remove_push_actions_from_staging", + ) + defer.returnValue(res) + except Exception: + # this method is called from an exception handler, so propagating + # another exception here really isn't helpful - there's nothing + # the caller can do about it. Just log the exception and move on. + logger.exception( + "Error removing push actions after event persistence failure", + ) @defer.inlineCallbacks def _find_stream_orderings_for_times(self): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..d6587e4409 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -305,7 +305,12 @@ def run_in_background(f, *args, **kwargs): deferred returned by the funtion completes. Useful for wrapping functions that return a deferred which you don't yield - on. + on (for instance because you want to pass it to deferred.gatherResults()). + + Note that if you completely discard the result, you should make sure that + `f` doesn't raise any deferred exceptions, otherwise a scary-looking + CRITICAL error about an unhandled error will be logged without much + indication about where it came from. """ current = LoggingContext.current_context() res = f(*args, **kwargs) -- cgit 1.5.1 From 13843f771ea64cfd4fb7e2f03854b9506f63fcfd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 12:17:13 +0100 Subject: Trap exceptions thrown within run_in_background Turn any exceptions that get thrown synchronously within run_in_background into Failures instead. --- synapse/util/logcontext.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..127581a0b4 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -308,7 +308,13 @@ def run_in_background(f, *args, **kwargs): on. """ current = LoggingContext.current_context() - res = f(*args, **kwargs) + try: + res = f(*args, **kwargs) + except: # noqa: E722 + # the assumption here is that the caller doesn't want to be disturbed + # by synchronous exceptions, so let's turn them into Failures. + return defer.fail() + if isinstance(res, defer.Deferred) and not res.called: # The function will have reset the context before returning, so # we need to restore it now. 
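# Editor's note, kept as Python comments so as not to disturb the patch
# above: with this change, a synchronous raise inside the wrapped function
# no longer propagates to the caller of run_in_background; it comes back
# as a failed Deferred instead. A minimal sketch (boom is a made-up name):
#
#     def boom():
#         raise ValueError("oops")
#
#     d = run_in_background(boom)  # does not raise here
#     d.addErrback(lambda f: logger.warn("caught: %s", f.value))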
-- cgit 1.5.1 From 9d2c1b8429996cb9766ac636034485e2a1d685cc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 12:52:30 +0100 Subject: Backport deferred.addTimeout Twisted 16.0 doesn't have addTimeout, so let's backport it. --- synapse/http/__init__.py | 2 +- synapse/http/client.py | 6 ++- synapse/http/matrixfederationclient.py | 7 ++-- synapse/notifier.py | 22 +++++++---- synapse/util/async.py | 67 ++++++++++++++++++++++++++++++++++ 5 files changed, 90 insertions(+), 14 deletions(-) (limited to 'synapse/util') diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 20e568bc43..0d47ccdb59 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -28,7 +28,7 @@ class RequestTimedOutError(SynapseError): def cancelled_to_request_timed_out_error(value): """Turns CancelledErrors into RequestTimedOutErrors. - For use with deferred.addTimeout() + For use with async.add_timeout_to_deferred """ if isinstance(value, failure.Failure): value.trap(CancelledError) diff --git a/synapse/http/client.py b/synapse/http/client.py index 35c8d51e71..62309c3365 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -20,6 +20,7 @@ from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) from synapse.http import cancelled_to_request_timed_out_error +from synapse.util.async import add_timeout_to_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable import synapse.metrics @@ -102,8 +103,9 @@ class SimpleHttpClient(object): request_deferred = self.agent.request( method, uri, *args, **kwargs ) - request_deferred.addTimeout( - 60, reactor, cancelled_to_request_timed_out_error, + add_timeout_to_deferred( + request_deferred, + 60, cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable(request_deferred) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index fe4b1636a6..30036fe81c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint import synapse.metrics -from synapse.util.async import sleep +from synapse.util.async import sleep, add_timeout_to_deferred from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable import synapse.util.retryutils @@ -193,9 +193,10 @@ class MatrixFederationHttpClient(object): Headers(headers_dict), producer ) - request_deferred.addTimeout( + add_timeout_to_deferred( + request_deferred, timeout / 1000. if timeout else 60, - reactor, cancelled_to_request_timed_out_error, + cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable( request_deferred, diff --git a/synapse/notifier.py b/synapse/notifier.py index 1e4f78b993..a1c06e8fca 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -13,15 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from twisted.internet import defer, reactor
-from twisted.internet.defer import TimeoutError
+from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 
 from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
+from synapse.util.async import (
+    ObservableDeferred, add_timeout_to_deferred,
+    DeferredTimeoutError,
+)
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import StreamToken
@@ -332,8 +334,9 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
-                    listener.deferred.addTimeout(
-                        (end_time - now) / 1000., reactor,
+                    add_timeout_to_deferred(
+                        listener.deferred,
+                        (end_time - now) / 1000.,
                     )
                     with PreserveLoggingContext():
                         yield listener.deferred
@@ -347,7 +350,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except TimeoutError:
+                except DeferredTimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -552,11 +555,14 @@ class Notifier(object):
             if end_time <= now:
                 break
 
-            listener.deferred.addTimeout((end_time - now) / 1000., reactor)
+            add_timeout_to_deferred(
+                listener.deferred,
+                (end_time - now) / 1000.,
+            )
             try:
                 with PreserveLoggingContext():
                     yield listener.deferred
-            except TimeoutError:
+            except DeferredTimeoutError:
                 break
             except defer.CancelledError:
                 break
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 0729bb2863..1df5c5600c 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -15,6 +15,8 @@
 
 from twisted.internet import defer, reactor
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
 
 from .logcontext import (
     PreserveLoggingContext, make_deferred_yieldable, preserve_fn
@@ -392,3 +394,68 @@ class ReadWriteLock(object):
                     self.key_to_current_writer.pop(key)
 
         defer.returnValue(_ctx_manager())
+
+
+class DeferredTimeoutError(Exception):
+    """
+    This error is raised by default when a L{Deferred} times out.
+    """
+
+
+def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+    """
+    Add a timeout to a deferred by scheduling it to be cancelled after
+    timeout seconds.
+
+    This is essentially a backport of deferred.addTimeout, which was introduced
+    in Twisted 16.5.
+
+    If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
+    unless a canceller was passed when the Deferred was created, or unless
+    a different on_timeout_cancel callable is provided.
+
+    Args:
+        deferred (defer.Deferred): deferred to be timed out
+        timeout (Number): seconds to time out after
+
+        on_timeout_cancel (callable): A callable which is called immediately
+            after the deferred times out, and not if this deferred is
+            otherwise cancelled before the timeout.
+
+            It takes an arbitrary value, which is the value of the deferred at
+            that exact point in time (probably a CancelledError Failure), and
+            the timeout.
+
+            The default callable (if none is provided) will translate a
+            CancelledError Failure into a DeferredTimeoutError.
+ """ + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + deferred.cancel() + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise DeferredTimeoutError(timeout, "Deferred") + return value -- cgit 1.5.1 From 2a13af23bc0561ab48e0a90528231c40ee209724 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:29:27 +0100 Subject: Use run_in_background in preference to preserve_fn While I was going through uses of preserve_fn for other PRs, I converted places which only use the wrapped function once to use run_in_background, to avoid creating the function object. --- synapse/app/federation_sender.py | 4 ++-- synapse/app/pusher.py | 4 ++-- synapse/app/synchrotron.py | 5 ++--- synapse/appservice/scheduler.py | 12 ++++++------ synapse/crypto/keyring.py | 28 ++++++++++++++++----------- synapse/federation/federation_client.py | 5 +++-- synapse/groups/attestations.py | 4 ++-- synapse/handlers/appservice.py | 5 ++++- synapse/handlers/e2e_keys.py | 6 +++--- synapse/handlers/federation.py | 16 +++++++++------ synapse/handlers/initial_sync.py | 12 +++++++----- synapse/handlers/message.py | 5 +++-- synapse/handlers/typing.py | 7 ++++--- synapse/push/pusherpool.py | 20 +++++++++++-------- synapse/rest/media/v1/preview_url_resource.py | 5 +++-- synapse/storage/events_worker.py | 5 +++-- synapse/storage/stream.py | 5 +++-- synapse/util/async.py | 4 ++-- synapse/util/file_consumer.py | 6 ++++-- synapse/util/logcontext.py | 2 +- synapse/util/ratelimitutils.py | 4 ++-- synapse/util/retryutils.py | 4 ++-- 22 files changed, 97 insertions(+), 71 deletions(-) (limited to 'synapse/util') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..c6daa0d43f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -229,7 +229,7 @@ class FederationSenderHandler(object): # presence, typing, etc. 
if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - preserve_fn(self.update_token)(token) + run_in_background(self.update_token, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..8bd5c0c2b7 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -33,7 +33,7 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -140,7 +140,7 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - preserve_fn(self.poke_pushers)(stream_name, token, rows) + run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..0c4ccc58bc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string @@ -327,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - - preserve_fn(self.process_and_notify)(stream_name, token, rows) + run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..ba1631b5c8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -51,7 +51,7 @@ components. 
from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure import logging @@ -106,7 +106,7 @@ class _ServiceQueuer(object): def enqueue(self, service, event): # if this service isn't being sent something self.queued_events.setdefault(service.id, []).append(event) - preserve_fn(self._send_request)(service) + run_in_background(self._send_request, service) @defer.inlineCallbacks def _send_request(self, service): @@ -152,10 +152,10 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - preserve_fn(self._start_recoverer)(service) - except Exception as e: - logger.exception(e) - preserve_fn(self._start_recoverer)(service) + run_in_background(self._start_recoverer, service) + except Exception: + logger.exception("Error creating appservice transaction") + run_in_background(self._start_recoverer, service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..38944a7326 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes from synapse.util import unwrapFirstError, logcontext from synapse.util.logcontext import ( PreserveLoggingContext, - preserve_fn + preserve_fn, + run_in_background, ) from synapse.util.metrics import Measure @@ -127,7 +128,7 @@ class Keyring(object): verify_requests.append(verify_request) - preserve_fn(self._start_key_lookups)(verify_requests) + run_in_background(self._start_key_lookups, verify_requests) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified @@ -313,7 +314,7 @@ class Keyring(object): if not verify_request.deferred.called: verify_request.deferred.errback(err) - preserve_fn(do_iterations)().addErrback(on_err) + run_in_background(do_iterations).addErrback(on_err) @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): @@ -329,8 +330,9 @@ class Keyring(object): """ res = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.get_server_verify_keys)( - server_name, key_ids + run_in_background( + self.store.get_server_verify_keys, + server_name, key_ids, ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], @@ -358,7 +360,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(p_name, p_keys) + run_in_background(get_key, p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, @@ -398,7 +400,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(server_name, key_ids) + run_in_background(get_key, server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, @@ -481,7 +483,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -539,7 +542,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=key_server_name, 
from_server=server_name, verify_keys=verify_keys, @@ -615,7 +619,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_keys_json)( + run_in_background( + self.store.store_server_keys_json, server_name=server_name, key_id=key_id, from_server=server_name, @@ -716,7 +721,8 @@ class Keyring(object): # TODO(markjh): Store whether the keys have expired. return logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_verify_key)( + run_in_background( + self.store.store_server_verify_key, server_name, server_name, key.time_added, key ) for key_id, key in verify_keys.items() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8e2c0c4cd2..8adc60863e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -33,7 +33,7 @@ from synapse.federation.federation_base import ( import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination @@ -417,7 +417,8 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - preserve_fn(self.get_pdu)( + run_in_background( + self.get_pdu, destinations=random_server_list(), event_id=e_id, ) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..5f53f17954 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -42,7 +42,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from signedjson.sign import sign_json @@ -192,4 +192,4 @@ class GroupAttestionRenewer(object): group_id = row["group_id"] user_id = row["user_id"] - preserve_fn(_renew_attestation)(group_id, user_id) + run_in_background(_renew_attestation, group_id, user_id) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0245197c02..6cc2388306 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -198,7 +198,10 @@ class ApplicationServicesHandler(object): services = yield self._get_services_for_3pn(protocol) results = yield make_deferred_yieldable(defer.DeferredList([ - preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) + run_in_background( + self.appservice_api.query_3pe, + service, kind, protocol, fields, + ) for service in services ], consumeErrors=True)) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 325c0c4a9f..fc958404a1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -24,7 +24,7 @@ from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, ) from synapse.types import get_domain_from_id, UserID -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -139,7 +139,7 @@ class E2eKeysHandler(object): failures[destination] 
= _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(do_remote_query)(destination) + run_in_background(do_remote_query, destination) for destination in remote_queries_not_in_cache ])) @@ -242,7 +242,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(claim_client_keys)(destination) + run_in_background(claim_client_keys, destination) for destination in remote_queries ])) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae7e0d6da2..c66ca0f381 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -637,7 +637,8 @@ class FederationHandler(BaseHandler): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self.replication_layer.get_pdu)( + logcontext.run_in_background( + self.replication_layer.get_pdu, [dest], event_id, outlier=True, @@ -1023,7 +1024,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) + logcontext.run_in_background(self._handle_queued_pdus, room_queue) defer.returnValue(True) @@ -1523,8 +1524,9 @@ class FederationHandler(BaseHandler): if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( - event_stream_id, max_stream_id + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1538,7 +1540,8 @@ class FederationHandler(BaseHandler): """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self._prep_event)( + logcontext.run_in_background( + self._prep_event, origin, ev_info["event"], state=ev_info.get("state"), @@ -1867,7 +1870,8 @@ class FederationHandler(BaseHandler): different_events = yield logcontext.make_deferred_yieldable( defer.gatherResults([ - logcontext.preserve_fn(self.store.get_event)( + logcontext.run_in_background( + self.store.get_event, d, allow_none=True, allow_rejected=False, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5267b4b84..cd33a86599 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler): (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background( + self.store.get_recent_events_for_room, event.room_id, limit=limit, end_token=room_end_token, @@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - preserve_fn(get_presence)(), - preserve_fn(get_receipts)(), - preserve_fn(self.store.get_recent_events_for_room)( 
+ run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, room_id, limit=limit, end_token=now_token.room_key, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..244b98dd8d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -850,7 +850,8 @@ class EventCreationHandler(object): # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + run_in_background( + self.pusher_pool.on_new_notifications, event_stream_id, max_stream_id ) @@ -862,7 +863,7 @@ class EventCreationHandler(object): extra_users=extra_users ) - preserve_fn(_notify)() + run_in_background(_notify) if event.type == EventTypes.Message: presence = self.hs.get_presence_handler() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..19cde70adf 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id @@ -97,7 +97,8 @@ class TypingHandler(object): if self.hs.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: - preserve_fn(self._push_remote)( + run_in_background( + self._push_remote, member=member, typing=True ) @@ -196,7 +197,7 @@ class TypingHandler(object): def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - preserve_fn(self._push_remote)(member, typing) + run_in_background(self._push_remote, member, typing) self._push_update_local( member=member, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 134e89b371..7bb5733090 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -14,13 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import logging + from twisted.internet import defer -from .pusher import PusherFactory -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.push.pusher import PusherFactory from synapse.util.async import run_on_reactor - -import logging +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -137,8 +137,9 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_notifications)( - min_stream_id, max_stream_id + run_in_background( + p.on_new_notifications, + min_stream_id, max_stream_id, ) ) @@ -164,7 +165,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) + run_in_background( + p.on_new_receipts, + min_stream_id, max_stream_id, + ) ) yield make_deferred_yieldable(defer.gatherResults(deferreds)) @@ -207,7 +211,7 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - preserve_fn(p.on_started)() + run_in_background(p.on_started) logger.info("Started pushers") diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 0fc21540c6..9290d7946f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -35,7 +35,7 @@ from ._base import FileInfo from synapse.api.errors import ( SynapseError, Codes, ) -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import random_string from synapse.util.caches.expiringcache import ExpiringCache from synapse.http.client import SpiderHttpClient @@ -144,7 +144,8 @@ class PreviewUrlResource(Resource): observable = self._cache.get(url) if not observable: - download = preserve_fn(self._do_preview)( + download = run_in_background( + self._do_preview, url, requester.user, ts, ) observable = ObservableDeferred( diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a937b9bceb..ba834854e1 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -20,7 +20,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore): res = yield make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._get_event_from_row)( + run_in_background( + self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..5b245a936c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,7 +41,7 @@ from synapse.storage.events import EventsWorkerStore from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from 
synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc @@ -198,7 +198,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(self.get_room_events_stream_for_room)( + run_in_background( + self.get_room_events_stream_for_room, room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..bd07067328 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -17,7 +17,7 @@ from twisted.internet import defer, reactor from .logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, preserve_fn + PreserveLoggingContext, make_deferred_yieldable, run_in_background ) from synapse.util import logcontext, unwrapFirstError @@ -161,7 +161,7 @@ def concurrently_execute(func, args, limit): pass return logcontext.make_deferred_yieldable(defer.gatherResults([ - preserve_fn(_concurrently_execute_inner)() + run_in_background(_concurrently_execute_inner) for _ in xrange(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3c8a165331..3380970e4e 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -15,7 +15,7 @@ from twisted.internet import threads, reactor -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from six.moves import queue @@ -70,7 +70,9 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming - self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) + self._finished_deferred = run_in_background( + threads.deferToThread, self._writer + ) if not streaming: self._producer.resumeProducing() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..c2edf87e58 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -341,7 +341,7 @@ def make_deferred_yieldable(deferred): returning a deferred. Then, when the deferred completes, restores the current logcontext before running callbacks/errbacks. - (This is more-or-less the opposite operation to preserve_fn.) + (This is more-or-less the opposite operation to run_in_background.) 
""" if isinstance(deferred, defer.Deferred) and not deferred.called: prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 1101881a2d..18424f6c36 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import sleep -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import collections import contextlib @@ -150,7 +150,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) + ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 47b0bb5eb3..4e93f69d3a 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -203,8 +203,8 @@ class RetryDestinationLimiter(object): ) except Exception: logger.exception( - "Failed to store set_destination_retry_timings", + "Failed to store destination_retry_timings", ) # we deliberately do this in the background. - synapse.util.logcontext.preserve_fn(store_retry_timings)() + synapse.util.logcontext.run_in_background(store_retry_timings) -- cgit 1.5.1 From 4f2f5171b752169e265b5d93c9b4e788171b5326 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 11:39:37 +0200 Subject: replace stringIO imports --- synapse/http/client.py | 4 ++-- synapse/util/logformatter.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/http/client.py b/synapse/http/client.py index 62309c3365..70a19d9b74 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -40,7 +40,7 @@ from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone -from StringIO import StringIO +from six import StringIO import simplejson as json import logging @@ -507,7 +507,7 @@ class SpiderHttpClient(SimpleHttpClient): reactor, SpiderEndpointFactory(hs) ) - ), [('gzip', GzipDecoder)] + ), [(b'gzip', GzipDecoder)] ) # We could look like Chrome: # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko) diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py index cdbc4bffd7..59ab3c6968 100644 --- a/synapse/util/logformatter.py +++ b/synapse/util/logformatter.py @@ -14,7 +14,7 @@ # limitations under the License. 
-import StringIO +from six import StringIO import logging import traceback -- cgit 1.5.1 From d82b6ea9e68150aece1fc46cb0821b31cf728910 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 13:57:00 +0200 Subject: Move more xrange to six plus a bonus next() Signed-off-by: Adrian Tschira --- synapse/federation/federation_client.py | 4 +++- synapse/handlers/room_list.py | 4 +++- synapse/storage/registration.py | 4 +++- synapse/storage/schema/delta/30/as_users.py | 4 +++- synapse/storage/stream.py | 4 +++- synapse/storage/tags.py | 4 +++- synapse/util/async.py | 6 ++++-- synapse/util/stringutils.py | 5 +++-- synapse/util/wheel_timer.py | 4 +++- 9 files changed, 28 insertions(+), 11 deletions(-) (limited to 'synapse/util') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8e2c0c4cd2..a2067fc390 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,6 +19,8 @@ import itertools import logging import random +from six.moves import range + from twisted.internet import defer from synapse.api.constants import Membership @@ -413,7 +415,7 @@ class FederationClient(FederationBase): batch_size = 20 missing_events = list(missing_events) - for i in xrange(0, len(missing_events), batch_size): + for i in range(0, len(missing_events), batch_size): batch = set(missing_events[i:i + batch_size]) deferreds = [ diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index add3f9b009..5757bb7f8a 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from six.moves import range + from ._base import BaseHandler from synapse.api.constants import ( @@ -200,7 +202,7 @@ class RoomListHandler(BaseHandler): step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 chunk = [] - for i in xrange(0, len(rooms_to_scan), step): + for i in range(0, len(rooms_to_scan), step): batch = rooms_to_scan[i:i + step] logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 6b557ca0cf..a50717db2d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -22,6 +22,8 @@ from synapse.storage import background_updates from synapse.storage._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from six.moves import range + class RegistrationWorkerStore(SQLBaseStore): @cached() @@ -469,7 +471,7 @@ class RegistrationStore(RegistrationWorkerStore, match = regex.search(user_id) if match: found.add(int(match.group(1))) - for i in xrange(len(found) + 1): + for i in range(len(found) + 1): if i not in found: return i diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index c53e53c94f..85bd1a2006 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -14,6 +14,8 @@ import logging from synapse.config.appservice import load_appservices +from six.moves import range + logger = logging.getLogger(__name__) @@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): for as_id, user_ids in owned.items(): n = 100 - user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n)) + user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n)) for chunk in user_chunks: cur.execute( database_engine.convert_param_style( diff 
--git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3b8b539993..52c90d8e04 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -47,6 +47,8 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -196,7 +198,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results = {} room_ids = list(room_ids) - for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): + for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 13bff9f055..6671d3cfca 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -22,6 +22,8 @@ from twisted.internet import defer import simplejson as json import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore): batch_size = 50 results = [] - for i in xrange(0, len(tag_ids), batch_size): + for i in range(0, len(tag_ids), batch_size): tags = yield self.runInteraction( "get_all_updated_tag_content", get_tag_content, diff --git a/synapse/util/async.py b/synapse/util/async.py index 1df5c5600c..cb53c31123 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -27,6 +27,8 @@ from contextlib import contextmanager import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -158,13 +160,13 @@ def concurrently_execute(func, args, limit): def _concurrently_execute_inner(): try: while True: - yield func(it.next()) + yield func(next(it)) except StopIteration: pass return logcontext.make_deferred_yieldable(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() - for _ in xrange(limit) + for _ in range(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index 95a6168e16..b98b9dc6e4 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -15,6 +15,7 @@ import random import string +from six.moves import range _string_with_symbols = ( string.digits + string.ascii_letters + ".,;:^&*-_+=#~@" @@ -22,12 +23,12 @@ _string_with_symbols = ( def random_string(length): - return ''.join(random.choice(string.ascii_letters) for _ in xrange(length)) + return ''.join(random.choice(string.ascii_letters) for _ in range(length)) def random_string_with_symbols(length): return ''.join( - random.choice(_string_with_symbols) for _ in xrange(length) + random.choice(_string_with_symbols) for _ in range(length) ) diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py index b70f9a6b0a..7a9e45aca9 100644 --- a/synapse/util/wheel_timer.py +++ b/synapse/util/wheel_timer.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six.moves import range + class _Entry(object): __slots__ = ["end_key", "queue"] @@ -68,7 +70,7 @@ class WheelTimer(object): # Add empty entries between the end of the current list and when we want # to insert. This ensures there are no gaps. 
self.entries.extend( - _Entry(key) for key in xrange(last_key, then_key + 1) + _Entry(key) for key in range(last_key, then_key + 1) ) self.entries[-1].queue.append(obj) -- cgit 1.5.1 From e9143b659352e87fd9e26c8e5a771c78011bc945 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 23:56:59 +0200 Subject: more bytes strings Signed-off-by: Adrian Tschira --- synapse/http/endpoint.py | 2 +- synapse/http/server.py | 2 +- synapse/rest/media/v1/upload_resource.py | 6 +++--- synapse/util/httpresourcetree.py | 7 +++++-- 4 files changed, 10 insertions(+), 7 deletions(-) (limited to 'synapse/util') diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 00572c2897..db455e5909 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -286,7 +286,7 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t if (len(answers) == 1 and answers[0].type == dns.SRV and answers[0].payload - and answers[0].payload.target == dns.Name('.')): + and answers[0].payload.target == dns.Name(b'.')): raise ConnectError("Service %s unavailable" % service_name) for answer in answers: diff --git a/synapse/http/server.py b/synapse/http/server.py index 8d632290de..55b9ad5251 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -546,6 +546,6 @@ def _request_user_agent_is_curl(request): b"User-Agent", default=[] ) for user_agent in user_agents: - if "curl" in user_agent: + if b"curl" in user_agent: return True return False diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index f6f498cdc5..a31e75cb46 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -81,15 +81,15 @@ class UploadResource(Resource): headers = request.requestHeaders if headers.hasHeader("Content-Type"): - media_type = headers.getRawHeaders("Content-Type")[0] + media_type = headers.getRawHeaders(b"Content-Type")[0] else: raise SynapseError( msg="Upload request missing 'Content-Type'", code=400, ) - # if headers.hasHeader("Content-Disposition"): - # disposition = headers.getRawHeaders("Content-Disposition")[0] + # if headers.hasHeader(b"Content-Disposition"): + # disposition = headers.getRawHeaders(b"Content-Disposition")[0] # TODO(markjh): parse content-dispostion content_uri = yield self.media_repo.create_content( diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py index d747849553..e9f0f292ee 100644 --- a/synapse/util/httpresourcetree.py +++ b/synapse/util/httpresourcetree.py @@ -40,9 +40,12 @@ def create_resource_tree(desired_tree, root_resource): # extra resources to existing nodes. See self._resource_id for the key. 
resource_mappings = {} for full_path, res in desired_tree.items(): + # twisted requires all resources to be bytes + full_path = full_path.encode("utf-8") + logger.info("Attaching %s to path %s", res, full_path) last_resource = root_resource - for path_seg in full_path.split('/')[1:-1]: + for path_seg in full_path.split(b'/')[1:-1]: if path_seg not in last_resource.listNames(): # resource doesn't exist, so make a "dummy resource" child_resource = NoResource() @@ -57,7 +60,7 @@ def create_resource_tree(desired_tree, root_resource): # =========================== # now attach the actual desired resource - last_path_seg = full_path.split('/')[-1] + last_path_seg = full_path.split(b'/')[-1] # if there is already a resource here, thieve its children and # replace it -- cgit 1.5.1 From e482f8cd8504b36dc1ce2c1e51e0dee479d33249 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 2 May 2018 09:12:26 +0100 Subject: Fix incorrect reference to StringIO This was introduced in 4f2f5171 --- synapse/util/logformatter.py | 2 +- tests/util/test_logformatter.py | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 tests/util/test_logformatter.py (limited to 'synapse/util') diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py index 59ab3c6968..3e42868ea9 100644 --- a/synapse/util/logformatter.py +++ b/synapse/util/logformatter.py @@ -32,7 +32,7 @@ class LogFormatter(logging.Formatter): super(LogFormatter, self).__init__(*args, **kwargs) def formatException(self, ei): - sio = StringIO.StringIO() + sio = StringIO() (typ, val, tb) = ei # log the stack above the exception capture point if possible, but diff --git a/tests/util/test_logformatter.py b/tests/util/test_logformatter.py new file mode 100644 index 0000000000..1a1a8412f2 --- /dev/null +++ b/tests/util/test_logformatter.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import sys + +from synapse.util.logformatter import LogFormatter +from tests import unittest + + +class TestException(Exception): + pass + + +class LogFormatterTestCase(unittest.TestCase): + def test_formatter(self): + formatter = LogFormatter() + + try: + raise TestException("testytest") + except TestException: + ei = sys.exc_info() + + output = formatter.formatException(ei) + + # check the output looks vaguely sane + self.assertIn("testytest", output) + self.assertIn("Capture point", output) -- cgit 1.5.1 From f22e7cda2c72d461acba664cb083e8c4e3c7572a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 2 May 2018 11:46:23 +0100 Subject: Fix a class of logcontext leaks So, it turns out that if you have a first `Deferred` `D1`, you can add a callback which returns another `Deferred` `D2`, and `D2` must then complete before any further callbacks on `D1` will execute (and later callbacks on `D1` get the *result* of `D2` rather than `D2` itself). 
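
For example, with a sketch like this (illustrative only, following the same
pattern as the tests added below):

    from twisted.internet import defer, reactor

    d1 = defer.succeed(None)

    def cb(res):
        d2 = defer.Deferred()
        reactor.callLater(0, d2.callback, res)
        return d2

    d1.addCallback(cb)
    # at this point d1 has begun running its callbacks, but is paused
    # waiting for d2
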
So, `D1` might have `called=True` (as in, it has started running its callbacks), but any new callbacks added to `D1` won't get run until `D2` completes - so if you `yield D1` in an `inlineCallbacks` function, your `yield` will 'block'. In conclusion: some of our assumptions in `logcontext` were invalid. We need to make sure that we don't optimise out the logcontext juggling when this situation happens. Fortunately, it is easy to detect by checking `D1.paused`. --- synapse/util/logcontext.py | 60 ++++++++++++++++++++++++-------------- tests/util/test_logcontext.py | 67 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 94 insertions(+), 33 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 01ac71e53e..e086e12213 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -302,7 +302,7 @@ def preserve_fn(f): def run_in_background(f, *args, **kwargs): """Calls a function, ensuring that the current context is restored after return from the function, and that the sentinel context is set once the - deferred returned by the funtion completes. + deferred returned by the function completes. Useful for wrapping functions that return a deferred which you don't yield on (for instance because you want to pass it to deferred.gatherResults()). @@ -320,24 +320,31 @@ def run_in_background(f, *args, **kwargs): # by synchronous exceptions, so let's turn them into Failures. return defer.fail() - if isinstance(res, defer.Deferred) and not res.called: - # The function will have reset the context before returning, so - # we need to restore it now. - LoggingContext.set_current_context(current) - - # The original context will be restored when the deferred - # completes, but there is nothing waiting for it, so it will - # get leaked into the reactor or some other function which - # wasn't expecting it. We therefore need to reset the context - # here. - # - # (If this feels asymmetric, consider it this way: we are - # effectively forking a new thread of execution. We are - # probably currently within a ``with LoggingContext()`` block, - # which is supposed to have a single entry and exit point. But - # by spawning off another deferred, we are effectively - # adding a new exit point.) - res.addBoth(_set_context_cb, LoggingContext.sentinel) + if not isinstance(res, defer.Deferred): + return res + + if res.called and not res.paused: + # The function should have maintained the logcontext, so we can + # optimise out the messing about + return res + + # The function may have reset the context before returning, so + # we need to restore it now. + ctx = LoggingContext.set_current_context(current) + + # The original context will be restored when the deferred + # completes, but there is nothing waiting for it, so it will + # get leaked into the reactor or some other function which + # wasn't expecting it. We therefore need to reset the context + # here. + # + # (If this feels asymmetric, consider it this way: we are + # effectively forking a new thread of execution. We are + # probably currently within a ``with LoggingContext()`` block, + # which is supposed to have a single entry and exit point. But + # by spawning off another deferred, we are effectively + # adding a new exit point.) + res.addBoth(_set_context_cb, ctx) return res @@ -354,9 +361,18 @@ def make_deferred_yieldable(deferred): (This is more-or-less the opposite operation to run_in_background.) 
""" - if isinstance(deferred, defer.Deferred) and not deferred.called: - prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) - deferred.addBoth(_set_context_cb, prev_context) + if not isinstance(deferred, defer.Deferred): + return deferred + + if deferred.called and not deferred.paused: + # it looks like this deferred is ready to run any callbacks we give it + # immediately. We may as well optimise out the logcontext faffery. + return deferred + + # ok, we can't be sure that a yield won't block, so let's reset the + # logcontext, and add a callback to the deferred to restore it. + prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) + deferred.addBoth(_set_context_cb, prev_context) return deferred diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 4850722bc5..7707fbb1f0 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -36,24 +36,28 @@ class LoggingContextTestCase(unittest.TestCase): yield sleep(0) self._check_test_key("one") - def _test_preserve_fn(self, function): + def _test_run_in_background(self, function): sentinel_context = LoggingContext.current_context() callback_completed = [False] - @defer.inlineCallbacks - def cb(): + def test(): context_one.request = "one" - yield function() - self._check_test_key("one") + d = function() - callback_completed[0] = True + def cb(res): + self._check_test_key("one") + callback_completed[0] = True + return res + d.addCallback(cb) + + return d with LoggingContext() as context_one: context_one.request = "one" # fire off function, but don't wait on it. - logcontext.preserve_fn(cb)() + logcontext.run_in_background(test) self._check_test_key("one") @@ -80,20 +84,31 @@ class LoggingContextTestCase(unittest.TestCase): # test is done once d2 finishes return d2 - def test_preserve_fn_with_blocking_fn(self): + def test_run_in_background_with_blocking_fn(self): @defer.inlineCallbacks def blocking_function(): yield sleep(0) - return self._test_preserve_fn(blocking_function) + return self._test_run_in_background(blocking_function) - def test_preserve_fn_with_non_blocking_fn(self): + def test_run_in_background_with_non_blocking_fn(self): @defer.inlineCallbacks def nonblocking_function(): with logcontext.PreserveLoggingContext(): yield defer.succeed(None) - return self._test_preserve_fn(nonblocking_function) + return self._test_run_in_background(nonblocking_function) + + @unittest.DEBUG + def test_run_in_background_with_chained_deferred(self): + # a function which returns a deferred which looks like it has been + # called, but is actually paused + def testfunc(): + return logcontext.make_deferred_yieldable( + _chained_deferred_function() + ) + + return self._test_run_in_background(testfunc) @defer.inlineCallbacks def test_make_deferred_yieldable(self): @@ -118,6 +133,22 @@ class LoggingContextTestCase(unittest.TestCase): # now it should be restored self._check_test_key("one") + @defer.inlineCallbacks + def test_make_deferred_yieldable_with_chained_deferreds(self): + sentinel_context = LoggingContext.current_context() + + with LoggingContext() as context_one: + context_one.request = "one" + + d1 = logcontext.make_deferred_yieldable(_chained_deferred_function()) + # make sure that the context was reset by make_deferred_yieldable + self.assertIs(LoggingContext.current_context(), sentinel_context) + + yield d1 + + # now it should be restored + self._check_test_key("one") + @defer.inlineCallbacks def test_make_deferred_yieldable_on_non_deferred(self): """Check 
@@ -132,3 +163,17 @@ class LoggingContextTestCase(unittest.TestCase):
         r = yield d1
         self.assertEqual(r, "bum")
         self._check_test_key("one")
+
+
+# a function which returns a deferred which has been "called", but
+# which had a callback which returned another incomplete deferred on
+# its callback list, so won't yet call any other new callbacks.
+def _chained_deferred_function():
+    d = defer.succeed(None)
+
+    def cb(res):
+        d2 = defer.Deferred()
+        reactor.callLater(0, d2.callback, res)
+        return d2
+    d.addCallback(cb)
+    return d
-- cgit 1.5.1


From a7fe62f0cb7bdb40e5c9b7f09f694e9c0913610c Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 3 May 2018 11:16:36 +0100
Subject: Fix logcontext leaks in rate limiter

---
 synapse/util/ratelimitutils.py | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 18424f6c36..0ab63c3d7d 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -18,7 +18,10 @@ from twisted.internet import defer
 
 from synapse.api.errors import LimitExceededError
 
 from synapse.util.async import sleep
-from synapse.util.logcontext import run_in_background
+from synapse.util.logcontext import (
+    run_in_background, make_deferred_yieldable,
+    PreserveLoggingContext,
+)
 
 import collections
 import contextlib
@@ -176,6 +179,9 @@ class _PerHostRatelimiter(object):
             return r
 
         def on_err(r):
+            # XXX: why is this necessary? this is called before we start
+            # processing the request so why would the request be in
+            # current_processing?
             self.current_processing.discard(request_id)
             return r
 
@@ -187,7 +193,7 @@ class _PerHostRatelimiter(object):
         ret_defer.addCallbacks(on_start, on_err)
 
         ret_defer.addBoth(on_both)
-        return ret_defer
+        return make_deferred_yieldable(ret_defer)
 
     def _on_exit(self, request_id):
         logger.debug(
@@ -197,7 +203,12 @@ class _PerHostRatelimiter(object):
         self.current_processing.discard(request_id)
        try:
             request_id, deferred = self.ready_request_queue.popitem()
+
+            # XXX: why do we do the following? the on_start callback above will
+            # do it for us.
self.current_processing.add(request_id) - deferred.callback(None) + + with PreserveLoggingContext(): + deferred.callback(None) except KeyError: pass -- cgit 1.5.1 From 73cbdef5f773ddd091713d71ac9c63639d7c0825 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Tue, 15 May 2018 17:55:46 +0200 Subject: fix py3 intern and remove unnecessary py3 encode Signed-off-by: Adrian Tschira --- synapse/util/caches/__init__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 4adae96681..329ccbb866 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -16,6 +16,9 @@ import synapse.metrics import os +from six.moves import intern +import six + CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) metrics = synapse.metrics.get_metrics_for("synapse.util.caches") @@ -66,7 +69,9 @@ def intern_string(string): return None try: - string = string.encode("ascii") + if six.PY2: + string = string.encode("ascii") + return intern(string) except UnicodeEncodeError: return string -- cgit 1.5.1 From 45b55e23d34371986ee0da5be784a1f2134fd58a Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sun, 29 Apr 2018 13:54:38 +0200 Subject: Add batch_iter to utils There's a frequent idiom I noticed where an iterable is split up into a number of chunks/batches. Unfortunately that method does not work with iterators like dict.keys() in python3. This implementation works with iterators. Signed-off-by: Adrian Tschira --- synapse/util/__init__.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 814a7bf71b..fc11e26623 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -20,6 +20,8 @@ from twisted.internet import defer, reactor, task import time import logging +from itertools import islice + logger = logging.getLogger(__name__) @@ -79,3 +81,19 @@ class Clock(object): except Exception: if not ignore_errs: raise + + +def batch_iter(iterable, size): + """batch an iterable up into tuples with a maximum size + + Args: + iterable (iterable): the iterable to slice + size (int): the maximum batch size + + Returns: + an iterator over the chunks + """ + # make sure we can deal with iterables like lists too + sourceiter = iter(iterable) + # call islice until it returns an empty tuple + return iter(lambda: tuple(islice(sourceiter, size)), ()) -- cgit 1.5.1 From df9f72d9e5fe264b86005208e0f096156eb03e4b Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 21 May 2018 19:47:37 -0500 Subject: replacing portions --- synapse/api/auth.py | 2 +- synapse/federation/federation_client.py | 17 ++- synapse/federation/federation_server.py | 16 ++- synapse/federation/send_queue.py | 8 +- synapse/federation/transaction_queue.py | 47 +++----- synapse/handlers/appservice.py | 19 +-- synapse/handlers/presence.py | 58 ++++----- synapse/http/client.py | 20 +--- synapse/http/matrixfederationclient.py | 14 +-- synapse/metrics/__init__.py | 182 ++++++++++------------------- synapse/notifier.py | 23 ++-- synapse/push/httppusher.py | 13 +-- synapse/push/push_rule_evaluator.py | 2 +- synapse/python_dependencies.py | 1 + synapse/replication/tcp/protocol.py | 88 ++++++-------- synapse/util/caches/__init__.py | 57 ++++++--- synapse/util/caches/descriptors.py | 2 +- synapse/util/caches/dictionary_cache.py | 2 +- synapse/util/caches/expiringcache.py | 4 +- 
synapse/util/caches/response_cache.py | 11 +- synapse/util/caches/stream_change_cache.py | 2 +- synapse/util/metrics.py | 91 ++++----------- tests/__init__.py | 3 + 23 files changed, 268 insertions(+), 414 deletions(-) (limited to 'synapse/util') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index f17fda6315..b052cf532b 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -57,7 +57,7 @@ class Auth(object): self.TOKEN_NOT_FOUND_HTTP_STATUS = 401 self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000) - register_cache("token_cache", self.token_cache) + register_cache("cache", "token_cache", self.token_cache) @defer.inlineCallbacks def check_from_context(self, event, context, do_sig_check=True): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6163f7c466..2761ffae07 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,20 +32,17 @@ from synapse.federation.federation_base import ( FederationBase, event_from_pdu_json, ) -import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination -logger = logging.getLogger(__name__) - +from prometheus_client import Counter -# synapse.federation.federation_client is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.client") +logger = logging.getLogger(__name__) -sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) +sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"]) PDU_RETRY_TIME_MS = 1 * 60 * 1000 @@ -108,7 +105,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc(query_type) + sent_queries_counter.labels(query_type).inc() return self.transport_layer.make_query( destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail, @@ -127,7 +124,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_device_keys") + sent_queries_counter.labels("client_device_keys").inc() return self.transport_layer.query_client_keys( destination, content, timeout ) @@ -137,7 +134,7 @@ class FederationClient(FederationBase): """Query the device keys for a list of user ids hosted on a remote server. 
""" - sent_queries_counter.inc("user_devices") + sent_queries_counter.labels("user_devices").inc() return self.transport_layer.query_user_devices( destination, user_id, timeout ) @@ -154,7 +151,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_one_time_keys") + sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys( destination, content, timeout ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 247ddc89d5..8211273006 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -27,12 +27,13 @@ from synapse.federation.federation_base import ( from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction -import synapse.metrics from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache from synapse.util.logutils import log_function +from prometheus_client import Counter + from six import iteritems # when processing incoming transactions, we try to handle multiple rooms in @@ -41,14 +42,11 @@ TRANSACTION_CONCURRENCY_LIMIT = 10 logger = logging.getLogger(__name__) -# synapse.federation.federation_server is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.server") - -received_pdus_counter = metrics.register_counter("received_pdus") +received_pdus_counter = Counter("synapse_federation_server_received_pdus", "") -received_edus_counter = metrics.register_counter("received_edus") +received_edus_counter = Counter("synapse_federation_server_received_edus", "") -received_queries_counter = metrics.register_counter("received_queries", labels=["type"]) +received_queries_counter = Counter("synapse_federation_server_received_queries", "", ["type"]) class FederationServer(FederationBase): @@ -131,7 +129,7 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - received_pdus_counter.inc_by(len(transaction.pdus)) + received_pdus_counter.inc(len(transaction.pdus)) pdus_by_room = {} @@ -292,7 +290,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_query_request(self, query_type, args): - received_queries_counter.inc(query_type) + received_queries_counter.labels(query_type).inc() resp = yield self.registry.on_query(query_type, args) defer.returnValue((200, resp)) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 0f0c687b37..e6e1888f3a 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -33,7 +33,7 @@ from .units import Edu from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure -import synapse.metrics +from synapse.metrics import LaterGauge from blist import sorteddict from collections import namedtuple @@ -45,9 +45,6 @@ from six import itervalues, iteritems logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -77,8 +74,7 @@ class FederationRemoteSendQueue(object): # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. 
def register(name, queue): - metrics.register_callback( - queue_name + "_size", + LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), "", lambda: len(queue), ) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ded2b1871a..778924a13c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,23 +26,18 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics +from synapse.metrics import LaterGauge +from synapse.metrics import ( + sent_edus_counter, sent_transactions_counter, events_processed_counter) + +from prometheus_client import Counter import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client") -sent_pdus_destination_dist = client_metrics.register_distribution( - "sent_pdu_destinations" -) -sent_edus_counter = client_metrics.register_counter("sent_edus") - -sent_transactions_counter = client_metrics.register_counter("sent_transactions") - -events_processed_counter = client_metrics.register_counter("events_processed") +sent_pdus_destination_dist = Counter("synapse_federation_client_sent_pdu_destinations", "") class TransactionQueue(object): @@ -69,8 +64,7 @@ class TransactionQueue(object): # done self.pending_transactions = {} - metrics.register_callback( - "pending_destinations", + LaterGauge("pending_destinations", "", [], lambda: len(self.pending_transactions), ) @@ -94,12 +88,12 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} - metrics.register_callback( - "pending_pdus", + LaterGauge( + "pending_pdus", "", [], lambda: sum(map(len, pdus.values())), ) - metrics.register_callback( - "pending_edus", + LaterGauge( + "pending_edus", "", [], lambda: ( sum(map(len, edus.values())) + sum(map(len, presence.values())) @@ -241,18 +235,15 @@ class TransactionQueue(object): now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_lag.set( - now - ts, "federation_sender", - ) - synapse.metrics.event_processing_last_ts.set( - ts, "federation_sender", - ) + synapse.metrics.event_processing_lag.labels( + "federation_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "federation_sender").set(ts) - events_processed_counter.inc_by(len(events)) + events_processed_counter.inc(len(events)) - synapse.metrics.event_processing_positions.set( - next_token, "federation_sender", - ) + synapse.metrics.event_processing_positions.labels( + "federation_sender").set(next_token) finally: self._is_processing = False @@ -275,7 +266,7 @@ class TransactionQueue(object): if not destinations: return - sent_pdus_destination_dist.inc_by(len(destinations)) + sent_pdus_destination_dist.inc(len(destinations)) for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index b596f098fd..a7345331af 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -21,14 +21,13 @@ from synapse.util.metrics import Measure from synapse.util.logcontext import ( make_deferred_yieldable, run_in_background, ) +from 
prometheus_client import Counter import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -events_processed_counter = metrics.register_counter("events_processed") +events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "") def log_failure(failure): @@ -128,18 +127,12 @@ class ApplicationServicesHandler(object): now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_positions.set( - upper_bound, "appservice_sender", - ) + synapse.metrics.event_processing_positions.labels("appservice_sender").set(upper_bound) - events_processed_counter.inc_by(len(events)) + events_processed_counter.inc(len(events)) - synapse.metrics.event_processing_lag.set( - now - ts, "appservice_sender", - ) - synapse.metrics.event_processing_last_ts.set( - ts, "appservice_sender", - ) + synapse.metrics.event_processing_lag.labels("appservice_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels("appservice_sender").set(ts) finally: self.is_processing = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 585f3e4da2..06d937ef3a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -36,27 +36,27 @@ from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id -import synapse.metrics +from synapse.metrics import LaterGauge import logging +from prometheus_client import Counter + logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) -notified_presence_counter = metrics.register_counter("notified_presence") -federation_presence_out_counter = metrics.register_counter("federation_presence_out") -presence_updates_counter = metrics.register_counter("presence_updates") -timers_fired_counter = metrics.register_counter("timers_fired") -federation_presence_counter = metrics.register_counter("federation_presence") -bump_active_time_counter = metrics.register_counter("bump_active_time") +notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "") +federation_presence_out_counter = Counter("synapse_handler_presence_federation_presence_out", "") +presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "") +timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "") +federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "") +bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "") -get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) +get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"]) -notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"]) -state_transition_counter = metrics.register_counter( - "state_transition", labels=["from", "to"] +notify_reason_counter = Counter("synapse_handler_presence_notify_reason", "", ["reason"]) +state_transition_counter = Counter("synapse_handler_presence_state_transition", "", ["from", "to"] ) @@ -137,9 +137,9 @@ class PresenceHandler(object): for state in active_presence } - metrics.register_callback( - "user_to_current_state_size", lambda: len(self.user_to_current_state) - ) + LaterGauge( + "user_to_current_state_size", "", [], lambda: len(self.user_to_current_state) + ).register() now = self.clock.time_msec() for 
state in active_presence: @@ -208,7 +208,7 @@ class PresenceHandler(object): 60 * 1000, ) - metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer)) + LaterGauge("wheel_timer_size", "", [], lambda: len(self.wheel_timer)).register() @defer.inlineCallbacks def _on_shutdown(self): @@ -311,10 +311,10 @@ class PresenceHandler(object): # TODO: We should probably ensure there are no races hereafter - presence_updates_counter.inc_by(len(new_states)) + presence_updates_counter.inc(len(new_states)) if to_notify: - notified_presence_counter.inc_by(len(to_notify)) + notified_presence_counter.inc(len(to_notify)) yield self._persist_and_notify(to_notify.values()) self.unpersisted_users_changes |= set(s.user_id for s in new_states) @@ -325,7 +325,7 @@ class PresenceHandler(object): if user_id not in to_notify } if to_federation_ping: - federation_presence_out_counter.inc_by(len(to_federation_ping)) + federation_presence_out_counter.inc(len(to_federation_ping)) self._push_to_remotes(to_federation_ping.values()) @@ -363,7 +363,7 @@ class PresenceHandler(object): for user_id in users_to_check ] - timers_fired_counter.inc_by(len(states)) + timers_fired_counter.inc(len(states)) changes = handle_timeouts( states, @@ -707,7 +707,7 @@ class PresenceHandler(object): updates.append(prev_state.copy_and_replace(**new_fields)) if updates: - federation_presence_counter.inc_by(len(updates)) + federation_presence_counter.inc(len(updates)) yield self._update_states(updates) @defer.inlineCallbacks @@ -982,28 +982,28 @@ def should_notify(old_state, new_state): return False if old_state.status_msg != new_state.status_msg: - notify_reason_counter.inc("status_msg_change") + notify_reason_counter.labels("status_msg_change").inc() return True if old_state.state != new_state.state: - notify_reason_counter.inc("state_change") - state_transition_counter.inc(old_state.state, new_state.state) + notify_reason_counter.labels("state_change").inc() + state_transition_counter.labels(old_state.state, new_state.state).inc() return True if old_state.state == PresenceState.ONLINE: if new_state.currently_active != old_state.currently_active: - notify_reason_counter.inc("current_active_change") + notify_reason_counter.labels("current_active_change").inc() return True if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Only notify about last active bumps if we're not currently acive if not new_state.currently_active: - notify_reason_counter.inc("last_active_change_online") + notify_reason_counter.labels("last_active_change_online").inc() return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. - notify_reason_counter.inc("last_active_change_not_online") + notify_reason_counter.labels("last_active_change_not_online").inc() return True return False @@ -1077,14 +1077,14 @@ class PresenceEventSource(object): if changed is not None and len(changed) < 500: # For small deltas, its quicker to get all changes and then # work out if we share a room or they're in our presence list - get_updates_counter.inc("stream") + get_updates_counter.labels("stream").inc() for other_user_id in changed: if other_user_id in users_interested_in: user_ids_changed.add(other_user_id) else: # Too many possible updates. Find all users we can see and check # if any of them have changed. 
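For multi-label counters such as state_transition above, labels() takes the label values positionally in declaration order; the keyword form also works, though for this particular metric it needs dict expansion because "from" is a Python keyword. A sketch:

from prometheus_client import Counter

state_transition = Counter("example_state_transition", "", ["from", "to"])

state_transition.labels("online", "offline").inc()
state_transition.labels(**{"from": "offline", "to": "online"}).inc()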
- get_updates_counter.inc("full") + get_updates_counter.labels("full").inc() if from_key: user_ids_changed = stream_change_cache.get_entities_changed( diff --git a/synapse/http/client.py b/synapse/http/client.py index 70a19d9b74..61a1d2e2b3 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -23,7 +23,6 @@ from synapse.http import cancelled_to_request_timed_out_error from synapse.util.async import add_timeout_to_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable -import synapse.metrics from synapse.http.endpoint import SpiderEndpoint from canonicaljson import encode_canonical_json @@ -42,6 +41,7 @@ from twisted.web._newclient import ResponseDone from six import StringIO +from prometheus_client import Counter import simplejson as json import logging import urllib @@ -49,16 +49,8 @@ import urllib logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -outgoing_requests_counter = metrics.register_counter( - "requests", - labels=["method"], -) -incoming_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"]) +incoming_responses_counter = Counter("synapse_http_client_responses", "", ["method", "code"]) class SimpleHttpClient(object): @@ -95,7 +87,7 @@ class SimpleHttpClient(object): def request(self, method, uri, *args, **kwargs): # A small wrapper around self.agent.request() so we can easily attach # counters to it - outgoing_requests_counter.inc(method) + outgoing_requests_counter.labels(method).inc() logger.info("Sending request %s %s", method, uri) @@ -109,14 +101,14 @@ class SimpleHttpClient(object): ) response = yield make_deferred_yieldable(request_deferred) - incoming_responses_counter.inc(method, response.code) + incoming_responses_counter.labels(method, response.code).inc() logger.info( "Received response to %s %s: %s", method, uri, response.code ) defer.returnValue(response) except Exception as e: - incoming_responses_counter.inc(method, "ERR") + incoming_responses_counter.labels(method, "ERR").inc() logger.info( "Error sending request to %s %s: %s %s", method, uri, type(e).__name__, e.message diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 4b2b85464d..259d3884e2 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -43,19 +43,13 @@ import sys import urllib from six.moves.urllib import parse as urlparse +from prometheus_client import Counter + logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") -metrics = synapse.metrics.get_metrics_for(__name__) - -outgoing_requests_counter = metrics.register_counter( - "requests", - labels=["method"], -) -incoming_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", "", ["method"]) +incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", "", ["method", "code"]) MAX_LONG_RETRIES = 10 diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index e3b831db67..973ba6506f 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,14 +18,13 @@ import functools import time import gc import platform +import attr -from twisted.internet import reactor +from prometheus_client import Gauge, 
Histogram, Counter +from prometheus_client.core import ( + GaugeMetricFamily, CounterMetricFamily, REGISTRY) -from .metric import ( - CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - MemoryUsageMetric, GaugeMetric, -) -from .process_collector import register_process_collector +from twisted.internet import reactor logger = logging.getLogger(__name__) @@ -34,149 +33,94 @@ logger = logging.getLogger(__name__) running_on_pypy = platform.python_implementation() == 'PyPy' all_metrics = [] all_collectors = [] +all_gauges = {} +@attr.s(hash=True) +class LaterGauge(object): -class Metrics(object): - """ A single Metrics object gives a (mutable) slice view of the all_metrics - dict, allowing callers to easily register new metrics that are namespaced - nicely.""" - - def __init__(self, name): - self.name_prefix = name - - def make_subspace(self, name): - return Metrics("%s_%s" % (self.name_prefix, name)) - - def register_collector(self, func): - all_collectors.append(func) - - def _register(self, metric_class, name, *args, **kwargs): - full_name = "%s_%s" % (self.name_prefix, name) - - metric = metric_class(full_name, *args, **kwargs) - - all_metrics.append(metric) - return metric - - def register_counter(self, *args, **kwargs): - """ - Returns: - CounterMetric - """ - return self._register(CounterMetric, *args, **kwargs) - - def register_gauge(self, *args, **kwargs): - """ - Returns: - GaugeMetric - """ - return self._register(GaugeMetric, *args, **kwargs) + name = attr.ib() + desc = attr.ib() + labels = attr.ib(hash=False) + caller = attr.ib() - def register_callback(self, *args, **kwargs): - """ - Returns: - CallbackMetric - """ - return self._register(CallbackMetric, *args, **kwargs) + def collect(self): - def register_distribution(self, *args, **kwargs): - """ - Returns: - DistributionMetric - """ - return self._register(DistributionMetric, *args, **kwargs) - - def register_cache(self, *args, **kwargs): - """ - Returns: - CacheMetric - """ - return self._register(CacheMetric, *args, **kwargs) + g = GaugeMetricFamily(self.name, self.desc, self.labels) + try: + calls = self.caller() + except Exception as e: + print(e) + logger.err() + yield g -def register_memory_metrics(hs): - try: - import psutil - process = psutil.Process() - process.memory_info().rss - except (ImportError, AttributeError): - logger.warn( - "psutil is not installed or incorrect version." - " Disabling memory metrics." - ) - return - metric = MemoryUsageMetric(hs, psutil) - all_metrics.append(metric) + if isinstance(calls, dict): + for k, v in calls.items(): + g.add_metric(k, v) + else: + g.add_metric([], calls) + yield g -def get_metrics_for(pkg_name): - """ Returns a Metrics instance for conveniently creating metrics - namespaced with the given name prefix. """ + def register(self): + if self.name in all_gauges.keys(): + REGISTRY.unregister(all_gauges.pop(self.name)) - # Convert a "package.name" to "package_name" because Prometheus doesn't - # let us use . 
in metric names - return Metrics(pkg_name.replace(".", "_")) + REGISTRY.register(self) + all_gauges[self.name] = self -def render_all(): - strs = [] +# +# Python GC metrics +# - for collector in all_collectors: - collector() +gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) +gc_time = Histogram("python_gc_time", "Time taken to GC (ms)", ["gen"], buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000]) - for metric in all_metrics: - try: - strs += metric.render() - except Exception: - strs += ["# FAILED to render"] - logger.exception("Failed to render metric") +class GCCounts(object): + def collect(self): + gc_counts = gc.get_count() - strs.append("") # to generate a final CRLF + cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"]) + for n, m in enumerate(gc.get_count()): + cm.add_metric([str(n)], m) - return "\n".join(strs) + yield cm +REGISTRY.register(GCCounts()) -register_process_collector(get_metrics_for("process")) +# +# Twisted reactor metrics +# +tick_time = Histogram("python_twisted_reactor_tick_time", "Tick time of the Twisted reactor (ms)", buckets=[1, 2, 5, 10, 50, 100, 250, 500, 1000, 2000]) +pending_calls_metric = Histogram("python_twisted_reactor_pending_calls", "Pending calls", buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000]) -python_metrics = get_metrics_for("python") +# +# Federation Metrics +# -gc_time = python_metrics.register_distribution("gc_time", labels=["gen"]) -gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"]) -python_metrics.register_callback( - "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] -) +sent_edus_counter = Counter("synapse_federation_client_sent_edus", "") -reactor_metrics = get_metrics_for("python.twisted.reactor") -tick_time = reactor_metrics.register_distribution("tick_time") -pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "") -synapse_metrics = get_metrics_for("synapse") +events_processed_counter = Counter("synapse_federation_client_events_processed", "") # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. -event_processing_positions = synapse_metrics.register_gauge( - "event_processing_positions", labels=["name"], -) +event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) # Used to track the current max events stream position -event_persisted_position = synapse_metrics.register_gauge( - "event_persisted_position", -) +event_persisted_position = Gauge("synapse_event_persisted_position", "") # Used to track the received_ts of the last event processed by various # components -event_processing_last_ts = synapse_metrics.register_gauge( - "event_processing_last_ts", labels=["name"], -) +event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"]) # Used to track the lag processing events. This is the time difference # between the last processed event's received_ts and the time it was # finished being processed. 
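GCCounts above shows the general custom-collector pattern: any object with a collect() generator can be passed to REGISTRY.register, and its values are computed at scrape time rather than stored. A self-contained sketch of the same shape, with a hypothetical metric:

from prometheus_client.core import GaugeMetricFamily, REGISTRY


class QueueDepthCollector(object):
    def __init__(self, queues):
        self.queues = queues  # dict of queue name -> list

    def collect(self):
        g = GaugeMetricFamily(
            "example_queue_depth", "Queue depth at scrape time", labels=["name"],
        )
        for name, queue in self.queues.items():
            g.add_metric([name], len(queue))
        yield g


REGISTRY.register(QueueDepthCollector({"inbound": [], "outbound": []}))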
-event_processing_lag = synapse_metrics.register_gauge( - "event_processing_lag", labels=["name"], -) - +event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) def runUntilCurrentTimer(func): @@ -206,8 +150,8 @@ def runUntilCurrentTimer(func): # since about 25% of time is actually spent running things triggered by # I/O events, but that is harder to capture without rewriting half the # reactor. - tick_time.inc_by(end - start) - pending_calls_metric.inc_by(num_pending) + tick_time.observe(end - start) + pending_calls_metric.observe(num_pending) if running_on_pypy: return ret @@ -224,8 +168,8 @@ def runUntilCurrentTimer(func): unreachable = gc.collect(i) end = time.time() * 1000 - gc_time.inc_by(end - start, i) - gc_unreachable.inc_by(unreachable, i) + gc_time.labels(i).observe(end - start) + gc_unreachable.labels(i).set(unreachable) return ret diff --git a/synapse/notifier.py b/synapse/notifier.py index 8355c7d621..123e6f1840 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -28,22 +28,19 @@ from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client -import synapse.metrics +from synapse.metrics import LaterGauge from collections import namedtuple +from prometheus_client import Counter import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) +notified_events_counter = Counter("synapse_notifier_notified_events", "") -notified_events_counter = metrics.register_counter("notified_events") - -users_woken_by_stream_counter = metrics.register_counter( - "users_woken_by_stream", labels=["stream"] -) +users_woken_by_stream_counter = Counter("synapse_notifier_users_woken_by_stream", "", ["stream"]) # TODO(paul): Should be shared somewhere @@ -108,7 +105,7 @@ class _NotifierUserStream(object): self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred - users_woken_by_stream_counter.inc(stream_key) + users_woken_by_stream_counter.labels(stream_key).inc() with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) @@ -197,14 +194,14 @@ class Notifier(object): all_user_streams.add(x) return sum(stream.count_listeners() for stream in all_user_streams) - metrics.register_callback("listeners", count_listeners) + LaterGauge("listeners", "", [], count_listeners) - metrics.register_callback( - "rooms", + LaterGauge( + "rooms", "", [], lambda: count(bool, self.room_to_user_streams.values()), ) - metrics.register_callback( - "users", + LaterGauge( + "users", "", [], lambda: len(self.user_to_user_stream), ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index b077e1a446..e22088ad6f 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -20,22 +20,17 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled from . import push_rule_evaluator from . 
import push_tools -import synapse from synapse.push import PusherConfigException from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure -logger = logging.getLogger(__name__) +from prometheus_client import Counter -metrics = synapse.metrics.get_metrics_for(__name__) +logger = logging.getLogger(__name__) -http_push_processed_counter = metrics.register_counter( - "http_pushes_processed", -) +http_push_processed_counter = Counter("http_pushes_processed", "") -http_push_failed_counter = metrics.register_counter( - "http_pushes_failed", -) +http_push_failed_counter = Counter("http_pushes_failed", "") class HttpPusher(object): diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 3601f2d365..c3e6c5c258 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -150,7 +150,7 @@ class PushRuleEvaluatorForEvent(object): # Caches (glob, word_boundary) -> regex for push. See _glob_matches regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR) -register_cache("regex_push_cache", regex_cache) +register_cache("cache", "regex_push_cache", regex_cache) def _glob_matches(glob, value, word_boundary=False): diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 216db4d164..478c497722 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -56,6 +56,7 @@ REQUIREMENTS = { "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], "six": ["six"], + "prometheus_client": ["prometheus_client"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d7d38464b2..5848f57c5e 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -60,19 +60,19 @@ from .commands import ( ) from .streams import STREAMS_MAP +from synapse.metrics import LaterGauge from synapse.util.stringutils import random_string -from synapse.metrics.metric import CounterMetric + +from prometheus_client import Counter + +from collections import defaultdict import logging -import synapse.metrics import struct import fcntl - -metrics = synapse.metrics.get_metrics_for(__name__) - -connection_close_counter = metrics.register_counter( - "close_reason", labels=["reason_type"], +connection_close_counter = Counter( + "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"], ) @@ -136,12 +136,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # The LoopingCall for sending pings. 
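The per-connection command counters are reduced to plain defaultdicts here; a sketch of the counting side (the totals are exported at scrape time by the LaterGauge callbacks at the bottom of this file):

from collections import defaultdict

inbound_commands_counter = defaultdict(int)  # command name -> count

for cmd_name in ("PING", "RDATA", "RDATA"):
    inbound_commands_counter[cmd_name] = inbound_commands_counter[cmd_name] + 1

print(dict(inbound_commands_counter))  # {'PING': 1, 'RDATA': 2}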
self._send_ping_loop = None - self.inbound_commands_counter = CounterMetric( - "inbound_commands", labels=["command"], - ) - self.outbound_commands_counter = CounterMetric( - "outbound_commands", labels=["command"], - ) + self.inbound_commands_counter = defaultdict(int) + self.outbound_commands_counter = defaultdict(int) def connectionMade(self): logger.info("[%s] Connection established", self.id()) @@ -201,7 +197,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_received_command = self.clock.time_msec() - self.inbound_commands_counter.inc(cmd_name) + self.inbound_commands_counter[cmd_name] = self.inbound_commands_counter[cmd_name] + 1 cmd_cls = COMMAND_MAP[cmd_name] try: @@ -251,8 +247,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._queue_command(cmd) return - self.outbound_commands_counter.inc(cmd.NAME) - + self.outbound_commands_counter[cmd.NAME] = self.outbound_commands_counter[cmd.NAME] + 1 string = "%s %s" % (cmd.NAME, cmd.to_line(),) if "\n" in string: raise Exception("Unexpected newline in command: %r", string) @@ -317,9 +312,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def connectionLost(self, reason): logger.info("[%s] Replication connection closed: %r", self.id(), reason) if isinstance(reason, Failure): - connection_close_counter.inc(reason.type.__name__) + connection_close_counter.labels(reason.type.__name__).inc() else: - connection_close_counter.inc(reason.__class__.__name__) + connection_close_counter.labels(reason.__class__.__name__).inc() try: # Remove us from list of connections to be monitored @@ -566,14 +561,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # The following simply registers metrics for the replication connections -metrics.register_callback( - "pending_commands", +pending_commands = LaterGauge( + "pending_commands", "", ["name", "conn_id"], lambda: { (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections - }, - labels=["name", "conn_id"], -) + }) def transport_buffer_size(protocol): @@ -583,14 +576,12 @@ def transport_buffer_size(protocol): return 0 -metrics.register_callback( - "transport_send_buffer", +transport_send_buffer = LaterGauge( + "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"], lambda: { (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections - }, - labels=["name", "conn_id"], -) + }) def transport_kernel_read_buffer_size(protocol, read=True): @@ -608,48 +599,37 @@ def transport_kernel_read_buffer_size(protocol, read=True): return 0 -metrics.register_callback( - "transport_kernel_send_buffer", +tcp_transport_kernel_send_buffer = LaterGauge( + "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"], lambda: { (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False) for p in connected_connections - }, - labels=["name", "conn_id"], -) + }) -metrics.register_callback( - "transport_kernel_read_buffer", +tcp_transport_kernel_read_buffer = LaterGauge( + "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"], lambda: { (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True) for p in connected_connections - }, - labels=["name", "conn_id"], -) + }) -metrics.register_callback( - "inbound_commands", +tcp_inbound_commands = LaterGauge( + "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"], lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in 
p.inbound_commands_counter.counts.iteritems() - }, - labels=["command", "name", "conn_id"], -) + for k, count in p.inbound_commands_counter.items() + }) -metrics.register_callback( - "outbound_commands", +tcp_outbound_commands = LaterGauge( + "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"], lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.outbound_commands_counter.counts.iteritems() - }, - labels=["command", "name", "conn_id"], -) + for k, count in p.outbound_commands_counter.items() + }) # number of updates received for each RDATA stream -inbound_rdata_count = metrics.register_counter( - "inbound_rdata_count", - labels=["stream_name"], -) +inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "", ["stream_name"]) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 4adae96681..438dcddf55 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -13,29 +13,52 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse.metrics +from prometheus_client.core import GaugeMetricFamily, REGISTRY + import os CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) -metrics = synapse.metrics.get_metrics_for("synapse.util.caches") - caches_by_name = {} -# cache_counter = metrics.register_cache( -# "cache", -# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, -# labels=["name"], -# ) - - -def register_cache(name, cache): - caches_by_name[name] = cache - return metrics.register_cache( - "cache", - lambda: len(cache), - name, - ) +collectors_by_name = {} + +def register_cache(name, cache_name, cache): + + # Check if the metric is already registered. Unregister it, if so. 
+ metric_name = "synapse_util_caches_%s:%s" % (name, cache_name,) + if metric_name in collectors_by_name.keys(): + REGISTRY.unregister(collectors_by_name[metric_name]) + + class CacheMetric(object): + + hits = 0 + misses = 0 + evicted_size = 0 + + def inc_hits(self): + self.hits += 1 + + def inc_misses(self): + self.misses += 1 + + def inc_evictions(self, size=1): + self.evicted_size += size + + def collect(self): + cache_size = len(cache) + + gm = GaugeMetricFamily(metric_name, "", labels=["size", "hits", "misses", "total"]) + gm.add_metric(["size"], cache_size) + gm.add_metric(["hits"], self.hits) + gm.add_metric(["misses"], self.misses) + gm.add_metric(["total"], self.hits + self.misses) + yield gm + metric = CacheMetric() + REGISTRY.register(metric) + caches_by_name[cache_name] = cache + collectors_by_name[metric_name] = metric + return metric KNOWN_KEYS = { key: key for key in diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 68285a7594..a4188eb099 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -80,7 +80,7 @@ class Cache(object): self.name = name self.keylen = keylen self.thread = None - self.metrics = register_cache(name, self.cache) + self.metrics = register_cache("descriptor", name, self.cache) def _on_evicted(self, evicted_count): self.metrics.inc_evictions(evicted_count) diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 1709e8b429..bdc21e348f 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -55,7 +55,7 @@ class DictionaryCache(object): __slots__ = [] self.sentinel = Sentinel() - self.metrics = register_cache(name, self.cache) + self.metrics = register_cache("dictionary", name, self.cache) def check_thread(self): expected_thread = self.thread diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 0aa103eecb..ff04c91955 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -52,12 +52,12 @@ class ExpiringCache(object): self._cache = OrderedDict() - self.metrics = register_cache(cache_name, self) - self.iterable = iterable self._size_estimate = 0 + self.metrics = register_cache("expiring", cache_name, self) + def start(self): if not self._expiry_ms: # Don't bother starting the loop if things never expire diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 7f79333e96..a8491b42d5 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -17,7 +17,7 @@ import logging from twisted.internet import defer from synapse.util.async import ObservableDeferred -from synapse.util.caches import metrics as cache_metrics +from synapse.util.caches import register_cache from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -38,15 +38,16 @@ class ResponseCache(object): self.timeout_sec = timeout_ms / 1000. self._name = name - self._metrics = cache_metrics.register_cache( - "response_cache", - size_callback=lambda: self.size(), - cache_name=name, + self._metrics = register_cache( + "response_cache", name, self ) def size(self): return len(self.pending_result_cache) + def __len__(self): + return self.size() + def get(self, key): """Look up the given key. 
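How the reworked register_cache helper is meant to be called, following the call sites in this commit (a sketch; the LruCache construction is illustrative):

from synapse.util.caches import register_cache
from synapse.util.caches.lrucache import LruCache

my_cache = LruCache(max_size=1000)
metrics = register_cache("cache", "my_cache", my_cache)

# the caller records hits and misses; the collector reads the size
# via len(cache) at scrape time
value = my_cache.get("key", None)
if value is None:
    metrics.inc_misses()
else:
    metrics.inc_hits()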
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 941d873ab8..a7fe0397fa 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -38,7 +38,7 @@ class StreamChangeCache(object): self._cache = sorteddict() self._earliest_known_stream_pos = current_stream_pos self.name = name - self.metrics = register_cache(self.name, self._cache) + self.metrics = register_cache("cache", self.name, self._cache) for entity, stream_pos in prefilled_cache.items(): self.entity_has_changed(entity, stream_pos) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index e4b5687a4b..a964286d85 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -15,8 +15,8 @@ from twisted.internet import defer +from prometheus_client import Counter from synapse.util.logcontext import LoggingContext -import synapse.metrics from functools import wraps import logging @@ -24,66 +24,21 @@ import logging logger = logging.getLogger(__name__) +block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"]) -metrics = synapse.metrics.get_metrics_for(__name__) - -# total number of times we have hit this block -block_counter = metrics.register_counter( - "block_count", - labels=["block_name"], - alternative_names=( - # the following are all deprecated aliases for the same metric - metrics.name_prefix + x for x in ( - "_block_timer:count", - "_block_ru_utime:count", - "_block_ru_stime:count", - "_block_db_txn_count:count", - "_block_db_txn_duration:count", - ) - ) -) - -block_timer = metrics.register_counter( - "block_time_seconds", - labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_timer:total", - ), -) - -block_ru_utime = metrics.register_counter( - "block_ru_utime_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_ru_utime:total", - ), -) - -block_ru_stime = metrics.register_counter( - "block_ru_stime_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_ru_stime:total", - ), -) - -block_db_txn_count = metrics.register_counter( - "block_db_txn_count", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_db_txn_count:total", - ), -) +block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"]) + +block_ru_utime = Counter("synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]) + +block_ru_stime = Counter("synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]) + +block_db_txn_count = Counter("synapse_util_metrics_block_db_txn_count", "", ["block_name"]) # seconds spent waiting for db txns, excluding scheduling time, in this block -block_db_txn_duration = metrics.register_counter( - "block_db_txn_duration_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_db_txn_duration:total", - ), -) +block_db_txn_duration = Counter("synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]) # seconds spent waiting for a db connection, in this block -block_db_sched_duration = metrics.register_counter( - "block_db_sched_duration_seconds", labels=["block_name"], -) +block_db_sched_duration = Counter("synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) def measure_func(name): @@ -132,8 +87,8 @@ class Measure(object): duration = self.clock.time_msec() - self.start - block_counter.inc(self.name) - block_timer.inc_by(duration, self.name) + block_counter.labels(self.name).inc() + 
block_timer.labels(self.name).inc(duration) context = LoggingContext.current_context() @@ -150,19 +105,13 @@ class Measure(object): ru_utime, ru_stime = context.get_resource_usage() - block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name) - block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name) - block_db_txn_count.inc_by( - context.db_txn_count - self.db_txn_count, self.name - ) - block_db_txn_duration.inc_by( - (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000., - self.name - ) - block_db_sched_duration.inc_by( - (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000., - self.name - ) + block_ru_utime.labels(self.name).inc(ru_utime - self.ru_utime) + block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime) + block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count) + block_db_txn_duration.labels(self.name).inc( + (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000.) + block_db_sched_duration.labels(self.name).inc( + (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000.) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) diff --git a/tests/__init__.py b/tests/__init__.py index bfebb0f644..aab20e8e02 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,3 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from twisted.trial import util +util.DEFAULT_TIMEOUT_DURATION = 10 -- cgit 1.5.1 From 020377a5504602cb5d2ce555c77bd18eefc87635 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 May 2018 11:16:07 +0100 Subject: Fix logcontext resource usage tracking --- synapse/util/logcontext.py | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index eab9d57650..0bbda1dcaf 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -60,7 +60,7 @@ class LoggingContext(object): __slots__ = [ "previous_context", "name", "ru_stime", "ru_utime", "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", - "usage_start", "usage_end", + "usage_start", "main_thread", "alive", "request", "tag", ] @@ -109,8 +109,10 @@ class LoggingContext(object): # ms spent waiting for db txns to be scheduled self.db_sched_duration_ms = 0 + # If alive has the thread resource usage when the logcontext last + # became active. self.usage_start = None - self.usage_end = None + self.main_thread = threading.current_thread() self.request = None self.tag = "" @@ -159,7 +161,7 @@ class LoggingContext(object): """Restore the logging context in thread local storage to the state it was before this context was entered. Returns: - None to avoid suppressing any exeptions that were thrown. + None to avoid suppressing any exceptions that were thrown. 
""" current = self.set_current_context(self.previous_context) if current is not self: @@ -185,29 +187,43 @@ class LoggingContext(object): def start(self): if threading.current_thread() is not self.main_thread: + logger.warning("Started logcontext %s on different thread", self) return - if self.usage_start and self.usage_end: - self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime - self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime - self.usage_start = None - self.usage_end = None - + # If we haven't already started record the thread resource usage so + # far if not self.usage_start: self.usage_start = get_thread_resource_usage() def stop(self): if threading.current_thread() is not self.main_thread: + logger.warning("Stopped logcontext %s on different thread", self) return + # When we stop lets record the resource used since we started if self.usage_start: - self.usage_end = get_thread_resource_usage() + usage_end = get_thread_resource_usage() + + self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime + self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime + + self.usage_start = None + else: + logger.warning("Called stop on logcontext %s without calling start", self) def get_resource_usage(self): + """Get CPU time used by this logcontext so far. + + Returns: + tuple[float, float]: The user and system CPU usage in seconds + """ ru_utime = self.ru_utime ru_stime = self.ru_stime - if self.usage_start and threading.current_thread() is self.main_thread: + # If we are on the correct thread and we're currently running then we + # can include resource usage so far. + is_main_thread = threading.current_thread() is self.main_thread + if self.alive and self.usage_start and is_main_thread: current = get_thread_resource_usage() ru_utime += current.ru_utime - self.usage_start.ru_utime ru_stime += current.ru_stime - self.usage_start.ru_stime -- cgit 1.5.1 From 7948ecf2342d7053a6ff6daa01268b9331e1c569 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 May 2018 11:39:43 +0100 Subject: Comment --- synapse/util/logcontext.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 0bbda1dcaf..914f616312 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -200,7 +200,7 @@ class LoggingContext(object): logger.warning("Stopped logcontext %s on different thread", self) return - # When we stop lets record the resource used since we started + # When we stop, let's record the resource used since we started if self.usage_start: usage_end = get_thread_resource_usage() -- cgit 1.5.1 From 85ba83eb5100abf02cf373d9a8d5010526facd45 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 22 May 2018 16:28:23 -0500 Subject: fixes --- synapse/app/homeserver.py | 6 +++-- synapse/federation/transaction_queue.py | 6 ++--- synapse/metrics/__init__.py | 12 ++++++++-- synapse/notifier.py | 6 ++--- synapse/push/httppusher.py | 4 ++-- synapse/util/caches/__init__.py | 40 ++++++++++++++++++++++++--------- synapse/util/caches/descriptors.py | 2 +- 7 files changed, 52 insertions(+), 24 deletions(-) (limited to 'synapse/util') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a5b135193f..449bfacdb9 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -34,6 +34,7 @@ from synapse.module_api import ModuleApi from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect 
from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ check_requirements @@ -60,6 +61,8 @@ from twisted.web.resource import EncodingResourceWrapper, NoResource from twisted.web.server import GzipEncoderFactory from twisted.web.static import File +from prometheus_client.twisted import MetricsResource + logger = logging.getLogger("synapse.app.homeserver") @@ -229,8 +232,7 @@ class SynapseHomeServer(HomeServer): resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self) if name == "metrics" and self.get_config().enable_metrics: - from prometheus_client.twisted import MetricsResource - resources[METRICS_PREFIX] = MetricsResource() + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy()) if name == "replication": resources[REPLICATION_PREFIX] = ReplicationRestResource(self) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 778924a13c..2049351fdd 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -64,7 +64,7 @@ class TransactionQueue(object): # done self.pending_transactions = {} - LaterGauge("pending_destinations", "", [], + LaterGauge("synapse_federation_client_pending_destinations", "", [], lambda: len(self.pending_transactions), ) @@ -89,11 +89,11 @@ class TransactionQueue(object): self.pending_edus_keyed_by_dest = edus_keyed = {} LaterGauge( - "pending_pdus", "", [], + "synapse_federation_client_pending_pdus", "", [], lambda: sum(map(len, pdus.values())), ) LaterGauge( - "pending_edus", "", [], + "synapse_federation_client_pending_edus", "", [], lambda: ( sum(map(len, edus.values())) + sum(map(len, presence.values())) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index ab0b921497..38408efb54 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -29,12 +29,20 @@ from twisted.internet import reactor logger = logging.getLogger(__name__) - running_on_pypy = platform.python_implementation() == 'PyPy' all_metrics = [] all_collectors = [] all_gauges = {} + +class RegistryProxy(object): + + def collect(self): + for metric in REGISTRY.collect(): + if not metric.name.startswith("__"): + yield metric + + @attr.s(hash=True) class LaterGauge(object): @@ -45,7 +53,7 @@ class LaterGauge(object): def collect(self): - g = GaugeMetricFamily(self.name, self.desc, self.labels) + g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) try: calls = self.caller() diff --git a/synapse/notifier.py b/synapse/notifier.py index 123e6f1840..40cc553918 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -194,14 +194,14 @@ class Notifier(object): all_user_streams.add(x) return sum(stream.count_listeners() for stream in all_user_streams) - LaterGauge("listeners", "", [], count_listeners) + LaterGauge("synapse_notifier_listeners", "", [], count_listeners) LaterGauge( - "rooms", "", [], + "synapse_notifier_rooms", "", [], lambda: count(bool, self.room_to_user_streams.values()), ) LaterGauge( - "users", "", [], + "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream), ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index e22088ad6f..bf7ff74a1a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -28,9 +28,9 @@ from prometheus_client import Counter logger = logging.getLogger(__name__) -http_push_processed_counter = 
Counter("http_pushes_processed", "") +http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "") -http_push_failed_counter = Counter("http_pushes_failed", "") +http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "") class HttpPusher(object): diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 438dcddf55..1c511a7072 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from prometheus_client.core import GaugeMetricFamily, REGISTRY +from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily import os @@ -22,10 +22,20 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) caches_by_name = {} collectors_by_name = {} -def register_cache(name, cache_name, cache): +cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) +cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) +cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"]) +cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"]) + +response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"]) +response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"]) +response_cache_evicted = Gauge("synapse_util_caches_response_cache:evicted_size", "", ["name"]) +response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"]) + +def register_cache(cache_type, cache_name, cache): # Check if the metric is already registered. Unregister it, if so. - metric_name = "synapse_util_caches_%s:%s" % (name, cache_name,) + metric_name = "cache_%s_%s" % (cache_type, cache_name,) if metric_name in collectors_by_name.keys(): REGISTRY.unregister(collectors_by_name[metric_name]) @@ -44,15 +54,22 @@ def register_cache(name, cache_name, cache): def inc_evictions(self, size=1): self.evicted_size += size - def collect(self): - cache_size = len(cache) + def describe(self): + return [] - gm = GaugeMetricFamily(metric_name, "", labels=["size", "hits", "misses", "total"]) - gm.add_metric(["size"], cache_size) - gm.add_metric(["hits"], self.hits) - gm.add_metric(["misses"], self.misses) - gm.add_metric(["total"], self.hits + self.misses) - yield gm + def collect(self): + if cache_type == "response_cache": + response_cache_size.labels(cache_name).set(len(cache)) + response_cache_hits.labels(cache_name).set(self.hits) + response_cache_evicted.labels(cache_name).set(self.evicted_size) + response_cache_total.labels(cache_name).set(self.hits + self.misses) + else: + cache_size.labels(cache_name).set(len(cache)) + cache_hits.labels(cache_name).set(self.hits) + cache_evicted.labels(cache_name).set(self.evicted_size) + cache_total.labels(cache_name).set(self.hits + self.misses) + + yield GaugeMetricFamily("__unused", "") metric = CacheMetric() REGISTRY.register(metric) @@ -60,6 +77,7 @@ def register_cache(name, cache_name, cache): collectors_by_name[metric_name] = metric return metric + KNOWN_KEYS = { key: key for key in ( diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index a4188eb099..8a9dcb2fc2 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -80,7 +80,7 @@ class Cache(object): self.name = name self.keylen = keylen self.thread = None - self.metrics = register_cache("descriptor", name, self.cache) + 

     def _on_evicted(self, evicted_count):
         self.metrics.inc_evictions(evicted_count)
-- cgit 1.5.1

From 071206304d088aac8bd0e2fff600141dae1d09b6 Mon Sep 17 00:00:00 2001
From: Amber Brown
Date: Tue, 22 May 2018 16:54:22 -0500
Subject: cleanup pep8 errors

---
 synapse/federation/federation_server.py |   5 +-
 synapse/federation/transaction_queue.py |  22 ++++--
 synapse/http/request_metrics.py         | 114 ++++++++++++++++++++++++--------
 synapse/notifier.py                     |   3 +-
 synapse/util/caches/__init__.py         |   7 +-
 5 files changed, 114 insertions(+), 37 deletions(-)

(limited to 'synapse/util')

diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8211273006..2d420a58a2 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,10 +46,13 @@ received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")

 received_edus_counter = Counter("synapse_federation_server_received_edus", "")

-received_queries_counter = Counter("synapse_federation_server_received_queries", "", ["type"])
+received_queries_counter = Counter(
+    "synapse_federation_server_received_queries", "", ["type"]
+)


 class FederationServer(FederationBase):
+
     def __init__(self, hs):
         super(FederationServer, self).__init__(hs)

diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 2049351fdd..53442688c8 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -28,7 +28,10 @@ from synapse.handlers.presence import format_user_presence_state, get_interested
 import synapse.metrics
 from synapse.metrics import LaterGauge
 from synapse.metrics import (
-    sent_edus_counter, sent_transactions_counter, events_processed_counter)
+    sent_edus_counter,
+    sent_transactions_counter,
+    events_processed_counter,
+)

 from prometheus_client import Counter

@@ -37,7 +40,9 @@ import logging

 logger = logging.getLogger(__name__)

-sent_pdus_destination_dist = Counter("synapse_federation_client_sent_pdu_destinations", "")
+sent_pdus_destination_dist = Counter(
+    "synapse_federation_client_sent_pdu_destinations", ""
+)


 class TransactionQueue(object):
@@ -64,7 +69,10 @@ class TransactionQueue(object):
         #   done
         self.pending_transactions = {}

-        LaterGauge("synapse_federation_client_pending_destinations", "", [],
+        LaterGauge(
+            "synapse_federation_client_pending_destinations",
+            "",
+            [],
             lambda: len(self.pending_transactions),
         )

@@ -89,11 +97,15 @@ class TransactionQueue(object):
         self.pending_edus_keyed_by_dest = edus_keyed = {}

         LaterGauge(
-            "synapse_federation_client_pending_pdus", "", [],
+            "synapse_federation_client_pending_pdus",
+            "",
+            [],
             lambda: sum(map(len, pdus.values())),
         )
         LaterGauge(
-            "synapse_federation_client_pending_edus", "", [],
+            "synapse_federation_client_pending_edus",
+            "",
+            [],
             lambda: (
                 sum(map(len, edus.values()))
                 + sum(map(len, presence.values()))
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index e7f1bfc4ae..7f11b5c5a4 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -25,47 +25,87 @@ logger = logging.getLogger(__name__)

 # total number of responses served, split by method/servlet/tag
-response_count = Counter("synapse_http_server_response_count", "", ["method", "servlet", "tag"])
+response_count = Counter(
+    "synapse_http_server_response_count", "", ["method", "servlet", "tag"]
+)

-requests_counter = Counter("synapse_http_server_requests_received", "", ["method", "servlet"])
+requests_counter = Counter(
+    "synapse_http_server_requests_received", "", ["method", "servlet"]
+)

-outgoing_responses_counter = Counter("synapse_http_server_responses", "", ["method", "code"])
+outgoing_responses_counter = Counter(
+    "synapse_http_server_responses", "", ["method", "code"]
+)

-response_timer = Histogram("synapse_http_server_response_time_seconds", "", ["method", "servlet", "tag"])
+response_timer = Histogram(
+    "synapse_http_server_response_time_seconds", "", ["method", "servlet", "tag"]
+)

-response_ru_utime = Counter("synapse_http_server_response_ru_utime_seconds", "", ["method", "servlet", "tag"])
+response_ru_utime = Counter(
+    "synapse_http_server_response_ru_utime_seconds", "", ["method", "servlet", "tag"]
+)

-response_ru_stime = Counter("synapse_http_server_response_ru_stime_seconds", "", ["method", "servlet", "tag"])
+response_ru_stime = Counter(
+    "synapse_http_server_response_ru_stime_seconds", "", ["method", "servlet", "tag"]
+)

-response_db_txn_count = Counter("synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"])
+response_db_txn_count = Counter(
+    "synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]
+)

 # seconds spent waiting for db txns, excluding scheduling time, when processing
 # this request
-response_db_txn_duration = Counter("synapse_http_server_response_db_txn_duration_seconds", "", ["method", "servlet", "tag"])
+response_db_txn_duration = Counter(
+    "synapse_http_server_response_db_txn_duration_seconds",
+    "",
+    ["method", "servlet", "tag"],
+)

 # seconds spent waiting for a db connection, when processing this request
-response_db_sched_duration = Counter("synapse_http_request_response_db_sched_duration_seconds", "", ["method", "servlet", "tag"]
+response_db_sched_duration = Counter(
+    "synapse_http_request_response_db_sched_duration_seconds",
+    "",
+    ["method", "servlet", "tag"],
 )

 # size in bytes of the response written
-response_size = Counter("synapse_http_request_response_size", "", ["method", "servlet", "tag"]
+response_size = Counter(
+    "synapse_http_request_response_size", "", ["method", "servlet", "tag"]
 )

 # In flight metrics are incremented while the requests are in flight, rather
 # than when the response was written.
-in_flight_requests_ru_utime = Counter("synapse_http_request_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]) +in_flight_requests_ru_utime = Counter( + "synapse_http_request_in_flight_requests_ru_utime_seconds", + "", + ["method", "servlet"], +) -in_flight_requests_ru_stime = Counter("synapse_http_request_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]) +in_flight_requests_ru_stime = Counter( + "synapse_http_request_in_flight_requests_ru_stime_seconds", + "", + ["method", "servlet"], +) -in_flight_requests_db_txn_count = Counter("synapse_http_request_in_flight_requests_db_txn_count", "", ["method", "servlet"]) +in_flight_requests_db_txn_count = Counter( + "synapse_http_request_in_flight_requests_db_txn_count", "", ["method", "servlet"] +) # seconds spent waiting for db txns, excluding scheduling time, when processing # this request -in_flight_requests_db_txn_duration = Counter("synapse_http_request_in_flight_requests_db_txn_duration_seconds", "", ["method", "servlet"]) +in_flight_requests_db_txn_duration = Counter( + "synapse_http_request_in_flight_requests_db_txn_duration_seconds", + "", + ["method", "servlet"], +) # seconds spent waiting for a db connection, when processing this request -in_flight_requests_db_sched_duration = Counter("synapse_http_request_in_flight_requests_db_sched_duration_seconds", "", ["method", "servlet"]) +in_flight_requests_db_sched_duration = Counter( + "synapse_http_request_in_flight_requests_db_sched_duration_seconds", + "", + ["method", "servlet"], +) # The set of all in flight requests, set[RequestMetrics] _in_flight_requests = set() @@ -91,9 +131,10 @@ def _get_in_flight_counts(): LaterGauge( - "synapse_http_request_metrics_in_flight_requests_count", "", + "synapse_http_request_metrics_in_flight_requests_count", + "", ["method", "servlet"], - _get_in_flight_counts + _get_in_flight_counts, ) @@ -128,16 +169,23 @@ class RequestMetrics(object): response_count.labels(request.method, self.name, tag).inc() - response_timer.labels(request.method, self.name, tag).observe(time_msec - self.start) + response_timer.labels(request.method, self.name, tag).observe( + time_msec - self.start + ) ru_utime, ru_stime = context.get_resource_usage() response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime) response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime) - response_db_txn_count.labels(request.method, self.name, tag).inc(context.db_txn_count) - response_db_txn_duration.labels(request.method, self.name, tag).inc(context.db_txn_duration_ms / 1000.) + response_db_txn_count.labels(request.method, self.name, tag).inc( + context.db_txn_count + ) + response_db_txn_duration.labels(request.method, self.name, tag).inc( + context.db_txn_duration_ms / 1000. + ) response_db_sched_duration.labels(request.method, self.name, tag).inc( - context.db_sched_duration_ms / 1000.) + context.db_sched_duration_ms / 1000. + ) response_size.labels(request.method, self.name, tag).inc(request.sentLength) @@ -154,11 +202,17 @@ class RequestMetrics(object): in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) - in_flight_requests_db_txn_count.labels(self.method, self.name).inc(diff.db_txn_count) + in_flight_requests_db_txn_count.labels(self.method, self.name).inc( + diff.db_txn_count + ) - in_flight_requests_db_txn_duration.labels(self.method, self.name).inc(diff.db_txn_duration_ms / 1000.) 
+ in_flight_requests_db_txn_duration.labels(self.method, self.name).inc( + diff.db_txn_duration_ms / 1000. + ) - in_flight_requests_db_sched_duration.labels(self.method, self.name).inc(diff.db_sched_duration_ms / 1000.) + in_flight_requests_db_sched_duration.labels(self.method, self.name).inc( + diff.db_sched_duration_ms / 1000. + ) class _RequestStats(object): @@ -166,12 +220,16 @@ class _RequestStats(object): """ __slots__ = [ - "ru_utime", "ru_stime", - "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "ru_utime", + "ru_stime", + "db_txn_count", + "db_txn_duration_ms", + "db_sched_duration_ms", ] - def __init__(self, ru_utime, ru_stime, db_txn_count, - db_txn_duration_ms, db_sched_duration_ms): + def __init__( + self, ru_utime, ru_stime, db_txn_count, db_txn_duration_ms, db_sched_duration_ms + ): self.ru_utime = ru_utime self.ru_stime = ru_stime self.db_txn_count = db_txn_count diff --git a/synapse/notifier.py b/synapse/notifier.py index 40cc553918..6dce20a284 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -40,7 +40,8 @@ logger = logging.getLogger(__name__) notified_events_counter = Counter("synapse_notifier_notified_events", "") -users_woken_by_stream_counter = Counter("synapse_notifier_users_woken_by_stream", "", ["stream"]) +users_woken_by_stream_counter = Counter( + "synapse_notifier_users_woken_by_stream", "", ["stream"]) # TODO(paul): Should be shared somewhere diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 1c511a7072..e0c22df249 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -29,13 +29,16 @@ cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"]) response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"]) response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"]) -response_cache_evicted = Gauge("synapse_util_caches_response_cache:evicted_size", "", ["name"]) +response_cache_evicted = Gauge( + "synapse_util_caches_response_cache:evicted_size", "", ["name"] +) response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"]) + def register_cache(cache_type, cache_name, cache): # Check if the metric is already registered. Unregister it, if so. - metric_name = "cache_%s_%s" % (cache_type, cache_name,) + metric_name = "cache_%s_%s" % (cache_type, cache_name) if metric_name in collectors_by_name.keys(): REGISTRY.unregister(collectors_by_name[metric_name]) -- cgit 1.5.1 From 53cc2cde1f609ec34a4ce6a7c678302c65ddfe53 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 22 May 2018 17:32:57 -0500 Subject: cleanup --- synapse/federation/send_queue.py | 5 ++--- synapse/handlers/appservice.py | 9 ++++++--- synapse/handlers/presence.py | 12 ++++++++---- synapse/http/client.py | 3 ++- synapse/http/matrixfederationclient.py | 6 ++++-- synapse/metrics/__init__.py | 28 ++++++++++++++++++++++------ synapse/push/bulk_push_rule_evaluator.py | 6 ++++-- synapse/storage/events.py | 9 ++++++--- synapse/util/metrics.py | 15 ++++++++++----- 9 files changed, 64 insertions(+), 29 deletions(-) (limited to 'synapse/util') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index e6e1888f3a..c7ed465617 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -74,9 +74,8 @@ class FederationRemoteSendQueue(object): # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. 
def register(name, queue): - LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), "", - lambda: len(queue), - ) + LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), + "", lambda: len(queue)) for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index a7345331af..d9f35a5dba 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -127,12 +127,15 @@ class ApplicationServicesHandler(object): now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_positions.labels("appservice_sender").set(upper_bound) + synapse.metrics.event_processing_positions.labels( + "appservice_sender").set(upper_bound) events_processed_counter.inc(len(events)) - synapse.metrics.event_processing_lag.labels("appservice_sender").set(now - ts) - synapse.metrics.event_processing_last_ts.labels("appservice_sender").set(ts) + synapse.metrics.event_processing_lag.labels( + "appservice_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "appservice_sender").set(ts) finally: self.is_processing = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 4ee87d5714..12939aa507 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -47,7 +47,8 @@ logger = logging.getLogger(__name__) notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "") -federation_presence_out_counter = Counter("synapse_handler_presence_federation_presence_out", "") +federation_presence_out_counter = Counter( + "synapse_handler_presence_federation_presence_out", "") presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "") timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "") federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "") @@ -55,8 +56,10 @@ bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"]) -notify_reason_counter = Counter("synapse_handler_presence_notify_reason", "", ["reason"]) -state_transition_counter = Counter("synapse_handler_presence_state_transition", "", ["from", "to"] +notify_reason_counter = Counter( + "synapse_handler_presence_notify_reason", "", ["reason"]) +state_transition_counter = Counter( + "synapse_handler_presence_state_transition", "", ["from", "to"] ) @@ -213,7 +216,8 @@ class PresenceHandler(object): 60 * 1000, ) - LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], lambda: len(self.wheel_timer)) + LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], + lambda: len(self.wheel_timer)) @defer.inlineCallbacks def _on_shutdown(self): diff --git a/synapse/http/client.py b/synapse/http/client.py index 61a1d2e2b3..4d4eee3d64 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -50,7 +50,8 @@ import urllib logger = logging.getLogger(__name__) outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"]) -incoming_responses_counter = Counter("synapse_http_client_responses", "", ["method", "code"]) +incoming_responses_counter = Counter("synapse_http_client_responses", "", + ["method", "code"]) class SimpleHttpClient(object): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 259d3884e2..77eaa06a1a 100644 
--- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -48,8 +48,10 @@ from prometheus_client import Counter logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") -outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", "", ["method"]) -incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", "", ["method", "code"]) +outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", + "", ["method"]) +incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", + "", ["method", "code"]) MAX_LONG_RETRIES = 10 diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 38408efb54..bed37b5f56 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -21,15 +21,14 @@ import platform import attr from prometheus_client import Gauge, Histogram, Counter -from prometheus_client.core import ( - GaugeMetricFamily, CounterMetricFamily, REGISTRY) +from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY from twisted.internet import reactor logger = logging.getLogger(__name__) -running_on_pypy = platform.python_implementation() == 'PyPy' +running_on_pypy = platform.python_implementation() == "PyPy" all_metrics = [] all_collectors = [] all_gauges = {} @@ -87,9 +86,16 @@ class LaterGauge(object): # gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) -gc_time = Histogram("python_gc_time", "Time taken to GC (ms)", ["gen"], buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000]) +gc_time = Histogram( + "python_gc_time", + "Time taken to GC (ms)", + ["gen"], + buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], +) + class GCCounts(object): + def collect(self): gc_counts = gc.get_count() @@ -99,14 +105,23 @@ class GCCounts(object): yield cm + REGISTRY.register(GCCounts()) # # Twisted reactor metrics # -tick_time = Histogram("python_twisted_reactor_tick_time", "Tick time of the Twisted reactor (ms)", buckets=[1, 2, 5, 10, 50, 100, 250, 500, 1000, 2000]) -pending_calls_metric = Histogram("python_twisted_reactor_pending_calls", "Pending calls", buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000]) +tick_time = Histogram( + "python_twisted_reactor_tick_time", + "Tick time of the Twisted reactor (ms)", + buckets=[1, 2, 5, 10, 50, 100, 250, 500, 1000, 2000], +) +pending_calls_metric = Histogram( + "python_twisted_reactor_pending_calls", + "Pending calls", + buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], +) # # Federation Metrics @@ -134,6 +149,7 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name" # finished being processed. 
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) + def runUntilCurrentTimer(func): @functools.wraps(func) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 6fcca5e260..b0053e7f3f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -36,8 +36,10 @@ logger = logging.getLogger(__name__) rules_by_room = {} -push_rules_invalidation_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_invalidation_counter", "") -push_rules_state_size_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_state_size_counter", "") +push_rules_invalidation_counter = Counter( + "synapse_push_bulk_push_role_evaluator_push_rules_invalidation_counter", "") +push_rules_state_size_counter = Counter( + "synapse_push_bulk_push_role_evaluator_push_rules_state_size_counter", "") # Measures whether we use the fast path of using state deltas, or if we have to # recalculate from scratch diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 00d66886ad..b96104ccae 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -45,19 +45,22 @@ from prometheus_client import Counter logger = logging.getLogger(__name__) persist_event_counter = Counter("synapse_storage_events_persisted_events", "") -event_counter = Counter("synapse_storage_events_persisted_events_sep", "", ["type", "origin_type", "origin_entity"]) +event_counter = Counter("synapse_storage_events_persisted_events_sep", "", + ["type", "origin_type", "origin_entity"]) # The number of times we are recalculating the current state state_delta_counter = Counter("synapse_storage_events_state_delta", "") # The number of times we are recalculating state when there is only a # single forward extremity -state_delta_single_event_counter = Counter("synapse_storage_events_state_delta_single_event", "") +state_delta_single_event_counter = Counter( + "synapse_storage_events_state_delta_single_event", "") # The number of times we are reculating state when we could have resonably # calculated the delta when we calculated the state for an event we were # persisting. 
-state_delta_reuse_delta_counter = Counter("synapse_storage_events_state_delta_reuse_delta", "") +state_delta_reuse_delta_counter = Counter( + "synapse_storage_events_state_delta_reuse_delta", "") def encode_json(json_object): diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index a964286d85..424fdcb036 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -28,17 +28,22 @@ block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"]) block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"]) -block_ru_utime = Counter("synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]) +block_ru_utime = Counter( + "synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]) -block_ru_stime = Counter("synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]) +block_ru_stime = Counter( + "synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]) -block_db_txn_count = Counter("synapse_util_metrics_block_db_txn_count", "", ["block_name"]) +block_db_txn_count = Counter( + "synapse_util_metrics_block_db_txn_count", "", ["block_name"]) # seconds spent waiting for db txns, excluding scheduling time, in this block -block_db_txn_duration = Counter("synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]) +block_db_txn_duration = Counter( + "synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]) # seconds spent waiting for a db connection, in this block -block_db_sched_duration = Counter("synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) +block_db_sched_duration = Counter( + "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) def measure_func(name): -- cgit 1.5.1 From 357c74a50f1e1588a6a3d626bddb3555452c6f56 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 28 May 2018 19:14:41 +1000 Subject: add comment about why unreg --- synapse/util/caches/__init__.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index d968b71e7a..183faf75a1 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -41,6 +41,8 @@ response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["n def register_cache(cache_type, cache_name, cache): # Check if the metric is already registered. Unregister it, if so. + # This usually happens during tests, as at runtime these caches are + # effectively singletons. 
metric_name = "cache_%s_%s" % (cache_type, cache_name) if metric_name in collectors_by_name.keys(): REGISTRY.unregister(collectors_by_name[metric_name]) -- cgit 1.5.1 From 3ef5cd74a6bff9b33144cb834782e0402e6eb152 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 28 May 2018 19:39:27 +1000 Subject: update to more consistently use seconds in any metrics or logging --- synapse/http/request_metrics.py | 38 +++++++++++++++++++------------------- synapse/util/logcontext.py | 24 ++++++++++++------------ synapse/util/logutils.py | 6 +++--- synapse/util/metrics.py | 8 ++++---- 4 files changed, 38 insertions(+), 38 deletions(-) (limited to 'synapse/util') diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index af3067b4bb..dc06f6c443 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -139,8 +139,8 @@ LaterGauge( class RequestMetrics(object): - def start(self, time_msec, name, method): - self.start = time_msec + def start(self, time_sec, name, method): + self.start = time_sec self.start_context = LoggingContext.current_context() self.name = name self.method = method @@ -149,7 +149,7 @@ class RequestMetrics(object): _in_flight_requests.add(self) - def stop(self, time_msec, request): + def stop(self, time_sec, request): _in_flight_requests.discard(self) context = LoggingContext.current_context() @@ -170,7 +170,7 @@ class RequestMetrics(object): response_count.labels(request.method, self.name, tag).inc() response_timer.labels(request.method, self.name, tag).observe( - time_msec - self.start + time_sec - self.start ) ru_utime, ru_stime = context.get_resource_usage() @@ -181,10 +181,10 @@ class RequestMetrics(object): context.db_txn_count ) response_db_txn_duration.labels(request.method, self.name, tag).inc( - context.db_txn_duration_ms / 1000. + context.db_txn_duration_sec ) response_db_sched_duration.labels(request.method, self.name, tag).inc( - context.db_sched_duration_ms / 1000. + context.db_sched_duration_sec ) response_size.labels(request.method, self.name, tag).inc(request.sentLength) @@ -207,11 +207,11 @@ class RequestMetrics(object): ) in_flight_requests_db_txn_duration.labels(self.method, self.name).inc( - diff.db_txn_duration_ms / 1000. + diff.db_txn_duration_sec ) in_flight_requests_db_sched_duration.labels(self.method, self.name).inc( - diff.db_sched_duration_ms / 1000. 
+ diff.db_sched_duration_sec ) @@ -223,18 +223,18 @@ class _RequestStats(object): "ru_utime", "ru_stime", "db_txn_count", - "db_txn_duration_ms", - "db_sched_duration_ms", + "db_txn_duration_sec", + "db_sched_duration_sec", ] def __init__( - self, ru_utime, ru_stime, db_txn_count, db_txn_duration_ms, db_sched_duration_ms + self, ru_utime, ru_stime, db_txn_count, db_txn_duration_sec, db_sched_duration_sec ): self.ru_utime = ru_utime self.ru_stime = ru_stime self.db_txn_count = db_txn_count - self.db_txn_duration_ms = db_txn_duration_ms - self.db_sched_duration_ms = db_sched_duration_ms + self.db_txn_duration_sec = db_txn_duration_sec + self.db_sched_duration_sec = db_sched_duration_sec @staticmethod def from_context(context): @@ -243,8 +243,8 @@ class _RequestStats(object): return _RequestStats( ru_utime, ru_stime, context.db_txn_count, - context.db_txn_duration_ms, - context.db_sched_duration_ms, + context.db_txn_duration_sec, + context.db_sched_duration_sec, ) def update(self, context): @@ -260,14 +260,14 @@ class _RequestStats(object): new.ru_utime - self.ru_utime, new.ru_stime - self.ru_stime, new.db_txn_count - self.db_txn_count, - new.db_txn_duration_ms - self.db_txn_duration_ms, - new.db_sched_duration_ms - self.db_sched_duration_ms, + new.db_txn_duration_sec - self.db_txn_duration_sec, + new.db_sched_duration_sec - self.db_sched_duration_sec, ) self.ru_utime = new.ru_utime self.ru_stime = new.ru_stime self.db_txn_count = new.db_txn_count - self.db_txn_duration_ms = new.db_txn_duration_ms - self.db_sched_duration_ms = new.db_sched_duration_ms + self.db_txn_duration_sec = new.db_txn_duration_sec + self.db_sched_duration_sec = new.db_sched_duration_sec return diff diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 914f616312..a58c723403 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -59,7 +59,7 @@ class LoggingContext(object): __slots__ = [ "previous_context", "name", "ru_stime", "ru_utime", - "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", "usage_start", "main_thread", "alive", "request", "tag", @@ -84,10 +84,10 @@ class LoggingContext(object): def stop(self): pass - def add_database_transaction(self, duration_ms): + def add_database_transaction(self, duration_sec): pass - def add_database_scheduled(self, sched_ms): + def add_database_scheduled(self, sched_sec): pass def __nonzero__(self): @@ -103,11 +103,11 @@ class LoggingContext(object): self.ru_utime = 0. self.db_txn_count = 0 - # ms spent waiting for db txns, excluding scheduling time - self.db_txn_duration_ms = 0 + # sec spent waiting for db txns, excluding scheduling time + self.db_txn_duration_sec = 0 - # ms spent waiting for db txns to be scheduled - self.db_sched_duration_ms = 0 + # sec spent waiting for db txns to be scheduled + self.db_sched_duration_sec = 0 # If alive has the thread resource usage when the logcontext last # became active. 
@@ -230,18 +230,18 @@ class LoggingContext(object): return ru_utime, ru_stime - def add_database_transaction(self, duration_ms): + def add_database_transaction(self, duration_sec): self.db_txn_count += 1 - self.db_txn_duration_ms += duration_ms + self.db_txn_duration_sec += duration_sec - def add_database_scheduled(self, sched_ms): + def add_database_scheduled(self, sched_sec): """Record a use of the database pool Args: - sched_ms (int): number of milliseconds it took us to get a + sched_sec (float): number of seconds it took us to get a connection """ - self.db_sched_duration_ms += sched_ms + self.db_sched_duration_sec += sched_sec class LoggingContextFilter(logging.Filter): diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index 3a83828d25..03249c5dc8 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -96,7 +96,7 @@ def time_function(f): id = _TIME_FUNC_ID _TIME_FUNC_ID += 1 - start = time.clock() * 1000 + start = time.clock() try: _log_debug_as_f( @@ -107,10 +107,10 @@ def time_function(f): r = f(*args, **kwargs) finally: - end = time.clock() * 1000 + end = time.clock() _log_debug_as_f( f, - "[FUNC END] {%s-%d} %f", + "[FUNC END] {%s-%d} %.3f sec", (func_name, id, end - start,), ) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 424fdcb036..23fc0ca20d 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -74,7 +74,7 @@ class Measure(object): self.created_context = False def __enter__(self): - self.start = self.clock.time_msec() + self.start = self.clock.time() self.start_context = LoggingContext.current_context() if not self.start_context: self.start_context = LoggingContext("Measure") @@ -90,7 +90,7 @@ class Measure(object): if isinstance(exc_type, Exception) or not self.start_context: return - duration = self.clock.time_msec() - self.start + duration = self.clock.time() - self.start block_counter.labels(self.name).inc() block_timer.labels(self.name).inc(duration) @@ -114,9 +114,9 @@ class Measure(object): block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime) block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count) block_db_txn_duration.labels(self.name).inc( - (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000.) + context.db_txn_duration_sec - self.db_txn_duration_sec) block_db_sched_duration.labels(self.name).inc( - (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000.) 
+ context.db_sched_duration_sec - self.db_sched_duration_sec) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) -- cgit 1.5.1 From 57ad76fa4a4fb76dc1b9c7e8232b9589b9731ff6 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 28 May 2018 19:51:53 +1000 Subject: fix up tests --- synapse/util/metrics.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 23fc0ca20d..1ba7d65c7c 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -62,7 +62,7 @@ class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime", "ru_stime", - "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", "created_context", ] @@ -83,8 +83,8 @@ class Measure(object): self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() self.db_txn_count = self.start_context.db_txn_count - self.db_txn_duration_ms = self.start_context.db_txn_duration_ms - self.db_sched_duration_ms = self.start_context.db_sched_duration_ms + self.db_txn_duration_sec = self.start_context.db_txn_duration_sec + self.db_sched_duration_sec = self.start_context.db_sched_duration_sec def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(exc_type, Exception) or not self.start_context: -- cgit 1.5.1
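
[Editor's note: the following is not part of the patch series. It is a minimal illustrative sketch of the pattern the commits above converge on: module-level prometheus_client metrics with fully-qualified names, durations recorded in seconds, a callback-style gauge read at scrape time, and unregister-before-reregister for tests. The example_* metric names, the pending_items state, and the register_collector() helper are hypothetical stand-ins for Synapse's LaterGauge and register_cache(); only the prometheus_client calls themselves are real API.]

    import time

    from prometheus_client import Counter, Histogram
    from prometheus_client.core import REGISTRY, GaugeMetricFamily

    # Illustrative live state; in Synapse this role is played by e.g.
    # TransactionQueue.pending_transactions.
    pending_items = {}

    # Names carry their full namespace up front ("example_*" here,
    # "synapse_*" in the commits above): prometheus_client does not
    # prefix metric names for you.
    items_processed_counter = Counter("example_items_processed", "")

    # Durations are recorded in seconds, matching the ms -> sec cleanup above.
    process_time = Histogram(
        "example_process_time_seconds",
        "Time spent processing an item",
        buckets=[0.001, 0.01, 0.1, 1, 10],
    )


    class PendingItemsCollector(object):
        """Callback-style gauge, computed at scrape time (the LaterGauge idea)."""

        def collect(self):
            g = GaugeMetricFamily("example_pending_items", "", labels=["queue"])
            for queue_name, queue in pending_items.items():
                g.add_metric([queue_name], len(queue))
            yield g


    collectors_by_name = {}


    def register_collector(name, collector):
        # The register_cache() trick above: tests re-create these effective
        # singletons, so drop any collector already registered under this
        # name before registering the replacement.
        if name in collectors_by_name:
            REGISTRY.unregister(collectors_by_name[name])
        REGISTRY.register(collector)
        collectors_by_name[name] = collector


    register_collector("example_pending_items", PendingItemsCollector())


    def process_item(item):
        start = time.time()  # seconds throughout, never milliseconds
        try:
            pending_items.setdefault("main", []).append(item)
            # ... real work would happen here ...
            pending_items["main"].remove(item)
        finally:
            process_time.observe(time.time() - start)
            items_processed_counter.inc()

Computing gauge values in collect() keeps them consistent with live state without per-update bookkeeping, which is why LaterGauge takes a callable rather than a value, and why the send_queue commit is careful that its lambda closes over the queue object itself rather than a loop variable that changes.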