-rw-r--r-- .travis.yml | 18
-rw-r--r-- CHANGES.rst | 55
-rw-r--r-- README.rst | 3
-rw-r--r-- contrib/README.rst | 10
-rw-r--r-- contrib/systemd/synapse.service | 4
-rwxr-xr-x jenkins/prepare_synapse.sh | 17
-rw-r--r-- synapse/__init__.py | 2
-rw-r--r-- synapse/api/errors.py | 3
-rw-r--r-- synapse/app/appservice.py | 15
-rw-r--r-- synapse/app/federation_sender.py | 31
-rw-r--r-- synapse/app/pusher.py | 35
-rw-r--r-- synapse/app/synchrotron.py | 104
-rw-r--r-- synapse/app/user_dir.py | 13
-rw-r--r-- synapse/appservice/api.py | 8
-rw-r--r-- synapse/appservice/scheduler.py | 37
-rw-r--r-- synapse/config/_base.py | 6
-rw-r--r-- synapse/config/appservice.py | 4
-rw-r--r-- synapse/config/logger.py | 2
-rw-r--r-- synapse/config/tls.py | 4
-rw-r--r-- synapse/crypto/keyring.py | 129
-rw-r--r-- synapse/federation/federation_client.py | 11
-rw-r--r-- synapse/federation/federation_server.py | 55
-rw-r--r-- synapse/federation/send_queue.py | 14
-rw-r--r-- synapse/federation/transaction_queue.py | 2
-rw-r--r-- synapse/federation/transport/server.py | 13
-rw-r--r-- synapse/groups/attestations.py | 48
-rw-r--r-- synapse/handlers/appservice.py | 19
-rw-r--r-- synapse/handlers/e2e_keys.py | 10
-rw-r--r-- synapse/handlers/federation.py | 141
-rw-r--r-- synapse/handlers/initial_sync.py | 12
-rw-r--r-- synapse/handlers/message.py | 128
-rw-r--r-- synapse/handlers/presence.py | 19
-rw-r--r-- synapse/handlers/receipts.py | 61
-rw-r--r-- synapse/handlers/room_list.py | 42
-rw-r--r-- synapse/handlers/room_member.py | 13
-rw-r--r-- synapse/handlers/sync.py | 23
-rw-r--r-- synapse/handlers/typing.py | 50
-rw-r--r-- synapse/http/__init__.py | 22
-rw-r--r-- synapse/http/client.py | 24
-rw-r--r-- synapse/http/endpoint.py | 2
-rw-r--r-- synapse/http/matrixfederationclient.py | 41
-rw-r--r-- synapse/http/server.py | 2
-rw-r--r-- synapse/notifier.py | 43
-rw-r--r-- synapse/push/emailpusher.py | 11
-rw-r--r-- synapse/push/httppusher.py | 9
-rw-r--r-- synapse/push/pusher.py | 2
-rw-r--r-- synapse/push/pusherpool.py | 28
-rw-r--r-- synapse/python_dependencies.py | 20
-rw-r--r-- synapse/replication/http/send_event.py | 18
-rw-r--r-- synapse/replication/tcp/protocol.py | 4
-rw-r--r-- synapse/replication/tcp/resource.py | 4
-rw-r--r-- synapse/rest/client/v1/login.py | 2
-rw-r--r-- synapse/rest/client/v1/register.py | 16
-rw-r--r-- synapse/rest/client/v1/room.py | 9
-rw-r--r-- synapse/rest/client/v2_alpha/register.py | 10
-rw-r--r-- synapse/rest/media/v1/_base.py | 2
-rw-r--r-- synapse/rest/media/v1/media_repository.py | 2
-rw-r--r-- synapse/rest/media/v1/preview_url_resource.py | 5
-rw-r--r-- synapse/rest/media/v1/storage_provider.py | 9
-rw-r--r-- synapse/rest/media/v1/upload_resource.py | 6
-rw-r--r-- synapse/storage/event_federation.py | 63
-rw-r--r-- synapse/storage/event_push_actions.py | 24
-rw-r--r-- synapse/storage/events.py | 49
-rw-r--r-- synapse/storage/events_worker.py | 5
-rw-r--r-- synapse/storage/registration.py | 4
-rw-r--r-- synapse/storage/room.py | 6
-rw-r--r-- synapse/storage/schema/delta/30/as_users.py | 4
-rw-r--r-- synapse/storage/search.py | 2
-rw-r--r-- synapse/storage/stream.py | 11
-rw-r--r-- synapse/storage/tags.py | 4
-rw-r--r-- synapse/util/__init__.py | 56
-rw-r--r-- synapse/util/async.py | 77
-rw-r--r-- synapse/util/caches/response_cache.py | 88
-rw-r--r-- synapse/util/file_consumer.py | 10
-rw-r--r-- synapse/util/httpresourcetree.py | 7
-rw-r--r-- synapse/util/logcontext.py | 10
-rw-r--r-- synapse/util/logformatter.py | 2
-rw-r--r-- synapse/util/ratelimitutils.py | 4
-rw-r--r-- synapse/util/retryutils.py | 4
-rw-r--r-- synapse/util/stringutils.py | 5
-rw-r--r-- synapse/util/wheel_timer.py | 4
-rw-r--r-- tests/config/test_load.py | 2
-rw-r--r-- tests/crypto/test_keyring.py | 2
-rw-r--r-- tests/rest/client/v1/test_rooms.py | 14
-rw-r--r-- tests/storage/test_appservice.py | 12
-rw-r--r-- tests/storage/test_event_federation.py | 68
-rw-r--r-- tests/test_distributor.py | 2
-rw-r--r-- tests/util/test_clock.py | 33
-rw-r--r-- tests/util/test_file_consumer.py | 2
-rw-r--r-- tests/util/test_linearizer.py | 3
-rw-r--r-- tests/utils.py | 9
-rw-r--r-- tox.ini | 10
92 files changed, 1305 insertions, 743 deletions
diff --git a/.travis.yml b/.travis.yml
index 3ce93cb434..e6ba6f4752 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,14 +1,22 @@
 sudo: false
 language: python
-python: 2.7
 
 # tell travis to cache ~/.cache/pip
 cache: pip
 
-env:
-  - TOX_ENV=packaging
-  - TOX_ENV=pep8
-  - TOX_ENV=py27
+matrix:
+  include:
+  - python: 2.7
+    env: TOX_ENV=packaging
+
+  - python: 2.7
+    env: TOX_ENV=pep8
+
+  - python: 2.7
+    env: TOX_ENV=py27
+
+  - python: 3.6
+    env: TOX_ENV=py36
 
 install:
   - pip install tox
diff --git a/CHANGES.rst b/CHANGES.rst
index 19b7af606b..40d13c6484 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,53 @@
+Changes in synapse v0.28.0 (2018-04-26)
+===========================================
+
+Bug Fixes:
+
+* Fix quarantine media admin API and search reindex (PR #3130)
+* Fix media admin APIs (PR #3134)
+
+
+Changes in synapse v0.28.0-rc1 (2018-04-24)
+===========================================
+
+Minor performance improvement to federation sending and bug fixes.
+
+(Note: this release does not include the delta state resolution implementation discussed in Matrix Live.)
+
+
+Features:
+
+* Add metrics for event processing lag (PR #3090)
+* Add metrics for ResponseCache (PR #3092)
+
+Changes:
+
+* Synapse on PyPy (PR #2760) Thanks to @Valodim!
+* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel!
+* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh!
+* Document the behaviour of ResponseCache (PR #3059)
+* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107, #3109, #3110) Thanks to @NotAFile!
+* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel!
+* use python3-compatible prints (PR #3074) Thanks to @NotAFile!
+* Send federation events concurrently (PR #3078)
+* Limit concurrent event sends for a room (PR #3079)
+* Improve R30 stat definition (PR #3086)
+* Send events to ASes concurrently (PR #3088)
+* Refactor ResponseCache usage (PR #3093)
+* Clarify that SRV may not point to a CNAME (PR #3100) Thanks to @silkeh!
+* Use str(e) instead of e.message (PR #3103) Thanks to @NotAFile!
+* Use six.itervalues in some places (PR #3106) Thanks to @NotAFile!
+* Refactor store.have_events (PR #3117)
+
+Bug Fixes:
+
+* Return 401 for invalid access_token on logout (PR #2938) Thanks to @dklug!
+* Return a 404 rather than a 500 on rejoining empty rooms (PR #3080)
+* fix federation_domain_whitelist (PR #3099)
+* Avoid creating events with huge numbers of prev_events (PR #3113)
+* Reject events which have lots of prev_events (PR #3118)
+
+
 Changes in synapse v0.27.4 (2018-04-13)
 =======================================
 
@@ -22,10 +72,10 @@ the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
 Changes in synapse v0.27.3-rc1 (2018-04-09)
 ===========================================
 
-Notable changes include API support for joinability of groups. Also new metrics 
+Notable changes include API support for joinability of groups. Also new metrics
 and phone home stats. Phone home stats include better visibility of system usage
 so we can tweak synapse to work better for all users rather than our own experience
-with matrix.org. Also, recording 'r30' stat which is the measure we use to track 
+with matrix.org. Also, recording 'r30' stat which is the measure we use to track
 overall growth of the Matrix ecosystem. It is defined as:-
 
 Counts the number of native 30 day retained users, defined as:-
@@ -61,7 +111,6 @@ Bug fixes:
 
 * Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
 * Fix replication after switch to simplejson (PR #3015)
-* Fix replication after switch to simplejson (PR #3015)
 * 404 correctly on missing paths via NoResource (PR #3022)
 * Fix error when claiming e2e keys from offline servers (PR #3034)
 * fix tests/storage/test_user_directory.py (PR #3042)
diff --git a/README.rst b/README.rst
index 8812cc1b4f..28fbe45de6 100644
--- a/README.rst
+++ b/README.rst
@@ -614,6 +614,9 @@ should have the format ``_matrix._tcp.<yourdomain.com> <ttl> IN SRV 10 0 <port>
     $ dig -t srv _matrix._tcp.example.com
     _matrix._tcp.example.com. 3600    IN      SRV     10 0 8448 synapse.example.com.
 
+Note that the server hostname cannot be an alias (CNAME record): it has to point
+directly to the server hosting the synapse instance.
+
 You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in
 its user-ids, by setting ``server_name``::
 
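As an illustration of the CNAME rule introduced above (the names and address are placeholders, not part of this change): a working setup pairs the SRV target with an address record directly::

    _matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com.
    synapse.example.com.      3600 IN A   203.0.113.10  ; A/AAAA record, not a CNAME
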
diff --git a/contrib/README.rst b/contrib/README.rst
new file mode 100644
index 0000000000..c296c55628
--- /dev/null
+++ b/contrib/README.rst
@@ -0,0 +1,10 @@
+Community Contributions
+=======================
+
+Everything in this directory is a project submitted by the community that may
+be useful to others. As such, the project maintainers cannot guarantee support,
+stability or backwards compatibility of these projects.
+
+Files in this directory should *not* be relied on directly, as they may not
+continue to work or exist in the future. If you wish to use any of these files,
+copy them first so that they cannot break or vanish from underneath you.
diff --git a/contrib/systemd/synapse.service b/contrib/systemd/synapse.service
index 3f037055b9..b81ce3915d 100644
--- a/contrib/systemd/synapse.service
+++ b/contrib/systemd/synapse.service
@@ -2,6 +2,9 @@
 # (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux)
 # rather than in a user home directory or similar under virtualenv.
 
+# **NOTE:** This is an example service file that may change in the future. If
+# you wish to use it, copy it rather than symlinking to it.
+
 [Unit]
 Description=Synapse Matrix homeserver
 
@@ -12,6 +15,7 @@ Group=synapse
 WorkingDirectory=/var/lib/synapse
 ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml
 ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml
+# EnvironmentFile=-/etc/sysconfig/synapse  # Can be used to e.g. set SYNAPSE_CACHE_FACTOR 
 
 [Install]
 WantedBy=multi-user.target
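
The commented ``EnvironmentFile`` line above expects a plain shell-style assignment file; an illustrative example (path as in the unit file, value arbitrary)::

    # /etc/sysconfig/synapse
    SYNAPSE_CACHE_FACTOR=2.0
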
diff --git a/jenkins/prepare_synapse.sh b/jenkins/prepare_synapse.sh
index ffcb1cfab9..a30179f2aa 100755
--- a/jenkins/prepare_synapse.sh
+++ b/jenkins/prepare_synapse.sh
@@ -1,5 +1,7 @@
 #! /bin/bash
 
+set -eux
+
 cd "`dirname $0`/.."
 
 TOX_DIR=$WORKSPACE/.tox
@@ -14,7 +16,20 @@ fi
 tox -e py27 --notest -v
 
 TOX_BIN=$TOX_DIR/py27/bin
-$TOX_BIN/pip install setuptools
+
+# cryptography 2.2 requires setuptools >= 18.5.
+#
+# older versions of virtualenv (?) give us a virtualenv with the same version
+# of setuptools as is installed on the system python (and tox runs virtualenv
+# under python3, so we get the version of setuptools that is installed on that).
+#
+# anyway, make sure that we have a recent enough setuptools.
+$TOX_BIN/pip install 'setuptools>=18.5'
+
+# we also need a semi-recent version of pip, because old ones fail to install
+# the "enum34" dependency of cryptography.
+$TOX_BIN/pip install 'pip>=10'
+
 { python synapse/python_dependencies.py
   echo lxml psycopg2
 } | xargs $TOX_BIN/pip install
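
If the setuptools/pip reasoning in the comments above ever needs verifying, the versions the tox virtualenv actually received can be inspected directly; an illustrative snippet, not part of the script::

    $TOX_BIN/pip --version
    $TOX_BIN/python -c 'import setuptools; print(setuptools.__version__)'
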
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 0c1c16b9a4..4924f44d4e 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.27.4"
+__version__ = "0.28.0"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index bee59e80dd..a9ff5576f3 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -18,6 +18,7 @@
 import logging
 
 import simplejson as json
+from six import iteritems
 
 logger = logging.getLogger(__name__)
 
@@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
         A dict representing the error response JSON.
     """
     err = {"error": msg, "errcode": code}
-    for key, value in kwargs.iteritems():
+    for key, value in iteritems(kwargs):
         err[key] = value
     return err
 
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index f2540023a7..58f2c9d68c 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.appservice")
@@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler):
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
-            preserve_fn(
-                self.appservice_handler.notify_interested_services
-            )(max_stream_id)
+            run_in_background(self._notify_app_services, max_stream_id)
+
+    @defer.inlineCallbacks
+    def _notify_app_services(self, room_stream_id):
+        try:
+            yield self.appservice_handler.notify_interested_services(room_stream_id)
+        except Exception:
+            logger.exception("Error notifying application services of event")
 
 
 def start(config_options):
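
This hunk is the template for a pattern applied throughout this release: work started with run_in_background gets its own try/except, because an exception escaping a fire-and-forget Deferred would otherwise surface only as an "Unhandled error in Deferred" warning at garbage-collection time. A minimal self-contained sketch of the idea, using plain Twisted rather than Synapse's helpers::

    import logging
    from twisted.internet import defer, task

    logger = logging.getLogger(__name__)

    @defer.inlineCallbacks
    def _notify(stream_id):
        try:
            yield defer.succeed(stream_id)  # stand-in for the real async work
            raise RuntimeError("boom")      # simulate a mid-task failure
        except Exception:
            logger.exception("Error notifying services")  # logged, not lost

    def main(reactor):
        _notify(42)  # fire-and-forget: nothing ever waits on this Deferred
        return task.deferLater(reactor, 0.1, lambda: None)

    if __name__ == "__main__":
        logging.basicConfig()
        task.react(main)
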
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 0cc3331519..a08af83a4c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
@@ -229,7 +229,7 @@ class FederationSenderHandler(object):
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            preserve_fn(self.update_token)(token)
+            run_in_background(self.update_token, token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
@@ -237,19 +237,22 @@ class FederationSenderHandler(object):
 
     @defer.inlineCallbacks
     def update_token(self, token):
-        self.federation_position = token
-
-        # We linearize here to ensure we don't have races updating the token
-        with (yield self._fed_position_linearizer.queue(None)):
-            if self._last_ack < self.federation_position:
-                yield self.store.update_federation_out_pos(
-                    "federation", self.federation_position
-                )
+        try:
+            self.federation_position = token
+
+            # We linearize here to ensure we don't have races updating the token
+            with (yield self._fed_position_linearizer.queue(None)):
+                if self._last_ack < self.federation_position:
+                    yield self.store.update_federation_out_pos(
+                        "federation", self.federation_position
+                    )
 
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self.replication_client.send_federation_ack(self.federation_position)
-                self._last_ack = self.federation_position
+                    # We ACK this token over replication so that the master can drop
+                    # its in memory queues
+                    self.replication_client.send_federation_ack(self.federation_position)
+                    self._last_ack = self.federation_position
+        except Exception:
+            logger.exception("Error updating federation stream position")
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index d5c3a85195..26930d1b3b 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -33,7 +33,7 @@ from synapse.server import HomeServer
 from synapse.storage import DataStore
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
@@ -140,24 +140,27 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
-        preserve_fn(self.poke_pushers)(stream_name, token, rows)
+        run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
     def poke_pushers(self, stream_name, token, rows):
-        if stream_name == "pushers":
-            for row in rows:
-                if row.deleted:
-                    yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
-                else:
-                    yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
-        elif stream_name == "events":
-            yield self.pusher_pool.on_new_notifications(
-                token, token,
-            )
-        elif stream_name == "receipts":
-            yield self.pusher_pool.on_new_receipts(
-                token, token, set(row.room_id for row in rows)
-            )
+        try:
+            if stream_name == "pushers":
+                for row in rows:
+                    if row.deleted:
+                        yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                    else:
+                        yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
+            elif stream_name == "events":
+                yield self.pusher_pool.on_new_notifications(
+                    token, token,
+                )
+            elif stream_name == "receipts":
+                yield self.pusher_pool.on_new_receipts(
+                    token, token, set(row.room_id for row in rows)
+                )
+        except Exception:
+            logger.exception("Error poking pushers")
 
     def stop_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 508b66613d..7152b1deb4 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -51,13 +51,15 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
 from twisted.web.resource import NoResource
 
+from six import iteritems
+
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
@@ -211,7 +213,7 @@ class SynchrotronPresence(object):
 
     def get_currently_syncing_users(self):
         return [
-            user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+            user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
             if count > 0
         ]
 
@@ -325,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
-
-        preserve_fn(self.process_and_notify)(stream_name, token, rows)
+        run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
         args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -338,55 +339,58 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     @defer.inlineCallbacks
     def process_and_notify(self, stream_name, token, rows):
-        if stream_name == "events":
-            # We shouldn't get multiple rows per token for events stream, so
-            # we don't need to optimise this for multiple rows.
-            for row in rows:
-                event = yield self.store.get_event(row.event_id)
-                extra_users = ()
-                if event.type == EventTypes.Member:
-                    extra_users = (event.state_key,)
-                max_token = self.store.get_room_max_stream_ordering()
-                self.notifier.on_new_room_event(
-                    event, token, max_token, extra_users
+        try:
+            if stream_name == "events":
+                # We shouldn't get multiple rows per token for events stream, so
+                # we don't need to optimise this for multiple rows.
+                for row in rows:
+                    event = yield self.store.get_event(row.event_id)
+                    extra_users = ()
+                    if event.type == EventTypes.Member:
+                        extra_users = (event.state_key,)
+                    max_token = self.store.get_room_max_stream_ordering()
+                    self.notifier.on_new_room_event(
+                        event, token, max_token, extra_users
+                    )
+            elif stream_name == "push_rules":
+                self.notifier.on_new_event(
+                    "push_rules_key", token, users=[row.user_id for row in rows],
                 )
-        elif stream_name == "push_rules":
-            self.notifier.on_new_event(
-                "push_rules_key", token, users=[row.user_id for row in rows],
-            )
-        elif stream_name in ("account_data", "tag_account_data",):
-            self.notifier.on_new_event(
-                "account_data_key", token, users=[row.user_id for row in rows],
-            )
-        elif stream_name == "receipts":
-            self.notifier.on_new_event(
-                "receipt_key", token, rooms=[row.room_id for row in rows],
-            )
-        elif stream_name == "typing":
-            self.typing_handler.process_replication_rows(token, rows)
-            self.notifier.on_new_event(
-                "typing_key", token, rooms=[row.room_id for row in rows],
-            )
-        elif stream_name == "to_device":
-            entities = [row.entity for row in rows if row.entity.startswith("@")]
-            if entities:
+            elif stream_name in ("account_data", "tag_account_data",):
                 self.notifier.on_new_event(
-                    "to_device_key", token, users=entities,
+                    "account_data_key", token, users=[row.user_id for row in rows],
                 )
-        elif stream_name == "device_lists":
-            all_room_ids = set()
-            for row in rows:
-                room_ids = yield self.store.get_rooms_for_user(row.user_id)
-                all_room_ids.update(room_ids)
-            self.notifier.on_new_event(
-                "device_list_key", token, rooms=all_room_ids,
-            )
-        elif stream_name == "presence":
-            yield self.presence_handler.process_replication_rows(token, rows)
-        elif stream_name == "receipts":
-            self.notifier.on_new_event(
-                "groups_key", token, users=[row.user_id for row in rows],
-            )
+            elif stream_name == "receipts":
+                self.notifier.on_new_event(
+                    "receipt_key", token, rooms=[row.room_id for row in rows],
+                )
+            elif stream_name == "typing":
+                self.typing_handler.process_replication_rows(token, rows)
+                self.notifier.on_new_event(
+                    "typing_key", token, rooms=[row.room_id for row in rows],
+                )
+            elif stream_name == "to_device":
+                entities = [row.entity for row in rows if row.entity.startswith("@")]
+                if entities:
+                    self.notifier.on_new_event(
+                        "to_device_key", token, users=entities,
+                    )
+            elif stream_name == "device_lists":
+                all_room_ids = set()
+                for row in rows:
+                    room_ids = yield self.store.get_rooms_for_user(row.user_id)
+                    all_room_ids.update(room_ids)
+                self.notifier.on_new_event(
+                    "device_list_key", token, rooms=all_room_ids,
+                )
+            elif stream_name == "presence":
+                yield self.presence_handler.process_replication_rows(token, rows)
+            elif stream_name == "receipts":
+                self.notifier.on_new_event(
+                    "groups_key", token, users=[row.user_id for row in rows],
+                )
+        except Exception:
+            logger.exception("Error processing replication")
 
 
 def start(config_options):
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 5f845e80d1..5ba7e9b416 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine
 from synapse.storage.user_directory import UserDirectoryStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.user_dir")
@@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
             stream_name, token, rows
         )
         if stream_name == "current_state_deltas":
-            preserve_fn(self.user_directory.notify_new_event)()
+            run_in_background(self._notify_directory)
+
+    @defer.inlineCallbacks
+    def _notify_directory(self):
+        try:
+            yield self.user_directory.notify_new_event()
+        except Exception:
+            logger.exception("Error notifiying user directory of state update")
 
 
 def start(config_options):
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 11e9c37c63..00efff1464 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
 from synapse.http.client import SimpleHttpClient
 from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -194,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 defer.returnValue(None)
 
         key = (service.id, protocol)
-        result = self.protocol_meta_cache.get(key)
-        if not result:
-            result = self.protocol_meta_cache.set(
-                key, preserve_fn(_get)()
-            )
-        return make_deferred_yieldable(result)
+        return self.protocol_meta_cache.wrap(key, _get)
 
     @defer.inlineCallbacks
     def push_bulk(self, service, events, txn_id=None):
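
The wrap() call above (PR #3093) folds the removed get()/set() dance into one step: return the pending Deferred for a key if there is one, otherwise start the computation and cache it, so concurrent callers share a single fetch. A toy sketch of that shape, ignoring Synapse's logcontext handling and result-retention timeout::

    from twisted.internet import defer

    class MiniResponseCache(object):
        """Toy model: deduplicate concurrent lookups by key."""

        def __init__(self):
            self._pending = {}

        def wrap(self, key, callback, *args, **kwargs):
            if key not in self._pending:
                d = defer.maybeDeferred(callback, *args, **kwargs)
                self._pending[key] = d

                def _evict(result):
                    self._pending.pop(key, None)
                    return result

                d.addBoth(_evict)
            return self._pending[key]

A caller then simply yields ``cache.wrap((service.id, protocol), _get)`` and never touches get()/set() directly.
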
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 6da315473d..6eddbc0828 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -51,7 +51,7 @@ components.
 from twisted.internet import defer
 
 from synapse.appservice import ApplicationServiceState
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 
 import logging
@@ -106,7 +106,7 @@ class _ServiceQueuer(object):
     def enqueue(self, service, event):
         # if this service isn't being sent something
         self.queued_events.setdefault(service.id, []).append(event)
-        preserve_fn(self._send_request)(service)
+        run_in_background(self._send_request, service)
 
     @defer.inlineCallbacks
     def _send_request(self, service):
@@ -152,10 +152,10 @@ class _TransactionController(object):
                 if sent:
                     yield txn.complete(self.store)
                 else:
-                    preserve_fn(self._start_recoverer)(service)
-        except Exception as e:
-            logger.exception(e)
-            preserve_fn(self._start_recoverer)(service)
+                    run_in_background(self._start_recoverer, service)
+        except Exception:
+            logger.exception("Error creating appservice transaction")
+            run_in_background(self._start_recoverer, service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
@@ -176,17 +176,20 @@ class _TransactionController(object):
 
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
-        yield self.store.set_appservice_state(
-            service,
-            ApplicationServiceState.DOWN
-        )
-        logger.info(
-            "Application service falling behind. Starting recoverer. AS ID %s",
-            service.id
-        )
-        recoverer = self.recoverer_fn(service, self.on_recovered)
-        self.add_recoverers([recoverer])
-        recoverer.recover()
+        try:
+            yield self.store.set_appservice_state(
+                service,
+                ApplicationServiceState.DOWN
+            )
+            logger.info(
+                "Application service falling behind. Starting recoverer. AS ID %s",
+                service.id
+            )
+            recoverer = self.recoverer_fn(service, self.on_recovered)
+            self.add_recoverers([recoverer])
+            recoverer.recover()
+        except Exception:
+            logger.exception("Error starting AS recoverer")
 
     @defer.inlineCallbacks
     def _is_service_up(self, service):
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 32b439d20a..b748ed2b0a 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -281,15 +281,15 @@ class Config(object):
                     )
                 if not cls.path_exists(config_dir_path):
                     os.makedirs(config_dir_path)
-                with open(config_path, "wb") as config_file:
-                    config_bytes, config = obj.generate_config(
+                with open(config_path, "w") as config_file:
+                    config_str, config = obj.generate_config(
                         config_dir_path=config_dir_path,
                         server_name=server_name,
                         report_stats=(config_args.report_stats == "yes"),
                         is_generating_file=True
                     )
                     obj.invoke_all("generate_files", config)
-                    config_file.write(config_bytes)
+                    config_file.write(config_str)
                 print((
                     "A config file has been generated in %r for server name"
                     " %r with corresponding SSL keys and self-signed"
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index 9a2359b6fd..277305e184 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -17,11 +17,11 @@ from ._base import Config, ConfigError
 from synapse.appservice import ApplicationService
 from synapse.types import UserID
 
-import urllib
 import yaml
 import logging
 
 from six import string_types
+from six.moves.urllib import parse as urlparse
 
 logger = logging.getLogger(__name__)
 
@@ -105,7 +105,7 @@ def _load_appservice(hostname, as_info, config_filename):
         )
 
     localpart = as_info["sender_localpart"]
-    if urllib.quote(localpart) != localpart:
+    if urlparse.quote(localpart) != localpart:
         raise ValueError(
             "sender_localpart needs characters which are not URL encoded."
         )
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 3f70039acd..6a7228dc2f 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -117,7 +117,7 @@ class LoggingConfig(Config):
         log_config = config.get("log_config")
         if log_config and not os.path.exists(log_config):
             log_file = self.abspath("homeserver.log")
-            with open(log_config, "wb") as log_config_file:
+            with open(log_config, "w") as log_config_file:
                 log_config_file.write(
                     DEFAULT_LOG_CONFIG.substitute(log_file=log_file)
                 )
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 29eb012ddb..b66154bc7c 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -133,7 +133,7 @@ class TlsConfig(Config):
         tls_dh_params_path = config["tls_dh_params_path"]
 
         if not self.path_exists(tls_private_key_path):
-            with open(tls_private_key_path, "w") as private_key_file:
+            with open(tls_private_key_path, "wb") as private_key_file:
                 tls_private_key = crypto.PKey()
                 tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
                 private_key_pem = crypto.dump_privatekey(
@@ -148,7 +148,7 @@ class TlsConfig(Config):
                 )
 
         if not self.path_exists(tls_certificate_path):
-            with open(tls_certificate_path, "w") as certificate_file:
+            with open(tls_certificate_path, "wb") as certificate_file:
                 cert = crypto.X509()
                 subject = cert.get_subject()
                 subject.CN = config["server_name"]
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 35f810b07b..22ee0fc93f 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes
 from synapse.util import unwrapFirstError, logcontext
 from synapse.util.logcontext import (
     PreserveLoggingContext,
-    preserve_fn
+    preserve_fn,
+    run_in_background,
 )
 from synapse.util.metrics import Measure
 
@@ -127,7 +128,7 @@ class Keyring(object):
 
             verify_requests.append(verify_request)
 
-        preserve_fn(self._start_key_lookups)(verify_requests)
+        run_in_background(self._start_key_lookups, verify_requests)
 
         # Pass those keys to handle_key_deferred so that the json object
         # signatures can be verified
@@ -146,53 +147,56 @@ class Keyring(object):
             verify_requests (List[VerifyKeyRequest]):
         """
 
-        # create a deferred for each server we're going to look up the keys
-        # for; we'll resolve them once we have completed our lookups.
-        # These will be passed into wait_for_previous_lookups to block
-        # any other lookups until we have finished.
-        # The deferreds are called with no logcontext.
-        server_to_deferred = {
-            rq.server_name: defer.Deferred()
-            for rq in verify_requests
-        }
-
-        # We want to wait for any previous lookups to complete before
-        # proceeding.
-        yield self.wait_for_previous_lookups(
-            [rq.server_name for rq in verify_requests],
-            server_to_deferred,
-        )
-
-        # Actually start fetching keys.
-        self._get_server_verify_keys(verify_requests)
-
-        # When we've finished fetching all the keys for a given server_name,
-        # resolve the deferred passed to `wait_for_previous_lookups` so that
-        # any lookups waiting will proceed.
-        #
-        # map from server name to a set of request ids
-        server_to_request_ids = {}
-
-        for verify_request in verify_requests:
-            server_name = verify_request.server_name
-            request_id = id(verify_request)
-            server_to_request_ids.setdefault(server_name, set()).add(request_id)
-
-        def remove_deferreds(res, verify_request):
-            server_name = verify_request.server_name
-            request_id = id(verify_request)
-            server_to_request_ids[server_name].discard(request_id)
-            if not server_to_request_ids[server_name]:
-                d = server_to_deferred.pop(server_name, None)
-                if d:
-                    d.callback(None)
-            return res
-
-        for verify_request in verify_requests:
-            verify_request.deferred.addBoth(
-                remove_deferreds, verify_request,
+        try:
+            # create a deferred for each server we're going to look up the keys
+            # for; we'll resolve them once we have completed our lookups.
+            # These will be passed into wait_for_previous_lookups to block
+            # any other lookups until we have finished.
+            # The deferreds are called with no logcontext.
+            server_to_deferred = {
+                rq.server_name: defer.Deferred()
+                for rq in verify_requests
+            }
+
+            # We want to wait for any previous lookups to complete before
+            # proceeding.
+            yield self.wait_for_previous_lookups(
+                [rq.server_name for rq in verify_requests],
+                server_to_deferred,
             )
 
+            # Actually start fetching keys.
+            self._get_server_verify_keys(verify_requests)
+
+            # When we've finished fetching all the keys for a given server_name,
+            # resolve the deferred passed to `wait_for_previous_lookups` so that
+            # any lookups waiting will proceed.
+            #
+            # map from server name to a set of request ids
+            server_to_request_ids = {}
+
+            for verify_request in verify_requests:
+                server_name = verify_request.server_name
+                request_id = id(verify_request)
+                server_to_request_ids.setdefault(server_name, set()).add(request_id)
+
+            def remove_deferreds(res, verify_request):
+                server_name = verify_request.server_name
+                request_id = id(verify_request)
+                server_to_request_ids[server_name].discard(request_id)
+                if not server_to_request_ids[server_name]:
+                    d = server_to_deferred.pop(server_name, None)
+                    if d:
+                        d.callback(None)
+                return res
+
+            for verify_request in verify_requests:
+                verify_request.deferred.addBoth(
+                    remove_deferreds, verify_request,
+                )
+        except Exception:
+            logger.exception("Error starting key lookups")
+
     @defer.inlineCallbacks
     def wait_for_previous_lookups(self, server_names, server_to_deferred):
         """Waits for any previous key lookups for the given servers to finish.
@@ -313,7 +317,7 @@ class Keyring(object):
                     if not verify_request.deferred.called:
                         verify_request.deferred.errback(err)
 
-        preserve_fn(do_iterations)().addErrback(on_err)
+        run_in_background(do_iterations).addErrback(on_err)
 
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
@@ -329,8 +333,9 @@ class Keyring(object):
         """
         res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.get_server_verify_keys)(
-                    server_name, key_ids
+                run_in_background(
+                    self.store.get_server_verify_keys,
+                    server_name, key_ids,
                 ).addCallback(lambda ks, server: (server, ks), server_name)
                 for server_name, key_ids in server_name_and_key_ids
             ],
@@ -352,13 +357,13 @@ class Keyring(object):
                 logger.exception(
                     "Unable to get key from %r: %s %s",
                     perspective_name,
-                    type(e).__name__, str(e.message),
+                    type(e).__name__, str(e),
                 )
                 defer.returnValue({})
 
         results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(get_key)(p_name, p_keys)
+                run_in_background(get_key, p_name, p_keys)
                 for p_name, p_keys in self.perspective_servers.items()
             ],
             consumeErrors=True,
@@ -384,7 +389,7 @@ class Keyring(object):
                 logger.info(
                     "Unable to get key %r for %r directly: %s %s",
                     key_ids, server_name,
-                    type(e).__name__, str(e.message),
+                    type(e).__name__, str(e),
                 )
 
             if not keys:
@@ -398,7 +403,7 @@ class Keyring(object):
 
         results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(get_key)(server_name, key_ids)
+                run_in_background(get_key, server_name, key_ids)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
@@ -481,7 +486,8 @@ class Keyring(object):
 
         yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store_keys)(
+                run_in_background(
+                    self.store_keys,
                     server_name=server_name,
                     from_server=perspective_name,
                     verify_keys=response_keys,
@@ -539,7 +545,8 @@ class Keyring(object):
 
         yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store_keys)(
+                run_in_background(
+                    self.store_keys,
                     server_name=key_server_name,
                     from_server=server_name,
                     verify_keys=verify_keys,
@@ -615,7 +622,8 @@ class Keyring(object):
 
         yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.store_server_keys_json)(
+                run_in_background(
+                    self.store.store_server_keys_json,
                     server_name=server_name,
                     key_id=key_id,
                     from_server=server_name,
@@ -716,7 +724,8 @@ class Keyring(object):
         # TODO(markjh): Store whether the keys have expired.
         return logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.store_server_verify_key)(
+                run_in_background(
+                    self.store.store_server_verify_key,
                     server_name, server_name, key.time_added, key
                 )
                 for key_id, key in verify_keys.items()
@@ -734,7 +743,7 @@ def _handle_key_deferred(verify_request):
     except IOError as e:
         logger.warn(
             "Got IOError when downloading keys for %s: %s %s",
-            server_name, type(e).__name__, str(e.message),
+            server_name, type(e).__name__, str(e),
         )
         raise SynapseError(
             502,
@@ -744,7 +753,7 @@ def _handle_key_deferred(verify_request):
     except Exception as e:
         logger.exception(
             "Got Exception when downloading keys for %s: %s %s",
-            server_name, type(e).__name__, str(e.message),
+            server_name, type(e).__name__, str(e),
         )
         raise SynapseError(
             401,
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 38440da5b5..6163f7c466 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,8 @@ import itertools
 import logging
 import random
 
+from six.moves import range
+
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
@@ -33,7 +35,7 @@ from synapse.federation.federation_base import (
 import synapse.metrics
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -394,7 +396,7 @@ class FederationClient(FederationBase):
             seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
             signed_events = seen_events.values()
         else:
-            seen_events = yield self.store.have_events(event_ids)
+            seen_events = yield self.store.have_seen_events(event_ids)
             signed_events = []
 
         failed_to_fetch = set()
@@ -413,11 +415,12 @@ class FederationClient(FederationBase):
 
         batch_size = 20
         missing_events = list(missing_events)
-        for i in xrange(0, len(missing_events), batch_size):
+        for i in range(0, len(missing_events), batch_size):
             batch = set(missing_events[i:i + batch_size])
 
             deferreds = [
-                preserve_fn(self.get_pdu)(
+                run_in_background(
+                    self.get_pdu,
                     destinations=random_server_list(),
                     event_id=e_id,
                 )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e4ce037acf..247ddc89d5 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -30,9 +31,10 @@ import synapse.metrics
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.logutils import log_function
 
+from six import iteritems
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -212,16 +214,17 @@ class FederationServer(FederationBase):
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
-        result = self._state_resp_cache.get((room_id, event_id))
-        if not result:
-            with (yield self._server_linearizer.queue((origin, room_id))):
-                d = self._state_resp_cache.set(
-                    (room_id, event_id),
-                    preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
-                )
-                resp = yield make_deferred_yieldable(d)
-        else:
-            resp = yield make_deferred_yieldable(result)
+        # we grab the linearizer to protect ourselves from servers which hammer
+        # us. In theory we might already have the response to this query
+        # in the cache so we could return it without waiting for the linearizer
+        # - but that's non-trivial to get right, and anyway somewhat defeats
+        # the point of the linearizer.
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            resp = yield self._state_resp_cache.wrap(
+                (room_id, event_id),
+                self._on_context_state_request_compute,
+                room_id, event_id,
+            )
 
         defer.returnValue((200, resp))
 
@@ -425,9 +428,9 @@ class FederationServer(FederationBase):
             "Claimed one-time-keys: %s",
             ",".join((
                 "%s for %s:%s" % (key_id, user_id, device_id)
-                for user_id, user_keys in json_result.iteritems()
-                for device_id, device_keys in user_keys.iteritems()
-                for key_id, _ in device_keys.iteritems()
+                for user_id, user_keys in iteritems(json_result)
+                for device_id, device_keys in iteritems(user_keys)
+                for key_id, _ in iteritems(device_keys)
             )),
         )
 
@@ -494,13 +497,33 @@ class FederationServer(FederationBase):
     def _handle_received_pdu(self, origin, pdu):
         """ Process a PDU received in a federation /send/ transaction.
 
+        If the event is invalid, then this method throws a FederationError.
+        (The error will then be logged and sent back to the sender (which
+        probably won't do anything with it), and other events in the
+        transaction will be processed as normal).
+
+        It is likely that we'll then receive other events which refer to
+        this rejected_event in their prev_events, etc.  When that happens,
+        we'll attempt to fetch the rejected event again, which will presumably
+        fail, so those second-generation events will also get rejected.
+
+        Eventually, we get to the point where there are more than 10 events
+        between any new events and the original rejected event. Since we
+        only try to backfill 10 events deep on received pdu, we then accept the
+        new event, possibly introducing a discontinuity in the DAG, with new
+        forward extremities, so normal service is approximately returned,
+        until we try to backfill across the discontinuity.
+
         Args:
             origin (str): server which sent the pdu
             pdu (FrozenEvent): received pdu
 
         Returns (Deferred): completes with None
-        Raises: FederationError if the signatures / hash do not match
-    """
+
+        Raises: FederationError if the signatures / hash do not match, or
+            if the event was unacceptable for any other reason (eg, too large,
+            too many prev_events, couldn't find the prev_events)
+        """
         # check that it's actually being sent from a valid destination to
         # workaround bug #1753 in 0.18.5 and 0.18.6
         if origin != get_domain_from_id(pdu.event_id):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5acebc1..0f0c687b37 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -40,6 +40,8 @@ from collections import namedtuple
 
 import logging
 
+from six import itervalues, iteritems
+
 logger = logging.getLogger(__name__)
 
 
@@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object):
 
             user_ids = set(
                 user_id
-                for uids in self.presence_changed.itervalues()
+                for uids in itervalues(self.presence_changed)
                 for user_id in uids
             )
 
@@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object):
         # stream position.
         keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
 
-        for ((destination, edu_key), pos) in keyed_edus.iteritems():
+        for ((destination, edu_key), pos) in iteritems(keyed_edus):
             rows.append((pos, KeyedEduRow(
                 key=edu_key,
                 edu=self.keyed_edu[(destination, edu_key)],
@@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object):
         j = keys.bisect_right(to_token) + 1
         device_messages = {self.device_messages[k]: k for k in keys[i:j]}
 
-        for (destination, pos) in device_messages.iteritems():
+        for (destination, pos) in iteritems(device_messages):
             rows.append((pos, DeviceRow(
                 destination=destination,
             )))
@@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows):
     if buff.presence:
         transaction_queue.send_presence(buff.presence)
 
-    for destination, edu_map in buff.keyed_edus.iteritems():
+    for destination, edu_map in iteritems(buff.keyed_edus):
         for key, edu in edu_map.items():
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=key,
             )
 
-    for destination, edu_list in buff.edus.iteritems():
+    for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=None,
             )
 
-    for destination, failure_list in buff.failures.iteritems():
+    for destination, failure_list in iteritems(buff.failures):
         for failure in failure_list:
             transaction_queue.send_failure(destination, failure)
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 963d938edd..ded2b1871a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -323,6 +323,8 @@ class TransactionQueue(object):
                     break
 
                 yield self._process_presence_inner(states_map.values())
+        except Exception:
+            logger.exception("Error sending presence states to servers")
         finally:
             self._processing_pending_presence = False
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index ff0656df3e..19d09f5422 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -25,7 +25,7 @@ from synapse.http.servlet import (
 )
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
 
 import functools
@@ -152,11 +152,18 @@ class Authenticator(object):
         # alive
         retry_timings = yield self.store.get_destination_retry_timings(origin)
         if retry_timings and retry_timings["retry_last_ts"]:
-            logger.info("Marking origin %r as up", origin)
-            preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
+            run_in_background(self._reset_retry_timings, origin)
 
         defer.returnValue(origin)
 
+    @defer.inlineCallbacks
+    def _reset_retry_timings(self, origin):
+        try:
+            logger.info("Marking origin %r as up", origin)
+            yield self.store.set_destination_retry_timings(origin, 0, 0)
+        except Exception:
+            logger.exception("Error resetting retry timings on %s", origin)
+
 
 class BaseFederationServlet(object):
     REQUIRE_AUTH = True
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index 1fb709e6c3..6f11fa374b 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -42,7 +42,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
 from synapse.types import get_domain_from_id
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 
 from signedjson.sign import sign_json
 
@@ -165,31 +165,35 @@ class GroupAttestionRenewer(object):
 
         @defer.inlineCallbacks
         def _renew_attestation(group_id, user_id):
-            if not self.is_mine_id(group_id):
-                destination = get_domain_from_id(group_id)
-            elif not self.is_mine_id(user_id):
-                destination = get_domain_from_id(user_id)
-            else:
-                logger.warn(
-                    "Incorrectly trying to do attestations for user: %r in %r",
-                    user_id, group_id,
+            try:
+                if not self.is_mine_id(group_id):
+                    destination = get_domain_from_id(group_id)
+                elif not self.is_mine_id(user_id):
+                    destination = get_domain_from_id(user_id)
+                else:
+                    logger.warn(
+                        "Incorrectly trying to do attestations for user: %r in %r",
+                        user_id, group_id,
+                    )
+                    yield self.store.remove_attestation_renewal(group_id, user_id)
+                    return
+
+                attestation = self.attestations.create_attestation(group_id, user_id)
+
+                yield self.transport_client.renew_group_attestation(
+                    destination, group_id, user_id,
+                    content={"attestation": attestation},
                 )
-                yield self.store.remove_attestation_renewal(group_id, user_id)
-                return
-
-            attestation = self.attestations.create_attestation(group_id, user_id)
 
-            yield self.transport_client.renew_group_attestation(
-                destination, group_id, user_id,
-                content={"attestation": attestation},
-            )
-
-            yield self.store.update_attestation_renewal(
-                group_id, user_id, attestation
-            )
+                yield self.store.update_attestation_renewal(
+                    group_id, user_id, attestation
+                )
+            except Exception:
+                logger.exception("Error renewing attestation of %r in %r",
+                                 user_id, group_id)
 
         for row in rows:
             group_id = row["group_id"]
             user_id = row["user_id"]
 
-            preserve_fn(_renew_attestation)(group_id, user_id)
+            run_in_background(_renew_attestation, group_id, user_id)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 0245197c02..0d11d890ff 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -198,7 +198,10 @@ class ApplicationServicesHandler(object):
         services = yield self._get_services_for_3pn(protocol)
 
         results = yield make_deferred_yieldable(defer.DeferredList([
-            preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+            run_in_background(
+                self.appservice_api.query_3pe,
+                service, kind, protocol, fields,
+            )
             for service in services
         ], consumeErrors=True))
 
@@ -259,11 +262,15 @@ class ApplicationServicesHandler(object):
             event based on the service regex.
         """
         services = self.store.get_app_services()
-        interested_list = [
-            s for s in services if (
-                yield s.is_interested(event, self.store)
-            )
-        ]
+
+        # we can't use a list comprehension here. In Python 3, a list
+        # comprehension has its own function scope, so a yield inside it
+        # would belong to the comprehension itself rather than suspending
+        # the enclosing generator.
+        interested_list = []
+        for s in services:
+            if (yield s.is_interested(event, self.store)):
+                interested_list.append(s)
+
         defer.returnValue(interested_list)
 
     def _get_services_for_user(self, user_id):
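For concreteness, a self-contained sketch of the Python 3 behaviour the comment above describes (toy names throughout):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def pick_interested(services, event, store):
        # a comprehension like
        #     [s for s in services if (yield s.is_interested(event, store))]
        # compiles to its own function scope on Python 3, so the yield would
        # not suspend pick_interested at all; the explicit loop does
        picked = []
        for s in services:
            if (yield s.is_interested(event, store)):
                picked.append(s)
        defer.returnValue(picked)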
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 325c0c4a9f..25aec624af 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
     SynapseError, CodeMessageException, FederationDeniedError,
 )
 from synapse.types import get_domain_from_id, UserID
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
@@ -139,9 +139,9 @@ class E2eKeysHandler(object):
                 failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
-            preserve_fn(do_remote_query)(destination)
+            run_in_background(do_remote_query, destination)
             for destination in remote_queries_not_in_cache
-        ]))
+        ], consumeErrors=True))
 
         defer.returnValue({
             "device_keys": results, "failures": failures,
@@ -242,9 +242,9 @@ class E2eKeysHandler(object):
                 failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
-            preserve_fn(claim_client_keys)(destination)
+            run_in_background(claim_client_keys, destination)
             for destination in remote_queries
-        ]))
+        ], consumeErrors=True))
 
         logger.info(
             "Claimed one-time-keys: %s",
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d71..f39233d846 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -15,8 +15,16 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+
+import itertools
+import logging
+import sys
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
+import six
+from six.moves import http_client
+from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler
@@ -43,10 +51,6 @@ from synapse.util.retryutils import NotRetryingDestination
 
 from synapse.util.distributor import user_joined_room
 
-from twisted.internet import defer
-
-import itertools
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -115,6 +119,19 @@ class FederationHandler(BaseHandler):
             logger.debug("Already seen pdu %s", pdu.event_id)
             return
 
+        # do some initial sanity-checking of the event. In particular, make
+        # sure it doesn't have hundreds of prev_events or auth_events, which
+        # could cause a huge state resolution or cascade of event fetches.
+        try:
+            self._sanity_check_event(pdu)
+        except SynapseError as err:
+            raise FederationError(
+                "ERROR",
+                err.code,
+                err.msg,
+                affected=pdu.event_id,
+            )
+
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if pdu.room_id in self.room_queues:
@@ -149,10 +166,6 @@ class FederationHandler(BaseHandler):
 
         auth_chain = []
 
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-
         fetch_state = False
 
         # Get missing pdus if necessary.
@@ -168,7 +181,7 @@ class FederationHandler(BaseHandler):
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
+            seen = yield self.store.have_seen_events(prevs)
 
             if min_depth and pdu.depth < min_depth:
                 # This is so that we don't notify the user about this
@@ -196,8 +209,7 @@ class FederationHandler(BaseHandler):
 
                         # Update the set of things we've seen after trying to
                         # fetch the missing stuff
-                        have_seen = yield self.store.have_events(prevs)
-                        seen = set(have_seen.iterkeys())
+                        seen = yield self.store.have_seen_events(prevs)
 
                         if not prevs - seen:
                             logger.info(
@@ -248,8 +260,7 @@ class FederationHandler(BaseHandler):
             min_depth (int): Minimum depth of events to return.
         """
         # We recalculate seen, since it may have changed.
-        have_seen = yield self.store.have_events(prevs)
-        seen = set(have_seen.keys())
+        seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
@@ -361,9 +372,7 @@ class FederationHandler(BaseHandler):
             if auth_chain:
                 event_ids |= {e.event_id for e in auth_chain}
 
-            seen_ids = set(
-                (yield self.store.have_events(event_ids)).keys()
-            )
+            seen_ids = yield self.store.have_seen_events(event_ids)
 
             if state and auth_chain is not None:
                 # If we have any state or auth_chain given to us by the replication
@@ -527,9 +536,16 @@ class FederationHandler(BaseHandler):
     def backfill(self, dest, room_id, limit, extremities):
         """ Trigger a backfill request to `dest` for the given `room_id`
 
-        This will attempt to get more events from the remote. This may return
-        be successfull and still return no events if the other side has no new
-        events to offer.
+        This will attempt to get more events from the remote. If the other side
+        has no new events to offer, this will return an empty list.
+
+        As the events are received, we check their signatures, and also do some
+        sanity-checking on them. If any of the backfilled events are invalid,
+        this method throws a SynapseError.
+
+        TODO: make this more useful to distinguish failures of the remote
+        server from invalid events (there is probably no point in trying to
+        re-fetch invalid events from every other HS in the room.)
         """
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +557,16 @@ class FederationHandler(BaseHandler):
             extremities=extremities,
         )
 
+        # ideally we'd sanity check the events here for excess prev_events etc,
+        # but it's hard to reject events at this point without completely
+        # breaking backfill in the same way that it is currently broken by
+        # events whose signature we cannot verify (#3121).
+        #
+        # So for now we accept the events anyway. #3124 tracks this.
+        #
+        # for ev in events:
+        #     self._sanity_check_event(ev)
+
         # Don't bother processing events we already have.
         seen_events = yield self.store.have_events_in_timeline(
             set(e.event_id for e in events)
@@ -613,7 +639,8 @@ class FederationHandler(BaseHandler):
 
                 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
-                        logcontext.preserve_fn(self.replication_layer.get_pdu)(
+                        logcontext.run_in_background(
+                            self.replication_layer.get_pdu,
                             [dest],
                             event_id,
                             outlier=True,
@@ -633,7 +660,7 @@ class FederationHandler(BaseHandler):
 
                 failed_to_fetch = missing_auth - set(auth_events)
 
-        seen_events = yield self.store.have_events(
+        seen_events = yield self.store.have_seen_events(
             set(auth_events.keys()) | set(state_events.keys())
         )
 
@@ -843,6 +870,38 @@ class FederationHandler(BaseHandler):
 
         defer.returnValue(False)
 
+    def _sanity_check_event(self, ev):
+        """
+        Do some early sanity checks of a received event
+
+        In particular, checks it doesn't have an excessive number of
+        prev_events or auth_events, which could cause a huge state resolution
+        or cascade of event fetches.
+
+        Args:
+            ev (synapse.events.EventBase): event to be checked
+
+        Returns: None
+
+        Raises:
+            SynapseError if the event does not pass muster
+        """
+        if len(ev.prev_events) > 20:
+            logger.warn("Rejecting event %s which has %i prev_events",
+                        ev.event_id, len(ev.prev_events))
+            raise SynapseError(
+                http_client.BAD_REQUEST,
+                "Too many prev_events",
+            )
+
+        if len(ev.auth_events) > 10:
+            logger.warn("Rejecting event %s which has %i auth_events",
+                        ev.event_id, len(ev.auth_events))
+            raise SynapseError(
+                http_client.BAD_REQUEST,
+                "Too many auth_events",
+            )
+
     @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -967,7 +1026,7 @@ class FederationHandler(BaseHandler):
             # lots of requests for missing prev_events which we do actually
             # have. Hence we fire off the deferred, but don't wait for it.
 
-            logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
+            logcontext.run_in_background(self._handle_queued_pdus, room_queue)
 
         defer.returnValue(True)
 
@@ -1457,18 +1516,21 @@ class FederationHandler(BaseHandler):
                 backfilled=backfilled,
             )
         except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area
-            logcontext.preserve_fn(
-                self.store.remove_push_actions_from_staging
-            )(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            logcontext.run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
             # and don't need to wait for it.
-            logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
-                event_stream_id, max_stream_id
+            logcontext.run_in_background(
+                self.pusher_pool.on_new_notifications,
+                event_stream_id, max_stream_id,
             )
 
         defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1482,7 +1544,8 @@ class FederationHandler(BaseHandler):
         """
         contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                logcontext.preserve_fn(self._prep_event)(
+                logcontext.run_in_background(
+                    self._prep_event,
                     origin,
                     ev_info["event"],
                     state=ev_info.get("state"),
@@ -1736,7 +1799,8 @@ class FederationHandler(BaseHandler):
             event_key = None
 
         if event_auth_events - current_state:
-            have_events = yield self.store.have_events(
+            # TODO: can we use store.have_seen_events here instead?
+            have_events = yield self.store.get_seen_events_with_rejections(
                 event_auth_events - current_state
             )
         else:
@@ -1759,12 +1823,12 @@ class FederationHandler(BaseHandler):
                     origin, event.room_id, event.event_id
                 )
 
-                seen_remotes = yield self.store.have_events(
+                seen_remotes = yield self.store.have_seen_events(
                     [e.event_id for e in remote_auth_chain]
                 )
 
                 for e in remote_auth_chain:
-                    if e.event_id in seen_remotes.keys():
+                    if e.event_id in seen_remotes:
                         continue
 
                     if e.event_id == event.event_id:
@@ -1791,7 +1855,7 @@ class FederationHandler(BaseHandler):
                     except AuthError:
                         pass
 
-                have_events = yield self.store.have_events(
+                have_events = yield self.store.get_seen_events_with_rejections(
                     [e_id for e_id, _ in event.auth_events]
                 )
                 seen_events = set(have_events.keys())
@@ -1810,7 +1874,8 @@ class FederationHandler(BaseHandler):
 
             different_events = yield logcontext.make_deferred_yieldable(
                 defer.gatherResults([
-                    logcontext.preserve_fn(self.store.get_event)(
+                    logcontext.run_in_background(
+                        self.store.get_event,
                         d,
                         allow_none=True,
                         allow_rejected=False,
@@ -1876,13 +1941,13 @@ class FederationHandler(BaseHandler):
                         local_auth_chain,
                     )
 
-                    seen_remotes = yield self.store.have_events(
+                    seen_remotes = yield self.store.have_seen_events(
                         [e.event_id for e in result["auth_chain"]]
                     )
 
                     # 3. Process any remote auth chain events we haven't seen.
                     for ev in result["auth_chain"]:
-                        if ev.event_id in seen_remotes.keys():
+                        if ev.event_id in seen_remotes:
                             continue
 
                         if ev.event_id == event.event_id:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index c5267b4b84..cd33a86599 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -27,7 +27,7 @@ from synapse.types import (
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler):
                 (messages, token), current_state = yield make_deferred_yieldable(
                     defer.gatherResults(
                         [
-                            preserve_fn(self.store.get_recent_events_for_room)(
+                            run_in_background(
+                                self.store.get_recent_events_for_room,
                                 event.room_id,
                                 limit=limit,
                                 end_token=room_end_token,
@@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler):
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
             [
-                preserve_fn(get_presence)(),
-                preserve_fn(get_receipts)(),
-                preserve_fn(self.store.get_recent_events_for_room)(
+                run_in_background(get_presence),
+                run_in_background(get_receipts),
+                run_in_background(
+                    self.store.get_recent_events_for_room,
                     room_id,
                     limit=limit,
                     end_token=now_token.room_key,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 54cd691f91..23502eda70 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,6 +13,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import simplejson
+import sys
+
+from canonicaljson import encode_canonical_json
+import six
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
 
@@ -25,7 +31,7 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn, run_in_background
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.stringutils import random_string
@@ -34,12 +40,6 @@ from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
-from canonicaljson import encode_canonical_json
-
-import logging
-import random
-import simplejson
-
 logger = logging.getLogger(__name__)
 
 
@@ -433,7 +433,7 @@ class EventCreationHandler(object):
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_event_ids=None):
+                     prev_events_and_hashes=None):
         """
         Given a dict from a client, create a new event.
 
@@ -447,7 +447,13 @@ class EventCreationHandler(object):
             event_dict (dict): An entire event
             token_id (str)
             txn_id (str)
-            prev_event_ids (list): The prev event ids to use when creating the event
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
 
         Returns:
             Tuple of created event (FrozenEvent), Context
@@ -485,7 +491,7 @@ class EventCreationHandler(object):
         event, context = yield self.create_new_client_event(
             builder=builder,
             requester=requester,
-            prev_event_ids=prev_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
         )
 
         defer.returnValue((event, context))
@@ -588,39 +594,44 @@ class EventCreationHandler(object):
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
+    def create_new_client_event(self, builder, requester=None,
+                                prev_events_and_hashes=None):
+        """Create a new event for a local client
+
+        Args:
+            builder (EventBuilder):
+
+            requester (synapse.types.Requester|None):
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
+
+        Returns:
+            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+        """
+
+        if prev_events_and_hashes is not None:
+            assert len(prev_events_and_hashes) <= 10, \
+                "Attempting to create an event with %i prev_events" % (
+                    len(prev_events_and_hashes),
             )
+        else:
+            prev_events_and_hashes = \
+                yield self.store.get_prev_events_for_room(builder.room_id)
 
-            # We want to limit the max number of prev events we point to in our
-            # new event
-            if len(latest_ret) > 10:
-                # Sort by reverse depth, so we point to the most recent.
-                latest_ret.sort(key=lambda a: -a[2])
-                new_latest_ret = latest_ret[:5]
-
-                # We also randomly point to some of the older events, to make
-                # sure that we don't completely ignore the older events.
-                if latest_ret[5:]:
-                    sample_size = min(5, len(latest_ret[5:]))
-                    new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
-                latest_ret = new_latest_ret
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
+        if prev_events_and_hashes:
+            depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+        else:
+            depth = 1
 
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
+        prev_events = [
+            (event_id, prev_hashes)
+            for event_id, prev_hashes, _ in prev_events_and_hashes
+        ]
 
         builder.prev_events = prev_events
         builder.depth = depth
@@ -719,8 +730,14 @@ class EventCreationHandler(object):
         except:  # noqa: E722, as we reraise the exception this is fine.
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
-            preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
@@ -840,22 +857,33 @@ class EventCreationHandler(object):
 
         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
-        preserve_fn(self.pusher_pool.on_new_notifications)(
+        run_in_background(
+            self.pusher_pool.on_new_notifications,
             event_stream_id, max_stream_id
         )
 
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+            try:
+                self.notifier.on_new_room_event(
+                    event, event_stream_id, max_stream_id,
+                    extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room event")
 
-        preserve_fn(_notify)()
+        run_in_background(_notify)
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
             # We don't want to block sending messages on any presence code. This
             # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(requester.user)
+            run_in_background(self._bump_active_time, requester.user)
+
+    @defer.inlineCallbacks
+    def _bump_active_time(self, user):
+        try:
+            presence = self.hs.get_presence_handler()
+            yield presence.bump_presence_active_time(user)
+        except Exception:
+            logger.exception("Error bumping presence active time")
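Per the new docstrings above, prev_events_and_hashes replaces a plain list of event IDs with the full forward-extremity tuples, saving callers a round trip to re-fetch hashes and depths. A sketch of the expected shape, with illustrative values only:

    # each entry is (event_id, hashes, depth); hashes maps algorithm -> hash
    prev_events_and_hashes = [
        ("$15000000001abcde:example.com", {"sha256": "<base64 hash>"}, 42),
    ]

    event, context = yield self.create_event(
        requester, event_dict,
        prev_events_and_hashes=prev_events_and_hashes,
    )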
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a5e501897c..585f3e4da2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.async import Linearizer
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -255,6 +255,14 @@ class PresenceHandler(object):
         logger.info("Finished _persist_unpersisted_changes")
 
     @defer.inlineCallbacks
+    def _update_states_and_catch_exception(self, new_states):
+        try:
+            res = yield self._update_states(new_states)
+            defer.returnValue(res)
+        except Exception:
+            logger.exception("Error updating presence")
+
+    @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
         the notifier and federation if and only if the changed presence state
@@ -364,7 +372,7 @@ class PresenceHandler(object):
                     now=now,
                 )
 
-            preserve_fn(self._update_states)(changes)
+            run_in_background(self._update_states_and_catch_exception, changes)
         except Exception:
             logger.exception("Exception in _handle_timeouts loop")
 
@@ -422,20 +430,23 @@ class PresenceHandler(object):
 
         @defer.inlineCallbacks
         def _end():
-            if affect_presence:
+            try:
                 self.user_to_num_current_syncs[user_id] -= 1
 
                 prev_state = yield self.current_state_for_user(user_id)
                 yield self._update_states([prev_state.copy_and_replace(
                     last_user_sync_ts=self.clock.time_msec(),
                 )])
+            except Exception:
+                logger.exception("Error updating presence after sync")
 
         @contextmanager
         def _user_syncing():
             try:
                 yield
             finally:
-                preserve_fn(_end)()
+                if affect_presence:
+                    run_in_background(_end)
 
         defer.returnValue(_user_syncing())
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 3f215c2b4e..2e0672161c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler):
         """Given a list of receipts, works out which remote servers should be
         poked and pokes them.
         """
-        # TODO: Some of this stuff should be coallesced.
-        for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
-            users = yield self.state.get_current_user_in_room(room_id)
-            remotedomains = set(get_domain_from_id(u) for u in users)
-            remotedomains = remotedomains.copy()
-            remotedomains.discard(self.server_name)
-
-            logger.debug("Sending receipt to: %r", remotedomains)
-
-            for domain in remotedomains:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.receipt",
-                    content={
-                        room_id: {
-                            receipt_type: {
-                                user_id: {
-                                    "event_ids": event_ids,
-                                    "data": data,
+        try:
+            # TODO: Some of this stuff should be coalesced.
+            for receipt in receipts:
+                room_id = receipt["room_id"]
+                receipt_type = receipt["receipt_type"]
+                user_id = receipt["user_id"]
+                event_ids = receipt["event_ids"]
+                data = receipt["data"]
+
+                users = yield self.state.get_current_user_in_room(room_id)
+                remotedomains = set(get_domain_from_id(u) for u in users)
+                remotedomains = remotedomains.copy()
+                remotedomains.discard(self.server_name)
+
+                logger.debug("Sending receipt to: %r", remotedomains)
+
+                for domain in remotedomains:
+                    self.federation.send_edu(
+                        destination=domain,
+                        edu_type="m.receipt",
+                        content={
+                            room_id: {
+                                receipt_type: {
+                                    user_id: {
+                                        "event_ids": event_ids,
+                                        "data": data,
+                                    }
                                 }
-                            }
+                            },
                         },
-                    },
-                    key=(room_id, receipt_type, user_id),
-                )
+                        key=(room_id, receipt_type, user_id),
+                    )
+        except Exception:
+            logger.exception("Error pushing receipts to remote servers")
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 8028d793c2..5757bb7f8a 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -15,12 +15,13 @@
 
 from twisted.internet import defer
 
+from six.moves import range
+
 from ._base import BaseHandler
 
 from synapse.api.constants import (
     EventTypes, JoinRules,
 )
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
@@ -78,18 +79,11 @@ class RoomListHandler(BaseHandler):
             )
 
         key = (limit, since_token, network_tuple)
-        result = self.response_cache.get(key)
-        if not result:
-            logger.info("No cached result, calculating one.")
-            result = self.response_cache.set(
-                key,
-                preserve_fn(self._get_public_room_list)(
-                    limit, since_token, network_tuple=network_tuple
-                )
-            )
-        else:
-            logger.info("Using cached deferred result.")
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            key,
+            self._get_public_room_list,
+            limit, since_token, network_tuple=network_tuple,
+        )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
@@ -208,7 +202,7 @@ class RoomListHandler(BaseHandler):
             step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
 
         chunk = []
-        for i in xrange(0, len(rooms_to_scan), step):
+        for i in range(0, len(rooms_to_scan), step):
             batch = rooms_to_scan[i:i + step]
             logger.info("Processing %i rooms for result", len(batch))
             yield concurrently_execute(
@@ -423,18 +417,14 @@ class RoomListHandler(BaseHandler):
             server_name, limit, since_token, include_all_networks,
             third_party_instance_id,
         )
-        result = self.remote_response_cache.get(key)
-        if not result:
-            result = self.remote_response_cache.set(
-                key,
-                repl_layer.get_public_rooms(
-                    server_name, limit=limit, since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            )
-        return result
+        return self.remote_response_cache.wrap(
+            key,
+            repl_layer.get_public_rooms,
+            server_name, limit=limit, since_token=since_token,
+            search_filter=search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
+        )
 
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (
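Both rewrites in this file collapse the same get/set/make_deferred_yieldable dance into ResponseCache.wrap (added to synapse/util/caches/response_cache.py in this commit). Reconstructed from the two call sites it replaces, the helper behaves roughly like this sketch:

    def wrap(self, key, callback, *args, **kwargs):
        # reuse an in-flight result for this key if there is one; otherwise
        # start the callback in the background and cache its Deferred
        result = self.get(key)
        if not result:
            result = self.set(
                key, run_in_background(callback, *args, **kwargs)
            )
        return make_deferred_yieldable(result)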
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index c45142d38d..714583f1d5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
     @defer.inlineCallbacks
     def _local_membership_update(
         self, requester, target, room_id, membership,
-        prev_event_ids,
+        prev_events_and_hashes,
         txn_id=None,
         ratelimit=True,
         content=None,
@@ -175,7 +175,7 @@ class RoomMemberHandler(object):
             },
             token_id=requester.access_token_id,
             txn_id=txn_id,
-            prev_event_ids=prev_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
         )
 
         # Check if this event matches the previous membership event for the user.
@@ -314,7 +314,12 @@ class RoomMemberHandler(object):
                     403, "Invites have been disabled on this server",
                 )
 
-        latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+            room_id,
+        )
+        latest_event_ids = (
+            event_id for (event_id, _, _) in prev_events_and_hashes
+        )
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
         )
@@ -403,7 +408,7 @@ class RoomMemberHandler(object):
             membership=effective_membership_state,
             txn_id=txn_id,
             ratelimit=ratelimit,
-            prev_event_ids=latest_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
             content=content,
         )
         defer.returnValue(res)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 06d17ab20c..b52e4c2aff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@
 
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
         to tell if room needs to be part of the sync result.
         """
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
             # nb the notification count does not, er, count: if there's nothing
             # else in the result, we don't need to send it.
         )
+    __bool__ = __nonzero__  # python3
 
 
 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
             or self.state
             or self.account_data
         )
+    __bool__ = __nonzero__  # python3
 
 
 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
     def __nonzero__(self):
         """Invited rooms should always be reported to the client"""
         return True
+    __bool__ = __nonzero__  # python3
 
 
 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
 
     def __nonzero__(self):
         return bool(self.join or self.invite or self.leave)
+    __bool__ = __nonzero__  # python3
 
 
 class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
 
     def __nonzero__(self):
         return bool(self.changed or self.left)
+    __bool__ = __nonzero__  # python3
 
 
 class SyncResult(collections.namedtuple("SyncResult", [
@@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
             self.device_lists or
             self.groups
         )
+    __bool__ = __nonzero__  # python3
 
 
 class SyncHandler(object):
@@ -180,15 +187,11 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        result = self.response_cache.get(sync_config.request_key)
-        if not result:
-            result = self.response_cache.set(
-                sync_config.request_key,
-                preserve_fn(self._wait_for_sync_for_user)(
-                    sync_config, since_token, timeout, full_state
-                )
-            )
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            sync_config.request_key,
+            self._wait_for_sync_for_user,
+            sync_config, since_token, timeout, full_state,
+        )
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
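The repeated __bool__ = __nonzero__ lines are the usual 2/3 compatibility shim: Python 2 consults __nonzero__ for truthiness and Python 3 consults __bool__, and without one of them these namedtuple subclasses would always be truthy, as non-empty tuples. A minimal self-contained sketch:

    import collections

    class TimelineBatch(collections.namedtuple("TimelineBatch", ["events"])):
        def __nonzero__(self):          # truthiness hook on Python 2
            return bool(self.events)
        __bool__ = __nonzero__          # the same hook under Python 3

    assert not TimelineBatch(events=[])   # falsy when empty, on either version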
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 77c0cf146f..5d9736e88f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 from synapse.types import UserID, get_domain_from_id
@@ -97,7 +97,8 @@ class TypingHandler(object):
             if self.hs.is_mine_id(member.user_id):
                 last_fed_poke = self._member_last_federation_poke.get(member, None)
                 if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
-                    preserve_fn(self._push_remote)(
+                    run_in_background(
+                        self._push_remote,
                         member=member,
                         typing=True
                     )
@@ -196,7 +197,7 @@ class TypingHandler(object):
     def _push_update(self, member, typing):
         if self.hs.is_mine_id(member.user_id):
             # Only send updates for changes to our own users.
-            preserve_fn(self._push_remote)(member, typing)
+            run_in_background(self._push_remote, member, typing)
 
         self._push_update_local(
             member=member,
@@ -205,28 +206,31 @@ class TypingHandler(object):
 
     @defer.inlineCallbacks
     def _push_remote(self, member, typing):
-        users = yield self.state.get_current_user_in_room(member.room_id)
-        self._member_last_federation_poke[member] = self.clock.time_msec()
+        try:
+            users = yield self.state.get_current_user_in_room(member.room_id)
+            self._member_last_federation_poke[member] = self.clock.time_msec()
 
-        now = self.clock.time_msec()
-        self.wheel_timer.insert(
-            now=now,
-            obj=member,
-            then=now + FEDERATION_PING_INTERVAL,
-        )
+            now = self.clock.time_msec()
+            self.wheel_timer.insert(
+                now=now,
+                obj=member,
+                then=now + FEDERATION_PING_INTERVAL,
+            )
 
-        for domain in set(get_domain_from_id(u) for u in users):
-            if domain != self.server_name:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.typing",
-                    content={
-                        "room_id": member.room_id,
-                        "user_id": member.user_id,
-                        "typing": typing,
-                    },
-                    key=member,
-                )
+            for domain in set(get_domain_from_id(u) for u in users):
+                if domain != self.server_name:
+                    self.federation.send_edu(
+                        destination=domain,
+                        edu_type="m.typing",
+                        content={
+                            "room_id": member.room_id,
+                            "user_id": member.user_id,
+                            "typing": typing,
+                        },
+                        key=member,
+                    )
+        except Exception:
+            logger.exception("Error pushing typing notif to remotes")
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index bfebb0f644..0d47ccdb59 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,3 +13,24 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
+
+from synapse.api.errors import SynapseError
+
+
+class RequestTimedOutError(SynapseError):
+    """Exception representing timeout of an outbound request"""
+    def __init__(self):
+        super(RequestTimedOutError, self).__init__(504, "Timed out")
+
+
+def cancelled_to_request_timed_out_error(value):
+    """Turns CancelledErrors into RequestTimedOutErrors.
+
+    For use with async.add_timeout_to_deferred
+    """
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise RequestTimedOutError()
+    return value
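cancelled_to_request_timed_out_error is written as an errback: given a Failure wrapping CancelledError it raises RequestTimedOutError (a 504 SynapseError), while any other failure is re-raised as-is by trap(). A sketch of the intended pairing with add_timeout_to_deferred, matching its use in synapse/http/client.py below:

    request_deferred = self.agent.request(method, uri, *args, **kwargs)

    # on timeout, add_timeout_to_deferred cancels the Deferred; the converter
    # then turns the resulting CancelledError into a 504 RequestTimedOutError
    add_timeout_to_deferred(
        request_deferred, 60, cancelled_to_request_timed_out_error,
    )
    response = yield make_deferred_yieldable(request_deferred)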
diff --git a/synapse/http/client.py b/synapse/http/client.py
index f3e4973c2e..70a19d9b74 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,9 +19,10 @@ from OpenSSL.SSL import VERIFY_NONE
 from synapse.api.errors import (
     CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
 )
+from synapse.http import cancelled_to_request_timed_out_error
+from synapse.util.async import add_timeout_to_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
-from synapse.util import logcontext
 import synapse.metrics
 from synapse.http.endpoint import SpiderEndpoint
 
@@ -38,7 +40,7 @@ from twisted.web.http import PotentialDataLoss
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
 
-from StringIO import StringIO
+from six import StringIO
 
 import simplejson as json
 import logging
@@ -95,21 +97,17 @@ class SimpleHttpClient(object):
         # counters to it
         outgoing_requests_counter.inc(method)
 
-        def send_request():
+        logger.info("Sending request %s %s", method, uri)
+
+        try:
             request_deferred = self.agent.request(
                 method, uri, *args, **kwargs
             )
-
-            return self.clock.time_bound_deferred(
+            add_timeout_to_deferred(
                 request_deferred,
-                time_out=60,
+                60, cancelled_to_request_timed_out_error,
             )
-
-        logger.info("Sending request %s %s", method, uri)
-
-        try:
-            with logcontext.PreserveLoggingContext():
-                response = yield send_request()
+            response = yield make_deferred_yieldable(request_deferred)
 
             incoming_responses_counter.inc(method, response.code)
             logger.info(
@@ -509,7 +507,7 @@ class SpiderHttpClient(SimpleHttpClient):
                     reactor,
                     SpiderEndpointFactory(hs)
                 )
-            ), [('gzip', GzipDecoder)]
+            ), [(b'gzip', GzipDecoder)]
         )
         # We could look like Chrome:
         # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko)
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 00572c2897..db455e5909 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -286,7 +286,7 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
         if (len(answers) == 1
                 and answers[0].type == dns.SRV
                 and answers[0].payload
-                and answers[0].payload.target == dns.Name('.')):
+                and answers[0].payload.target == dns.Name(b'.')):
             raise ConnectError("Service %s unavailable" % service_name)
 
         for answer in answers:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 60a29081e8..4b2b85464d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,17 +13,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import synapse.util.retryutils
 from twisted.internet import defer, reactor, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.web.client import readBody, HTTPConnectionPool, Agent
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
 
+from synapse.http import cancelled_to_request_timed_out_error
 from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.async import sleep
-from synapse.util import logcontext
 import synapse.metrics
+from synapse.util.async import sleep, add_timeout_to_deferred
+from synapse.util import logcontext
+from synapse.util.logcontext import make_deferred_yieldable
+import synapse.util.retryutils
 
 from canonicaljson import encode_canonical_json
 
@@ -38,8 +41,7 @@ import logging
 import random
 import sys
 import urllib
-import urlparse
-
+from six.moves.urllib import parse as urlparse
 
 logger = logging.getLogger(__name__)
 outbound_logger = logging.getLogger("synapse.http.outbound")
@@ -184,21 +186,20 @@ class MatrixFederationHttpClient(object):
                         producer = body_callback(method, http_url_bytes, headers_dict)
 
                     try:
-                        def send_request():
-                            request_deferred = self.agent.request(
-                                method,
-                                url_bytes,
-                                Headers(headers_dict),
-                                producer
-                            )
-
-                            return self.clock.time_bound_deferred(
-                                request_deferred,
-                                time_out=timeout / 1000. if timeout else 60,
-                            )
-
-                        with logcontext.PreserveLoggingContext():
-                            response = yield send_request()
+                        request_deferred = self.agent.request(
+                            method,
+                            url_bytes,
+                            Headers(headers_dict),
+                            producer
+                        )
+                        add_timeout_to_deferred(
+                            request_deferred,
+                            timeout / 1000. if timeout else 60,
+                            cancelled_to_request_timed_out_error,
+                        )
+                        response = yield make_deferred_yieldable(
+                            request_deferred,
+                        )
 
                         log_result = "%d %s" % (response.code, response.phrase,)
                         break
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 8d632290de..55b9ad5251 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -546,6 +546,6 @@ def _request_user_agent_is_curl(request):
         b"User-Agent", default=[]
     )
     for user_agent in user_agents:
-        if "curl" in user_agent:
+        if b"curl" in user_agent:
             return True
     return False
diff --git a/synapse/notifier.py b/synapse/notifier.py
index ef042681bc..8355c7d621 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,14 +14,17 @@
 # limitations under the License.
 
 from twisted.internet import defer
+
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 
-from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.async import (
+    ObservableDeferred, add_timeout_to_deferred,
+    DeferredTimeoutError,
+)
+from synapse.util.logcontext import PreserveLoggingContext, run_in_background
 from synapse.util.metrics import Measure
 from synapse.types import StreamToken
 from synapse.visibility import filter_events_for_client
@@ -144,6 +147,7 @@ class _NotifierUserStream(object):
 class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
     def __nonzero__(self):
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class Notifier(object):
@@ -250,9 +254,7 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        preserve_fn(self.appservice_handler.notify_interested_services)(
-            room_stream_id
-        )
+        run_in_background(self._notify_app_services, room_stream_id)
 
         if self.federation_sender:
             self.federation_sender.notify_new_events(room_stream_id)
@@ -266,6 +268,13 @@ class Notifier(object):
             rooms=[event.room_id],
         )
 
+    @defer.inlineCallbacks
+    def _notify_app_services(self, room_stream_id):
+        try:
+            yield self.appservice_handler.notify_interested_services(room_stream_id)
+        except Exception:
+            logger.exception("Error notifying application services of event")
+
     def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
         """ Used to inform listeners that something has happend event wise.
 
@@ -330,11 +339,12 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
+                    add_timeout_to_deferred(
+                        listener.deferred,
+                        (end_time - now) / 1000.,
+                    )
                     with PreserveLoggingContext():
-                        yield self.clock.time_bound_deferred(
-                            listener.deferred,
-                            time_out=(end_time - now) / 1000.
-                        )
+                        yield listener.deferred
 
                     current_token = user_stream.current_token
 
@@ -345,7 +355,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except DeferredTimedOutError:
+                except DeferredTimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -550,13 +560,14 @@ class Notifier(object):
             if end_time <= now:
                 break
 
+            add_timeout_to_deferred(
+                listener.deferred,
+                (end_time - now) / 1000.,
+            )
             try:
                 with PreserveLoggingContext():
-                    yield self.clock.time_bound_deferred(
-                        listener.deferred,
-                        time_out=(end_time - now) / 1000.
-                    )
-            except DeferredTimedOutError:
+                    yield listener.deferred
+            except DeferredTimeoutError:
                 break
             except defer.CancelledError:
                 break
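After this change both wait loops in the notifier share the same shape: arm the timeout on the listener's Deferred before entering the logcontext block, yield the Deferred directly, and catch the new DeferredTimeoutError in place of the old DeferredTimedOutError:

    add_timeout_to_deferred(
        listener.deferred,
        (end_time - now) / 1000.,
    )
    try:
        with PreserveLoggingContext():
            yield listener.deferred
    except DeferredTimeoutError:
        break
    except defer.CancelledError:
        break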
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 58df98a793..ba7286cb72 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -77,10 +77,13 @@ class EmailPusher(object):
     @defer.inlineCallbacks
     def on_started(self):
         if self.mailer is not None:
-            self.throttle_params = yield self.store.get_throttle_params_by_room(
-                self.pusher_id
-            )
-            yield self._process()
+            try:
+                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                    self.pusher_id
+                )
+                yield self._process()
+            except Exception:
+                logger.exception("Error starting email pusher")
 
     def on_stop(self):
         if self.timed_call:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 2cbac571b8..b077e1a446 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -18,8 +18,8 @@ import logging
 from twisted.internet import defer, reactor
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-import push_rule_evaluator
-import push_tools
+from . import push_rule_evaluator
+from . import push_tools
 import synapse
 from synapse.push import PusherConfigException
 from synapse.util.logcontext import LoggingContext
@@ -94,7 +94,10 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def on_started(self):
-        yield self._process()
+        try:
+            yield self._process()
+        except Exception:
+            logger.exception("Error starting http pusher")
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 71576330a9..5aa6667e91 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from httppusher import HttpPusher
+from .httppusher import HttpPusher
 
 import logging
 logger = logging.getLogger(__name__)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 134e89b371..750d11ca38 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -14,13 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from twisted.internet import defer
 
-from .pusher import PusherFactory
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.push.pusher import PusherFactory
 from synapse.util.async import run_on_reactor
-
-import logging
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -137,12 +137,15 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            preserve_fn(p.on_new_notifications)(
-                                min_stream_id, max_stream_id
+                            run_in_background(
+                                p.on_new_notifications,
+                                min_stream_id, max_stream_id,
                             )
                         )
 
-            yield make_deferred_yieldable(defer.gatherResults(deferreds))
+            yield make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True),
+            )
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
@@ -164,10 +167,15 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
+                            run_in_background(
+                                p.on_new_receipts,
+                                min_stream_id, max_stream_id,
+                            )
                         )
 
-            yield make_deferred_yieldable(defer.gatherResults(deferreds))
+            yield make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True),
+            )
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
@@ -207,7 +215,7 @@ class PusherPool:
                 if appid_pushkey in byuser:
                     byuser[appid_pushkey].on_stop()
                 byuser[appid_pushkey] = p
-                preserve_fn(p.on_started)()
+                run_in_background(p.on_started)
 
         logger.info("Started pushers")
 
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 5cabf7dabe..711cbb6c50 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -1,5 +1,6 @@
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,6 +19,18 @@ from distutils.version import LooseVersion
 
 logger = logging.getLogger(__name__)
 
+# this dict maps from python package name to a list of modules we expect it to
+# provide.
+#
+# the key is a "requirement specifier", as used as a parameter to `pip
+# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
+#
+# the value is a sequence of strings; each entry should be the name of the
+# python module, optionally followed by a version assertion which can be either
+# ">=<ver>" or "==<ver>".
+#
+# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
+# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
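+#
+# for example, a (hypothetical) entry of
+#
+#     "foo>=1.0": ["foo", "foo.bar>=1.0"],
+#
+# would install the `foo` package at version 1.0 or later, and expect it to
+# provide importable `foo` and `foo.bar` modules, the latter at version 1.0
+# or later.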
 REQUIREMENTS = {
     "jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
     "frozendict>=0.4": ["frozendict"],
@@ -26,7 +39,11 @@ REQUIREMENTS = {
     "signedjson>=1.0.0": ["signedjson>=1.0.0"],
     "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
     "service_identity>=1.0.0": ["service_identity>=1.0.0"],
-    "Twisted>=16.0.0": ["twisted>=16.0.0"],
+
+    # we break under Twisted 18.4
+    # (https://github.com/matrix-org/synapse/issues/3135)
+    "Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
+
     "pyopenssl>=0.14": ["OpenSSL>=0.14"],
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
@@ -39,6 +56,7 @@ REQUIREMENTS = {
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
+    "six": ["six"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index c6a6551d24..a9baa2c1c3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.util.async import sleep
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import Requester, UserID
 
@@ -118,17 +117,12 @@ class ReplicationSendEventRestServlet(RestServlet):
         self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
 
     def on_PUT(self, request, event_id):
-        result = self.response_cache.get(event_id)
-        if not result:
-            result = self.response_cache.set(
-                event_id,
-                self._handle_request(request)
-            )
-        else:
-            logger.warn("Returning cached response")
-        return make_deferred_yieldable(result)
-
-    @preserve_fn
+        return self.response_cache.wrap(
+            event_id,
+            self._handle_request,
+            request
+        )
+
     @defer.inlineCallbacks
     def _handle_request(self, request):
         with Measure(self.clock, "repl_send_event_parse"):
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 0a9a290af4..d7d38464b2 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -53,12 +53,12 @@ from twisted.internet import defer
 from twisted.protocols.basic import LineOnlyReceiver
 from twisted.python.failure import Failure
 
-from commands import (
+from .commands import (
     COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
     ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
     NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
 )
-from streams import STREAMS_MAP
+from .streams import STREAMS_MAP
 
 from synapse.util.stringutils import random_string
 from synapse.metrics.metric import CounterMetric
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 786c3fe864..a41af4fd6c 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -18,8 +18,8 @@
 from twisted.internet import defer, reactor
 from twisted.internet.protocol import Factory
 
-from streams import STREAMS_MAP, FederationStream
-from protocol import ServerReplicationStreamProtocol
+from .streams import STREAMS_MAP, FederationStream
+from .protocol import ServerReplicationStreamProtocol
 
 from synapse.util.metrics import Measure, measure_func
 
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 45844aa2d2..34df5be4e9 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -25,7 +25,7 @@ from .base import ClientV1RestServlet, client_path_patterns
 
 import simplejson as json
 import urllib
-import urlparse
+from six.moves.urllib import parse as urlparse
 
 import logging
 from saml2 import BINDING_HTTP_POST
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 8a82097178..9b3022e0b0 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -30,6 +30,8 @@ from hashlib import sha1
 import hmac
 import logging
 
+from six import string_types
+
 logger = logging.getLogger(__name__)
 
 
@@ -333,11 +335,11 @@ class RegisterRestServlet(ClientV1RestServlet):
     def _do_shared_secret(self, request, register_json, session):
         yield run_on_reactor()
 
-        if not isinstance(register_json.get("mac", None), basestring):
+        if not isinstance(register_json.get("mac", None), string_types):
             raise SynapseError(400, "Expected mac.")
-        if not isinstance(register_json.get("user", None), basestring):
+        if not isinstance(register_json.get("user", None), string_types):
             raise SynapseError(400, "Expected 'user' key.")
-        if not isinstance(register_json.get("password", None), basestring):
+        if not isinstance(register_json.get("password", None), string_types):
             raise SynapseError(400, "Expected 'password' key.")
 
         if not self.hs.config.registration_shared_secret:
@@ -358,14 +360,14 @@ class RegisterRestServlet(ClientV1RestServlet):
         got_mac = str(register_json["mac"])
 
         want_mac = hmac.new(
-            key=self.hs.config.registration_shared_secret,
+            key=self.hs.config.registration_shared_secret.encode(),
             digestmod=sha1,
         )
         want_mac.update(user)
-        want_mac.update("\x00")
+        want_mac.update(b"\x00")
         want_mac.update(password)
-        want_mac.update("\x00")
-        want_mac.update("admin" if admin else "notadmin")
+        want_mac.update(b"\x00")
+        want_mac.update(b"admin" if admin else b"notadmin")
         want_mac = want_mac.hexdigest()
 
         if compare_digest(want_mac, got_mac):
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 2ad0e5943b..fcf9c9ab44 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -28,8 +28,9 @@ from synapse.http.servlet import (
     parse_json_object_from_request, parse_string, parse_integer
 )
 
+from six.moves.urllib import parse as urlparse
+
 import logging
-import urllib
 import simplejson as json
 
 logger = logging.getLogger(__name__)
@@ -433,7 +434,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
         as_client_event = "raw" not in request.args
         filter_bytes = request.args.get("filter", None)
         if filter_bytes:
-            filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8")
+            filter_json = urlparse.unquote(filter_bytes[-1]).decode("UTF-8")
             event_filter = Filter(json.loads(filter_json))
         else:
             event_filter = None
@@ -718,8 +719,8 @@ class RoomTypingRestServlet(ClientV1RestServlet):
     def on_PUT(self, request, room_id, user_id):
         requester = yield self.auth.get_user_by_req(request)
 
-        room_id = urllib.unquote(room_id)
-        target_user = UserID.from_string(urllib.unquote(user_id))
+        room_id = urlparse.unquote(room_id)
+        target_user = UserID.from_string(urlparse.unquote(user_id))
 
         content = parse_json_object_from_request(request)
 
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index f317c919dc..5cab00aea9 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -35,6 +35,8 @@ from hashlib import sha1
 from synapse.util.async import run_on_reactor
 from synapse.util.ratelimitutils import FederationRateLimiter
 
+from six import string_types
+
 
 # We ought to be using hmac.compare_digest() but on older pythons it doesn't
 # exist. It's a _really minor_ security flaw to use plain string comparison
@@ -210,14 +212,14 @@ class RegisterRestServlet(RestServlet):
         # in sessions. Pull out the username/password provided to us.
         desired_password = None
         if 'password' in body:
-            if (not isinstance(body['password'], basestring) or
+            if (not isinstance(body['password'], string_types) or
                     len(body['password']) > 512):
                 raise SynapseError(400, "Invalid password")
             desired_password = body["password"]
 
         desired_username = None
         if 'username' in body:
-            if (not isinstance(body['username'], basestring) or
+            if (not isinstance(body['username'], string_types) or
                     len(body['username']) > 512):
                 raise SynapseError(400, "Invalid username")
             desired_username = body['username']
@@ -243,7 +245,7 @@ class RegisterRestServlet(RestServlet):
 
             access_token = get_access_token_from_request(request)
 
-            if isinstance(desired_username, basestring):
+            if isinstance(desired_username, string_types):
                 result = yield self._do_appservice_registration(
                     desired_username, access_token, body
                 )
@@ -464,7 +466,7 @@ class RegisterRestServlet(RestServlet):
         # includes the password and admin flag in the hashed text. Why are
         # these different?
         want_mac = hmac.new(
-            key=self.hs.config.registration_shared_secret,
+            key=self.hs.config.registration_shared_secret.encode(),
             msg=user,
             digestmod=sha1,
         ).hexdigest()
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index e7ac01da01..d9c4af9389 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -28,7 +28,7 @@ import os
 
 import logging
 import urllib
-import urlparse
+from six.moves.urllib import parse as urlparse
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index bb79599379..9800ce7581 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -47,7 +47,7 @@ import shutil
 
 import cgi
 import logging
-import urlparse
+from six.moves.urllib import parse as urlparse
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 0fc21540c6..9290d7946f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -35,7 +35,7 @@ from ._base import FileInfo
 from synapse.api.errors import (
     SynapseError, Codes,
 )
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.stringutils import random_string
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.http.client import SpiderHttpClient
@@ -144,7 +144,8 @@ class PreviewUrlResource(Resource):
         observable = self._cache.get(url)
 
         if not observable:
-            download = preserve_fn(self._do_preview)(
+            download = run_in_background(
+                self._do_preview,
                 url, requester.user, ts,
             )
             observable = ObservableDeferred(
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index c188192f2b..0252afd9d3 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -18,7 +18,7 @@ from twisted.internet import defer, threads
 from .media_storage import FileResponder
 
 from synapse.config._base import Config
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 
 import logging
 import os
@@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider):
             return self.backend.store_file(path, file_info)
         else:
             # TODO: Handle errors.
-            preserve_fn(self.backend.store_file)(path, file_info)
+            def store():
+                try:
+                    return self.backend.store_file(path, file_info)
+                except Exception:
+                    logger.exception("Error storing file")
+            run_in_background(store)
             return defer.succeed(None)
 
     def fetch(self, path, file_info):
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index f6f498cdc5..a31e75cb46 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -81,15 +81,15 @@ class UploadResource(Resource):
         headers = request.requestHeaders
 
         if headers.hasHeader("Content-Type"):
-            media_type = headers.getRawHeaders("Content-Type")[0]
+            media_type = headers.getRawHeaders(b"Content-Type")[0]
         else:
             raise SynapseError(
                 msg="Upload request missing 'Content-Type'",
                 code=400,
             )
 
-        # if headers.hasHeader("Content-Disposition"):
-        #     disposition = headers.getRawHeaders("Content-Disposition")[0]
+        # if headers.hasHeader(b"Content-Disposition"):
+        #     disposition = headers.getRawHeaders(b"Content-Disposition")[0]
         # TODO(markjh): parse content-disposition
 
         content_uri = yield self.media_repo.create_content(
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 00ee82d300..8fbf7ffba7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import random
 
 from twisted.internet import defer
 
@@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
 
 import logging
-from Queue import PriorityQueue, Empty
+from six.moves.queue import PriorityQueue, Empty
+
+from six.moves import range
 
 
 logger = logging.getLogger(__name__)
@@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             front_list = list(front)
             chunks = [
                 front_list[x:x + 100]
-                for x in xrange(0, len(front), 100)
+                for x in range(0, len(front), 100)
             ]
             for chunk in chunks:
                 txn.execute(
@@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             retcol="event_id",
         )
 
+    @defer.inlineCallbacks
+    def get_prev_events_for_room(self, room_id):
+        """
+        Gets a subset of the current forward extremities in the given room.
+
+        Limits the result to 10 extremities, so that we can avoid creating
+        events which refer to hundreds of prev_events.
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
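+
+                e.g. (with made-up values):
+                    [("$event0:example.com", {"sha256": "<hash>"}, 5), ...]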
+        """
+        res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
+        if len(res) > 10:
+            # Sort by reverse depth, so we point to the most recent.
+            res.sort(key=lambda a: -a[2])
+
+            # we use half of the limit for the actual most recent events, and
+            # the other half to randomly point to some of the older events, to
+            # make sure that we don't completely ignore the older events.
+            res = res[0:5] + random.sample(res[5:], 5)
+
+        defer.returnValue(res)
+
     def get_latest_event_ids_and_hashes_in_room(self, room_id):
+        """
+        Gets the current forward extremities in the given room
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+
         return self.runInteraction(
             "get_latest_event_ids_and_hashes_in_room",
             self._get_latest_event_ids_and_hashes_in_room,
@@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             room_id,
         )
 
-    @defer.inlineCallbacks
-    def get_max_depth_of_events(self, event_ids):
-        sql = (
-            "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
-        ) % (",".join(["?"] * len(event_ids)),)
-
-        rows = yield self._execute(
-            "get_max_depth_of_events", None,
-            sql, *event_ids
-        )
-
-        if rows:
-            defer.returnValue(rows[0][0])
-        else:
-            defer.returnValue(1)
-
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
             txn,
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index e78f8d0114..c22762eb5c 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             "add_push_actions_to_staging", _add_push_actions_to_staging_txn
         )
 
+    @defer.inlineCallbacks
     def remove_push_actions_from_staging(self, event_id):
         """Called if we failed to persist the event to ensure that stale push
         actions don't build up in the DB
@@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             event_id (str)
         """
 
-        return self._simple_delete(
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event_id,
-            },
-            desc="remove_push_actions_from_staging",
-        )
+        try:
+            res = yield self._simple_delete(
+                table="event_push_actions_staging",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                desc="remove_push_actions_from_staging",
+            )
+            defer.returnValue(res)
+        except Exception:
+            # this method is called from an exception handler, so propagating
+            # another exception here really isn't helpful - there's nothing
+            # the caller can do about it. Just log the exception and move on.
+            logger.exception(
+                "Error removing push actions after event persistence failure",
+            )
 
     @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index da44b52fd6..5fe4a0e56c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -16,6 +16,7 @@
 
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps
+import itertools
 import logging
 
 import simplejson as json
@@ -1320,13 +1321,49 @@ class EventsStore(EventsWorkerStore):
 
         defer.returnValue(set(r["event_id"] for r in rows))
 
-    def have_events(self, event_ids):
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
         """Given a list of event ids, check if we have already processed them.
 
+        Args:
+            event_ids (iterable[str]):
+
         Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-            the rejected reason string if we rejected the event, else maps to
-            None.
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
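+        # (the two-argument form of iter() calls the lambda repeatedly until
+        # it returns the sentinel - here the empty list which islice produces
+        # once the input iterator is exhausted)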
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
+        Returns:
+            Deferred[dict[str, str|None]]:
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
         """
         if not event_ids:
             return defer.succeed({})
@@ -1348,9 +1385,7 @@ class EventsStore(EventsWorkerStore):
 
             return res
 
-        return self.runInteraction(
-            "have_events", f,
-        )
+        return self.runInteraction("get_rejection_reasons", f)
 
     @defer.inlineCallbacks
     def count_daily_messages(self):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a937b9bceb..ba834854e1 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
 
 from synapse.util.logcontext import (
-    preserve_fn, PreserveLoggingContext, make_deferred_yieldable
+    PreserveLoggingContext, make_deferred_yieldable, run_in_background,
 )
 from synapse.util.metrics import Measure
 from synapse.api.errors import SynapseError
@@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore):
 
         res = yield make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self._get_event_from_row)(
+                run_in_background(
+                    self._get_event_from_row,
                     row["internal_metadata"], row["json"], row["redacts"],
                     rejected_reason=row["rejects"],
                 )
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 6b557ca0cf..a50717db2d 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -22,6 +22,8 @@ from synapse.storage import background_updates
 from synapse.storage._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+from six.moves import range
+
 
 class RegistrationWorkerStore(SQLBaseStore):
     @cached()
@@ -469,7 +471,7 @@ class RegistrationStore(RegistrationWorkerStore,
                 match = regex.search(user_id)
                 if match:
                     found.add(int(match.group(1)))
-            for i in xrange(len(found) + 1):
+            for i in range(len(found) + 1):
                 if i not in found:
                     return i
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 740c036975..ea6a189185 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
             # Convert the IDs to MXC URIs
             for media_id in local_mxcs:
-                local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+                local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
             for hostname, media_id in remote_mxcs:
                 remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
 
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         while next_token:
             sql = """
                 SELECT stream_ordering, json FROM events
-                JOIN event_json USING (event_id)
+                JOIN event_json USING (room_id, event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                     if matches:
                         hostname = matches.group(1)
                         media_id = matches.group(2)
-                        if hostname == self.hostname:
+                        if hostname == self.hs.hostname:
                             local_media_mxcs.append(media_id)
                         else:
                             remote_media_mxcs.append((hostname, media_id))
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index c53e53c94f..85bd1a2006 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -14,6 +14,8 @@
 import logging
 from synapse.config.appservice import load_appservices
 
+from six.moves import range
+
 
 logger = logging.getLogger(__name__)
 
@@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
 
     for as_id, user_ids in owned.items():
         n = 100
-        user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
+        user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
         for chunk in user_chunks:
             cur.execute(
                 database_engine.convert_param_style(
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 426cbe6e1a..6ba3e59889 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
             sql = (
                 "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
-                " JOIN event_json USING (event_id)"
+                " JOIN event_json USING (room_id, event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 2956c3b3e0..f0784ba137 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,12 +41,14 @@ from synapse.storage.events import EventsWorkerStore
 from synapse.util.caches.descriptors import cached
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 import abc
 import logging
 
+from six.moves import range
+
 
 logger = logging.getLogger(__name__)
 
@@ -196,13 +198,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         results = {}
         room_ids = list(room_ids)
-        for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
+        for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)):
             res = yield make_deferred_yieldable(defer.gatherResults([
-                preserve_fn(self.get_room_events_stream_for_room)(
+                run_in_background(
+                    self.get_room_events_stream_for_room,
                     room_id, from_key, to_key, limit, order=order,
                 )
                 for room_id in rm_ids
-            ]))
+            ], consumeErrors=True))
             results.update(dict(zip(rm_ids, res)))
 
         defer.returnValue(results)
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 13bff9f055..6671d3cfca 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -22,6 +22,8 @@ from twisted.internet import defer
 import simplejson as json
 import logging
 
+from six.moves import range
+
 logger = logging.getLogger(__name__)
 
 
@@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
 
         batch_size = 50
         results = []
-        for i in xrange(0, len(tag_ids), batch_size):
+        for i in range(0, len(tag_ids), batch_size):
             tags = yield self.runInteraction(
                 "get_all_updated_tag_content",
                 get_tag_content,
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 756d8ffa32..814a7bf71b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.api.errors import SynapseError
 from synapse.util.logcontext import PreserveLoggingContext
 
 from twisted.internet import defer, reactor, task
@@ -24,11 +23,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class DeferredTimedOutError(SynapseError):
-    def __init__(self):
-        super(DeferredTimedOutError, self).__init__(504, "Timed out")
-
-
 def unwrapFirstError(failure):
     # defer.gatherResults and DeferredLists wrap failures.
     failure.trap(defer.FirstError)
@@ -85,53 +79,3 @@ class Clock(object):
         except Exception:
             if not ignore_errs:
                 raise
-
-    def time_bound_deferred(self, given_deferred, time_out):
-        if given_deferred.called:
-            return given_deferred
-
-        ret_deferred = defer.Deferred()
-
-        def timed_out_fn():
-            e = DeferredTimedOutError()
-
-            try:
-                ret_deferred.errback(e)
-            except Exception:
-                pass
-
-            try:
-                given_deferred.cancel()
-            except Exception:
-                pass
-
-        timer = None
-
-        def cancel(res):
-            try:
-                self.cancel_call_later(timer)
-            except Exception:
-                pass
-            return res
-
-        ret_deferred.addBoth(cancel)
-
-        def success(res):
-            try:
-                ret_deferred.callback(res)
-            except Exception:
-                pass
-
-            return res
-
-        def err(res):
-            try:
-                ret_deferred.errback(res)
-            except Exception:
-                pass
-
-        given_deferred.addCallbacks(callback=success, errback=err)
-
-        timer = self.call_later(time_out, timed_out_fn)
-
-        return ret_deferred
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 0729bb2863..9dd4e6b5bc 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -15,9 +15,11 @@
 
 
 from twisted.internet import defer, reactor
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
 
 from .logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable, preserve_fn
+    PreserveLoggingContext, make_deferred_yieldable, run_in_background
 )
 from synapse.util import logcontext, unwrapFirstError
 
@@ -25,6 +27,8 @@ from contextlib import contextmanager
 
 import logging
 
+from six.moves import range
+
 logger = logging.getLogger(__name__)
 
 
@@ -156,13 +160,13 @@ def concurrently_execute(func, args, limit):
     def _concurrently_execute_inner():
         try:
             while True:
-                yield func(it.next())
+                yield func(next(it))
         except StopIteration:
             pass
 
     return logcontext.make_deferred_yieldable(defer.gatherResults([
-        preserve_fn(_concurrently_execute_inner)()
-        for _ in xrange(limit)
+        run_in_background(_concurrently_execute_inner)
+        for _ in range(limit)
     ], consumeErrors=True)).addErrback(unwrapFirstError)
 
 
@@ -392,3 +396,68 @@ class ReadWriteLock(object):
                     self.key_to_current_writer.pop(key)
 
         defer.returnValue(_ctx_manager())
+
+
+class DeferredTimeoutError(Exception):
+    """
+    This error is raised by default when a L{Deferred} times out.
+    """
+
+
+def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+    """
+    Add a timeout to a deferred by scheduling it to be cancelled after
+    timeout seconds.
+
+    This is essentially a backport of deferred.addTimeout, which was introduced
+    in twisted 16.5.
+
+    If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
+    unless a cancelable function was passed to its initialization or unless
+    a different on_timeout_cancel callable is provided.
+
+    Args:
+        deferred (defer.Deferred): deferred to be timed out
+        timeout (Number): seconds to time out after
+
+        on_timeout_cancel (callable): A callable which is called immediately
+            after the deferred times out, and not if this deferred is
+            otherwise cancelled before the timeout.
+
+            It takes an arbitrary value, which is the value of the deferred at
+            that exact point in time (probably a CancelledError Failure), and
+            the timeout.
+
+            The default callable (if none is provided) will translate a
+            CancelledError Failure into a DeferredTimeoutError.
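+
+    Example (an illustrative sketch, within an inlineCallbacks function;
+    `make_request` stands in for any function returning a deferred):
+
+        d = make_request()
+        add_timeout_to_deferred(d, 30)
+        try:
+            result = yield d
+        except DeferredTimeoutError:
+            logger.warn("request timed out")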
+    """
+    timed_out = [False]
+
+    def time_it_out():
+        timed_out[0] = True
+        deferred.cancel()
+
+    delayed_call = reactor.callLater(timeout, time_it_out)
+
+    def convert_cancelled(value):
+        if timed_out[0]:
+            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+            return to_call(value, timeout)
+        return value
+
+    deferred.addBoth(convert_cancelled)
+
+    def cancel_timeout(result):
+        # stop the pending call to cancel the deferred if it's been fired
+        if delayed_call.active():
+            delayed_call.cancel()
+        return result
+
+    deferred.addBoth(cancel_timeout)
+
+
+def _cancelled_to_timed_out_error(value, timeout):
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise DeferredTimeoutError(timeout, "Deferred")
+    return value
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 066fa423fd..7f79333e96 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,9 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches import metrics as cache_metrics
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
 
 
 class ResponseCache(object):
@@ -31,6 +37,7 @@ class ResponseCache(object):
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._name = name
         self._metrics = cache_metrics.register_cache(
             "response_cache",
             size_callback=lambda: self.size(),
@@ -43,15 +50,21 @@ class ResponseCache(object):
     def get(self, key):
         """Look up the given key.
 
-        Returns a deferred which doesn't follow the synapse logcontext rules,
-        so you'll probably want to make_deferred_yieldable it.
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if the request has completed, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        If there is no entry for the key, returns None. It is worth noting that
+        this means there is no way to distinguish a completed result of None
+        from an absent cache entry.
 
         Args:
-            key (str):
+            key (hashable):
 
         Returns:
-            twisted.internet.defer.Deferred|None: None if there is no entry
-            for this key; otherwise a deferred result.
+            twisted.internet.defer.Deferred|None|E: None if there is no entry
+            for this key; otherwise either a deferred result or the result
+            itself.
         """
         result = self.pending_result_cache.get(key)
         if result is not None:
@@ -68,19 +81,17 @@ class ResponseCache(object):
         you should wrap normal synapse deferreds with
         logcontext.run_in_background).
 
-        Returns a new Deferred which also doesn't follow the synapse logcontext
-        rules, so you will want to make_deferred_yieldable it
-
-        (TODO: before using this more widely, it might make sense to refactor
-        it and get() so that they do the necessary wrapping rather than having
-        to do it everywhere ResponseCache is used.)
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if *deferred* was already complete, the actual
+        result. You will probably want to make_deferred_yieldable the result.
 
         Args:
-            key (str):
-            deferred (twisted.internet.defer.Deferred):
+            key (hashable):
+            deferred (twisted.internet.defer.Deferred[T]):
 
         Returns:
-            twisted.internet.defer.Deferred
+            twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
+                result.
         """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
@@ -97,3 +108,52 @@ class ResponseCache(object):
 
         result.addBoth(remove)
         return result.observe()
+
+    def wrap(self, key, callback, *args, **kwargs):
+        """Wrap together a *get* and *set* call, taking care of logcontexts
+
+        First looks up the key in the cache, and if it is present makes it
+        follow the synapse logcontext rules and returns it.
+
+        Otherwise, makes a call to *callback(*args, **kwargs)*, which should
+        follow the synapse logcontext rules, and adds the result to the cache.
+
+        Example usage:
+
+            @defer.inlineCallbacks
+            def handle_request(request):
+                # etc
+                defer.returnValue(result)
+
+            result = yield response_cache.wrap(
+                key,
+                handle_request,
+                request,
+            )
+
+        Args:
+            key (hashable): key to get/set in the cache
+
+            callback (callable): function to call if the key is not found in
+                the cache
+
+            *args: positional parameters to pass to the callback, if it is used
+
+            **kwargs: named parameters to pass to the callback, if it is used
+
+        Returns:
+            twisted.internet.defer.Deferred: yieldable result
+        """
+        result = self.get(key)
+        if not result:
+            logger.info("[%s]: no cached result for [%s], calculating new one",
+                        self._name, key)
+            d = run_in_background(callback, *args, **kwargs)
+            result = self.set(key, d)
+        elif not isinstance(result, defer.Deferred) or result.called:
+            logger.info("[%s]: using completed cached result for [%s]",
+                        self._name, key)
+        else:
+            logger.info("[%s]: using incomplete cached result for [%s]",
+                        self._name, key)
+        return make_deferred_yieldable(result)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 90a2608d6f..3380970e4e 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -15,9 +15,9 @@
 
 from twisted.internet import threads, reactor
 
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
-import Queue
+from six.moves import queue
 
 
 class BackgroundFileConsumer(object):
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
 
         # Queue of slices of bytes to be written. When producer calls
         # unregister a final None is sent.
-        self._bytes_queue = Queue.Queue()
+        self._bytes_queue = queue.Queue()
 
         # Deferred that is resolved when finished writing
         self._finished_deferred = None
@@ -70,7 +70,9 @@ class BackgroundFileConsumer(object):
 
         self._producer = producer
         self.streaming = streaming
-        self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
+        self._finished_deferred = run_in_background(
+            threads.deferToThread, self._writer
+        )
         if not streaming:
             self._producer.resumeProducing()
 
diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index d747849553..e9f0f292ee 100644
--- a/synapse/util/httpresourcetree.py
+++ b/synapse/util/httpresourcetree.py
@@ -40,9 +40,12 @@ def create_resource_tree(desired_tree, root_resource):
     # extra resources to existing nodes. See self._resource_id for the key.
     resource_mappings = {}
     for full_path, res in desired_tree.items():
+        # twisted requires all resources to be bytes
+        full_path = full_path.encode("utf-8")
+
         logger.info("Attaching %s to path %s", res, full_path)
         last_resource = root_resource
-        for path_seg in full_path.split('/')[1:-1]:
+        for path_seg in full_path.split(b'/')[1:-1]:
             if path_seg not in last_resource.listNames():
                 # resource doesn't exist, so make a "dummy resource"
                 child_resource = NoResource()
@@ -57,7 +60,7 @@ def create_resource_tree(desired_tree, root_resource):
 
         # ===========================
         # now attach the actual desired resource
-        last_path_seg = full_path.split('/')[-1]
+        last_path_seg = full_path.split(b'/')[-1]
 
         # if there is already a resource here, thieve its children and
         # replace it
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d660ec785b..bbadeec922 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -92,6 +92,7 @@ class LoggingContext(object):
 
         def __nonzero__(self):
             return False
+        __bool__ = __nonzero__  # python3
 
     sentinel = Sentinel()
 
@@ -304,7 +305,12 @@ def run_in_background(f, *args, **kwargs):
     deferred returned by the function completes.
 
     Useful for wrapping functions that return a deferred which you don't yield
-    on.
+    on (for instance because you want to pass it to deferred.gatherResults()).
+
+    Note that if you completely discard the result, you should make sure that
+    `f` doesn't raise any deferred exceptions, otherwise a scary-looking
+    CRITICAL error about an unhandled error will be logged without much
+    indication about where it came from.
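+
+    Example (illustrative; `do_stuff` is a placeholder for a function
+    returning a deferred whose result we discard):
+
+        for x in xs:
+            run_in_background(do_stuff, x)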
     """
     current = LoggingContext.current_context()
     res = f(*args, **kwargs)
@@ -340,7 +346,7 @@ def make_deferred_yieldable(deferred):
     returning a deferred. Then, when the deferred completes, restores the
     current logcontext before running callbacks/errbacks.
 
-    (This is more-or-less the opposite operation to preserve_fn.)
+    (This is more-or-less the opposite operation to run_in_background.)
     """
     if isinstance(deferred, defer.Deferred) and not deferred.called:
         prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py
index cdbc4bffd7..59ab3c6968 100644
--- a/synapse/util/logformatter.py
+++ b/synapse/util/logformatter.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 
-import StringIO
+from six import StringIO
 import logging
 import traceback
 
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 1101881a2d..18424f6c36 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 from synapse.api.errors import LimitExceededError
 
 from synapse.util.async import sleep
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 
 import collections
 import contextlib
@@ -150,7 +150,7 @@ class _PerHostRatelimiter(object):
                 "Ratelimit [%s]: sleeping req",
                 id(request_id),
             )
-            ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0)
+            ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
 
             self.sleeping_requests.add(request_id)
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 47b0bb5eb3..4e93f69d3a 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -203,8 +203,8 @@ class RetryDestinationLimiter(object):
                 )
             except Exception:
                 logger.exception(
-                    "Failed to store set_destination_retry_timings",
+                    "Failed to store destination_retry_timings",
                 )
 
         # we deliberately do this in the background.
-        synapse.util.logcontext.preserve_fn(store_retry_timings)()
+        synapse.util.logcontext.run_in_background(store_retry_timings)
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index 95a6168e16..b98b9dc6e4 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -15,6 +15,7 @@
 
 import random
 import string
+from six.moves import range
 
 _string_with_symbols = (
     string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
@@ -22,12 +23,12 @@ _string_with_symbols = (
 
 
 def random_string(length):
-    return ''.join(random.choice(string.ascii_letters) for _ in xrange(length))
+    return ''.join(random.choice(string.ascii_letters) for _ in range(length))
 
 
 def random_string_with_symbols(length):
     return ''.join(
-        random.choice(_string_with_symbols) for _ in xrange(length)
+        random.choice(_string_with_symbols) for _ in range(length)
     )
 
 
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index b70f9a6b0a..7a9e45aca9 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from six.moves import range
+
 
 class _Entry(object):
     __slots__ = ["end_key", "queue"]
@@ -68,7 +70,7 @@ class WheelTimer(object):
         # Add empty entries between the end of the current list and when we want
         # to insert. This ensures there are no gaps.
         self.entries.extend(
-            _Entry(key) for key in xrange(last_key, then_key + 1)
+            _Entry(key) for key in range(last_key, then_key + 1)
         )
 
         self.entries[-1].queue.append(obj)
diff --git a/tests/config/test_load.py b/tests/config/test_load.py
index 161a87d7e3..772afd2cf9 100644
--- a/tests/config/test_load.py
+++ b/tests/config/test_load.py
@@ -24,7 +24,7 @@ class ConfigLoadingTestCase(unittest.TestCase):
 
     def setUp(self):
         self.dir = tempfile.mkdtemp()
-        print self.dir
+        print(self.dir)
         self.file = os.path.join(self.dir, "homeserver.yaml")
 
     def tearDown(self):
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index d4ec02ffc2..149e443022 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase):
                 res_deferreds_2 = kr.verify_json_objects_for_server(
                     [("server10", json1)],
                 )
-                yield async.sleep(01)
+                yield async.sleep(1)
                 self.http_client.post_json.assert_not_called()
                 res_deferreds_2[0].addBoth(self.check_context, None)
 
diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index 7e8966a1a8..d763400eaf 100644
--- a/tests/rest/client/v1/test_rooms.py
+++ b/tests/rest/client/v1/test_rooms.py
@@ -24,7 +24,7 @@ from synapse.api.constants import Membership
 from synapse.types import UserID
 
 import json
-import urllib
+from six.moves.urllib import parse as urlparse
 
 from ....utils import MockHttpResource, setup_test_homeserver
 from .utils import RestTestCase
@@ -766,7 +766,7 @@ class RoomMemberStateTestCase(RestTestCase):
     @defer.inlineCallbacks
     def test_rooms_members_self(self):
         path = "/rooms/%s/state/m.room.member/%s" % (
-            urllib.quote(self.room_id), self.user_id
+            urlparse.quote(self.room_id), self.user_id
         )
 
         # valid join message (NOOP since we made the room)
@@ -786,7 +786,7 @@ class RoomMemberStateTestCase(RestTestCase):
     def test_rooms_members_other(self):
         self.other_id = "@zzsid1:red"
         path = "/rooms/%s/state/m.room.member/%s" % (
-            urllib.quote(self.room_id), self.other_id
+            urlparse.quote(self.room_id), self.other_id
         )
 
         # valid invite message
@@ -802,7 +802,7 @@ class RoomMemberStateTestCase(RestTestCase):
     def test_rooms_members_other_custom_keys(self):
         self.other_id = "@zzsid1:red"
         path = "/rooms/%s/state/m.room.member/%s" % (
-            urllib.quote(self.room_id), self.other_id
+            urlparse.quote(self.room_id), self.other_id
         )
 
         # valid invite message with custom key
@@ -859,7 +859,7 @@ class RoomMessagesTestCase(RestTestCase):
     @defer.inlineCallbacks
     def test_invalid_puts(self):
         path = "/rooms/%s/send/m.room.message/mid1" % (
-            urllib.quote(self.room_id))
+            urlparse.quote(self.room_id))
         # missing keys or invalid json
         (code, response) = yield self.mock_resource.trigger(
             "PUT", path, '{}'
@@ -894,7 +894,7 @@ class RoomMessagesTestCase(RestTestCase):
     @defer.inlineCallbacks
     def test_rooms_messages_sent(self):
         path = "/rooms/%s/send/m.room.message/mid1" % (
-            urllib.quote(self.room_id))
+            urlparse.quote(self.room_id))
 
         content = '{"body":"test","msgtype":{"type":"a"}}'
         (code, response) = yield self.mock_resource.trigger("PUT", path, content)
@@ -911,7 +911,7 @@ class RoomMessagesTestCase(RestTestCase):
 
         # m.text message type
         path = "/rooms/%s/send/m.room.message/mid2" % (
-            urllib.quote(self.room_id))
+            urlparse.quote(self.room_id))
         content = '{"body":"test2","msgtype":"m.text"}'
         (code, response) = yield self.mock_resource.trigger("PUT", path, content)
         self.assertEquals(200, code, msg=str(response))
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index c2e39a7288..00825498b1 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -480,9 +480,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
             ApplicationServiceStore(None, hs)
 
         e = cm.exception
-        self.assertIn(f1, e.message)
-        self.assertIn(f2, e.message)
-        self.assertIn("id", e.message)
+        self.assertIn(f1, str(e))
+        self.assertIn(f2, str(e))
+        self.assertIn("id", str(e))
 
     @defer.inlineCallbacks
     def test_duplicate_as_tokens(self):
@@ -504,6 +504,6 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
             ApplicationServiceStore(None, hs)
 
         e = cm.exception
-        self.assertIn(f1, e.message)
-        self.assertIn(f2, e.message)
-        self.assertIn("as_token", e.message)
+        self.assertIn(f1, str(e))
+        self.assertIn(f2, str(e))
+        self.assertIn("as_token", str(e))
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
new file mode 100644
index 0000000000..30683e7888
--- /dev/null
+++ b/tests/storage/test_event_federation.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+import tests.unittest
+import tests.utils
+
+
+class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
+    @defer.inlineCallbacks
+    def setUp(self):
+        hs = yield tests.utils.setup_test_homeserver()
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def test_get_prev_events_for_room(self):
+        room_id = '@ROOM:local'
+
+        # add a bunch of events and hashes to act as forward extremities
+        def insert_event(txn, i):
+            event_id = '$event_%i:local' % i
+
+            txn.execute((
+                "INSERT INTO events ("
+                "   room_id, event_id, type, depth, topological_ordering,"
+                "   content, processed, outlier) "
+                "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)"
+            ), (room_id, event_id, i, i, True, False))
+
+            txn.execute((
+                'INSERT INTO event_forward_extremities (room_id, event_id) '
+                'VALUES (?, ?)'
+            ), (room_id, event_id))
+
+            txn.execute((
+                'INSERT INTO event_reference_hashes '
+                '(event_id, algorithm, hash) '
+                "VALUES (?, 'sha256', ?)"
+            ), (event_id, 'ffff'))
+
+        for i in range(0, 11):
+            yield self.store.runInteraction("insert", insert_event, i)
+
+        # this should get the last five and five others
+        r = yield self.store.get_prev_events_for_room(room_id)
+        self.assertEqual(10, len(r))
+        for i in range(0, 5):
+            el = r[i]
+            depth = el[2]
+            self.assertEqual(10 - i, depth)
+
+        for i in range(5, 10):
+            el = r[i]
+            depth = el[2]
+            self.assertLessEqual(depth, 5)
diff --git a/tests/test_distributor.py b/tests/test_distributor.py
index acebcf4a86..010aeaee7e 100644
--- a/tests/test_distributor.py
+++ b/tests/test_distributor.py
@@ -62,7 +62,7 @@ class DistributorTestCase(unittest.TestCase):
     def test_signal_catch(self):
         self.dist.declare("alarm")
 
-        observers = [Mock() for i in 1, 2]
+        observers = [Mock() for i in (1, 2)]
         for o in observers:
             self.dist.observe("alarm", o)
 
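
The tuple parenthesisation above is required because Python 3 no longer accepts a bare tuple as the iterable of a comprehension: [Mock() for i in 1, 2] is a SyntaxError under py3, while the parenthesised form parses on both. For instance:

    # Valid on Python 2 and 3; without the parentheses around (1, 2)
    # this line would be a SyntaxError on Python 3.
    observers = [str(i) for i in (1, 2)]
    assert observers == ["1", "2"]
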
diff --git a/tests/util/test_clock.py b/tests/util/test_clock.py
deleted file mode 100644
index 9672603579..0000000000
--- a/tests/util/test_clock.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2017 Vector Creations Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from synapse import util
-from twisted.internet import defer
-from tests import unittest
-
-
-class ClockTestCase(unittest.TestCase):
-    @defer.inlineCallbacks
-    def test_time_bound_deferred(self):
-        # just a deferred which never resolves
-        slow_deferred = defer.Deferred()
-
-        clock = util.Clock()
-        time_bound = clock.time_bound_deferred(slow_deferred, 0.001)
-
-        try:
-            yield time_bound
-            self.fail("Expected timedout error, but got nothing")
-        except util.DeferredTimedOutError:
-            pass
diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py
index 76e2234255..d6e1082779 100644
--- a/tests/util/test_file_consumer.py
+++ b/tests/util/test_file_consumer.py
@@ -20,7 +20,7 @@ from mock import NonCallableMock
 from synapse.util.file_consumer import BackgroundFileConsumer
 
 from tests import unittest
-from StringIO import StringIO
+from six import StringIO
 
 import threading
 
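
six.StringIO maps to StringIO.StringIO on Python 2 and io.StringIO on Python 3, giving a single import for an in-memory text buffer on both. A quick standalone sketch:

    from six import StringIO

    # io.StringIO (the py3 target) accepts text only, not bytes.
    buf = StringIO()
    buf.write(u"some data")
    assert buf.getvalue() == u"some data"
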
diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py
index 793a88e462..4865eb4bc6 100644
--- a/tests/util/test_linearizer.py
+++ b/tests/util/test_linearizer.py
@@ -18,6 +18,7 @@ from tests import unittest
 from twisted.internet import defer
 
 from synapse.util.async import Linearizer
+from six.moves import range
 
 
 class LinearizerTestCase(unittest.TestCase):
@@ -58,7 +59,7 @@ class LinearizerTestCase(unittest.TestCase):
                     logcontext.LoggingContext.current_context(), lc)
 
         func(0, sleep=True)
-        for i in xrange(1, 100):
+        for i in range(1, 100):
             func(i)
 
         return func(1000)
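
xrange no longer exists on Python 3; six.moves.range is xrange on Python 2 and the built-in lazy range on Python 3, so loops like the one above avoid materialising an intermediate list on either version. For example:

    from six.moves import range  # xrange on py2, builtin range on py3

    total = 0
    for i in range(1, 100):
        total += i
    assert total == 4950
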
diff --git a/tests/utils.py b/tests/utils.py
index f15317d27b..c2beb5d9f7 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -15,8 +15,7 @@
 
 import hashlib
 from inspect import getcallargs
-import urllib
-import urlparse
+from six.moves.urllib import parse as urlparse
 
 from mock import Mock, patch
 from twisted.internet import defer, reactor
@@ -59,6 +58,10 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
         config.email_enable_notifs = False
         config.block_non_admin_invites = False
         config.federation_domain_whitelist = None
+        config.federation_rc_reject_limit = 10
+        config.federation_rc_sleep_limit = 10
+        config.federation_rc_concurrent = 10
+        config.filter_timeline_limit = 5000
         config.user_directory_search_all_users = False
 
         # disable user directory updates, because they get done in the
@@ -234,7 +237,7 @@ class MockHttpResource(HttpServer):
             if matcher:
                 try:
                     args = [
-                        urllib.unquote(u).decode("UTF-8")
+                        urlparse.unquote(u).decode("UTF-8")
                         for u in matcher.groups()
                     ]
 
diff --git a/tox.ini b/tox.ini
index f408defc8f..ca8373662b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist = packaging, py27, pep8
+envlist = packaging, py27, py36, pep8
 
 [testenv]
 deps =
@@ -46,6 +46,14 @@ commands =
 # )
 usedevelop=true
 
+[testenv:py36]
+usedevelop=true
+commands =
+    /usr/bin/find "{toxinidir}" -name '*.pyc' -delete
+    coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
+        "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/metrics} {env:TOXSUFFIX:}
+    {env:DUMP_COVERAGE_COMMAND:coverage report -m}
+
 [testenv:packaging]
 deps =
     check-manifest