diff --git a/synapse/__init__.py b/synapse/__init__.py
index b11a1d3ea3..5bada5e290 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.29.1"
+__version__ = "0.30.0"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index a9ff5576f3..e6ad3768f0 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -19,6 +19,7 @@ import logging
import simplejson as json
from six import iteritems
+from six.moves import http_client
logger = logging.getLogger(__name__)
@@ -51,6 +52,8 @@ class Codes(object):
THREEPID_DENIED = "M_THREEPID_DENIED"
INVALID_USERNAME = "M_INVALID_USERNAME"
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
+ CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
+ CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
class CodeMessageException(RuntimeError):
@@ -138,6 +141,32 @@ class SynapseError(CodeMessageException):
return res
+class ConsentNotGivenError(SynapseError):
+ """The error returned to the client when the user has not consented to the
+ privacy policy.
+ """
+ def __init__(self, msg, consent_uri):
+ """Constructs a ConsentNotGivenError
+
+ Args:
+ msg (str): The human-readable error message
+ consent_url (str): The URL where the user can give their consent
+ """
+ super(ConsentNotGivenError, self).__init__(
+ code=http_client.FORBIDDEN,
+ msg=msg,
+ errcode=Codes.CONSENT_NOT_GIVEN
+ )
+ self._consent_uri = consent_uri
+
+ def error_dict(self):
+ return cs_error(
+ self.msg,
+ self.errcode,
+ consent_uri=self._consent_uri
+ )
+
+
class RegistrationError(SynapseError):
"""An error raised when a registration event fails."""
pass
@@ -292,7 +321,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
Args:
msg (str): The error message.
- code (int): The error code.
+ code (str): The error code.
kwargs : Additional keys to add to the response.
Returns:
A dict representing the error response JSON.
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 91a33a3402..bb46b5da8a 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,6 +15,12 @@
# limitations under the License.
"""Contains the URL paths to prefix various aspects of the server with. """
+from hashlib import sha256
+import hmac
+
+from six.moves.urllib.parse import urlencode
+
+from synapse.config import ConfigError
CLIENT_PREFIX = "/_matrix/client/api/v1"
CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha"
@@ -25,3 +32,46 @@ SERVER_KEY_PREFIX = "/_matrix/key/v1"
SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
MEDIA_PREFIX = "/_matrix/media/r0"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"
+
+
+class ConsentURIBuilder(object):
+ def __init__(self, hs_config):
+ """
+ Args:
+ hs_config (synapse.config.homeserver.HomeServerConfig):
+ """
+ if hs_config.form_secret is None:
+ raise ConfigError(
+ "form_secret not set in config",
+ )
+ if hs_config.public_baseurl is None:
+ raise ConfigError(
+ "public_baseurl not set in config",
+ )
+
+ self._hmac_secret = hs_config.form_secret.encode("utf-8")
+ self._public_baseurl = hs_config.public_baseurl
+
+ def build_user_consent_uri(self, user_id):
+ """Build a URI which we can give to the user to do their privacy
+ policy consent
+
+ Args:
+ user_id (str): mxid or username of user
+
+ Returns
+ (str) the URI where the user can do consent
+ """
+ mac = hmac.new(
+ key=self._hmac_secret,
+ msg=user_id,
+ digestmod=sha256,
+ ).hexdigest()
+ consent_uri = "%s_matrix/consent?%s" % (
+ self._public_baseurl,
+ urlencode({
+ "u": user_id,
+ "h": mac
+ }),
+ )
+ return consent_uri
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index bceb21a8d5..caccbaa814 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -41,7 +41,6 @@ from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
-from synapse.rest.consent.consent_resource import ConsentResource
from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
@@ -186,6 +185,7 @@ class SynapseHomeServer(HomeServer):
})
if name == "consent":
+ from synapse.rest.consent.consent_resource import ConsentResource
consent_resource = ConsentResource(self)
if compress:
consent_resource = gz_wrap(consent_resource)
diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py
index 675fce0911..ddcd305f4c 100644
--- a/synapse/config/consent_config.py
+++ b/synapse/config/consent_config.py
@@ -18,25 +18,62 @@ from ._base import Config
DEFAULT_CONFIG = """\
# User Consent configuration
#
-# uncomment and configure if enabling the 'consent' resource under 'listeners'.
+# Parts of this section are required if enabling the 'consent' resource under
+# 'listeners', in particular 'template_dir' and 'version'.
#
# 'template_dir' gives the location of the templates for the HTML forms.
# This directory should contain one subdirectory per language (eg, 'en', 'fr'),
# and each language directory should contain the policy document (named as
# '<version>.html') and a success page (success.html).
#
-# 'default_version' gives the version of the policy document to serve up if
-# there is no 'v' parameter.
+# 'version' specifies the 'current' version of the policy document. It defines
+# the version to be served by the consent resource if there is no 'v'
+# parameter.
+#
+# 'server_notice_content', if enabled, will send a user a "Server Notice"
+# asking them to consent to the privacy policy. The 'server_notices' section
+# must also be configured for this to work.
+#
+# 'block_events_error', if set, will block any attempts to send events
+# until the user consents to the privacy policy. The value of the setting is
+# used as the text of the error.
#
# user_consent:
# template_dir: res/templates/privacy
-# default_version: 1.0
+# version: 1.0
+# server_notice_content:
+# msgtype: m.text
+# body: >-
+# To continue using this homeserver you must review and agree to the
+# terms and conditions at %(consent_uri)s
+# block_events_error: >-
+# To continue using this homeserver you must review and agree to the
+# terms and conditions at %(consent_uri)s
+#
"""
class ConsentConfig(Config):
+ def __init__(self):
+ super(ConsentConfig, self).__init__()
+
+ self.user_consent_version = None
+ self.user_consent_template_dir = None
+ self.user_consent_server_notice_content = None
+ self.block_events_without_consent_error = None
+
def read_config(self, config):
- self.consent_config = config.get("user_consent")
+ consent_config = config.get("user_consent")
+ if consent_config is None:
+ return
+ self.user_consent_version = str(consent_config["version"])
+ self.user_consent_template_dir = consent_config["template_dir"]
+ self.user_consent_server_notice_content = consent_config.get(
+ "server_notice_content",
+ )
+ self.block_events_without_consent_error = consent_config.get(
+ "block_events_error",
+ )
def default_config(self, **kwargs):
return DEFAULT_CONFIG
diff --git a/synapse/config/server_notices_config.py b/synapse/config/server_notices_config.py
index ccef8d2ec5..be1d1f762c 100644
--- a/synapse/config/server_notices_config.py
+++ b/synapse/config/server_notices_config.py
@@ -26,12 +26,13 @@ DEFAULT_CONFIG = """\
# setting, which defines the id of the user which will be used to send the
# notices.
#
-# It's also possible to override the room name, or the display name of the
-# "notices" user.
+# It's also possible to override the room name, the display name of the
+# "notices" user, and the avatar for the user.
#
# server_notices:
# system_mxid_localpart: notices
# system_mxid_display_name: "Server Notices"
+# system_mxid_avatar_url: "mxc://server.com/oumMVlgDnLYFaPVkExemNVVZ"
# room_name: "Server Notices"
"""
@@ -48,6 +49,10 @@ class ServerNoticesConfig(Config):
The display name to use for the server notices user.
None if server notices are not enabled.
+ server_notices_mxid_avatar_url (str|None):
+ The display name to use for the server notices user.
+ None if server notices are not enabled.
+
server_notices_room_name (str|None):
The name to use for the server notices room.
None if server notices are not enabled.
@@ -56,6 +61,7 @@ class ServerNoticesConfig(Config):
super(ServerNoticesConfig, self).__init__()
self.server_notices_mxid = None
self.server_notices_mxid_display_name = None
+ self.server_notices_mxid_avatar_url = None
self.server_notices_room_name = None
def read_config(self, config):
@@ -68,7 +74,10 @@ class ServerNoticesConfig(Config):
mxid_localpart, self.server_name,
).to_string()
self.server_notices_mxid_display_name = c.get(
- 'system_mxid_display_name', 'Server Notices',
+ 'system_mxid_display_name', None,
+ )
+ self.server_notices_mxid_avatar_url = c.get(
+ 'system_mxid_avatar_url', None,
)
# todo: i18n
self.server_notices_room_name = c.get('room_name', "Server Notices")
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 4eb18775e8..d58ea6c650 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -61,7 +61,7 @@ class DeactivateAccountHandler(BaseHandler):
yield self.store.user_delete_threepids(user_id)
yield self.store.user_set_password_hash(user_id, None)
- # Add the user to a table of users penpding deactivation (ie.
+ # Add the user to a table of users pending deactivation (ie.
# removal from all the rooms they're a member of)
yield self.store.add_user_pending_deactivation(user_id)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d3685fb12a..8bc642675f 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -48,6 +48,7 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
+ self._server_notices_sender = hs.get_server_notices_sender()
@defer.inlineCallbacks
@log_function
@@ -58,6 +59,10 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down.
"""
+
+ # send any outstanding server notices to the user.
+ yield self._server_notices_sender.on_user_syncing(auth_user_id)
+
auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_presence_handler()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ba3ede8024..2c72beff2e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -479,18 +479,18 @@ class FederationHandler(BaseHandler):
# to get all state ids that we're interested in.
event_map = yield self.store.get_events([
e_id
- for key_to_eid in event_to_state_ids.values()
- for key, e_id in key_to_eid.items()
+ for key_to_eid in event_to_state_ids.itervalues()
+ for key, e_id in key_to_eid.iteritems()
if key[0] != EventTypes.Member or check_match(key[1])
])
event_to_state = {
e_id: {
key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.items()
+ for key, inner_e_id in key_to_eid.iteritems()
if inner_e_id in event_map
}
- for e_id, key_to_eid in event_to_state_ids.items()
+ for e_id, key_to_eid in event_to_state_ids.iteritems()
}
def redact_disallowed(event, state):
@@ -505,7 +505,7 @@ class FederationHandler(BaseHandler):
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
- for ev in state.values():
+ for ev in state.itervalues():
if ev.type != EventTypes.Member:
continue
try:
@@ -751,9 +751,19 @@ class FederationHandler(BaseHandler):
curr_state = yield self.state_handler.get_current_state(room_id)
def get_domains_from_state(state):
+ """Get joined domains from state
+
+ Args:
+ state (dict[tuple, FrozenEvent]): State map from type/state
+ key to event.
+
+ Returns:
+ list[tuple[str, int]]: Returns a list of servers with the
+ lowest depth of their joins. Sorted by lowest depth first.
+ """
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.items()
+ for (e_type, state_key), event in state.iteritems()
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -770,7 +780,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
- return sorted(joined_domains.items(), key=lambda d: d[1])
+ return sorted(joined_domains.iteritems(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@@ -787,7 +797,7 @@ class FederationHandler(BaseHandler):
yield self.backfill(
dom, room_id,
limit=100,
- extremities=[e for e in extremities.keys()]
+ extremities=extremities,
)
# If this succeeded then we probably already have the
# appropriate stuff.
@@ -833,7 +843,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
- event_ids = list(extremities.keys())
+ event_ids = list(extremities.iterkeys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -843,31 +853,34 @@ class FederationHandler(BaseHandler):
[resolve(room_id, [e]) for e in event_ids],
consumeErrors=True,
))
+
+ # dict[str, dict[tuple, str]], a map from event_id to state map of
+ # event_ids.
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.values() for e_id in ids],
+ [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.items()
+ for k, e_id in state_dict.iteritems()
if e_id in state_map
- } for key, state_dict in states.items()
+ } for key, state_dict in states.iteritems()
}
for e_id, _ in sorted_extremeties_tuple:
likely_domains = get_domains_from_state(states[e_id])
success = yield try_backfill([
- dom for dom in likely_domains
+ dom for dom, _ in likely_domains
if dom not in tried_domains
])
if success:
defer.returnValue(True)
- tried_domains.update(likely_domains)
+ tried_domains.update(dom for dom, _ in likely_domains)
defer.returnValue(False)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8343b5839d..c32b9bcae4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,10 +20,15 @@ import sys
from canonicaljson import encode_canonical_json
import six
from twisted.internet import defer, reactor
+from twisted.internet.defer import succeed
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.errors import (
+ AuthError, Codes, SynapseError,
+ ConsentNotGivenError,
+)
+from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@@ -431,6 +436,9 @@ class EventCreationHandler(object):
self.spam_checker = hs.get_spam_checker()
+ if self.config.block_events_without_consent_error is not None:
+ self._consent_uri_builder = ConsentURIBuilder(self.config)
+
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_events_and_hashes=None):
@@ -482,6 +490,10 @@ class EventCreationHandler(object):
target, e
)
+ is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+ if not is_exempt:
+ yield self.assert_accepted_privacy_policy(requester)
+
if token_id is not None:
builder.internal_metadata.token_id = token_id
@@ -496,6 +508,83 @@ class EventCreationHandler(object):
defer.returnValue((event, context))
+ def _is_exempt_from_privacy_policy(self, builder):
+ """"Determine if an event to be sent is exempt from having to consent
+ to the privacy policy
+
+ Args:
+ builder (synapse.events.builder.EventBuilder): event being created
+
+ Returns:
+ Deferred[bool]: true if the event can be sent without the user
+ consenting
+ """
+ # the only thing the user can do is join the server notices room.
+ if builder.type == EventTypes.Member:
+ membership = builder.content.get("membership", None)
+ if membership == Membership.JOIN:
+ return self._is_server_notices_room(builder.room_id)
+ return succeed(False)
+
+ @defer.inlineCallbacks
+ def _is_server_notices_room(self, room_id):
+ if self.config.server_notices_mxid is None:
+ defer.returnValue(False)
+ user_ids = yield self.store.get_users_in_room(room_id)
+ defer.returnValue(self.config.server_notices_mxid in user_ids)
+
+ @defer.inlineCallbacks
+ def assert_accepted_privacy_policy(self, requester):
+ """Check if a user has accepted the privacy policy
+
+ Called when the given user is about to do something that requires
+ privacy consent. We see if the user is exempt and otherwise check that
+ they have given consent. If they have not, a ConsentNotGiven error is
+ raised.
+
+ Args:
+ requester (synapse.types.Requester):
+ The user making the request
+
+ Returns:
+ Deferred[None]: returns normally if the user has consented or is
+ exempt
+
+ Raises:
+ ConsentNotGivenError: if the user has not given consent yet
+ """
+ if self.config.block_events_without_consent_error is None:
+ return
+
+ # exempt AS users from needing consent
+ if requester.app_service is not None:
+ return
+
+ user_id = requester.user.to_string()
+
+ # exempt the system notices user
+ if (
+ self.config.server_notices_mxid is not None and
+ user_id == self.config.server_notices_mxid
+ ):
+ return
+
+ u = yield self.store.get_user_by_id(user_id)
+ assert u is not None
+ if u["consent_version"] == self.config.user_consent_version:
+ return
+
+ consent_uri = self._consent_uri_builder.build_user_consent_uri(
+ requester.user.localpart,
+ )
+ msg = self.config.block_events_without_consent_error % {
+ 'consent_uri': consent_uri,
+ }
+ raise ConsentNotGivenError(
+ msg=msg,
+ consent_uri=consent_uri,
+ )
+
@defer.inlineCallbacks
def send_nonmember_event(self, requester, event, context, ratelimit=True):
"""
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 91218e40e6..500a131874 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -87,6 +87,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
def __init__(self, hs):
+ """
+
+ Args:
+ hs (synapse.server.HomeServer):
+ """
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
@@ -94,7 +99,6 @@ class PresenceHandler(object):
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
-
self.state = hs.get_state_handler()
federation_registry = hs.get_federation_registry()
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5e9fa95a2d..b5850db42f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -126,6 +126,10 @@ class RoomCreationHandler(BaseHandler):
except Exception:
raise SynapseError(400, "Invalid user_id: %s" % (i,))
+ yield self.event_creation_handler.assert_accepted_privacy_policy(
+ requester,
+ )
+
invite_3pid_list = config.get("invite_3pid", [])
visibility = config.get("visibility", None)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 5263f09aaa..82adfc8fdf 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -306,6 +306,7 @@ class RoomMemberHandler(object):
raise SynapseError(
http_client.FORBIDDEN,
"You cannot leave this room",
+ errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
if effective_membership_state == Membership.INVITE:
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 8c850bf23f..5917c83958 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -98,14 +98,87 @@ response_size = metrics.register_counter(
"response_size", labels=["method", "servlet", "tag"]
)
+# In flight metrics are incremented while the requests are in flight, rather
+# than when the response was written.
+
+in_flight_requests_ru_utime = metrics.register_counter(
+ "in_flight_requests_ru_utime_seconds", labels=["method", "servlet"],
+)
+
+in_flight_requests_ru_stime = metrics.register_counter(
+ "in_flight_requests_ru_stime_seconds", labels=["method", "servlet"],
+)
+
+in_flight_requests_db_txn_count = metrics.register_counter(
+ "in_flight_requests_db_txn_count", labels=["method", "servlet"],
+)
+
+# seconds spent waiting for db txns, excluding scheduling time, when processing
+# this request
+in_flight_requests_db_txn_duration = metrics.register_counter(
+ "in_flight_requests_db_txn_duration_seconds", labels=["method", "servlet"],
+)
+
+# seconds spent waiting for a db connection, when processing this request
+in_flight_requests_db_sched_duration = metrics.register_counter(
+ "in_flight_requests_db_sched_duration_seconds", labels=["method", "servlet"]
+)
+
+
+# The set of all in flight requests, set[RequestMetrics]
+_in_flight_requests = set()
+
+
+def _collect_in_flight():
+ """Called just before metrics are collected, so we use it to update all
+ the in flight request metrics
+ """
+
+ for rm in _in_flight_requests:
+ rm.update_metrics()
+
+
+metrics.register_collector(_collect_in_flight)
+
+
+def _get_in_flight_counts():
+ """Returns a count of all in flight requests by (method, server_name)
+
+ Returns:
+ dict[tuple[str, str], int]
+ """
+
+ # Map from (method, name) -> int, the number of in flight requests of that
+ # type
+ counts = {}
+ for rm in _in_flight_requests:
+ key = (rm.method, rm.name,)
+ counts[key] = counts.get(key, 0) + 1
+
+ return counts
+
+
+metrics.register_callback(
+ "in_flight_requests_count",
+ _get_in_flight_counts,
+ labels=["method", "servlet"]
+)
+
class RequestMetrics(object):
- def start(self, time_msec, name):
+ def start(self, time_msec, name, method):
self.start = time_msec
self.start_context = LoggingContext.current_context()
self.name = name
+ self.method = method
+
+ self._request_stats = _RequestStats.from_context(self.start_context)
+
+ _in_flight_requests.add(self)
def stop(self, time_msec, request):
+ _in_flight_requests.discard(self)
+
context = LoggingContext.current_context()
tag = ""
@@ -147,3 +220,88 @@ class RequestMetrics(object):
)
response_size.inc_by(request.sentLength, request.method, self.name, tag)
+
+ # We always call this at the end to ensure that we update the metrics
+ # regardless of whether a call to /metrics while the request was in
+ # flight.
+ self.update_metrics()
+
+ def update_metrics(self):
+ """Updates the in flight metrics with values from this request.
+ """
+
+ diff = self._request_stats.update(self.start_context)
+
+ in_flight_requests_ru_utime.inc_by(
+ diff.ru_utime, self.method, self.name,
+ )
+
+ in_flight_requests_ru_stime.inc_by(
+ diff.ru_stime, self.method, self.name,
+ )
+
+ in_flight_requests_db_txn_count.inc_by(
+ diff.db_txn_count, self.method, self.name,
+ )
+
+ in_flight_requests_db_txn_duration.inc_by(
+ diff.db_txn_duration_ms / 1000., self.method, self.name,
+ )
+
+ in_flight_requests_db_sched_duration.inc_by(
+ diff.db_sched_duration_ms / 1000., self.method, self.name,
+ )
+
+
+class _RequestStats(object):
+ """Keeps tracks of various metrics for an in flight request.
+ """
+
+ __slots__ = [
+ "ru_utime", "ru_stime",
+ "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
+ ]
+
+ def __init__(self, ru_utime, ru_stime, db_txn_count,
+ db_txn_duration_ms, db_sched_duration_ms):
+ self.ru_utime = ru_utime
+ self.ru_stime = ru_stime
+ self.db_txn_count = db_txn_count
+ self.db_txn_duration_ms = db_txn_duration_ms
+ self.db_sched_duration_ms = db_sched_duration_ms
+
+ @staticmethod
+ def from_context(context):
+ ru_utime, ru_stime = context.get_resource_usage()
+
+ return _RequestStats(
+ ru_utime, ru_stime,
+ context.db_txn_count,
+ context.db_txn_duration_ms,
+ context.db_sched_duration_ms,
+ )
+
+ def update(self, context):
+ """Updates the current values and returns the difference between the
+ old and new values.
+
+ Returns:
+ _RequestStats: The difference between the old and new values
+ """
+ new = _RequestStats.from_context(context)
+
+ diff = _RequestStats(
+ new.ru_utime - self.ru_utime,
+ new.ru_stime - self.ru_stime,
+ new.db_txn_count - self.db_txn_count,
+ new.db_txn_duration_ms - self.db_txn_duration_ms,
+ new.db_sched_duration_ms - self.db_sched_duration_ms,
+ )
+
+ self.ru_utime = new.ru_utime
+ self.ru_stime = new.ru_stime
+ self.db_txn_count = new.db_txn_count
+ self.db_txn_duration_ms = new.db_txn_duration_ms
+ self.db_sched_duration_ms = new.db_sched_duration_ms
+
+ return diff
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 202a990508..b608504225 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -85,7 +85,9 @@ class SynapseRequest(Request):
def _started_processing(self, servlet_name):
self.start_time = int(time.time() * 1000)
self.request_metrics = RequestMetrics()
- self.request_metrics.start(self.start_time, name=servlet_name)
+ self.request_metrics.start(
+ self.start_time, name=servlet_name, method=self.method,
+ )
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index a41af4fd6c..a603c520ea 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -69,6 +69,7 @@ class ReplicationStreamer(object):
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self._server_notices_sender = hs.get_server_notices_sender()
# Current connections.
self.connections = []
@@ -253,6 +254,7 @@ class ReplicationStreamer(object):
yield self.store.insert_client_ip(
user_id, access_token, ip, user_agent, device_id, last_seen,
)
+ yield self._server_notices_sender.on_user_ip(user_id)
def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients.
diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py
index fceca2edeb..20fa6678ef 100644
--- a/synapse/rest/client/transactions.py
+++ b/synapse/rest/client/transactions.py
@@ -19,6 +19,7 @@ import logging
from synapse.api.auth import get_access_token_from_request
from synapse.util.async import ObservableDeferred
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@@ -80,27 +81,26 @@ class HttpTransactionCache(object):
Returns:
Deferred which resolves to a tuple of (response_code, response_dict).
"""
- try:
- return self.transactions[txn_key][0].observe()
- except (KeyError, IndexError):
- pass # execute the function instead.
-
- deferred = fn(*args, **kwargs)
-
- # if the request fails with a Twisted failure, remove it
- # from the transaction map. This is done to ensure that we don't
- # cache transient errors like rate-limiting errors, etc.
- def remove_from_map(err):
- self.transactions.pop(txn_key, None)
- return err
- deferred.addErrback(remove_from_map)
-
- # We don't add any other errbacks to the raw deferred, so we ask
- # ObservableDeferred to swallow the error. This is fine as the error will
- # still be reported to the observers.
- observable = ObservableDeferred(deferred, consumeErrors=True)
- self.transactions[txn_key] = (observable, self.clock.time_msec())
- return observable.observe()
+ if txn_key in self.transactions:
+ observable = self.transactions[txn_key][0]
+ else:
+ # execute the function instead.
+ deferred = run_in_background(fn, *args, **kwargs)
+
+ observable = ObservableDeferred(deferred)
+ self.transactions[txn_key] = (observable, self.clock.time_msec())
+
+ # if the request fails with an exception, remove it
+ # from the transaction map. This is done to ensure that we don't
+ # cache transient errors like rate-limiting errors, etc.
+ def remove_from_map(err):
+ self.transactions.pop(txn_key, None)
+ # we deliberately do not propagate the error any further, as we
+ # expect the observers to have reported it.
+
+ deferred.addErrback(remove_from_map)
+
+ return make_deferred_yieldable(observable.observe())
def _cleanup(self):
now = self.clock.time_msec()
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index eb91c0b293..a291cffbf1 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -85,6 +85,7 @@ class SyncRestServlet(RestServlet):
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
self.presence_handler = hs.get_presence_handler()
+ self._server_notices_sender = hs.get_server_notices_sender()
@defer.inlineCallbacks
def on_GET(self, request):
@@ -149,6 +150,9 @@ class SyncRestServlet(RestServlet):
else:
since_token = None
+ # send any outstanding server notices to the user.
+ yield self._server_notices_sender.on_user_syncing(user.to_string())
+
affect_presence = set_presence != PresenceState.OFFLINE
if affect_presence:
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index d791302278..724911d1e6 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -95,8 +95,8 @@ class ConsentResource(Resource):
# this is required by the request_handler wrapper
self.clock = hs.get_clock()
- consent_config = hs.config.consent_config
- if consent_config is None:
+ self._default_consent_version = hs.config.user_consent_version
+ if self._default_consent_version is None:
raise ConfigError(
"Consent resource is enabled but user_consent section is "
"missing in config file.",
@@ -104,7 +104,7 @@ class ConsentResource(Resource):
# daemonize changes the cwd to /, so make the path absolute now.
consent_template_directory = path.abspath(
- consent_config["template_dir"],
+ hs.config.user_consent_template_dir,
)
if not path.isdir(consent_template_directory):
raise ConfigError(
@@ -114,9 +114,10 @@ class ConsentResource(Resource):
)
loader = jinja2.FileSystemLoader(consent_template_directory)
- self._jinja_env = jinja2.Environment(loader=loader)
-
- self._default_consent_verison = consent_config["default_version"]
+ self._jinja_env = jinja2.Environment(
+ loader=loader,
+ autoescape=jinja2.select_autoescape(['html', 'htm', 'xml']),
+ )
if hs.config.form_secret is None:
raise ConfigError(
@@ -131,6 +132,7 @@ class ConsentResource(Resource):
return NOT_DONE_YET
@wrap_html_request_handler
+ @defer.inlineCallbacks
def _async_render_GET(self, request):
"""
Args:
@@ -138,16 +140,26 @@ class ConsentResource(Resource):
"""
version = parse_string(request, "v",
- default=self._default_consent_verison)
+ default=self._default_consent_version)
username = parse_string(request, "u", required=True)
userhmac = parse_string(request, "h", required=True)
self._check_hash(username, userhmac)
+ if username.startswith('@'):
+ qualified_user_id = username
+ else:
+ qualified_user_id = UserID(username, self.hs.hostname).to_string()
+
+ u = yield self.store.get_user_by_id(qualified_user_id)
+ if u is None:
+ raise NotFoundError("Unknown user")
+
try:
self._render_template(
request, "%s.html" % (version,),
user=username, userhmac=userhmac, version=version,
+ has_consented=(u["consent_version"] == version),
)
except TemplateNotFound:
raise NotFoundError("Unknown policy version")
diff --git a/synapse/server.py b/synapse/server.py
index 85f54cd047..58dbf78437 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -73,6 +73,10 @@ from synapse.rest.media.v1.media_repository import (
MediaRepositoryResource,
)
from synapse.server_notices.server_notices_manager import ServerNoticesManager
+from synapse.server_notices.server_notices_sender import ServerNoticesSender
+from synapse.server_notices.worker_server_notices_sender import (
+ WorkerServerNoticesSender,
+)
from synapse.state import StateHandler, StateResolutionHandler
from synapse.storage import DataStore
from synapse.streams.events import EventSources
@@ -158,6 +162,7 @@ class HomeServer(object):
'room_member_handler',
'federation_registry',
'server_notices_manager',
+ 'server_notices_sender',
]
def __init__(self, hostname, **kwargs):
@@ -401,8 +406,15 @@ class HomeServer(object):
return FederationHandlerRegistry()
def build_server_notices_manager(self):
+ if self.config.worker_app:
+ raise Exception("Workers cannot send server notices")
return ServerNoticesManager(self)
+ def build_server_notices_sender(self):
+ if self.config.worker_app:
+ return WorkerServerNoticesSender(self)
+ return ServerNoticesSender(self)
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 6fbe15168d..ce28486233 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -10,6 +10,7 @@ import synapse.handlers.e2e_keys
import synapse.handlers.set_password
import synapse.rest.media.v1.media_repository
import synapse.server_notices.server_notices_manager
+import synapse.server_notices.server_notices_sender
import synapse.state
import synapse.storage
@@ -69,3 +70,6 @@ class HomeServer(object):
def get_server_notices_manager(self) -> synapse.server_notices.server_notices_manager.ServerNoticesManager:
pass
+
+ def get_server_notices_sender(self) -> synapse.server_notices.server_notices_sender.ServerNoticesSender:
+ pass
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
new file mode 100644
index 0000000000..a709802856
--- /dev/null
+++ b/synapse/server_notices/consent_server_notices.py
@@ -0,0 +1,132 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from six import (iteritems, string_types)
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.api.urls import ConsentURIBuilder
+from synapse.config import ConfigError
+from synapse.types import get_localpart_from_id
+
+logger = logging.getLogger(__name__)
+
+
+class ConsentServerNotices(object):
+ """Keeps track of whether we need to send users server_notices about
+ privacy policy consent, and sends one if we do.
+ """
+ def __init__(self, hs):
+ """
+
+ Args:
+ hs (synapse.server.HomeServer):
+ """
+ self._server_notices_manager = hs.get_server_notices_manager()
+ self._store = hs.get_datastore()
+
+ self._users_in_progress = set()
+
+ self._current_consent_version = hs.config.user_consent_version
+ self._server_notice_content = hs.config.user_consent_server_notice_content
+
+ if self._server_notice_content is not None:
+ if not self._server_notices_manager.is_enabled():
+ raise ConfigError(
+ "user_consent configuration requires server notices, but "
+ "server notices are not enabled.",
+ )
+ if 'body' not in self._server_notice_content:
+ raise ConfigError(
+ "user_consent server_notice_consent must contain a 'body' "
+ "key.",
+ )
+
+ self._consent_uri_builder = ConsentURIBuilder(hs.config)
+
+ @defer.inlineCallbacks
+ def maybe_send_server_notice_to_user(self, user_id):
+ """Check if we need to send a notice to this user, and does so if so
+
+ Args:
+ user_id (str): user to check
+
+ Returns:
+ Deferred
+ """
+ if self._server_notice_content is None:
+ # not enabled
+ return
+
+ # make sure we don't send two messages to the same user at once
+ if user_id in self._users_in_progress:
+ return
+ self._users_in_progress.add(user_id)
+ try:
+ u = yield self._store.get_user_by_id(user_id)
+
+ if u["consent_version"] == self._current_consent_version:
+ # user has already consented
+ return
+
+ if u["consent_server_notice_sent"] == self._current_consent_version:
+ # we've already sent a notice to the user
+ return
+
+ # need to send a message.
+ try:
+ consent_uri = self._consent_uri_builder.build_user_consent_uri(
+ get_localpart_from_id(user_id),
+ )
+ content = copy_with_str_subst(
+ self._server_notice_content, {
+ 'consent_uri': consent_uri,
+ },
+ )
+ yield self._server_notices_manager.send_notice(
+ user_id, content,
+ )
+ yield self._store.user_set_consent_server_notice_sent(
+ user_id, self._current_consent_version,
+ )
+ except SynapseError as e:
+ logger.error("Error sending server notice about user consent: %s", e)
+ finally:
+ self._users_in_progress.remove(user_id)
+
+
+def copy_with_str_subst(x, substitutions):
+ """Deep-copy a structure, carrying out string substitions on any strings
+
+ Args:
+ x (object): structure to be copied
+ substitutions (object): substitutions to be made - passed into the
+ string '%' operator
+
+ Returns:
+ copy of x
+ """
+ if isinstance(x, string_types):
+ return x % substitutions
+ if isinstance(x, dict):
+ return {
+ k: copy_with_str_subst(v, substitutions) for (k, v) in iteritems(x)
+ }
+ if isinstance(x, (list, tuple)):
+ return [copy_with_str_subst(y) for y in x]
+
+ # assume it's uninterested and can be shallow-copied.
+ return x
diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py
index f535b9c9da..a26deace53 100644
--- a/synapse/server_notices/server_notices_manager.py
+++ b/synapse/server_notices/server_notices_manager.py
@@ -35,6 +35,7 @@ class ServerNoticesManager(object):
self._config = hs.config
self._room_creation_handler = hs.get_room_creation_handler()
self._event_creation_handler = hs.get_event_creation_handler()
+ self._is_mine_id = hs.is_mine_id
def is_enabled(self):
"""Checks if server notices are enabled on this server.
@@ -55,7 +56,7 @@ class ServerNoticesManager(object):
event_content (dict): content of event to send
Returns:
- Deferrred[None]
+ Deferred[None]
"""
room_id = yield self.get_notice_room_for_user(user_id)
@@ -89,6 +90,9 @@ class ServerNoticesManager(object):
if not self.is_enabled():
raise Exception("Server notices not enabled")
+ assert self._is_mine_id(user_id), \
+ "Cannot send server notices to remote users"
+
rooms = yield self._store.get_rooms_for_user_where_membership_is(
user_id, [Membership.INVITE, Membership.JOIN],
)
@@ -109,6 +113,19 @@ class ServerNoticesManager(object):
# apparently no existing notice room: create a new one
logger.info("Creating server notices room for %s", user_id)
+ # see if we want to override the profile info for the server user.
+ # note that if we want to override either the display name or the
+ # avatar, we have to use both.
+ join_profile = None
+ if (
+ self._config.server_notices_mxid_display_name is not None or
+ self._config.server_notices_mxid_avatar_url is not None
+ ):
+ join_profile = {
+ "displayname": self._config.server_notices_mxid_display_name,
+ "avatar_url": self._config.server_notices_mxid_avatar_url,
+ }
+
requester = create_requester(system_mxid)
info = yield self._room_creation_handler.create_room(
requester,
@@ -121,9 +138,7 @@ class ServerNoticesManager(object):
"invite": (user_id,)
},
ratelimit=False,
- creator_join_profile={
- "displayname": self._config.server_notices_mxid_display_name,
- },
+ creator_join_profile=join_profile,
)
room_id = info['room_id']
diff --git a/synapse/server_notices/server_notices_sender.py b/synapse/server_notices/server_notices_sender.py
new file mode 100644
index 0000000000..5d23965f34
--- /dev/null
+++ b/synapse/server_notices/server_notices_sender.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.server_notices.consent_server_notices import ConsentServerNotices
+
+
+class ServerNoticesSender(object):
+ """A centralised place which sends server notices automatically when
+ Certain Events take place
+ """
+ def __init__(self, hs):
+ """
+
+ Args:
+ hs (synapse.server.HomeServer):
+ """
+ # todo: it would be nice to make this more dynamic
+ self._consent_server_notices = ConsentServerNotices(hs)
+
+ def on_user_syncing(self, user_id):
+ """Called when the user performs a sync operation.
+
+ Args:
+ user_id (str): mxid of user who synced
+
+ Returns:
+ Deferred
+ """
+ return self._consent_server_notices.maybe_send_server_notice_to_user(
+ user_id,
+ )
+
+ def on_user_ip(self, user_id):
+ """Called on the master when a worker process saw a client request.
+
+ Args:
+ user_id (str): mxid
+
+ Returns:
+ Deferred
+ """
+ # The synchrotrons use a stubbed version of ServerNoticesSender, so
+ # we check for notices to send to the user in on_user_ip as well as
+ # in on_user_syncing
+ return self._consent_server_notices.maybe_send_server_notice_to_user(
+ user_id,
+ )
diff --git a/synapse/server_notices/worker_server_notices_sender.py b/synapse/server_notices/worker_server_notices_sender.py
new file mode 100644
index 0000000000..4a133026c3
--- /dev/null
+++ b/synapse/server_notices/worker_server_notices_sender.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+
+class WorkerServerNoticesSender(object):
+ """Stub impl of ServerNoticesSender which does nothing"""
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer):
+ """
+
+ def on_user_syncing(self, user_id):
+ """Called when the user performs a sync operation.
+
+ Args:
+ user_id (str): mxid of user who synced
+
+ Returns:
+ Deferred
+ """
+ return defer.succeed(None)
+
+ def on_user_ip(self, user_id):
+ """Called on the master when a worker process saw a client request.
+
+ Args:
+ user_id (str): mxid
+
+ Returns:
+ Deferred
+ """
+ raise AssertionError("on_user_ip unexpectedly called on worker")
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ac264b5d25..979fa22438 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -379,7 +379,11 @@ class DataStore(RoomMemberStore, RoomStore,
WHERE timestamp = ?
) udv
ON u.user_id = udv.user_id AND u.device_id=udv.device_id
- WHERE last_seen > ? AND last_seen <= ? AND udv.timestamp IS NULL
+ INNER JOIN users ON users.name=u.user_id
+ WHERE last_seen > ? AND last_seen <= ?
+ AND udv.timestamp IS NULL AND users.is_guest=0
+ AND users.appservice_id IS NULL
+ GROUP BY u.user_id, u.device_id
"""
# This means that the day has rolled over but there could still
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 8d1a01f1ee..a530e29f43 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -33,7 +33,10 @@ class RegistrationWorkerStore(SQLBaseStore):
keyvalues={
"name": user_id,
},
- retcols=["name", "password_hash", "is_guest"],
+ retcols=[
+ "name", "password_hash", "is_guest",
+ "consent_version", "consent_server_notice_sent",
+ ],
allow_none=True,
desc="get_user_by_id",
)
@@ -297,12 +300,41 @@ class RegistrationStore(RegistrationWorkerStore,
Raises:
StoreError(404) if user not found
"""
- return self._simple_update_one(
- table='users',
- keyvalues={'name': user_id, },
- updatevalues={'consent_version': consent_version, },
- desc="user_set_consent_version"
- )
+ def f(txn):
+ self._simple_update_one_txn(
+ txn,
+ table='users',
+ keyvalues={'name': user_id, },
+ updatevalues={'consent_version': consent_version, },
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_id, (user_id,)
+ )
+ return self.runInteraction("user_set_consent_version", f)
+
+ def user_set_consent_server_notice_sent(self, user_id, consent_version):
+ """Updates the user table to record that we have sent the user a server
+ notice about privacy policy consent
+
+ Args:
+ user_id (str): full mxid of the user to update
+ consent_version (str): version of the policy we have notified the
+ user about
+
+ Raises:
+ StoreError(404) if user not found
+ """
+ def f(txn):
+ self._simple_update_one_txn(
+ txn,
+ table='users',
+ keyvalues={'name': user_id, },
+ updatevalues={'consent_server_notice_sent': consent_version, },
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_id, (user_id,)
+ )
+ return self.runInteraction("user_set_consent_server_notice_sent", f)
def user_delete_access_tokens(self, user_id, except_token_id=None,
device_id=None):
diff --git a/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql
new file mode 100644
index 0000000000..14dcf18d73
--- /dev/null
+++ b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql
@@ -0,0 +1,20 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* record whether we have sent a server notice about consenting to the
+ * privacy policy. Specifically records the version of the policy we sent
+ * a message about.
+ */
+ALTER TABLE users ADD COLUMN consent_server_notice_sent TEXT;
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index ea24710ad8..fb463c525a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -684,8 +684,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
results to only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
- limit (int): The maximum number of events to return. Zero or less
- means no limit.
+ limit (int): The maximum number of events to return.
event_filter (Filter|None): If provided filters the events to
those that match the filter.
@@ -694,6 +693,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
as a list of _EventDictReturn and a token that points to the end
of the result set.
"""
+
+ assert int(limit) >= 0
+
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
@@ -723,22 +725,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
bounds += " AND " + filter_clause
args.extend(filter_args)
- if int(limit) > 0:
- args.append(int(limit))
- limit_str = " LIMIT ?"
- else:
- limit_str = ""
+ args.append(int(limit))
sql = (
"SELECT event_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
- " stream_ordering %(order)s %(limit)s"
+ " stream_ordering %(order)s LIMIT ?"
) % {
"bounds": bounds,
"order": order,
- "limit": limit_str
}
txn.execute(sql, args)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index eab9d57650..914f616312 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -60,7 +60,7 @@ class LoggingContext(object):
__slots__ = [
"previous_context", "name", "ru_stime", "ru_utime",
"db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
- "usage_start", "usage_end",
+ "usage_start",
"main_thread", "alive",
"request", "tag",
]
@@ -109,8 +109,10 @@ class LoggingContext(object):
# ms spent waiting for db txns to be scheduled
self.db_sched_duration_ms = 0
+ # If alive has the thread resource usage when the logcontext last
+ # became active.
self.usage_start = None
- self.usage_end = None
+
self.main_thread = threading.current_thread()
self.request = None
self.tag = ""
@@ -159,7 +161,7 @@ class LoggingContext(object):
"""Restore the logging context in thread local storage to the state it
was before this context was entered.
Returns:
- None to avoid suppressing any exeptions that were thrown.
+ None to avoid suppressing any exceptions that were thrown.
"""
current = self.set_current_context(self.previous_context)
if current is not self:
@@ -185,29 +187,43 @@ class LoggingContext(object):
def start(self):
if threading.current_thread() is not self.main_thread:
+ logger.warning("Started logcontext %s on different thread", self)
return
- if self.usage_start and self.usage_end:
- self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime
- self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime
- self.usage_start = None
- self.usage_end = None
-
+ # If we haven't already started record the thread resource usage so
+ # far
if not self.usage_start:
self.usage_start = get_thread_resource_usage()
def stop(self):
if threading.current_thread() is not self.main_thread:
+ logger.warning("Stopped logcontext %s on different thread", self)
return
+ # When we stop, let's record the resource used since we started
if self.usage_start:
- self.usage_end = get_thread_resource_usage()
+ usage_end = get_thread_resource_usage()
+
+ self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime
+ self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime
+
+ self.usage_start = None
+ else:
+ logger.warning("Called stop on logcontext %s without calling start", self)
def get_resource_usage(self):
+ """Get CPU time used by this logcontext so far.
+
+ Returns:
+ tuple[float, float]: The user and system CPU usage in seconds
+ """
ru_utime = self.ru_utime
ru_stime = self.ru_stime
- if self.usage_start and threading.current_thread() is self.main_thread:
+ # If we are on the correct thread and we're currently running then we
+ # can include resource usage so far.
+ is_main_thread = threading.current_thread() is self.main_thread
+ if self.alive and self.usage_start and is_main_thread:
current = get_thread_resource_usage()
ru_utime += current.ru_utime - self.usage_start.ru_utime
ru_stime += current.ru_stime - self.usage_start.ru_stime
|