From 9255a6cb17716c022ebae1dbe9c142b78ca86ea7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:07:40 +0100 Subject: Improve exception handling for background processes There were a bunch of places where we fire off a process to happen in the background, but don't have any exception handling on it - instead relying on the unhandled error being logged when the relevent deferred gets garbage-collected. This is unsatisfactory for a number of reasons: - logging on garbage collection is best-effort and may happen some time after the error, if at all - it can be hard to figure out where the error actually happened. - it is logged as a scary CRITICAL error which (a) I always forget to grep for and (b) it's not really CRITICAL if a background process we don't care about fails. So this is an attempt to add exception handling to everything we fire off into the background. --- synapse/app/pusher.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..739d113ad5 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -144,20 +144,23 @@ class PusherReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) - ) + try: + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, + ) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) + ) + except Exception: + logger.exception("Error poking pushers") def stop_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) -- cgit 1.5.1 From 2a13af23bc0561ab48e0a90528231c40ee209724 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:29:27 +0100 Subject: Use run_in_background in preference to preserve_fn While I was going through uses of preserve_fn for other PRs, I converted places which only use the wrapped function once to use run_in_background, to avoid creating the function object. --- synapse/app/federation_sender.py | 4 ++-- synapse/app/pusher.py | 4 ++-- synapse/app/synchrotron.py | 5 ++--- synapse/appservice/scheduler.py | 12 ++++++------ synapse/crypto/keyring.py | 28 ++++++++++++++++----------- synapse/federation/federation_client.py | 5 +++-- synapse/groups/attestations.py | 4 ++-- synapse/handlers/appservice.py | 5 ++++- synapse/handlers/e2e_keys.py | 6 +++--- synapse/handlers/federation.py | 16 +++++++++------ synapse/handlers/initial_sync.py | 12 +++++++----- synapse/handlers/message.py | 5 +++-- synapse/handlers/typing.py | 7 ++++--- synapse/push/pusherpool.py | 20 +++++++++++-------- synapse/rest/media/v1/preview_url_resource.py | 5 +++-- synapse/storage/events_worker.py | 5 +++-- synapse/storage/stream.py | 5 +++-- synapse/util/async.py | 4 ++-- synapse/util/file_consumer.py | 6 ++++-- synapse/util/logcontext.py | 2 +- synapse/util/ratelimitutils.py | 4 ++-- synapse/util/retryutils.py | 4 ++-- 22 files changed, 97 insertions(+), 71 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..c6daa0d43f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -229,7 +229,7 @@ class FederationSenderHandler(object): # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - preserve_fn(self.update_token)(token) + run_in_background(self.update_token, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..8bd5c0c2b7 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -33,7 +33,7 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -140,7 +140,7 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - preserve_fn(self.poke_pushers)(stream_name, token, rows) + run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..0c4ccc58bc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string @@ -327,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - - preserve_fn(self.process_and_notify)(stream_name, token, rows) + run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..ba1631b5c8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -51,7 +51,7 @@ components. from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure import logging @@ -106,7 +106,7 @@ class _ServiceQueuer(object): def enqueue(self, service, event): # if this service isn't being sent something self.queued_events.setdefault(service.id, []).append(event) - preserve_fn(self._send_request)(service) + run_in_background(self._send_request, service) @defer.inlineCallbacks def _send_request(self, service): @@ -152,10 +152,10 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - preserve_fn(self._start_recoverer)(service) - except Exception as e: - logger.exception(e) - preserve_fn(self._start_recoverer)(service) + run_in_background(self._start_recoverer, service) + except Exception: + logger.exception("Error creating appservice transaction") + run_in_background(self._start_recoverer, service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..38944a7326 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes from synapse.util import unwrapFirstError, logcontext from synapse.util.logcontext import ( PreserveLoggingContext, - preserve_fn + preserve_fn, + run_in_background, ) from synapse.util.metrics import Measure @@ -127,7 +128,7 @@ class Keyring(object): verify_requests.append(verify_request) - preserve_fn(self._start_key_lookups)(verify_requests) + run_in_background(self._start_key_lookups, verify_requests) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified @@ -313,7 +314,7 @@ class Keyring(object): if not verify_request.deferred.called: verify_request.deferred.errback(err) - preserve_fn(do_iterations)().addErrback(on_err) + run_in_background(do_iterations).addErrback(on_err) @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): @@ -329,8 +330,9 @@ class Keyring(object): """ res = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.get_server_verify_keys)( - server_name, key_ids + run_in_background( + self.store.get_server_verify_keys, + server_name, key_ids, ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], @@ -358,7 +360,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(p_name, p_keys) + run_in_background(get_key, p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, @@ -398,7 +400,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(server_name, key_ids) + run_in_background(get_key, server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, @@ -481,7 +483,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -539,7 +542,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=key_server_name, from_server=server_name, verify_keys=verify_keys, @@ -615,7 +619,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_keys_json)( + run_in_background( + self.store.store_server_keys_json, server_name=server_name, key_id=key_id, from_server=server_name, @@ -716,7 +721,8 @@ class Keyring(object): # TODO(markjh): Store whether the keys have expired. return logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_verify_key)( + run_in_background( + self.store.store_server_verify_key, server_name, server_name, key.time_added, key ) for key_id, key in verify_keys.items() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8e2c0c4cd2..8adc60863e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -33,7 +33,7 @@ from synapse.federation.federation_base import ( import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination @@ -417,7 +417,8 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - preserve_fn(self.get_pdu)( + run_in_background( + self.get_pdu, destinations=random_server_list(), event_id=e_id, ) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..5f53f17954 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -42,7 +42,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from signedjson.sign import sign_json @@ -192,4 +192,4 @@ class GroupAttestionRenewer(object): group_id = row["group_id"] user_id = row["user_id"] - preserve_fn(_renew_attestation)(group_id, user_id) + run_in_background(_renew_attestation, group_id, user_id) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0245197c02..6cc2388306 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -198,7 +198,10 @@ class ApplicationServicesHandler(object): services = yield self._get_services_for_3pn(protocol) results = yield make_deferred_yieldable(defer.DeferredList([ - preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) + run_in_background( + self.appservice_api.query_3pe, + service, kind, protocol, fields, + ) for service in services ], consumeErrors=True)) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 325c0c4a9f..fc958404a1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -24,7 +24,7 @@ from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, ) from synapse.types import get_domain_from_id, UserID -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -139,7 +139,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(do_remote_query)(destination) + run_in_background(do_remote_query, destination) for destination in remote_queries_not_in_cache ])) @@ -242,7 +242,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(claim_client_keys)(destination) + run_in_background(claim_client_keys, destination) for destination in remote_queries ])) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae7e0d6da2..c66ca0f381 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -637,7 +637,8 @@ class FederationHandler(BaseHandler): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self.replication_layer.get_pdu)( + logcontext.run_in_background( + self.replication_layer.get_pdu, [dest], event_id, outlier=True, @@ -1023,7 +1024,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) + logcontext.run_in_background(self._handle_queued_pdus, room_queue) defer.returnValue(True) @@ -1523,8 +1524,9 @@ class FederationHandler(BaseHandler): if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( - event_stream_id, max_stream_id + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1538,7 +1540,8 @@ class FederationHandler(BaseHandler): """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self._prep_event)( + logcontext.run_in_background( + self._prep_event, origin, ev_info["event"], state=ev_info.get("state"), @@ -1867,7 +1870,8 @@ class FederationHandler(BaseHandler): different_events = yield logcontext.make_deferred_yieldable( defer.gatherResults([ - logcontext.preserve_fn(self.store.get_event)( + logcontext.run_in_background( + self.store.get_event, d, allow_none=True, allow_rejected=False, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5267b4b84..cd33a86599 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler): (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background( + self.store.get_recent_events_for_room, event.room_id, limit=limit, end_token=room_end_token, @@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - preserve_fn(get_presence)(), - preserve_fn(get_receipts)(), - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, room_id, limit=limit, end_token=now_token.room_key, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..244b98dd8d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -850,7 +850,8 @@ class EventCreationHandler(object): # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + run_in_background( + self.pusher_pool.on_new_notifications, event_stream_id, max_stream_id ) @@ -862,7 +863,7 @@ class EventCreationHandler(object): extra_users=extra_users ) - preserve_fn(_notify)() + run_in_background(_notify) if event.type == EventTypes.Message: presence = self.hs.get_presence_handler() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..19cde70adf 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id @@ -97,7 +97,8 @@ class TypingHandler(object): if self.hs.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: - preserve_fn(self._push_remote)( + run_in_background( + self._push_remote, member=member, typing=True ) @@ -196,7 +197,7 @@ class TypingHandler(object): def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - preserve_fn(self._push_remote)(member, typing) + run_in_background(self._push_remote, member, typing) self._push_update_local( member=member, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 134e89b371..7bb5733090 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -14,13 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from .pusher import PusherFactory -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.push.pusher import PusherFactory from synapse.util.async import run_on_reactor - -import logging +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -137,8 +137,9 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_notifications)( - min_stream_id, max_stream_id + run_in_background( + p.on_new_notifications, + min_stream_id, max_stream_id, ) ) @@ -164,7 +165,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) + run_in_background( + p.on_new_receipts, + min_stream_id, max_stream_id, + ) ) yield make_deferred_yieldable(defer.gatherResults(deferreds)) @@ -207,7 +211,7 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - preserve_fn(p.on_started)() + run_in_background(p.on_started) logger.info("Started pushers") diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 0fc21540c6..9290d7946f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -35,7 +35,7 @@ from ._base import FileInfo from synapse.api.errors import ( SynapseError, Codes, ) -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import random_string from synapse.util.caches.expiringcache import ExpiringCache from synapse.http.client import SpiderHttpClient @@ -144,7 +144,8 @@ class PreviewUrlResource(Resource): observable = self._cache.get(url) if not observable: - download = preserve_fn(self._do_preview)( + download = run_in_background( + self._do_preview, url, requester.user, ts, ) observable = ObservableDeferred( diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a937b9bceb..ba834854e1 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -20,7 +20,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore): res = yield make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._get_event_from_row)( + run_in_background( + self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..5b245a936c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,7 +41,7 @@ from synapse.storage.events import EventsWorkerStore from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc @@ -198,7 +198,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(self.get_room_events_stream_for_room)( + run_in_background( + self.get_room_events_stream_for_room, room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..bd07067328 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -17,7 +17,7 @@ from twisted.internet import defer, reactor from .logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, preserve_fn + PreserveLoggingContext, make_deferred_yieldable, run_in_background ) from synapse.util import logcontext, unwrapFirstError @@ -161,7 +161,7 @@ def concurrently_execute(func, args, limit): pass return logcontext.make_deferred_yieldable(defer.gatherResults([ - preserve_fn(_concurrently_execute_inner)() + run_in_background(_concurrently_execute_inner) for _ in xrange(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3c8a165331..3380970e4e 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -15,7 +15,7 @@ from twisted.internet import threads, reactor -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from six.moves import queue @@ -70,7 +70,9 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming - self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) + self._finished_deferred = run_in_background( + threads.deferToThread, self._writer + ) if not streaming: self._producer.resumeProducing() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..c2edf87e58 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -341,7 +341,7 @@ def make_deferred_yieldable(deferred): returning a deferred. Then, when the deferred completes, restores the current logcontext before running callbacks/errbacks. - (This is more-or-less the opposite operation to preserve_fn.) + (This is more-or-less the opposite operation to run_in_background.) """ if isinstance(deferred, defer.Deferred) and not deferred.called: prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 1101881a2d..18424f6c36 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import sleep -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import collections import contextlib @@ -150,7 +150,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) + ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 47b0bb5eb3..4e93f69d3a 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -203,8 +203,8 @@ class RetryDestinationLimiter(object): ) except Exception: logger.exception( - "Failed to store set_destination_retry_timings", + "Failed to store destination_retry_timings", ) # we deliberately do this in the background. - synapse.util.logcontext.preserve_fn(store_retry_timings)() + synapse.util.logcontext.run_in_background(store_retry_timings) -- cgit 1.5.1 From 318711e1399da009910c3a9e5fa297c28a2d0a97 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 10 May 2018 18:46:59 +0100 Subject: Set Server header in SynapseRequest (instead of everywhere that writes a response. Or rather, the subset of places which write responses where we haven't forgotten it). This also means that we don't have to have the mysterious version_string attribute in anything with a request handler. Unfortunately it does mean that we have to pass the version string wherever we instantiate a SynapseSite, which has been c&ped 150 times, but that is code that ought to be cleaned up anyway really. --- synapse/app/appservice.py | 1 + synapse/app/client_reader.py | 1 + synapse/app/event_creator.py | 1 + synapse/app/federation_reader.py | 1 + synapse/app/federation_sender.py | 1 + synapse/app/frontend_proxy.py | 1 + synapse/app/homeserver.py | 2 ++ synapse/app/media_repository.py | 1 + synapse/app/pusher.py | 1 + synapse/app/synchrotron.py | 1 + synapse/app/user_dir.py | 1 + synapse/http/additional_resource.py | 3 +-- synapse/http/server.py | 14 ++++---------- synapse/http/site.py | 11 ++++++++++- synapse/rest/client/v1/pusher.py | 1 - synapse/rest/client/v2_alpha/auth.py | 2 -- synapse/rest/key/v1/server_key_resource.py | 2 -- synapse/rest/key/v2/local_key_resource.py | 2 -- synapse/rest/key/v2/remote_key_resource.py | 2 -- synapse/rest/media/v1/download_resource.py | 3 +-- synapse/rest/media/v1/preview_url_resource.py | 1 - synapse/rest/media/v1/thumbnail_resource.py | 1 - synapse/rest/media/v1/upload_resource.py | 1 - 23 files changed, 28 insertions(+), 27 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 58f2c9d68c..b1efacc9f8 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -74,6 +74,7 @@ class AppserviceServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 267d34c881..38b98382c6 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -98,6 +98,7 @@ class ClientReaderServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index b915d12d53..bd7f3d5679 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -114,6 +114,7 @@ class EventCreatorServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index c1dc66dd17..6e10b27b9e 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -87,6 +87,7 @@ class FederationReaderServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index a08af83a4c..6f24e32d6d 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -101,6 +101,7 @@ class FederationSenderServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index b349e3e3ce..0f700ee786 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -152,6 +152,7 @@ class FrontendProxyServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a0e465d644..75f40fd5a4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -140,6 +140,7 @@ class SynapseHomeServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ), self.tls_server_context_factory, ) @@ -153,6 +154,7 @@ class SynapseHomeServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) logger.info("Synapse now listening on port %d", port) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index fc8282bbc1..9c93195f0a 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -94,6 +94,7 @@ class MediaRepositoryServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 26930d1b3b..3912eae48c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -104,6 +104,7 @@ class PusherServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7152b1deb4..c6294a7a0c 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -281,6 +281,7 @@ class SynchrotronServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 5ba7e9b416..53eb3474da 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -126,6 +126,7 @@ class UserDirectoryServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) diff --git a/synapse/http/additional_resource.py b/synapse/http/additional_resource.py index d9e7f5dfb7..a797396ade 100644 --- a/synapse/http/additional_resource.py +++ b/synapse/http/additional_resource.py @@ -42,8 +42,7 @@ class AdditionalResource(Resource): Resource.__init__(self) self._handler = handler - # these are required by the request_handler wrapper - self.version_string = hs.version_string + # required by the request_handler wrapper self.clock = hs.get_clock() def render(self, request): diff --git a/synapse/http/server.py b/synapse/http/server.py index f29e36f490..b6e2ae14a2 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -51,8 +51,8 @@ def wrap_json_request_handler(h): Also adds logging as per wrap_request_handler_with_logging. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have "version_string" and "clock" attributes (and - "request" must be a SynapseRequest). + where "self" must have a "clock" attribute (and "request" must be a + SynapseRequest). The handler must return a deferred. If the deferred succeeds we assume that a response has been sent. If the deferred fails with a SynapseError we use @@ -75,7 +75,6 @@ def wrap_json_request_handler(h): respond_with_json( request, code, cs_exception(e), send_cors=True, pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, ) except Exception: @@ -98,7 +97,6 @@ def wrap_json_request_handler(h): }, send_cors=True, pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, ) return wrap_request_handler_with_logging(wrapped_request_handler) @@ -192,7 +190,6 @@ class JsonResource(HttpServer, resource.Resource): self.canonical_json = canonical_json self.clock = hs.get_clock() self.path_regexs = {} - self.version_string = hs.version_string self.hs = hs def register_paths(self, method, path_patterns, callback): @@ -275,7 +272,6 @@ class JsonResource(HttpServer, resource.Resource): send_cors=True, response_code_message=response_code_message, pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, canonical_json=self.canonical_json, ) @@ -326,7 +322,7 @@ class RootRedirect(resource.Resource): def respond_with_json(request, code, json_object, send_cors=False, response_code_message=None, pretty_print=False, - version_string="", canonical_json=True): + canonical_json=True): # could alternatively use request.notifyFinish() and flip a flag when # the Deferred fires, but since the flag is RIGHT THERE it seems like # a waste. @@ -348,12 +344,11 @@ def respond_with_json(request, code, json_object, send_cors=False, request, code, json_bytes, send_cors=send_cors, response_code_message=response_code_message, - version_string=version_string ) def respond_with_json_bytes(request, code, json_bytes, send_cors=False, - version_string="", response_code_message=None): + response_code_message=None): """Sends encoded JSON in response to the given request. Args: @@ -367,7 +362,6 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False, request.setResponseCode(code, message=response_code_message) request.setHeader(b"Content-Type", b"application/json") - request.setHeader(b"Server", version_string) request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),)) request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate") diff --git a/synapse/http/site.py b/synapse/http/site.py index bfd9832aa0..202a990508 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -77,6 +77,11 @@ class SynapseRequest(Request): def get_user_agent(self): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] + def render(self, resrc): + # override the Server header which is set by twisted + self.setHeader("Server", self.site.server_version_string) + return Request.render(self, resrc) + def _started_processing(self, servlet_name): self.start_time = int(time.time() * 1000) self.request_metrics = RequestMetrics() @@ -151,6 +156,8 @@ class SynapseRequest(Request): It is possible to update this afterwards by updating self.request_metrics.servlet_name. """ + # TODO: we should probably just move this into render() and finish(), + # to save having to call a separate method. self._started_processing(servlet_name) yield self._finished_processing() @@ -191,7 +198,8 @@ class SynapseSite(Site): Subclass of a twisted http Site that does access logging with python's standard logging """ - def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): + def __init__(self, logger_name, site_tag, config, resource, + server_version_string, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) self.site_tag = site_tag @@ -199,6 +207,7 @@ class SynapseSite(Site): proxied = config.get("x_forwarded", False) self.requestFactory = SynapseRequestFactory(self, proxied) self.access_logger = logging.getLogger(logger_name) + self.server_version_string = server_version_string def log(self, request): pass diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index 0206e664c1..40e523cc5f 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -176,7 +176,6 @@ class PushersRemoveRestServlet(RestServlet): request.setResponseCode(200) request.setHeader(b"Content-Type", b"text/html; charset=utf-8") - request.setHeader(b"Server", self.hs.version_string) request.setHeader(b"Content-Length", b"%d" % ( len(PushersRemoveRestServlet.SUCCESS_HTML), )) diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index 8e5577148f..d6f3a19648 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -129,7 +129,6 @@ class AuthRestServlet(RestServlet): html_bytes = html.encode("utf8") request.setResponseCode(200) request.setHeader(b"Content-Type", b"text/html; charset=utf-8") - request.setHeader(b"Server", self.hs.version_string) request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) request.write(html_bytes) @@ -175,7 +174,6 @@ class AuthRestServlet(RestServlet): html_bytes = html.encode("utf8") request.setResponseCode(200) request.setHeader(b"Content-Type", b"text/html; charset=utf-8") - request.setHeader(b"Server", self.hs.version_string) request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) request.write(html_bytes) diff --git a/synapse/rest/key/v1/server_key_resource.py b/synapse/rest/key/v1/server_key_resource.py index bd4fea5774..1498d188c1 100644 --- a/synapse/rest/key/v1/server_key_resource.py +++ b/synapse/rest/key/v1/server_key_resource.py @@ -49,7 +49,6 @@ class LocalKey(Resource): """ def __init__(self, hs): - self.version_string = hs.version_string self.response_body = encode_canonical_json( self.response_json_object(hs.config) ) @@ -84,7 +83,6 @@ class LocalKey(Resource): def render_GET(self, request): return respond_with_json_bytes( request, 200, self.response_body, - version_string=self.version_string ) def getChild(self, name, request): diff --git a/synapse/rest/key/v2/local_key_resource.py b/synapse/rest/key/v2/local_key_resource.py index be68d9a096..04775b3c45 100644 --- a/synapse/rest/key/v2/local_key_resource.py +++ b/synapse/rest/key/v2/local_key_resource.py @@ -63,7 +63,6 @@ class LocalKey(Resource): isLeaf = True def __init__(self, hs): - self.version_string = hs.version_string self.config = hs.config self.clock = hs.clock self.update_response_body(self.clock.time_msec()) @@ -115,5 +114,4 @@ class LocalKey(Resource): self.update_response_body(time_now) return respond_with_json_bytes( request, 200, self.response_body, - version_string=self.version_string ) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 17b3077926..21b4c1175e 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -93,7 +93,6 @@ class RemoteKey(Resource): def __init__(self, hs): self.keyring = hs.get_keyring() self.store = hs.get_datastore() - self.version_string = hs.version_string self.clock = hs.get_clock() self.federation_domain_whitelist = hs.config.federation_domain_whitelist @@ -242,5 +241,4 @@ class RemoteKey(Resource): respond_with_json_bytes( request, 200, result_io.getvalue(), - version_string=self.version_string ) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 3fc3f64d62..8cf8820c31 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -37,9 +37,8 @@ class DownloadResource(Resource): self.media_repo = media_repo self.server_name = hs.hostname - # Both of these are expected by @request_handler() + # this is expected by @wrap_json_request_handler self.clock = hs.get_clock() - self.version_string = hs.version_string def render_GET(self, request): self._async_render_GET(request) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 6b089689b4..2839207abc 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -58,7 +58,6 @@ class PreviewUrlResource(Resource): self.auth = hs.get_auth() self.clock = hs.get_clock() - self.version_string = hs.version_string self.filepaths = media_repo.filepaths self.max_spider_size = hs.config.max_spider_size self.server_name = hs.hostname diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 6c12d79f56..aae6e464e8 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -44,7 +44,6 @@ class ThumbnailResource(Resource): self.media_storage = media_storage self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.server_name = hs.hostname - self.version_string = hs.version_string self.clock = hs.get_clock() def render_GET(self, request): diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 7d01c57fd1..7567476fce 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -41,7 +41,6 @@ class UploadResource(Resource): self.server_name = hs.hostname self.auth = hs.get_auth() self.max_upload_size = hs.config.max_upload_size - self.version_string = hs.version_string self.clock = hs.get_clock() def render_POST(self, request): -- cgit 1.5.1 From febe0ec8fd78028fe7c7b3a26a8dd85c32ee1550 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 31 May 2018 19:04:50 +1000 Subject: Run Prometheus on a different port, optionally. (#3274) --- docs/metrics-howto.rst | 77 ++++++++++++++++++++++++++++++++++------ synapse/app/_base.py | 13 +++++++ synapse/app/appservice.py | 7 ++++ synapse/app/client_reader.py | 11 ++++-- synapse/app/event_creator.py | 10 +++++- synapse/app/federation_reader.py | 10 +++++- synapse/app/federation_sender.py | 10 +++++- synapse/app/frontend_proxy.py | 10 +++++- synapse/app/homeserver.py | 13 ++++--- synapse/app/media_repository.py | 10 +++++- synapse/app/pusher.py | 10 +++++- synapse/app/synchrotron.py | 10 +++++- synapse/app/user_dir.py | 10 +++++- synapse/config/server.py | 10 ++++++ synapse/metrics/__init__.py | 3 +- synapse/metrics/resource.py | 4 +++ 16 files changed, 192 insertions(+), 26 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst index 8acc479bc3..25e06bca58 100644 --- a/docs/metrics-howto.rst +++ b/docs/metrics-howto.rst @@ -1,25 +1,47 @@ How to monitor Synapse metrics using Prometheus =============================================== -1. Install prometheus: +1. Install Prometheus: Follow instructions at http://prometheus.io/docs/introduction/install/ -2. Enable synapse metrics: +2. Enable Synapse metrics: - Simply setting a (local) port number will enable it. Pick a port. - prometheus itself defaults to 9090, so starting just above that for - locally monitored services seems reasonable. E.g. 9092: + There are two methods of enabling metrics in Synapse. - Add to homeserver.yaml:: + The first serves the metrics as a part of the usual web server and can be + enabled by adding the "metrics" resource to the existing listener as such:: - metrics_port: 9092 + resources: + - names: + - client + - metrics - Also ensure that ``enable_metrics`` is set to ``True``. + This provides a simple way of adding metrics to your Synapse installation, + and serves under ``/_synapse/metrics``. If you do not wish your metrics be + publicly exposed, you will need to either filter it out at your load + balancer, or use the second method. - Restart synapse. + The second method runs the metrics server on a different port, in a + different thread to Synapse. This can make it more resilient to heavy load + meaning metrics cannot be retrieved, and can be exposed to just internal + networks easier. The served metrics are available over HTTP only, and will + be available at ``/``. -3. Add a prometheus target for synapse. + Add a new listener to homeserver.yaml:: + + listeners: + - type: metrics + port: 9000 + bind_addresses: + - '0.0.0.0' + + For both options, you will need to ensure that ``enable_metrics`` is set to + ``True``. + + Restart Synapse. + +3. Add a Prometheus target for Synapse. It needs to set the ``metrics_path`` to a non-default value (under ``scrape_configs``):: @@ -31,7 +53,40 @@ How to monitor Synapse metrics using Prometheus If your prometheus is older than 1.5.2, you will need to replace ``static_configs`` in the above with ``target_groups``. - Restart prometheus. + Restart Prometheus. + + +Removal of deprecated metrics & time based counters becoming histograms in 0.31.0 +--------------------------------------------------------------------------------- + +The duplicated metrics deprecated in Synapse 0.27.0 have been removed. + +All time duration-based metrics have been changed to be seconds. This affects: + +================================ +msec -> sec metrics +================================ +python_gc_time +python_twisted_reactor_tick_time +synapse_storage_query_time +synapse_storage_schedule_time +synapse_storage_transaction_time +================================ + +Several metrics have been changed to be histograms, which sort entries into +buckets and allow better analysis. The following metrics are now histograms: + +========================================= +Altered metrics +========================================= +python_gc_time +python_twisted_reactor_pending_calls +python_twisted_reactor_tick_time +synapse_http_server_response_time_seconds +synapse_storage_query_time +synapse_storage_schedule_time +synapse_storage_transaction_time +========================================= Block and response metrics renamed for 0.27.0 diff --git a/synapse/app/_base.py b/synapse/app/_base.py index e4318cdfc3..a6925ab139 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -124,6 +124,19 @@ def quit_with_error(error_string): sys.exit(1) +def listen_metrics(bind_addresses, port): + """ + Start Prometheus metrics server. + """ + from synapse.metrics import RegistryProxy + from prometheus_client import start_http_server + + for host in bind_addresses: + reactor.callInThread(start_http_server, int(port), + addr=host, registry=RegistryProxy) + logger.info("Metrics now reporting on %s:%d", host, port) + + def listen_tcp(bind_addresses, port, factory, backlog=50): """ Create a TCP socket for a port and several addresses diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index b1efacc9f8..dd114dee07 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -94,6 +94,13 @@ class AppserviceServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 38b98382c6..85dada7f9f 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -77,7 +78,7 @@ class ClientReaderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) PublicRoomListRestServlet(self).register(resource) @@ -118,7 +119,13 @@ class ClientReaderServer(HomeServer): globals={"hs": self}, ) ) - + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index bd7f3d5679..5ca77c0f1a 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -90,7 +91,7 @@ class EventCreatorServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) RoomSendEventRestServlet(self).register(resource) @@ -134,6 +135,13 @@ class EventCreatorServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 6e10b27b9e..2a1995d0cd 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.directory import DirectoryStore @@ -71,7 +72,7 @@ class FederationReaderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "federation": resources.update({ FEDERATION_PREFIX: TransportLayerServer(self), @@ -107,6 +108,13 @@ class FederationReaderServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 6f24e32d6d..81ad574043 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.federation import send_queue from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore @@ -89,7 +90,7 @@ class FederationSenderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) root_resource = create_resource_tree(resources, NoResource()) @@ -121,6 +122,13 @@ class FederationSenderServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 0f700ee786..5a164a7a95 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -29,6 +29,7 @@ from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, ) from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -131,7 +132,7 @@ class FrontendProxyServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) KeyUploadServlet(self).register(resource) @@ -172,6 +173,13 @@ class FrontendProxyServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 449bfacdb9..51fc3645d5 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -35,7 +35,7 @@ from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ check_requirements from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX @@ -61,8 +61,6 @@ from twisted.web.resource import EncodingResourceWrapper, NoResource from twisted.web.server import GzipEncoderFactory from twisted.web.static import File -from prometheus_client.twisted import MetricsResource - logger = logging.getLogger("synapse.app.homeserver") @@ -232,7 +230,7 @@ class SynapseHomeServer(HomeServer): resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self) if name == "metrics" and self.get_config().enable_metrics: - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy()) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) if name == "replication": resources[REPLICATION_PREFIX] = ReplicationRestResource(self) @@ -265,6 +263,13 @@ class SynapseHomeServer(HomeServer): reactor.addSystemEventTrigger( "before", "shutdown", server_listener.stopListening, ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 9c93195f0a..006bba80a8 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -73,7 +74,7 @@ class MediaRepositoryServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "media": media_repo = self.get_media_repository_resource() resources.update({ @@ -114,6 +115,13 @@ class MediaRepositoryServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 3912eae48c..64df47f9cc 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -23,6 +23,7 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -92,7 +93,7 @@ class PusherServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) root_resource = create_resource_tree(resources, NoResource()) @@ -124,6 +125,13 @@ class PusherServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c6294a7a0c..6808d6d3e0 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -257,7 +258,7 @@ class SynchrotronServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) sync.register_servlets(self, resource) @@ -301,6 +302,13 @@ class SynchrotronServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 53eb3474da..ada1c13cec 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -105,7 +106,7 @@ class UserDirectoryServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) user_directory.register_servlets(self, resource) @@ -146,6 +147,13 @@ class UserDirectoryServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/config/server.py b/synapse/config/server.py index 8f0b6d1f28..968ecd9ea0 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -14,8 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from ._base import Config, ConfigError +logger = logging.Logger(__name__) + class ServerConfig(Config): @@ -138,6 +142,12 @@ class ServerConfig(Config): metrics_port = config.get("metrics_port") if metrics_port: + logger.warn( + ("The metrics_port configuration option is deprecated in Synapse 0.31 " + "in favour of a listener. Please see " + "http://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.rst" + " on how to configure the new listener.")) + self.listeners.append({ "port": metrics_port, "bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")], diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index bfdbbc9a23..56c0032f91 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -39,7 +39,8 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") class RegistryProxy(object): - def collect(self): + @staticmethod + def collect(): for metric in REGISTRY.collect(): if not metric.name.startswith("__"): yield metric diff --git a/synapse/metrics/resource.py b/synapse/metrics/resource.py index 7996e6ab66..9789359077 100644 --- a/synapse/metrics/resource.py +++ b/synapse/metrics/resource.py @@ -13,4 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from prometheus_client.twisted import MetricsResource + METRICS_PREFIX = "/_synapse/metrics" + +__all__ = ["MetricsResource", "METRICS_PREFIX"] -- cgit 1.5.1