From d78ada31662e0aeeb5d13ddd16aabf432574fffd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 26 Apr 2018 12:34:40 +0100 Subject: Miscellaneous fixes to python_dependencies * add some doc about wtf this thing does * pin Twisted to < 18.4 * add explicit dep on six (fixes #3089) --- synapse/python_dependencies.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 5cabf7dabe..711cbb6c50 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -1,5 +1,6 @@ # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +19,18 @@ from distutils.version import LooseVersion logger = logging.getLogger(__name__) +# this dict maps from python package name to a list of modules we expect it to +# provide. +# +# the key is a "requirement specifier", as used as a parameter to `pip +# install`[1], or an `install_requires` argument to `setuptools.setup` [2]. +# +# the value is a sequence of strings; each entry should be the name of the +# python module, optionally followed by a version assertion which can be either +# ">=" or "==". +# +# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers. +# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies REQUIREMENTS = { "jsonschema>=2.5.1": ["jsonschema>=2.5.1"], "frozendict>=0.4": ["frozendict"], @@ -26,7 +39,11 @@ REQUIREMENTS = { "signedjson>=1.0.0": ["signedjson>=1.0.0"], "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], - "Twisted>=16.0.0": ["twisted>=16.0.0"], + + # we break under Twisted 18.4 + # (https://github.com/matrix-org/synapse/issues/3135) + "Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"], + "pyopenssl>=0.14": ["OpenSSL>=0.14"], "pyyaml": ["yaml"], "pyasn1": ["pyasn1"], @@ -39,6 +56,7 @@ REQUIREMENTS = { "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], + "six": ["six"], } CONDITIONAL_REQUIREMENTS = { "web_client": { -- cgit 1.4.1 From 28dd536e80167677ce45bd8eeee95c6ff6eff66f Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Thu, 26 Apr 2018 15:51:39 +0100 Subject: update changelog and bump version to 0.28.0 --- CHANGES.rst | 9 +++++++++ synapse/__init__.py | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/CHANGES.rst b/CHANGES.rst index b7da58edbb..74f454cb5b 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,12 @@ +Changes in synapse v0.28.0-rc1 (2018-04-26) +=========================================== + +Bug Fixes: + +* Fix quarantine media admin API and search reindex (PR #3130) +* Fix media admin APIs (PR #3134) + + Changes in synapse v0.28.0-rc1 (2018-04-24) =========================================== diff --git a/synapse/__init__.py b/synapse/__init__.py index 2b2c440eb8..4924f44d4e 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.28.0-rc1" +__version__ = "0.28.0" -- cgit 1.4.1 From 9255a6cb17716c022ebae1dbe9c142b78ca86ea7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:07:40 +0100 Subject: Improve exception handling for background processes There were a bunch of places where we fire off a process to happen in the background, but don't have any exception handling on it - instead relying on the unhandled error being logged when the relevent deferred gets garbage-collected. This is unsatisfactory for a number of reasons: - logging on garbage collection is best-effort and may happen some time after the error, if at all - it can be hard to figure out where the error actually happened. - it is logged as a scary CRITICAL error which (a) I always forget to grep for and (b) it's not really CRITICAL if a background process we don't care about fails. So this is an attempt to add exception handling to everything we fire off into the background. --- synapse/app/appservice.py | 15 +++-- synapse/app/federation_sender.py | 27 +++++---- synapse/app/pusher.py | 31 +++++----- synapse/app/synchrotron.py | 95 ++++++++++++++++--------------- synapse/app/user_dir.py | 13 ++++- synapse/appservice/scheduler.py | 25 ++++---- synapse/crypto/keyring.py | 93 +++++++++++++++--------------- synapse/federation/transaction_queue.py | 2 + synapse/federation/transport/server.py | 13 ++++- synapse/groups/attestations.py | 44 +++++++------- synapse/handlers/message.py | 22 +++++-- synapse/handlers/presence.py | 19 +++++-- synapse/handlers/receipts.py | 61 ++++++++++---------- synapse/handlers/typing.py | 43 +++++++------- synapse/notifier.py | 13 +++-- synapse/push/emailpusher.py | 11 ++-- synapse/push/httppusher.py | 5 +- synapse/rest/media/v1/storage_provider.py | 9 ++- synapse/storage/event_push_actions.py | 24 +++++--- synapse/util/logcontext.py | 7 ++- 20 files changed, 335 insertions(+), 237 deletions(-) (limited to 'synapse') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index f2540023a7..58f2c9d68c 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.appservice") @@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler): if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() - preserve_fn( - self.appservice_handler.notify_interested_services - )(max_stream_id) + run_in_background(self._notify_app_services, max_stream_id) + + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") def start(config_options): diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..4f2a9ca21a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -237,19 +237,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): - self.federation_position = token - - # We linearize here to ensure we don't have races updating the token - with (yield self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + try: + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (yield self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) - self._last_ack = self.federation_position + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position + except Exception: + logger.exception("Error updating federation stream position") if __name__ == '__main__': diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..739d113ad5 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -144,20 +144,23 @@ class PusherReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) - ) + try: + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, + ) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) + ) + except Exception: + logger.exception("Error poking pushers") def stop_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..777da564d7 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -340,55 +340,58 @@ class SyncReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def process_and_notify(self, stream_name, token, rows): - if stream_name == "events": - # We shouldn't get multiple rows per token for events stream, so - # we don't need to optimise this for multiple rows. - for row in rows: - event = yield self.store.get_event(row.event_id) - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event( - event, token, max_token, extra_users + try: + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + event = yield self.store.get_event(row.event_id) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "push_rules": - self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], - ) - elif stream_name in ("account_data", "tag_account_data",): - self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], - ) - elif stream_name == "receipts": - self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "to_device": - entities = [row.entity for row in rows if row.entity.startswith("@")] - if entities: + elif stream_name in ("account_data", "tag_account_data",): self.notifier.on_new_event( - "to_device_key", token, users=entities, + "account_data_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "device_lists": - all_room_ids = set() - for row in rows: - room_ids = yield self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) - elif stream_name == "presence": - yield self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": - self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows], - ) + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event( + "to_device_key", token, users=entities, + ) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = yield self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event( + "device_list_key", token, rooms=all_room_ids, + ) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) + except Exception: + logger.exception("Error processing replication") def start(config_options): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 5f845e80d1..5ba7e9b416 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import NoResource logger = logging.getLogger("synapse.app.user_dir") @@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): stream_name, token, rows ) if stream_name == "current_state_deltas": - preserve_fn(self.user_directory.notify_new_event)() + run_in_background(self._notify_directory) + + @defer.inlineCallbacks + def _notify_directory(self): + try: + yield self.user_directory.notify_new_event() + except Exception: + logger.exception("Error notifiying user directory of state update") def start(config_options): diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..dfc8d1b42e 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -176,17 +176,20 @@ class _TransactionController(object): @defer.inlineCallbacks def _start_recoverer(self, service): - yield self.store.set_appservice_state( - service, - ApplicationServiceState.DOWN - ) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + try: + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + except Exception: + logger.exception("Error starting AS recoverer") @defer.inlineCallbacks def _is_service_up(self, service): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..32cbddbc53 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -146,53 +146,56 @@ class Keyring(object): verify_requests (List[VerifyKeyRequest]): """ - # create a deferred for each server we're going to look up the keys - # for; we'll resolve them once we have completed our lookups. - # These will be passed into wait_for_previous_lookups to block - # any other lookups until we have finished. - # The deferreds are called with no logcontext. - server_to_deferred = { - rq.server_name: defer.Deferred() - for rq in verify_requests - } - - # We want to wait for any previous lookups to complete before - # proceeding. - yield self.wait_for_previous_lookups( - [rq.server_name for rq in verify_requests], - server_to_deferred, - ) - - # Actually start fetching keys. - self._get_server_verify_keys(verify_requests) - - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed. - # - # map from server name to a set of request ids - server_to_request_ids = {} - - for verify_request in verify_requests: - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids.setdefault(server_name, set()).add(request_id) - - def remove_deferreds(res, verify_request): - server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids[server_name].discard(request_id) - if not server_to_request_ids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) - return res - - for verify_request in verify_requests: - verify_request.deferred.addBoth( - remove_deferreds, verify_request, + try: + # create a deferred for each server we're going to look up the keys + # for; we'll resolve them once we have completed our lookups. + # These will be passed into wait_for_previous_lookups to block + # any other lookups until we have finished. + # The deferreds are called with no logcontext. + server_to_deferred = { + rq.server_name: defer.Deferred() + for rq in verify_requests + } + + # We want to wait for any previous lookups to complete before + # proceeding. + yield self.wait_for_previous_lookups( + [rq.server_name for rq in verify_requests], + server_to_deferred, ) + # Actually start fetching keys. + self._get_server_verify_keys(verify_requests) + + # When we've finished fetching all the keys for a given server_name, + # resolve the deferred passed to `wait_for_previous_lookups` so that + # any lookups waiting will proceed. + # + # map from server name to a set of request ids + server_to_request_ids = {} + + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + + def remove_deferreds(res, verify_request): + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids[server_name].discard(request_id) + if not server_to_request_ids[server_name]: + d = server_to_deferred.pop(server_name, None) + if d: + d.callback(None) + return res + + for verify_request in verify_requests: + verify_request.deferred.addBoth( + remove_deferreds, verify_request, + ) + except Exception: + logger.exception("Error starting key lookups") + @defer.inlineCallbacks def wait_for_previous_lookups(self, server_names, server_to_deferred): """Waits for any previous key lookups for the given servers to finish. diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 963d938edd..ded2b1871a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -323,6 +323,8 @@ class TransactionQueue(object): break yield self._process_presence_inner(states_map.values()) + except Exception: + logger.exception("Error sending presence states to servers") finally: self._processing_pending_presence = False diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ff0656df3e..19d09f5422 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -25,7 +25,7 @@ from synapse.http.servlet import ( ) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.types import ThirdPartyInstanceID, get_domain_from_id import functools @@ -152,11 +152,18 @@ class Authenticator(object): # alive retry_timings = yield self.store.get_destination_retry_timings(origin) if retry_timings and retry_timings["retry_last_ts"]: - logger.info("Marking origin %r as up", origin) - preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0) + run_in_background(self._reset_retry_timings, origin) defer.returnValue(origin) + @defer.inlineCallbacks + def _reset_retry_timings(self, origin): + try: + logger.info("Marking origin %r as up", origin) + yield self.store.set_destination_retry_timings(origin, 0, 0) + except Exception: + logger.exception("Error resetting retry timings on %s", origin) + class BaseFederationServlet(object): REQUIRE_AUTH = True diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..7187df2508 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -165,28 +165,32 @@ class GroupAttestionRenewer(object): @defer.inlineCallbacks def _renew_attestation(group_id, user_id): - if not self.is_mine_id(group_id): - destination = get_domain_from_id(group_id) - elif not self.is_mine_id(user_id): - destination = get_domain_from_id(user_id) - else: - logger.warn( - "Incorrectly trying to do attestations for user: %r in %r", - user_id, group_id, + try: + if not self.is_mine_id(group_id): + destination = get_domain_from_id(group_id) + elif not self.is_mine_id(user_id): + destination = get_domain_from_id(user_id) + else: + logger.warn( + "Incorrectly trying to do attestations for user: %r in %r", + user_id, group_id, + ) + yield self.store.remove_attestation_renewal(group_id, user_id) + return + + attestation = self.attestations.create_attestation(group_id, user_id) + + yield self.transport_client.renew_group_attestation( + destination, group_id, user_id, + content={"attestation": attestation}, ) - yield self.store.remove_attestation_renewal(group_id, user_id) - return - - attestation = self.attestations.create_attestation(group_id, user_id) - yield self.transport_client.renew_group_attestation( - destination, group_id, user_id, - content={"attestation": attestation}, - ) - - yield self.store.update_attestation_renewal( - group_id, user_id, attestation - ) + yield self.store.update_attestation_renewal( + group_id, user_id, attestation + ) + except Exception: + logger.exception("Error renewing attestation of %r in %r", + user_id, group_id) for row in rows: group_id = row["group_id"] diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..d168ff5b86 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -857,15 +857,25 @@ class EventCreationHandler(object): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + try: + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + except Exception: + logger.exception("Error notifying about new room event") preserve_fn(_notify)() if event.type == EventTypes.Message: - presence = self.hs.get_presence_handler() # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. - preserve_fn(presence.bump_presence_active_time)(requester.user) + run_in_background(self._bump_active_time, requester.user) + + @defer.inlineCallbacks + def _bump_active_time(self, user): + try: + presence = self.hs.get_presence_handler() + yield presence.bump_presence_active_time(user) + except Exception: + logger.exception("Error bumping presence active time") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a5e501897c..585f3e4da2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.async import Linearizer -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -254,6 +254,14 @@ class PresenceHandler(object): logger.info("Finished _persist_unpersisted_changes") + @defer.inlineCallbacks + def _update_states_and_catch_exception(self, new_states): + try: + res = yield self._update_states(new_states) + defer.returnValue(res) + except Exception: + logger.exception("Error updating presence") + @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes @@ -364,7 +372,7 @@ class PresenceHandler(object): now=now, ) - preserve_fn(self._update_states)(changes) + run_in_background(self._update_states_and_catch_exception, changes) except Exception: logger.exception("Exception in _handle_timeouts loop") @@ -422,20 +430,23 @@ class PresenceHandler(object): @defer.inlineCallbacks def _end(): - if affect_presence: + try: self.user_to_num_current_syncs[user_id] -= 1 prev_state = yield self.current_state_for_user(user_id) yield self._update_states([prev_state.copy_and_replace( last_user_sync_ts=self.clock.time_msec(), )]) + except Exception: + logger.exception("Error updating presence after sync") @contextmanager def _user_syncing(): try: yield finally: - preserve_fn(_end)() + if affect_presence: + run_in_background(_end) defer.returnValue(_user_syncing()) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 3f215c2b4e..2e0672161c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler): """Given a list of receipts, works out which remote servers should be poked and pokes them. """ - # TODO: Some of this stuff should be coallesced. - for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, + try: + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } } - } + }, }, - }, - key=(room_id, receipt_type, user_id), - ) + key=(room_id, receipt_type, user_id), + ) + except Exception: + logger.exception("Error pushing receipts to remote servers") @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..823e2e27e1 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -205,28 +205,31 @@ class TypingHandler(object): @defer.inlineCallbacks def _push_remote(self, member, typing): - users = yield self.state.get_current_user_in_room(member.room_id) - self._member_last_federation_poke[member] = self.clock.time_msec() + try: + users = yield self.state.get_current_user_in_room(member.room_id) + self._member_last_federation_poke[member] = self.clock.time_msec() - now = self.clock.time_msec() - self.wheel_timer.insert( - now=now, - obj=member, - then=now + FEDERATION_PING_INTERVAL, - ) + now = self.clock.time_msec() + self.wheel_timer.insert( + now=now, + obj=member, + then=now + FEDERATION_PING_INTERVAL, + ) - for domain in set(get_domain_from_id(u) for u in users): - if domain != self.server_name: - self.federation.send_edu( - destination=domain, - edu_type="m.typing", - content={ - "room_id": member.room_id, - "user_id": member.user_id, - "typing": typing, - }, - key=member, - ) + for domain in set(get_domain_from_id(u) for u in users): + if domain != self.server_name: + self.federation.send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": member.room_id, + "user_id": member.user_id, + "typing": typing, + }, + key=member, + ) + except Exception: + logger.exception("Error pushing typing notif to remotes") @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/notifier.py b/synapse/notifier.py index 0e40a4aad6..939723a404 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -21,7 +21,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client @@ -251,9 +251,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - preserve_fn(self.appservice_handler.notify_interested_services)( - room_stream_id - ) + run_in_background(self._notify_app_services, room_stream_id) if self.federation_sender: self.federation_sender.notify_new_events(room_stream_id) @@ -267,6 +265,13 @@ class Notifier(object): rooms=[event.room_id], ) + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") + def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 58df98a793..ba7286cb72 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -77,10 +77,13 @@ class EmailPusher(object): @defer.inlineCallbacks def on_started(self): if self.mailer is not None: - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() + try: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() + except Exception: + logger.exception("Error starting email pusher") def on_stop(self): if self.timed_call: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 2cbac571b8..1420d378ef 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -94,7 +94,10 @@ class HttpPusher(object): @defer.inlineCallbacks def on_started(self): - yield self._process() + try: + yield self._process() + except Exception: + logger.exception("Error starting http pusher") @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index c188192f2b..0252afd9d3 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -18,7 +18,7 @@ from twisted.internet import defer, threads from .media_storage import FileResponder from synapse.config._base import Config -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import logging import os @@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider): return self.backend.store_file(path, file_info) else: # TODO: Handle errors. - preserve_fn(self.backend.store_file)(path, file_info) + def store(): + try: + return self.backend.store_file(path, file_info) + except Exception: + logger.exception("Error storing file") + run_in_background(store) return defer.succeed(None) def fetch(self, path, file_info): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index e78f8d0114..c22762eb5c 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): "add_push_actions_to_staging", _add_push_actions_to_staging_txn ) + @defer.inlineCallbacks def remove_push_actions_from_staging(self, event_id): """Called if we failed to persist the event to ensure that stale push actions don't build up in the DB @@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore): event_id (str) """ - return self._simple_delete( - table="event_push_actions_staging", - keyvalues={ - "event_id": event_id, - }, - desc="remove_push_actions_from_staging", - ) + try: + res = yield self._simple_delete( + table="event_push_actions_staging", + keyvalues={ + "event_id": event_id, + }, + desc="remove_push_actions_from_staging", + ) + defer.returnValue(res) + except Exception: + # this method is called from an exception handler, so propagating + # another exception here really isn't helpful - there's nothing + # the caller can do about it. Just log the exception and move on. + logger.exception( + "Error removing push actions after event persistence failure", + ) @defer.inlineCallbacks def _find_stream_orderings_for_times(self): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..d6587e4409 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -305,7 +305,12 @@ def run_in_background(f, *args, **kwargs): deferred returned by the funtion completes. Useful for wrapping functions that return a deferred which you don't yield - on. + on (for instance because you want to pass it to deferred.gatherResults()). + + Note that if you completely discard the result, you should make sure that + `f` doesn't raise any deferred exceptions, otherwise a scary-looking + CRITICAL error about an unhandled error will be logged without much + indication about where it came from. """ current = LoggingContext.current_context() res = f(*args, **kwargs) -- cgit 1.4.1 From 605defb9e4c274d61d9da86546e6fd6c78f3f6cf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:16:28 +0100 Subject: Add missing consumeErrors In general we want defer.gatherResults to consumeErrors, rather than having exceptions hanging around and getting logged as CRITICAL unhandled errors. --- synapse/handlers/e2e_keys.py | 4 ++-- synapse/push/pusherpool.py | 8 ++++++-- synapse/storage/stream.py | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 325c0c4a9f..7eb03ad32e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -141,7 +141,7 @@ class E2eKeysHandler(object): yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(do_remote_query)(destination) for destination in remote_queries_not_in_cache - ])) + ], consumeErrors=True)) defer.returnValue({ "device_keys": results, "failures": failures, @@ -244,7 +244,7 @@ class E2eKeysHandler(object): yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) for destination in remote_queries - ])) + ], consumeErrors=True)) logger.info( "Claimed one-time-keys: %s", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 134e89b371..2f467d1f9c 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -142,7 +142,9 @@ class PusherPool: ) ) - yield make_deferred_yieldable(defer.gatherResults(deferreds)) + yield make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True), + ) except Exception: logger.exception("Exception in pusher on_new_notifications") @@ -167,7 +169,9 @@ class PusherPool: preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) ) - yield make_deferred_yieldable(defer.gatherResults(deferreds)) + yield make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True), + ) except Exception: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..3b8b539993 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -202,7 +202,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ])) + ], consumeErrors=True)) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) -- cgit 1.4.1 From 6493b22b42685a6cc06c1113196d305e4d52deed Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:40:06 +0100 Subject: reraise exceptions more carefully We need to be careful (under python 2, at least) that when we reraise an exception after doing some error handling, we actually reraise the original exception rather than anything that might have been raised (and handled) during the error handling. --- synapse/handlers/federation.py | 16 ++++++++++------ synapse/handlers/message.py | 21 ++++++++++++++------- 2 files changed, 24 insertions(+), 13 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae7e0d6da2..260df025f9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -19,9 +19,11 @@ import httplib import itertools import logging +import sys from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json +import six from twisted.internet import defer from unpaddedbase64 import decode_base64 @@ -1513,12 +1515,14 @@ class FederationHandler(BaseHandler): backfilled=backfilled, ) except: # noqa: E722, as we reraise the exception this is fine. - # Ensure that we actually remove the entries in the push actions - # staging area - logcontext.preserve_fn( - self.store.remove_push_actions_from_staging - )(event.event_id) - raise + tp, value, tb = sys.exc_info() + + logcontext.run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) + + six.reraise(tp, value, tb) if not backfilled: # this intentionally does not yield: we don't care about the result diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..8e2e44bdcd 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -13,6 +13,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import simplejson +import sys + +from canonicaljson import encode_canonical_json +import six from twisted.internet import defer, reactor from twisted.python.failure import Failure @@ -34,11 +40,6 @@ from synapse.replication.http.send_event import send_event_to_master from ._base import BaseHandler -from canonicaljson import encode_canonical_json - -import logging -import simplejson - logger = logging.getLogger(__name__) @@ -729,8 +730,14 @@ class EventCreationHandler(object): except: # noqa: E722, as we reraise the exception this is fine. # Ensure that we actually remove the entries in the push actions # staging area, if we calculated them. - preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) - raise + tp, value, tb = sys.exc_info() + + run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) + + six.reraise(tp, value, tb) @defer.inlineCallbacks def persist_and_notify_client_event( -- cgit 1.4.1