diff options
author | Erik Johnston <erik@matrix.org> | 2018-07-23 13:21:15 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-07-23 13:21:15 +0100 |
commit | 0b0b24cb82d3337bef5d3dd4b960990934d8d5c5 (patch) | |
tree | 3cbf0143b30ef340a91a99f5d491ee2eb7aa5e8c /synapse | |
parent | Update docs/workers.rst (diff) | |
parent | Merge pull request #3520 from matrix-org/matthew/sync_deleted_devices (diff) | |
download | synapse-0b0b24cb82d3337bef5d3dd4b960990934d8d5c5.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/client_apis_move
Diffstat (limited to 'synapse')
33 files changed, 683 insertions, 223 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 3cde33c0d7..5c0f2f83aa 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -17,4 +17,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.32.2" +__version__ = "0.33.0" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 14e6dca522..2ad1beb8d8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -18,6 +18,8 @@ import logging import os import sys +from six import iteritems + from twisted.application import service from twisted.internet import defer, reactor from twisted.web.resource import EncodingResourceWrapper, NoResource @@ -442,7 +444,7 @@ def run(hs): stats["total_nonbridged_users"] = total_nonbridged_users daily_user_type_results = yield hs.get_datastore().count_daily_user_type() - for name, count in daily_user_type_results.iteritems(): + for name, count in iteritems(daily_user_type_results): stats["daily_user_type_" + name] = count room_count = yield hs.get_datastore().get_room_count() @@ -453,7 +455,7 @@ def run(hs): stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() r30_results = yield hs.get_datastore().count_r30_users() - for name, count in r30_results.iteritems(): + for name, count in iteritems(r30_results): stats["r30_users_" + name] = count daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 68acc15a9a..d658f967ba 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -25,6 +25,8 @@ import subprocess import sys import time +from six import iteritems + import yaml SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"] @@ -173,7 +175,7 @@ def main(): os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) cache_factors = config.get("synctl_cache_factors", {}) - for cache_name, factor in cache_factors.iteritems(): + for cache_name, factor in iteritems(cache_factors): os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) worker_configfiles = [] diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index bcd9bb5946..f83a1581a6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import iteritems + from frozendict import frozendict from twisted.internet import defer @@ -159,7 +161,7 @@ def _encode_state_dict(state_dict): return [ (etype, state_key, v) - for (etype, state_key), v in state_dict.iteritems() + for (etype, state_key), v in iteritems(state_dict) ] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5a956ecfb3..6996d6b695 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -30,7 +30,8 @@ from synapse.metrics import ( sent_edus_counter, sent_transactions_counter, ) -from synapse.util import PreserveLoggingContext, logcontext +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import logcontext from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter @@ -165,10 +166,11 @@ class TransactionQueue(object): if self._is_processing: return - # fire off a processing loop in the background. It's likely it will - # outlast the current request, so run it in the sentinel logcontext. - with PreserveLoggingContext(): - self._process_event_queue_loop() + # fire off a processing loop in the background + run_as_background_process( + "process_event_queue_for_federation", + self._process_event_queue_loop, + ) @defer.inlineCallbacks def _process_event_queue_loop(self): @@ -432,14 +434,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Starting transaction loop", destination) - # Drop the logcontext before starting the transaction. It doesn't - # really make sense to log all the outbound transactions against - # whatever path led us to this point: that's pretty arbitrary really. - # - # (this also means we can fire off _perform_transaction without - # yielding) - with logcontext.PreserveLoggingContext(): - self._transaction_transmission_loop(destination) + run_as_background_process( + "federation_transaction_transmission_loop", + self._transaction_transmission_loop, + destination, + ) @defer.inlineCallbacks def _transaction_transmission_loop(self, destination): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 65f6041b10..a6d391c4e8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,8 +21,8 @@ import logging import sys import six -from six import iteritems -from six.moves import http_client +from six import iteritems, itervalues +from six.moves import http_client, zip from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json @@ -731,7 +731,7 @@ class FederationHandler(BaseHandler): """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.iteritems() + for (e_type, state_key), event in iteritems(state) if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -748,7 +748,7 @@ class FederationHandler(BaseHandler): except Exception: pass - return sorted(joined_domains.iteritems(), key=lambda d: d[1]) + return sorted(joined_domains.items(), key=lambda d: d[1]) curr_domains = get_domains_from_state(curr_state) @@ -811,7 +811,7 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) tried_domains.add(self.server_name) - event_ids = list(extremities.iterkeys()) + event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = logcontext.preserve_fn( @@ -827,15 +827,15 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.itervalues() for e_id in ids.itervalues()], + [e_id for ids in itervalues(states) for e_id in itervalues(ids)], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.iteritems() + for k, e_id in iteritems(state_dict) if e_id in state_map - } for key, state_dict in states.iteritems() + } for key, state_dict in iteritems(states) } for e_id, _ in sorted_extremeties_tuple: @@ -1515,7 +1515,7 @@ class FederationHandler(BaseHandler): yield self.store.persist_events( [ (ev_info["event"], context) - for ev_info, context in itertools.izip(event_infos, contexts) + for ev_info, context in zip(event_infos, contexts) ], backfilled=backfilled, ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ba3c4642bc..7030d8beab 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -32,7 +32,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import send_event_to_master from synapse.types import RoomAlias, UserID -from synapse.util.async import Limiter +from synapse.util.async import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func @@ -180,7 +180,7 @@ class EventCreationHandler(object): # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) + self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") self.action_generator = hs.get_action_generator() diff --git a/synapse/http/client.py b/synapse/http/client.py index d6a0d75b2b..25b6307884 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE from twisted.internet import defer, protocol, reactor, ssl, task from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent -from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer from twisted.web.client import ( + Agent, + BrowserLikeRedirectAgent, + ContentDecoderAgent, + FileBodyProducer as TwistedFileBodyProducer, GzipDecoder, HTTPConnectionPool, PartialDownloadError, diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index f24b4b949c..588e280571 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -38,7 +38,8 @@ outgoing_responses_counter = Counter( ) response_timer = Histogram( - "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"] + "synapse_http_server_response_time_seconds", "sec", + ["method", "servlet", "tag", "code"], ) response_ru_utime = Counter( @@ -171,11 +172,13 @@ class RequestMetrics(object): ) return - outgoing_responses_counter.labels(request.method, str(request.code)).inc() + response_code = str(request.code) + + outgoing_responses_counter.labels(request.method, response_code).inc() response_count.labels(request.method, self.name, tag).inc() - response_timer.labels(request.method, self.name, tag).observe( + response_timer.labels(request.method, self.name, tag, response_code).observe( time_sec - self.start ) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py new file mode 100644 index 0000000000..9d820e44a6 --- /dev/null +++ b/synapse/metrics/background_process_metrics.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily + +from twisted.internet import defer + +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext + +_background_process_start_count = Counter( + "synapse_background_process_start_count", + "Number of background processes started", + ["name"], +) + +# we set registry=None in all of these to stop them getting registered with +# the default registry. Instead we collect them all via the CustomCollector, +# which ensures that we can update them before they are collected. +# +_background_process_ru_utime = Counter( + "synapse_background_process_ru_utime_seconds", + "User CPU time used by background processes, in seconds", + ["name"], + registry=None, +) + +_background_process_ru_stime = Counter( + "synapse_background_process_ru_stime_seconds", + "System CPU time used by background processes, in seconds", + ["name"], + registry=None, +) + +_background_process_db_txn_count = Counter( + "synapse_background_process_db_txn_count", + "Number of database transactions done by background processes", + ["name"], + registry=None, +) + +_background_process_db_txn_duration = Counter( + "synapse_background_process_db_txn_duration_seconds", + ("Seconds spent by background processes waiting for database " + "transactions, excluding scheduling time"), + ["name"], + registry=None, +) + +_background_process_db_sched_duration = Counter( + "synapse_background_process_db_sched_duration_seconds", + "Seconds spent by background processes waiting for database connections", + ["name"], + registry=None, +) + +# map from description to a counter, so that we can name our logcontexts +# incrementally. (It actually duplicates _background_process_start_count, but +# it's much simpler to do so than to try to combine them.) +_background_process_counts = dict() # type: dict[str, int] + +# map from description to the currently running background processes. +# +# it's kept as a dict of sets rather than a big set so that we can keep track +# of process descriptions that no longer have any active processes. +_background_processes = dict() # type: dict[str, set[_BackgroundProcess]] + + +class _Collector(object): + """A custom metrics collector for the background process metrics. + + Ensures that all of the metrics are up-to-date with any in-flight processes + before they are returned. + """ + def collect(self): + background_process_in_flight_count = GaugeMetricFamily( + "synapse_background_process_in_flight_count", + "Number of background processes in flight", + labels=["name"], + ) + + for desc, processes in six.iteritems(_background_processes): + background_process_in_flight_count.add_metric( + (desc,), len(processes), + ) + for process in processes: + process.update_metrics() + + yield background_process_in_flight_count + + # now we need to run collect() over each of the static Counters, and + # yield each metric they return. + for m in ( + _background_process_ru_utime, + _background_process_ru_stime, + _background_process_db_txn_count, + _background_process_db_txn_duration, + _background_process_db_sched_duration, + ): + for r in m.collect(): + yield r + + +REGISTRY.register(_Collector()) + + +class _BackgroundProcess(object): + def __init__(self, desc, ctx): + self.desc = desc + self._context = ctx + self._reported_stats = None + + def update_metrics(self): + """Updates the metrics with values from this process.""" + new_stats = self._context.get_resource_usage() + if self._reported_stats is None: + diff = new_stats + else: + diff = new_stats - self._reported_stats + self._reported_stats = new_stats + + _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime) + _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime) + _background_process_db_txn_count.labels(self.desc).inc( + diff.db_txn_count, + ) + _background_process_db_txn_duration.labels(self.desc).inc( + diff.db_txn_duration_sec, + ) + _background_process_db_sched_duration.labels(self.desc).inc( + diff.db_sched_duration_sec, + ) + + +def run_as_background_process(desc, func, *args, **kwargs): + """Run the given function in its own logcontext, with resource metrics + + This should be used to wrap processes which are fired off to run in the + background, instead of being associated with a particular request. + + Args: + desc (str): a description for this background process type + func: a function, which may return a Deferred + args: positional args for func + kwargs: keyword args for func + + Returns: None + """ + @defer.inlineCallbacks + def run(): + count = _background_process_counts.get(desc, 0) + _background_process_counts[desc] = count + 1 + _background_process_start_count.labels(desc).inc() + + with LoggingContext(desc) as context: + context.request = "%s-%i" % (desc, count) + proc = _BackgroundProcess(desc, context) + _background_processes.setdefault(desc, set()).add(proc) + try: + yield func(*args, **kwargs) + finally: + proc.update_metrics() + _background_processes[desc].remove(proc) + + with PreserveLoggingContext(): + run() diff --git a/synapse/notifier.py b/synapse/notifier.py index 51cbd66f06..e650c3e494 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -274,7 +274,7 @@ class Notifier(object): logger.exception("Error notifying application services of event") def on_new_event(self, stream_key, new_token, users=[], rooms=[]): - """ Used to inform listeners that something has happend event wise. + """ Used to inform listeners that something has happened event wise. Will wake up all listeners for the given users and rooms. """ diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 75c2a4ec8e..3418f06fd6 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,13 +14,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import PY3 + from synapse.http.server import JsonResource from synapse.rest.client import versions -from synapse.rest.client.v1 import admin, directory, events, initial_sync -from synapse.rest.client.v1 import login as v1_login -from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher -from synapse.rest.client.v1 import register as v1_register -from synapse.rest.client.v1 import room, voip +from synapse.rest.client.v1 import ( + admin, + directory, + events, + initial_sync, + login as v1_login, + logout, + presence, + profile, + push_rule, + pusher, + room, + voip, +) from synapse.rest.client.v2_alpha import ( account, account_data, @@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import ( user_directory, ) +if not PY3: + from synapse.rest.client.v1_only import ( + register as v1_register, + ) + class ClientRestResource(JsonResource): """A resource for version 1 of the matrix client API.""" @@ -54,14 +71,22 @@ class ClientRestResource(JsonResource): def register_servlets(client_resource, hs): versions.register_servlets(client_resource) - # "v1" - room.register_servlets(hs, client_resource) + if not PY3: + # "v1" (Python 2 only) + v1_register.register_servlets(hs, client_resource) + + # Deprecated in r0 + initial_sync.register_servlets(hs, client_resource) + room.register_deprecated_servlets(hs, client_resource) + + # Partially deprecated in r0 events.register_servlets(hs, client_resource) - v1_register.register_servlets(hs, client_resource) + + # "v1" + "r0" + room.register_servlets(hs, client_resource) v1_login.register_servlets(hs, client_resource) profile.register_servlets(hs, client_resource) presence.register_servlets(hs, client_resource) - initial_sync.register_servlets(hs, client_resource) directory.register_servlets(hs, client_resource) voip.register_servlets(hs, client_resource) admin.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 13fd63a5b2..99f6c6e3c3 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import hashlib +import hmac import logging from six.moves import http_client @@ -63,6 +65,125 @@ class UsersRestServlet(ClientV1RestServlet): defer.returnValue((200, ret)) +class UserRegisterServlet(ClientV1RestServlet): + """ + Attributes: + NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted + nonces (dict[str, int]): The nonces that we will accept. A dict of + nonce to the time it was generated, in int seconds. + """ + PATTERNS = client_path_patterns("/admin/register") + NONCE_TIMEOUT = 60 + + def __init__(self, hs): + super(UserRegisterServlet, self).__init__(hs) + self.handlers = hs.get_handlers() + self.reactor = hs.get_reactor() + self.nonces = {} + self.hs = hs + + def _clear_old_nonces(self): + """ + Clear out old nonces that are older than NONCE_TIMEOUT. + """ + now = int(self.reactor.seconds()) + + for k, v in list(self.nonces.items()): + if now - v > self.NONCE_TIMEOUT: + del self.nonces[k] + + def on_GET(self, request): + """ + Generate a new nonce. + """ + self._clear_old_nonces() + + nonce = self.hs.get_secrets().token_hex(64) + self.nonces[nonce] = int(self.reactor.seconds()) + return (200, {"nonce": nonce.encode('ascii')}) + + @defer.inlineCallbacks + def on_POST(self, request): + self._clear_old_nonces() + + if not self.hs.config.registration_shared_secret: + raise SynapseError(400, "Shared secret registration is not enabled") + + body = parse_json_object_from_request(request) + + if "nonce" not in body: + raise SynapseError( + 400, "nonce must be specified", errcode=Codes.BAD_JSON, + ) + + nonce = body["nonce"] + + if nonce not in self.nonces: + raise SynapseError( + 400, "unrecognised nonce", + ) + + # Delete the nonce, so it can't be reused, even if it's invalid + del self.nonces[nonce] + + if "username" not in body: + raise SynapseError( + 400, "username must be specified", errcode=Codes.BAD_JSON, + ) + else: + if (not isinstance(body['username'], str) or len(body['username']) > 512): + raise SynapseError(400, "Invalid username") + + username = body["username"].encode("utf-8") + if b"\x00" in username: + raise SynapseError(400, "Invalid username") + + if "password" not in body: + raise SynapseError( + 400, "password must be specified", errcode=Codes.BAD_JSON, + ) + else: + if (not isinstance(body['password'], str) or len(body['password']) > 512): + raise SynapseError(400, "Invalid password") + + password = body["password"].encode("utf-8") + if b"\x00" in password: + raise SynapseError(400, "Invalid password") + + admin = body.get("admin", None) + got_mac = body["mac"] + + want_mac = hmac.new( + key=self.hs.config.registration_shared_secret.encode(), + digestmod=hashlib.sha1, + ) + want_mac.update(nonce) + want_mac.update(b"\x00") + want_mac.update(username) + want_mac.update(b"\x00") + want_mac.update(password) + want_mac.update(b"\x00") + want_mac.update(b"admin" if admin else b"notadmin") + want_mac = want_mac.hexdigest() + + if not hmac.compare_digest(want_mac, got_mac): + raise SynapseError( + 403, "HMAC incorrect", + ) + + # Reuse the parts of RegisterRestServlet to reduce code duplication + from synapse.rest.client.v2_alpha.register import RegisterRestServlet + register = RegisterRestServlet(self.hs) + + (user_id, _) = yield register.registration_handler.register( + localpart=username.lower(), password=password, admin=bool(admin), + generate_token=False, + ) + + result = yield register._create_registration_details(user_id, body) + defer.returnValue((200, result)) + + class WhoisRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)") @@ -614,3 +735,4 @@ def register_servlets(hs, http_server): ShutdownRoomRestServlet(hs).register(http_server) QuarantineMediaInRoom(hs).register(http_server) ListMediaInRoom(hs).register(http_server) + UserRegisterServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 2c9459534e..b7bd878c90 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -830,10 +830,13 @@ def register_servlets(hs, http_server): RoomSendEventRestServlet(hs).register(http_server) PublicRoomListRestServlet(hs).register(http_server) RoomStateRestServlet(hs).register(http_server) - RoomInitialSyncRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) JoinedRoomsRestServlet(hs).register(http_server) RoomEventServlet(hs).register(http_server) RoomEventContextServlet(hs).register(http_server) + + +def register_deprecated_servlets(hs, http_server): + RoomInitialSyncRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1_only/__init__.py b/synapse/rest/client/v1_only/__init__.py new file mode 100644 index 0000000000..936f902ace --- /dev/null +++ b/synapse/rest/client/v1_only/__init__.py @@ -0,0 +1,3 @@ +""" +REST APIs that are only used in v1 (the legacy API). +""" diff --git a/synapse/rest/client/v1_only/base.py b/synapse/rest/client/v1_only/base.py new file mode 100644 index 0000000000..9d4db7437c --- /dev/null +++ b/synapse/rest/client/v1_only/base.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This module contains base REST classes for constructing client v1 servlets. +""" + +import re + +from synapse.api.urls import CLIENT_PREFIX + + +def v1_only_client_path_patterns(path_regex, include_in_unstable=True): + """Creates a regex compiled client path with the correct client path + prefix. + + Args: + path_regex (str): The regex string to match. This should NOT have a ^ + as this will be prefixed. + Returns: + list of SRE_Pattern + """ + patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)] + if include_in_unstable: + unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable") + patterns.append(re.compile("^" + unstable_prefix + path_regex)) + return patterns diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1_only/register.py index 25a143af8d..3439c3c6d4 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1_only/register.py @@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils from synapse.api.constants import LoginType from synapse.api.errors import Codes, SynapseError from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request +from synapse.rest.client.v1.base import ClientV1RestServlet from synapse.types import create_requester -from .base import ClientV1RestServlet, client_path_patterns +from .base import v1_only_client_path_patterns logger = logging.getLogger(__name__) @@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet): handler doesn't have a concept of multi-stages or sessions. """ - PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False) + PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False) def __init__(self, hs): """ @@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet): """Handles user creation via a server-to-server interface """ - PATTERNS = client_path_patterns("/createUser$", releases=()) + PATTERNS = v1_only_client_path_patterns("/createUser$") def __init__(self, hs): super(CreateUserRestServlet, self).__init__(hs) diff --git a/synapse/secrets.py b/synapse/secrets.py new file mode 100644 index 0000000000..f397daaa5e --- /dev/null +++ b/synapse/secrets.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Injectable secrets module for Synapse. + +See https://docs.python.org/3/library/secrets.html#module-secrets for the API +used in Python 3.6, and the API emulated in Python 2.7. +""" + +import six + +if six.PY3: + import secrets + + def Secrets(): + return secrets + + +else: + + import os + import binascii + + class Secrets(object): + def token_bytes(self, nbytes=32): + return os.urandom(nbytes) + + def token_hex(self, nbytes=32): + return binascii.hexlify(self.token_bytes(nbytes)) diff --git a/synapse/server.py b/synapse/server.py index 83eacccc29..140be9ebe8 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -75,6 +75,7 @@ from synapse.rest.media.v1.media_repository import ( MediaRepository, MediaRepositoryResource, ) +from synapse.secrets import Secrets from synapse.server_notices.server_notices_manager import ServerNoticesManager from synapse.server_notices.server_notices_sender import ServerNoticesSender from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender @@ -159,6 +160,7 @@ class HomeServer(object): 'groups_server_handler', 'groups_attestation_signing', 'groups_attestation_renewer', + 'secrets', 'spam_checker', 'room_member_handler', 'federation_registry', @@ -409,6 +411,9 @@ class HomeServer(object): def build_groups_attestation_renewer(self): return GroupAttestionRenewer(self) + def build_secrets(self): + return Secrets() + def build_spam_checker(self): return SpamChecker(self) diff --git a/synapse/state.py b/synapse/state.py index 15a593d41c..504caae2f7 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,7 +18,7 @@ import hashlib import logging from collections import namedtuple -from six import iteritems, itervalues +from six import iteritems, iterkeys, itervalues from frozendict import frozendict @@ -647,7 +647,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): for event_id in event_ids ) if event_map is not None: - needed_events -= set(event_map.iterkeys()) + needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d conflicted events", len(needed_events)) @@ -668,7 +668,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): new_needed_events = set(itervalues(auth_events)) new_needed_events -= needed_events if event_map is not None: - new_needed_events -= set(event_map.iterkeys()) + new_needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d auth events", len(new_needed_events)) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 98dde77431..1d41d8d445 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -344,7 +344,7 @@ class SQLBaseStore(object): parent_context = LoggingContext.current_context() if parent_context == LoggingContext.sentinel: logger.warn( - "Running db txn from sentinel context: metrics will be lost", + "Starting db connection from sentinel context: metrics will be lost", ) parent_context = None diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index dc9eca7d15..5fe1ca2de7 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -19,6 +19,8 @@ from canonicaljson import json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process + from . import engines from ._base import SQLBaseStore @@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_handlers = {} self._all_done = False - @defer.inlineCallbacks def start_doing_background_updates(self): - logger.info("Starting background schema updates") + run_as_background_process( + "background_updates", self._run_background_updates, + ) + @defer.inlineCallbacks + def _run_background_updates(self): + logger.info("Starting background schema updates") while True: yield self.hs.get_clock().sleep( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index b78eda3413..77ae10da3d 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -19,6 +19,7 @@ from six import iteritems from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import CACHE_SIZE_FACTOR from . import background_updates @@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self._batch_row_update[key] = (user_agent, device_id, now) def _update_client_ips_batch(self): - to_update = self._batch_row_update - self._batch_row_update = {} - return self.runInteraction( - "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update + def update(): + to_update = self._batch_row_update + self._batch_row_update = {} + return self.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, + to_update, + ) + + run_as_background_process( + "update_client_ips", update, ) def _update_client_ips_batch_txn(self, txn, to_update): diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index ec68e39f1e..cc3cdf2ebc 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -248,17 +248,31 @@ class DeviceStore(SQLBaseStore): def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id, content, stream_id): - self._simple_upsert_txn( - txn, - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - values={ - "content": json.dumps(content), - } - ) + if content.get("deleted"): + self._simple_delete_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + ) + + txn.call_after( + self.device_id_exists_cache.invalidate, (user_id, device_id,) + ) + else: + self._simple_upsert_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={ + "content": json.dumps(content), + } + ) txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,)) txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) @@ -366,7 +380,7 @@ class DeviceStore(SQLBaseStore): now_stream_id = max(stream_id for stream_id in itervalues(query_map)) devices = self._get_e2e_device_keys_txn( - txn, query_map.keys(), include_all_devices=True + txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True ) prev_sent_id_sql = """ @@ -393,12 +407,15 @@ class DeviceStore(SQLBaseStore): prev_id = stream_id - key_json = device.get("key_json", None) - if key_json: - result["keys"] = json.loads(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name + if device is not None: + key_json = device.get("key_json", None) + if key_json: + result["keys"] = json.loads(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + else: + result["deleted"] = True results.append(result) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 7ae5c65482..523b4360c3 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -64,12 +64,18 @@ class EndToEndKeyStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_e2e_device_keys(self, query_list, include_all_devices=False): + def get_e2e_device_keys( + self, query_list, include_all_devices=False, + include_deleted_devices=False, + ): """Fetch a list of device keys. Args: query_list(list): List of pairs of user_ids and device_ids. include_all_devices (bool): whether to include entries for devices that don't have device keys + include_deleted_devices (bool): whether to include null entries for + devices which no longer exist (but were in the query_list). + This option only takes effect if include_all_devices is true. Returns: Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". @@ -79,7 +85,7 @@ class EndToEndKeyStore(SQLBaseStore): results = yield self.runInteraction( "get_e2e_device_keys", self._get_e2e_device_keys_txn, - query_list, include_all_devices, + query_list, include_all_devices, include_deleted_devices, ) for user_id, device_keys in iteritems(results): @@ -88,10 +94,19 @@ class EndToEndKeyStore(SQLBaseStore): defer.returnValue(results) - def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices): + def _get_e2e_device_keys_txn( + self, txn, query_list, include_all_devices=False, + include_deleted_devices=False, + ): query_clauses = [] query_params = [] + if include_all_devices is False: + include_deleted_devices = False + + if include_deleted_devices: + deleted_devices = set(query_list) + for (user_id, device_id) in query_list: query_clause = "user_id = ?" query_params.append(user_id) @@ -119,8 +134,14 @@ class EndToEndKeyStore(SQLBaseStore): result = {} for row in rows: + if include_deleted_devices: + deleted_devices.remove((row["user_id"], row["device_id"])) result.setdefault(row["user_id"], {})[row["device_id"]] = row + if include_deleted_devices: + for user_id, device_id in deleted_devices: + result.setdefault(user_id, {})[device_id] = None + return result @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2aaab0d02c..4ff0fdc4ab 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -33,12 +33,13 @@ from synapse.api.errors import SynapseError # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.events_worker import EventsWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder -from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -155,11 +156,8 @@ class _EventPeristenceQueue(object): self._event_persist_queues[room_id] = queue self._currently_persisting_rooms.discard(room_id) - # set handle_queue_loop off on the background. We don't want to - # attribute work done in it to the current request, so we drop the - # logcontext altogether. - with PreserveLoggingContext(): - handle_queue_loop() + # set handle_queue_loop off in the background + run_as_background_process("persist_events", handle_queue_loop) def _get_drainining_queue(self, room_id): queue = self._event_persist_queues.setdefault(room_id, deque()) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 67433606c6..f28239a808 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -25,6 +25,7 @@ from synapse.events import EventBase # noqa: F401 from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import ( LoggingContext, PreserveLoggingContext, @@ -322,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore): should_start = False if should_start: - with PreserveLoggingContext(): - self.runWithConnection( - self._do_fetch - ) + run_as_background_process( + "fetch_events", + self.runWithConnection, + self._do_fetch, + ) logger.debug("Loading %d events", len(events)) with PreserveLoggingContext(): diff --git a/synapse/util/async.py b/synapse/util/async.py index 5d0fb39130..a7094e2fb4 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import logging from contextlib import contextmanager @@ -156,54 +157,72 @@ def concurrently_execute(func, args, limit): class Linearizer(object): - """Linearizes access to resources based on a key. Useful to ensure only one - thing is happening at a time on a given resource. + """Limits concurrent access to resources based on a key. Useful to ensure + only a few things happen at a time on a given resource. Example: - with (yield linearizer.queue("test_key")): + with (yield limiter.queue("test_key")): # do some work. """ - def __init__(self, name=None, clock=None): + def __init__(self, name=None, max_count=1, clock=None): + """ + Args: + max_count(int): The maximum number of concurrent accesses + """ if name is None: self.name = id(self) else: self.name = name - self.key_to_defer = {} if not clock: from twisted.internet import reactor clock = Clock(reactor) self._clock = clock + self.max_count = max_count + + # key_to_defer is a map from the key to a 2 element list where + # the first element is the number of things executing, and + # the second element is an OrderedDict, where the keys are deferreds for the + # things blocked from executing. + self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - # If there is already a deferred in the queue, we pull it out so that - # we can wait on it later. - # Then we replace it with a deferred that we resolve *after* the - # context manager has exited. - # We only return the context manager after the previous deferred has - # resolved. - # This all has the net effect of creating a chain of deferreds that - # wait for the previous deferred before starting their work. - current_defer = self.key_to_defer.get(key) + entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) - new_defer = defer.Deferred() - self.key_to_defer[key] = new_defer + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When on of the things currently executing finishes it will callback + # this item so that it can continue executing. + if entry[0] >= self.max_count: + new_defer = defer.Deferred() + entry[1][new_defer] = 1 - if current_defer: logger.info( - "Waiting to acquire linearizer lock %r for key %r", self.name, key + "Waiting to acquire linearizer lock %r for key %r", self.name, key, ) try: - with PreserveLoggingContext(): - yield current_defer - except Exception: - logger.exception("Unexpected exception in Linearizer") - - logger.info("Acquired linearizer lock %r for key %r", self.name, - key) + yield make_deferred_yieldable(new_defer) + except Exception as e: + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + raise + + logger.info("Acquired linearizer lock %r for key %r", self.name, key) + entry[0] += 1 # if the code holding the lock completes synchronously, then it # will recursively run the next claimant on the list. That can @@ -213,15 +232,15 @@ class Linearizer(object): # In order to break the cycle, we add a cheeky sleep(0) here to # ensure that we fall back to the reactor between each iteration. # - # (There's no particular need for it to happen before we return - # the context manager, but it needs to happen while we hold the - # lock, and the context manager's exit code must be synchronous, - # so actually this is the only sensible place. + # (This needs to happen while we hold the lock, and the context manager's exit + # code must be synchronous, so this is the only sensible place.) yield self._clock.sleep(0) else: - logger.info("Acquired uncontended linearizer lock %r for key %r", - self.name, key) + logger.info( + "Acquired uncontended linearizer lock %r for key %r", self.name, key, + ) + entry[0] += 1 @contextmanager def _ctx_manager(): @@ -229,73 +248,15 @@ class Linearizer(object): yield finally: logger.info("Releasing linearizer lock %r for key %r", self.name, key) - with PreserveLoggingContext(): - new_defer.callback(None) - current_d = self.key_to_defer.get(key) - if current_d is new_defer: - self.key_to_defer.pop(key, None) - - defer.returnValue(_ctx_manager()) - - -class Limiter(object): - """Limits concurrent access to resources based on a key. Useful to ensure - only a few thing happen at a time on a given resource. - - Example: - - with (yield limiter.queue("test_key")): - # do some work. - - """ - def __init__(self, max_count): - """ - Args: - max_count(int): The maximum number of concurrent access - """ - self.max_count = max_count - - # key_to_defer is a map from the key to a 2 element list where - # the first element is the number of things executing - # the second element is a list of deferreds for the things blocked from - # executing. - self.key_to_defer = {} - - @defer.inlineCallbacks - def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, []]) - - # If the number of things executing is greater than the maximum - # then add a deferred to the list of blocked items - # When on of the things currently executing finishes it will callback - # this item so that it can continue executing. - if entry[0] >= self.max_count: - new_defer = defer.Deferred() - entry[1].append(new_defer) - - logger.info("Waiting to acquire limiter lock for key %r", key) - with PreserveLoggingContext(): - yield new_defer - logger.info("Acquired limiter lock for key %r", key) - else: - logger.info("Acquired uncontended limiter lock for key %r", key) - - entry[0] += 1 - - @contextmanager - def _ctx_manager(): - try: - yield - finally: - logger.info("Releasing limiter lock for key %r", key) # We've finished executing so check if there are any things # blocked waiting to execute and start one of them entry[0] -= 1 if entry[1]: - next_def = entry[1].pop(0) + (next_def, _) = entry[1].popitem(last=False) + # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): next_def.callback(None) elif entry[0] == 0: diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 4abca91f6d..465adc54a8 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -16,6 +16,7 @@ import logging from collections import OrderedDict +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache logger = logging.getLogger(__name__) @@ -63,7 +64,10 @@ class ExpiringCache(object): return def f(): - self._prune_cache() + run_as_background_process( + "prune_cache_%s" % self._cache_name, + self._prune_cache, + ) self._clock.looping_call(f, self._expiry_ms / 2) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 734331caaa..194da87639 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -17,20 +17,18 @@ import logging from twisted.internet import defer -from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) def user_left_room(distributor, user, room_id): - with PreserveLoggingContext(): - distributor.fire("user_left_room", user=user, room_id=room_id) + distributor.fire("user_left_room", user=user, room_id=room_id) def user_joined_room(distributor, user, room_id): - with PreserveLoggingContext(): - distributor.fire("user_joined_room", user=user, room_id=room_id) + distributor.fire("user_joined_room", user=user, room_id=room_id) class Distributor(object): @@ -44,9 +42,7 @@ class Distributor(object): model will do for today. """ - def __init__(self, suppress_failures=True): - self.suppress_failures = suppress_failures - + def __init__(self): self.signals = {} self.pre_registration = {} @@ -56,7 +52,6 @@ class Distributor(object): self.signals[name] = Signal( name, - suppress_failures=self.suppress_failures, ) if name in self.pre_registration: @@ -75,10 +70,18 @@ class Distributor(object): self.pre_registration[name].append(observer) def fire(self, name, *args, **kwargs): + """Dispatches the given signal to the registered observers. + + Runs the observers as a background process. Does not return a deferred. + """ if name not in self.signals: raise KeyError("%r does not have a signal named %s" % (self, name)) - return self.signals[name].fire(*args, **kwargs) + run_as_background_process( + name, + self.signals[name].fire, + *args, **kwargs + ) class Signal(object): @@ -91,9 +94,8 @@ class Signal(object): method into all of the observers. """ - def __init__(self, name, suppress_failures): + def __init__(self, name): self.name = name - self.suppress_failures = suppress_failures self.observers = [] def observe(self, observer): @@ -103,7 +105,6 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) - @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -121,22 +122,17 @@ class Signal(object): failure.type, failure.value, failure.getTracebackObject())) - if not self.suppress_failures: - return failure return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb) - with PreserveLoggingContext(): - deferreds = [ - do(observer) - for observer in self.observers - ] - - res = yield defer.gatherResults( - deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + deferreds = [ + run_in_background(do, o) + for o in self.observers + ] - defer.returnValue(res) + return make_deferred_yieldable(defer.gatherResults( + deferreds, consumeErrors=True, + )) def __repr__(self): return "<Signal name=%r>" % (self.name,) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index f6c7175f74..8dcae50b39 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -99,6 +99,17 @@ class ContextResourceUsage(object): self.db_sched_duration_sec = 0 self.evt_db_fetch_count = 0 + def __repr__(self): + return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', " + "db_txn_count='%r', db_txn_duration_sec='%r', " + "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % ( + self.ru_stime, + self.ru_utime, + self.db_txn_count, + self.db_txn_duration_sec, + self.db_sched_duration_sec, + self.evt_db_fetch_count,) + def __iadd__(self, other): """Add another ContextResourceUsage's stats to this one's. diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 6ba7107896..97f1267380 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -104,12 +104,19 @@ class Measure(object): logger.warn("Expected context. (%r)", self.name) return - usage = context.get_resource_usage() - self.start_usage - block_ru_utime.labels(self.name).inc(usage.ru_utime) - block_ru_stime.labels(self.name).inc(usage.ru_stime) - block_db_txn_count.labels(self.name).inc(usage.db_txn_count) - block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) - block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) + current = context.get_resource_usage() + usage = current - self.start_usage + try: + block_ru_utime.labels(self.name).inc(usage.ru_utime) + block_ru_stime.labels(self.name).inc(usage.ru_stime) + block_db_txn_count.labels(self.name).inc(usage.db_txn_count) + block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) + block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) + except ValueError: + logger.warn( + "Failed to save metrics! OLD: %r, NEW: %r", + self.start_usage, current + ) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) diff --git a/synapse/visibility.py b/synapse/visibility.py index 9b97ea2b83..ba0499a022 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -12,11 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools + import logging import operator -import six +from six import iteritems, itervalues +from six.moves import map from twisted.internet import defer @@ -221,7 +222,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, return event # check each event: gives an iterable[None|EventBase] - filtered_events = itertools.imap(allowed, events) + filtered_events = map(allowed, events) # remove the None entries filtered_events = filter(operator.truth, filtered_events) @@ -261,7 +262,7 @@ def filter_events_for_server(store, server_name, events): # membership states for the requesting server to determine # if the server is either in the room or has been invited # into the room. - for ev in state.itervalues(): + for ev in itervalues(state): if ev.type != EventTypes.Member: continue try: @@ -295,7 +296,7 @@ def filter_events_for_server(store, server_name, events): ) visibility_ids = set() - for sids in event_to_state_ids.itervalues(): + for sids in itervalues(event_to_state_ids): hist = sids.get((EventTypes.RoomHistoryVisibility, "")) if hist: visibility_ids.add(hist) @@ -308,7 +309,7 @@ def filter_events_for_server(store, server_name, events): event_map = yield store.get_events(visibility_ids) all_open = all( e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in event_map.itervalues() + for e in itervalues(event_map) ) if all_open: @@ -346,7 +347,7 @@ def filter_events_for_server(store, server_name, events): # state_key_to_event_id_set = { e - for key_to_eid in six.itervalues(event_to_state_ids) + for key_to_eid in itervalues(event_to_state_ids) for e in key_to_eid.items() } @@ -369,10 +370,10 @@ def filter_events_for_server(store, server_name, events): event_to_state = { e_id: { key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.iteritems() + for key, inner_e_id in iteritems(key_to_eid) if inner_e_id in event_map } - for e_id, key_to_eid in event_to_state_ids.iteritems() + for e_id, key_to_eid in iteritems(event_to_state_ids) } defer.returnValue([ |