From 6493b22b42685a6cc06c1113196d305e4d52deed Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:40:06 +0100 Subject: reraise exceptions more carefully We need to be careful (under python 2, at least) that when we reraise an exception after doing some error handling, we actually reraise the original exception rather than anything that might have been raised (and handled) during the error handling. --- synapse/handlers/federation.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae7e0d6da2..260df025f9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -19,9 +19,11 @@ import httplib import itertools import logging +import sys from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json +import six from twisted.internet import defer from unpaddedbase64 import decode_base64 @@ -1513,12 +1515,14 @@ class FederationHandler(BaseHandler): backfilled=backfilled, ) except: # noqa: E722, as we reraise the exception this is fine. - # Ensure that we actually remove the entries in the push actions - # staging area - logcontext.preserve_fn( - self.store.remove_push_actions_from_staging - )(event.event_id) - raise + tp, value, tb = sys.exc_info() + + logcontext.run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) + + six.reraise(tp, value, tb) if not backfilled: # this intentionally does not yield: we don't care about the result -- cgit 1.4.1 From 2a13af23bc0561ab48e0a90528231c40ee209724 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:29:27 +0100 Subject: Use run_in_background in preference to preserve_fn While I was going through uses of preserve_fn for other PRs, I converted places which only use the wrapped function once to use run_in_background, to avoid creating the function object. --- synapse/app/federation_sender.py | 4 ++-- synapse/app/pusher.py | 4 ++-- synapse/app/synchrotron.py | 5 ++--- synapse/appservice/scheduler.py | 12 ++++++------ synapse/crypto/keyring.py | 28 ++++++++++++++++----------- synapse/federation/federation_client.py | 5 +++-- synapse/groups/attestations.py | 4 ++-- synapse/handlers/appservice.py | 5 ++++- synapse/handlers/e2e_keys.py | 6 +++--- synapse/handlers/federation.py | 16 +++++++++------ synapse/handlers/initial_sync.py | 12 +++++++----- synapse/handlers/message.py | 5 +++-- synapse/handlers/typing.py | 7 ++++--- synapse/push/pusherpool.py | 20 +++++++++++-------- synapse/rest/media/v1/preview_url_resource.py | 5 +++-- synapse/storage/events_worker.py | 5 +++-- synapse/storage/stream.py | 5 +++-- synapse/util/async.py | 4 ++-- synapse/util/file_consumer.py | 6 ++++-- synapse/util/logcontext.py | 2 +- synapse/util/ratelimitutils.py | 4 ++-- synapse/util/retryutils.py | 4 ++-- 22 files changed, 97 insertions(+), 71 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..c6daa0d43f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -229,7 +229,7 @@ class FederationSenderHandler(object): # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - preserve_fn(self.update_token)(token) + run_in_background(self.update_token, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..8bd5c0c2b7 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -33,7 +33,7 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -140,7 +140,7 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - preserve_fn(self.poke_pushers)(stream_name, token, rows) + run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..0c4ccc58bc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string @@ -327,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - - preserve_fn(self.process_and_notify)(stream_name, token, rows) + run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..ba1631b5c8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -51,7 +51,7 @@ components. from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure import logging @@ -106,7 +106,7 @@ class _ServiceQueuer(object): def enqueue(self, service, event): # if this service isn't being sent something self.queued_events.setdefault(service.id, []).append(event) - preserve_fn(self._send_request)(service) + run_in_background(self._send_request, service) @defer.inlineCallbacks def _send_request(self, service): @@ -152,10 +152,10 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - preserve_fn(self._start_recoverer)(service) - except Exception as e: - logger.exception(e) - preserve_fn(self._start_recoverer)(service) + run_in_background(self._start_recoverer, service) + except Exception: + logger.exception("Error creating appservice transaction") + run_in_background(self._start_recoverer, service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..38944a7326 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes from synapse.util import unwrapFirstError, logcontext from synapse.util.logcontext import ( PreserveLoggingContext, - preserve_fn + preserve_fn, + run_in_background, ) from synapse.util.metrics import Measure @@ -127,7 +128,7 @@ class Keyring(object): verify_requests.append(verify_request) - preserve_fn(self._start_key_lookups)(verify_requests) + run_in_background(self._start_key_lookups, verify_requests) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified @@ -313,7 +314,7 @@ class Keyring(object): if not verify_request.deferred.called: verify_request.deferred.errback(err) - preserve_fn(do_iterations)().addErrback(on_err) + run_in_background(do_iterations).addErrback(on_err) @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): @@ -329,8 +330,9 @@ class Keyring(object): """ res = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.get_server_verify_keys)( - server_name, key_ids + run_in_background( + self.store.get_server_verify_keys, + server_name, key_ids, ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], @@ -358,7 +360,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(p_name, p_keys) + run_in_background(get_key, p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, @@ -398,7 +400,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(server_name, key_ids) + run_in_background(get_key, server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, @@ -481,7 +483,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -539,7 +542,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=key_server_name, from_server=server_name, verify_keys=verify_keys, @@ -615,7 +619,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_keys_json)( + run_in_background( + self.store.store_server_keys_json, server_name=server_name, key_id=key_id, from_server=server_name, @@ -716,7 +721,8 @@ class Keyring(object): # TODO(markjh): Store whether the keys have expired. return logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_verify_key)( + run_in_background( + self.store.store_server_verify_key, server_name, server_name, key.time_added, key ) for key_id, key in verify_keys.items() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8e2c0c4cd2..8adc60863e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -33,7 +33,7 @@ from synapse.federation.federation_base import ( import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination @@ -417,7 +417,8 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - preserve_fn(self.get_pdu)( + run_in_background( + self.get_pdu, destinations=random_server_list(), event_id=e_id, ) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..5f53f17954 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -42,7 +42,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from signedjson.sign import sign_json @@ -192,4 +192,4 @@ class GroupAttestionRenewer(object): group_id = row["group_id"] user_id = row["user_id"] - preserve_fn(_renew_attestation)(group_id, user_id) + run_in_background(_renew_attestation, group_id, user_id) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0245197c02..6cc2388306 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -198,7 +198,10 @@ class ApplicationServicesHandler(object): services = yield self._get_services_for_3pn(protocol) results = yield make_deferred_yieldable(defer.DeferredList([ - preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) + run_in_background( + self.appservice_api.query_3pe, + service, kind, protocol, fields, + ) for service in services ], consumeErrors=True)) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 325c0c4a9f..fc958404a1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -24,7 +24,7 @@ from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, ) from synapse.types import get_domain_from_id, UserID -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -139,7 +139,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(do_remote_query)(destination) + run_in_background(do_remote_query, destination) for destination in remote_queries_not_in_cache ])) @@ -242,7 +242,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(claim_client_keys)(destination) + run_in_background(claim_client_keys, destination) for destination in remote_queries ])) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae7e0d6da2..c66ca0f381 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -637,7 +637,8 @@ class FederationHandler(BaseHandler): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self.replication_layer.get_pdu)( + logcontext.run_in_background( + self.replication_layer.get_pdu, [dest], event_id, outlier=True, @@ -1023,7 +1024,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) + logcontext.run_in_background(self._handle_queued_pdus, room_queue) defer.returnValue(True) @@ -1523,8 +1524,9 @@ class FederationHandler(BaseHandler): if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( - event_stream_id, max_stream_id + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1538,7 +1540,8 @@ class FederationHandler(BaseHandler): """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self._prep_event)( + logcontext.run_in_background( + self._prep_event, origin, ev_info["event"], state=ev_info.get("state"), @@ -1867,7 +1870,8 @@ class FederationHandler(BaseHandler): different_events = yield logcontext.make_deferred_yieldable( defer.gatherResults([ - logcontext.preserve_fn(self.store.get_event)( + logcontext.run_in_background( + self.store.get_event, d, allow_none=True, allow_rejected=False, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5267b4b84..cd33a86599 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler): (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background( + self.store.get_recent_events_for_room, event.room_id, limit=limit, end_token=room_end_token, @@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - preserve_fn(get_presence)(), - preserve_fn(get_receipts)(), - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, room_id, limit=limit, end_token=now_token.room_key, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..244b98dd8d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -850,7 +850,8 @@ class EventCreationHandler(object): # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + run_in_background( + self.pusher_pool.on_new_notifications, event_stream_id, max_stream_id ) @@ -862,7 +863,7 @@ class EventCreationHandler(object): extra_users=extra_users ) - preserve_fn(_notify)() + run_in_background(_notify) if event.type == EventTypes.Message: presence = self.hs.get_presence_handler() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..19cde70adf 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id @@ -97,7 +97,8 @@ class TypingHandler(object): if self.hs.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: - preserve_fn(self._push_remote)( + run_in_background( + self._push_remote, member=member, typing=True ) @@ -196,7 +197,7 @@ class TypingHandler(object): def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - preserve_fn(self._push_remote)(member, typing) + run_in_background(self._push_remote, member, typing) self._push_update_local( member=member, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 134e89b371..7bb5733090 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -14,13 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from .pusher import PusherFactory -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.push.pusher import PusherFactory from synapse.util.async import run_on_reactor - -import logging +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -137,8 +137,9 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_notifications)( - min_stream_id, max_stream_id + run_in_background( + p.on_new_notifications, + min_stream_id, max_stream_id, ) ) @@ -164,7 +165,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) + run_in_background( + p.on_new_receipts, + min_stream_id, max_stream_id, + ) ) yield make_deferred_yieldable(defer.gatherResults(deferreds)) @@ -207,7 +211,7 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - preserve_fn(p.on_started)() + run_in_background(p.on_started) logger.info("Started pushers") diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 0fc21540c6..9290d7946f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -35,7 +35,7 @@ from ._base import FileInfo from synapse.api.errors import ( SynapseError, Codes, ) -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import random_string from synapse.util.caches.expiringcache import ExpiringCache from synapse.http.client import SpiderHttpClient @@ -144,7 +144,8 @@ class PreviewUrlResource(Resource): observable = self._cache.get(url) if not observable: - download = preserve_fn(self._do_preview)( + download = run_in_background( + self._do_preview, url, requester.user, ts, ) observable = ObservableDeferred( diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a937b9bceb..ba834854e1 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -20,7 +20,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore): res = yield make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._get_event_from_row)( + run_in_background( + self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..5b245a936c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,7 +41,7 @@ from synapse.storage.events import EventsWorkerStore from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc @@ -198,7 +198,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(self.get_room_events_stream_for_room)( + run_in_background( + self.get_room_events_stream_for_room, room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..bd07067328 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -17,7 +17,7 @@ from twisted.internet import defer, reactor from .logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, preserve_fn + PreserveLoggingContext, make_deferred_yieldable, run_in_background ) from synapse.util import logcontext, unwrapFirstError @@ -161,7 +161,7 @@ def concurrently_execute(func, args, limit): pass return logcontext.make_deferred_yieldable(defer.gatherResults([ - preserve_fn(_concurrently_execute_inner)() + run_in_background(_concurrently_execute_inner) for _ in xrange(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3c8a165331..3380970e4e 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -15,7 +15,7 @@ from twisted.internet import threads, reactor -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from six.moves import queue @@ -70,7 +70,9 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming - self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) + self._finished_deferred = run_in_background( + threads.deferToThread, self._writer + ) if not streaming: self._producer.resumeProducing() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..c2edf87e58 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -341,7 +341,7 @@ def make_deferred_yieldable(deferred): returning a deferred. Then, when the deferred completes, restores the current logcontext before running callbacks/errbacks. - (This is more-or-less the opposite operation to preserve_fn.) + (This is more-or-less the opposite operation to run_in_background.) """ if isinstance(deferred, defer.Deferred) and not deferred.called: prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 1101881a2d..18424f6c36 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import sleep -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import collections import contextlib @@ -150,7 +150,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) + ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 47b0bb5eb3..4e93f69d3a 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -203,8 +203,8 @@ class RetryDestinationLimiter(object): ) except Exception: logger.exception( - "Failed to store set_destination_retry_timings", + "Failed to store destination_retry_timings", ) # we deliberately do this in the background. - synapse.util.logcontext.preserve_fn(store_retry_timings)() + synapse.util.logcontext.run_in_background(store_retry_timings) -- cgit 1.4.1 From 94f4d7f49ed322dac38e8946bba590ac41c0f40f Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 11:46:48 +0200 Subject: move httplib import to six --- synapse/handlers/federation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 260df025f9..24ff6782c5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -16,7 +16,6 @@ """Contains handlers for federation events.""" -import httplib import itertools import logging import sys @@ -24,6 +23,7 @@ import sys from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json import six +from six.moves import http_client from twisted.internet import defer from unpaddedbase64 import decode_base64 @@ -889,7 +889,7 @@ class FederationHandler(BaseHandler): logger.warn("Rejecting event %s which has %i prev_events", ev.event_id, len(ev.prev_events)) raise SynapseError( - httplib.BAD_REQUEST, + http_client.BAD_REQUEST, "Too many prev_events", ) @@ -897,7 +897,7 @@ class FederationHandler(BaseHandler): logger.warn("Rejecting event %s which has %i auth_events", ev.event_id, len(ev.auth_events)) raise SynapseError( - httplib.BAD_REQUEST, + http_client.BAD_REQUEST, "Too many auth_events", ) -- cgit 1.4.1