From b4c3bc17343833205d3bb00009074734edce667a Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Wed, 26 Sep 2018 13:26:45 -0600 Subject: Handle HttpResponseException more safely for federated groups --- synapse/handlers/groups_local.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 53e5e2648b..e36f22f128 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -20,7 +20,7 @@ from six import iteritems from twisted.internet import defer -from synapse.api.errors import SynapseError +from synapse.api.errors import (SynapseError, HttpResponseException) from synapse.types import get_domain_from_id logger = logging.getLogger(__name__) @@ -37,9 +37,18 @@ def _create_rerouter(func_name): ) else: destination = get_domain_from_id(group_id) - return getattr(self.transport_client, func_name)( + logger.info("Triggering call") + d = getattr(self.transport_client, func_name)( destination, group_id, *args, **kwargs ) + def h(failure): + failure.trap(HttpResponseException) + e = failure.value + if e.code >= 400 and e.code < 500: + raise SynapseError(e.code, e.msg) + failure.raiseException() + d.addErrback(h) + return d return f -- cgit 1.5.1 From 82fa31799cdf5146495fa95634928a2a3f5070be Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Wed, 26 Sep 2018 14:01:02 -0600 Subject: Remove debugging statement --- synapse/handlers/groups_local.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index e36f22f128..68de8a8e53 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -37,7 +37,6 @@ def _create_rerouter(func_name): ) else: destination = get_domain_from_id(group_id) - logger.info("Triggering call") d = getattr(self.transport_client, func_name)( destination, group_id, *args, **kwargs ) -- cgit 1.5.1 From 7bb651de6a29e4b261b67aace1e0d5e4286b2e69 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Fri, 12 Oct 2018 14:53:30 -0600 Subject: More sane handling of group errors and pep8 --- synapse/handlers/groups_local.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 68de8a8e53..66da6edad1 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -20,7 +20,7 @@ from six import iteritems from twisted.internet import defer -from synapse.api.errors import (SynapseError, HttpResponseException) +from synapse.api.errors import (HttpResponseException, SynapseError) from synapse.types import get_domain_from_id logger = logging.getLogger(__name__) @@ -40,12 +40,13 @@ def _create_rerouter(func_name): d = getattr(self.transport_client, func_name)( destination, group_id, *args, **kwargs ) + def h(failure): failure.trap(HttpResponseException) e = failure.value - if e.code >= 400 and e.code < 500: - raise SynapseError(e.code, e.msg) - failure.raiseException() + if e.code == 403: + raise e.to_synapse_error() + return failure d.addErrback(h) return d return f -- cgit 1.5.1 From 164f8e484356f0dc8bc32f30f668d4b69b46000b Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Fri, 12 Oct 2018 15:11:59 -0600 Subject: isort --- synapse/handlers/groups_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 66da6edad1..cb7e7d49cb 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -20,7 +20,7 @@ from six import iteritems from twisted.internet import defer -from synapse.api.errors import (HttpResponseException, SynapseError) +from synapse.api.errors import HttpResponseException, SynapseError from synapse.types import get_domain_from_id logger = logging.getLogger(__name__) -- cgit 1.5.1 From 15133477ee6ea624dd937329c41d2d3c8ff2bd56 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Oct 2018 13:58:37 +0100 Subject: Fix up use of resolve_events_with_factory --- synapse/handlers/federation.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 45d955e6f5..b028d58ae4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -53,7 +53,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet -from synapse.state import resolve_events_with_factory +from synapse.state import StateResolutionStore, resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError from synapse.util.async_helpers import Linearizer @@ -384,21 +384,18 @@ class FederationHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x - # Resolve any conflicting state - @defer.inlineCallbacks - def fetch(ev_ids): - fetched = yield self.store.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - # add any events we fetch here to the `event_map` so that we - # can use them to build the state event list below. - event_map.update(fetched) - defer.returnValue(fetched) - room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_factory( - room_version, state_maps, event_map, fetch, + room_version, state_maps, event_map, + state_res_store=StateResolutionStore(self.store), + ) + + evs = yield self.store.get_events( + list(state_map.values()), + get_prev_content=False, + check_redacted=False, ) + event_map.update(evs) # we need to give _process_received_pdu the actual state events # rather than event ids, so generate that now. -- cgit 1.5.1 From 74e761708398d5170783912f02d120f20113205e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Oct 2018 16:14:24 +0100 Subject: Clean up room alias creation --- synapse/handlers/directory.py | 77 ++++++++++++++++++++++--------------- synapse/handlers/room.py | 5 ++- synapse/rest/client/v1/directory.py | 37 +++--------------- 3 files changed, 55 insertions(+), 64 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 18741c5fac..02f12f6645 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler): ) @defer.inlineCallbacks - def create_association(self, user_id, room_alias, room_id, servers=None): - # association creation for human users - # TODO(erikj): Do user auth. + def create_association(self, requester, room_alias, room_id, servers=None, + send_event=True): + """Attempt to create a new alias - if not self.spam_checker.user_may_create_room_alias(user_id, room_alias): - raise SynapseError( - 403, "This user is not permitted to create this alias", - ) + Args: + requester (Requester) + room_alias (RoomAlias) + room_id (str) + servers (list[str]|None): List of servers that others servers + should try and join via + send_event (bool): Whether to send an updated m.room.aliases event - can_create = yield self.can_modify_alias( - room_alias, - user_id=user_id - ) - if not can_create: - raise SynapseError( - 400, "This alias is reserved by an application service.", - errcode=Codes.EXCLUSIVE - ) - yield self._create_association(room_alias, room_id, servers, creator=user_id) + Returns: + Deferred + """ - @defer.inlineCallbacks - def create_appservice_association(self, service, room_alias, room_id, - servers=None): - if not service.is_interested_in_alias(room_alias.to_string()): - raise SynapseError( - 400, "This application service has not reserved" - " this kind of alias.", errcode=Codes.EXCLUSIVE + user_id = requester.user.to_string() + + service = requester.app_service + if service: + if not service.is_interested_in_alias(room_alias.to_string()): + raise SynapseError( + 400, "This application service has not reserved" + " this kind of alias.", errcode=Codes.EXCLUSIVE + ) + else: + if not self.spam_checker.user_may_create_room_alias(user_id, room_alias): + raise AuthError( + 403, "This user is not permitted to create this alias", + ) + + can_create = yield self.can_modify_alias( + room_alias, + user_id=user_id ) + if not can_create: + raise AuthError( + 400, "This alias is reserved by an application service.", + errcode=Codes.EXCLUSIVE + ) - # association creation for app services - yield self._create_association(room_alias, room_id, servers) + yield self._create_association(room_alias, room_id, servers, creator=user_id) + if send_event: + yield self.send_room_alias_update_event( + requester, + room_id + ) @defer.inlineCallbacks - def delete_association(self, requester, user_id, room_alias): + def delete_association(self, requester, room_alias): # association deletion for human users + user_id = requester.user.to_string() + try: can_delete = yield self._user_can_delete_alias(room_alias, user_id) except StoreError as e: @@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler): try: yield self.send_room_alias_update_event( requester, - requester.user.to_string(), room_id ) @@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler): ) @defer.inlineCallbacks - def send_room_alias_update_event(self, requester, user_id, room_id): + def send_room_alias_update_event(self, requester, room_id): aliases = yield self.store.get_aliases_for_room(room_id) yield self.event_creation_handler.create_and_send_nonmember_event( @@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler): "type": EventTypes.Aliases, "state_key": self.hs.hostname, "room_id": room_id, - "sender": user_id, + "sender": requester.user.to_string(), "content": {"aliases": aliases}, }, ratelimit=False diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index c3f820b975..ab1571b27b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler): if room_alias: directory_handler = self.hs.get_handlers().directory_handler yield directory_handler.create_association( - user_id=user_id, + requester=requester, room_id=room_id, room_alias=room_alias, servers=[self.hs.hostname], + send_event=False, ) preset_config = config.get( @@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() yield directory_handler.send_room_alias_update_event( - requester, user_id, room_id + requester, room_id ) defer.returnValue(result) diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 97733f3026..0220acf644 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -74,38 +74,11 @@ class ClientDirectoryServer(ClientV1RestServlet): if room is None: raise SynapseError(400, "Room does not exist") - dir_handler = self.handlers.directory_handler + requester = yield self.auth.get_user_by_req(request) - try: - # try to auth as a user - requester = yield self.auth.get_user_by_req(request) - try: - user_id = requester.user.to_string() - yield dir_handler.create_association( - user_id, room_alias, room_id, servers - ) - yield dir_handler.send_room_alias_update_event( - requester, - user_id, - room_id - ) - except SynapseError as e: - raise e - except Exception: - logger.exception("Failed to create association") - raise - except AuthError: - # try to auth as an application service - service = yield self.auth.get_appservice_by_req(request) - yield dir_handler.create_appservice_association( - service, room_alias, room_id, servers - ) - logger.info( - "Application service at %s created alias %s pointing to %s", - service.url, - room_alias.to_string(), - room_id - ) + yield self.handlers.directory_handler.create_association( + requester, room_alias, room_id, servers + ) defer.returnValue((200, {})) @@ -135,7 +108,7 @@ class ClientDirectoryServer(ClientV1RestServlet): room_alias = RoomAlias.from_string(room_alias) yield dir_handler.delete_association( - requester, user.to_string(), room_alias + requester, room_alias ) logger.info( -- cgit 1.5.1 From e7a16c6210191e9556fb1c11cbff2d98f0a5206c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 22 Oct 2018 16:12:11 +0100 Subject: Remove redundant run_as_background_process() from pusherpool `on_new_notifications` and `on_new_receipts` in `HttpPusher` and `EmailPusher` now always return synchronously, so we can remove the `defer.gatherResults` on their results, and the `run_as_background_process` wrappers can be removed too because the PusherPool methods will now complete quickly enough. --- synapse/app/pusher.py | 4 ++-- synapse/handlers/federation.py | 4 ++-- synapse/handlers/message.py | 2 +- synapse/handlers/receipts.py | 2 +- synapse/push/emailpusher.py | 3 +-- synapse/push/httppusher.py | 2 -- synapse/push/pusherpool.py | 47 +++++++----------------------------------- 7 files changed, 14 insertions(+), 50 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index e06b70894e..83b0863f00 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -161,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler): else: yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - self.pusher_pool.on_new_notifications( + yield self.pusher_pool.on_new_notifications( token, token, ) elif stream_name == "receipts": - self.pusher_pool.on_new_receipts( + yield self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) ) except Exception: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cab57a8849..63e495e3f8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler): if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: - self._notify_persisted_event(event, max_stream_id) + yield self._notify_persisted_event(event, max_stream_id) def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the @@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler): extra_users=extra_users ) - self.pusher_pool.on_new_notifications( + return self.pusher_pool.on_new_notifications( event_stream_id, max_stream_id, ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4954b23a0d..6c4fcfb10a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -779,7 +779,7 @@ class EventCreationHandler(object): event, context=context ) - self.pusher_pool.on_new_notifications( + yield self.pusher_pool.on_new_notifications( event_stream_id, max_stream_id, ) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index a6f3181f09..4c2690ba26 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler): "receipt_key", max_batch_id, rooms=affected_room_ids ) # Note that the min here shouldn't be relied upon to be accurate. - self.hs.get_pusherpool().on_new_receipts( + yield self.hs.get_pusherpool().on_new_receipts( min_batch_id, max_batch_id, affected_room_ids, ) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 0c9c0201e8..d5a99b838c 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -94,13 +94,12 @@ class EmailPusher(object): def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) self._start_processing() - return defer.succeed(None) def on_new_receipts(self, min_stream_id, max_stream_id): # We could wake up and cancel the timer but there tend to be quite a # lot of read receipts so it's probably less work to just let the # timer fire - return defer.succeed(None) + pass def on_timer(self): self.timed_call = None diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 5f6b21bc67..770f55feae 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -98,7 +98,6 @@ class HttpPusher(object): def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0) self._start_processing() - return defer.suceed(None) def on_new_receipts(self, min_stream_id, max_stream_id): # Note that the min here shouldn't be relied upon to be accurate. @@ -106,7 +105,6 @@ class HttpPusher(object): # We could check the receipts are actually m.read receipts here, # but currently that's the only type of receipt anyway... run_as_background_process("http_pusher.on_new_receipts", self._update_badge) - return defer.succeed(None) @defer.inlineCallbacks def _update_badge(self): diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index b9b68ec829..a4d1ce3aad 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -18,9 +18,8 @@ import logging from twisted.internet import defer -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push.pusher import PusherFactory -from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from synapse.util.logcontext import run_in_background logger = logging.getLogger(__name__) @@ -122,45 +121,23 @@ class PusherPool: p['app_id'], p['pushkey'], p['user_name'], ) - def on_new_notifications(self, min_stream_id, max_stream_id): - run_as_background_process( - "on_new_notifications", - self._on_new_notifications, min_stream_id, max_stream_id, - ) - @defer.inlineCallbacks - def _on_new_notifications(self, min_stream_id, max_stream_id): + def on_new_notifications(self, min_stream_id, max_stream_id): try: users_affected = yield self.store.get_push_action_users_in_range( min_stream_id, max_stream_id ) - deferreds = [] - for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - deferreds.append( - run_in_background( - p.on_new_notifications, - min_stream_id, max_stream_id, - ) - ) - - yield make_deferred_yieldable( - defer.gatherResults(deferreds, consumeErrors=True), - ) + p.on_new_notifications(min_stream_id, max_stream_id) + except Exception: logger.exception("Exception in pusher on_new_notifications") - def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): - run_as_background_process( - "on_new_receipts", - self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids, - ) - @defer.inlineCallbacks - def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): + def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive @@ -170,21 +147,11 @@ class PusherPool: # This returns a tuple, user_id is at index 3 users_affected = set([r[3] for r in updated_receipts]) - deferreds = [] - for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - deferreds.append( - run_in_background( - p.on_new_receipts, - min_stream_id, max_stream_id, - ) - ) - - yield make_deferred_yieldable( - defer.gatherResults(deferreds, consumeErrors=True), - ) + p.on_new_receipts(min_stream_id, max_stream_id) + except Exception: logger.exception("Exception in pusher on_new_receipts") -- cgit 1.5.1 From 5c445114d356c9c23b99f0a2c4246be983a38b69 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 23 Oct 2018 13:12:32 +0100 Subject: Correctly account for cpu usage by background threads (#4074) Wrap calls to deferToThread() in a thing which uses a child logcontext to attribute CPU usage to the right request. While we're in the area, remove the logcontext_tracer stuff, which is never used, and afaik doesn't work. Fixes #4064 --- changelog.d/4074.bugfix | 1 + synapse/handlers/auth.py | 18 +---- synapse/rest/media/v1/media_repository.py | 24 +++--- synapse/rest/media/v1/media_storage.py | 8 +- synapse/rest/media/v1/storage_provider.py | 6 +- synapse/util/logcontext.py | 120 +++++++++++++++++------------- 6 files changed, 97 insertions(+), 80 deletions(-) create mode 100644 changelog.d/4074.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/4074.bugfix b/changelog.d/4074.bugfix new file mode 100644 index 0000000000..b3b6b00243 --- /dev/null +++ b/changelog.d/4074.bugfix @@ -0,0 +1 @@ +Correctly account for cpu usage by background threads diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 2a5eab124f..329e3c7d71 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -22,7 +22,7 @@ import bcrypt import pymacaroons from canonicaljson import json -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.web.client import PartialDownloadError import synapse.util.stringutils as stringutils @@ -37,8 +37,8 @@ from synapse.api.errors import ( ) from synapse.module_api import ModuleApi from synapse.types import UserID +from synapse.util import logcontext from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable from ._base import BaseHandler @@ -884,11 +884,7 @@ class AuthHandler(BaseHandler): bcrypt.gensalt(self.bcrypt_rounds), ).decode('ascii') - return make_deferred_yieldable( - threads.deferToThreadPool( - self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash - ), - ) + return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. @@ -913,13 +909,7 @@ class AuthHandler(BaseHandler): if not isinstance(stored_hash, bytes): stored_hash = stored_hash.encode('ascii') - return make_deferred_yieldable( - threads.deferToThreadPool( - self.hs.get_reactor(), - self.hs.get_reactor().getThreadPool(), - _do_validate_hash, - ), - ) + return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash) else: return defer.succeed(False) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index a828ff4438..08b1867fab 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse import twisted.internet.error import twisted.web.http -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.web.resource import Resource from synapse.api.errors import ( @@ -36,8 +36,8 @@ from synapse.api.errors import ( ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import logcontext from synapse.util.async_helpers import Linearizer -from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import is_ascii, random_string @@ -492,10 +492,11 @@ class MediaRepository(object): )) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type - )) + ) if t_byte_source: try: @@ -534,10 +535,11 @@ class MediaRepository(object): )) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type - )) + ) if t_byte_source: try: @@ -620,15 +622,17 @@ class MediaRepository(object): for (t_width, t_height, t_type), t_method in iteritems(thumbnails): # Generate the thumbnail if t_method == "crop": - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), thumbnailer.crop, t_width, t_height, t_type, - )) + ) elif t_method == "scale": - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), thumbnailer.scale, t_width, t_height, t_type, - )) + ) else: logger.error("Unrecognized method: %r", t_method) continue diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index a6189224ee..896078fe76 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -21,9 +21,10 @@ import sys import six -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.protocols.basic import FileSender +from synapse.util import logcontext from synapse.util.file_consumer import BackgroundFileConsumer from synapse.util.logcontext import make_deferred_yieldable @@ -64,9 +65,10 @@ class MediaStorage(object): with self.store_into_file(file_info) as (f, fname, finish_cb): # Write to the main repository - yield make_deferred_yieldable(threads.deferToThread( + yield logcontext.defer_to_thread( + self.hs.get_reactor(), _write_file_synchronously, source, f, - )) + ) yield finish_cb() defer.returnValue(fname) diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 7b9f8b4d79..5aa03031f6 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -17,9 +17,10 @@ import logging import os import shutil -from twisted.internet import defer, threads +from twisted.internet import defer from synapse.config._base import Config +from synapse.util import logcontext from synapse.util.logcontext import run_in_background from .media_storage import FileResponder @@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider): if not os.path.exists(dirname): os.makedirs(dirname) - return threads.deferToThread( + return logcontext.defer_to_thread( + self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname, ) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 89224b26cc..4c6e92beb8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works. import logging import threading -from twisted.internet import defer +from twisted.internet import defer, threads logger = logging.getLogger(__name__) @@ -562,58 +562,76 @@ def _set_context_cb(result, context): return result -# modules to ignore in `logcontext_tracer` -_to_ignore = [ - "synapse.util.logcontext", - "synapse.http.server", - "synapse.storage._base", - "synapse.util.async_helpers", -] +def defer_to_thread(reactor, f, *args, **kwargs): + """ + Calls the function `f` using a thread from the reactor's default threadpool and + returns the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked, and whose threadpool we should use for the + function. + + Normally this will be hs.get_reactor(). + + f (callable): The function to call. + args: positional arguments to pass to f. -def logcontext_tracer(frame, event, arg): - """A tracer that logs whenever a logcontext "unexpectedly" changes within - a function. Probably inaccurate. + kwargs: keyword arguments to pass to f. - Use by calling `sys.settrace(logcontext_tracer)` in the main thread. + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. """ - if event == 'call': - name = frame.f_globals["__name__"] - if name.startswith("synapse"): - if name == "synapse.util.logcontext": - if frame.f_code.co_name in ["__enter__", "__exit__"]: - tracer = frame.f_back.f_trace - if tracer: - tracer.just_changed = True - - tracer = frame.f_trace - if tracer: - return tracer - - if not any(name.startswith(ig) for ig in _to_ignore): - return LineTracer() - - -class LineTracer(object): - __slots__ = ["context", "just_changed"] - - def __init__(self): - self.context = LoggingContext.current_context() - self.just_changed = False - - def __call__(self, frame, event, arg): - if event in 'line': - if self.just_changed: - self.context = LoggingContext.current_context() - self.just_changed = False - else: - c = LoggingContext.current_context() - if c != self.context: - logger.info( - "Context changed! %s -> %s, %s, %s", - self.context, c, - frame.f_code.co_filename, frame.f_lineno - ) - self.context = c + return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) - return self + +def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): + """ + A wrapper for twisted.internet.threads.deferToThreadpool, which handles + logcontexts correctly. + + Calls the function `f` using a thread from the given threadpool and returns + the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked. Normally this will be hs.get_reactor(). + + threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for + running `f`. Normally this will be hs.get_reactor().getThreadPool(). + + f (callable): The function to call. + + args: positional arguments to pass to f. + + kwargs: keyword arguments to pass to f. + + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. + """ + logcontext = LoggingContext.current_context() + + def g(): + with LoggingContext(parent_context=logcontext): + return f(*args, **kwargs) + + return make_deferred_yieldable( + threads.deferToThreadPool(reactor, threadpool, g) + ) -- cgit 1.5.1 From b3f6dddad2c3777e3620acfbb97e19409a5beb81 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 23 Oct 2018 14:29:17 +0100 Subject: Give some more things logcontexts (#4077) --- changelog.d/4077.misc | 1 + synapse/handlers/deactivate_account.py | 4 ++-- synapse/handlers/user_directory.py | 14 +++++++++----- 3 files changed, 12 insertions(+), 7 deletions(-) create mode 100644 changelog.d/4077.misc (limited to 'synapse/handlers') diff --git a/changelog.d/4077.misc b/changelog.d/4077.misc new file mode 100644 index 0000000000..52ca4c1de2 --- /dev/null +++ b/changelog.d/4077.misc @@ -0,0 +1 @@ +Give some more things logcontexts diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index b078df4a76..75fe50c42c 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -17,8 +17,8 @@ import logging from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID, create_requester -from synapse.util.logcontext import run_in_background from ._base import BaseHandler @@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler): None """ if not self._user_parter_running: - run_in_background(self._user_parter_loop) + run_as_background_process("user_parter_loop", self._user_parter_loop) @defer.inlineCallbacks def _user_parter_loop(self): diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index d8413d6aa7..f11b430126 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -20,6 +20,7 @@ from six import iteritems from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.roommember import ProfileInfo from synapse.types import get_localpart_from_id from synapse.util.metrics import Measure @@ -98,7 +99,6 @@ class UserDirectoryHandler(object): """ return self.store.search_user_dir(user_id, search_term, limit) - @defer.inlineCallbacks def notify_new_event(self): """Called when there may be more deltas to process """ @@ -108,11 +108,15 @@ class UserDirectoryHandler(object): if self._is_processing: return + @defer.inlineCallbacks + def process(): + try: + yield self._unsafe_process() + finally: + self._is_processing = False + self._is_processing = True - try: - yield self._unsafe_process() - finally: - self._is_processing = False + run_as_background_process("user_directory.notify_new_event", process) @defer.inlineCallbacks def handle_local_profile_change(self, user_id, profile): -- cgit 1.5.1 From 3e704822be40a7d1d3fa82bf554f6a85823b5686 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Tue, 23 Oct 2018 10:25:31 -0600 Subject: Comments help --- synapse/handlers/groups_local.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index cb7e7d49cb..173315af6c 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -41,6 +41,11 @@ def _create_rerouter(func_name): destination, group_id, *args, **kwargs ) + # Capture errors returned by the remote homeserver and + # re-throw specific errors as SynapseErrors. This is so + # when the remote end responds with things like 403 Not + # In Group, we can communicate that to the client instead + # of a 500. def h(failure): failure.trap(HttpResponseException) e = failure.value -- cgit 1.5.1 From 810715f79aafc586e70e9dcd3da4a2cc6823f704 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 Oct 2018 09:44:22 +0100 Subject: Rename resolve_events_with_factory --- synapse/handlers/federation.py | 4 ++-- synapse/state/__init__.py | 14 +++++++------- synapse/state/v1.py | 2 +- synapse/state/v2.py | 2 +- tests/state/test_v2.py | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b028d58ae4..91a18a552c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -53,7 +53,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet -from synapse.state import StateResolutionStore, resolve_events_with_factory +from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError from synapse.util.async_helpers import Linearizer @@ -385,7 +385,7 @@ class FederationHandler(BaseHandler): event_map[x.event_id] = x room_version = yield self.store.get_room_version(room_id) - state_map = yield resolve_events_with_factory( + state_map = yield resolve_events_with_store( room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), ) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 836e137296..9b40b18d5b 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -394,7 +394,7 @@ class StateHandler(object): } with Measure(self.clock, "state._resolve_events"): - new_state = yield resolve_events_with_factory( + new_state = yield resolve_events_with_store( room_version, state_set_ids, event_map=state_map, state_res_store=StateResolutionStore(self.store), @@ -478,10 +478,10 @@ class StateResolutionHandler(object): # start by assuming we won't have any conflicted state, and build up the new # state map by iterating through the state groups. If we discover a conflict, - # we give up and instead use `resolve_events_with_factory`. + # we give up and instead use `resolve_events_with_store`. # # XXX: is this actually worthwhile, or should we just let - # resolve_events_with_factory do it? + # resolve_events_with_store do it? new_state = {} conflicted_state = False for st in itervalues(state_groups_ids): @@ -496,7 +496,7 @@ class StateResolutionHandler(object): if conflicted_state: logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): - new_state = yield resolve_events_with_factory( + new_state = yield resolve_events_with_store( room_version, list(itervalues(state_groups_ids)), event_map=event_map, @@ -581,7 +581,7 @@ def _make_state_cache_entry( ) -def resolve_events_with_factory(room_version, state_sets, event_map, state_res_store): +def resolve_events_with_store(room_version, state_sets, event_map, state_res_store): """ Args: room_version(str): Version of the room @@ -604,11 +604,11 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_res_s a map from (type, state_key) to event_id. """ if room_version == RoomVersions.V1: - return v1.resolve_events_with_factory( + return v1.resolve_events_with_store( state_sets, event_map, state_res_store.get_events, ) elif room_version == RoomVersions.VDH_TEST: - return v2.resolve_events_with_factory( + return v2.resolve_events_with_store( state_sets, event_map, state_res_store, ) else: diff --git a/synapse/state/v1.py b/synapse/state/v1.py index 7a7157b352..70a981f4a2 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -31,7 +31,7 @@ POWER_KEY = (EventTypes.PowerLevels, "") @defer.inlineCallbacks -def resolve_events_with_factory(state_sets, event_map, state_map_factory): +def resolve_events_with_store(state_sets, event_map, state_map_factory): """ Args: state_sets(list): List of dicts of (type, state_key) -> event_id, diff --git a/synapse/state/v2.py b/synapse/state/v2.py index d034b4f38e..5d06f7e928 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def resolve_events_with_factory(state_sets, event_map, state_res_store): +def resolve_events_with_store(state_sets, event_map, state_res_store): """Resolves the state using the v2 state resolution algorithm Args: diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index b96bb6b25c..73f2ef7557 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -24,7 +24,7 @@ from synapse.event_auth import auth_types_for_event from synapse.events import FrozenEvent from synapse.state.v2 import ( lexicographical_topological_sort, - resolve_events_with_factory, + resolve_events_with_store, ) from synapse.types import EventID @@ -541,7 +541,7 @@ class StateTestCase(unittest.TestCase): elif len(prev_events) == 1: state_before = dict(state_at_event[prev_events[0]]) else: - state_d = resolve_events_with_factory( + state_d = resolve_events_with_store( [state_at_event[n] for n in prev_events], event_map=event_map, state_res_store=TestStateResolutionStore(event_map), -- cgit 1.5.1 From dacbeb2e03274d347bd3d3919a00a56661dbb002 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 Oct 2018 09:47:49 +0100 Subject: Comment --- synapse/handlers/federation.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 91a18a552c..1daa08df7e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -390,6 +390,11 @@ class FederationHandler(BaseHandler): state_res_store=StateResolutionStore(self.store), ) + # We need to give _process_received_pdu the actual state events + # rather than event ids, so generate that now. + + # First though we need to fetch all the events that are in + # state_map, so we can build up the state below. evs = yield self.store.get_events( list(state_map.values()), get_prev_content=False, @@ -397,8 +402,6 @@ class FederationHandler(BaseHandler): ) event_map.update(evs) - # we need to give _process_received_pdu the actual state events - # rather than event ids, so generate that now. state = [ event_map[e] for e in six.itervalues(state_map) ] -- cgit 1.5.1