diff options
author | Richard van der Hoff <richard@matrix.org> | 2018-04-27 11:07:40 +0100 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2018-04-27 11:07:40 +0100 |
commit | 9255a6cb17716c022ebae1dbe9c142b78ca86ea7 (patch) | |
tree | 185b1b00af216695daca391eb62a871e78d32e49 /synapse | |
parent | Merge pull request #3134 from matrix-org/erikj/fix_admin_media_api (diff) | |
download | synapse-9255a6cb17716c022ebae1dbe9c142b78ca86ea7.tar.xz |
Improve exception handling for background processes
There were a bunch of places where we fire off a process to happen in the background, but don't have any exception handling on it - instead relying on the unhandled error being logged when the relevent deferred gets garbage-collected. This is unsatisfactory for a number of reasons: - logging on garbage collection is best-effort and may happen some time after the error, if at all - it can be hard to figure out where the error actually happened. - it is logged as a scary CRITICAL error which (a) I always forget to grep for and (b) it's not really CRITICAL if a background process we don't care about fails. So this is an attempt to add exception handling to everything we fire off into the background.
Diffstat (limited to '')
-rw-r--r-- | synapse/app/appservice.py | 15 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 27 | ||||
-rw-r--r-- | synapse/app/pusher.py | 31 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 95 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 13 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 25 | ||||
-rw-r--r-- | synapse/crypto/keyring.py | 93 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 2 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 13 | ||||
-rw-r--r-- | synapse/groups/attestations.py | 44 | ||||
-rw-r--r-- | synapse/handlers/message.py | 22 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 19 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 61 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 43 | ||||
-rw-r--r-- | synapse/notifier.py | 13 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 11 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 5 | ||||
-rw-r--r-- | synapse/rest/media/v1/storage_provider.py | 9 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 24 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 7 |
20 files changed, 335 insertions, 237 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index f2540023a7..58f2c9d68c 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.appservice") @@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler): if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() - preserve_fn( - self.appservice_handler.notify_interested_services - )(max_stream_id) + run_in_background(self._notify_app_services, max_stream_id) + + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") def start(config_options): diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..4f2a9ca21a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -237,19 +237,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): - self.federation_position = token - - # We linearize here to ensure we don't have races updating the token - with (yield self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + try: + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (yield self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) - self._last_ack = self.federation_position + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position + except Exception: + logger.exception("Error updating federation stream position") if __name__ == '__main__': diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..739d113ad5 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -144,20 +144,23 @@ class PusherReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) - ) + try: + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, + ) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) + ) + except Exception: + logger.exception("Error poking pushers") def stop_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..777da564d7 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -340,55 +340,58 @@ class SyncReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def process_and_notify(self, stream_name, token, rows): - if stream_name == "events": - # We shouldn't get multiple rows per token for events stream, so - # we don't need to optimise this for multiple rows. - for row in rows: - event = yield self.store.get_event(row.event_id) - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event( - event, token, max_token, extra_users + try: + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + event = yield self.store.get_event(row.event_id) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "push_rules": - self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], - ) - elif stream_name in ("account_data", "tag_account_data",): - self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], - ) - elif stream_name == "receipts": - self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "to_device": - entities = [row.entity for row in rows if row.entity.startswith("@")] - if entities: + elif stream_name in ("account_data", "tag_account_data",): self.notifier.on_new_event( - "to_device_key", token, users=entities, + "account_data_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "device_lists": - all_room_ids = set() - for row in rows: - room_ids = yield self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) - elif stream_name == "presence": - yield self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": - self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows], - ) + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event( + "to_device_key", token, users=entities, + ) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = yield self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event( + "device_list_key", token, rooms=all_room_ids, + ) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) + except Exception: + logger.exception("Error processing replication") def start(config_options): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 5f845e80d1..5ba7e9b416 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.user_dir") @@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): stream_name, token, rows ) if stream_name == "current_state_deltas": - preserve_fn(self.user_directory.notify_new_event)() + run_in_background(self._notify_directory) + + @defer.inlineCallbacks + def _notify_directory(self): + try: + yield self.user_directory.notify_new_event() + except Exception: + logger.exception("Error notifiying user directory of state update") def start(config_options): diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..dfc8d1b42e 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -176,17 +176,20 @@ class _TransactionController(object): @defer.inlineCallbacks def _start_recoverer(self, service): - yield self.store.set_appservice_state( - service, - ApplicationServiceState.DOWN - ) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + try: + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + except Exception: + logger.exception("Error starting AS recoverer") @defer.inlineCallbacks def _is_service_up(self, service): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..32cbddbc53 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -146,53 +146,56 @@ class Keyring(object): verify_requests (List[VerifyKeyRequest]): """ - # create a deferred for each server we're going to look up the keys - # for; we'll resolve them once we have completed our lookups. - # These will be passed into wait_for_previous_lookups to block - # any other lookups until we have finished. - # The deferreds are called with no logcontext. - server_to_deferred = { - rq.server_name: defer.Deferred() - for rq in verify_requests - } - - # We want to wait for any previous lookups to complete before - # proceeding. - yield self.wait_for_previous_lookups( - [rq.server_name for rq in verify_requests], - server_to_deferred, - ) - - # Actually start fetching keys. - self._get_server_verify_keys(verify_requests) - - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed. - # - # map from server name to a set of request ids - server_to_request_ids = {} - - for verify_request in verify_requests: - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids.setdefault(server_name, set()).add(request_id) - - def remove_deferreds(res, verify_request): - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids[server_name].discard(request_id) - if not server_to_request_ids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) - return res - - for verify_request in verify_requests: - verify_request.deferred.addBoth( - remove_deferreds, verify_request, + try: + # create a deferred for each server we're going to look up the keys + # for; we'll resolve them once we have completed our lookups. + # These will be passed into wait_for_previous_lookups to block + # any other lookups until we have finished. + # The deferreds are called with no logcontext. + server_to_deferred = { + rq.server_name: defer.Deferred() + for rq in verify_requests + } + + # We want to wait for any previous lookups to complete before + # proceeding. + yield self.wait_for_previous_lookups( + [rq.server_name for rq in verify_requests], + server_to_deferred, ) + # Actually start fetching keys. + self._get_server_verify_keys(verify_requests) + + # When we've finished fetching all the keys for a given server_name, + # resolve the deferred passed to `wait_for_previous_lookups` so that + # any lookups waiting will proceed. + # + # map from server name to a set of request ids + server_to_request_ids = {} + + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + + def remove_deferreds(res, verify_request): + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids[server_name].discard(request_id) + if not server_to_request_ids[server_name]: + d = server_to_deferred.pop(server_name, None) + if d: + d.callback(None) + return res + + for verify_request in verify_requests: + verify_request.deferred.addBoth( + remove_deferreds, verify_request, + ) + except Exception: + logger.exception("Error starting key lookups") + @defer.inlineCallbacks def wait_for_previous_lookups(self, server_names, server_to_deferred): """Waits for any previous key lookups for the given servers to finish. diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 963d938edd..ded2b1871a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -323,6 +323,8 @@ class TransactionQueue(object): break yield self._process_presence_inner(states_map.values()) + except Exception: + logger.exception("Error sending presence states to servers") finally: self._processing_pending_presence = False diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ff0656df3e..19d09f5422 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -25,7 +25,7 @@ from synapse.http.servlet import ( ) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.types import ThirdPartyInstanceID, get_domain_from_id import functools @@ -152,11 +152,18 @@ class Authenticator(object): # alive retry_timings = yield self.store.get_destination_retry_timings(origin) if retry_timings and retry_timings["retry_last_ts"]: - logger.info("Marking origin %r as up", origin) - preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0) + run_in_background(self._reset_retry_timings, origin) defer.returnValue(origin) + @defer.inlineCallbacks + def _reset_retry_timings(self, origin): + try: + logger.info("Marking origin %r as up", origin) + yield self.store.set_destination_retry_timings(origin, 0, 0) + except Exception: + logger.exception("Error resetting retry timings on %s", origin) + class BaseFederationServlet(object): REQUIRE_AUTH = True diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..7187df2508 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -165,28 +165,32 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def _renew_attestation(group_id, user_id): - if not self.is_mine_id(group_id): - destination = get_domain_from_id(group_id) - elif not self.is_mine_id(user_id): - destination = get_domain_from_id(user_id) - else: - logger.warn( - "Incorrectly trying to do attestations for user: %r in %r", - user_id, group_id, + try: + if not self.is_mine_id(group_id): + destination = get_domain_from_id(group_id) + elif not self.is_mine_id(user_id): + destination = get_domain_from_id(user_id) + else: + logger.warn( + "Incorrectly trying to do attestations for user: %r in %r", + user_id, group_id, + ) + yield self.store.remove_attestation_renewal(group_id, user_id) + return + + attestation = self.attestations.create_attestation(group_id, user_id) + + yield self.transport_client.renew_group_attestation( + destination, group_id, user_id, + content={"attestation": attestation}, ) - yield self.store.remove_attestation_renewal(group_id, user_id) - return - - attestation = self.attestations.create_attestation(group_id, user_id) - yield self.transport_client.renew_group_attestation( - destination, group_id, user_id, - content={"attestation": attestation}, - ) - - yield self.store.update_attestation_renewal( - group_id, user_id, attestation - ) + yield self.store.update_attestation_renewal( + group_id, user_id, attestation + ) + except Exception: + logger.exception("Error renewing attestation of %r in %r", + user_id, group_id) for row in rows: group_id = row["group_id"] diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..d168ff5b86 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -857,15 +857,25 @@ class EventCreationHandler(object): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + try: + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + except Exception: + logger.exception("Error notifying about new room event") preserve_fn(_notify)() if event.type == EventTypes.Message: - presence = self.hs.get_presence_handler() # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. - preserve_fn(presence.bump_presence_active_time)(requester.user) + run_in_background(self._bump_active_time, requester.user) + + @defer.inlineCallbacks + def _bump_active_time(self, user): + try: + presence = self.hs.get_presence_handler() + yield presence.bump_presence_active_time(user) + except Exception: + logger.exception("Error bumping presence active time") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a5e501897c..585f3e4da2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.async import Linearizer -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -255,6 +255,14 @@ class PresenceHandler(object): logger.info("Finished _persist_unpersisted_changes") @defer.inlineCallbacks + def _update_states_and_catch_exception(self, new_states): + try: + res = yield self._update_states(new_states) + defer.returnValue(res) + except Exception: + logger.exception("Error updating presence") + + @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state @@ -364,7 +372,7 @@ class PresenceHandler(object): now=now, ) - preserve_fn(self._update_states)(changes) + run_in_background(self._update_states_and_catch_exception, changes) except Exception: logger.exception("Exception in _handle_timeouts loop") @@ -422,20 +430,23 @@ class PresenceHandler(object): @defer.inlineCallbacks def _end(): - if affect_presence: + try: self.user_to_num_current_syncs[user_id] -= 1 prev_state = yield self.current_state_for_user(user_id) yield self._update_states([prev_state.copy_and_replace( last_user_sync_ts=self.clock.time_msec(), )]) + except Exception: + logger.exception("Error updating presence after sync") @contextmanager def _user_syncing(): try: yield finally: - preserve_fn(_end)() + if affect_presence: + run_in_background(_end) defer.returnValue(_user_syncing()) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 3f215c2b4e..2e0672161c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler): """Given a list of receipts, works out which remote servers should be poked and pokes them. """ - # TODO: Some of this stuff should be coallesced. - for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, + try: + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } } - } + }, }, - }, - key=(room_id, receipt_type, user_id), - ) + key=(room_id, receipt_type, user_id), + ) + except Exception: + logger.exception("Error pushing receipts to remote servers") @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..823e2e27e1 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -205,28 +205,31 @@ class TypingHandler(object): @defer.inlineCallbacks def _push_remote(self, member, typing): - users = yield self.state.get_current_user_in_room(member.room_id) - self._member_last_federation_poke[member] = self.clock.time_msec() + try: + users = yield self.state.get_current_user_in_room(member.room_id) + self._member_last_federation_poke[member] = self.clock.time_msec() - now = self.clock.time_msec() - self.wheel_timer.insert( - now=now, - obj=member, - then=now + FEDERATION_PING_INTERVAL, - ) + now = self.clock.time_msec() + self.wheel_timer.insert( + now=now, + obj=member, + then=now + FEDERATION_PING_INTERVAL, + ) - for domain in set(get_domain_from_id(u) for u in users): - if domain != self.server_name: - self.federation.send_edu( - destination=domain, - edu_type="m.typing", - content={ - "room_id": member.room_id, - "user_id": member.user_id, - "typing": typing, - }, - key=member, - ) + for domain in set(get_domain_from_id(u) for u in users): + if domain != self.server_name: + self.federation.send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": member.room_id, + "user_id": member.user_id, + "typing": typing, + }, + key=member, + ) + except Exception: + logger.exception("Error pushing typing notif to remotes") @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/notifier.py b/synapse/notifier.py index 0e40a4aad6..939723a404 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -21,7 +21,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client @@ -251,9 +251,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - preserve_fn(self.appservice_handler.notify_interested_services)( - room_stream_id - ) + run_in_background(self._notify_app_services, room_stream_id) if self.federation_sender: self.federation_sender.notify_new_events(room_stream_id) @@ -267,6 +265,13 @@ class Notifier(object): rooms=[event.room_id], ) + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") + def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 58df98a793..ba7286cb72 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -77,10 +77,13 @@ class EmailPusher(object): @defer.inlineCallbacks def on_started(self): if self.mailer is not None: - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() + try: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() + except Exception: + logger.exception("Error starting email pusher") def on_stop(self): if self.timed_call: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 2cbac571b8..1420d378ef 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -94,7 +94,10 @@ class HttpPusher(object): @defer.inlineCallbacks def on_started(self): - yield self._process() + try: + yield self._process() + except Exception: + logger.exception("Error starting http pusher") @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index c188192f2b..0252afd9d3 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -18,7 +18,7 @@ from twisted.internet import defer, threads from .media_storage import FileResponder from synapse.config._base import Config -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import logging import os @@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider): return self.backend.store_file(path, file_info) else: # TODO: Handle errors. - preserve_fn(self.backend.store_file)(path, file_info) + def store(): + try: + return self.backend.store_file(path, file_info) + except Exception: + logger.exception("Error storing file") + run_in_background(store) return defer.succeed(None) def fetch(self, path, file_info): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index e78f8d0114..c22762eb5c 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): "add_push_actions_to_staging", _add_push_actions_to_staging_txn ) + @defer.inlineCallbacks def remove_push_actions_from_staging(self, event_id): """Called if we failed to persist the event to ensure that stale push actions don't build up in the DB @@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore): event_id (str) """ - return self._simple_delete( - table="event_push_actions_staging", - keyvalues={ - "event_id": event_id, - }, - desc="remove_push_actions_from_staging", - ) + try: + res = yield self._simple_delete( + table="event_push_actions_staging", + keyvalues={ + "event_id": event_id, + }, + desc="remove_push_actions_from_staging", + ) + defer.returnValue(res) + except Exception: + # this method is called from an exception handler, so propagating + # another exception here really isn't helpful - there's nothing + # the caller can do about it. Just log the exception and move on. + logger.exception( + "Error removing push actions after event persistence failure", + ) @defer.inlineCallbacks def _find_stream_orderings_for_times(self): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..d6587e4409 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -305,7 +305,12 @@ def run_in_background(f, *args, **kwargs): deferred returned by the funtion completes. Useful for wrapping functions that return a deferred which you don't yield - on. + on (for instance because you want to pass it to deferred.gatherResults()). + + Note that if you completely discard the result, you should make sure that + `f` doesn't raise any deferred exceptions, otherwise a scary-looking + CRITICAL error about an unhandled error will be logged without much + indication about where it came from. """ current = LoggingContext.current_context() res = f(*args, **kwargs) |