diff options
Diffstat (limited to 'synapse')
30 files changed, 845 insertions, 90 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index b11a1d3ea3..5bada5e290 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.29.1" +__version__ = "0.30.0" diff --git a/synapse/api/errors.py b/synapse/api/errors.py index a9ff5576f3..e6ad3768f0 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -19,6 +19,7 @@ import logging import simplejson as json from six import iteritems +from six.moves import http_client logger = logging.getLogger(__name__) @@ -51,6 +52,8 @@ class Codes(object): THREEPID_DENIED = "M_THREEPID_DENIED" INVALID_USERNAME = "M_INVALID_USERNAME" SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED" + CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN" + CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" class CodeMessageException(RuntimeError): @@ -138,6 +141,32 @@ class SynapseError(CodeMessageException): return res +class ConsentNotGivenError(SynapseError): + """The error returned to the client when the user has not consented to the + privacy policy. + """ + def __init__(self, msg, consent_uri): + """Constructs a ConsentNotGivenError + + Args: + msg (str): The human-readable error message + consent_url (str): The URL where the user can give their consent + """ + super(ConsentNotGivenError, self).__init__( + code=http_client.FORBIDDEN, + msg=msg, + errcode=Codes.CONSENT_NOT_GIVEN + ) + self._consent_uri = consent_uri + + def error_dict(self): + return cs_error( + self.msg, + self.errcode, + consent_uri=self._consent_uri + ) + + class RegistrationError(SynapseError): """An error raised when a registration event fails.""" pass @@ -292,7 +321,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs): Args: msg (str): The error message. - code (int): The error code. + code (str): The error code. kwargs : Additional keys to add to the response. Returns: A dict representing the error response JSON. diff --git a/synapse/api/urls.py b/synapse/api/urls.py index 91a33a3402..bb46b5da8a 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +15,12 @@ # limitations under the License. """Contains the URL paths to prefix various aspects of the server with. """ +from hashlib import sha256 +import hmac + +from six.moves.urllib.parse import urlencode + +from synapse.config import ConfigError CLIENT_PREFIX = "/_matrix/client/api/v1" CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha" @@ -25,3 +32,46 @@ SERVER_KEY_PREFIX = "/_matrix/key/v1" SERVER_KEY_V2_PREFIX = "/_matrix/key/v2" MEDIA_PREFIX = "/_matrix/media/r0" LEGACY_MEDIA_PREFIX = "/_matrix/media/v1" + + +class ConsentURIBuilder(object): + def __init__(self, hs_config): + """ + Args: + hs_config (synapse.config.homeserver.HomeServerConfig): + """ + if hs_config.form_secret is None: + raise ConfigError( + "form_secret not set in config", + ) + if hs_config.public_baseurl is None: + raise ConfigError( + "public_baseurl not set in config", + ) + + self._hmac_secret = hs_config.form_secret.encode("utf-8") + self._public_baseurl = hs_config.public_baseurl + + def build_user_consent_uri(self, user_id): + """Build a URI which we can give to the user to do their privacy + policy consent + + Args: + user_id (str): mxid or username of user + + Returns + (str) the URI where the user can do consent + """ + mac = hmac.new( + key=self._hmac_secret, + msg=user_id, + digestmod=sha256, + ).hexdigest() + consent_uri = "%s_matrix/consent?%s" % ( + self._public_baseurl, + urlencode({ + "u": user_id, + "h": mac + }), + ) + return consent_uri diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index bceb21a8d5..caccbaa814 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -41,7 +41,6 @@ from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.rest import ClientRestResource -from synapse.rest.consent.consent_resource import ConsentResource from synapse.rest.key.v1.server_key_resource import LocalKey from synapse.rest.key.v2 import KeyApiV2Resource from synapse.rest.media.v0.content_repository import ContentRepoResource @@ -186,6 +185,7 @@ class SynapseHomeServer(HomeServer): }) if name == "consent": + from synapse.rest.consent.consent_resource import ConsentResource consent_resource = ConsentResource(self) if compress: consent_resource = gz_wrap(consent_resource) diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py index 675fce0911..ddcd305f4c 100644 --- a/synapse/config/consent_config.py +++ b/synapse/config/consent_config.py @@ -18,25 +18,62 @@ from ._base import Config DEFAULT_CONFIG = """\ # User Consent configuration # -# uncomment and configure if enabling the 'consent' resource under 'listeners'. +# Parts of this section are required if enabling the 'consent' resource under +# 'listeners', in particular 'template_dir' and 'version'. # # 'template_dir' gives the location of the templates for the HTML forms. # This directory should contain one subdirectory per language (eg, 'en', 'fr'), # and each language directory should contain the policy document (named as # '<version>.html') and a success page (success.html). # -# 'default_version' gives the version of the policy document to serve up if -# there is no 'v' parameter. +# 'version' specifies the 'current' version of the policy document. It defines +# the version to be served by the consent resource if there is no 'v' +# parameter. +# +# 'server_notice_content', if enabled, will send a user a "Server Notice" +# asking them to consent to the privacy policy. The 'server_notices' section +# must also be configured for this to work. +# +# 'block_events_error', if set, will block any attempts to send events +# until the user consents to the privacy policy. The value of the setting is +# used as the text of the error. # # user_consent: # template_dir: res/templates/privacy -# default_version: 1.0 +# version: 1.0 +# server_notice_content: +# msgtype: m.text +# body: >- +# To continue using this homeserver you must review and agree to the +# terms and conditions at %(consent_uri)s +# block_events_error: >- +# To continue using this homeserver you must review and agree to the +# terms and conditions at %(consent_uri)s +# """ class ConsentConfig(Config): + def __init__(self): + super(ConsentConfig, self).__init__() + + self.user_consent_version = None + self.user_consent_template_dir = None + self.user_consent_server_notice_content = None + self.block_events_without_consent_error = None + def read_config(self, config): - self.consent_config = config.get("user_consent") + consent_config = config.get("user_consent") + if consent_config is None: + return + self.user_consent_version = str(consent_config["version"]) + self.user_consent_template_dir = consent_config["template_dir"] + self.user_consent_server_notice_content = consent_config.get( + "server_notice_content", + ) + self.block_events_without_consent_error = consent_config.get( + "block_events_error", + ) def default_config(self, **kwargs): return DEFAULT_CONFIG diff --git a/synapse/config/server_notices_config.py b/synapse/config/server_notices_config.py index ccef8d2ec5..be1d1f762c 100644 --- a/synapse/config/server_notices_config.py +++ b/synapse/config/server_notices_config.py @@ -26,12 +26,13 @@ DEFAULT_CONFIG = """\ # setting, which defines the id of the user which will be used to send the # notices. # -# It's also possible to override the room name, or the display name of the -# "notices" user. +# It's also possible to override the room name, the display name of the +# "notices" user, and the avatar for the user. # # server_notices: # system_mxid_localpart: notices # system_mxid_display_name: "Server Notices" +# system_mxid_avatar_url: "mxc://server.com/oumMVlgDnLYFaPVkExemNVVZ" # room_name: "Server Notices" """ @@ -48,6 +49,10 @@ class ServerNoticesConfig(Config): The display name to use for the server notices user. None if server notices are not enabled. + server_notices_mxid_avatar_url (str|None): + The display name to use for the server notices user. + None if server notices are not enabled. + server_notices_room_name (str|None): The name to use for the server notices room. None if server notices are not enabled. @@ -56,6 +61,7 @@ class ServerNoticesConfig(Config): super(ServerNoticesConfig, self).__init__() self.server_notices_mxid = None self.server_notices_mxid_display_name = None + self.server_notices_mxid_avatar_url = None self.server_notices_room_name = None def read_config(self, config): @@ -68,7 +74,10 @@ class ServerNoticesConfig(Config): mxid_localpart, self.server_name, ).to_string() self.server_notices_mxid_display_name = c.get( - 'system_mxid_display_name', 'Server Notices', + 'system_mxid_display_name', None, + ) + self.server_notices_mxid_avatar_url = c.get( + 'system_mxid_avatar_url', None, ) # todo: i18n self.server_notices_room_name = c.get('room_name', "Server Notices") diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 4eb18775e8..d58ea6c650 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -61,7 +61,7 @@ class DeactivateAccountHandler(BaseHandler): yield self.store.user_delete_threepids(user_id) yield self.store.user_set_password_hash(user_id, None) - # Add the user to a table of users penpding deactivation (ie. + # Add the user to a table of users pending deactivation (ie. # removal from all the rooms they're a member of) yield self.store.add_user_pending_deactivation(user_id) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index d3685fb12a..8bc642675f 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -48,6 +48,7 @@ class EventStreamHandler(BaseHandler): self.notifier = hs.get_notifier() self.state = hs.get_state_handler() + self._server_notices_sender = hs.get_server_notices_sender() @defer.inlineCallbacks @log_function @@ -58,6 +59,10 @@ class EventStreamHandler(BaseHandler): If `only_keys` is not None, events from keys will be sent down. """ + + # send any outstanding server notices to the user. + yield self._server_notices_sender.on_user_syncing(auth_user_id) + auth_user = UserID.from_string(auth_user_id) presence_handler = self.hs.get_presence_handler() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ba3ede8024..2c72beff2e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -479,18 +479,18 @@ class FederationHandler(BaseHandler): # to get all state ids that we're interested in. event_map = yield self.store.get_events([ e_id - for key_to_eid in event_to_state_ids.values() - for key, e_id in key_to_eid.items() + for key_to_eid in event_to_state_ids.itervalues() + for key, e_id in key_to_eid.iteritems() if key[0] != EventTypes.Member or check_match(key[1]) ]) event_to_state = { e_id: { key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.items() + for key, inner_e_id in key_to_eid.iteritems() if inner_e_id in event_map } - for e_id, key_to_eid in event_to_state_ids.items() + for e_id, key_to_eid in event_to_state_ids.iteritems() } def redact_disallowed(event, state): @@ -505,7 +505,7 @@ class FederationHandler(BaseHandler): # membership states for the requesting server to determine # if the server is either in the room or has been invited # into the room. - for ev in state.values(): + for ev in state.itervalues(): if ev.type != EventTypes.Member: continue try: @@ -751,9 +751,19 @@ class FederationHandler(BaseHandler): curr_state = yield self.state_handler.get_current_state(room_id) def get_domains_from_state(state): + """Get joined domains from state + + Args: + state (dict[tuple, FrozenEvent]): State map from type/state + key to event. + + Returns: + list[tuple[str, int]]: Returns a list of servers with the + lowest depth of their joins. Sorted by lowest depth first. + """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.items() + for (e_type, state_key), event in state.iteritems() if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -770,7 +780,7 @@ class FederationHandler(BaseHandler): except Exception: pass - return sorted(joined_domains.items(), key=lambda d: d[1]) + return sorted(joined_domains.iteritems(), key=lambda d: d[1]) curr_domains = get_domains_from_state(curr_state) @@ -787,7 +797,7 @@ class FederationHandler(BaseHandler): yield self.backfill( dom, room_id, limit=100, - extremities=[e for e in extremities.keys()] + extremities=extremities, ) # If this succeeded then we probably already have the # appropriate stuff. @@ -833,7 +843,7 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) tried_domains.add(self.server_name) - event_ids = list(extremities.keys()) + event_ids = list(extremities.iterkeys()) logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = logcontext.preserve_fn( @@ -843,31 +853,34 @@ class FederationHandler(BaseHandler): [resolve(room_id, [e]) for e in event_ids], consumeErrors=True, )) + + # dict[str, dict[tuple, str]], a map from event_id to state map of + # event_ids. states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.values() for e_id in ids], + [e_id for ids in states.itervalues() for e_id in ids.itervalues()], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.items() + for k, e_id in state_dict.iteritems() if e_id in state_map - } for key, state_dict in states.items() + } for key, state_dict in states.iteritems() } for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id]) success = yield try_backfill([ - dom for dom in likely_domains + dom for dom, _ in likely_domains if dom not in tried_domains ]) if success: defer.returnValue(True) - tried_domains.update(likely_domains) + tried_domains.update(dom for dom, _ in likely_domains) defer.returnValue(False) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8343b5839d..c32b9bcae4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,10 +20,15 @@ import sys from canonicaljson import encode_canonical_json import six from twisted.internet import defer, reactor +from twisted.internet.defer import succeed from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership, MAX_DEPTH -from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.api.errors import ( + AuthError, Codes, SynapseError, + ConsentNotGivenError, +) +from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -431,6 +436,9 @@ class EventCreationHandler(object): self.spam_checker = hs.get_spam_checker() + if self.config.block_events_without_consent_error is not None: + self._consent_uri_builder = ConsentURIBuilder(self.config) + @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, prev_events_and_hashes=None): @@ -482,6 +490,10 @@ class EventCreationHandler(object): target, e ) + is_exempt = yield self._is_exempt_from_privacy_policy(builder) + if not is_exempt: + yield self.assert_accepted_privacy_policy(requester) + if token_id is not None: builder.internal_metadata.token_id = token_id @@ -496,6 +508,83 @@ class EventCreationHandler(object): defer.returnValue((event, context)) + def _is_exempt_from_privacy_policy(self, builder): + """"Determine if an event to be sent is exempt from having to consent + to the privacy policy + + Args: + builder (synapse.events.builder.EventBuilder): event being created + + Returns: + Deferred[bool]: true if the event can be sent without the user + consenting + """ + # the only thing the user can do is join the server notices room. + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + if membership == Membership.JOIN: + return self._is_server_notices_room(builder.room_id) + return succeed(False) + + @defer.inlineCallbacks + def _is_server_notices_room(self, room_id): + if self.config.server_notices_mxid is None: + defer.returnValue(False) + user_ids = yield self.store.get_users_in_room(room_id) + defer.returnValue(self.config.server_notices_mxid in user_ids) + + @defer.inlineCallbacks + def assert_accepted_privacy_policy(self, requester): + """Check if a user has accepted the privacy policy + + Called when the given user is about to do something that requires + privacy consent. We see if the user is exempt and otherwise check that + they have given consent. If they have not, a ConsentNotGiven error is + raised. + + Args: + requester (synapse.types.Requester): + The user making the request + + Returns: + Deferred[None]: returns normally if the user has consented or is + exempt + + Raises: + ConsentNotGivenError: if the user has not given consent yet + """ + if self.config.block_events_without_consent_error is None: + return + + # exempt AS users from needing consent + if requester.app_service is not None: + return + + user_id = requester.user.to_string() + + # exempt the system notices user + if ( + self.config.server_notices_mxid is not None and + user_id == self.config.server_notices_mxid + ): + return + + u = yield self.store.get_user_by_id(user_id) + assert u is not None + if u["consent_version"] == self.config.user_consent_version: + return + + consent_uri = self._consent_uri_builder.build_user_consent_uri( + requester.user.localpart, + ) + msg = self.config.block_events_without_consent_error % { + 'consent_uri': consent_uri, + } + raise ConsentNotGivenError( + msg=msg, + consent_uri=consent_uri, + ) + @defer.inlineCallbacks def send_nonmember_event(self, requester, event, context, ratelimit=True): """ diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 91218e40e6..500a131874 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -87,6 +87,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class PresenceHandler(object): def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ self.is_mine = hs.is_mine self.is_mine_id = hs.is_mine_id self.clock = hs.get_clock() @@ -94,7 +99,6 @@ class PresenceHandler(object): self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() - self.state = hs.get_state_handler() federation_registry = hs.get_federation_registry() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5e9fa95a2d..b5850db42f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -126,6 +126,10 @@ class RoomCreationHandler(BaseHandler): except Exception: raise SynapseError(400, "Invalid user_id: %s" % (i,)) + yield self.event_creation_handler.assert_accepted_privacy_policy( + requester, + ) + invite_3pid_list = config.get("invite_3pid", []) visibility = config.get("visibility", None) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 5263f09aaa..82adfc8fdf 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -306,6 +306,7 @@ class RoomMemberHandler(object): raise SynapseError( http_client.FORBIDDEN, "You cannot leave this room", + errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM, ) if effective_membership_state == Membership.INVITE: diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 8c850bf23f..5917c83958 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -98,14 +98,87 @@ response_size = metrics.register_counter( "response_size", labels=["method", "servlet", "tag"] ) +# In flight metrics are incremented while the requests are in flight, rather +# than when the response was written. + +in_flight_requests_ru_utime = metrics.register_counter( + "in_flight_requests_ru_utime_seconds", labels=["method", "servlet"], +) + +in_flight_requests_ru_stime = metrics.register_counter( + "in_flight_requests_ru_stime_seconds", labels=["method", "servlet"], +) + +in_flight_requests_db_txn_count = metrics.register_counter( + "in_flight_requests_db_txn_count", labels=["method", "servlet"], +) + +# seconds spent waiting for db txns, excluding scheduling time, when processing +# this request +in_flight_requests_db_txn_duration = metrics.register_counter( + "in_flight_requests_db_txn_duration_seconds", labels=["method", "servlet"], +) + +# seconds spent waiting for a db connection, when processing this request +in_flight_requests_db_sched_duration = metrics.register_counter( + "in_flight_requests_db_sched_duration_seconds", labels=["method", "servlet"] +) + + +# The set of all in flight requests, set[RequestMetrics] +_in_flight_requests = set() + + +def _collect_in_flight(): + """Called just before metrics are collected, so we use it to update all + the in flight request metrics + """ + + for rm in _in_flight_requests: + rm.update_metrics() + + +metrics.register_collector(_collect_in_flight) + + +def _get_in_flight_counts(): + """Returns a count of all in flight requests by (method, server_name) + + Returns: + dict[tuple[str, str], int] + """ + + # Map from (method, name) -> int, the number of in flight requests of that + # type + counts = {} + for rm in _in_flight_requests: + key = (rm.method, rm.name,) + counts[key] = counts.get(key, 0) + 1 + + return counts + + +metrics.register_callback( + "in_flight_requests_count", + _get_in_flight_counts, + labels=["method", "servlet"] +) + class RequestMetrics(object): - def start(self, time_msec, name): + def start(self, time_msec, name, method): self.start = time_msec self.start_context = LoggingContext.current_context() self.name = name + self.method = method + + self._request_stats = _RequestStats.from_context(self.start_context) + + _in_flight_requests.add(self) def stop(self, time_msec, request): + _in_flight_requests.discard(self) + context = LoggingContext.current_context() tag = "" @@ -147,3 +220,88 @@ class RequestMetrics(object): ) response_size.inc_by(request.sentLength, request.method, self.name, tag) + + # We always call this at the end to ensure that we update the metrics + # regardless of whether a call to /metrics while the request was in + # flight. + self.update_metrics() + + def update_metrics(self): + """Updates the in flight metrics with values from this request. + """ + + diff = self._request_stats.update(self.start_context) + + in_flight_requests_ru_utime.inc_by( + diff.ru_utime, self.method, self.name, + ) + + in_flight_requests_ru_stime.inc_by( + diff.ru_stime, self.method, self.name, + ) + + in_flight_requests_db_txn_count.inc_by( + diff.db_txn_count, self.method, self.name, + ) + + in_flight_requests_db_txn_duration.inc_by( + diff.db_txn_duration_ms / 1000., self.method, self.name, + ) + + in_flight_requests_db_sched_duration.inc_by( + diff.db_sched_duration_ms / 1000., self.method, self.name, + ) + + +class _RequestStats(object): + """Keeps tracks of various metrics for an in flight request. + """ + + __slots__ = [ + "ru_utime", "ru_stime", + "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + ] + + def __init__(self, ru_utime, ru_stime, db_txn_count, + db_txn_duration_ms, db_sched_duration_ms): + self.ru_utime = ru_utime + self.ru_stime = ru_stime + self.db_txn_count = db_txn_count + self.db_txn_duration_ms = db_txn_duration_ms + self.db_sched_duration_ms = db_sched_duration_ms + + @staticmethod + def from_context(context): + ru_utime, ru_stime = context.get_resource_usage() + + return _RequestStats( + ru_utime, ru_stime, + context.db_txn_count, + context.db_txn_duration_ms, + context.db_sched_duration_ms, + ) + + def update(self, context): + """Updates the current values and returns the difference between the + old and new values. + + Returns: + _RequestStats: The difference between the old and new values + """ + new = _RequestStats.from_context(context) + + diff = _RequestStats( + new.ru_utime - self.ru_utime, + new.ru_stime - self.ru_stime, + new.db_txn_count - self.db_txn_count, + new.db_txn_duration_ms - self.db_txn_duration_ms, + new.db_sched_duration_ms - self.db_sched_duration_ms, + ) + + self.ru_utime = new.ru_utime + self.ru_stime = new.ru_stime + self.db_txn_count = new.db_txn_count + self.db_txn_duration_ms = new.db_txn_duration_ms + self.db_sched_duration_ms = new.db_sched_duration_ms + + return diff diff --git a/synapse/http/site.py b/synapse/http/site.py index 202a990508..b608504225 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -85,7 +85,9 @@ class SynapseRequest(Request): def _started_processing(self, servlet_name): self.start_time = int(time.time() * 1000) self.request_metrics = RequestMetrics() - self.request_metrics.start(self.start_time, name=servlet_name) + self.request_metrics.start( + self.start_time, name=servlet_name, method=self.method, + ) self.site.access_logger.info( "%s - %s - Received request: %s %s", diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a41af4fd6c..a603c520ea 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -69,6 +69,7 @@ class ReplicationStreamer(object): self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self._server_notices_sender = hs.get_server_notices_sender() # Current connections. self.connections = [] @@ -253,6 +254,7 @@ class ReplicationStreamer(object): yield self.store.insert_client_ip( user_id, access_token, ip, user_agent, device_id, last_seen, ) + yield self._server_notices_sender.on_user_ip(user_id) def send_sync_to_all_connections(self, data): """Sends a SYNC command to all clients. diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index fceca2edeb..20fa6678ef 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -19,6 +19,7 @@ import logging from synapse.api.auth import get_access_token_from_request from synapse.util.async import ObservableDeferred +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -80,27 +81,26 @@ class HttpTransactionCache(object): Returns: Deferred which resolves to a tuple of (response_code, response_dict). """ - try: - return self.transactions[txn_key][0].observe() - except (KeyError, IndexError): - pass # execute the function instead. - - deferred = fn(*args, **kwargs) - - # if the request fails with a Twisted failure, remove it - # from the transaction map. This is done to ensure that we don't - # cache transient errors like rate-limiting errors, etc. - def remove_from_map(err): - self.transactions.pop(txn_key, None) - return err - deferred.addErrback(remove_from_map) - - # We don't add any other errbacks to the raw deferred, so we ask - # ObservableDeferred to swallow the error. This is fine as the error will - # still be reported to the observers. - observable = ObservableDeferred(deferred, consumeErrors=True) - self.transactions[txn_key] = (observable, self.clock.time_msec()) - return observable.observe() + if txn_key in self.transactions: + observable = self.transactions[txn_key][0] + else: + # execute the function instead. + deferred = run_in_background(fn, *args, **kwargs) + + observable = ObservableDeferred(deferred) + self.transactions[txn_key] = (observable, self.clock.time_msec()) + + # if the request fails with an exception, remove it + # from the transaction map. This is done to ensure that we don't + # cache transient errors like rate-limiting errors, etc. + def remove_from_map(err): + self.transactions.pop(txn_key, None) + # we deliberately do not propagate the error any further, as we + # expect the observers to have reported it. + + deferred.addErrback(remove_from_map) + + return make_deferred_yieldable(observable.observe()) def _cleanup(self): now = self.clock.time_msec() diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index eb91c0b293..a291cffbf1 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -85,6 +85,7 @@ class SyncRestServlet(RestServlet): self.clock = hs.get_clock() self.filtering = hs.get_filtering() self.presence_handler = hs.get_presence_handler() + self._server_notices_sender = hs.get_server_notices_sender() @defer.inlineCallbacks def on_GET(self, request): @@ -149,6 +150,9 @@ class SyncRestServlet(RestServlet): else: since_token = None + # send any outstanding server notices to the user. + yield self._server_notices_sender.on_user_syncing(user.to_string()) + affect_presence = set_presence != PresenceState.OFFLINE if affect_presence: diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py index d791302278..724911d1e6 100644 --- a/synapse/rest/consent/consent_resource.py +++ b/synapse/rest/consent/consent_resource.py @@ -95,8 +95,8 @@ class ConsentResource(Resource): # this is required by the request_handler wrapper self.clock = hs.get_clock() - consent_config = hs.config.consent_config - if consent_config is None: + self._default_consent_version = hs.config.user_consent_version + if self._default_consent_version is None: raise ConfigError( "Consent resource is enabled but user_consent section is " "missing in config file.", @@ -104,7 +104,7 @@ class ConsentResource(Resource): # daemonize changes the cwd to /, so make the path absolute now. consent_template_directory = path.abspath( - consent_config["template_dir"], + hs.config.user_consent_template_dir, ) if not path.isdir(consent_template_directory): raise ConfigError( @@ -114,9 +114,10 @@ class ConsentResource(Resource): ) loader = jinja2.FileSystemLoader(consent_template_directory) - self._jinja_env = jinja2.Environment(loader=loader) - - self._default_consent_verison = consent_config["default_version"] + self._jinja_env = jinja2.Environment( + loader=loader, + autoescape=jinja2.select_autoescape(['html', 'htm', 'xml']), + ) if hs.config.form_secret is None: raise ConfigError( @@ -131,6 +132,7 @@ class ConsentResource(Resource): return NOT_DONE_YET @wrap_html_request_handler + @defer.inlineCallbacks def _async_render_GET(self, request): """ Args: @@ -138,16 +140,26 @@ class ConsentResource(Resource): """ version = parse_string(request, "v", - default=self._default_consent_verison) + default=self._default_consent_version) username = parse_string(request, "u", required=True) userhmac = parse_string(request, "h", required=True) self._check_hash(username, userhmac) + if username.startswith('@'): + qualified_user_id = username + else: + qualified_user_id = UserID(username, self.hs.hostname).to_string() + + u = yield self.store.get_user_by_id(qualified_user_id) + if u is None: + raise NotFoundError("Unknown user") + try: self._render_template( request, "%s.html" % (version,), user=username, userhmac=userhmac, version=version, + has_consented=(u["consent_version"] == version), ) except TemplateNotFound: raise NotFoundError("Unknown policy version") diff --git a/synapse/server.py b/synapse/server.py index 85f54cd047..58dbf78437 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -73,6 +73,10 @@ from synapse.rest.media.v1.media_repository import ( MediaRepositoryResource, ) from synapse.server_notices.server_notices_manager import ServerNoticesManager +from synapse.server_notices.server_notices_sender import ServerNoticesSender +from synapse.server_notices.worker_server_notices_sender import ( + WorkerServerNoticesSender, +) from synapse.state import StateHandler, StateResolutionHandler from synapse.storage import DataStore from synapse.streams.events import EventSources @@ -158,6 +162,7 @@ class HomeServer(object): 'room_member_handler', 'federation_registry', 'server_notices_manager', + 'server_notices_sender', ] def __init__(self, hostname, **kwargs): @@ -401,8 +406,15 @@ class HomeServer(object): return FederationHandlerRegistry() def build_server_notices_manager(self): + if self.config.worker_app: + raise Exception("Workers cannot send server notices") return ServerNoticesManager(self) + def build_server_notices_sender(self): + if self.config.worker_app: + return WorkerServerNoticesSender(self) + return ServerNoticesSender(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/server.pyi b/synapse/server.pyi index 6fbe15168d..ce28486233 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -10,6 +10,7 @@ import synapse.handlers.e2e_keys import synapse.handlers.set_password import synapse.rest.media.v1.media_repository import synapse.server_notices.server_notices_manager +import synapse.server_notices.server_notices_sender import synapse.state import synapse.storage @@ -69,3 +70,6 @@ class HomeServer(object): def get_server_notices_manager(self) -> synapse.server_notices.server_notices_manager.ServerNoticesManager: pass + + def get_server_notices_sender(self) -> synapse.server_notices.server_notices_sender.ServerNoticesSender: + pass diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py new file mode 100644 index 0000000000..a709802856 --- /dev/null +++ b/synapse/server_notices/consent_server_notices.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from six import (iteritems, string_types) +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.api.urls import ConsentURIBuilder +from synapse.config import ConfigError +from synapse.types import get_localpart_from_id + +logger = logging.getLogger(__name__) + + +class ConsentServerNotices(object): + """Keeps track of whether we need to send users server_notices about + privacy policy consent, and sends one if we do. + """ + def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ + self._server_notices_manager = hs.get_server_notices_manager() + self._store = hs.get_datastore() + + self._users_in_progress = set() + + self._current_consent_version = hs.config.user_consent_version + self._server_notice_content = hs.config.user_consent_server_notice_content + + if self._server_notice_content is not None: + if not self._server_notices_manager.is_enabled(): + raise ConfigError( + "user_consent configuration requires server notices, but " + "server notices are not enabled.", + ) + if 'body' not in self._server_notice_content: + raise ConfigError( + "user_consent server_notice_consent must contain a 'body' " + "key.", + ) + + self._consent_uri_builder = ConsentURIBuilder(hs.config) + + @defer.inlineCallbacks + def maybe_send_server_notice_to_user(self, user_id): + """Check if we need to send a notice to this user, and does so if so + + Args: + user_id (str): user to check + + Returns: + Deferred + """ + if self._server_notice_content is None: + # not enabled + return + + # make sure we don't send two messages to the same user at once + if user_id in self._users_in_progress: + return + self._users_in_progress.add(user_id) + try: + u = yield self._store.get_user_by_id(user_id) + + if u["consent_version"] == self._current_consent_version: + # user has already consented + return + + if u["consent_server_notice_sent"] == self._current_consent_version: + # we've already sent a notice to the user + return + + # need to send a message. + try: + consent_uri = self._consent_uri_builder.build_user_consent_uri( + get_localpart_from_id(user_id), + ) + content = copy_with_str_subst( + self._server_notice_content, { + 'consent_uri': consent_uri, + }, + ) + yield self._server_notices_manager.send_notice( + user_id, content, + ) + yield self._store.user_set_consent_server_notice_sent( + user_id, self._current_consent_version, + ) + except SynapseError as e: + logger.error("Error sending server notice about user consent: %s", e) + finally: + self._users_in_progress.remove(user_id) + + +def copy_with_str_subst(x, substitutions): + """Deep-copy a structure, carrying out string substitions on any strings + + Args: + x (object): structure to be copied + substitutions (object): substitutions to be made - passed into the + string '%' operator + + Returns: + copy of x + """ + if isinstance(x, string_types): + return x % substitutions + if isinstance(x, dict): + return { + k: copy_with_str_subst(v, substitutions) for (k, v) in iteritems(x) + } + if isinstance(x, (list, tuple)): + return [copy_with_str_subst(y) for y in x] + + # assume it's uninterested and can be shallow-copied. + return x diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py index f535b9c9da..a26deace53 100644 --- a/synapse/server_notices/server_notices_manager.py +++ b/synapse/server_notices/server_notices_manager.py @@ -35,6 +35,7 @@ class ServerNoticesManager(object): self._config = hs.config self._room_creation_handler = hs.get_room_creation_handler() self._event_creation_handler = hs.get_event_creation_handler() + self._is_mine_id = hs.is_mine_id def is_enabled(self): """Checks if server notices are enabled on this server. @@ -55,7 +56,7 @@ class ServerNoticesManager(object): event_content (dict): content of event to send Returns: - Deferrred[None] + Deferred[None] """ room_id = yield self.get_notice_room_for_user(user_id) @@ -89,6 +90,9 @@ class ServerNoticesManager(object): if not self.is_enabled(): raise Exception("Server notices not enabled") + assert self._is_mine_id(user_id), \ + "Cannot send server notices to remote users" + rooms = yield self._store.get_rooms_for_user_where_membership_is( user_id, [Membership.INVITE, Membership.JOIN], ) @@ -109,6 +113,19 @@ class ServerNoticesManager(object): # apparently no existing notice room: create a new one logger.info("Creating server notices room for %s", user_id) + # see if we want to override the profile info for the server user. + # note that if we want to override either the display name or the + # avatar, we have to use both. + join_profile = None + if ( + self._config.server_notices_mxid_display_name is not None or + self._config.server_notices_mxid_avatar_url is not None + ): + join_profile = { + "displayname": self._config.server_notices_mxid_display_name, + "avatar_url": self._config.server_notices_mxid_avatar_url, + } + requester = create_requester(system_mxid) info = yield self._room_creation_handler.create_room( requester, @@ -121,9 +138,7 @@ class ServerNoticesManager(object): "invite": (user_id,) }, ratelimit=False, - creator_join_profile={ - "displayname": self._config.server_notices_mxid_display_name, - }, + creator_join_profile=join_profile, ) room_id = info['room_id'] diff --git a/synapse/server_notices/server_notices_sender.py b/synapse/server_notices/server_notices_sender.py new file mode 100644 index 0000000000..5d23965f34 --- /dev/null +++ b/synapse/server_notices/server_notices_sender.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.server_notices.consent_server_notices import ConsentServerNotices + + +class ServerNoticesSender(object): + """A centralised place which sends server notices automatically when + Certain Events take place + """ + def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ + # todo: it would be nice to make this more dynamic + self._consent_server_notices = ConsentServerNotices(hs) + + def on_user_syncing(self, user_id): + """Called when the user performs a sync operation. + + Args: + user_id (str): mxid of user who synced + + Returns: + Deferred + """ + return self._consent_server_notices.maybe_send_server_notice_to_user( + user_id, + ) + + def on_user_ip(self, user_id): + """Called on the master when a worker process saw a client request. + + Args: + user_id (str): mxid + + Returns: + Deferred + """ + # The synchrotrons use a stubbed version of ServerNoticesSender, so + # we check for notices to send to the user in on_user_ip as well as + # in on_user_syncing + return self._consent_server_notices.maybe_send_server_notice_to_user( + user_id, + ) diff --git a/synapse/server_notices/worker_server_notices_sender.py b/synapse/server_notices/worker_server_notices_sender.py new file mode 100644 index 0000000000..4a133026c3 --- /dev/null +++ b/synapse/server_notices/worker_server_notices_sender.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet import defer + + +class WorkerServerNoticesSender(object): + """Stub impl of ServerNoticesSender which does nothing""" + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): + """ + + def on_user_syncing(self, user_id): + """Called when the user performs a sync operation. + + Args: + user_id (str): mxid of user who synced + + Returns: + Deferred + """ + return defer.succeed(None) + + def on_user_ip(self, user_id): + """Called on the master when a worker process saw a client request. + + Args: + user_id (str): mxid + + Returns: + Deferred + """ + raise AssertionError("on_user_ip unexpectedly called on worker") diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index ac264b5d25..979fa22438 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -379,7 +379,11 @@ class DataStore(RoomMemberStore, RoomStore, WHERE timestamp = ? ) udv ON u.user_id = udv.user_id AND u.device_id=udv.device_id - WHERE last_seen > ? AND last_seen <= ? AND udv.timestamp IS NULL + INNER JOIN users ON users.name=u.user_id + WHERE last_seen > ? AND last_seen <= ? + AND udv.timestamp IS NULL AND users.is_guest=0 + AND users.appservice_id IS NULL + GROUP BY u.user_id, u.device_id """ # This means that the day has rolled over but there could still diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 8d1a01f1ee..a530e29f43 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -33,7 +33,10 @@ class RegistrationWorkerStore(SQLBaseStore): keyvalues={ "name": user_id, }, - retcols=["name", "password_hash", "is_guest"], + retcols=[ + "name", "password_hash", "is_guest", + "consent_version", "consent_server_notice_sent", + ], allow_none=True, desc="get_user_by_id", ) @@ -297,12 +300,41 @@ class RegistrationStore(RegistrationWorkerStore, Raises: StoreError(404) if user not found """ - return self._simple_update_one( - table='users', - keyvalues={'name': user_id, }, - updatevalues={'consent_version': consent_version, }, - desc="user_set_consent_version" - ) + def f(txn): + self._simple_update_one_txn( + txn, + table='users', + keyvalues={'name': user_id, }, + updatevalues={'consent_version': consent_version, }, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + return self.runInteraction("user_set_consent_version", f) + + def user_set_consent_server_notice_sent(self, user_id, consent_version): + """Updates the user table to record that we have sent the user a server + notice about privacy policy consent + + Args: + user_id (str): full mxid of the user to update + consent_version (str): version of the policy we have notified the + user about + + Raises: + StoreError(404) if user not found + """ + def f(txn): + self._simple_update_one_txn( + txn, + table='users', + keyvalues={'name': user_id, }, + updatevalues={'consent_server_notice_sent': consent_version, }, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + return self.runInteraction("user_set_consent_server_notice_sent", f) def user_delete_access_tokens(self, user_id, except_token_id=None, device_id=None): diff --git a/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql new file mode 100644 index 0000000000..14dcf18d73 --- /dev/null +++ b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql @@ -0,0 +1,20 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* record whether we have sent a server notice about consenting to the + * privacy policy. Specifically records the version of the policy we sent + * a message about. + */ +ALTER TABLE users ADD COLUMN consent_server_notice_sent TEXT; diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index ea24710ad8..fb463c525a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -684,8 +684,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results to only those before direction(char): Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. - limit (int): The maximum number of events to return. Zero or less - means no limit. + limit (int): The maximum number of events to return. event_filter (Filter|None): If provided filters the events to those that match the filter. @@ -694,6 +693,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): as a list of _EventDictReturn and a token that points to the end of the result set. """ + + assert int(limit) >= 0 + # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. @@ -723,22 +725,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): bounds += " AND " + filter_clause args.extend(filter_args) - if int(limit) > 0: - args.append(int(limit)) - limit_str = " LIMIT ?" - else: - limit_str = "" + args.append(int(limit)) sql = ( "SELECT event_id, topological_ordering, stream_ordering" " FROM events" " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," - " stream_ordering %(order)s %(limit)s" + " stream_ordering %(order)s LIMIT ?" ) % { "bounds": bounds, "order": order, - "limit": limit_str } txn.execute(sql, args) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index eab9d57650..914f616312 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -60,7 +60,7 @@ class LoggingContext(object): __slots__ = [ "previous_context", "name", "ru_stime", "ru_utime", "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", - "usage_start", "usage_end", + "usage_start", "main_thread", "alive", "request", "tag", ] @@ -109,8 +109,10 @@ class LoggingContext(object): # ms spent waiting for db txns to be scheduled self.db_sched_duration_ms = 0 + # If alive has the thread resource usage when the logcontext last + # became active. self.usage_start = None - self.usage_end = None + self.main_thread = threading.current_thread() self.request = None self.tag = "" @@ -159,7 +161,7 @@ class LoggingContext(object): """Restore the logging context in thread local storage to the state it was before this context was entered. Returns: - None to avoid suppressing any exeptions that were thrown. + None to avoid suppressing any exceptions that were thrown. """ current = self.set_current_context(self.previous_context) if current is not self: @@ -185,29 +187,43 @@ class LoggingContext(object): def start(self): if threading.current_thread() is not self.main_thread: + logger.warning("Started logcontext %s on different thread", self) return - if self.usage_start and self.usage_end: - self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime - self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime - self.usage_start = None - self.usage_end = None - + # If we haven't already started record the thread resource usage so + # far if not self.usage_start: self.usage_start = get_thread_resource_usage() def stop(self): if threading.current_thread() is not self.main_thread: + logger.warning("Stopped logcontext %s on different thread", self) return + # When we stop, let's record the resource used since we started if self.usage_start: - self.usage_end = get_thread_resource_usage() + usage_end = get_thread_resource_usage() + + self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime + self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime + + self.usage_start = None + else: + logger.warning("Called stop on logcontext %s without calling start", self) def get_resource_usage(self): + """Get CPU time used by this logcontext so far. + + Returns: + tuple[float, float]: The user and system CPU usage in seconds + """ ru_utime = self.ru_utime ru_stime = self.ru_stime - if self.usage_start and threading.current_thread() is self.main_thread: + # If we are on the correct thread and we're currently running then we + # can include resource usage so far. + is_main_thread = threading.current_thread() is self.main_thread + if self.alive and self.usage_start and is_main_thread: current = get_thread_resource_usage() ru_utime += current.ru_utime - self.usage_start.ru_utime ru_stime += current.ru_stime - self.usage_start.ru_stime |