From 63c19e1df949dd8fc15defc1942e2405c5426958 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 24 Aug 2016 11:55:57 +0100 Subject: Move 3PU/3PL lookup APIs into /thirdparty containing entity --- synapse/rest/client/v2_alpha/thirdparty.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 9abca3a8ad..8821e101bf 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class ThirdPartyUserServlet(RestServlet): - PATTERNS = client_v2_patterns("/3pu(/(?P[^/]+))?$", + PATTERNS = client_v2_patterns("/thirdparty/user(/(?P[^/]+))?$", releases=()) def __init__(self, hs): @@ -50,7 +50,7 @@ class ThirdPartyUserServlet(RestServlet): class ThirdPartyLocationServlet(RestServlet): - PATTERNS = client_v2_patterns("/3pl(/(?P[^/]+))?$", + PATTERNS = client_v2_patterns("/thirdparty/location(/(?P[^/]+))?$", releases=()) def __init__(self, hs): -- cgit 1.4.1 From 921913935176f5bd3df5e5b960d87c94a2adb304 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Aug 2016 15:23:39 +0100 Subject: Preserve some logcontexts --- synapse/app/homeserver.py | 3 ++- synapse/appservice/scheduler.py | 6 +++--- synapse/crypto/keyring.py | 36 ++++++++++++++++---------------- synapse/federation/federation_base.py | 7 ++++--- synapse/federation/federation_client.py | 17 +++++++++------ synapse/handlers/appservice.py | 8 +++---- synapse/handlers/federation.py | 28 +++++++++++++------------ synapse/handlers/message.py | 35 ++++++++++++++++++------------- synapse/handlers/typing.py | 12 +++++++---- synapse/notifier.py | 7 ++++++- synapse/push/push_tools.py | 9 ++++---- synapse/push/pusherpool.py | 12 ++++++----- synapse/rest/client/v2_alpha/register.py | 3 +-- synapse/storage/events.py | 16 ++++++++------ synapse/storage/stream.py | 6 +++--- synapse/util/async.py | 9 ++++---- synapse/util/logcontext.py | 15 +++++++++---- synapse/visibility.py | 6 +++--- 18 files changed, 136 insertions(+), 99 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 54f35900f8..4493d3b847 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -50,7 +50,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, logcontext_tracer from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX @@ -449,6 +449,7 @@ def run(hs): # Uncomment to enable tracing of log context changes. # sys.settrace(logcontext_tracer) with LoggingContext("run"): + sys.settrace(logcontext_tracer) change_resource_limit(hs.config.soft_file_limit) if hs.config.gc_thresholds: gc.set_threshold(*hs.config.gc_thresholds) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6450a12890..68a9de17b8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -150,12 +150,12 @@ class _TransactionController(object): if service_is_up: sent = yield txn.send(self.as_api) if sent: - txn.complete(self.store) + yield txn.complete(self.store) else: - self._start_recoverer(service) + preserve_fn(self._start_recoverer)(service) except Exception as e: logger.exception(e) - self._start_recoverer(service) + preserve_fn(self._start_recoverer)(service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1735ca9345..d7211ee9b3 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -308,15 +308,15 @@ class Keyring(object): @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_server_verify_keys( + preserve_fn(self.store.get_server_verify_keys)( server_name, key_ids ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(dict(res)) @@ -337,13 +337,13 @@ class Keyring(object): ) defer.returnValue({}) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - get_key(p_name, p_keys) + preserve_fn(get_key)(p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) union_of_keys = {} for result in results: @@ -383,13 +383,13 @@ class Keyring(object): defer.returnValue(keys) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - get_key(server_name, key_ids) + preserve_fn(get_key)(server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) merged = {} for result in results: @@ -466,9 +466,9 @@ class Keyring(object): for server_name, response_keys in processed_response.items(): keys.setdefault(server_name, {}).update(response_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ - self.store_keys( + preserve_fn(self.store_keys)( server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -476,7 +476,7 @@ class Keyring(object): for server_name, response_keys in keys.items() ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(keys) @@ -524,7 +524,7 @@ class Keyring(object): keys.update(response_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store_keys)( server_name=key_server_name, @@ -534,7 +534,7 @@ class Keyring(object): for key_server_name, verify_keys in keys.items() ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(keys) @@ -600,7 +600,7 @@ class Keyring(object): response_keys.update(verify_keys) response_keys.update(old_verify_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store.store_server_keys_json)( server_name=server_name, @@ -613,7 +613,7 @@ class Keyring(object): for key_id in updated_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) results[server_name] = response_keys @@ -702,7 +702,7 @@ class Keyring(object): A deferred that completes when the keys are stored. """ # TODO(markjh): Store whether the keys have expired. - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key @@ -710,4 +710,4 @@ class Keyring(object): for key_id, key in verify_keys.items() ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index da2f5e8cfd..2339cc9034 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -23,6 +23,7 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError from synapse.util import unwrapFirstError +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -102,10 +103,10 @@ class FederationBase(object): warn, pdu ) - valid_pdus = yield defer.gatherResults( + valid_pdus = yield preserve_context_over_deferred(defer.gatherResults( deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if include_none: defer.returnValue(valid_pdus) @@ -129,7 +130,7 @@ class FederationBase(object): for pdu in pdus ] - deferreds = self.keyring.verify_json_objects_for_server([ + deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([ (p.origin, p.get_pdu_json()) for p in redacted_pdus ]) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9ba3151713..f2b3aceb49 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -27,6 +27,7 @@ from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.events import FrozenEvent import synapse.metrics @@ -225,10 +226,10 @@ class FederationClient(FederationBase): ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield defer.gatherResults( + pdus[:] = yield preserve_context_over_deferred(defer.gatherResults( self._check_sigs_and_hashes(pdus), consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(pdus) @@ -457,14 +458,16 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - self.get_pdu( + preserve_fn(self.get_pdu)( destinations=random_server_list(), event_id=e_id, ) for e_id in batch ] - res = yield defer.DeferredList(deferreds, consumeErrors=True) + res = yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) for success, result in res: if success: signed_events.append(result) @@ -853,14 +856,16 @@ class FederationClient(FederationBase): return srvs deferreds = [ - self.get_pdu( + preserve_fn(self.get_pdu)( destinations=random_server_list(), event_id=e_id, ) for e_id, depth in ordered_missing[:limit - len(signed_events)] ] - res = yield defer.DeferredList(deferreds, consumeErrors=True) + res = yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) for (result, val), (e_id, _) in zip(res, ordered_missing): if result and val: signed_events.append(val) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dd285452cd..306686a384 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -163,10 +163,10 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield defer.DeferredList([ - self.appservice_api.query_3pe(service, kind, protocol, fields) + results = yield preserve_context_over_deferred(defer.DeferredList([ + preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services - ], consumeErrors=True) + ], consumeErrors=True)) ret = [] for (success, result) in results: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 328f8f4842..fe3092b14b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,9 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -361,9 +363,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - self.replication_layer.get_pdu( + preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -372,7 +374,7 @@ class FederationHandler(BaseHandler): for event_id in missing_auth - failed_to_fetch ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results}) required_auth.update( a_id for event in results for a_id, _ in event.auth_events @@ -552,10 +554,10 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) - states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups(room_id, [e]) + states = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) for e in event_ids - ]) + ])) states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: @@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield defer.gatherResults( + contexts = yield preserve_context_over_deferred(defer.gatherResults( [ - self._prep_event( + preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), @@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler): ) for ev_info in event_infos ] - ) + )) yield self.store.persist_events( [ @@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield defer.gatherResults( + different_events = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_event( + preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, @@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dc76d34a52..4c3cd9d12e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -28,7 +28,8 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -502,15 +503,17 @@ class MessageHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=room_end_token, - ), - deferred_room_state, - ] + (messages, token), current_state = yield preserve_context_over_deferred( + defer.gatherResults( + [ + preserve_fn(self.store.get_recent_events_for_room)( + event.room_id, + limit=limit, + end_token=room_end_token, + ), + deferred_room_state, + ] + ) ).addErrback(unwrapFirstError) messages = yield filter_events_for_client( @@ -719,9 +722,9 @@ class MessageHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - get_presence(), - get_receipts(), - self.store.get_recent_events_for_room( + preserve_fn(get_presence)(), + preserve_fn(get_receipts)(), + preserve_fn(self.store.get_recent_events_for_room)( room_id, limit=limit, end_token=now_token.room_key, @@ -755,6 +758,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, prev_event_ids=None): if prev_event_ids: @@ -806,6 +810,7 @@ class MessageHandler(BaseHandler): (event, context,) ) + @measure_func("handle_new_client_event") @defer.inlineCallbacks def handle_new_client_event( self, @@ -934,7 +939,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( + yield self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) @@ -944,6 +949,6 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - federation_handler.handle_new_event( + preserve_fn(federation_handler.handle_new_event)( event, destinations=destinations, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5589296c09..46181984c0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,9 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, +) from synapse.util.metrics import Measure from synapse.types import UserID @@ -169,13 +171,13 @@ class TypingHandler(object): deferreds = [] for domain in domains: if domain == self.server_name: - self._push_update_local( + preserve_fn(self._push_update_local)( room_id=room_id, user_id=user_id, typing=typing ) else: - deferreds.append(self.federation.send_edu( + deferreds.append(preserve_fn(self.federation.send_edu)( destination=domain, edu_type="m.typing", content={ @@ -185,7 +187,9 @@ class TypingHandler(object): }, )) - yield defer.DeferredList(deferreds, consumeErrors=True) + yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/notifier.py b/synapse/notifier.py index c48024096d..b86648f5e4 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -19,7 +19,7 @@ from synapse.api.errors import AuthError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client @@ -174,6 +174,7 @@ class Notifier(object): lambda: len(self.user_to_user_stream), ) + @preserve_fn def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): """ Used by handlers to inform the notifier something has happened @@ -195,6 +196,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous event to be persisted. @@ -212,6 +214,7 @@ class Notifier(object): else: self._on_new_room_event(event, room_stream_id, extra_users) + @preserve_fn def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. @@ -226,6 +229,7 @@ class Notifier(object): rooms=[event.room_id], ) + @preserve_fn def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. @@ -252,6 +256,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def on_new_replication_data(self): """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index d555a33e9a..becb8ef1ae 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -17,14 +17,15 @@ from twisted.internet import defer from synapse.util.presentable_names import ( calculate_room_name, name_from_member_event ) +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @defer.inlineCallbacks def get_badge_count(store, user_id): - invites, joins = yield defer.gatherResults([ - store.get_invited_rooms_for_user(user_id), - store.get_rooms_for_user(user_id), - ], consumeErrors=True) + invites, joins = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(store.get_invited_rooms_for_user)(user_id), + preserve_fn(store.get_rooms_for_user)(user_id), + ], consumeErrors=True)) my_receipts_by_room = yield store.get_receipts_for_user( user_id, "m.read", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 54c0f1b849..3837be523d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -17,7 +17,7 @@ from twisted.internet import defer import pusher -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.util.async import run_on_reactor import logging @@ -130,10 +130,12 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_notifications(min_stream_id, max_stream_id) + preserve_fn(p.on_new_notifications)( + min_stream_id, max_stream_id + ) ) - yield defer.gatherResults(deferreds) + yield preserve_context_over_deferred(defer.gatherResults(deferreds)) except: logger.exception("Exception in pusher on_new_notifications") @@ -155,10 +157,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_receipts(min_stream_id, max_stream_id) + preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) ) - yield defer.gatherResults(deferreds) + yield preserve_context_over_deferred(defer.gatherResults(deferreds)) except: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 943f5676a3..2121bd75ea 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -403,10 +403,9 @@ class RegisterRestServlet(RestServlet): # register the user's device device_id = params.get("device_id") initial_display_name = params.get("initial_device_display_name") - device_id = self.device_handler.check_device_registered( + return self.device_handler.check_device_registered( user_id, device_id, initial_display_name ) - return device_id @defer.inlineCallbacks def _do_guest_registration(self): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 97aef25321..57e5005285 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -20,7 +20,9 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import preserve_fn, PreserveLoggingContext +from synapse.util.logcontext import ( + preserve_fn, PreserveLoggingContext, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes @@ -202,7 +204,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.items(): - d = self._event_persist_queue.add_to_queue( + d = preserve_fn(self._event_persist_queue.add_to_queue)( room_id, evs_ctxs, backfilled=backfilled, current_state=None, @@ -212,7 +214,9 @@ class EventsStore(SQLBaseStore): for room_id in partitioned.keys(): self._maybe_start_persisting(room_id) - return defer.gatherResults(deferreds, consumeErrors=True) + return preserve_context_over_deferred( + defer.gatherResults(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks @log_function @@ -225,7 +229,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield deferred + yield preserve_context_over_deferred(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1088,7 +1092,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], @@ -1097,7 +1101,7 @@ class EventsStore(SQLBaseStore): for row in rows ], consumeErrors=True - ) + )) defer.returnValue({ e.event.event_id: e diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 862c5c3ea1..0577a0525b 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,12 +234,12 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield defer.gatherResults([ + res = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ]) + ])) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/util/async.py b/synapse/util/async.py index c84b23ff46..347fb1e380 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -146,10 +146,10 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return defer.gatherResults([ + return preserve_context_over_deferred(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) - ], consumeErrors=True).addErrback(unwrapFirstError) + ], consumeErrors=True)).addErrback(unwrapFirstError) class Linearizer(object): @@ -181,7 +181,8 @@ class Linearizer(object): self.key_to_defer[key] = new_defer if current_defer: - yield preserve_context_over_deferred(current_defer) + with PreserveLoggingContext(): + yield current_defer @contextmanager def _ctx_manager(): @@ -264,7 +265,7 @@ class ReadWriteLock(object): curr_readers.clear() self.key_to_current_writer[key] = new_defer - yield defer.gatherResults(to_wait_on) + yield preserve_context_over_deferred(defer.gatherResults(to_wait_on)) @contextmanager def _ctx_manager(): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 7a87045f87..6c83eb213d 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -297,12 +297,13 @@ def preserve_context_over_fn(fn, *args, **kwargs): return res -def preserve_context_over_deferred(deferred): +def preserve_context_over_deferred(deferred, context=None): """Given a deferred wrap it such that any callbacks added later to it will be invoked with the current context. """ - current_context = LoggingContext.current_context() - d = _PreservingContextDeferred(current_context) + if context is None: + context = LoggingContext.current_context() + d = _PreservingContextDeferred(context) deferred.chainDeferred(d) return d @@ -316,7 +317,13 @@ def preserve_fn(f): def g(*args, **kwargs): with PreserveLoggingContext(current): - return f(*args, **kwargs) + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred): + return preserve_context_over_deferred( + res, context=LoggingContext.sentinel + ) + else: + return res return g diff --git a/synapse/visibility.py b/synapse/visibility.py index 948ad51772..cc12c0a23d 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership, EventTypes -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -55,12 +55,12 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): given events events ([synapse.events.EventBase]): list of events to filter """ - forgotten = yield defer.gatherResults([ + forgotten = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(store.who_forgot_in_room)( room_id, ) for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True) + ], consumeErrors=True)) # Set of membership event_ids that have been forgotten event_id_forgotten = frozenset( -- cgit 1.4.1 From 9899824b851bc1e1ccaa0da8cb2dc5bf783014e8 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 24 Aug 2016 12:33:01 +0100 Subject: Initial hack at the 3PN protocols metadata lookup API --- synapse/appservice/__init__.py | 2 ++ synapse/handlers/appservice.py | 8 +++++++ synapse/rest/client/v2_alpha/thirdparty.py | 34 ++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) (limited to 'synapse/rest/client') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index bde9b51b2e..126a10efb7 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -88,6 +88,8 @@ class ApplicationService(object): self.sender = sender self.namespaces = self._check_namespaces(namespaces) self.id = id + + # .protocols is a publicly visible field if protocols: self.protocols = set(protocols) else: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dd285452cd..6273ff524c 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -175,6 +175,14 @@ class ApplicationServicesHandler(object): defer.returnValue(ret) + @defer.inlineCallbacks + def get_3pe_protocols(self): + services = yield self.store.get_app_services() + protocols = set() + for s in services: + protocols.update(s.protocols) + defer.returnValue(protocols) + @defer.inlineCallbacks def _get_services_for_event(self, event): """Retrieve a list of application services interested in this event. diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 8821e101bf..2decadb001 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -25,6 +25,39 @@ from ._base import client_v2_patterns logger = logging.getLogger(__name__) +class ThirdPartyProtocolsServlet(RestServlet): + PATTERNS = client_v2_patterns("/thirdparty/protocols", releases=()) + + META = { + # TODO(paul): Declare kinds of metadata in here + } + + def __init__(self, hs): + super(ThirdPartyProtocolsServlet, self).__init__() + + self.auth = hs.get_auth() + self.appservice_handler = hs.get_application_service_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + yield self.auth.get_user_by_req(request) + + protocols = yield self.appservice_handler.get_3pe_protocols() + + result = {} + # TODO(paul): Eventually this kind of metadata wants to come from the + # ASes themselves + for protocol in protocols: + if protocol in self.META: + result[protocol] = self.META[protocol] + else: + # We don't know any metadata for it, but we'd best at least + # still declare that we know it exists + result[protocol] = {} + + defer.returnValue((200, result)) + + class ThirdPartyUserServlet(RestServlet): PATTERNS = client_v2_patterns("/thirdparty/user(/(?P[^/]+))?$", releases=()) @@ -74,5 +107,6 @@ class ThirdPartyLocationServlet(RestServlet): def register_servlets(hs, http_server): + ThirdPartyProtocolsServlet(hs).register(http_server) ThirdPartyUserServlet(hs).register(http_server) ThirdPartyLocationServlet(hs).register(http_server) -- cgit 1.4.1 From 965f33c9018c80a6afce1eb58b5a8ef442108fc3 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 24 Aug 2016 12:34:03 +0100 Subject: Declare 'gitter' known protocol, with user lookup --- synapse/rest/client/v2_alpha/thirdparty.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/rest/client') diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 2decadb001..ae7901551b 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -30,6 +30,9 @@ class ThirdPartyProtocolsServlet(RestServlet): META = { # TODO(paul): Declare kinds of metadata in here + "gitter": { + "user_fields": ["username"], + } } def __init__(self, hs): -- cgit 1.4.1 From 8e1ed09dff902c84beb9dff59305aa78e8772d81 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 24 Aug 2016 13:01:53 +0100 Subject: Move static knowledge of protocol metadata into AS handler; cache the result --- synapse/handlers/appservice.py | 26 ++++++++++++++++++++++++-- synapse/rest/client/v2_alpha/thirdparty.py | 21 +-------------------- 2 files changed, 25 insertions(+), 22 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 6273ff524c..1c5c5ec302 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -37,6 +37,13 @@ def log_failure(failure): class ApplicationServicesHandler(object): + PROTOCOL_META = { + # TODO(paul): Declare kinds of metadata in here + "gitter": { + "user_fields": ["username"], + } + } + def __init__(self, hs): self.store = hs.get_datastore() self.is_mine_id = hs.is_mine_id @@ -48,6 +55,7 @@ class ApplicationServicesHandler(object): self.current_max = 0 self.is_processing = False + self.supported_protocols = None @defer.inlineCallbacks def notify_interested_services(self, current_id): @@ -177,10 +185,24 @@ class ApplicationServicesHandler(object): @defer.inlineCallbacks def get_3pe_protocols(self): + # The set of supported AS protocols and the metadata about them is + # effectively static during the lifetime of a homeserver process. We + # can look this up once and just cache it. + if self.supported_protocols: + defer.returnValue(self.supported_protocols) + services = yield self.store.get_app_services() - protocols = set() + protocols = {} for s in services: - protocols.update(s.protocols) + for p in s.protocols: + if p in self.PROTOCOL_META: + protocols[p] = self.PROTOCOL_META[p] + else: + # We don't know any metadata for it, but we'd best at least + # still declare that we know it exists + protocols[p] = {} + + self.supported_protocols = protocols defer.returnValue(protocols) @defer.inlineCallbacks diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index ae7901551b..bbc3e9b962 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -28,13 +28,6 @@ logger = logging.getLogger(__name__) class ThirdPartyProtocolsServlet(RestServlet): PATTERNS = client_v2_patterns("/thirdparty/protocols", releases=()) - META = { - # TODO(paul): Declare kinds of metadata in here - "gitter": { - "user_fields": ["username"], - } - } - def __init__(self, hs): super(ThirdPartyProtocolsServlet, self).__init__() @@ -46,19 +39,7 @@ class ThirdPartyProtocolsServlet(RestServlet): yield self.auth.get_user_by_req(request) protocols = yield self.appservice_handler.get_3pe_protocols() - - result = {} - # TODO(paul): Eventually this kind of metadata wants to come from the - # ASes themselves - for protocol in protocols: - if protocol in self.META: - result[protocol] = self.META[protocol] - else: - # We don't know any metadata for it, but we'd best at least - # still declare that we know it exists - result[protocol] = {} - - defer.returnValue((200, result)) + defer.returnValue((200, protocols)) class ThirdPartyUserServlet(RestServlet): -- cgit 1.4.1