summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/appservice.py19
-rw-r--r--synapse/app/client_reader.py4
-rw-r--r--synapse/app/event_creator.py4
-rw-r--r--synapse/app/federation_reader.py4
-rw-r--r--synapse/app/federation_sender.py35
-rw-r--r--synapse/app/frontend_proxy.py6
-rwxr-xr-xsynapse/app/homeserver.py44
-rw-r--r--synapse/app/media_repository.py4
-rw-r--r--synapse/app/pusher.py39
-rw-r--r--synapse/app/synchrotron.py108
-rwxr-xr-xsynapse/app/synctl.py5
-rw-r--r--synapse/app/user_dir.py17
-rw-r--r--synapse/appservice/__init__.py4
-rw-r--r--synapse/appservice/api.py11
-rw-r--r--synapse/appservice/scheduler.py37
15 files changed, 202 insertions, 139 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index c6fe4516d1..58f2c9d68c 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -32,11 +32,11 @@ from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.internet import reactor, defer
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.appservice")
 
@@ -64,7 +64,7 @@ class AppserviceServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler):
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
-            preserve_fn(
-                self.appservice_handler.notify_interested_services
-            )(max_stream_id)
+            run_in_background(self._notify_app_services, max_stream_id)
+
+    @defer.inlineCallbacks
+    def _notify_app_services(self, room_stream_id):
+        try:
+            yield self.appservice_handler.notify_interested_services(room_stream_id)
+        except Exception:
+            logger.exception("Error notifying application services of event")
 
 
 def start(config_options):
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 0a8ce9bc66..267d34c881 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -44,7 +44,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.client_reader")
 
@@ -88,7 +88,7 @@ class ClientReaderServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 172e989b54..b915d12d53 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -52,7 +52,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.event_creator")
 
@@ -104,7 +104,7 @@ class EventCreatorServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 20d157911b..c1dc66dd17 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -41,7 +41,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.federation_reader")
 
@@ -77,7 +77,7 @@ class FederationReaderServer(HomeServer):
                         FEDERATION_PREFIX: TransportLayerServer(self),
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index f760826d27..a08af83a4c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,11 +38,11 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.federation_sender")
 
@@ -91,7 +91,7 @@ class FederationSenderServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -229,7 +229,7 @@ class FederationSenderHandler(object):
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            preserve_fn(self.update_token)(token)
+            run_in_background(self.update_token, token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
@@ -237,19 +237,22 @@ class FederationSenderHandler(object):
 
     @defer.inlineCallbacks
     def update_token(self, token):
-        self.federation_position = token
-
-        # We linearize here to ensure we don't have races updating the token
-        with (yield self._fed_position_linearizer.queue(None)):
-            if self._last_ack < self.federation_position:
-                yield self.store.update_federation_out_pos(
-                    "federation", self.federation_position
-                )
+        try:
+            self.federation_position = token
+
+            # We linearize here to ensure we don't have races updating the token
+            with (yield self._fed_position_linearizer.queue(None)):
+                if self._last_ack < self.federation_position:
+                    yield self.store.update_federation_out_pos(
+                        "federation", self.federation_position
+                    )
 
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self.replication_client.send_federation_ack(self.federation_position)
-                self._last_ack = self.federation_position
+                    # We ACK this token over replication so that the master can drop
+                    # its in memory queues
+                    self.replication_client.send_federation_ack(self.federation_position)
+                    self._last_ack = self.federation_position
+        except Exception:
+            logger.exception("Error updating federation stream position")
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 816c080d18..b349e3e3ce 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -44,7 +44,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
@@ -90,7 +90,7 @@ class KeyUploadServlet(RestServlet):
             # They're actually trying to upload something, proxy to main synapse.
             # Pass through the auth headers, if any, in case the access token
             # is there.
-            auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+            auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
             headers = {
                 "Authorization": auth_headers,
             }
@@ -142,7 +142,7 @@ class FrontendProxyServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e477c7ced6..a0e465d644 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -48,6 +48,7 @@ from synapse.server import HomeServer
 from synapse.storage import are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
 from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
@@ -56,7 +57,7 @@ from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 from twisted.application import service
 from twisted.internet import defer, reactor
-from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.resource import EncodingResourceWrapper, NoResource
 from twisted.web.server import GzipEncoderFactory
 from twisted.web.static import File
 
@@ -126,7 +127,7 @@ class SynapseHomeServer(HomeServer):
         if WEB_CLIENT_PREFIX in resources:
             root_resource = RootRedirect(WEB_CLIENT_PREFIX)
         else:
-            root_resource = Resource()
+            root_resource = NoResource()
 
         root_resource = create_resource_tree(resources, root_resource)
 
@@ -402,6 +403,10 @@ def run(hs):
 
     stats = {}
 
+    # Contains the list of processes we will be monitoring
+    # currently either 0 or 1
+    stats_process = []
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -425,8 +430,21 @@ def run(hs):
         stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
+        r30_results = yield hs.get_datastore().count_r30_users()
+        for name, count in r30_results.iteritems():
+            stats["r30_users_" + name] = count
+
         daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
         stats["daily_sent_messages"] = daily_sent_messages
+        stats["cache_factor"] = CACHE_SIZE_FACTOR
+        stats["event_cache_size"] = hs.config.event_cache_size
+
+        if len(stats_process) > 0:
+            stats["memory_rss"] = 0
+            stats["cpu_average"] = 0
+            for process in stats_process:
+                stats["memory_rss"] += process.memory_info().rss
+                stats["cpu_average"] += int(process.cpu_percent(interval=None))
 
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
@@ -437,10 +455,32 @@ def run(hs):
         except Exception as e:
             logger.warn("Error reporting stats: %s", e)
 
+    def performance_stats_init():
+        try:
+            import psutil
+            process = psutil.Process()
+            # Ensure we can fetch both, and make the initial request for cpu_percent
+            # so the next request will use this as the initial point.
+            process.memory_info().rss
+            process.cpu_percent(interval=None)
+            logger.info("report_stats can use psutil")
+            stats_process.append(process)
+        except (ImportError, AttributeError):
+            logger.warn(
+                "report_stats enabled but psutil is not installed or incorrect version."
+                " Disabling reporting of memory/cpu stats."
+                " Ensuring psutil is available will help matrix.org track performance"
+                " changes across releases."
+            )
+
     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
         clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
 
+        # We need to defer this init for the cases that we daemonize
+        # otherwise the process ID we get is that of the non-daemon process
+        clock.call_later(0, performance_stats_init)
+
         # We wait 5 minutes to send the first set of stats as the server can
         # be quite busy the first few minutes
         clock.call_later(5 * 60, phone_stats_home)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 84c5791b3b..fc8282bbc1 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -43,7 +43,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.media_repository")
 
@@ -84,7 +84,7 @@ class MediaRepositoryServer(HomeServer):
                         ),
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 98a4a7c62c..26930d1b3b 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -33,11 +33,11 @@ from synapse.server import HomeServer
 from synapse.storage import DataStore
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.pusher")
 
@@ -94,7 +94,7 @@ class PusherServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -140,24 +140,27 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
-        preserve_fn(self.poke_pushers)(stream_name, token, rows)
+        run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
     def poke_pushers(self, stream_name, token, rows):
-        if stream_name == "pushers":
-            for row in rows:
-                if row.deleted:
-                    yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
-                else:
-                    yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
-        elif stream_name == "events":
-            yield self.pusher_pool.on_new_notifications(
-                token, token,
-            )
-        elif stream_name == "receipts":
-            yield self.pusher_pool.on_new_receipts(
-                token, token, set(row.room_id for row in rows)
-            )
+        try:
+            if stream_name == "pushers":
+                for row in rows:
+                    if row.deleted:
+                        yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                    else:
+                        yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
+            elif stream_name == "events":
+                yield self.pusher_pool.on_new_notifications(
+                    token, token,
+                )
+            elif stream_name == "receipts":
+                yield self.pusher_pool.on_new_receipts(
+                    token, token, set(row.room_id for row in rows)
+                )
+        except Exception:
+            logger.exception("Error poking pushers")
 
     def stop_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index abe91dcfbd..7152b1deb4 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -51,12 +51,14 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
+
+from six import iteritems
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
@@ -211,7 +213,7 @@ class SynchrotronPresence(object):
 
     def get_currently_syncing_users(self):
         return [
-            user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+            user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
             if count > 0
         ]
 
@@ -269,7 +271,7 @@ class SynchrotronServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -325,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
-
-        preserve_fn(self.process_and_notify)(stream_name, token, rows)
+        run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
         args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -338,55 +339,58 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     @defer.inlineCallbacks
     def process_and_notify(self, stream_name, token, rows):
-        if stream_name == "events":
-            # We shouldn't get multiple rows per token for events stream, so
-            # we don't need to optimise this for multiple rows.
-            for row in rows:
-                event = yield self.store.get_event(row.event_id)
-                extra_users = ()
-                if event.type == EventTypes.Member:
-                    extra_users = (event.state_key,)
-                max_token = self.store.get_room_max_stream_ordering()
-                self.notifier.on_new_room_event(
-                    event, token, max_token, extra_users
+        try:
+            if stream_name == "events":
+                # We shouldn't get multiple rows per token for events stream, so
+                # we don't need to optimise this for multiple rows.
+                for row in rows:
+                    event = yield self.store.get_event(row.event_id)
+                    extra_users = ()
+                    if event.type == EventTypes.Member:
+                        extra_users = (event.state_key,)
+                    max_token = self.store.get_room_max_stream_ordering()
+                    self.notifier.on_new_room_event(
+                        event, token, max_token, extra_users
+                    )
+            elif stream_name == "push_rules":
+                self.notifier.on_new_event(
+                    "push_rules_key", token, users=[row.user_id for row in rows],
                 )
-        elif stream_name == "push_rules":
-            self.notifier.on_new_event(
-                "push_rules_key", token, users=[row.user_id for row in rows],
-            )
-        elif stream_name in ("account_data", "tag_account_data",):
-            self.notifier.on_new_event(
-                "account_data_key", token, users=[row.user_id for row in rows],
-            )
-        elif stream_name == "receipts":
-            self.notifier.on_new_event(
-                "receipt_key", token, rooms=[row.room_id for row in rows],
-            )
-        elif stream_name == "typing":
-            self.typing_handler.process_replication_rows(token, rows)
-            self.notifier.on_new_event(
-                "typing_key", token, rooms=[row.room_id for row in rows],
-            )
-        elif stream_name == "to_device":
-            entities = [row.entity for row in rows if row.entity.startswith("@")]
-            if entities:
+            elif stream_name in ("account_data", "tag_account_data",):
                 self.notifier.on_new_event(
-                    "to_device_key", token, users=entities,
+                    "account_data_key", token, users=[row.user_id for row in rows],
                 )
-        elif stream_name == "device_lists":
-            all_room_ids = set()
-            for row in rows:
-                room_ids = yield self.store.get_rooms_for_user(row.user_id)
-                all_room_ids.update(room_ids)
-            self.notifier.on_new_event(
-                "device_list_key", token, rooms=all_room_ids,
-            )
-        elif stream_name == "presence":
-            yield self.presence_handler.process_replication_rows(token, rows)
-        elif stream_name == "receipts":
-            self.notifier.on_new_event(
-                "groups_key", token, users=[row.user_id for row in rows],
-            )
+            elif stream_name == "receipts":
+                self.notifier.on_new_event(
+                    "receipt_key", token, rooms=[row.room_id for row in rows],
+                )
+            elif stream_name == "typing":
+                self.typing_handler.process_replication_rows(token, rows)
+                self.notifier.on_new_event(
+                    "typing_key", token, rooms=[row.room_id for row in rows],
+                )
+            elif stream_name == "to_device":
+                entities = [row.entity for row in rows if row.entity.startswith("@")]
+                if entities:
+                    self.notifier.on_new_event(
+                        "to_device_key", token, users=entities,
+                    )
+            elif stream_name == "device_lists":
+                all_room_ids = set()
+                for row in rows:
+                    room_ids = yield self.store.get_rooms_for_user(row.user_id)
+                    all_room_ids.update(room_ids)
+                self.notifier.on_new_event(
+                    "device_list_key", token, rooms=all_room_ids,
+                )
+            elif stream_name == "presence":
+                yield self.presence_handler.process_replication_rows(token, rows)
+            elif stream_name == "receipts":
+                self.notifier.on_new_event(
+                    "groups_key", token, users=[row.user_id for row in rows],
+                )
+        except Exception:
+            logger.exception("Error processing replication")
 
 
 def start(config_options):
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 0f0ddfa78a..712dfa870e 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -38,7 +38,7 @@ def pid_running(pid):
     try:
         os.kill(pid, 0)
         return True
-    except OSError, err:
+    except OSError as err:
         if err.errno == errno.EPERM:
             return True
         return False
@@ -98,7 +98,7 @@ def stop(pidfile, app):
         try:
             os.kill(pid, signal.SIGTERM)
             write("stopped %s" % (app,), colour=GREEN)
-        except OSError, err:
+        except OSError as err:
             if err.errno == errno.ESRCH:
                 write("%s not running" % (app,), colour=YELLOW)
             elif err.errno == errno.EPERM:
@@ -252,6 +252,7 @@ def main():
             for running_pid in running_pids:
                 while pid_running(running_pid):
                     time.sleep(0.2)
+            write("All processes exited; now restarting...")
 
     if action == "start" or action == "restart":
         if start_stop_synapse:
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 494ccb702c..5ba7e9b416 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -39,11 +39,11 @@ from synapse.storage.engines import create_engine
 from synapse.storage.user_directory import UserDirectoryStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.internet import reactor, defer
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.user_dir")
 
@@ -116,7 +116,7 @@ class UserDirectoryServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
             stream_name, token, rows
         )
         if stream_name == "current_state_deltas":
-            preserve_fn(self.user_directory.notify_new_event)()
+            run_in_background(self._notify_directory)
+
+    @defer.inlineCallbacks
+    def _notify_directory(self):
+        try:
+            yield self.user_directory.notify_new_event()
+        except Exception:
+            logger.exception("Error notifiying user directory of state update")
 
 
 def start(config_options):
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index d5a7a5ce2f..5fdb579723 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -21,6 +21,8 @@ from twisted.internet import defer
 import logging
 import re
 
+from six import string_types
+
 logger = logging.getLogger(__name__)
 
 
@@ -146,7 +148,7 @@ class ApplicationService(object):
                         )
 
                 regex = regex_obj.get("regex")
-                if isinstance(regex, basestring):
+                if isinstance(regex, string_types):
                     regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
                 else:
                     raise ValueError(
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 40c433d7ae..00efff1464 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
 from synapse.http.client import SimpleHttpClient
 from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -73,7 +72,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         super(ApplicationServiceApi, self).__init__(hs)
         self.clock = hs.get_clock()
 
-        self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
+        self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
+                                                 timeout_ms=HOUR_IN_MS)
 
     @defer.inlineCallbacks
     def query_user(self, service, user_id):
@@ -193,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 defer.returnValue(None)
 
         key = (service.id, protocol)
-        result = self.protocol_meta_cache.get(key)
-        if not result:
-            result = self.protocol_meta_cache.set(
-                key, preserve_fn(_get)()
-            )
-        return make_deferred_yieldable(result)
+        return self.protocol_meta_cache.wrap(key, _get)
 
     @defer.inlineCallbacks
     def push_bulk(self, service, events, txn_id=None):
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 6da315473d..6eddbc0828 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -51,7 +51,7 @@ components.
 from twisted.internet import defer
 
 from synapse.appservice import ApplicationServiceState
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 
 import logging
@@ -106,7 +106,7 @@ class _ServiceQueuer(object):
     def enqueue(self, service, event):
         # if this service isn't being sent something
         self.queued_events.setdefault(service.id, []).append(event)
-        preserve_fn(self._send_request)(service)
+        run_in_background(self._send_request, service)
 
     @defer.inlineCallbacks
     def _send_request(self, service):
@@ -152,10 +152,10 @@ class _TransactionController(object):
                 if sent:
                     yield txn.complete(self.store)
                 else:
-                    preserve_fn(self._start_recoverer)(service)
-        except Exception as e:
-            logger.exception(e)
-            preserve_fn(self._start_recoverer)(service)
+                    run_in_background(self._start_recoverer, service)
+        except Exception:
+            logger.exception("Error creating appservice transaction")
+            run_in_background(self._start_recoverer, service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
@@ -176,17 +176,20 @@ class _TransactionController(object):
 
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
-        yield self.store.set_appservice_state(
-            service,
-            ApplicationServiceState.DOWN
-        )
-        logger.info(
-            "Application service falling behind. Starting recoverer. AS ID %s",
-            service.id
-        )
-        recoverer = self.recoverer_fn(service, self.on_recovered)
-        self.add_recoverers([recoverer])
-        recoverer.recover()
+        try:
+            yield self.store.set_appservice_state(
+                service,
+                ApplicationServiceState.DOWN
+            )
+            logger.info(
+                "Application service falling behind. Starting recoverer. AS ID %s",
+                service.id
+            )
+            recoverer = self.recoverer_fn(service, self.on_recovered)
+            self.add_recoverers([recoverer])
+            recoverer.recover()
+        except Exception:
+            logger.exception("Error starting AS recoverer")
 
     @defer.inlineCallbacks
     def _is_service_up(self, service):