summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/errors.py3
-rw-r--r--synapse/app/appservice.py15
-rw-r--r--synapse/app/federation_sender.py31
-rw-r--r--synapse/app/pusher.py35
-rw-r--r--synapse/app/synchrotron.py104
-rw-r--r--synapse/app/user_dir.py13
-rw-r--r--synapse/appservice/api.py8
-rw-r--r--synapse/appservice/scheduler.py37
-rw-r--r--synapse/config/_base.py6
-rw-r--r--synapse/config/appservice.py4
-rw-r--r--synapse/config/logger.py2
-rw-r--r--synapse/config/tls.py4
-rw-r--r--synapse/crypto/keyring.py129
-rw-r--r--synapse/federation/federation_client.py11
-rw-r--r--synapse/federation/federation_server.py55
-rw-r--r--synapse/federation/send_queue.py14
-rw-r--r--synapse/federation/transaction_queue.py2
-rw-r--r--synapse/federation/transport/server.py13
-rw-r--r--synapse/groups/attestations.py48
-rw-r--r--synapse/handlers/appservice.py19
-rw-r--r--synapse/handlers/e2e_keys.py10
-rw-r--r--synapse/handlers/federation.py141
-rw-r--r--synapse/handlers/initial_sync.py12
-rw-r--r--synapse/handlers/message.py128
-rw-r--r--synapse/handlers/presence.py19
-rw-r--r--synapse/handlers/receipts.py61
-rw-r--r--synapse/handlers/room_list.py42
-rw-r--r--synapse/handlers/room_member.py13
-rw-r--r--synapse/handlers/sync.py23
-rw-r--r--synapse/handlers/typing.py50
-rw-r--r--synapse/http/__init__.py22
-rw-r--r--synapse/http/client.py24
-rw-r--r--synapse/http/endpoint.py2
-rw-r--r--synapse/http/matrixfederationclient.py41
-rw-r--r--synapse/http/server.py2
-rw-r--r--synapse/notifier.py43
-rw-r--r--synapse/push/emailpusher.py11
-rw-r--r--synapse/push/httppusher.py9
-rw-r--r--synapse/push/pusher.py2
-rw-r--r--synapse/push/pusherpool.py28
-rw-r--r--synapse/python_dependencies.py20
-rw-r--r--synapse/replication/http/send_event.py18
-rw-r--r--synapse/replication/tcp/protocol.py4
-rw-r--r--synapse/replication/tcp/resource.py4
-rw-r--r--synapse/rest/client/v1/login.py2
-rw-r--r--synapse/rest/client/v1/register.py16
-rw-r--r--synapse/rest/client/v1/room.py9
-rw-r--r--synapse/rest/client/v2_alpha/register.py10
-rw-r--r--synapse/rest/media/v1/_base.py2
-rw-r--r--synapse/rest/media/v1/media_repository.py2
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py5
-rw-r--r--synapse/rest/media/v1/storage_provider.py9
-rw-r--r--synapse/rest/media/v1/upload_resource.py6
-rw-r--r--synapse/storage/event_federation.py63
-rw-r--r--synapse/storage/event_push_actions.py24
-rw-r--r--synapse/storage/events.py49
-rw-r--r--synapse/storage/events_worker.py5
-rw-r--r--synapse/storage/registration.py4
-rw-r--r--synapse/storage/room.py6
-rw-r--r--synapse/storage/schema/delta/30/as_users.py4
-rw-r--r--synapse/storage/search.py2
-rw-r--r--synapse/storage/stream.py11
-rw-r--r--synapse/storage/tags.py4
-rw-r--r--synapse/util/__init__.py56
-rw-r--r--synapse/util/async.py77
-rw-r--r--synapse/util/caches/response_cache.py88
-rw-r--r--synapse/util/file_consumer.py10
-rw-r--r--synapse/util/httpresourcetree.py7
-rw-r--r--synapse/util/logcontext.py10
-rw-r--r--synapse/util/logformatter.py2
-rw-r--r--synapse/util/ratelimitutils.py4
-rw-r--r--synapse/util/retryutils.py4
-rw-r--r--synapse/util/stringutils.py5
-rw-r--r--synapse/util/wheel_timer.py4
75 files changed, 1105 insertions, 679 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 0c1c16b9a4..4924f44d4e 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.27.4"
+__version__ = "0.28.0"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index bee59e80dd..a9ff5576f3 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -18,6 +18,7 @@
 import logging
 
 import simplejson as json
+from six import iteritems
 
 logger = logging.getLogger(__name__)
 
@@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
         A dict representing the error response JSON.
     """
     err = {"error": msg, "errcode": code}
-    for key, value in kwargs.iteritems():
+    for key, value in iteritems(kwargs):
         err[key] = value
     return err
 
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index f2540023a7..58f2c9d68c 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.appservice")
@@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler):
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
-            preserve_fn(
-                self.appservice_handler.notify_interested_services
-            )(max_stream_id)
+            run_in_background(self._notify_app_services, max_stream_id)
+
+    @defer.inlineCallbacks
+    def _notify_app_services(self, room_stream_id):
+        try:
+            yield self.appservice_handler.notify_interested_services(room_stream_id)
+        except Exception:
+            logger.exception("Error notifying application services of event")
 
 
 def start(config_options):
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 0cc3331519..a08af83a4c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
@@ -229,7 +229,7 @@ class FederationSenderHandler(object):
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            preserve_fn(self.update_token)(token)
+            run_in_background(self.update_token, token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
@@ -237,19 +237,22 @@ class FederationSenderHandler(object):
 
     @defer.inlineCallbacks
     def update_token(self, token):
-        self.federation_position = token
-
-        # We linearize here to ensure we don't have races updating the token
-        with (yield self._fed_position_linearizer.queue(None)):
-            if self._last_ack < self.federation_position:
-                yield self.store.update_federation_out_pos(
-                    "federation", self.federation_position
-                )
+        try:
+            self.federation_position = token
+
+            # We linearize here to ensure we don't have races updating the token
+            with (yield self._fed_position_linearizer.queue(None)):
+                if self._last_ack < self.federation_position:
+                    yield self.store.update_federation_out_pos(
+                        "federation", self.federation_position
+                    )
 
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self.replication_client.send_federation_ack(self.federation_position)
-                self._last_ack = self.federation_position
+                    # We ACK this token over replication so that the master can drop
+                    # its in memory queues
+                    self.replication_client.send_federation_ack(self.federation_position)
+                    self._last_ack = self.federation_position
+        except Exception:
+            logger.exception("Error updating federation stream position")
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index d5c3a85195..26930d1b3b 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -33,7 +33,7 @@ from synapse.server import HomeServer
 from synapse.storage import DataStore
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
@@ -140,24 +140,27 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
-        preserve_fn(self.poke_pushers)(stream_name, token, rows)
+        run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
     def poke_pushers(self, stream_name, token, rows):
-        if stream_name == "pushers":
-            for row in rows:
-                if row.deleted:
-                    yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
-                else:
-                    yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
-        elif stream_name == "events":
-            yield self.pusher_pool.on_new_notifications(
-                token, token,
-            )
-        elif stream_name == "receipts":
-            yield self.pusher_pool.on_new_receipts(
-                token, token, set(row.room_id for row in rows)
-            )
+        try:
+            if stream_name == "pushers":
+                for row in rows:
+                    if row.deleted:
+                        yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                    else:
+                        yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
+            elif stream_name == "events":
+                yield self.pusher_pool.on_new_notifications(
+                    token, token,
+                )
+            elif stream_name == "receipts":
+                yield self.pusher_pool.on_new_receipts(
+                    token, token, set(row.room_id for row in rows)
+                )
+        except Exception:
+            logger.exception("Error poking pushers")
 
     def stop_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 508b66613d..7152b1deb4 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -51,13 +51,15 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
 from twisted.web.resource import NoResource
 
+from six import iteritems
+
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
@@ -211,7 +213,7 @@ class SynchrotronPresence(object):
 
     def get_currently_syncing_users(self):
         return [
-            user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+            user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
             if count > 0
         ]
 
@@ -325,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
-
-        preserve_fn(self.process_and_notify)(stream_name, token, rows)
+        run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
         args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -338,55 +339,58 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     @defer.inlineCallbacks
     def process_and_notify(self, stream_name, token, rows):
-        if stream_name == "events":
-            # We shouldn't get multiple rows per token for events stream, so
-            # we don't need to optimise this for multiple rows.
-            for row in rows:
-                event = yield self.store.get_event(row.event_id)
-                extra_users = ()
-                if event.type == EventTypes.Member:
-                    extra_users = (event.state_key,)
-                max_token = self.store.get_room_max_stream_ordering()
-                self.notifier.on_new_room_event(
-                    event, token, max_token, extra_users
+        try:
+            if stream_name == "events":
+                # We shouldn't get multiple rows per token for events stream, so
+                # we don't need to optimise this for multiple rows.
+                for row in rows:
+                    event = yield self.store.get_event(row.event_id)
+                    extra_users = ()
+                    if event.type == EventTypes.Member:
+                        extra_users = (event.state_key,)
+                    max_token = self.store.get_room_max_stream_ordering()
+                    self.notifier.on_new_room_event(
+                        event, token, max_token, extra_users
+                    )
+            elif stream_name == "push_rules":
+                self.notifier.on_new_event(
+                    "push_rules_key", token, users=[row.user_id for row in rows],
                 )
-        elif stream_name == "push_rules":
-            self.notifier.on_new_event(
-                "push_rules_key", token, users=[row.user_id for row in rows],
-            )
-        elif stream_name in ("account_data", "tag_account_data",):
-            self.notifier.on_new_event(
-                "account_data_key", token, users=[row.user_id for row in rows],
-            )
-        elif stream_name == "receipts":
-            self.notifier.on_new_event(
-                "receipt_key", token, rooms=[row.room_id for row in rows],
-            )
-        elif stream_name == "typing":
-            self.typing_handler.process_replication_rows(token, rows)
-            self.notifier.on_new_event(
-                "typing_key", token, rooms=[row.room_id for row in rows],
-            )
-        elif stream_name == "to_device":
-            entities = [row.entity for row in rows if row.entity.startswith("@")]
-            if entities:
+            elif stream_name in ("account_data", "tag_account_data",):
                 self.notifier.on_new_event(
-                    "to_device_key", token, users=entities,
+                    "account_data_key", token, users=[row.user_id for row in rows],
                 )
-        elif stream_name == "device_lists":
-            all_room_ids = set()
-            for row in rows:
-                room_ids = yield self.store.get_rooms_for_user(row.user_id)
-                all_room_ids.update(room_ids)
-            self.notifier.on_new_event(
-                "device_list_key", token, rooms=all_room_ids,
-            )
-        elif stream_name == "presence":
-            yield self.presence_handler.process_replication_rows(token, rows)
-        elif stream_name == "receipts":
-            self.notifier.on_new_event(
-                "groups_key", token, users=[row.user_id for row in rows],
-            )
+            elif stream_name == "receipts":
+                self.notifier.on_new_event(
+                    "receipt_key", token, rooms=[row.room_id for row in rows],
+                )
+            elif stream_name == "typing":
+                self.typing_handler.process_replication_rows(token, rows)
+                self.notifier.on_new_event(
+                    "typing_key", token, rooms=[row.room_id for row in rows],
+                )
+            elif stream_name == "to_device":
+                entities = [row.entity for row in rows if row.entity.startswith("@")]
+                if entities:
+                    self.notifier.on_new_event(
+                        "to_device_key", token, users=entities,
+                    )
+            elif stream_name == "device_lists":
+                all_room_ids = set()
+                for row in rows:
+                    room_ids = yield self.store.get_rooms_for_user(row.user_id)
+                    all_room_ids.update(room_ids)
+                self.notifier.on_new_event(
+                    "device_list_key", token, rooms=all_room_ids,
+                )
+            elif stream_name == "presence":
+                yield self.presence_handler.process_replication_rows(token, rows)
+            elif stream_name == "receipts":
+                self.notifier.on_new_event(
+                    "groups_key", token, users=[row.user_id for row in rows],
+                )
+        except Exception:
+            logger.exception("Error processing replication")
 
 
 def start(config_options):
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 5f845e80d1..5ba7e9b416 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine
 from synapse.storage.user_directory import UserDirectoryStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.user_dir")
@@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
             stream_name, token, rows
         )
         if stream_name == "current_state_deltas":
-            preserve_fn(self.user_directory.notify_new_event)()
+            run_in_background(self._notify_directory)
+
+    @defer.inlineCallbacks
+    def _notify_directory(self):
+        try:
+            yield self.user_directory.notify_new_event()
+        except Exception:
+            logger.exception("Error notifiying user directory of state update")
 
 
 def start(config_options):
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 11e9c37c63..00efff1464 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
 from synapse.http.client import SimpleHttpClient
 from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -194,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 defer.returnValue(None)
 
         key = (service.id, protocol)
-        result = self.protocol_meta_cache.get(key)
-        if not result:
-            result = self.protocol_meta_cache.set(
-                key, preserve_fn(_get)()
-            )
-        return make_deferred_yieldable(result)
+        return self.protocol_meta_cache.wrap(key, _get)
 
     @defer.inlineCallbacks
     def push_bulk(self, service, events, txn_id=None):
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 6da315473d..6eddbc0828 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -51,7 +51,7 @@ components.
 from twisted.internet import defer
 
 from synapse.appservice import ApplicationServiceState
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 
 import logging
@@ -106,7 +106,7 @@ class _ServiceQueuer(object):
     def enqueue(self, service, event):
         # if this service isn't being sent something
         self.queued_events.setdefault(service.id, []).append(event)
-        preserve_fn(self._send_request)(service)
+        run_in_background(self._send_request, service)
 
     @defer.inlineCallbacks
     def _send_request(self, service):
@@ -152,10 +152,10 @@ class _TransactionController(object):
                 if sent:
                     yield txn.complete(self.store)
                 else:
-                    preserve_fn(self._start_recoverer)(service)
-        except Exception as e:
-            logger.exception(e)
-            preserve_fn(self._start_recoverer)(service)
+                    run_in_background(self._start_recoverer, service)
+        except Exception:
+            logger.exception("Error creating appservice transaction")
+            run_in_background(self._start_recoverer, service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
@@ -176,17 +176,20 @@ class _TransactionController(object):
 
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
-        yield self.store.set_appservice_state(
-            service,
-            ApplicationServiceState.DOWN
-        )
-        logger.info(
-            "Application service falling behind. Starting recoverer. AS ID %s",
-            service.id
-        )
-        recoverer = self.recoverer_fn(service, self.on_recovered)
-        self.add_recoverers([recoverer])
-        recoverer.recover()
+        try:
+            yield self.store.set_appservice_state(
+                service,
+                ApplicationServiceState.DOWN
+            )
+            logger.info(
+                "Application service falling behind. Starting recoverer. AS ID %s",
+                service.id
+            )
+            recoverer = self.recoverer_fn(service, self.on_recovered)
+            self.add_recoverers([recoverer])
+            recoverer.recover()
+        except Exception:
+            logger.exception("Error starting AS recoverer")
 
     @defer.inlineCallbacks
     def _is_service_up(self, service):
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 32b439d20a..b748ed2b0a 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -281,15 +281,15 @@ class Config(object):
                     )
                 if not cls.path_exists(config_dir_path):
                     os.makedirs(config_dir_path)
-                with open(config_path, "wb") as config_file:
-                    config_bytes, config = obj.generate_config(
+                with open(config_path, "w") as config_file:
+                    config_str, config = obj.generate_config(
                         config_dir_path=config_dir_path,
                         server_name=server_name,
                         report_stats=(config_args.report_stats == "yes"),
                         is_generating_file=True
                     )
                     obj.invoke_all("generate_files", config)
-                    config_file.write(config_bytes)
+                    config_file.write(config_str)
                 print((
                     "A config file has been generated in %r for server name"
                     " %r with corresponding SSL keys and self-signed"
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index 9a2359b6fd..277305e184 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -17,11 +17,11 @@ from ._base import Config, ConfigError
 from synapse.appservice import ApplicationService
 from synapse.types import UserID
 
-import urllib
 import yaml
 import logging
 
 from six import string_types
+from six.moves.urllib import parse as urlparse
 
 logger = logging.getLogger(__name__)
 
@@ -105,7 +105,7 @@ def _load_appservice(hostname, as_info, config_filename):
         )
 
     localpart = as_info["sender_localpart"]
-    if urllib.quote(localpart) != localpart:
+    if urlparse.quote(localpart) != localpart:
         raise ValueError(
             "sender_localpart needs characters which are not URL encoded."
         )
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 3f70039acd..6a7228dc2f 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -117,7 +117,7 @@ class LoggingConfig(Config):
         log_config = config.get("log_config")
         if log_config and not os.path.exists(log_config):
             log_file = self.abspath("homeserver.log")
-            with open(log_config, "wb") as log_config_file:
+            with open(log_config, "w") as log_config_file:
                 log_config_file.write(
                     DEFAULT_LOG_CONFIG.substitute(log_file=log_file)
                 )
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 29eb012ddb..b66154bc7c 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -133,7 +133,7 @@ class TlsConfig(Config):
         tls_dh_params_path = config["tls_dh_params_path"]
 
         if not self.path_exists(tls_private_key_path):
-            with open(tls_private_key_path, "w") as private_key_file:
+            with open(tls_private_key_path, "wb") as private_key_file:
                 tls_private_key = crypto.PKey()
                 tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
                 private_key_pem = crypto.dump_privatekey(
@@ -148,7 +148,7 @@ class TlsConfig(Config):
                 )
 
         if not self.path_exists(tls_certificate_path):
-            with open(tls_certificate_path, "w") as certificate_file:
+            with open(tls_certificate_path, "wb") as certificate_file:
                 cert = crypto.X509()
                 subject = cert.get_subject()
                 subject.CN = config["server_name"]
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 35f810b07b..22ee0fc93f 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes
 from synapse.util import unwrapFirstError, logcontext
 from synapse.util.logcontext import (
     PreserveLoggingContext,
-    preserve_fn
+    preserve_fn,
+    run_in_background,
 )
 from synapse.util.metrics import Measure
 
@@ -127,7 +128,7 @@ class Keyring(object):
 
             verify_requests.append(verify_request)
 
-        preserve_fn(self._start_key_lookups)(verify_requests)
+        run_in_background(self._start_key_lookups, verify_requests)
 
         # Pass those keys to handle_key_deferred so that the json object
         # signatures can be verified
@@ -146,53 +147,56 @@ class Keyring(object):
             verify_requests (List[VerifyKeyRequest]):
         """
 
-        # create a deferred for each server we're going to look up the keys
-        # for; we'll resolve them once we have completed our lookups.
-        # These will be passed into wait_for_previous_lookups to block
-        # any other lookups until we have finished.
-        # The deferreds are called with no logcontext.
-        server_to_deferred = {
-            rq.server_name: defer.Deferred()
-            for rq in verify_requests
-        }
-
-        # We want to wait for any previous lookups to complete before
-        # proceeding.
-        yield self.wait_for_previous_lookups(
-            [rq.server_name for rq in verify_requests],
-            server_to_deferred,
-        )
-
-        # Actually start fetching keys.
-        self._get_server_verify_keys(verify_requests)
-
-        # When we've finished fetching all the keys for a given server_name,
-        # resolve the deferred passed to `wait_for_previous_lookups` so that
-        # any lookups waiting will proceed.
-        #
-        # map from server name to a set of request ids
-        server_to_request_ids = {}
-
-        for verify_request in verify_requests:
-            server_name = verify_request.server_name
-            request_id = id(verify_request)
-            server_to_request_ids.setdefault(server_name, set()).add(request_id)
-
-        def remove_deferreds(res, verify_request):
-            server_name = verify_request.server_name
-            request_id = id(verify_request)
-            server_to_request_ids[server_name].discard(request_id)
-            if not server_to_request_ids[server_name]:
-                d = server_to_deferred.pop(server_name, None)
-                if d:
-                    d.callback(None)
-            return res
-
-        for verify_request in verify_requests:
-            verify_request.deferred.addBoth(
-                remove_deferreds, verify_request,
+        try:
+            # create a deferred for each server we're going to look up the keys
+            # for; we'll resolve them once we have completed our lookups.
+            # These will be passed into wait_for_previous_lookups to block
+            # any other lookups until we have finished.
+            # The deferreds are called with no logcontext.
+            server_to_deferred = {
+                rq.server_name: defer.Deferred()
+                for rq in verify_requests
+            }
+
+            # We want to wait for any previous lookups to complete before
+            # proceeding.
+            yield self.wait_for_previous_lookups(
+                [rq.server_name for rq in verify_requests],
+                server_to_deferred,
             )
 
+            # Actually start fetching keys.
+            self._get_server_verify_keys(verify_requests)
+
+            # When we've finished fetching all the keys for a given server_name,
+            # resolve the deferred passed to `wait_for_previous_lookups` so that
+            # any lookups waiting will proceed.
+            #
+            # map from server name to a set of request ids
+            server_to_request_ids = {}
+
+            for verify_request in verify_requests:
+                server_name = verify_request.server_name
+                request_id = id(verify_request)
+                server_to_request_ids.setdefault(server_name, set()).add(request_id)
+
+            def remove_deferreds(res, verify_request):
+                server_name = verify_request.server_name
+                request_id = id(verify_request)
+                server_to_request_ids[server_name].discard(request_id)
+                if not server_to_request_ids[server_name]:
+                    d = server_to_deferred.pop(server_name, None)
+                    if d:
+                        d.callback(None)
+                return res
+
+            for verify_request in verify_requests:
+                verify_request.deferred.addBoth(
+                    remove_deferreds, verify_request,
+                )
+        except Exception:
+            logger.exception("Error starting key lookups")
+
     @defer.inlineCallbacks
     def wait_for_previous_lookups(self, server_names, server_to_deferred):
         """Waits for any previous key lookups for the given servers to finish.
@@ -313,7 +317,7 @@ class Keyring(object):
                     if not verify_request.deferred.called:
                         verify_request.deferred.errback(err)
 
-        preserve_fn(do_iterations)().addErrback(on_err)
+        run_in_background(do_iterations).addErrback(on_err)
 
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
@@ -329,8 +333,9 @@ class Keyring(object):
         """
         res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.get_server_verify_keys)(
-                    server_name, key_ids
+                run_in_background(
+                    self.store.get_server_verify_keys,
+                    server_name, key_ids,
                 ).addCallback(lambda ks, server: (server, ks), server_name)
                 for server_name, key_ids in server_name_and_key_ids
             ],
@@ -352,13 +357,13 @@ class Keyring(object):
                 logger.exception(
                     "Unable to get key from %r: %s %s",
                     perspective_name,
-                    type(e).__name__, str(e.message),
+                    type(e).__name__, str(e),
                 )
                 defer.returnValue({})
 
         results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(get_key)(p_name, p_keys)
+                run_in_background(get_key, p_name, p_keys)
                 for p_name, p_keys in self.perspective_servers.items()
             ],
             consumeErrors=True,
@@ -384,7 +389,7 @@ class Keyring(object):
                 logger.info(
                     "Unable to get key %r for %r directly: %s %s",
                     key_ids, server_name,
-                    type(e).__name__, str(e.message),
+                    type(e).__name__, str(e),
                 )
 
             if not keys:
@@ -398,7 +403,7 @@ class Keyring(object):
 
         results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(get_key)(server_name, key_ids)
+                run_in_background(get_key, server_name, key_ids)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
@@ -481,7 +486,8 @@ class Keyring(object):
 
         yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store_keys)(
+                run_in_background(
+                    self.store_keys,
                     server_name=server_name,
                     from_server=perspective_name,
                     verify_keys=response_keys,
@@ -539,7 +545,8 @@ class Keyring(object):
 
         yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store_keys)(
+                run_in_background(
+                    self.store_keys,
                     server_name=key_server_name,
                     from_server=server_name,
                     verify_keys=verify_keys,
@@ -615,7 +622,8 @@ class Keyring(object):
 
         yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.store_server_keys_json)(
+                run_in_background(
+                    self.store.store_server_keys_json,
                     server_name=server_name,
                     key_id=key_id,
                     from_server=server_name,
@@ -716,7 +724,8 @@ class Keyring(object):
         # TODO(markjh): Store whether the keys have expired.
         return logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.store_server_verify_key)(
+                run_in_background(
+                    self.store.store_server_verify_key,
                     server_name, server_name, key.time_added, key
                 )
                 for key_id, key in verify_keys.items()
@@ -734,7 +743,7 @@ def _handle_key_deferred(verify_request):
     except IOError as e:
         logger.warn(
             "Got IOError when downloading keys for %s: %s %s",
-            server_name, type(e).__name__, str(e.message),
+            server_name, type(e).__name__, str(e),
         )
         raise SynapseError(
             502,
@@ -744,7 +753,7 @@ def _handle_key_deferred(verify_request):
     except Exception as e:
         logger.exception(
             "Got Exception when downloading keys for %s: %s %s",
-            server_name, type(e).__name__, str(e.message),
+            server_name, type(e).__name__, str(e),
         )
         raise SynapseError(
             401,
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 38440da5b5..6163f7c466 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,8 @@ import itertools
 import logging
 import random
 
+from six.moves import range
+
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
@@ -33,7 +35,7 @@ from synapse.federation.federation_base import (
 import synapse.metrics
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -394,7 +396,7 @@ class FederationClient(FederationBase):
             seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
             signed_events = seen_events.values()
         else:
-            seen_events = yield self.store.have_events(event_ids)
+            seen_events = yield self.store.have_seen_events(event_ids)
             signed_events = []
 
         failed_to_fetch = set()
@@ -413,11 +415,12 @@ class FederationClient(FederationBase):
 
         batch_size = 20
         missing_events = list(missing_events)
-        for i in xrange(0, len(missing_events), batch_size):
+        for i in range(0, len(missing_events), batch_size):
             batch = set(missing_events[i:i + batch_size])
 
             deferreds = [
-                preserve_fn(self.get_pdu)(
+                run_in_background(
+                    self.get_pdu,
                     destinations=random_server_list(),
                     event_id=e_id,
                 )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e4ce037acf..247ddc89d5 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -30,9 +31,10 @@ import synapse.metrics
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.logutils import log_function
 
+from six import iteritems
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -212,16 +214,17 @@ class FederationServer(FederationBase):
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
-        result = self._state_resp_cache.get((room_id, event_id))
-        if not result:
-            with (yield self._server_linearizer.queue((origin, room_id))):
-                d = self._state_resp_cache.set(
-                    (room_id, event_id),
-                    preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
-                )
-                resp = yield make_deferred_yieldable(d)
-        else:
-            resp = yield make_deferred_yieldable(result)
+        # we grab the linearizer to protect ourselves from servers which hammer
+        # us. In theory we might already have the response to this query
+        # in the cache so we could return it without waiting for the linearizer
+        # - but that's non-trivial to get right, and anyway somewhat defeats
+        # the point of the linearizer.
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            resp = yield self._state_resp_cache.wrap(
+                (room_id, event_id),
+                self._on_context_state_request_compute,
+                room_id, event_id,
+            )
 
         defer.returnValue((200, resp))
 
@@ -425,9 +428,9 @@ class FederationServer(FederationBase):
             "Claimed one-time-keys: %s",
             ",".join((
                 "%s for %s:%s" % (key_id, user_id, device_id)
-                for user_id, user_keys in json_result.iteritems()
-                for device_id, device_keys in user_keys.iteritems()
-                for key_id, _ in device_keys.iteritems()
+                for user_id, user_keys in iteritems(json_result)
+                for device_id, device_keys in iteritems(user_keys)
+                for key_id, _ in iteritems(device_keys)
             )),
         )
 
@@ -494,13 +497,33 @@ class FederationServer(FederationBase):
     def _handle_received_pdu(self, origin, pdu):
         """ Process a PDU received in a federation /send/ transaction.
 
+        If the event is invalid, then this method throws a FederationError.
+        (The error will then be logged and sent back to the sender (which
+        probably won't do anything with it), and other events in the
+        transaction will be processed as normal).
+
+        It is likely that we'll then receive other events which refer to
+        this rejected_event in their prev_events, etc.  When that happens,
+        we'll attempt to fetch the rejected event again, which will presumably
+        fail, so those second-generation events will also get rejected.
+
+        Eventually, we get to the point where there are more than 10 events
+        between any new events and the original rejected event. Since we
+        only try to backfill 10 events deep on received pdu, we then accept the
+        new event, possibly introducing a discontinuity in the DAG, with new
+        forward extremities, so normal service is approximately returned,
+        until we try to backfill across the discontinuity.
+
         Args:
             origin (str): server which sent the pdu
             pdu (FrozenEvent): received pdu
 
         Returns (Deferred): completes with None
-        Raises: FederationError if the signatures / hash do not match
-    """
+
+        Raises: FederationError if the signatures / hash do not match, or
+            if the event was unacceptable for any other reason (eg, too large,
+            too many prev_events, couldn't find the prev_events)
+        """
         # check that it's actually being sent from a valid destination to
         # workaround bug #1753 in 0.18.5 and 0.18.6
         if origin != get_domain_from_id(pdu.event_id):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5acebc1..0f0c687b37 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -40,6 +40,8 @@ from collections import namedtuple
 
 import logging
 
+from six import itervalues, iteritems
+
 logger = logging.getLogger(__name__)
 
 
@@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object):
 
             user_ids = set(
                 user_id
-                for uids in self.presence_changed.itervalues()
+                for uids in itervalues(self.presence_changed)
                 for user_id in uids
             )
 
@@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object):
         # stream position.
         keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
 
-        for ((destination, edu_key), pos) in keyed_edus.iteritems():
+        for ((destination, edu_key), pos) in iteritems(keyed_edus):
             rows.append((pos, KeyedEduRow(
                 key=edu_key,
                 edu=self.keyed_edu[(destination, edu_key)],
@@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object):
         j = keys.bisect_right(to_token) + 1
         device_messages = {self.device_messages[k]: k for k in keys[i:j]}
 
-        for (destination, pos) in device_messages.iteritems():
+        for (destination, pos) in iteritems(device_messages):
             rows.append((pos, DeviceRow(
                 destination=destination,
             )))
@@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows):
     if buff.presence:
         transaction_queue.send_presence(buff.presence)
 
-    for destination, edu_map in buff.keyed_edus.iteritems():
+    for destination, edu_map in iteritems(buff.keyed_edus):
         for key, edu in edu_map.items():
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=key,
             )
 
-    for destination, edu_list in buff.edus.iteritems():
+    for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=None,
             )
 
-    for destination, failure_list in buff.failures.iteritems():
+    for destination, failure_list in iteritems(buff.failures):
         for failure in failure_list:
             transaction_queue.send_failure(destination, failure)
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 963d938edd..ded2b1871a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -323,6 +323,8 @@ class TransactionQueue(object):
                     break
 
                 yield self._process_presence_inner(states_map.values())
+        except Exception:
+            logger.exception("Error sending presence states to servers")
         finally:
             self._processing_pending_presence = False
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index ff0656df3e..19d09f5422 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -25,7 +25,7 @@ from synapse.http.servlet import (
 )
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
 
 import functools
@@ -152,11 +152,18 @@ class Authenticator(object):
         # alive
         retry_timings = yield self.store.get_destination_retry_timings(origin)
         if retry_timings and retry_timings["retry_last_ts"]:
-            logger.info("Marking origin %r as up", origin)
-            preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
+            run_in_background(self._reset_retry_timings, origin)
 
         defer.returnValue(origin)
 
+    @defer.inlineCallbacks
+    def _reset_retry_timings(self, origin):
+        try:
+            logger.info("Marking origin %r as up", origin)
+            yield self.store.set_destination_retry_timings(origin, 0, 0)
+        except Exception:
+            logger.exception("Error resetting retry timings on %s", origin)
+
 
 class BaseFederationServlet(object):
     REQUIRE_AUTH = True
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index 1fb709e6c3..6f11fa374b 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -42,7 +42,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
 from synapse.types import get_domain_from_id
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 
 from signedjson.sign import sign_json
 
@@ -165,31 +165,35 @@ class GroupAttestionRenewer(object):
 
         @defer.inlineCallbacks
         def _renew_attestation(group_id, user_id):
-            if not self.is_mine_id(group_id):
-                destination = get_domain_from_id(group_id)
-            elif not self.is_mine_id(user_id):
-                destination = get_domain_from_id(user_id)
-            else:
-                logger.warn(
-                    "Incorrectly trying to do attestations for user: %r in %r",
-                    user_id, group_id,
+            try:
+                if not self.is_mine_id(group_id):
+                    destination = get_domain_from_id(group_id)
+                elif not self.is_mine_id(user_id):
+                    destination = get_domain_from_id(user_id)
+                else:
+                    logger.warn(
+                        "Incorrectly trying to do attestations for user: %r in %r",
+                        user_id, group_id,
+                    )
+                    yield self.store.remove_attestation_renewal(group_id, user_id)
+                    return
+
+                attestation = self.attestations.create_attestation(group_id, user_id)
+
+                yield self.transport_client.renew_group_attestation(
+                    destination, group_id, user_id,
+                    content={"attestation": attestation},
                 )
-                yield self.store.remove_attestation_renewal(group_id, user_id)
-                return
-
-            attestation = self.attestations.create_attestation(group_id, user_id)
 
-            yield self.transport_client.renew_group_attestation(
-                destination, group_id, user_id,
-                content={"attestation": attestation},
-            )
-
-            yield self.store.update_attestation_renewal(
-                group_id, user_id, attestation
-            )
+                yield self.store.update_attestation_renewal(
+                    group_id, user_id, attestation
+                )
+            except Exception:
+                logger.exception("Error renewing attestation of %r in %r",
+                                 user_id, group_id)
 
         for row in rows:
             group_id = row["group_id"]
             user_id = row["user_id"]
 
-            preserve_fn(_renew_attestation)(group_id, user_id)
+            run_in_background(_renew_attestation, group_id, user_id)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 0245197c02..0d11d890ff 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -198,7 +198,10 @@ class ApplicationServicesHandler(object):
         services = yield self._get_services_for_3pn(protocol)
 
         results = yield make_deferred_yieldable(defer.DeferredList([
-            preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+            run_in_background(
+                self.appservice_api.query_3pe,
+                service, kind, protocol, fields,
+            )
             for service in services
         ], consumeErrors=True))
 
@@ -259,11 +262,15 @@ class ApplicationServicesHandler(object):
             event based on the service regex.
         """
         services = self.store.get_app_services()
-        interested_list = [
-            s for s in services if (
-                yield s.is_interested(event, self.store)
-            )
-        ]
+
+        # we can't use a list comprehension here. Since python 3, list
+        # comprehensions use a generator internally. This means you can't yield
+        # inside of a list comprehension anymore.
+        interested_list = []
+        for s in services:
+            if (yield s.is_interested(event, self.store)):
+                interested_list.append(s)
+
         defer.returnValue(interested_list)
 
     def _get_services_for_user(self, user_id):
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 325c0c4a9f..25aec624af 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
     SynapseError, CodeMessageException, FederationDeniedError,
 )
 from synapse.types import get_domain_from_id, UserID
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
@@ -139,9 +139,9 @@ class E2eKeysHandler(object):
                 failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
-            preserve_fn(do_remote_query)(destination)
+            run_in_background(do_remote_query, destination)
             for destination in remote_queries_not_in_cache
-        ]))
+        ], consumeErrors=True))
 
         defer.returnValue({
             "device_keys": results, "failures": failures,
@@ -242,9 +242,9 @@ class E2eKeysHandler(object):
                 failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
-            preserve_fn(claim_client_keys)(destination)
+            run_in_background(claim_client_keys, destination)
             for destination in remote_queries
-        ]))
+        ], consumeErrors=True))
 
         logger.info(
             "Claimed one-time-keys: %s",
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d71..f39233d846 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -15,8 +15,16 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+
+import itertools
+import logging
+import sys
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
+import six
+from six.moves import http_client
+from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler
@@ -43,10 +51,6 @@ from synapse.util.retryutils import NotRetryingDestination
 
 from synapse.util.distributor import user_joined_room
 
-from twisted.internet import defer
-
-import itertools
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -115,6 +119,19 @@ class FederationHandler(BaseHandler):
             logger.debug("Already seen pdu %s", pdu.event_id)
             return
 
+        # do some initial sanity-checking of the event. In particular, make
+        # sure it doesn't have hundreds of prev_events or auth_events, which
+        # could cause a huge state resolution or cascade of event fetches.
+        try:
+            self._sanity_check_event(pdu)
+        except SynapseError as err:
+            raise FederationError(
+                "ERROR",
+                err.code,
+                err.msg,
+                affected=pdu.event_id,
+            )
+
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if pdu.room_id in self.room_queues:
@@ -149,10 +166,6 @@ class FederationHandler(BaseHandler):
 
         auth_chain = []
 
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-
         fetch_state = False
 
         # Get missing pdus if necessary.
@@ -168,7 +181,7 @@ class FederationHandler(BaseHandler):
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
+            seen = yield self.store.have_seen_events(prevs)
 
             if min_depth and pdu.depth < min_depth:
                 # This is so that we don't notify the user about this
@@ -196,8 +209,7 @@ class FederationHandler(BaseHandler):
 
                         # Update the set of things we've seen after trying to
                         # fetch the missing stuff
-                        have_seen = yield self.store.have_events(prevs)
-                        seen = set(have_seen.iterkeys())
+                        seen = yield self.store.have_seen_events(prevs)
 
                         if not prevs - seen:
                             logger.info(
@@ -248,8 +260,7 @@ class FederationHandler(BaseHandler):
             min_depth (int): Minimum depth of events to return.
         """
         # We recalculate seen, since it may have changed.
-        have_seen = yield self.store.have_events(prevs)
-        seen = set(have_seen.keys())
+        seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
@@ -361,9 +372,7 @@ class FederationHandler(BaseHandler):
             if auth_chain:
                 event_ids |= {e.event_id for e in auth_chain}
 
-            seen_ids = set(
-                (yield self.store.have_events(event_ids)).keys()
-            )
+            seen_ids = yield self.store.have_seen_events(event_ids)
 
             if state and auth_chain is not None:
                 # If we have any state or auth_chain given to us by the replication
@@ -527,9 +536,16 @@ class FederationHandler(BaseHandler):
     def backfill(self, dest, room_id, limit, extremities):
         """ Trigger a backfill request to `dest` for the given `room_id`
 
-        This will attempt to get more events from the remote. This may return
-        be successfull and still return no events if the other side has no new
-        events to offer.
+        This will attempt to get more events from the remote. If the other side
+        has no new events to offer, this will return an empty list.
+
+        As the events are received, we check their signatures, and also do some
+        sanity-checking on them. If any of the backfilled events are invalid,
+        this method throws a SynapseError.
+
+        TODO: make this more useful to distinguish failures of the remote
+        server from invalid events (there is probably no point in trying to
+        re-fetch invalid events from every other HS in the room.)
         """
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +557,16 @@ class FederationHandler(BaseHandler):
             extremities=extremities,
         )
 
+        # ideally we'd sanity check the events here for excess prev_events etc,
+        # but it's hard to reject events at this point without completely
+        # breaking backfill in the same way that it is currently broken by
+        # events whose signature we cannot verify (#3121).
+        #
+        # So for now we accept the events anyway. #3124 tracks this.
+        #
+        # for ev in events:
+        #     self._sanity_check_event(ev)
+
         # Don't bother processing events we already have.
         seen_events = yield self.store.have_events_in_timeline(
             set(e.event_id for e in events)
@@ -613,7 +639,8 @@ class FederationHandler(BaseHandler):
 
                 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
-                        logcontext.preserve_fn(self.replication_layer.get_pdu)(
+                        logcontext.run_in_background(
+                            self.replication_layer.get_pdu,
                             [dest],
                             event_id,
                             outlier=True,
@@ -633,7 +660,7 @@ class FederationHandler(BaseHandler):
 
                 failed_to_fetch = missing_auth - set(auth_events)
 
-        seen_events = yield self.store.have_events(
+        seen_events = yield self.store.have_seen_events(
             set(auth_events.keys()) | set(state_events.keys())
         )
 
@@ -843,6 +870,38 @@ class FederationHandler(BaseHandler):
 
         defer.returnValue(False)
 
+    def _sanity_check_event(self, ev):
+        """
+        Do some early sanity checks of a received event
+
+        In particular, checks it doesn't have an excessive number of
+        prev_events or auth_events, which could cause a huge state resolution
+        or cascade of event fetches.
+
+        Args:
+            ev (synapse.events.EventBase): event to be checked
+
+        Returns: None
+
+        Raises:
+            SynapseError if the event does not pass muster
+        """
+        if len(ev.prev_events) > 20:
+            logger.warn("Rejecting event %s which has %i prev_events",
+                        ev.event_id, len(ev.prev_events))
+            raise SynapseError(
+                http_client.BAD_REQUEST,
+                "Too many prev_events",
+            )
+
+        if len(ev.auth_events) > 10:
+            logger.warn("Rejecting event %s which has %i auth_events",
+                        ev.event_id, len(ev.auth_events))
+            raise SynapseError(
+                http_client.BAD_REQUEST,
+                "Too many auth_events",
+            )
+
     @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -967,7 +1026,7 @@ class FederationHandler(BaseHandler):
             # lots of requests for missing prev_events which we do actually
             # have. Hence we fire off the deferred, but don't wait for it.
 
-            logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
+            logcontext.run_in_background(self._handle_queued_pdus, room_queue)
 
         defer.returnValue(True)
 
@@ -1457,18 +1516,21 @@ class FederationHandler(BaseHandler):
                 backfilled=backfilled,
             )
         except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area
-            logcontext.preserve_fn(
-                self.store.remove_push_actions_from_staging
-            )(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            logcontext.run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
             # and don't need to wait for it.
-            logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
-                event_stream_id, max_stream_id
+            logcontext.run_in_background(
+                self.pusher_pool.on_new_notifications,
+                event_stream_id, max_stream_id,
             )
 
         defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1482,7 +1544,8 @@ class FederationHandler(BaseHandler):
         """
         contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                logcontext.preserve_fn(self._prep_event)(
+                logcontext.run_in_background(
+                    self._prep_event,
                     origin,
                     ev_info["event"],
                     state=ev_info.get("state"),
@@ -1736,7 +1799,8 @@ class FederationHandler(BaseHandler):
             event_key = None
 
         if event_auth_events - current_state:
-            have_events = yield self.store.have_events(
+            # TODO: can we use store.have_seen_events here instead?
+            have_events = yield self.store.get_seen_events_with_rejections(
                 event_auth_events - current_state
             )
         else:
@@ -1759,12 +1823,12 @@ class FederationHandler(BaseHandler):
                     origin, event.room_id, event.event_id
                 )
 
-                seen_remotes = yield self.store.have_events(
+                seen_remotes = yield self.store.have_seen_events(
                     [e.event_id for e in remote_auth_chain]
                 )
 
                 for e in remote_auth_chain:
-                    if e.event_id in seen_remotes.keys():
+                    if e.event_id in seen_remotes:
                         continue
 
                     if e.event_id == event.event_id:
@@ -1791,7 +1855,7 @@ class FederationHandler(BaseHandler):
                     except AuthError:
                         pass
 
-                have_events = yield self.store.have_events(
+                have_events = yield self.store.get_seen_events_with_rejections(
                     [e_id for e_id, _ in event.auth_events]
                 )
                 seen_events = set(have_events.keys())
@@ -1810,7 +1874,8 @@ class FederationHandler(BaseHandler):
 
             different_events = yield logcontext.make_deferred_yieldable(
                 defer.gatherResults([
-                    logcontext.preserve_fn(self.store.get_event)(
+                    logcontext.run_in_background(
+                        self.store.get_event,
                         d,
                         allow_none=True,
                         allow_rejected=False,
@@ -1876,13 +1941,13 @@ class FederationHandler(BaseHandler):
                         local_auth_chain,
                     )
 
-                    seen_remotes = yield self.store.have_events(
+                    seen_remotes = yield self.store.have_seen_events(
                         [e.event_id for e in result["auth_chain"]]
                     )
 
                     # 3. Process any remote auth chain events we haven't seen.
                     for ev in result["auth_chain"]:
-                        if ev.event_id in seen_remotes.keys():
+                        if ev.event_id in seen_remotes:
                             continue
 
                         if ev.event_id == event.event_id:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index c5267b4b84..cd33a86599 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -27,7 +27,7 @@ from synapse.types import (
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler):
                 (messages, token), current_state = yield make_deferred_yieldable(
                     defer.gatherResults(
                         [
-                            preserve_fn(self.store.get_recent_events_for_room)(
+                            run_in_background(
+                                self.store.get_recent_events_for_room,
                                 event.room_id,
                                 limit=limit,
                                 end_token=room_end_token,
@@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler):
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
             [
-                preserve_fn(get_presence)(),
-                preserve_fn(get_receipts)(),
-                preserve_fn(self.store.get_recent_events_for_room)(
+                run_in_background(get_presence),
+                run_in_background(get_receipts),
+                run_in_background(
+                    self.store.get_recent_events_for_room,
                     room_id,
                     limit=limit,
                     end_token=now_token.room_key,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 54cd691f91..23502eda70 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,6 +13,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import simplejson
+import sys
+
+from canonicaljson import encode_canonical_json
+import six
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
 
@@ -25,7 +31,7 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn, run_in_background
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.stringutils import random_string
@@ -34,12 +40,6 @@ from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
-from canonicaljson import encode_canonical_json
-
-import logging
-import random
-import simplejson
-
 logger = logging.getLogger(__name__)
 
 
@@ -433,7 +433,7 @@ class EventCreationHandler(object):
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_event_ids=None):
+                     prev_events_and_hashes=None):
         """
         Given a dict from a client, create a new event.
 
@@ -447,7 +447,13 @@ class EventCreationHandler(object):
             event_dict (dict): An entire event
             token_id (str)
             txn_id (str)
-            prev_event_ids (list): The prev event ids to use when creating the event
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
 
         Returns:
             Tuple of created event (FrozenEvent), Context
@@ -485,7 +491,7 @@ class EventCreationHandler(object):
         event, context = yield self.create_new_client_event(
             builder=builder,
             requester=requester,
-            prev_event_ids=prev_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
         )
 
         defer.returnValue((event, context))
@@ -588,39 +594,44 @@ class EventCreationHandler(object):
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
+    def create_new_client_event(self, builder, requester=None,
+                                prev_events_and_hashes=None):
+        """Create a new event for a local client
+
+        Args:
+            builder (EventBuilder):
+
+            requester (synapse.types.Requester|None):
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
+
+        Returns:
+            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+        """
+
+        if prev_events_and_hashes is not None:
+            assert len(prev_events_and_hashes) <= 10, \
+                "Attempting to create an event with %i prev_events" % (
+                    len(prev_events_and_hashes),
             )
+        else:
+            prev_events_and_hashes = \
+                yield self.store.get_prev_events_for_room(builder.room_id)
 
-            # We want to limit the max number of prev events we point to in our
-            # new event
-            if len(latest_ret) > 10:
-                # Sort by reverse depth, so we point to the most recent.
-                latest_ret.sort(key=lambda a: -a[2])
-                new_latest_ret = latest_ret[:5]
-
-                # We also randomly point to some of the older events, to make
-                # sure that we don't completely ignore the older events.
-                if latest_ret[5:]:
-                    sample_size = min(5, len(latest_ret[5:]))
-                    new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
-                latest_ret = new_latest_ret
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
+        if prev_events_and_hashes:
+            depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+        else:
+            depth = 1
 
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
+        prev_events = [
+            (event_id, prev_hashes)
+            for event_id, prev_hashes, _ in prev_events_and_hashes
+        ]
 
         builder.prev_events = prev_events
         builder.depth = depth
@@ -719,8 +730,14 @@ class EventCreationHandler(object):
         except:  # noqa: E722, as we reraise the exception this is fine.
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
-            preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
@@ -840,22 +857,33 @@ class EventCreationHandler(object):
 
         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
-        preserve_fn(self.pusher_pool.on_new_notifications)(
+        run_in_background(
+            self.pusher_pool.on_new_notifications,
             event_stream_id, max_stream_id
         )
 
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+            try:
+                self.notifier.on_new_room_event(
+                    event, event_stream_id, max_stream_id,
+                    extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room event")
 
-        preserve_fn(_notify)()
+        run_in_background(_notify)
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
             # We don't want to block sending messages on any presence code. This
             # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(requester.user)
+            run_in_background(self._bump_active_time, requester.user)
+
+    @defer.inlineCallbacks
+    def _bump_active_time(self, user):
+        try:
+            presence = self.hs.get_presence_handler()
+            yield presence.bump_presence_active_time(user)
+        except Exception:
+            logger.exception("Error bumping presence active time")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a5e501897c..585f3e4da2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.async import Linearizer
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -255,6 +255,14 @@ class PresenceHandler(object):
         logger.info("Finished _persist_unpersisted_changes")
 
     @defer.inlineCallbacks
+    def _update_states_and_catch_exception(self, new_states):
+        try:
+            res = yield self._update_states(new_states)
+            defer.returnValue(res)
+        except Exception:
+            logger.exception("Error updating presence")
+
+    @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
         the notifier and federation if and only if the changed presence state
@@ -364,7 +372,7 @@ class PresenceHandler(object):
                     now=now,
                 )
 
-            preserve_fn(self._update_states)(changes)
+            run_in_background(self._update_states_and_catch_exception, changes)
         except Exception:
             logger.exception("Exception in _handle_timeouts loop")
 
@@ -422,20 +430,23 @@ class PresenceHandler(object):
 
         @defer.inlineCallbacks
         def _end():
-            if affect_presence:
+            try:
                 self.user_to_num_current_syncs[user_id] -= 1
 
                 prev_state = yield self.current_state_for_user(user_id)
                 yield self._update_states([prev_state.copy_and_replace(
                     last_user_sync_ts=self.clock.time_msec(),
                 )])
+            except Exception:
+                logger.exception("Error updating presence after sync")
 
         @contextmanager
         def _user_syncing():
             try:
                 yield
             finally:
-                preserve_fn(_end)()
+                if affect_presence:
+                    run_in_background(_end)
 
         defer.returnValue(_user_syncing())
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 3f215c2b4e..2e0672161c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler):
         """Given a list of receipts, works out which remote servers should be
         poked and pokes them.
         """
-        # TODO: Some of this stuff should be coallesced.
-        for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
-            users = yield self.state.get_current_user_in_room(room_id)
-            remotedomains = set(get_domain_from_id(u) for u in users)
-            remotedomains = remotedomains.copy()
-            remotedomains.discard(self.server_name)
-
-            logger.debug("Sending receipt to: %r", remotedomains)
-
-            for domain in remotedomains:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.receipt",
-                    content={
-                        room_id: {
-                            receipt_type: {
-                                user_id: {
-                                    "event_ids": event_ids,
-                                    "data": data,
+        try:
+            # TODO: Some of this stuff should be coallesced.
+            for receipt in receipts:
+                room_id = receipt["room_id"]
+                receipt_type = receipt["receipt_type"]
+                user_id = receipt["user_id"]
+                event_ids = receipt["event_ids"]
+                data = receipt["data"]
+
+                users = yield self.state.get_current_user_in_room(room_id)
+                remotedomains = set(get_domain_from_id(u) for u in users)
+                remotedomains = remotedomains.copy()
+                remotedomains.discard(self.server_name)
+
+                logger.debug("Sending receipt to: %r", remotedomains)
+
+                for domain in remotedomains:
+                    self.federation.send_edu(
+                        destination=domain,
+                        edu_type="m.receipt",
+                        content={
+                            room_id: {
+                                receipt_type: {
+                                    user_id: {
+                                        "event_ids": event_ids,
+                                        "data": data,
+                                    }
                                 }
-                            }
+                            },
                         },
-                    },
-                    key=(room_id, receipt_type, user_id),
-                )
+                        key=(room_id, receipt_type, user_id),
+                    )
+        except Exception:
+            logger.exception("Error pushing receipts to remote servers")
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 8028d793c2..5757bb7f8a 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -15,12 +15,13 @@
 
 from twisted.internet import defer
 
+from six.moves import range
+
 from ._base import BaseHandler
 
 from synapse.api.constants import (
     EventTypes, JoinRules,
 )
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
@@ -78,18 +79,11 @@ class RoomListHandler(BaseHandler):
             )
 
         key = (limit, since_token, network_tuple)
-        result = self.response_cache.get(key)
-        if not result:
-            logger.info("No cached result, calculating one.")
-            result = self.response_cache.set(
-                key,
-                preserve_fn(self._get_public_room_list)(
-                    limit, since_token, network_tuple=network_tuple
-                )
-            )
-        else:
-            logger.info("Using cached deferred result.")
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            key,
+            self._get_public_room_list,
+            limit, since_token, network_tuple=network_tuple,
+        )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
@@ -208,7 +202,7 @@ class RoomListHandler(BaseHandler):
             step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
 
         chunk = []
-        for i in xrange(0, len(rooms_to_scan), step):
+        for i in range(0, len(rooms_to_scan), step):
             batch = rooms_to_scan[i:i + step]
             logger.info("Processing %i rooms for result", len(batch))
             yield concurrently_execute(
@@ -423,18 +417,14 @@ class RoomListHandler(BaseHandler):
             server_name, limit, since_token, include_all_networks,
             third_party_instance_id,
         )
-        result = self.remote_response_cache.get(key)
-        if not result:
-            result = self.remote_response_cache.set(
-                key,
-                repl_layer.get_public_rooms(
-                    server_name, limit=limit, since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            )
-        return result
+        return self.remote_response_cache.wrap(
+            key,
+            repl_layer.get_public_rooms,
+            server_name, limit=limit, since_token=since_token,
+            search_filter=search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
+        )
 
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index c45142d38d..714583f1d5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
     @defer.inlineCallbacks
     def _local_membership_update(
         self, requester, target, room_id, membership,
-        prev_event_ids,
+        prev_events_and_hashes,
         txn_id=None,
         ratelimit=True,
         content=None,
@@ -175,7 +175,7 @@ class RoomMemberHandler(object):
             },
             token_id=requester.access_token_id,
             txn_id=txn_id,
-            prev_event_ids=prev_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
         )
 
         # Check if this event matches the previous membership event for the user.
@@ -314,7 +314,12 @@ class RoomMemberHandler(object):
                     403, "Invites have been disabled on this server",
                 )
 
-        latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+            room_id,
+        )
+        latest_event_ids = (
+            event_id for (event_id, _, _) in prev_events_and_hashes
+        )
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
         )
@@ -403,7 +408,7 @@ class RoomMemberHandler(object):
             membership=effective_membership_state,
             txn_id=txn_id,
             ratelimit=ratelimit,
-            prev_event_ids=latest_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
             content=content,
         )
         defer.returnValue(res)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 06d17ab20c..b52e4c2aff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@
 
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
         to tell if room needs to be part of the sync result.
         """
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
             # nb the notification count does not, er, count: if there's nothing
             # else in the result, we don't need to send it.
         )
+    __bool__ = __nonzero__  # python3
 
 
 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
             or self.state
             or self.account_data
         )
+    __bool__ = __nonzero__  # python3
 
 
 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
     def __nonzero__(self):
         """Invited rooms should always be reported to the client"""
         return True
+    __bool__ = __nonzero__  # python3
 
 
 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
 
     def __nonzero__(self):
         return bool(self.join or self.invite or self.leave)
+    __bool__ = __nonzero__  # python3
 
 
 class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
 
     def __nonzero__(self):
         return bool(self.changed or self.left)
+    __bool__ = __nonzero__  # python3
 
 
 class SyncResult(collections.namedtuple("SyncResult", [
@@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
             self.device_lists or
             self.groups
         )
+    __bool__ = __nonzero__  # python3
 
 
 class SyncHandler(object):
@@ -180,15 +187,11 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        result = self.response_cache.get(sync_config.request_key)
-        if not result:
-            result = self.response_cache.set(
-                sync_config.request_key,
-                preserve_fn(self._wait_for_sync_for_user)(
-                    sync_config, since_token, timeout, full_state
-                )
-            )
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            sync_config.request_key,
+            self._wait_for_sync_for_user,
+            sync_config, since_token, timeout, full_state,
+        )
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 77c0cf146f..5d9736e88f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 from synapse.types import UserID, get_domain_from_id
@@ -97,7 +97,8 @@ class TypingHandler(object):
             if self.hs.is_mine_id(member.user_id):
                 last_fed_poke = self._member_last_federation_poke.get(member, None)
                 if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
-                    preserve_fn(self._push_remote)(
+                    run_in_background(
+                        self._push_remote,
                         member=member,
                         typing=True
                     )
@@ -196,7 +197,7 @@ class TypingHandler(object):
     def _push_update(self, member, typing):
         if self.hs.is_mine_id(member.user_id):
             # Only send updates for changes to our own users.
-            preserve_fn(self._push_remote)(member, typing)
+            run_in_background(self._push_remote, member, typing)
 
         self._push_update_local(
             member=member,
@@ -205,28 +206,31 @@ class TypingHandler(object):
 
     @defer.inlineCallbacks
     def _push_remote(self, member, typing):
-        users = yield self.state.get_current_user_in_room(member.room_id)
-        self._member_last_federation_poke[member] = self.clock.time_msec()
+        try:
+            users = yield self.state.get_current_user_in_room(member.room_id)
+            self._member_last_federation_poke[member] = self.clock.time_msec()
 
-        now = self.clock.time_msec()
-        self.wheel_timer.insert(
-            now=now,
-            obj=member,
-            then=now + FEDERATION_PING_INTERVAL,
-        )
+            now = self.clock.time_msec()
+            self.wheel_timer.insert(
+                now=now,
+                obj=member,
+                then=now + FEDERATION_PING_INTERVAL,
+            )
 
-        for domain in set(get_domain_from_id(u) for u in users):
-            if domain != self.server_name:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.typing",
-                    content={
-                        "room_id": member.room_id,
-                        "user_id": member.user_id,
-                        "typing": typing,
-                    },
-                    key=member,
-                )
+            for domain in set(get_domain_from_id(u) for u in users):
+                if domain != self.server_name:
+                    self.federation.send_edu(
+                        destination=domain,
+                        edu_type="m.typing",
+                        content={
+                            "room_id": member.room_id,
+                            "user_id": member.user_id,
+                            "typing": typing,
+                        },
+                        key=member,
+                    )
+        except Exception:
+            logger.exception("Error pushing typing notif to remotes")
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index bfebb0f644..0d47ccdb59 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,3 +13,24 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
+
+from synapse.api.errors import SynapseError
+
+
+class RequestTimedOutError(SynapseError):
+    """Exception representing timeout of an outbound request"""
+    def __init__(self):
+        super(RequestTimedOutError, self).__init__(504, "Timed out")
+
+
+def cancelled_to_request_timed_out_error(value):
+    """Turns CancelledErrors into RequestTimedOutErrors.
+
+    For use with async.add_timeout_to_deferred
+    """
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise RequestTimedOutError()
+    return value
diff --git a/synapse/http/client.py b/synapse/http/client.py
index f3e4973c2e..70a19d9b74 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,9 +19,10 @@ from OpenSSL.SSL import VERIFY_NONE
 from synapse.api.errors import (
     CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
 )
+from synapse.http import cancelled_to_request_timed_out_error
+from synapse.util.async import add_timeout_to_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
-from synapse.util import logcontext
 import synapse.metrics
 from synapse.http.endpoint import SpiderEndpoint
 
@@ -38,7 +40,7 @@ from twisted.web.http import PotentialDataLoss
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
 
-from StringIO import StringIO
+from six import StringIO
 
 import simplejson as json
 import logging
@@ -95,21 +97,17 @@ class SimpleHttpClient(object):
         # counters to it
         outgoing_requests_counter.inc(method)
 
-        def send_request():
+        logger.info("Sending request %s %s", method, uri)
+
+        try:
             request_deferred = self.agent.request(
                 method, uri, *args, **kwargs
             )
-
-            return self.clock.time_bound_deferred(
+            add_timeout_to_deferred(
                 request_deferred,
-                time_out=60,
+                60, cancelled_to_request_timed_out_error,
             )
-
-        logger.info("Sending request %s %s", method, uri)
-
-        try:
-            with logcontext.PreserveLoggingContext():
-                response = yield send_request()
+            response = yield make_deferred_yieldable(request_deferred)
 
             incoming_responses_counter.inc(method, response.code)
             logger.info(
@@ -509,7 +507,7 @@ class SpiderHttpClient(SimpleHttpClient):
                     reactor,
                     SpiderEndpointFactory(hs)
                 )
-            ), [('gzip', GzipDecoder)]
+            ), [(b'gzip', GzipDecoder)]
         )
         # We could look like Chrome:
         # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko)
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 00572c2897..db455e5909 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -286,7 +286,7 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
         if (len(answers) == 1
                 and answers[0].type == dns.SRV
                 and answers[0].payload
-                and answers[0].payload.target == dns.Name('.')):
+                and answers[0].payload.target == dns.Name(b'.')):
             raise ConnectError("Service %s unavailable" % service_name)
 
         for answer in answers:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 60a29081e8..4b2b85464d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,17 +13,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import synapse.util.retryutils
 from twisted.internet import defer, reactor, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.web.client import readBody, HTTPConnectionPool, Agent
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
 
+from synapse.http import cancelled_to_request_timed_out_error
 from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.async import sleep
-from synapse.util import logcontext
 import synapse.metrics
+from synapse.util.async import sleep, add_timeout_to_deferred
+from synapse.util import logcontext
+from synapse.util.logcontext import make_deferred_yieldable
+import synapse.util.retryutils
 
 from canonicaljson import encode_canonical_json
 
@@ -38,8 +41,7 @@ import logging
 import random
 import sys
 import urllib
-import urlparse
-
+from six.moves.urllib import parse as urlparse
 
 logger = logging.getLogger(__name__)
 outbound_logger = logging.getLogger("synapse.http.outbound")
@@ -184,21 +186,20 @@ class MatrixFederationHttpClient(object):
                         producer = body_callback(method, http_url_bytes, headers_dict)
 
                     try:
-                        def send_request():
-                            request_deferred = self.agent.request(
-                                method,
-                                url_bytes,
-                                Headers(headers_dict),
-                                producer
-                            )
-
-                            return self.clock.time_bound_deferred(
-                                request_deferred,
-                                time_out=timeout / 1000. if timeout else 60,
-                            )
-
-                        with logcontext.PreserveLoggingContext():
-                            response = yield send_request()
+                        request_deferred = self.agent.request(
+                            method,
+                            url_bytes,
+                            Headers(headers_dict),
+                            producer
+                        )
+                        add_timeout_to_deferred(
+                            request_deferred,
+                            timeout / 1000. if timeout else 60,
+                            cancelled_to_request_timed_out_error,
+                        )
+                        response = yield make_deferred_yieldable(
+                            request_deferred,
+                        )
 
                         log_result = "%d %s" % (response.code, response.phrase,)
                         break
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 8d632290de..55b9ad5251 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -546,6 +546,6 @@ def _request_user_agent_is_curl(request):
         b"User-Agent", default=[]
     )
     for user_agent in user_agents:
-        if "curl" in user_agent:
+        if b"curl" in user_agent:
             return True
     return False
diff --git a/synapse/notifier.py b/synapse/notifier.py
index ef042681bc..8355c7d621 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,14 +14,17 @@
 # limitations under the License.
 
 from twisted.internet import defer
+
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 
-from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.async import (
+    ObservableDeferred, add_timeout_to_deferred,
+    DeferredTimeoutError,
+)
+from synapse.util.logcontext import PreserveLoggingContext, run_in_background
 from synapse.util.metrics import Measure
 from synapse.types import StreamToken
 from synapse.visibility import filter_events_for_client
@@ -144,6 +147,7 @@ class _NotifierUserStream(object):
 class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
     def __nonzero__(self):
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class Notifier(object):
@@ -250,9 +254,7 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        preserve_fn(self.appservice_handler.notify_interested_services)(
-            room_stream_id
-        )
+        run_in_background(self._notify_app_services, room_stream_id)
 
         if self.federation_sender:
             self.federation_sender.notify_new_events(room_stream_id)
@@ -266,6 +268,13 @@ class Notifier(object):
             rooms=[event.room_id],
         )
 
+    @defer.inlineCallbacks
+    def _notify_app_services(self, room_stream_id):
+        try:
+            yield self.appservice_handler.notify_interested_services(room_stream_id)
+        except Exception:
+            logger.exception("Error notifying application services of event")
+
     def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
         """ Used to inform listeners that something has happend event wise.
 
@@ -330,11 +339,12 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
+                    add_timeout_to_deferred(
+                        listener.deferred,
+                        (end_time - now) / 1000.,
+                    )
                     with PreserveLoggingContext():
-                        yield self.clock.time_bound_deferred(
-                            listener.deferred,
-                            time_out=(end_time - now) / 1000.
-                        )
+                        yield listener.deferred
 
                     current_token = user_stream.current_token
 
@@ -345,7 +355,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except DeferredTimedOutError:
+                except DeferredTimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -550,13 +560,14 @@ class Notifier(object):
             if end_time <= now:
                 break
 
+            add_timeout_to_deferred(
+                listener.deferred.addTimeout,
+                (end_time - now) / 1000.,
+            )
             try:
                 with PreserveLoggingContext():
-                    yield self.clock.time_bound_deferred(
-                        listener.deferred,
-                        time_out=(end_time - now) / 1000.
-                    )
-            except DeferredTimedOutError:
+                    yield listener.deferred
+            except DeferredTimeoutError:
                 break
             except defer.CancelledError:
                 break
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 58df98a793..ba7286cb72 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -77,10 +77,13 @@ class EmailPusher(object):
     @defer.inlineCallbacks
     def on_started(self):
         if self.mailer is not None:
-            self.throttle_params = yield self.store.get_throttle_params_by_room(
-                self.pusher_id
-            )
-            yield self._process()
+            try:
+                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                    self.pusher_id
+                )
+                yield self._process()
+            except Exception:
+                logger.exception("Error starting email pusher")
 
     def on_stop(self):
         if self.timed_call:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 2cbac571b8..b077e1a446 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -18,8 +18,8 @@ import logging
 from twisted.internet import defer, reactor
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-import push_rule_evaluator
-import push_tools
+from . import push_rule_evaluator
+from . import push_tools
 import synapse
 from synapse.push import PusherConfigException
 from synapse.util.logcontext import LoggingContext
@@ -94,7 +94,10 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def on_started(self):
-        yield self._process()
+        try:
+            yield self._process()
+        except Exception:
+            logger.exception("Error starting http pusher")
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 71576330a9..5aa6667e91 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from httppusher import HttpPusher
+from .httppusher import HttpPusher
 
 import logging
 logger = logging.getLogger(__name__)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 134e89b371..750d11ca38 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -14,13 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from twisted.internet import defer
 
-from .pusher import PusherFactory
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.push.pusher import PusherFactory
 from synapse.util.async import run_on_reactor
-
-import logging
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -137,12 +137,15 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            preserve_fn(p.on_new_notifications)(
-                                min_stream_id, max_stream_id
+                            run_in_background(
+                                p.on_new_notifications,
+                                min_stream_id, max_stream_id,
                             )
                         )
 
-            yield make_deferred_yieldable(defer.gatherResults(deferreds))
+            yield make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True),
+            )
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
@@ -164,10 +167,15 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
+                            run_in_background(
+                                p.on_new_receipts,
+                                min_stream_id, max_stream_id,
+                            )
                         )
 
-            yield make_deferred_yieldable(defer.gatherResults(deferreds))
+            yield make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True),
+            )
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
@@ -207,7 +215,7 @@ class PusherPool:
                 if appid_pushkey in byuser:
                     byuser[appid_pushkey].on_stop()
                 byuser[appid_pushkey] = p
-                preserve_fn(p.on_started)()
+                run_in_background(p.on_started)
 
         logger.info("Started pushers")
 
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 5cabf7dabe..711cbb6c50 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -1,5 +1,6 @@
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,6 +19,18 @@ from distutils.version import LooseVersion
 
 logger = logging.getLogger(__name__)
 
+# this dict maps from python package name to a list of modules we expect it to
+# provide.
+#
+# the key is a "requirement specifier", as used as a parameter to `pip
+# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
+#
+# the value is a sequence of strings; each entry should be the name of the
+# python module, optionally followed by a version assertion which can be either
+# ">=<ver>" or "==<ver>".
+#
+# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
+# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
 REQUIREMENTS = {
     "jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
     "frozendict>=0.4": ["frozendict"],
@@ -26,7 +39,11 @@ REQUIREMENTS = {
     "signedjson>=1.0.0": ["signedjson>=1.0.0"],
     "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
     "service_identity>=1.0.0": ["service_identity>=1.0.0"],
-    "Twisted>=16.0.0": ["twisted>=16.0.0"],
+
+    # we break under Twisted 18.4
+    # (https://github.com/matrix-org/synapse/issues/3135)
+    "Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
+
     "pyopenssl>=0.14": ["OpenSSL>=0.14"],
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
@@ -39,6 +56,7 @@ REQUIREMENTS = {
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
+    "six": ["six"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index c6a6551d24..a9baa2c1c3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.util.async import sleep
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import Requester, UserID
 
@@ -118,17 +117,12 @@ class ReplicationSendEventRestServlet(RestServlet):
         self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
 
     def on_PUT(self, request, event_id):
-        result = self.response_cache.get(event_id)
-        if not result:
-            result = self.response_cache.set(
-                event_id,
-                self._handle_request(request)
-            )
-        else:
-            logger.warn("Returning cached response")
-        return make_deferred_yieldable(result)
-
-    @preserve_fn
+        return self.response_cache.wrap(
+            event_id,
+            self._handle_request,
+            request
+        )
+
     @defer.inlineCallbacks
     def _handle_request(self, request):
         with Measure(self.clock, "repl_send_event_parse"):
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 0a9a290af4..d7d38464b2 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -53,12 +53,12 @@ from twisted.internet import defer
 from twisted.protocols.basic import LineOnlyReceiver
 from twisted.python.failure import Failure
 
-from commands import (
+from .commands import (
     COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
     ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
     NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
 )
-from streams import STREAMS_MAP
+from .streams import STREAMS_MAP
 
 from synapse.util.stringutils import random_string
 from synapse.metrics.metric import CounterMetric
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 786c3fe864..a41af4fd6c 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -18,8 +18,8 @@
 from twisted.internet import defer, reactor
 from twisted.internet.protocol import Factory
 
-from streams import STREAMS_MAP, FederationStream
-from protocol import ServerReplicationStreamProtocol
+from .streams import STREAMS_MAP, FederationStream
+from .protocol import ServerReplicationStreamProtocol
 
 from synapse.util.metrics import Measure, measure_func
 
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 45844aa2d2..34df5be4e9 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -25,7 +25,7 @@ from .base import ClientV1RestServlet, client_path_patterns
 
 import simplejson as json
 import urllib
-import urlparse
+from six.moves.urllib import parse as urlparse
 
 import logging
 from saml2 import BINDING_HTTP_POST
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 8a82097178..9b3022e0b0 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -30,6 +30,8 @@ from hashlib import sha1
 import hmac
 import logging
 
+from six import string_types
+
 logger = logging.getLogger(__name__)
 
 
@@ -333,11 +335,11 @@ class RegisterRestServlet(ClientV1RestServlet):
     def _do_shared_secret(self, request, register_json, session):
         yield run_on_reactor()
 
-        if not isinstance(register_json.get("mac", None), basestring):
+        if not isinstance(register_json.get("mac", None), string_types):
             raise SynapseError(400, "Expected mac.")
-        if not isinstance(register_json.get("user", None), basestring):
+        if not isinstance(register_json.get("user", None), string_types):
             raise SynapseError(400, "Expected 'user' key.")
-        if not isinstance(register_json.get("password", None), basestring):
+        if not isinstance(register_json.get("password", None), string_types):
             raise SynapseError(400, "Expected 'password' key.")
 
         if not self.hs.config.registration_shared_secret:
@@ -358,14 +360,14 @@ class RegisterRestServlet(ClientV1RestServlet):
         got_mac = str(register_json["mac"])
 
         want_mac = hmac.new(
-            key=self.hs.config.registration_shared_secret,
+            key=self.hs.config.registration_shared_secret.encode(),
             digestmod=sha1,
         )
         want_mac.update(user)
-        want_mac.update("\x00")
+        want_mac.update(b"\x00")
         want_mac.update(password)
-        want_mac.update("\x00")
-        want_mac.update("admin" if admin else "notadmin")
+        want_mac.update(b"\x00")
+        want_mac.update(b"admin" if admin else b"notadmin")
         want_mac = want_mac.hexdigest()
 
         if compare_digest(want_mac, got_mac):
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 2ad0e5943b..fcf9c9ab44 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -28,8 +28,9 @@ from synapse.http.servlet import (
     parse_json_object_from_request, parse_string, parse_integer
 )
 
+from six.moves.urllib import parse as urlparse
+
 import logging
-import urllib
 import simplejson as json
 
 logger = logging.getLogger(__name__)
@@ -433,7 +434,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
         as_client_event = "raw" not in request.args
         filter_bytes = request.args.get("filter", None)
         if filter_bytes:
-            filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8")
+            filter_json = urlparse.unquote(filter_bytes[-1]).decode("UTF-8")
             event_filter = Filter(json.loads(filter_json))
         else:
             event_filter = None
@@ -718,8 +719,8 @@ class RoomTypingRestServlet(ClientV1RestServlet):
     def on_PUT(self, request, room_id, user_id):
         requester = yield self.auth.get_user_by_req(request)
 
-        room_id = urllib.unquote(room_id)
-        target_user = UserID.from_string(urllib.unquote(user_id))
+        room_id = urlparse.unquote(room_id)
+        target_user = UserID.from_string(urlparse.unquote(user_id))
 
         content = parse_json_object_from_request(request)
 
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index f317c919dc..5cab00aea9 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -35,6 +35,8 @@ from hashlib import sha1
 from synapse.util.async import run_on_reactor
 from synapse.util.ratelimitutils import FederationRateLimiter
 
+from six import string_types
+
 
 # We ought to be using hmac.compare_digest() but on older pythons it doesn't
 # exist. It's a _really minor_ security flaw to use plain string comparison
@@ -210,14 +212,14 @@ class RegisterRestServlet(RestServlet):
         # in sessions. Pull out the username/password provided to us.
         desired_password = None
         if 'password' in body:
-            if (not isinstance(body['password'], basestring) or
+            if (not isinstance(body['password'], string_types) or
                     len(body['password']) > 512):
                 raise SynapseError(400, "Invalid password")
             desired_password = body["password"]
 
         desired_username = None
         if 'username' in body:
-            if (not isinstance(body['username'], basestring) or
+            if (not isinstance(body['username'], string_types) or
                     len(body['username']) > 512):
                 raise SynapseError(400, "Invalid username")
             desired_username = body['username']
@@ -243,7 +245,7 @@ class RegisterRestServlet(RestServlet):
 
             access_token = get_access_token_from_request(request)
 
-            if isinstance(desired_username, basestring):
+            if isinstance(desired_username, string_types):
                 result = yield self._do_appservice_registration(
                     desired_username, access_token, body
                 )
@@ -464,7 +466,7 @@ class RegisterRestServlet(RestServlet):
         # includes the password and admin flag in the hashed text. Why are
         # these different?
         want_mac = hmac.new(
-            key=self.hs.config.registration_shared_secret,
+            key=self.hs.config.registration_shared_secret.encode(),
             msg=user,
             digestmod=sha1,
         ).hexdigest()
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index e7ac01da01..d9c4af9389 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -28,7 +28,7 @@ import os
 
 import logging
 import urllib
-import urlparse
+from six.moves.urllib import parse as urlparse
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index bb79599379..9800ce7581 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -47,7 +47,7 @@ import shutil
 
 import cgi
 import logging
-import urlparse
+from six.moves.urllib import parse as urlparse
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 0fc21540c6..9290d7946f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -35,7 +35,7 @@ from ._base import FileInfo
 from synapse.api.errors import (
     SynapseError, Codes,
 )
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.stringutils import random_string
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.http.client import SpiderHttpClient
@@ -144,7 +144,8 @@ class PreviewUrlResource(Resource):
         observable = self._cache.get(url)
 
         if not observable:
-            download = preserve_fn(self._do_preview)(
+            download = run_in_background(
+                self._do_preview,
                 url, requester.user, ts,
             )
             observable = ObservableDeferred(
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index c188192f2b..0252afd9d3 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -18,7 +18,7 @@ from twisted.internet import defer, threads
 from .media_storage import FileResponder
 
 from synapse.config._base import Config
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 
 import logging
 import os
@@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider):
             return self.backend.store_file(path, file_info)
         else:
             # TODO: Handle errors.
-            preserve_fn(self.backend.store_file)(path, file_info)
+            def store():
+                try:
+                    return self.backend.store_file(path, file_info)
+                except Exception:
+                    logger.exception("Error storing file")
+            run_in_background(store)
             return defer.succeed(None)
 
     def fetch(self, path, file_info):
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index f6f498cdc5..a31e75cb46 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -81,15 +81,15 @@ class UploadResource(Resource):
         headers = request.requestHeaders
 
         if headers.hasHeader("Content-Type"):
-            media_type = headers.getRawHeaders("Content-Type")[0]
+            media_type = headers.getRawHeaders(b"Content-Type")[0]
         else:
             raise SynapseError(
                 msg="Upload request missing 'Content-Type'",
                 code=400,
             )
 
-        # if headers.hasHeader("Content-Disposition"):
-        #     disposition = headers.getRawHeaders("Content-Disposition")[0]
+        # if headers.hasHeader(b"Content-Disposition"):
+        #     disposition = headers.getRawHeaders(b"Content-Disposition")[0]
         # TODO(markjh): parse content-dispostion
 
         content_uri = yield self.media_repo.create_content(
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 00ee82d300..8fbf7ffba7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import random
 
 from twisted.internet import defer
 
@@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
 
 import logging
-from Queue import PriorityQueue, Empty
+from six.moves.queue import PriorityQueue, Empty
+
+from six.moves import range
 
 
 logger = logging.getLogger(__name__)
@@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             front_list = list(front)
             chunks = [
                 front_list[x:x + 100]
-                for x in xrange(0, len(front), 100)
+                for x in range(0, len(front), 100)
             ]
             for chunk in chunks:
                 txn.execute(
@@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             retcol="event_id",
         )
 
+    @defer.inlineCallbacks
+    def get_prev_events_for_room(self, room_id):
+        """
+        Gets a subset of the current forward extremities in the given room.
+
+        Limits the result to 10 extremities, so that we can avoid creating
+        events which refer to hundreds of prev_events.
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+        res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
+        if len(res) > 10:
+            # Sort by reverse depth, so we point to the most recent.
+            res.sort(key=lambda a: -a[2])
+
+            # we use half of the limit for the actual most recent events, and
+            # the other half to randomly point to some of the older events, to
+            # make sure that we don't completely ignore the older events.
+            res = res[0:5] + random.sample(res[5:], 5)
+
+        defer.returnValue(res)
+
     def get_latest_event_ids_and_hashes_in_room(self, room_id):
+        """
+        Gets the current forward extremities in the given room
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+
         return self.runInteraction(
             "get_latest_event_ids_and_hashes_in_room",
             self._get_latest_event_ids_and_hashes_in_room,
@@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             room_id,
         )
 
-    @defer.inlineCallbacks
-    def get_max_depth_of_events(self, event_ids):
-        sql = (
-            "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
-        ) % (",".join(["?"] * len(event_ids)),)
-
-        rows = yield self._execute(
-            "get_max_depth_of_events", None,
-            sql, *event_ids
-        )
-
-        if rows:
-            defer.returnValue(rows[0][0])
-        else:
-            defer.returnValue(1)
-
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
             txn,
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index e78f8d0114..c22762eb5c 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             "add_push_actions_to_staging", _add_push_actions_to_staging_txn
         )
 
+    @defer.inlineCallbacks
     def remove_push_actions_from_staging(self, event_id):
         """Called if we failed to persist the event to ensure that stale push
         actions don't build up in the DB
@@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             event_id (str)
         """
 
-        return self._simple_delete(
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event_id,
-            },
-            desc="remove_push_actions_from_staging",
-        )
+        try:
+            res = yield self._simple_delete(
+                table="event_push_actions_staging",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                desc="remove_push_actions_from_staging",
+            )
+            defer.returnValue(res)
+        except Exception:
+            # this method is called from an exception handler, so propagating
+            # another exception here really isn't helpful - there's nothing
+            # the caller can do about it. Just log the exception and move on.
+            logger.exception(
+                "Error removing push actions after event persistence failure",
+            )
 
     @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index da44b52fd6..5fe4a0e56c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -16,6 +16,7 @@
 
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps
+import itertools
 import logging
 
 import simplejson as json
@@ -1320,13 +1321,49 @@ class EventsStore(EventsWorkerStore):
 
         defer.returnValue(set(r["event_id"] for r in rows))
 
-    def have_events(self, event_ids):
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
         """Given a list of event ids, check if we have already processed them.
 
+        Args:
+            event_ids (iterable[str]):
+
         Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-            the rejected reason string if we rejected the event, else maps to
-            None.
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
+        Returns:
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
         """
         if not event_ids:
             return defer.succeed({})
@@ -1348,9 +1385,7 @@ class EventsStore(EventsWorkerStore):
 
             return res
 
-        return self.runInteraction(
-            "have_events", f,
-        )
+        return self.runInteraction("get_rejection_reasons", f)
 
     @defer.inlineCallbacks
     def count_daily_messages(self):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a937b9bceb..ba834854e1 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
 
 from synapse.util.logcontext import (
-    preserve_fn, PreserveLoggingContext, make_deferred_yieldable
+    PreserveLoggingContext, make_deferred_yieldable, run_in_background,
 )
 from synapse.util.metrics import Measure
 from synapse.api.errors import SynapseError
@@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore):
 
         res = yield make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self._get_event_from_row)(
+                run_in_background(
+                    self._get_event_from_row,
                     row["internal_metadata"], row["json"], row["redacts"],
                     rejected_reason=row["rejects"],
                 )
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 6b557ca0cf..a50717db2d 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -22,6 +22,8 @@ from synapse.storage import background_updates
 from synapse.storage._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+from six.moves import range
+
 
 class RegistrationWorkerStore(SQLBaseStore):
     @cached()
@@ -469,7 +471,7 @@ class RegistrationStore(RegistrationWorkerStore,
                 match = regex.search(user_id)
                 if match:
                     found.add(int(match.group(1)))
-            for i in xrange(len(found) + 1):
+            for i in range(len(found) + 1):
                 if i not in found:
                     return i
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 740c036975..ea6a189185 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
             # Convert the IDs to MXC URIs
             for media_id in local_mxcs:
-                local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+                local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
             for hostname, media_id in remote_mxcs:
                 remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
 
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         while next_token:
             sql = """
                 SELECT stream_ordering, json FROM events
-                JOIN event_json USING (event_id)
+                JOIN event_json USING (room_id, event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                     if matches:
                         hostname = matches.group(1)
                         media_id = matches.group(2)
-                        if hostname == self.hostname:
+                        if hostname == self.hs.hostname:
                             local_media_mxcs.append(media_id)
                         else:
                             remote_media_mxcs.append((hostname, media_id))
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index c53e53c94f..85bd1a2006 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -14,6 +14,8 @@
 import logging
 from synapse.config.appservice import load_appservices
 
+from six.moves import range
+
 
 logger = logging.getLogger(__name__)
 
@@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
 
     for as_id, user_ids in owned.items():
         n = 100
-        user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
+        user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
         for chunk in user_chunks:
             cur.execute(
                 database_engine.convert_param_style(
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 426cbe6e1a..6ba3e59889 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
             sql = (
                 "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
-                " JOIN event_json USING (event_id)"
+                " JOIN event_json USING (room_id, event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 2956c3b3e0..f0784ba137 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,12 +41,14 @@ from synapse.storage.events import EventsWorkerStore
 from synapse.util.caches.descriptors import cached
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 import abc
 import logging
 
+from six.moves import range
+
 
 logger = logging.getLogger(__name__)
 
@@ -196,13 +198,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         results = {}
         room_ids = list(room_ids)
-        for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
+        for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)):
             res = yield make_deferred_yieldable(defer.gatherResults([
-                preserve_fn(self.get_room_events_stream_for_room)(
+                run_in_background(
+                    self.get_room_events_stream_for_room,
                     room_id, from_key, to_key, limit, order=order,
                 )
                 for room_id in rm_ids
-            ]))
+            ], consumeErrors=True))
             results.update(dict(zip(rm_ids, res)))
 
         defer.returnValue(results)
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 13bff9f055..6671d3cfca 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -22,6 +22,8 @@ from twisted.internet import defer
 import simplejson as json
 import logging
 
+from six.moves import range
+
 logger = logging.getLogger(__name__)
 
 
@@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
 
         batch_size = 50
         results = []
-        for i in xrange(0, len(tag_ids), batch_size):
+        for i in range(0, len(tag_ids), batch_size):
             tags = yield self.runInteraction(
                 "get_all_updated_tag_content",
                 get_tag_content,
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 756d8ffa32..814a7bf71b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.api.errors import SynapseError
 from synapse.util.logcontext import PreserveLoggingContext
 
 from twisted.internet import defer, reactor, task
@@ -24,11 +23,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class DeferredTimedOutError(SynapseError):
-    def __init__(self):
-        super(DeferredTimedOutError, self).__init__(504, "Timed out")
-
-
 def unwrapFirstError(failure):
     # defer.gatherResults and DeferredLists wrap failures.
     failure.trap(defer.FirstError)
@@ -85,53 +79,3 @@ class Clock(object):
         except Exception:
             if not ignore_errs:
                 raise
-
-    def time_bound_deferred(self, given_deferred, time_out):
-        if given_deferred.called:
-            return given_deferred
-
-        ret_deferred = defer.Deferred()
-
-        def timed_out_fn():
-            e = DeferredTimedOutError()
-
-            try:
-                ret_deferred.errback(e)
-            except Exception:
-                pass
-
-            try:
-                given_deferred.cancel()
-            except Exception:
-                pass
-
-        timer = None
-
-        def cancel(res):
-            try:
-                self.cancel_call_later(timer)
-            except Exception:
-                pass
-            return res
-
-        ret_deferred.addBoth(cancel)
-
-        def success(res):
-            try:
-                ret_deferred.callback(res)
-            except Exception:
-                pass
-
-            return res
-
-        def err(res):
-            try:
-                ret_deferred.errback(res)
-            except Exception:
-                pass
-
-        given_deferred.addCallbacks(callback=success, errback=err)
-
-        timer = self.call_later(time_out, timed_out_fn)
-
-        return ret_deferred
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 0729bb2863..9dd4e6b5bc 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -15,9 +15,11 @@
 
 
 from twisted.internet import defer, reactor
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
 
 from .logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable, preserve_fn
+    PreserveLoggingContext, make_deferred_yieldable, run_in_background
 )
 from synapse.util import logcontext, unwrapFirstError
 
@@ -25,6 +27,8 @@ from contextlib import contextmanager
 
 import logging
 
+from six.moves import range
+
 logger = logging.getLogger(__name__)
 
 
@@ -156,13 +160,13 @@ def concurrently_execute(func, args, limit):
     def _concurrently_execute_inner():
         try:
             while True:
-                yield func(it.next())
+                yield func(next(it))
         except StopIteration:
             pass
 
     return logcontext.make_deferred_yieldable(defer.gatherResults([
-        preserve_fn(_concurrently_execute_inner)()
-        for _ in xrange(limit)
+        run_in_background(_concurrently_execute_inner)
+        for _ in range(limit)
     ], consumeErrors=True)).addErrback(unwrapFirstError)
 
 
@@ -392,3 +396,68 @@ class ReadWriteLock(object):
                     self.key_to_current_writer.pop(key)
 
         defer.returnValue(_ctx_manager())
+
+
+class DeferredTimeoutError(Exception):
+    """
+    This error is raised by default when a L{Deferred} times out.
+    """
+
+
+def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+    """
+    Add a timeout to a deferred by scheduling it to be cancelled after
+    timeout seconds.
+
+    This is essentially a backport of deferred.addTimeout, which was introduced
+    in twisted 16.5.
+
+    If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
+    unless a cancelable function was passed to its initialization or unless
+    a different on_timeout_cancel callable is provided.
+
+    Args:
+        deferred (defer.Deferred): deferred to be timed out
+        timeout (Number): seconds to time out after
+
+        on_timeout_cancel (callable): A callable which is called immediately
+            after the deferred times out, and not if this deferred is
+            otherwise cancelled before the timeout.
+
+            It takes an arbitrary value, which is the value of the deferred at
+            that exact point in time (probably a CancelledError Failure), and
+            the timeout.
+
+            The default callable (if none is provided) will translate a
+            CancelledError Failure into a DeferredTimeoutError.
+    """
+    timed_out = [False]
+
+    def time_it_out():
+        timed_out[0] = True
+        deferred.cancel()
+
+    delayed_call = reactor.callLater(timeout, time_it_out)
+
+    def convert_cancelled(value):
+        if timed_out[0]:
+            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+            return to_call(value, timeout)
+        return value
+
+    deferred.addBoth(convert_cancelled)
+
+    def cancel_timeout(result):
+        # stop the pending call to cancel the deferred if it's been fired
+        if delayed_call.active():
+            delayed_call.cancel()
+        return result
+
+    deferred.addBoth(cancel_timeout)
+
+
+def _cancelled_to_timed_out_error(value, timeout):
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise DeferredTimeoutError(timeout, "Deferred")
+    return value
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 066fa423fd..7f79333e96 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,9 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches import metrics as cache_metrics
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
 
 
 class ResponseCache(object):
@@ -31,6 +37,7 @@ class ResponseCache(object):
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._name = name
         self._metrics = cache_metrics.register_cache(
             "response_cache",
             size_callback=lambda: self.size(),
@@ -43,15 +50,21 @@ class ResponseCache(object):
     def get(self, key):
         """Look up the given key.
 
-        Returns a deferred which doesn't follow the synapse logcontext rules,
-        so you'll probably want to make_deferred_yieldable it.
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if the request has completed, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        If there is no entry for the key, returns None. It is worth noting that
+        this means there is no way to distinguish a completed result of None
+        from an absent cache entry.
 
         Args:
-            key (str):
+            key (hashable):
 
         Returns:
-            twisted.internet.defer.Deferred|None: None if there is no entry
-            for this key; otherwise a deferred result.
+            twisted.internet.defer.Deferred|None|E: None if there is no entry
+            for this key; otherwise either a deferred result or the result
+            itself.
         """
         result = self.pending_result_cache.get(key)
         if result is not None:
@@ -68,19 +81,17 @@ class ResponseCache(object):
         you should wrap normal synapse deferreds with
         logcontext.run_in_background).
 
-        Returns a new Deferred which also doesn't follow the synapse logcontext
-        rules, so you will want to make_deferred_yieldable it
-
-        (TODO: before using this more widely, it might make sense to refactor
-        it and get() so that they do the necessary wrapping rather than having
-        to do it everywhere ResponseCache is used.)
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if *deferred* was already complete, the actual
+        result. You will probably want to make_deferred_yieldable the result.
 
         Args:
-            key (str):
-            deferred (twisted.internet.defer.Deferred):
+            key (hashable):
+            deferred (twisted.internet.defer.Deferred[T):
 
         Returns:
-            twisted.internet.defer.Deferred
+            twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
+                result.
         """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
@@ -97,3 +108,52 @@ class ResponseCache(object):
 
         result.addBoth(remove)
         return result.observe()
+
+    def wrap(self, key, callback, *args, **kwargs):
+        """Wrap together a *get* and *set* call, taking care of logcontexts
+
+        First looks up the key in the cache, and if it is present makes it
+        follow the synapse logcontext rules and returns it.
+
+        Otherwise, makes a call to *callback(*args, **kwargs)*, which should
+        follow the synapse logcontext rules, and adds the result to the cache.
+
+        Example usage:
+
+            @defer.inlineCallbacks
+            def handle_request(request):
+                # etc
+                defer.returnValue(result)
+
+            result = yield response_cache.wrap(
+                key,
+                handle_request,
+                request,
+            )
+
+        Args:
+            key (hashable): key to get/set in the cache
+
+            callback (callable): function to call if the key is not found in
+                the cache
+
+            *args: positional parameters to pass to the callback, if it is used
+
+            **kwargs: named paramters to pass to the callback, if it is used
+
+        Returns:
+            twisted.internet.defer.Deferred: yieldable result
+        """
+        result = self.get(key)
+        if not result:
+            logger.info("[%s]: no cached result for [%s], calculating new one",
+                        self._name, key)
+            d = run_in_background(callback, *args, **kwargs)
+            result = self.set(key, d)
+        elif not isinstance(result, defer.Deferred) or result.called:
+            logger.info("[%s]: using completed cached result for [%s]",
+                        self._name, key)
+        else:
+            logger.info("[%s]: using incomplete cached result for [%s]",
+                        self._name, key)
+        return make_deferred_yieldable(result)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 90a2608d6f..3380970e4e 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -15,9 +15,9 @@
 
 from twisted.internet import threads, reactor
 
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
-import Queue
+from six.moves import queue
 
 
 class BackgroundFileConsumer(object):
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
 
         # Queue of slices of bytes to be written. When producer calls
         # unregister a final None is sent.
-        self._bytes_queue = Queue.Queue()
+        self._bytes_queue = queue.Queue()
 
         # Deferred that is resolved when finished writing
         self._finished_deferred = None
@@ -70,7 +70,9 @@ class BackgroundFileConsumer(object):
 
         self._producer = producer
         self.streaming = streaming
-        self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
+        self._finished_deferred = run_in_background(
+            threads.deferToThread, self._writer
+        )
         if not streaming:
             self._producer.resumeProducing()
 
diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index d747849553..e9f0f292ee 100644
--- a/synapse/util/httpresourcetree.py
+++ b/synapse/util/httpresourcetree.py
@@ -40,9 +40,12 @@ def create_resource_tree(desired_tree, root_resource):
     # extra resources to existing nodes. See self._resource_id for the key.
     resource_mappings = {}
     for full_path, res in desired_tree.items():
+        # twisted requires all resources to be bytes
+        full_path = full_path.encode("utf-8")
+
         logger.info("Attaching %s to path %s", res, full_path)
         last_resource = root_resource
-        for path_seg in full_path.split('/')[1:-1]:
+        for path_seg in full_path.split(b'/')[1:-1]:
             if path_seg not in last_resource.listNames():
                 # resource doesn't exist, so make a "dummy resource"
                 child_resource = NoResource()
@@ -57,7 +60,7 @@ def create_resource_tree(desired_tree, root_resource):
 
         # ===========================
         # now attach the actual desired resource
-        last_path_seg = full_path.split('/')[-1]
+        last_path_seg = full_path.split(b'/')[-1]
 
         # if there is already a resource here, thieve its children and
         # replace it
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d660ec785b..bbadeec922 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -92,6 +92,7 @@ class LoggingContext(object):
 
         def __nonzero__(self):
             return False
+        __bool__ = __nonzero__  # python3
 
     sentinel = Sentinel()
 
@@ -304,7 +305,12 @@ def run_in_background(f, *args, **kwargs):
     deferred returned by the funtion completes.
 
     Useful for wrapping functions that return a deferred which you don't yield
-    on.
+    on (for instance because you want to pass it to deferred.gatherResults()).
+
+    Note that if you completely discard the result, you should make sure that
+    `f` doesn't raise any deferred exceptions, otherwise a scary-looking
+    CRITICAL error about an unhandled error will be logged without much
+    indication about where it came from.
     """
     current = LoggingContext.current_context()
     res = f(*args, **kwargs)
@@ -340,7 +346,7 @@ def make_deferred_yieldable(deferred):
     returning a deferred. Then, when the deferred completes, restores the
     current logcontext before running callbacks/errbacks.
 
-    (This is more-or-less the opposite operation to preserve_fn.)
+    (This is more-or-less the opposite operation to run_in_background.)
     """
     if isinstance(deferred, defer.Deferred) and not deferred.called:
         prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py
index cdbc4bffd7..59ab3c6968 100644
--- a/synapse/util/logformatter.py
+++ b/synapse/util/logformatter.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 
-import StringIO
+from six import StringIO
 import logging
 import traceback
 
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 1101881a2d..18424f6c36 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 from synapse.api.errors import LimitExceededError
 
 from synapse.util.async import sleep
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 
 import collections
 import contextlib
@@ -150,7 +150,7 @@ class _PerHostRatelimiter(object):
                 "Ratelimit [%s]: sleeping req",
                 id(request_id),
             )
-            ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0)
+            ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
 
             self.sleeping_requests.add(request_id)
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 47b0bb5eb3..4e93f69d3a 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -203,8 +203,8 @@ class RetryDestinationLimiter(object):
                 )
             except Exception:
                 logger.exception(
-                    "Failed to store set_destination_retry_timings",
+                    "Failed to store destination_retry_timings",
                 )
 
         # we deliberately do this in the background.
-        synapse.util.logcontext.preserve_fn(store_retry_timings)()
+        synapse.util.logcontext.run_in_background(store_retry_timings)
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index 95a6168e16..b98b9dc6e4 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -15,6 +15,7 @@
 
 import random
 import string
+from six.moves import range
 
 _string_with_symbols = (
     string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
@@ -22,12 +23,12 @@ _string_with_symbols = (
 
 
 def random_string(length):
-    return ''.join(random.choice(string.ascii_letters) for _ in xrange(length))
+    return ''.join(random.choice(string.ascii_letters) for _ in range(length))
 
 
 def random_string_with_symbols(length):
     return ''.join(
-        random.choice(_string_with_symbols) for _ in xrange(length)
+        random.choice(_string_with_symbols) for _ in range(length)
     )
 
 
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index b70f9a6b0a..7a9e45aca9 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from six.moves import range
+
 
 class _Entry(object):
     __slots__ = ["end_key", "queue"]
@@ -68,7 +70,7 @@ class WheelTimer(object):
         # Add empty entries between the end of the current list and when we want
         # to insert. This ensures there are no gaps.
         self.entries.extend(
-            _Entry(key) for key in xrange(last_key, then_key + 1)
+            _Entry(key) for key in range(last_key, then_key + 1)
         )
 
         self.entries[-1].queue.append(obj)