From d83d004ccdb7ace1dcb51b8acf7645bc176b10a5 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 2 Feb 2016 17:18:50 +0000 Subject: Fix flake8 warnings for new flake8 --- synapse/http/matrixfederationclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index da13e32e78..c3589534f8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -152,7 +152,7 @@ class MatrixFederationHttpClient(object): return self.clock.time_bound_deferred( request_deferred, - time_out=timeout/1000. if timeout else 60, + time_out=timeout / 1000. if timeout else 60, ) response = yield preserve_context_over_fn( -- cgit 1.4.1 From d4f72a5bfb95d07d5af3f49c736823840659101a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Feb 2016 13:51:25 +0000 Subject: Allowing tagging log contexts --- synapse/handlers/sync.py | 10 ++++++++++ synapse/http/server.py | 41 ++++++++++++++++++++++++++--------------- synapse/util/logcontext.py | 7 ++++++- 3 files changed, 42 insertions(+), 16 deletions(-) (limited to 'synapse/http') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index dc686db541..72ccaf1e3f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,6 +18,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError +from synapse.util.logcontext import LoggingContext from twisted.internet import defer @@ -140,6 +141,15 @@ class SyncHandler(BaseHandler): A Deferred SyncResult. """ + context = LoggingContext.current_context() + if context: + if since_token is None: + context.tag = "initial_sync" + elif full_state: + context.tag = "full_state_sync" + else: + context.tag = "incremental_sync" + if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. diff --git a/synapse/http/server.py b/synapse/http/server.py index 10d1fcd3f6..c250a4604f 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -41,7 +41,7 @@ metrics = synapse.metrics.get_metrics_for(__name__) incoming_requests_counter = metrics.register_counter( "requests", - labels=["method", "servlet"], + labels=["method", "servlet", "tag"], ) outgoing_responses_counter = metrics.register_counter( "responses", @@ -50,23 +50,23 @@ outgoing_responses_counter = metrics.register_counter( response_timer = metrics.register_distribution( "response_time", - labels=["method", "servlet"] + labels=["method", "servlet", "tag"] ) response_ru_utime = metrics.register_distribution( - "response_ru_utime", labels=["method", "servlet"] + "response_ru_utime", labels=["method", "servlet", "tag"] ) response_ru_stime = metrics.register_distribution( - "response_ru_stime", labels=["method", "servlet"] + "response_ru_stime", labels=["method", "servlet", "tag"] ) response_db_txn_count = metrics.register_distribution( - "response_db_txn_count", labels=["method", "servlet"] + "response_db_txn_count", labels=["method", "servlet", "tag"] ) response_db_txn_duration = metrics.register_distribution( - "response_db_txn_duration", labels=["method", "servlet"] + "response_db_txn_duration", labels=["method", "servlet", "tag"] ) @@ -226,7 +226,6 @@ class JsonResource(HttpServer, resource.Resource): servlet_classname = servlet_instance.__class__.__name__ else: servlet_classname = "%r" % callback - incoming_requests_counter.inc(request.method, servlet_classname) args = [ urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups() @@ -237,21 +236,33 @@ class JsonResource(HttpServer, resource.Resource): code, response = callback_return self._send_response(request, code, response) - response_timer.inc_by( - self.clock.time_msec() - start, request.method, servlet_classname - ) - try: context = LoggingContext.current_context() + + tag = "" + if context: + tag = context.tag + + incoming_requests_counter.inc(request.method, servlet_classname, tag) + + response_timer.inc_by( + self.clock.time_msec() - start, request.method, + servlet_classname, tag + ) + ru_utime, ru_stime = context.get_resource_usage() - response_ru_utime.inc_by(ru_utime, request.method, servlet_classname) - response_ru_stime.inc_by(ru_stime, request.method, servlet_classname) + response_ru_utime.inc_by( + ru_utime, request.method, servlet_classname, tag + ) + response_ru_stime.inc_by( + ru_stime, request.method, servlet_classname, tag + ) response_db_txn_count.inc_by( - context.db_txn_count, request.method, servlet_classname + context.db_txn_count, request.method, servlet_classname, tag ) response_db_txn_duration.inc_by( - context.db_txn_duration, request.method, servlet_classname + context.db_txn_duration, request.method, servlet_classname, tag ) except: pass diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 0595c0fa4f..e701092cd8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -47,7 +47,8 @@ class LoggingContext(object): """ __slots__ = [ - "parent_context", "name", "usage_start", "usage_end", "main_thread", "__dict__" + "parent_context", "name", "usage_start", "usage_end", "main_thread", + "__dict__", "tag", ] thread_local = threading.local() @@ -72,6 +73,9 @@ class LoggingContext(object): def add_database_transaction(self, duration_ms): pass + def __nonzero__(self): + return False + sentinel = Sentinel() def __init__(self, name=None): @@ -83,6 +87,7 @@ class LoggingContext(object): self.db_txn_duration = 0. self.usage_start = None self.main_thread = threading.current_thread() + self.tag = "" def __str__(self): return "%s@%x" % (self.name, id(self)) -- cgit 1.4.1 From 13e6262659fd544155875e18c0a3351c12d7651d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Feb 2016 10:15:56 +0000 Subject: Add metrics to pushers --- synapse/http/server.py | 10 ++++++ synapse/push/__init__.py | 84 +++++++++++++++++++++++++++++----------------- synapse/util/metrics.py | 86 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+), 31 deletions(-) create mode 100644 synapse/util/metrics.py (limited to 'synapse/http') diff --git a/synapse/http/server.py b/synapse/http/server.py index c250a4604f..06935783ca 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -208,6 +208,9 @@ class JsonResource(HttpServer, resource.Resource): if request.method == "OPTIONS": self._send_response(request, 200, {}) return + + start_context = LoggingContext.current_context() + # Loop through all the registered callbacks to check if the method # and path regex match for path_entry in self.path_regexs.get(request.method, []): @@ -243,6 +246,13 @@ class JsonResource(HttpServer, resource.Resource): if context: tag = context.tag + if context != start_context: + logger.warn( + "Context have unexpectedly changed %r, %r", + context, self.start_context + ) + return + incoming_requests_counter.inc(request.method, servlet_classname, tag) response_timer.inc_by( diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 8b9d0f03e5..64e581b8ba 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -17,6 +17,8 @@ from twisted.internet import defer from synapse.streams.config import PaginationConfig from synapse.types import StreamToken +from synapse.util.logcontext import LoggingContext +from synapse.util.metrics import Measure import synapse.util.async import push_rule_evaluator as push_rule_evaluator @@ -27,6 +29,16 @@ import random logger = logging.getLogger(__name__) +_NEXT_ID = 1 + + +def _get_next_id(): + global _NEXT_ID + _id = _NEXT_ID + _NEXT_ID += 1 + return _id + + # Pushers could now be moved to pull out of the event_push_actions table instead # of listening on the event stream: this would avoid them having to run the # rules again. @@ -57,6 +69,8 @@ class Pusher(object): self.alive = True self.badge = None + self.name = "Pusher-%d" % (_get_next_id(),) + # The last value of last_active_time that we saw self.last_last_active_time = 0 self.has_unread = True @@ -86,38 +100,46 @@ class Pusher(object): @defer.inlineCallbacks def start(self): - if not self.last_token: - # First-time setup: get a token to start from (we can't - # just start from no token, ie. 'now' - # because we need the result to be reproduceable in case - # we fail to dispatch the push) - config = PaginationConfig(from_token=None, limit='1') - chunk = yield self.evStreamHandler.get_stream( - self.user_id, config, timeout=0, affect_presence=False - ) - self.last_token = chunk['end'] - self.store.update_pusher_last_token( - self.app_id, self.pushkey, self.user_id, self.last_token - ) - logger.info("Pusher %s for user %s starting from token %s", - self.pushkey, self.user_id, self.last_token) - - wait = 0 - while self.alive: - try: - if wait > 0: - yield synapse.util.async.sleep(wait) - yield self.get_and_dispatch() - wait = 0 - except: - if wait == 0: - wait = 1 - else: - wait = min(wait * 2, 1800) - logger.exception( - "Exception in pusher loop for pushkey %s. Pausing for %ds", - self.pushkey, wait + with LoggingContext(self.name): + if not self.last_token: + # First-time setup: get a token to start from (we can't + # just start from no token, ie. 'now' + # because we need the result to be reproduceable in case + # we fail to dispatch the push) + config = PaginationConfig(from_token=None, limit='1') + chunk = yield self.evStreamHandler.get_stream( + self.user_id, config, timeout=0, affect_presence=False ) + self.last_token = chunk['end'] + self.store.update_pusher_last_token( + self.app_id, self.pushkey, self.user_id, self.last_token + ) + logger.info("New pusher %s for user %s starting from token %s", + self.pushkey, self.user_id, self.last_token) + + else: + logger.info( + "Old pusher %s for user %s starting", + self.pushkey, self.user_id, + ) + + wait = 0 + while self.alive: + try: + if wait > 0: + yield synapse.util.async.sleep(wait) + with Measure(self.clock, "push"): + yield self.get_and_dispatch() + wait = 0 + except: + if wait == 0: + wait = 1 + else: + wait = min(wait * 2, 1800) + logger.exception( + "Exception in pusher loop for pushkey %s. Pausing for %ds", + self.pushkey, wait + ) @defer.inlineCallbacks def get_and_dispatch(self): diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py new file mode 100644 index 0000000000..daf6087fe0 --- /dev/null +++ b/synapse/util/metrics.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from synapse.util.logcontext import LoggingContext +import synapse.metrics + +import logging + + +logger = logging.getLogger(__name__) + + +metrics = synapse.metrics.get_metrics_for(__name__) + +block_timer = metrics.register_distribution( + "block_timer", + labels=["block_name"] +) + +block_ru_utime = metrics.register_distribution( + "block_ru_utime", labels=["block_name"] +) + +block_ru_stime = metrics.register_distribution( + "block_ru_stime", labels=["block_name"] +) + +block_db_txn_count = metrics.register_distribution( + "block_db_txn_count", labels=["block_name"] +) + +block_db_txn_duration = metrics.register_distribution( + "block_db_txn_duration", labels=["block_name"] +) + + +class Measure(object): + __slots__ = ["clock", "name", "start_context", "start"] + + def __init__(self, clock, name): + self.clock = clock + self.name = name + self.start_context = None + self.start = None + + def __enter__(self): + self.start = self.clock.time_msec() + self.start_context = LoggingContext.current_context() + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + return + + duration = self.clock.time_msec() - self.start + block_timer.inc_by(duration, self.name) + + context = LoggingContext.current_context() + if not context: + return + + if context != self.start_context: + logger.warn( + "Context have unexpectedly changed %r, %r", + context, self.start_context + ) + return + + ru_utime, ru_stime = context.get_resource_usage() + + block_ru_utime.inc_by(ru_utime, self.name) + block_ru_stime.inc_by(ru_stime, self.name) + block_db_txn_count.inc_by(context.db_txn_count, self.name) + block_db_txn_duration.inc_by(context.db_txn_duration, self.name) -- cgit 1.4.1 From 2c1fbea5319db2c64fa486adb32b5e66680b6daf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Feb 2016 10:22:44 +0000 Subject: Fix up logcontexts --- synapse/api/auth.py | 4 +- synapse/app/homeserver.py | 2 + synapse/crypto/keyring.py | 83 ++++++++++++----------- synapse/federation/federation_server.py | 4 +- synapse/federation/transaction_queue.py | 3 - synapse/handlers/_base.py | 10 +-- synapse/handlers/events.py | 11 +++- synapse/handlers/federation.py | 50 ++------------ synapse/handlers/presence.py | 20 +++--- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 11 +++- synapse/handlers/sync.py | 40 ++++++------ synapse/http/server.py | 5 +- synapse/notifier.py | 58 ++++++++-------- synapse/push/__init__.py | 2 +- synapse/push/pusherpool.py | 9 +-- synapse/rest/client/v2_alpha/account_data.py | 4 +- synapse/rest/client/v2_alpha/tags.py | 4 +- synapse/storage/_base.py | 18 ++--- synapse/storage/events.py | 34 ++++++---- synapse/storage/presence.py | 5 +- synapse/storage/stream.py | 9 +-- synapse/util/__init__.py | 6 +- synapse/util/async.py | 11 +++- synapse/util/caches/descriptors.py | 16 +++-- synapse/util/caches/snapshot_cache.py | 3 +- synapse/util/distributor.py | 15 +++-- synapse/util/logcontext.py | 98 ++++++++++++++++++++++++++-- synapse/util/logutils.py | 35 ++++++++++ synapse/util/metrics.py | 10 +-- synapse/util/ratelimitutils.py | 3 +- 31 files changed, 356 insertions(+), 229 deletions(-) (limited to 'synapse/http') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 5bba9343f6..e2f84c4d57 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError from synapse.types import Requester, RoomID, UserID, EventID from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_context_over_fn from unpaddedbase64 import decode_base64 import logging @@ -529,7 +530,8 @@ class Auth(object): default=[""] )[0] if user and access_token and ip_addr: - self.store.insert_client_ip( + preserve_context_over_fn( + self.store.insert_client_ip, user=user, access_token=access_token, ip=ip_addr, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e5c7e39cf9..2b4be7bdd0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -709,6 +709,8 @@ def run(hs): phone_home_task.start(60 * 60 * 24, now=False) def in_thread(): + # Uncomment to enable tracing of log context changes. + # sys.settrace(logcontext_tracer) with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) reactor.run() diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index cddec0b2bc..d08ee0aa91 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -18,6 +18,10 @@ from synapse.api.errors import SynapseError, Codes from synapse.util.retryutils import get_retry_limiter from synapse.util import unwrapFirstError from synapse.util.async import ObservableDeferred +from synapse.util.logcontext import ( + preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext, + preserve_fn +) from twisted.internet import defer @@ -142,40 +146,43 @@ class Keyring(object): for server_name, _ in server_and_json } - # We want to wait for any previous lookups to complete before - # proceeding. - wait_on_deferred = self.wait_for_previous_lookups( - [server_name for server_name, _ in server_and_json], - server_to_deferred, - ) + with PreserveLoggingContext(): - # Actually start fetching keys. - wait_on_deferred.addBoth( - lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) - ) + # We want to wait for any previous lookups to complete before + # proceeding. + wait_on_deferred = self.wait_for_previous_lookups( + [server_name for server_name, _ in server_and_json], + server_to_deferred, + ) - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed. - server_to_gids = {} + # Actually start fetching keys. + wait_on_deferred.addBoth( + lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) + ) + + # When we've finished fetching all the keys for a given server_name, + # resolve the deferred passed to `wait_for_previous_lookups` so that + # any lookups waiting will proceed. + server_to_gids = {} - def remove_deferreds(res, server_name, group_id): - server_to_gids[server_name].discard(group_id) - if not server_to_gids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) - return res + def remove_deferreds(res, server_name, group_id): + server_to_gids[server_name].discard(group_id) + if not server_to_gids[server_name]: + d = server_to_deferred.pop(server_name, None) + if d: + d.callback(None) + return res - for g_id, deferred in deferreds.items(): - server_name = group_id_to_group[g_id].server_name - server_to_gids.setdefault(server_name, set()).add(g_id) - deferred.addBoth(remove_deferreds, server_name, g_id) + for g_id, deferred in deferreds.items(): + server_name = group_id_to_group[g_id].server_name + server_to_gids.setdefault(server_name, set()).add(g_id) + deferred.addBoth(remove_deferreds, server_name, g_id) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified return [ - handle_key_deferred( + preserve_context_over_fn( + handle_key_deferred, group_id_to_group[g_id], deferreds[g_id], ) @@ -198,12 +205,13 @@ class Keyring(object): if server_name in self.key_downloads ] if wait_on: - yield defer.DeferredList(wait_on) + with PreserveLoggingContext(): + yield defer.DeferredList(wait_on) else: break for server_name, deferred in server_to_deferred.items(): - d = ObservableDeferred(deferred) + d = ObservableDeferred(preserve_context_over_deferred(deferred)) self.key_downloads[server_name] = d def rm(r, server_name): @@ -244,12 +252,13 @@ class Keyring(object): for group in group_id_to_group.values(): for key_id in group.key_ids: if key_id in merged_results[group.server_name]: - group_id_to_deferred[group.group_id].callback(( - group.group_id, - group.server_name, - key_id, - merged_results[group.server_name][key_id], - )) + with PreserveLoggingContext(): + group_id_to_deferred[group.group_id].callback(( + group.group_id, + group.server_name, + key_id, + merged_results[group.server_name][key_id], + )) break else: missing_groups.setdefault( @@ -504,7 +513,7 @@ class Keyring(object): yield defer.gatherResults( [ - self.store_keys( + preserve_fn(self.store_keys)( server_name=key_server_name, from_server=server_name, verify_keys=verify_keys, @@ -573,7 +582,7 @@ class Keyring(object): yield defer.gatherResults( [ - self.store.store_server_keys_json( + preserve_fn(self.store.store_server_keys_json)( server_name=server_name, key_id=key_id, from_server=server_name, @@ -675,7 +684,7 @@ class Keyring(object): # TODO(markjh): Store whether the keys have expired. yield defer.gatherResults( [ - self.store.store_server_verify_key( + preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key ) for key_id, key in verify_keys.items() diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a97aa0c94a..90718192dd 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -126,10 +126,8 @@ class FederationServer(FederationBase): results = [] for pdu in pdu_list: - d = self._handle_new_pdu(transaction.origin, pdu) - try: - yield d + yield self._handle_new_pdu(transaction.origin, pdu) results.append({}) except FederationError as e: self.send_failure(e, transaction.origin) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 622adad3ae..1928da03b3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -103,7 +103,6 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") - @defer.inlineCallbacks def enqueue_pdu(self, pdu, destinations, order): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus @@ -141,8 +140,6 @@ class TransactionQueue(object): deferreds.append(deferred) - yield defer.DeferredList(deferreds, consumeErrors=True) - # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 1423df6cf3..fa83d3e464 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -293,19 +293,11 @@ class BaseHandler(object): with PreserveLoggingContext(): # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - notify_d.addErrback(log_failure) - # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 5ad8f3779a..4933c31c19 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event +from synapse.util.logcontext import preserve_context_over_fn from ._base import BaseHandler @@ -29,11 +30,17 @@ logger = logging.getLogger(__name__) def started_user_eventstream(distributor, user): - return distributor.fire("started_user_eventstream", user) + return preserve_context_over_fn( + distributor.fire, + "started_user_eventstream", user + ) def stopped_user_eventstream(distributor, user): - return distributor.fire("stopped_user_eventstream", user) + return preserve_context_over_fn( + distributor.fire, + "stopped_user_eventstream", user + ) class EventStreamHandler(BaseHandler): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ce1e9d6c7..b78b0502d9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -221,19 +221,11 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.membership == Membership.JOIN: prev_state = context.current_state.get((event.type, event.state_key)) @@ -643,19 +635,11 @@ class FederationHandler(BaseHandler): ) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[joinee] ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -730,18 +714,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -811,19 +787,11 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[target_user], ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - defer.returnValue(event) @defer.inlineCallbacks @@ -948,18 +916,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - new_pdu = event destinations = set() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c21ff5c9..b61394f2b5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -378,9 +378,9 @@ class PresenceHandler(BaseHandler): was_polling = target_user in self._user_cachemap if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) + yield self.start_polling_presence(target_user, state=state) elif not now_online and was_polling: - self.stop_polling_presence(target_user) + yield self.stop_polling_presence(target_user) # TODO(paul): perform a presence push as part of start/stop poll so # we don't have to do this all the time @@ -394,7 +394,8 @@ class PresenceHandler(BaseHandler): if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY: return - self.changed_presencelike_data(user, {"last_active": now}) + with PreserveLoggingContext(): + self.changed_presencelike_data(user, {"last_active": now}) def get_joined_rooms_for_user(self, user): """Get the list of rooms a user is joined to. @@ -466,11 +467,12 @@ class PresenceHandler(BaseHandler): local_user, room_ids=[room_id], add_to_cache=False ) - self.push_update_to_local_and_remote( - observed_user=local_user, - users_to_push=[user], - statuscache=statuscache, - ) + with PreserveLoggingContext(): + self.push_update_to_local_and_remote( + observed_user=local_user, + users_to_push=[user], + statuscache=statuscache, + ) @defer.inlineCallbacks def send_presence_invite(self, observer_user, observed_user): @@ -556,7 +558,7 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - self.start_polling_presence( + yield self.start_polling_presence( observer_user, target_user=observed_user ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 2660fd21a2..24c850ae9b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -186,7 +186,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="" ) - registered_user(self.distributor, user) + yield registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bfd7e44e9f..a8e3a9029c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,6 +25,7 @@ from synapse.api.constants import ( from synapse.api.errors import AuthError, StoreError, SynapseError, Codes from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor +from synapse.util.logcontext import preserve_context_over_fn from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -46,11 +47,17 @@ def collect_presencelike_data(distributor, user, content): def user_left_room(distributor, user, room_id): - return distributor.fire("user_left_room", user=user, room_id=room_id) + return preserve_context_over_fn( + distributor.fire, + "user_left_room", user=user, room_id=room_id + ) def user_joined_room(distributor, user, room_id): - return distributor.fire("user_joined_room", user=user, room_id=room_id) + return preserve_context_over_fn( + distributor.fire, + "user_joined_room", user=user, room_id=room_id + ) class RoomCreationHandler(BaseHandler): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 72271f2626..3f1cda5b0b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,7 +18,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from twisted.internet import defer @@ -241,15 +241,16 @@ class SyncHandler(BaseHandler): deferreds = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync_deferred = self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(joined.append) deferreds.append(room_sync_deferred) elif event.membership == Membership.INVITE: @@ -262,15 +263,16 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync_deferred = self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(archived.append) deferreds.append(room_sync_deferred) diff --git a/synapse/http/server.py b/synapse/http/server.py index 06935783ca..a90e2e1125 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -99,9 +99,8 @@ def request_handler(request_handler): request_context.request = request_id with request.processing(): try: - d = request_handler(self, request) - with PreserveLoggingContext(): - yield d + with PreserveLoggingContext(request_context): + yield request_handler(self, request) except CodeMessageException as e: code = e.code if isinstance(e, SynapseError): diff --git a/synapse/notifier.py b/synapse/notifier.py index 1a90bd55cd..560866b26e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -18,7 +18,8 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor, ObservableDeferred +from synapse.util.async import ObservableDeferred +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import StreamToken import synapse.metrics @@ -73,7 +74,8 @@ class _NotifierUserStream(object): self.current_token = current_token self.last_notified_ms = time_now_ms - self.notify_deferred = ObservableDeferred(defer.Deferred()) + with PreserveLoggingContext(): + self.notify_deferred = ObservableDeferred(defer.Deferred()) def notify(self, stream_key, stream_id, time_now_ms): """Notify any listeners for this user of a new event from an @@ -88,8 +90,10 @@ class _NotifierUserStream(object): ) self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred - self.notify_deferred = ObservableDeferred(defer.Deferred()) - noify_deferred.callback(self.current_token) + + with PreserveLoggingContext(): + self.notify_deferred = ObservableDeferred(defer.Deferred()) + noify_deferred.callback(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier @@ -184,8 +188,6 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) - @log_function - @defer.inlineCallbacks def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): """ Used by handlers to inform the notifier something has happened @@ -199,12 +201,11 @@ class Notifier(object): until all previous events have been persisted before notifying the client streams. """ - yield run_on_reactor() - - self.pending_new_room_events.append(( - room_stream_id, event, extra_users - )) - self._notify_pending_new_room_events(max_room_stream_id) + with PreserveLoggingContext(): + self.pending_new_room_events.append(( + room_stream_id, event, extra_users + )) + self._notify_pending_new_room_events(max_room_stream_id) def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous @@ -251,31 +252,29 @@ class Notifier(object): extra_streams=app_streams, ) - @defer.inlineCallbacks - @log_function def on_new_event(self, stream_key, new_token, users=[], rooms=[], extra_streams=set()): """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. """ - yield run_on_reactor() - user_streams = set() + with PreserveLoggingContext(): + user_streams = set() - for user in users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + for user in users: + user_stream = self.user_to_user_stream.get(str(user)) + if user_stream is not None: + user_streams.add(user_stream) - for room in rooms: - user_streams |= self.room_to_user_streams.get(room, set()) + for room in rooms: + user_streams |= self.room_to_user_streams.get(room, set()) - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify(stream_key, new_token, time_now_ms) - except: - logger.exception("Failed to notify listener") + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify(stream_key, new_token, time_now_ms) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, @@ -325,7 +324,8 @@ class Notifier(object): # that we don't miss any current_token updates. prev_token = current_token listener = user_stream.new_listener(prev_token) - yield listener.deferred + with PreserveLoggingContext(): + yield listener.deferred except defer.CancelledError: break diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 64e581b8ba..8da2d8716c 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -111,7 +111,7 @@ class Pusher(object): self.user_id, config, timeout=0, affect_presence=False ) self.last_token = chunk['end'] - self.store.update_pusher_last_token( + yield self.store.update_pusher_last_token( self.app_id, self.pushkey, self.user_id, self.last_token ) logger.info("New pusher %s for user %s starting from token %s", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index d1b7c0802f..d7dcb2de4b 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -18,6 +18,7 @@ from twisted.internet import defer from httppusher import HttpPusher from synapse.push import PusherConfigException +from synapse.util.logcontext import preserve_fn import logging @@ -76,7 +77,7 @@ class PusherPool: "Removing pusher for app id %s, pushkey %s, user %s", app_id, pushkey, p['user_name'] ) - self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks def remove_pushers_by_user(self, user_id): @@ -91,7 +92,7 @@ class PusherPool: "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] ) - self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind, @@ -110,7 +111,7 @@ class PusherPool: lang=lang, data=data, ) - self._refresh_pusher(app_id, pushkey, user_id) + yield self._refresh_pusher(app_id, pushkey, user_id) def _create_pusher(self, pusherdict): if pusherdict['kind'] == 'http': @@ -166,7 +167,7 @@ class PusherPool: if fullid in self.pushers: self.pushers[fullid].stop() self.pushers[fullid] = p - p.start() + preserve_fn(p.start)() logger.info("Started pushers") diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index 985efe2a62..1456881c1a 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -57,7 +57,7 @@ class AccountDataServlet(RestServlet): user_id, account_data_type, body ) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) @@ -99,7 +99,7 @@ class RoomAccountDataServlet(RestServlet): user_id, room_id, account_data_type, body ) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 42f2203f3d..79c436a8cf 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -80,7 +80,7 @@ class TagServlet(RestServlet): max_id = yield self.store.add_tag_to_room(user_id, room_id, tag, body) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) @@ -94,7 +94,7 @@ class TagServlet(RestServlet): max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index cfb87d9328..2e97ac84a8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,7 +15,7 @@ import logging from synapse.api.errors import StoreError -from synapse.util.logcontext import preserve_context_over_fn, LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache import synapse.metrics @@ -298,10 +298,10 @@ class SQLBaseStore(object): func, *args, **kwargs ) - result = yield preserve_context_over_fn( - self._db_pool.runWithConnection, - inner_func, *args, **kwargs - ) + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) for after_callback, after_args in after_callbacks: after_callback(*after_args) @@ -326,10 +326,10 @@ class SQLBaseStore(object): return func(conn, *args, **kwargs) - result = yield preserve_context_over_fn( - self._db_pool.runWithConnection, - inner_func, *args, **kwargs - ) + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) defer.returnValue(result) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4d7cdd00d0..c6ed54721c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event -from synapse.util.logcontext import preserve_context_over_deferred +from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes @@ -664,14 +664,16 @@ class EventsStore(SQLBaseStore): for ids, d in lst: if not d.called: try: - d.callback([ - res[i] - for i in ids - if i in res - ]) + with PreserveLoggingContext(): + d.callback([ + res[i] + for i in ids + if i in res + ]) except: logger.exception("Failed to callback") - reactor.callFromThread(fire, event_list, row_dict) + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") @@ -679,10 +681,12 @@ class EventsStore(SQLBaseStore): def fire(evs): for _, d in evs: if not d.called: - d.errback(e) + with PreserveLoggingContext(): + d.errback(e) if event_list: - reactor.callFromThread(fire, event_list) + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, @@ -709,18 +713,20 @@ class EventsStore(SQLBaseStore): should_start = False if should_start: - self.runWithConnection( - self._do_fetch - ) + with PreserveLoggingContext(): + self.runWithConnection( + self._do_fetch + ) - rows = yield preserve_context_over_deferred(events_d) + with PreserveLoggingContext(): + rows = yield events_d if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] res = yield defer.gatherResults( [ - self._get_event_from_row( + preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 9b3aecaf8c..ef525f34c5 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -68,8 +68,9 @@ class PresenceStore(SQLBaseStore): for row in rows }) + @defer.inlineCallbacks def set_presence_state(self, user_localpart, new_state): - res = self._simple_update_one( + res = yield self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -79,7 +80,7 @@ class PresenceStore(SQLBaseStore): ) self.get_presence_state.invalidate((user_localpart,)) - return res + defer.returnValue(res) def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 50436cb2d2..367ffc9543 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,6 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken +from synapse.util.logcontext import preserve_fn import logging @@ -170,12 +171,12 @@ class StreamStore(SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ - self.get_room_events_stream_for_room( - room_id, from_key, to_key, limit - ).addCallback(lambda r, rm: (rm, r), room_id) + preserve_fn(self.get_room_events_stream_for_room)( + room_id, from_key, to_key, limit, + ) for room_id in room_ids ]) - results.update(dict(res)) + results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 7566d9eb33..133671e238 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -61,10 +61,8 @@ class Clock(object): *args: Postional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - current_context = LoggingContext.current_context() - def wrapped_callback(*args, **kwargs): - with PreserveLoggingContext(current_context): + with PreserveLoggingContext(): callback(*args, **kwargs) with PreserveLoggingContext(): diff --git a/synapse/util/async.py b/synapse/util/async.py index 200edd404c..640fae3890 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,13 +16,16 @@ from twisted.internet import defer, reactor -from .logcontext import preserve_context_over_deferred +from .logcontext import PreserveLoggingContext +@defer.inlineCallbacks def sleep(seconds): d = defer.Deferred() - reactor.callLater(seconds, d.callback, seconds) - return preserve_context_over_deferred(d) + with PreserveLoggingContext(): + reactor.callLater(seconds, d.callback, seconds) + res = yield d + defer.returnValue(res) def run_on_reactor(): @@ -54,6 +57,7 @@ class ObservableDeferred(object): object.__setattr__(self, "_result", (True, r)) while self._observers: try: + # TODO: Handle errors here. self._observers.pop().callback(r) except: pass @@ -63,6 +67,7 @@ class ObservableDeferred(object): object.__setattr__(self, "_result", (False, f)) while self._observers: try: + # TODO: Handle errors here. self._observers.pop().errback(f) except: pass diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index e27917c63a..277854ccbc 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -18,6 +18,9 @@ from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn +) from . import caches_by_name, DEBUG_CACHES, cache_counter @@ -190,7 +193,7 @@ class CacheDescriptor(object): defer.returnValue(cached_result) observer.addCallback(check_result) - return observer + return preserve_context_over_deferred(observer) except KeyError: # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated @@ -198,6 +201,7 @@ class CacheDescriptor(object): sequence = self.cache.sequence ret = defer.maybeDeferred( + preserve_context_over_fn, self.function_to_call, obj, *args, **kwargs ) @@ -211,7 +215,7 @@ class CacheDescriptor(object): ret = ObservableDeferred(ret, consumeErrors=True) self.cache.update(sequence, cache_key, ret) - return ret.observe() + return preserve_context_over_deferred(ret.observe()) wrapped.invalidate = self.cache.invalidate wrapped.invalidate_all = self.cache.invalidate_all @@ -299,6 +303,7 @@ class CacheListDescriptor(object): args_to_call[self.list_name] = missing ret_d = defer.maybeDeferred( + preserve_context_over_fn, self.function_to_call, **args_to_call ) @@ -308,7 +313,8 @@ class CacheListDescriptor(object): # We need to create deferreds for each arg in the list so that # we can insert the new deferred into the cache. for arg in missing: - observer = ret_d.observe() + with PreserveLoggingContext(): + observer = ret_d.observe() observer.addCallback(lambda r, arg: r.get(arg, None), arg) observer = ObservableDeferred(observer) @@ -327,10 +333,10 @@ class CacheListDescriptor(object): cached[arg] = res - return defer.gatherResults( + return preserve_context_over_deferred(defer.gatherResults( cached.values(), consumeErrors=True, - ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res)) + ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res))) obj.__dict__[self.orig.__name__] = wrapped diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py index b1e40417fd..d03678b8c8 100644 --- a/synapse/util/caches/snapshot_cache.py +++ b/synapse/util/caches/snapshot_cache.py @@ -87,7 +87,8 @@ class SnapshotCache(object): # expire from the rotation of that cache. self.next_result_cache[key] = result self.pending_result_cache.pop(key, None) + return r - result.observe().addBoth(shuffle_along) + result.addBoth(shuffle_along) return result.observe() diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 4ebfebf701..8875813de4 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -15,9 +15,7 @@ from twisted.internet import defer -from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_context_over_deferred, -) +from synapse.util.logcontext import PreserveLoggingContext from synapse.util import unwrapFirstError @@ -97,6 +95,7 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) + @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -116,6 +115,7 @@ class Signal(object): failure.getTracebackObject())) if not self.suppress_failures: return failure + return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb) with PreserveLoggingContext(): @@ -124,8 +124,11 @@ class Signal(object): for observer in self.observers ] - d = defer.gatherResults(deferreds, consumeErrors=True) + res = yield defer.gatherResults( + deferreds, consumeErrors=True + ).addErrback(unwrapFirstError) - d.addErrback(unwrapFirstError) + defer.returnValue(res) - return preserve_context_over_deferred(d) + def __repr__(self): + return "" % (self.name,) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index e701092cd8..9134e67908 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -48,7 +48,7 @@ class LoggingContext(object): __slots__ = [ "parent_context", "name", "usage_start", "usage_end", "main_thread", - "__dict__", "tag", + "__dict__", "tag", "alive", ] thread_local = threading.local() @@ -88,6 +88,7 @@ class LoggingContext(object): self.usage_start = None self.main_thread = threading.current_thread() self.tag = "" + self.alive = True def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -106,6 +107,7 @@ class LoggingContext(object): The context that was previously active """ current = cls.current_context() + if current is not context: current.stop() cls.thread_local.current_context = context @@ -117,6 +119,7 @@ class LoggingContext(object): if self.parent_context is not None: raise Exception("Attempt to enter logging context multiple times") self.parent_context = self.set_current_context(self) + self.alive = True return self def __exit__(self, type, value, traceback): @@ -136,6 +139,7 @@ class LoggingContext(object): self ) self.parent_context = None + self.alive = False def __getattr__(self, name): """Delegate member lookup to parent context""" @@ -213,7 +217,7 @@ class PreserveLoggingContext(object): exited. Used to restore the context after a function using @defer.inlineCallbacks is resumed by a callback from the reactor.""" - __slots__ = ["current_context", "new_context"] + __slots__ = ["current_context", "new_context", "has_parent"] def __init__(self, new_context=LoggingContext.sentinel): self.new_context = new_context @@ -224,11 +228,26 @@ class PreserveLoggingContext(object): self.new_context ) + if self.current_context: + self.has_parent = self.current_context.parent_context is not None + if not self.current_context.alive: + logger.warn( + "Entering dead context: %s", + self.current_context, + ) + def __exit__(self, type, value, traceback): """Restores the current logging context""" - LoggingContext.set_current_context(self.current_context) + context = LoggingContext.set_current_context(self.current_context) + + if context != self.new_context: + logger.warn( + "Unexpected logging context: %s is not %s", + context, self.new_context, + ) + if self.current_context is not LoggingContext.sentinel: - if self.current_context.parent_context is None: + if not self.current_context.alive: logger.warn( "Restoring dead context: %s", self.current_context, @@ -289,3 +308,74 @@ def preserve_context_over_deferred(deferred): d = _PreservingContextDeferred(current_context) deferred.chainDeferred(d) return d + + +def preserve_fn(f): + """Ensures that function is called with correct context and that context is + restored after return. Useful for wrapping functions that return a deferred + which you don't yield on. + """ + current = LoggingContext.current_context() + + def g(*args, **kwargs): + with PreserveLoggingContext(current): + return f(*args, **kwargs) + + return g + + +# modules to ignore in `logcontext_tracer` +_to_ignore = [ + "synapse.util.logcontext", + "synapse.http.server", + "synapse.storage._base", + "synapse.util.async", +] + + +def logcontext_tracer(frame, event, arg): + """A tracer that logs whenever a logcontext "unexpectedly" changes within + a function. Probably inaccurate. + + Use by calling `sys.settrace(logcontext_tracer)` in the main thread. + """ + if event == 'call': + name = frame.f_globals["__name__"] + if name.startswith("synapse"): + if name == "synapse.util.logcontext": + if frame.f_code.co_name in ["__enter__", "__exit__"]: + tracer = frame.f_back.f_trace + if tracer: + tracer.just_changed = True + + tracer = frame.f_trace + if tracer: + return tracer + + if not any(name.startswith(ig) for ig in _to_ignore): + return LineTracer() + + +class LineTracer(object): + __slots__ = ["context", "just_changed"] + + def __init__(self): + self.context = LoggingContext.current_context() + self.just_changed = False + + def __call__(self, frame, event, arg): + if event in 'line': + if self.just_changed: + self.context = LoggingContext.current_context() + self.just_changed = False + else: + c = LoggingContext.current_context() + if c != self.context: + logger.info( + "Context changed! %s -> %s, %s, %s", + self.context, c, + frame.f_code.co_filename, frame.f_lineno + ) + self.context = c + + return self diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index c37a157787..3a83828d25 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -168,3 +168,38 @@ def trace_function(f): wrapped.__name__ = func_name return wrapped + + +def get_previous_frames(): + s = inspect.currentframe().f_back.f_back + to_return = [] + while s: + if s.f_globals["__name__"].startswith("synapse"): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + to_return.append("{{ %s:%d %s - Args: %s }}" % ( + filename, lineno, function, args_string + )) + + s = s.f_back + + return ", ". join(to_return) + + +def get_previous_frame(ignore=[]): + s = inspect.currentframe().f_back.f_back + + while s: + if s.f_globals["__name__"].startswith("synapse"): + if not any(s.f_globals["__name__"].startswith(ig) for ig in ignore): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + return "{{ %s:%d %s - Args: %s }}" % ( + filename, lineno, function, args_string + ) + + s = s.f_back + + return None diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index daf6087fe0..ca48007218 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -68,16 +68,18 @@ class Measure(object): block_timer.inc_by(duration, self.name) context = LoggingContext.current_context() - if not context: - return if context != self.start_context: logger.warn( - "Context have unexpectedly changed %r, %r", - context, self.start_context + "Context have unexpectedly changed from '%s' to '%s'. (%r)", + context, self.start_context, self.name ) return + if not context: + logger.warn("Expected context. (%r)", self.name) + return + ru_utime, ru_stime = context.get_resource_usage() block_ru_utime.inc_by(ru_utime, self.name) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index ea321bc6a9..4076eed269 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import sleep +from synapse.util.logcontext import preserve_fn import collections import contextlib @@ -163,7 +164,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = sleep(self.sleep_msec / 1000.0) + ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) -- cgit 1.4.1 From 58c9f206929560044fccae84c36fdd89724ccfc0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 12 Feb 2016 13:46:59 +0000 Subject: Catch the exceptions thrown by twisted when you write to a closed connection --- synapse/http/server.py | 21 ++++++++++++++++++++- synapse/rest/client/v1/login.py | 10 ++++++---- synapse/rest/client/v2_alpha/auth.py | 5 +++-- synapse/rest/media/v0/content_repository.py | 4 ++-- synapse/rest/media/v1/base_resource.py | 4 ++-- 5 files changed, 33 insertions(+), 11 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/server.py b/synapse/http/server.py index a90e2e1125..b17b190ee5 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -367,10 +367,29 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False, "Origin, X-Requested-With, Content-Type, Accept") request.write(json_bytes) - request.finish() + finish_request(request) return NOT_DONE_YET +def finish_request(request): + """ Finish writing the response to the request. + + Twisted throws a RuntimeException if the connection closed before the + response was written but doesn't provide a convenient or reliable way to + determine if the connection was closed. So we catch and log the RuntimeException + + You might think that ``request.notifyFinish`` could be used to tell if the + request was finished. However the deferred it returns won't fire if the + connection was already closed, meaning we'd have to have called the method + right at the start of the request. By the time we want to write the response + it will already be too late. + """ + try: + request.finish() + except RuntimeError as e: + logger.info("Connection disconnected before response was written: %r", e) + + def _request_user_agent_is_curl(request): user_agents = request.requestHeaders.getRawHeaders( "User-Agent", default=[] diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 7199113dac..79101106ac 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -17,6 +17,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, LoginError, Codes from synapse.types import UserID +from synapse.http.server import finish_request + from base import ClientV1RestServlet, client_path_patterns import simplejson as json @@ -263,7 +265,7 @@ class SAML2RestServlet(ClientV1RestServlet): '?status=authenticated&access_token=' + token + '&user_id=' + user_id + '&ava=' + urllib.quote(json.dumps(saml2_auth.ava))) - request.finish() + finish_request(request) defer.returnValue(None) defer.returnValue((200, {"status": "authenticated", "user_id": user_id, "token": token, @@ -272,7 +274,7 @@ class SAML2RestServlet(ClientV1RestServlet): request.redirect(urllib.unquote( request.args['RelayState'][0]) + '?status=not_authenticated') - request.finish() + finish_request(request) defer.returnValue(None) defer.returnValue((200, {"status": "not_authenticated"})) @@ -309,7 +311,7 @@ class CasRedirectServlet(ClientV1RestServlet): "service": "%s?%s" % (hs_redirect_url, client_redirect_url_param) }) request.redirect("%s?%s" % (self.cas_server_url, service_param)) - request.finish() + finish_request(request) class CasTicketServlet(ClientV1RestServlet): @@ -362,7 +364,7 @@ class CasTicketServlet(ClientV1RestServlet): redirect_url = self.add_login_token_to_redirect_url(client_redirect_url, login_token) request.redirect(redirect_url) - request.finish() + finish_request(request) def add_login_token_to_redirect_url(self, url, token): url_parts = list(urlparse.urlparse(url)) diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index ff71c40b43..78181b7b18 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.constants import LoginType from synapse.api.errors import SynapseError from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX +from synapse.http.server import finish_request from synapse.http.servlet import RestServlet from ._base import client_v2_patterns @@ -130,7 +131,7 @@ class AuthRestServlet(RestServlet): request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) request.write(html_bytes) - request.finish() + finish_request(request) defer.returnValue(None) else: raise SynapseError(404, "Unknown auth stage type") @@ -176,7 +177,7 @@ class AuthRestServlet(RestServlet): request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) request.write(html_bytes) - request.finish() + finish_request(request) defer.returnValue(None) else: diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py index dcf3eaee1f..d9fc045fc6 100644 --- a/synapse/rest/media/v0/content_repository.py +++ b/synapse/rest/media/v0/content_repository.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import respond_with_json_bytes +from synapse.http.server import respond_with_json_bytes, finish_request from synapse.util.stringutils import random_string from synapse.api.errors import ( @@ -144,7 +144,7 @@ class ContentRepoResource(resource.Resource): # after the file has been sent, clean up and finish the request def cbFinished(ignored): f.close() - request.finish() + finish_request(request) d.addCallback(cbFinished) else: respond_with_json_bytes( diff --git a/synapse/rest/media/v1/base_resource.py b/synapse/rest/media/v1/base_resource.py index 58d56ec7a4..58ef91c0b8 100644 --- a/synapse/rest/media/v1/base_resource.py +++ b/synapse/rest/media/v1/base_resource.py @@ -16,7 +16,7 @@ from .thumbnailer import Thumbnailer from synapse.http.matrixfederationclient import MatrixFederationHttpClient -from synapse.http.server import respond_with_json +from synapse.http.server import respond_with_json, finish_request from synapse.util.stringutils import random_string from synapse.api.errors import ( cs_error, Codes, SynapseError @@ -238,7 +238,7 @@ class BaseMediaResource(Resource): with open(file_path, "rb") as f: yield FileSender().beginFileTransfer(f, request) - request.finish() + finish_request(request) else: self._respond_404(request) -- cgit 1.4.1 From 7bcee4733a05b239161d73461abd4d0e32caf4ac Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 8 Mar 2016 10:04:38 +0000 Subject: Encode unicode objects given to post_urlencode* otherwise urllib.urlencode chokes. --- synapse/http/client.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/client.py b/synapse/http/client.py index fdd90b1c3c..b3cd5ff26a 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -103,7 +103,7 @@ class SimpleHttpClient(object): # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) - query_bytes = urllib.urlencode(args, True) + query_bytes = urllib.urlencode(encode_urlencode_args(args), True) response = yield self.request( "POST", @@ -249,7 +249,7 @@ class CaptchaServerHttpClient(SimpleHttpClient): @defer.inlineCallbacks def post_urlencoded_get_raw(self, url, args={}): - query_bytes = urllib.urlencode(args, True) + query_bytes = urllib.urlencode(encode_urlencode_args(args), True) response = yield self.request( "POST", @@ -268,6 +268,16 @@ class CaptchaServerHttpClient(SimpleHttpClient): # twisted dislikes google's response, no content length. defer.returnValue(e.response) +def encode_urlencode_args(args): + return { k: encode_urlencode_arg(v) for k, v in args.items() } + +def encode_urlencode_arg(arg): + if isinstance(arg, unicode): + return arg.encode('utf-8') + elif isinstance(arg, list): + return [ encode_urlencode_arg(i) for i in arg ] + else: + return arg def _print_ex(e): if hasattr(e, "reasons") and e.reasons: -- cgit 1.4.1 From 9a3c80a348b850a04e257b30ce41f0a7c453594b Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 8 Mar 2016 10:09:07 +0000 Subject: pep8 --- synapse/http/client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/client.py b/synapse/http/client.py index b3cd5ff26a..cbd45b2bbe 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -268,17 +268,20 @@ class CaptchaServerHttpClient(SimpleHttpClient): # twisted dislikes google's response, no content length. defer.returnValue(e.response) + def encode_urlencode_args(args): - return { k: encode_urlencode_arg(v) for k, v in args.items() } + return {k: encode_urlencode_arg(v) for k, v in args.items()} + def encode_urlencode_arg(arg): if isinstance(arg, unicode): return arg.encode('utf-8') elif isinstance(arg, list): - return [ encode_urlencode_arg(i) for i in arg ] + return [encode_urlencode_arg(i) for i in arg] else: return arg + def _print_ex(e): if hasattr(e, "reasons") and e.reasons: for ex in e.reasons: -- cgit 1.4.1 From b7dbe5147a85bea8f14b78a27ff499fe5a0d444a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 9 Mar 2016 11:26:26 +0000 Subject: Add a parse_json_object function to deduplicate all the copy+pasted _parse_json functions. Also document the parse_.* functions. --- synapse/http/servlet.py | 70 ++++++++++++++++++++++++++-- synapse/rest/client/v1/directory.py | 16 ++----- synapse/rest/client/v1/login.py | 15 +----- synapse/rest/client/v1/push_rule.py | 16 ++----- synapse/rest/client/v1/pusher.py | 17 ++----- synapse/rest/client/v1/register.py | 14 +----- synapse/rest/client/v1/room.py | 26 ++++------- synapse/rest/client/v2_alpha/_base.py | 22 --------- synapse/rest/client/v2_alpha/account.py | 8 ++-- synapse/rest/client/v2_alpha/register.py | 8 ++-- synapse/rest/client/v2_alpha/tokenrefresh.py | 6 +-- 11 files changed, 97 insertions(+), 121 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 7bd87940b4..41c519c000 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -15,14 +15,27 @@ """ This module contains base REST classes for constructing REST servlets. """ -from synapse.api.errors import SynapseError +from synapse.api.errors import SynapseError, Codes import logging +import simplejson logger = logging.getLogger(__name__) def parse_integer(request, name, default=None, required=False): + """Parse an integer parameter from the request string + + :param request: the twisted HTTP request. + :param name (str): the name of the query parameter. + :param default: value to use if the parameter is absent, defaults to None. + :param required (bool): whether to raise a 400 SynapseError if the + parameter is absent, defaults to False. + :return: An int value or the default. + :raises + SynapseError if the parameter is absent and required, or if the + parameter is present and not an integer. + """ if name in request.args: try: return int(request.args[name][0]) @@ -32,12 +45,25 @@ def parse_integer(request, name, default=None, required=False): else: if required: message = "Missing integer query parameter %r" % (name,) - raise SynapseError(400, message) + raise SynapseError(400, message, errcode=Codes.MISSING_PARAM) else: return default def parse_boolean(request, name, default=None, required=False): + """Parse a boolean parameter from the request query string + + :param request: the twisted HTTP request. + :param name (str): the name of the query parameter. + :param default: value to use if the parameter is absent, defaults to None. + :param required (bool): whether to raise a 400 SynapseError if the + parameter is absent, defaults to False. + :return: A bool value or the default. + :raises + SynapseError if the parameter is absent and required, or if the + parameter is present and not one of "true" or "false". + """ + if name in request.args: try: return { @@ -53,30 +79,64 @@ def parse_boolean(request, name, default=None, required=False): else: if required: message = "Missing boolean query parameter %r" % (name,) - raise SynapseError(400, message) + raise SynapseError(400, message, errcode=Codes.MISSING_PARAM) else: return default def parse_string(request, name, default=None, required=False, allowed_values=None, param_type="string"): + """Parse a string parameter from the request query string. + + :param request: the twisted HTTP request. + :param name (str): the name of the query parameter. + :param default: value to use if the parameter is absent, defaults to None. + :param required (bool): whether to raise a 400 SynapseError if the + parameter is absent, defaults to False. + :param allowed_values (list): List of allowed values for the string, + or None if any value is allowed, defaults to None + :return: A string value or the default. + :raises + SynapseError if the parameter is absent and required, or if the + parameter is present, must be one of a list of allowed values and + is not one of those allowed values. + """ + if name in request.args: value = request.args[name][0] if allowed_values is not None and value not in allowed_values: message = "Query parameter %r must be one of [%s]" % ( name, ", ".join(repr(v) for v in allowed_values) ) - raise SynapseError(message) + raise SynapseError(400, message) else: return value else: if required: message = "Missing %s query parameter %r" % (param_type, name) - raise SynapseError(400, message) + raise SynapseError(400, message, errcode=Codes.MISSING_PARAM) else: return default +def parse_json_object_from_request(request): + """Parse a JSON object from the body of a twisted HTTP request. + + :param request: the twisted HTTP request. + :raises + SynapseError if the request body couldn't be decoded as JSON or + if it wasn't a JSON object. + """ + try: + content = simplejson.loads(request.content.read()) + if type(content) != dict: + message = "Content must be a JSON object." + raise SynapseError(400, message, errcode=Codes.BAD_JSON) + return content + except simplejson.JSONDecodeError: + raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) + + class RestServlet(object): """ A Synapse REST Servlet. diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 8bfe9fdea8..60c5ec77aa 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -18,9 +18,10 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import RoomAlias +from synapse.http.servlet import parse_json_object_from_request + from .base import ClientV1RestServlet, client_path_patterns -import simplejson as json import logging @@ -45,7 +46,7 @@ class ClientDirectoryServer(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_alias): - content = _parse_json(request) + content = parse_json_object_from_request(request) if "room_id" not in content: raise SynapseError(400, "Missing room_id key", errcode=Codes.BAD_JSON) @@ -135,14 +136,3 @@ class ClientDirectoryServer(ClientV1RestServlet): ) defer.returnValue((200, {})) - - -def _parse_json(request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise SynapseError(400, "Content must be a JSON object.", - errcode=Codes.NOT_JSON) - return content - except ValueError: - raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index f6902a60a8..fe593d07ce 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, LoginError, Codes from synapse.types import UserID from synapse.http.server import finish_request +from synapse.http.servlet import parse_json_object_from_request from .base import ClientV1RestServlet, client_path_patterns @@ -79,7 +80,7 @@ class LoginRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request): - login_submission = _parse_json(request) + login_submission = parse_json_object_from_request(request) try: if login_submission["type"] == LoginRestServlet.PASS_TYPE: if not self.password_enabled: @@ -400,18 +401,6 @@ class CasTicketServlet(ClientV1RestServlet): return (user, attributes) -def _parse_json(request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise SynapseError( - 400, "Content must be a JSON object.", errcode=Codes.BAD_JSON - ) - return content - except ValueError: - raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) - - def register_servlets(hs, http_server): LoginRestServlet(hs).register(http_server) if hs.config.saml2_enabled: diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 981d7708db..b5695d427a 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import ( - SynapseError, Codes, UnrecognizedRequestError, NotFoundError, StoreError + SynapseError, UnrecognizedRequestError, NotFoundError, StoreError ) from .base import ClientV1RestServlet, client_path_patterns from synapse.storage.push_rule import ( @@ -25,8 +25,7 @@ from synapse.storage.push_rule import ( from synapse.push.clientformat import format_push_rules_for_user from synapse.push.baserules import BASE_RULE_IDS from synapse.push.rulekinds import PRIORITY_CLASS_MAP - -import simplejson as json +from synapse.http.servlet import parse_json_object_from_request class PushRuleRestServlet(ClientV1RestServlet): @@ -52,7 +51,7 @@ class PushRuleRestServlet(ClientV1RestServlet): if '/' in spec['rule_id'] or '\\' in spec['rule_id']: raise SynapseError(400, "rule_id may not contain slashes") - content = _parse_json(request) + content = parse_json_object_from_request(request) user_id = requester.user.to_string() @@ -341,14 +340,5 @@ class InvalidRuleException(Exception): pass -# XXX: C+ped from rest/room.py - surely this should be common? -def _parse_json(request): - try: - content = json.loads(request.content.read()) - return content - except ValueError: - raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) - - def register_servlets(hs, http_server): PushRuleRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index 4c662e6e3c..ee029b4f77 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -17,9 +17,10 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, Codes from synapse.push import PusherConfigException +from synapse.http.servlet import parse_json_object_from_request + from .base import ClientV1RestServlet, client_path_patterns -import simplejson as json import logging logger = logging.getLogger(__name__) @@ -33,7 +34,7 @@ class PusherRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request) user = requester.user - content = _parse_json(request) + content = parse_json_object_from_request(request) pusher_pool = self.hs.get_pusherpool() @@ -92,17 +93,5 @@ class PusherRestServlet(ClientV1RestServlet): return 200, {} -# XXX: C+ped from rest/room.py - surely this should be common? -def _parse_json(request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise SynapseError(400, "Content must be a JSON object.", - errcode=Codes.NOT_JSON) - return content - except ValueError: - raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) - - def register_servlets(hs, http_server): PusherRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 040a7a7ffa..c6a2ef2ccc 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -20,12 +20,12 @@ from synapse.api.errors import SynapseError, Codes from synapse.api.constants import LoginType from .base import ClientV1RestServlet, client_path_patterns import synapse.util.stringutils as stringutils +from synapse.http.servlet import parse_json_object_from_request from synapse.util.async import run_on_reactor from hashlib import sha1 import hmac -import simplejson as json import logging logger = logging.getLogger(__name__) @@ -98,7 +98,7 @@ class RegisterRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request): - register_json = _parse_json(request) + register_json = parse_json_object_from_request(request) session = (register_json["session"] if "session" in register_json else None) @@ -355,15 +355,5 @@ class RegisterRestServlet(ClientV1RestServlet): ) -def _parse_json(request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise SynapseError(400, "Content must be a JSON object.") - return content - except ValueError: - raise SynapseError(400, "Content not JSON.") - - def register_servlets(hs, http_server): RegisterRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 4b7d198c52..7a9f3d11b9 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -22,6 +22,7 @@ from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership from synapse.types import UserID, RoomID, RoomAlias from synapse.events.utils import serialize_event +from synapse.http.servlet import parse_json_object_from_request import simplejson as json import logging @@ -137,7 +138,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): def on_PUT(self, request, room_id, event_type, state_key, txn_id=None): requester = yield self.auth.get_user_by_req(request) - content = _parse_json(request) + content = parse_json_object_from_request(request) event_dict = { "type": event_type, @@ -179,7 +180,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, room_id, event_type, txn_id=None): requester = yield self.auth.get_user_by_req(request, allow_guest=True) - content = _parse_json(request) + content = parse_json_object_from_request(request) msg_handler = self.handlers.message_handler event = yield msg_handler.create_and_send_nonmember_event( @@ -229,7 +230,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet): ) try: - content = _parse_json(request) + content = parse_json_object_from_request(request) except: # Turns out we used to ignore the body entirely, and some clients # cheekily send invalid bodies. @@ -433,7 +434,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): raise AuthError(403, "Guest access not allowed") try: - content = _parse_json(request) + content = parse_json_object_from_request(request) except: # Turns out we used to ignore the body entirely, and some clients # cheekily send invalid bodies. @@ -500,7 +501,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, room_id, event_id, txn_id=None): requester = yield self.auth.get_user_by_req(request) - content = _parse_json(request) + content = parse_json_object_from_request(request) msg_handler = self.handlers.message_handler event = yield msg_handler.create_and_send_nonmember_event( @@ -548,7 +549,7 @@ class RoomTypingRestServlet(ClientV1RestServlet): room_id = urllib.unquote(room_id) target_user = UserID.from_string(urllib.unquote(user_id)) - content = _parse_json(request) + content = parse_json_object_from_request(request) typing_handler = self.handlers.typing_notification_handler @@ -580,7 +581,7 @@ class SearchRestServlet(ClientV1RestServlet): def on_POST(self, request): requester = yield self.auth.get_user_by_req(request) - content = _parse_json(request) + content = parse_json_object_from_request(request) batch = request.args.get("next_batch", [None])[0] results = yield self.handlers.search_handler.search( @@ -592,17 +593,6 @@ class SearchRestServlet(ClientV1RestServlet): defer.returnValue((200, results)) -def _parse_json(request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise SynapseError(400, "Content must be a JSON object.", - errcode=Codes.NOT_JSON) - return content - except ValueError: - raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) - - def register_txn_path(servlet, regex_string, http_server, with_get=False): """Registers a transaction-based path. diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index 24af322126..b6faa2b0e6 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -17,11 +17,9 @@ """ from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX -from synapse.api.errors import SynapseError import re import logging -import simplejson logger = logging.getLogger(__name__) @@ -44,23 +42,3 @@ def client_v2_patterns(path_regex, releases=(0,)): new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release) patterns.append(re.compile("^" + new_prefix + path_regex)) return patterns - - -def parse_request_allow_empty(request): - content = request.content.read() - if content is None or content == '': - return None - try: - return simplejson.loads(content) - except simplejson.JSONDecodeError: - raise SynapseError(400, "Content not JSON.") - - -def parse_json_dict_from_request(request): - try: - content = simplejson.loads(request.content.read()) - if type(content) != dict: - raise SynapseError(400, "Content must be a JSON object.") - return content - except simplejson.JSONDecodeError: - raise SynapseError(400, "Content not JSON.") diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index a614b79d45..688b051580 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -17,10 +17,10 @@ from twisted.internet import defer from synapse.api.constants import LoginType from synapse.api.errors import LoginError, SynapseError, Codes -from synapse.http.servlet import RestServlet +from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.util.async import run_on_reactor -from ._base import client_v2_patterns, parse_json_dict_from_request +from ._base import client_v2_patterns import logging @@ -41,7 +41,7 @@ class PasswordRestServlet(RestServlet): def on_POST(self, request): yield run_on_reactor() - body = parse_json_dict_from_request(request) + body = parse_json_object_from_request(request) authed, result, params = yield self.auth_handler.check_auth([ [LoginType.PASSWORD], @@ -114,7 +114,7 @@ class ThreepidRestServlet(RestServlet): def on_POST(self, request): yield run_on_reactor() - body = parse_json_dict_from_request(request) + body = parse_json_object_from_request(request) threePidCreds = body.get('threePidCreds') threePidCreds = body.get('three_pid_creds', threePidCreds) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index ec5c21fa1f..b090e66bcf 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -17,9 +17,9 @@ from twisted.internet import defer from synapse.api.constants import LoginType from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError -from synapse.http.servlet import RestServlet +from synapse.http.servlet import RestServlet, parse_json_object_from_request -from ._base import client_v2_patterns, parse_json_dict_from_request +from ._base import client_v2_patterns import logging import hmac @@ -73,7 +73,7 @@ class RegisterRestServlet(RestServlet): ret = yield self.onEmailTokenRequest(request) defer.returnValue(ret) - body = parse_json_dict_from_request(request) + body = parse_json_object_from_request(request) # we do basic sanity checks here because the auth layer will store these # in sessions. Pull out the username/password provided to us. @@ -236,7 +236,7 @@ class RegisterRestServlet(RestServlet): @defer.inlineCallbacks def onEmailTokenRequest(self, request): - body = parse_json_dict_from_request(request) + body = parse_json_object_from_request(request) required = ['id_server', 'client_secret', 'email', 'send_attempt'] absent = [] diff --git a/synapse/rest/client/v2_alpha/tokenrefresh.py b/synapse/rest/client/v2_alpha/tokenrefresh.py index 3553f6b040..a158c2209a 100644 --- a/synapse/rest/client/v2_alpha/tokenrefresh.py +++ b/synapse/rest/client/v2_alpha/tokenrefresh.py @@ -16,9 +16,9 @@ from twisted.internet import defer from synapse.api.errors import AuthError, StoreError, SynapseError -from synapse.http.servlet import RestServlet +from synapse.http.servlet import RestServlet, parse_json_object_from_request -from ._base import client_v2_patterns, parse_json_dict_from_request +from ._base import client_v2_patterns class TokenRefreshRestServlet(RestServlet): @@ -35,7 +35,7 @@ class TokenRefreshRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): - body = parse_json_dict_from_request(request) + body = parse_json_object_from_request(request) try: old_refresh_token = body["refresh_token"] auth_handler = self.hs.get_handlers().auth_handler -- cgit 1.4.1 From e9c1cabac26cf8e28152ebdb3caf29d4457eea0e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 11 Mar 2016 16:41:03 +0000 Subject: Use parse_json_object_from_request to parse JSON out of request bodies --- synapse/federation/transport/server.py | 4 ++-- synapse/http/servlet.py | 17 ++++++++++++----- synapse/rest/client/v1/presence.py | 13 +++++-------- synapse/rest/client/v1/profile.py | 8 ++++---- synapse/rest/client/v1/room.py | 14 ++++---------- synapse/rest/client/v2_alpha/account_data.py | 21 ++++----------------- synapse/rest/client/v2_alpha/filter.py | 10 ++-------- synapse/rest/client/v2_alpha/keys.py | 22 +++++++--------------- synapse/rest/client/v2_alpha/tags.py | 12 +++--------- synapse/rest/key/v2/remote_key_resource.py | 12 ++---------- tests/rest/client/v1/test_profile.py | 6 ++++-- 11 files changed, 49 insertions(+), 90 deletions(-) (limited to 'synapse/http') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 6e92e2f8f4..208bff8d4f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource +from synapse.http.servlet import parse_json_object_from_request from synapse.util.ratelimitutils import FederationRateLimiter import functools @@ -419,8 +420,7 @@ class On3pidBindServlet(BaseFederationServlet): @defer.inlineCallbacks def on_POST(self, request): - content_bytes = request.content.read() - content = json.loads(content_bytes) + content = parse_json_object_from_request(request) if "invites" in content: last_exception = None for invite in content["invites"]: diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 41c519c000..1996f8b136 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -128,14 +128,21 @@ def parse_json_object_from_request(request): if it wasn't a JSON object. """ try: - content = simplejson.loads(request.content.read()) - if type(content) != dict: - message = "Content must be a JSON object." - raise SynapseError(400, message, errcode=Codes.BAD_JSON) - return content + content_bytes = request.content.read() + except: + raise SynapseError(400, "Error reading JSON content.") + + try: + content = simplejson.loads(content_bytes) except simplejson.JSONDecodeError: raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) + if type(content) != dict: + message = "Content must be a JSON object." + raise SynapseError(400, message, errcode=Codes.BAD_JSON) + + return content + class RestServlet(object): diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index bbfa1d6ac4..27d9ed586b 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -19,9 +19,9 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.types import UserID +from synapse.http.servlet import parse_json_object_from_request from .base import ClientV1RestServlet, client_path_patterns -import simplejson as json import logging logger = logging.getLogger(__name__) @@ -56,9 +56,10 @@ class PresenceStatusRestServlet(ClientV1RestServlet): raise AuthError(403, "Can only set your own presence state") state = {} - try: - content = json.loads(request.content.read()) + content = parse_json_object_from_request(request) + + try: state["presence"] = content.pop("presence") if "status_msg" in content: @@ -113,11 +114,7 @@ class PresenceListRestServlet(ClientV1RestServlet): raise SynapseError( 400, "Cannot modify another user's presence list") - try: - content = json.loads(request.content.read()) - except: - logger.exception("JSON parse error") - raise SynapseError(400, "Unable to parse content") + content = parse_json_object_from_request(request) if "invite" in content: for u in content["invite"]: diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 953764bd8e..65c4e2ebef 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -18,8 +18,7 @@ from twisted.internet import defer from .base import ClientV1RestServlet, client_path_patterns from synapse.types import UserID - -import simplejson as json +from synapse.http.servlet import parse_json_object_from_request class ProfileDisplaynameRestServlet(ClientV1RestServlet): @@ -44,8 +43,9 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request, allow_guest=True) user = UserID.from_string(user_id) + content = parse_json_object_from_request(request) + try: - content = json.loads(request.content.read()) new_name = content["displayname"] except: defer.returnValue((400, "Unable to parse name")) @@ -81,8 +81,8 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request) user = UserID.from_string(user_id) + content = parse_json_object_from_request(request) try: - content = json.loads(request.content.read()) new_name = content["avatar_url"] except: defer.returnValue((400, "Unable to parse name")) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 7a9f3d11b9..a1fa7daf79 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -24,7 +24,6 @@ from synapse.types import UserID, RoomID, RoomAlias from synapse.events.utils import serialize_event from synapse.http.servlet import parse_json_object_from_request -import simplejson as json import logging import urllib @@ -72,15 +71,10 @@ class RoomCreateRestServlet(ClientV1RestServlet): defer.returnValue((200, info)) def get_room_config(self, request): - try: - user_supplied_config = json.loads(request.content.read()) - if "visibility" not in user_supplied_config: - # default visibility - user_supplied_config["visibility"] = "public" - return user_supplied_config - except (ValueError, TypeError): - raise SynapseError(400, "Body must be JSON.", - errcode=Codes.BAD_JSON) + user_supplied_config = parse_json_object_from_request(request) + # default visibility + user_supplied_config.setdefault("visibility", "public") + return user_supplied_config def on_OPTIONS(self, request): return (200, {}) diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index 1456881c1a..b16079cece 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -15,15 +15,13 @@ from ._base import client_v2_patterns -from synapse.http.servlet import RestServlet -from synapse.api.errors import AuthError, SynapseError +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.api.errors import AuthError from twisted.internet import defer import logging -import simplejson as json - logger = logging.getLogger(__name__) @@ -47,11 +45,7 @@ class AccountDataServlet(RestServlet): if user_id != requester.user.to_string(): raise AuthError(403, "Cannot add account data for other users.") - try: - content_bytes = request.content.read() - body = json.loads(content_bytes) - except: - raise SynapseError(400, "Invalid JSON") + body = parse_json_object_from_request(request) max_id = yield self.store.add_account_data_for_user( user_id, account_data_type, body @@ -86,14 +80,7 @@ class RoomAccountDataServlet(RestServlet): if user_id != requester.user.to_string(): raise AuthError(403, "Cannot add account data for other users.") - try: - content_bytes = request.content.read() - body = json.loads(content_bytes) - except: - raise SynapseError(400, "Invalid JSON") - - if not isinstance(body, dict): - raise ValueError("Expected a JSON object") + body = parse_json_object_from_request(request) max_id = yield self.store.add_account_data_to_room( user_id, room_id, account_data_type, body diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index 7c94f6ec41..510f8b2c74 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -16,12 +16,11 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError -from synapse.http.servlet import RestServlet +from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID from ._base import client_v2_patterns -import simplejson as json import logging @@ -84,12 +83,7 @@ class CreateFilterRestServlet(RestServlet): if not self.hs.is_mine(target_user): raise SynapseError(400, "Can only create filters for local users") - try: - content = json.loads(request.content.read()) - - # TODO(paul): check for required keys and invalid keys - except: - raise SynapseError(400, "Invalid filter definition") + content = parse_json_object_from_request(request) filter_id = yield self.filtering.add_user_filter( user_localpart=target_user.localpart, diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index f989b08614..89ab39491c 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -15,16 +15,15 @@ from twisted.internet import defer -from synapse.api.errors import SynapseError -from synapse.http.servlet import RestServlet +from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID from canonicaljson import encode_canonical_json from ._base import client_v2_patterns -import simplejson as json import logging +import simplejson as json logger = logging.getLogger(__name__) @@ -68,10 +67,9 @@ class KeyUploadServlet(RestServlet): user_id = requester.user.to_string() # TODO: Check that the device_id matches that in the authentication # or derive the device_id from the authentication instead. - try: - body = json.loads(request.content.read()) - except: - raise SynapseError(400, "Invalid key JSON") + + body = parse_json_object_from_request(request) + time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. @@ -173,10 +171,7 @@ class KeyQueryServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request, user_id, device_id): yield self.auth.get_user_by_req(request) - try: - body = json.loads(request.content.read()) - except: - raise SynapseError(400, "Invalid key JSON") + body = parse_json_object_from_request(request) result = yield self.handle_request(body) defer.returnValue(result) @@ -272,10 +267,7 @@ class OneTimeKeyServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request, user_id, device_id, algorithm): yield self.auth.get_user_by_req(request) - try: - body = json.loads(request.content.read()) - except: - raise SynapseError(400, "Invalid key JSON") + body = parse_json_object_from_request(request) result = yield self.handle_request(body) defer.returnValue(result) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 79c436a8cf..dac8603b07 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -15,15 +15,13 @@ from ._base import client_v2_patterns -from synapse.http.servlet import RestServlet -from synapse.api.errors import AuthError, SynapseError +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.api.errors import AuthError from twisted.internet import defer import logging -import simplejson as json - logger = logging.getLogger(__name__) @@ -72,11 +70,7 @@ class TagServlet(RestServlet): if user_id != requester.user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - try: - content_bytes = request.content.read() - body = json.loads(content_bytes) - except: - raise SynapseError(400, "Invalid tag JSON") + body = parse_json_object_from_request(request) max_id = yield self.store.add_tag_to_room(user_id, room_id, tag, body) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 81ef1f4702..9552016fec 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -13,7 +13,7 @@ # limitations under the License. from synapse.http.server import request_handler, respond_with_json_bytes -from synapse.http.servlet import parse_integer +from synapse.http.servlet import parse_integer, parse_json_object_from_request from synapse.api.errors import SynapseError, Codes from twisted.web.resource import Resource @@ -22,7 +22,6 @@ from twisted.internet import defer from io import BytesIO -import json import logging logger = logging.getLogger(__name__) @@ -126,14 +125,7 @@ class RemoteKey(Resource): @request_handler @defer.inlineCallbacks def async_render_POST(self, request): - try: - content = json.loads(request.content.read()) - if type(content) != dict: - raise ValueError() - except ValueError: - raise SynapseError( - 400, "Content must be JSON object.", errcode=Codes.NOT_JSON - ) + content = parse_json_object_from_request(request) query = content["server_keys"] diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index 1d210f9bf8..af02fce8fb 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -95,7 +95,8 @@ class ProfileTestCase(unittest.TestCase): mocked_set.side_effect = AuthError(400, "message") (code, response) = yield self.mock_resource.trigger( - "PUT", "/profile/%s/displayname" % ("@4567:test"), '"Frank Jr."' + "PUT", "/profile/%s/displayname" % ("@4567:test"), + '{"displayname": "Frank Jr."}' ) self.assertTrue( @@ -121,7 +122,8 @@ class ProfileTestCase(unittest.TestCase): mocked_set.side_effect = SynapseError(400, "message") (code, response) = yield self.mock_resource.trigger( - "PUT", "/profile/%s/displayname" % ("@opaque:elsewhere"), None + "PUT", "/profile/%s/displayname" % ("@opaque:elsewhere"), + '{"displayname":"bob"}' ) self.assertTrue( -- cgit 1.4.1 From 398cd1edfbefa207e44047bd63adcdcc6e859f2e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 14 Mar 2016 14:16:41 +0000 Subject: Fix regression where synapse checked whether push rules were valid JSON before the compatibility hack that handled clients sending invalid JSON --- synapse/http/servlet.py | 21 +++++++++++++++++---- synapse/rest/client/v1/push_rule.py | 4 ++-- 2 files changed, 19 insertions(+), 6 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 1996f8b136..1c8bd8666f 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -119,13 +119,13 @@ def parse_string(request, name, default=None, required=False, return default -def parse_json_object_from_request(request): - """Parse a JSON object from the body of a twisted HTTP request. +def parse_json_value_from_request(request): + """Parse a JSON value from the body of a twisted HTTP request. :param request: the twisted HTTP request. + :returns: The JSON value. :raises - SynapseError if the request body couldn't be decoded as JSON or - if it wasn't a JSON object. + SynapseError if the request body couldn't be decoded as JSON. """ try: content_bytes = request.content.read() @@ -137,6 +137,19 @@ def parse_json_object_from_request(request): except simplejson.JSONDecodeError: raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) + return content + + +def parse_json_object_from_request(request): + """Parse a JSON object from the body of a twisted HTTP request. + + :param request: the twisted HTTP request. + :raises + SynapseError if the request body couldn't be decoded as JSON or + if it wasn't a JSON object. + """ + content = parse_json_value_from_request(request) + if type(content) != dict: message = "Content must be a JSON object." raise SynapseError(400, message, errcode=Codes.BAD_JSON) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index b5695d427a..02d837ee6a 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -25,7 +25,7 @@ from synapse.storage.push_rule import ( from synapse.push.clientformat import format_push_rules_for_user from synapse.push.baserules import BASE_RULE_IDS from synapse.push.rulekinds import PRIORITY_CLASS_MAP -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import parse_json_value_from_request class PushRuleRestServlet(ClientV1RestServlet): @@ -51,7 +51,7 @@ class PushRuleRestServlet(ClientV1RestServlet): if '/' in spec['rule_id'] or '\\' in spec['rule_id']: raise SynapseError(400, "rule_id may not contain slashes") - content = parse_json_object_from_request(request) + content = parse_json_value_from_request(request) user_id = requester.user.to_string() -- cgit 1.4.1