summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.dockerignore1
-rw-r--r--.gitignore1
-rw-r--r--.travis.yml4
-rw-r--r--CHANGES.md14
-rw-r--r--changelog.d/3740.misc1
-rw-r--r--changelog.d/3771.misc1
-rw-r--r--changelog.d/3789.misc1
-rw-r--r--changelog.d/3790.feature1
-rw-r--r--changelog.d/3795.misc1
-rw-r--r--changelog.d/3803.misc1
-rw-r--r--changelog.d/3804.bugfix1
-rw-r--r--setup.cfg5
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/filtering.py1
-rw-r--r--synapse/appservice/api.py13
-rw-r--r--synapse/federation/federation_base.py126
-rw-r--r--synapse/federation/federation_server.py30
-rw-r--r--synapse/federation/persistence.py8
-rw-r--r--synapse/federation/transport/server.py2
-rw-r--r--synapse/handlers/auth.py8
-rw-r--r--synapse/handlers/e2e_keys.py5
-rw-r--r--synapse/handlers/federation.py6
-rw-r--r--synapse/handlers/room_list.py2
-rw-r--r--synapse/handlers/search.py14
-rw-r--r--synapse/handlers/sync.py22
-rw-r--r--synapse/http/client.py82
-rw-r--r--synapse/http/matrixfederationclient.py204
-rw-r--r--synapse/http/site.py4
-rw-r--r--synapse/python_dependencies.py5
-rw-r--r--synapse/rest/client/v2_alpha/account.py16
-rw-r--r--synapse/rest/client/v2_alpha/register.py12
-rw-r--r--synapse/rest/client/v2_alpha/sync.py51
-rw-r--r--tests/handlers/test_device.py143
-rw-r--r--tests/handlers/test_typing.py19
-rw-r--r--tests/replication/slave/storage/_base.py95
-rw-r--r--tests/replication/slave/storage/test_account_data.py20
-rw-r--r--tests/replication/slave/storage/test_events.py110
-rw-r--r--tests/replication/slave/storage/test_receipts.py15
-rw-r--r--tests/server.py1
-rw-r--r--tests/storage/test_appservice.py131
-rw-r--r--tests/storage/test_directory.py3
-rw-r--r--tests/storage/test_event_federation.py6
-rw-r--r--tests/storage/test_monthly_active_users.py138
-rw-r--r--tests/storage/test_presence.py7
-rw-r--r--tests/storage/test_profile.py2
-rw-r--r--tests/storage/test_user_directory.py2
-rw-r--r--tests/test_visibility.py2
-rw-r--r--tests/unittest.py7
-rw-r--r--tests/utils.py22
49 files changed, 745 insertions, 623 deletions
diff --git a/.dockerignore b/.dockerignore
index 6cdb8532d3..0180602e56 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -3,6 +3,5 @@ Dockerfile
 .gitignore
 demo/etc
 tox.ini
-synctl
 .git/*
 .tox/*
diff --git a/.gitignore b/.gitignore
index 9f42a7568f..1718185384 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,6 +44,7 @@ media_store/
 build/
 venv/
 venv*/
+*venv/
 
 localhost-800*/
 static/client/register/register_config.js
diff --git a/.travis.yml b/.travis.yml
index 318701c9f8..11c76db2e5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -35,10 +35,6 @@ matrix:
   - python: 3.6
     env: TOX_ENV=check-newsfragment
 
-  allow_failures:
-  - python: 2.7
-    env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
-
 install:
   - pip install tox
 
diff --git a/CHANGES.md b/CHANGES.md
index a35f5aebc7..59feb6b1a7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,17 @@
+Synapse 0.33.3.1 (2018-09-06)
+=============================
+
+SECURITY FIXES
+--------------
+
+- Fix an issue where event signatures were not always correctly validated ([\#3796](https://github.com/matrix-org/synapse/issues/3796))
+- Fix an issue where server_acls could be circumvented for incoming events ([\#3796](https://github.com/matrix-org/synapse/issues/3796))
+
+Internal Changes
+----------------
+
+- Unignore synctl in .dockerignore to fix docker builds ([\#3802](https://github.com/matrix-org/synapse/issues/3802))
+
 Synapse 0.33.3 (2018-08-22)
 ===========================
 
diff --git a/changelog.d/3740.misc b/changelog.d/3740.misc
new file mode 100644
index 0000000000..4dcb7fb5de
--- /dev/null
+++ b/changelog.d/3740.misc
@@ -0,0 +1 @@
+The test suite now passes on PostgreSQL.
diff --git a/changelog.d/3771.misc b/changelog.d/3771.misc
new file mode 100644
index 0000000000..47aa34bc04
--- /dev/null
+++ b/changelog.d/3771.misc
@@ -0,0 +1 @@
+http/ is now ported to Python 3.
diff --git a/changelog.d/3789.misc b/changelog.d/3789.misc
new file mode 100644
index 0000000000..d2d5d91091
--- /dev/null
+++ b/changelog.d/3789.misc
@@ -0,0 +1 @@
+Improve human readable error messages for threepid registration/account update
diff --git a/changelog.d/3790.feature b/changelog.d/3790.feature
new file mode 100644
index 0000000000..2c4ac62fb5
--- /dev/null
+++ b/changelog.d/3790.feature
@@ -0,0 +1 @@
+Implement `event_format` filter param in `/sync`
diff --git a/changelog.d/3795.misc b/changelog.d/3795.misc
new file mode 100644
index 0000000000..9f64ee5e2b
--- /dev/null
+++ b/changelog.d/3795.misc
@@ -0,0 +1 @@
+Make /sync slightly faster by avoiding needless copies
diff --git a/changelog.d/3803.misc b/changelog.d/3803.misc
new file mode 100644
index 0000000000..2b60653c29
--- /dev/null
+++ b/changelog.d/3803.misc
@@ -0,0 +1 @@
+handlers/ is now ported to Python 3.
diff --git a/changelog.d/3804.bugfix b/changelog.d/3804.bugfix
new file mode 100644
index 0000000000..a0cef20e3f
--- /dev/null
+++ b/changelog.d/3804.bugfix
@@ -0,0 +1 @@
+Bump dependency on pyopenssl 16.x, to avoid incompatibility with recent Twisted.
diff --git a/setup.cfg b/setup.cfg
index c2620be6c5..52feaa9cc7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -17,13 +17,14 @@ ignore =
 [pep8]
 max-line-length = 90
 #  W503 requires that binary operators be at the end, not start, of lines. Erik
-#  doesn't like it.  E203 is contrary to PEP8.
-ignore = W503,E203
+#  doesn't like it.  E203 is contrary to PEP8.  E731 is silly.
+ignore = W503,E203,E731
 
 [flake8]
 # note that flake8 inherits the "ignore" settings from "pep8" (because it uses
 # pep8 to do those checks), but not the "max-line-length" setting
 max-line-length = 90
+ignore=W503,E203,E731
 
 [isort]
 line_length = 89
diff --git a/synapse/__init__.py b/synapse/__init__.py
index e62901b761..e395a0a9ee 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,4 +17,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.33.3"
+__version__ = "0.33.3.1"
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 186831e118..a31a9a17e0 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -251,6 +251,7 @@ class FilterCollection(object):
             "include_leave", False
         )
         self.event_fields = filter_json.get("event_fields", [])
+        self.event_format = filter_json.get("event_format", "client")
 
     def __repr__(self):
         return "<FilterCollection %s>" % (json.dumps(self._filter_json),)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 6980e5890e..9ccc5a80fc 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -13,7 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import urllib
+
+from six.moves import urllib
 
 from prometheus_client import Counter
 
@@ -98,7 +99,7 @@ class ApplicationServiceApi(SimpleHttpClient):
     def query_user(self, service, user_id):
         if service.url is None:
             defer.returnValue(False)
-        uri = service.url + ("/users/%s" % urllib.quote(user_id))
+        uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
         response = None
         try:
             response = yield self.get_json(uri, {
@@ -119,7 +120,7 @@ class ApplicationServiceApi(SimpleHttpClient):
     def query_alias(self, service, alias):
         if service.url is None:
             defer.returnValue(False)
-        uri = service.url + ("/rooms/%s" % urllib.quote(alias))
+        uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
         response = None
         try:
             response = yield self.get_json(uri, {
@@ -153,7 +154,7 @@ class ApplicationServiceApi(SimpleHttpClient):
             service.url,
             APP_SERVICE_PREFIX,
             kind,
-            urllib.quote(protocol)
+            urllib.parse.quote(protocol)
         )
         try:
             response = yield self.get_json(uri, fields)
@@ -188,7 +189,7 @@ class ApplicationServiceApi(SimpleHttpClient):
             uri = "%s%s/thirdparty/protocol/%s" % (
                 service.url,
                 APP_SERVICE_PREFIX,
-                urllib.quote(protocol)
+                urllib.parse.quote(protocol)
             )
             try:
                 info = yield self.get_json(uri, {})
@@ -228,7 +229,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         txn_id = str(txn_id)
 
         uri = service.url + ("/transactions/%s" %
-                             urllib.quote(txn_id))
+                             urllib.parse.quote(txn_id))
         try:
             yield self.put_json(
                 uri=uri,
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index c11798093d..5be8e66fb8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -13,17 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from collections import namedtuple
 
 import six
 
 from twisted.internet import defer
+from twisted.internet.defer import DeferredList
 
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import Codes, SynapseError
 from synapse.crypto.event_signing import check_event_content_hash
 from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
 from synapse.http.servlet import assert_params_in_dict
+from synapse.types import get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
 
 logger = logging.getLogger(__name__)
@@ -133,34 +136,25 @@ class FederationBase(object):
               * throws a SynapseError if the signature check failed.
             The deferreds run their callbacks in the sentinel logcontext.
         """
-
-        redacted_pdus = [
-            prune_event(pdu)
-            for pdu in pdus
-        ]
-
-        deferreds = self.keyring.verify_json_objects_for_server([
-            (p.origin, p.get_pdu_json())
-            for p in redacted_pdus
-        ])
+        deferreds = _check_sigs_on_pdus(self.keyring, pdus)
 
         ctx = logcontext.LoggingContext.current_context()
 
-        def callback(_, pdu, redacted):
+        def callback(_, pdu):
             with logcontext.PreserveLoggingContext(ctx):
                 if not check_event_content_hash(pdu):
                     logger.warn(
                         "Event content has been tampered, redacting %s: %s",
                         pdu.event_id, pdu.get_pdu_json()
                     )
-                    return redacted
+                    return prune_event(pdu)
 
                 if self.spam_checker.check_event_for_spam(pdu):
                     logger.warn(
                         "Event contains spam, redacting %s: %s",
                         pdu.event_id, pdu.get_pdu_json()
                     )
-                    return redacted
+                    return prune_event(pdu)
 
                 return pdu
 
@@ -173,16 +167,116 @@ class FederationBase(object):
                 )
             return failure
 
-        for deferred, pdu, redacted in zip(deferreds, pdus, redacted_pdus):
+        for deferred, pdu in zip(deferreds, pdus):
             deferred.addCallbacks(
                 callback, errback,
-                callbackArgs=[pdu, redacted],
+                callbackArgs=[pdu],
                 errbackArgs=[pdu],
             )
 
         return deferreds
 
 
+class PduToCheckSig(namedtuple("PduToCheckSig", [
+    "pdu", "redacted_pdu_json", "event_id_domain", "sender_domain", "deferreds",
+])):
+    pass
+
+
+def _check_sigs_on_pdus(keyring, pdus):
+    """Check that the given events are correctly signed
+
+    Args:
+        keyring (synapse.crypto.Keyring): keyring object to do the checks
+        pdus (Collection[EventBase]): the events to be checked
+
+    Returns:
+        List[Deferred]: a Deferred for each event in pdus, which will either succeed if
+           the signatures are valid, or fail (with a SynapseError) if not.
+    """
+
+    # (currently this is written assuming the v1 room structure; we'll probably want a
+    # separate function for checking v2 rooms)
+
+    # we want to check that the event is signed by:
+    #
+    # (a) the server which created the event_id
+    #
+    # (b) the sender's server.
+    #
+    #     - except in the case of invites created from a 3pid invite, which are exempt
+    #     from this check, because the sender has to match that of the original 3pid
+    #     invite, but the event may come from a different HS, for reasons that I don't
+    #     entirely grok (why do the senders have to match? and if they do, why doesn't the
+    #     joining server ask the inviting server to do the switcheroo with
+    #     exchange_third_party_invite?).
+    #
+    #     That's pretty awful, since redacting such an invite will render it invalid
+    #     (because it will then look like a regular invite without a valid signature),
+    #     and signatures are *supposed* to be valid whether or not an event has been
+    #     redacted. But this isn't the worst of the ways that 3pid invites are broken.
+    #
+    # let's start by getting the domain for each pdu, and flattening the event back
+    # to JSON.
+    pdus_to_check = [
+        PduToCheckSig(
+            pdu=p,
+            redacted_pdu_json=prune_event(p).get_pdu_json(),
+            event_id_domain=get_domain_from_id(p.event_id),
+            sender_domain=get_domain_from_id(p.sender),
+            deferreds=[],
+        )
+        for p in pdus
+    ]
+
+    # first make sure that the event is signed by the event_id's domain
+    deferreds = keyring.verify_json_objects_for_server([
+        (p.event_id_domain, p.redacted_pdu_json)
+        for p in pdus_to_check
+    ])
+
+    for p, d in zip(pdus_to_check, deferreds):
+        p.deferreds.append(d)
+
+    # now let's look for events where the sender's domain is different to the
+    # event id's domain (normally only the case for joins/leaves), and add additional
+    # checks.
+    pdus_to_check_sender = [
+        p for p in pdus_to_check
+        if p.sender_domain != p.event_id_domain and not _is_invite_via_3pid(p.pdu)
+    ]
+
+    more_deferreds = keyring.verify_json_objects_for_server([
+        (p.sender_domain, p.redacted_pdu_json)
+        for p in pdus_to_check_sender
+    ])
+
+    for p, d in zip(pdus_to_check_sender, more_deferreds):
+        p.deferreds.append(d)
+
+    # replace lists of deferreds with single Deferreds
+    return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check]
+
+
+def _flatten_deferred_list(deferreds):
+    """Given a list of one or more deferreds, either return the single deferred, or
+    combine into a DeferredList.
+    """
+    if len(deferreds) > 1:
+        return DeferredList(deferreds, fireOnOneErrback=True, consumeErrors=True)
+    else:
+        assert len(deferreds) == 1
+        return deferreds[0]
+
+
+def _is_invite_via_3pid(event):
+    return (
+        event.type == EventTypes.Member
+        and event.membership == Membership.INVITE
+        and "third_party_invite" in event.content
+    )
+
+
 def event_from_pdu_json(pdu_json, outlier=False):
     """Construct a FrozenEvent from an event json received over federation
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 3e0cd294a1..dbee404ea7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -99,7 +99,7 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     @log_function
-    def on_incoming_transaction(self, transaction_data):
+    def on_incoming_transaction(self, origin, transaction_data):
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
         request_time = self._clock.time_msec()
@@ -108,34 +108,33 @@ class FederationServer(FederationBase):
 
         if not transaction.transaction_id:
             raise Exception("Transaction missing transaction_id")
-        if not transaction.origin:
-            raise Exception("Transaction missing origin")
 
         logger.debug("[%s] Got transaction", transaction.transaction_id)
 
         # use a linearizer to ensure that we don't process the same transaction
         # multiple times in parallel.
         with (yield self._transaction_linearizer.queue(
-                (transaction.origin, transaction.transaction_id),
+                (origin, transaction.transaction_id),
         )):
             result = yield self._handle_incoming_transaction(
-                transaction, request_time,
+                origin, transaction, request_time,
             )
 
         defer.returnValue(result)
 
     @defer.inlineCallbacks
-    def _handle_incoming_transaction(self, transaction, request_time):
+    def _handle_incoming_transaction(self, origin, transaction, request_time):
         """ Process an incoming transaction and return the HTTP response
 
         Args:
+            origin (unicode): the server making the request
             transaction (Transaction): incoming transaction
             request_time (int): timestamp that the HTTP request arrived at
 
         Returns:
             Deferred[(int, object)]: http response code and body
         """
-        response = yield self.transaction_actions.have_responded(transaction)
+        response = yield self.transaction_actions.have_responded(origin, transaction)
 
         if response:
             logger.debug(
@@ -149,7 +148,7 @@ class FederationServer(FederationBase):
 
         received_pdus_counter.inc(len(transaction.pdus))
 
-        origin_host, _ = parse_server_name(transaction.origin)
+        origin_host, _ = parse_server_name(origin)
 
         pdus_by_room = {}
 
@@ -190,7 +189,7 @@ class FederationServer(FederationBase):
                 event_id = pdu.event_id
                 try:
                     yield self._handle_received_pdu(
-                        transaction.origin, pdu
+                        origin, pdu
                     )
                     pdu_results[event_id] = {}
                 except FederationError as e:
@@ -212,7 +211,7 @@ class FederationServer(FederationBase):
         if hasattr(transaction, "edus"):
             for edu in (Edu(**x) for x in transaction.edus):
                 yield self.received_edu(
-                    transaction.origin,
+                    origin,
                     edu.edu_type,
                     edu.content
                 )
@@ -224,6 +223,7 @@ class FederationServer(FederationBase):
         logger.debug("Returning: %s", str(response))
 
         yield self.transaction_actions.set_response(
+            origin,
             transaction,
             200, response
         )
@@ -838,9 +838,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
             )
 
         return self._send_edu(
-                edu_type=edu_type,
-                origin=origin,
-                content=content,
+            edu_type=edu_type,
+            origin=origin,
+            content=content,
         )
 
     def on_query(self, query_type, args):
@@ -851,6 +851,6 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
             return handler(args)
 
         return self._get_query_client(
-                query_type=query_type,
-                args=args,
+            query_type=query_type,
+            args=args,
         )
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 9146215c21..74ffd13b4f 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -36,7 +36,7 @@ class TransactionActions(object):
         self.store = datastore
 
     @log_function
-    def have_responded(self, transaction):
+    def have_responded(self, origin, transaction):
         """ Have we already responded to a transaction with the same id and
         origin?
 
@@ -50,11 +50,11 @@ class TransactionActions(object):
                                "transaction_id")
 
         return self.store.get_received_txn_response(
-            transaction.transaction_id, transaction.origin
+            transaction.transaction_id, origin
         )
 
     @log_function
-    def set_response(self, transaction, code, response):
+    def set_response(self, origin, transaction, code, response):
         """ Persist how we responded to a transaction.
 
         Returns:
@@ -66,7 +66,7 @@ class TransactionActions(object):
 
         return self.store.set_received_txn_response(
             transaction.transaction_id,
-            transaction.origin,
+            origin,
             code,
             response,
         )
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 7a993fd1cf..3972922ff9 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -353,7 +353,7 @@ class FederationSendServlet(BaseFederationServlet):
 
         try:
             code, response = yield self.handler.on_incoming_transaction(
-                transaction_data
+                origin, transaction_data,
             )
         except Exception:
             logger.exception("on_incoming_transaction failed")
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4a81bd2ba9..2a5eab124f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -895,22 +895,24 @@ class AuthHandler(BaseHandler):
 
         Args:
             password (unicode): Password to hash.
-            stored_hash (unicode): Expected hash value.
+            stored_hash (bytes): Expected hash value.
 
         Returns:
             Deferred(bool): Whether self.hash(password) == stored_hash.
         """
-
         def _do_validate_hash():
             # Normalise the Unicode in the password
             pw = unicodedata.normalize("NFKC", password)
 
             return bcrypt.checkpw(
                 pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
-                stored_hash.encode('utf8')
+                stored_hash
             )
 
         if stored_hash:
+            if not isinstance(stored_hash, bytes):
+                stored_hash = stored_hash.encode('ascii')
+
             return make_deferred_yieldable(
                 threads.deferToThreadPool(
                     self.hs.get_reactor(),
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5816bf8b4f..578e9250fb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -330,7 +330,8 @@ class E2eKeysHandler(object):
                         (algorithm, key_id, ex_json, key)
                     )
             else:
-                new_keys.append((algorithm, key_id, encode_canonical_json(key)))
+                new_keys.append((
+                    algorithm, key_id, encode_canonical_json(key).decode('ascii')))
 
         yield self.store.add_e2e_one_time_keys(
             user_id, device_id, time_now, new_keys
@@ -358,7 +359,7 @@ def _exception_to_failure(e):
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
     # give a string for e.message, which json then fails to serialize.
     return {
-        "status": 503, "message": str(e.message),
+        "status": 503, "message": str(e),
     }
 
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3fa7a98445..0c68e8a472 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -594,7 +594,7 @@ class FederationHandler(BaseHandler):
 
         required_auth = set(
             a_id
-            for event in events + state_events.values() + auth_events.values()
+            for event in events + list(state_events.values()) + list(auth_events.values())
             for a_id, _ in event.auth_events
         )
         auth_events.update({
@@ -802,7 +802,7 @@ class FederationHandler(BaseHandler):
                     )
                     continue
                 except NotRetryingDestination as e:
-                    logger.info(e.message)
+                    logger.info(str(e))
                     continue
                 except FederationDeniedError as e:
                     logger.info(e)
@@ -1358,7 +1358,7 @@ class FederationHandler(BaseHandler):
         )
 
         if state_groups:
-            _, state = state_groups.items().pop()
+            _, state = list(state_groups.items()).pop()
             results = state
 
             if event.is_state():
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 37e41afd61..38e1737ec9 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -162,7 +162,7 @@ class RoomListHandler(BaseHandler):
         # Filter out rooms that we don't want to return
         rooms_to_scan = [
             r for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0
+            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
         ]
 
         total_room_count = len(rooms_to_scan)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index c464adbd0b..0c1d52fd11 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -54,7 +54,7 @@ class SearchHandler(BaseHandler):
         batch_token = None
         if batch:
             try:
-                b = decode_base64(batch)
+                b = decode_base64(batch).decode('ascii')
                 batch_group, batch_group_key, batch_token = b.split("\n")
 
                 assert batch_group is not None
@@ -258,18 +258,18 @@ class SearchHandler(BaseHandler):
                 # it returns more from the same group (if applicable) rather
                 # than reverting to searching all results again.
                 if batch_group and batch_group_key:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         batch_group, batch_group_key, pagination_token
-                    ))
+                    )).encode('ascii'))
                 else:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         "all", "", pagination_token
-                    ))
+                    )).encode('ascii'))
 
                 for room_id, group in room_groups.items():
-                    group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+                    group["next_batch"] = encode_base64(("%s\n%s\n%s" % (
                         "room_id", room_id, pagination_token
-                    ))
+                    )).encode('ascii'))
 
             allowed_events.extend(room_events)
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ef20c2296c..9f133ded3f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -545,7 +545,7 @@ class SyncHandler(object):
 
         member_ids = {
             state_key: event_id
-            for (t, state_key), event_id in state_ids.iteritems()
+            for (t, state_key), event_id in iteritems(state_ids)
             if t == EventTypes.Member
         }
         name_id = state_ids.get((EventTypes.Name, ''))
@@ -774,7 +774,7 @@ class SyncHandler(object):
                     logger.debug("filtering state from %r...", state_ids)
                     state_ids = {
                         t: event_id
-                        for t, event_id in state_ids.iteritems()
+                        for t, event_id in iteritems(state_ids)
                         if cache.get(t[1]) != event_id
                     }
                     logger.debug("...to %r", state_ids)
@@ -1729,17 +1729,17 @@ def _calculate_state(
     event_id_to_key = {
         e: key
         for key, e in itertools.chain(
-            timeline_contains.items(),
-            previous.items(),
-            timeline_start.items(),
-            current.items(),
+            iteritems(timeline_contains),
+            iteritems(previous),
+            iteritems(timeline_start),
+            iteritems(current),
         )
     }
 
-    c_ids = set(e for e in current.values())
-    ts_ids = set(e for e in timeline_start.values())
-    p_ids = set(e for e in previous.values())
-    tc_ids = set(e for e in timeline_contains.values())
+    c_ids = set(e for e in itervalues(current))
+    ts_ids = set(e for e in itervalues(timeline_start))
+    p_ids = set(e for e in itervalues(previous))
+    tc_ids = set(e for e in itervalues(timeline_contains))
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
@@ -1753,7 +1753,7 @@ def _calculate_state(
 
     if lazy_load_members:
         p_ids.difference_update(
-            e for t, e in timeline_start.iteritems()
+            e for t, e in iteritems(timeline_start)
             if t[0] == EventTypes.Member
         )
 
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ab4fbf59b2..4ba54fed05 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -13,24 +13,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import logging
-import urllib
 
-from six import StringIO
+from six import text_type
+from six.moves import urllib
 
+import treq
 from canonicaljson import encode_canonical_json, json
 from prometheus_client import Counter
 
 from OpenSSL import SSL
 from OpenSSL.SSL import VERIFY_NONE
-from twisted.internet import defer, protocol, reactor, ssl, task
+from twisted.internet import defer, protocol, reactor, ssl
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.web._newclient import ResponseDone
 from twisted.web.client import (
     Agent,
     BrowserLikeRedirectAgent,
     ContentDecoderAgent,
-    FileBodyProducer as TwistedFileBodyProducer,
     GzipDecoder,
     HTTPConnectionPool,
     PartialDownloadError,
@@ -83,18 +84,20 @@ class SimpleHttpClient(object):
         if hs.config.user_agent_suffix:
             self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,)
 
+        self.user_agent = self.user_agent.encode('ascii')
+
     @defer.inlineCallbacks
-    def request(self, method, uri, *args, **kwargs):
+    def request(self, method, uri, data=b'', headers=None):
         # A small wrapper around self.agent.request() so we can easily attach
         # counters to it
         outgoing_requests_counter.labels(method).inc()
 
         # log request but strip `access_token` (AS requests for example include this)
-        logger.info("Sending request %s %s", method, redact_uri(uri))
+        logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii')))
 
         try:
-            request_deferred = self.agent.request(
-                method, uri, *args, **kwargs
+            request_deferred = treq.request(
+                method, uri, agent=self.agent, data=data, headers=headers
             )
             add_timeout_to_deferred(
                 request_deferred, 60, self.hs.get_reactor(),
@@ -105,14 +108,14 @@ class SimpleHttpClient(object):
             incoming_responses_counter.labels(method, response.code).inc()
             logger.info(
                 "Received response to  %s %s: %s",
-                method, redact_uri(uri), response.code
+                method, redact_uri(uri.encode('ascii')), response.code
             )
             defer.returnValue(response)
         except Exception as e:
             incoming_responses_counter.labels(method, "ERR").inc()
             logger.info(
                 "Error sending request to  %s %s: %s %s",
-                method, redact_uri(uri), type(e).__name__, e.message
+                method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0]
             )
             raise
 
@@ -137,7 +140,8 @@ class SimpleHttpClient(object):
         # TODO: Do we ever want to log message contents?
         logger.debug("post_urlencoded_get_json args: %s", args)
 
-        query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+        query_bytes = urllib.parse.urlencode(
+            encode_urlencode_args(args), True).encode("utf8")
 
         actual_headers = {
             b"Content-Type": [b"application/x-www-form-urlencoded"],
@@ -148,15 +152,14 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "POST",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(query_bytes))
+            data=query_bytes
         )
 
-        body = yield make_deferred_yieldable(readBody(response))
-
         if 200 <= response.code < 300:
-            defer.returnValue(json.loads(body))
+            body = yield make_deferred_yieldable(treq.json_content(response))
+            defer.returnValue(body)
         else:
             raise HttpResponseException(response.code, response.phrase, body)
 
@@ -191,9 +194,9 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "POST",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(json_str))
+            data=json_str
         )
 
         body = yield make_deferred_yieldable(readBody(response))
@@ -248,7 +251,7 @@ class SimpleHttpClient(object):
             ValueError: if the response was not JSON
         """
         if len(args):
-            query_bytes = urllib.urlencode(args, True)
+            query_bytes = urllib.parse.urlencode(args, True)
             uri = "%s?%s" % (uri, query_bytes)
 
         json_str = encode_canonical_json(json_body)
@@ -262,9 +265,9 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "PUT",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(json_str))
+            data=json_str
         )
 
         body = yield make_deferred_yieldable(readBody(response))
@@ -293,7 +296,7 @@ class SimpleHttpClient(object):
             HttpResponseException on a non-2xx HTTP response.
         """
         if len(args):
-            query_bytes = urllib.urlencode(args, True)
+            query_bytes = urllib.parse.urlencode(args, True)
             uri = "%s?%s" % (uri, query_bytes)
 
         actual_headers = {
@@ -304,7 +307,7 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "GET",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
         )
 
@@ -339,7 +342,7 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "GET",
-            url.encode("ascii"),
+            url,
             headers=Headers(actual_headers),
         )
 
@@ -434,12 +437,12 @@ class CaptchaServerHttpClient(SimpleHttpClient):
 
     @defer.inlineCallbacks
     def post_urlencoded_get_raw(self, url, args={}):
-        query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+        query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
 
         response = yield self.request(
             "POST",
-            url.encode("ascii"),
-            bodyProducer=FileBodyProducer(StringIO(query_bytes)),
+            url,
+            data=query_bytes,
             headers=Headers({
                 b"Content-Type": [b"application/x-www-form-urlencoded"],
                 b"User-Agent": [self.user_agent],
@@ -510,7 +513,7 @@ def encode_urlencode_args(args):
 
 
 def encode_urlencode_arg(arg):
-    if isinstance(arg, unicode):
+    if isinstance(arg, text_type):
         return arg.encode('utf-8')
     elif isinstance(arg, list):
         return [encode_urlencode_arg(i) for i in arg]
@@ -542,26 +545,3 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
 
     def creatorForNetloc(self, hostname, port):
         return self
-
-
-class FileBodyProducer(TwistedFileBodyProducer):
-    """Workaround for https://twistedmatrix.com/trac/ticket/8473
-
-    We override the pauseProducing and resumeProducing methods in twisted's
-    FileBodyProducer so that they do not raise exceptions if the task has
-    already completed.
-    """
-
-    def pauseProducing(self):
-        try:
-            super(FileBodyProducer, self).pauseProducing()
-        except task.TaskDone:
-            # task has already completed
-            pass
-
-    def resumeProducing(self):
-        try:
-            super(FileBodyProducer, self).resumeProducing()
-        except task.NotPaused:
-            # task was not paused (probably because it had already completed)
-            pass
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b34bb8e31a..6a1fc8ca55 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,19 +17,19 @@ import cgi
 import logging
 import random
 import sys
-import urllib
 
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from six import PY3, string_types
+from six.moves import urllib
 
-from canonicaljson import encode_canonical_json, json
+import treq
+from canonicaljson import encode_canonical_json
 from prometheus_client import Counter
 from signedjson.sign import sign_json
 
 from twisted.internet import defer, protocol, reactor
 from twisted.internet.error import DNSLookupError
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import Agent, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -58,13 +58,18 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
 MAX_LONG_RETRIES = 10
 MAX_SHORT_RETRIES = 3
 
+if PY3:
+    MAXINT = sys.maxsize
+else:
+    MAXINT = sys.maxint
+
 
 class MatrixFederationEndpointFactory(object):
     def __init__(self, hs):
         self.tls_client_options_factory = hs.tls_client_options_factory
 
     def endpointForURI(self, uri):
-        destination = uri.netloc
+        destination = uri.netloc.decode('ascii')
 
         return matrix_federation_endpoint(
             reactor, destination, timeout=10,
@@ -93,26 +98,32 @@ class MatrixFederationHttpClient(object):
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
-        self.version_string = hs.version_string
+        self.version_string = hs.version_string.encode('ascii')
         self._next_id = 1
 
     def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
-        return urlparse.urlunparse(
-            ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
+        return urllib.parse.urlunparse(
+            (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
         )
 
     @defer.inlineCallbacks
     def _request(self, destination, method, path,
-                 body_callback, headers_dict={}, param_bytes=b"",
-                 query_bytes=b"", retry_on_dns_fail=True,
+                 json=None, json_callback=None,
+                 param_bytes=b"",
+                 query=None, retry_on_dns_fail=True,
                  timeout=None, long_retries=False,
                  ignore_backoff=False,
                  backoff_on_404=False):
-        """ Creates and sends a request to the given server
+        """
+        Creates and sends a request to the given server.
+
         Args:
             destination (str): The remote server to send the HTTP request to.
             method (str): HTTP method
             path (str): The HTTP path
+            json (dict or None): JSON to send in the body.
+            json_callback (func or None): A callback to generate the JSON.
+            query (dict or None): Query arguments.
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
             backoff_on_404 (bool): Back off if we get a 404
@@ -146,22 +157,29 @@ class MatrixFederationHttpClient(object):
             ignore_backoff=ignore_backoff,
         )
 
-        destination = destination.encode("ascii")
+        headers_dict = {}
         path_bytes = path.encode("ascii")
-        with limiter:
-            headers_dict[b"User-Agent"] = [self.version_string]
-            headers_dict[b"Host"] = [destination]
+        if query:
+            query_bytes = encode_query_args(query)
+        else:
+            query_bytes = b""
 
-            url_bytes = self._create_url(
-                destination, path_bytes, param_bytes, query_bytes
-            )
+        headers_dict = {
+            "User-Agent": [self.version_string],
+            "Host": [destination],
+        }
+
+        with limiter:
+            url = self._create_url(
+                destination.encode("ascii"), path_bytes, param_bytes, query_bytes
+            ).decode('ascii')
 
             txn_id = "%s-O-%s" % (method, self._next_id)
-            self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+            self._next_id = (self._next_id + 1) % (MAXINT - 1)
 
             outbound_logger.info(
                 "{%s} [%s] Sending request: %s %s",
-                txn_id, destination, method, url_bytes
+                txn_id, destination, method, url
             )
 
             # XXX: Would be much nicer to retry only at the transaction-layer
@@ -171,23 +189,33 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            http_url_bytes = urlparse.urlunparse(
-                ("", "", path_bytes, param_bytes, query_bytes, "")
-            )
+            http_url = urllib.parse.urlunparse(
+                (b"", b"", path_bytes, param_bytes, query_bytes, b"")
+            ).decode('ascii')
 
             log_result = None
             try:
                 while True:
-                    producer = None
-                    if body_callback:
-                        producer = body_callback(method, http_url_bytes, headers_dict)
-
                     try:
-                        request_deferred = self.agent.request(
+                        if json_callback:
+                            json = json_callback()
+
+                        if json:
+                            data = encode_canonical_json(json)
+                            headers_dict["Content-Type"] = ["application/json"]
+                            self.sign_request(
+                                destination, method, http_url, headers_dict, json
+                            )
+                        else:
+                            data = None
+                            self.sign_request(destination, method, http_url, headers_dict)
+
+                        request_deferred = treq.request(
                             method,
-                            url_bytes,
-                            Headers(headers_dict),
-                            producer
+                            url,
+                            headers=Headers(headers_dict),
+                            data=data,
+                            agent=self.agent,
                         )
                         add_timeout_to_deferred(
                             request_deferred,
@@ -218,7 +246,7 @@ class MatrixFederationHttpClient(object):
                             txn_id,
                             destination,
                             method,
-                            url_bytes,
+                            url,
                             _flatten_response_never_received(e),
                         )
 
@@ -252,7 +280,7 @@ class MatrixFederationHttpClient(object):
                 # :'(
                 # Update transactions table?
                 with logcontext.PreserveLoggingContext():
-                    body = yield readBody(response)
+                    body = yield treq.content(response)
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -297,11 +325,11 @@ class MatrixFederationHttpClient(object):
         auth_headers = []
 
         for key, sig in request["signatures"][self.server_name].items():
-            auth_headers.append(bytes(
+            auth_headers.append((
                 "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
                     self.server_name, key, sig,
-                )
-            ))
+                )).encode('ascii')
+            )
 
         headers_dict[b"Authorization"] = auth_headers
 
@@ -347,24 +375,14 @@ class MatrixFederationHttpClient(object):
         """
 
         if not json_data_callback:
-            def json_data_callback():
-                return data
-
-        def body_callback(method, url_bytes, headers_dict):
-            json_data = json_data_callback()
-            self.sign_request(
-                destination, method, url_bytes, headers_dict, json_data
-            )
-            producer = _JsonProducer(json_data)
-            return producer
+            json_data_callback = lambda: data
 
         response = yield self._request(
             destination,
             "PUT",
             path,
-            body_callback=body_callback,
-            headers_dict={"Content-Type": ["application/json"]},
-            query_bytes=encode_query_args(args),
+            json_callback=json_data_callback,
+            query=args,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -376,8 +394,8 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
-        defer.returnValue(json.loads(body))
+            body = yield treq.json_content(response)
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def post_json(self, destination, path, data={}, long_retries=False,
@@ -410,20 +428,12 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(
-                destination, method, url_bytes, headers_dict, data
-            )
-            return _JsonProducer(data)
-
         response = yield self._request(
             destination,
             "POST",
             path,
-            query_bytes=encode_query_args(args),
-            body_callback=body_callback,
-            headers_dict={"Content-Type": ["application/json"]},
+            query=args,
+            json=data,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -434,9 +444,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            body = yield treq.json_content(response)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
@@ -471,16 +481,11 @@ class MatrixFederationHttpClient(object):
 
         logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
 
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(destination, method, url_bytes, headers_dict)
-            return None
-
         response = yield self._request(
             destination,
             "GET",
             path,
-            query_bytes=encode_query_args(args),
-            body_callback=body_callback,
+            query=args,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -491,9 +496,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            body = yield treq.json_content(response)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def delete_json(self, destination, path, long_retries=False,
@@ -523,13 +528,11 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
         response = yield self._request(
             destination,
             "DELETE",
             path,
-            query_bytes=encode_query_args(args),
-            headers_dict={"Content-Type": ["application/json"]},
+            query=args,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -540,9 +543,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            body = yield treq.json_content(response)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def get_file(self, destination, path, output_stream, args={},
@@ -569,26 +572,11 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
-        encoded_args = {}
-        for k, vs in args.items():
-            if isinstance(vs, string_types):
-                vs = [vs]
-            encoded_args[k] = [v.encode("UTF-8") for v in vs]
-
-        query_bytes = urllib.urlencode(encoded_args, True)
-        logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
-
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(destination, method, url_bytes, headers_dict)
-            return None
-
         response = yield self._request(
             destination,
             "GET",
             path,
-            query_bytes=query_bytes,
-            body_callback=body_callback,
+            query=args,
             retry_on_dns_fail=retry_on_dns_fail,
             ignore_backoff=ignore_backoff,
         )
@@ -639,30 +627,6 @@ def _readBodyToFile(response, stream, max_size):
     return d
 
 
-class _JsonProducer(object):
-    """ Used by the twisted http client to create the HTTP body from json
-    """
-    def __init__(self, jsn):
-        self.reset(jsn)
-
-    def reset(self, jsn):
-        self.body = encode_canonical_json(jsn)
-        self.length = len(self.body)
-
-    def startProducing(self, consumer):
-        consumer.write(self.body)
-        return defer.succeed(None)
-
-    def pauseProducing(self):
-        pass
-
-    def stopProducing(self):
-        pass
-
-    def resumeProducing(self):
-        pass
-
-
 def _flatten_response_never_received(e):
     if hasattr(e, "reasons"):
         reasons = ", ".join(
@@ -693,7 +657,7 @@ def check_content_type_is_json(headers):
             "No Content-Type header"
         )
 
-    c_type = c_type[0]  # only the first header
+    c_type = c_type[0].decode('ascii')  # only the first header
     val, options = cgi.parse_header(c_type)
     if val != "application/json":
         raise RuntimeError(
@@ -711,6 +675,6 @@ def encode_query_args(args):
             vs = [vs]
         encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
-    query_bytes = urllib.urlencode(encoded_args, True)
+    query_bytes = urllib.parse.urlencode(encoded_args, True)
 
-    return query_bytes
+    return query_bytes.encode('utf8')
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 88ed3714f9..f0828c6542 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -204,14 +204,14 @@ class SynapseRequest(Request):
         self.start_time = time.time()
         self.request_metrics = RequestMetrics()
         self.request_metrics.start(
-            self.start_time, name=servlet_name, method=self.method,
+            self.start_time, name=servlet_name, method=self.method.decode('ascii'),
         )
 
         self.site.access_logger.info(
             "%s - %s - Received request: %s %s",
             self.getClientIP(),
             self.site.site_tag,
-            self.method,
+            self.method.decode('ascii'),
             self.get_redacted_uri()
         )
 
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 942d7c721f..0d8de600cf 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -40,9 +40,10 @@ REQUIREMENTS = {
     "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
     "service_identity>=1.0.0": ["service_identity>=1.0.0"],
     "Twisted>=17.1.0": ["twisted>=17.1.0"],
+    "treq>=15.1": ["treq>=15.1"],
 
-    # We use crypto.get_elliptic_curve which is only supported in >=0.15
-    "pyopenssl>=0.15": ["OpenSSL>=0.15"],
+    # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
+    "pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
 
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 372648cafd..37b32dd37b 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -53,7 +53,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "email", body['email']):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Your email domain is not authorized on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -89,7 +91,9 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Account phone numbers are not authorized on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.datastore.get_user_id_by_threepid(
@@ -241,7 +245,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "email", body['email']):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Your email domain is not authorized on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.datastore.get_user_id_by_threepid(
@@ -276,7 +282,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Account phone numbers are not authorized on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.datastore.get_user_id_by_threepid(
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 2fb4d43ccb..192f52e462 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -75,7 +75,9 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "email", body['email']):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Your email domain is not authorized to register on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -115,7 +117,9 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Phone numbers are not authorized to register on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -373,7 +377,9 @@ class RegisterRestServlet(RestServlet):
 
                     if not check_3pid_allowed(self.hs, medium, address):
                         raise SynapseError(
-                            403, "Third party identifier is not allowed",
+                            403,
+                            "Third party identifiers (email/phone numbers)" +
+                            " are not authorized on this server",
                             Codes.THREEPID_DENIED,
                         )
 
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 1275baa1ba..263d8eb73e 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -25,6 +25,7 @@ from synapse.api.errors import SynapseError
 from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
 from synapse.events.utils import (
     format_event_for_client_v2_without_room_id,
+    format_event_raw,
     serialize_event,
 )
 from synapse.handlers.presence import format_user_presence_state
@@ -175,17 +176,28 @@ class SyncRestServlet(RestServlet):
 
     @staticmethod
     def encode_response(time_now, sync_result, access_token_id, filter):
+        if filter.event_format == 'client':
+            event_formatter = format_event_for_client_v2_without_room_id
+        elif filter.event_format == 'federation':
+            event_formatter = format_event_raw
+        else:
+            raise Exception("Unknown event format %s" % (filter.event_format, ))
+
         joined = SyncRestServlet.encode_joined(
-            sync_result.joined, time_now, access_token_id, filter.event_fields
+            sync_result.joined, time_now, access_token_id,
+            filter.event_fields,
+            event_formatter,
         )
 
         invited = SyncRestServlet.encode_invited(
             sync_result.invited, time_now, access_token_id,
+            event_formatter,
         )
 
         archived = SyncRestServlet.encode_archived(
             sync_result.archived, time_now, access_token_id,
             filter.event_fields,
+            event_formatter,
         )
 
         return {
@@ -228,7 +240,7 @@ class SyncRestServlet(RestServlet):
         }
 
     @staticmethod
-    def encode_joined(rooms, time_now, token_id, event_fields):
+    def encode_joined(rooms, time_now, token_id, event_fields, event_formatter):
         """
         Encode the joined rooms in a sync result
 
@@ -240,7 +252,9 @@ class SyncRestServlet(RestServlet):
             token_id(int): ID of the user's auth token - used for namespacing
                 of transaction IDs
             event_fields(list<str>): List of event fields to include. If empty,
-            all fields will be returned.
+                all fields will be returned.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
         Returns:
             dict[str, dict[str, object]]: the joined rooms list, in our
                 response format
@@ -248,13 +262,14 @@ class SyncRestServlet(RestServlet):
         joined = {}
         for room in rooms:
             joined[room.room_id] = SyncRestServlet.encode_room(
-                room, time_now, token_id, only_fields=event_fields
+                room, time_now, token_id, joined=True, only_fields=event_fields,
+                event_formatter=event_formatter,
             )
 
         return joined
 
     @staticmethod
-    def encode_invited(rooms, time_now, token_id):
+    def encode_invited(rooms, time_now, token_id, event_formatter):
         """
         Encode the invited rooms in a sync result
 
@@ -264,7 +279,9 @@ class SyncRestServlet(RestServlet):
             time_now(int): current time - used as a baseline for age
                 calculations
             token_id(int): ID of the user's auth token - used for namespacing
-            of transaction IDs
+                of transaction IDs
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
 
         Returns:
             dict[str, dict[str, object]]: the invited rooms list, in our
@@ -274,7 +291,7 @@ class SyncRestServlet(RestServlet):
         for room in rooms:
             invite = serialize_event(
                 room.invite, time_now, token_id=token_id,
-                event_format=format_event_for_client_v2_without_room_id,
+                event_format=event_formatter,
                 is_invite=True,
             )
             unsigned = dict(invite.get("unsigned", {}))
@@ -288,7 +305,7 @@ class SyncRestServlet(RestServlet):
         return invited
 
     @staticmethod
-    def encode_archived(rooms, time_now, token_id, event_fields):
+    def encode_archived(rooms, time_now, token_id, event_fields, event_formatter):
         """
         Encode the archived rooms in a sync result
 
@@ -300,7 +317,9 @@ class SyncRestServlet(RestServlet):
             token_id(int): ID of the user's auth token - used for namespacing
                 of transaction IDs
             event_fields(list<str>): List of event fields to include. If empty,
-            all fields will be returned.
+                all fields will be returned.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
         Returns:
             dict[str, dict[str, object]]: The invited rooms list, in our
                 response format
@@ -308,13 +327,18 @@ class SyncRestServlet(RestServlet):
         joined = {}
         for room in rooms:
             joined[room.room_id] = SyncRestServlet.encode_room(
-                room, time_now, token_id, joined=False, only_fields=event_fields
+                room, time_now, token_id, joined=False,
+                only_fields=event_fields,
+                event_formatter=event_formatter,
             )
 
         return joined
 
     @staticmethod
-    def encode_room(room, time_now, token_id, joined=True, only_fields=None):
+    def encode_room(
+            room, time_now, token_id, joined,
+            only_fields, event_formatter,
+    ):
         """
         Args:
             room (JoinedSyncResult|ArchivedSyncResult): sync result for a
@@ -326,14 +350,15 @@ class SyncRestServlet(RestServlet):
             joined (bool): True if the user is joined to this room - will mean
                 we handle ephemeral events
             only_fields(list<str>): Optional. The list of event fields to include.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
         Returns:
             dict[str, object]: the room, encoded in our response format
         """
         def serialize(event):
-            # TODO(mjark): Respect formatting requirements in the filter.
             return serialize_event(
                 event, time_now, token_id=token_id,
-                event_format=format_event_for_client_v2_without_room_id,
+                event_format=event_formatter,
                 only_event_fields=only_fields,
             )
 
diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index 56e7acd37c..a3aa0a1cf2 100644
--- a/tests/handlers/test_device.py
+++ b/tests/handlers/test_device.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,79 +14,79 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
 import synapse.api.errors
 import synapse.handlers.device
 import synapse.storage
 
-from tests import unittest, utils
+from tests import unittest
 
 user1 = "@boris:aaa"
 user2 = "@theresa:bbb"
 
 
-class DeviceTestCase(unittest.TestCase):
-    def __init__(self, *args, **kwargs):
-        super(DeviceTestCase, self).__init__(*args, **kwargs)
-        self.store = None  # type: synapse.storage.DataStore
-        self.handler = None  # type: synapse.handlers.device.DeviceHandler
-        self.clock = None  # type: utils.MockClock
-
-    @defer.inlineCallbacks
-    def setUp(self):
-        hs = yield utils.setup_test_homeserver(self.addCleanup)
+class DeviceTestCase(unittest.HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        hs = self.setup_test_homeserver("server", http_client=None)
         self.handler = hs.get_device_handler()
         self.store = hs.get_datastore()
-        self.clock = hs.get_clock()
+        return hs
+
+    def prepare(self, reactor, clock, hs):
+        # These tests assume that it starts 1000 seconds in.
+        self.reactor.advance(1000)
 
-    @defer.inlineCallbacks
     def test_device_is_created_if_doesnt_exist(self):
-        res = yield self.handler.check_device_registered(
-            user_id="@boris:foo",
-            device_id="fco",
-            initial_device_display_name="display name",
+        res = self.get_success(
+            self.handler.check_device_registered(
+                user_id="@boris:foo",
+                device_id="fco",
+                initial_device_display_name="display name",
+            )
         )
         self.assertEqual(res, "fco")
 
-        dev = yield self.handler.store.get_device("@boris:foo", "fco")
+        dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
         self.assertEqual(dev["display_name"], "display name")
 
-    @defer.inlineCallbacks
     def test_device_is_preserved_if_exists(self):
-        res1 = yield self.handler.check_device_registered(
-            user_id="@boris:foo",
-            device_id="fco",
-            initial_device_display_name="display name",
+        res1 = self.get_success(
+            self.handler.check_device_registered(
+                user_id="@boris:foo",
+                device_id="fco",
+                initial_device_display_name="display name",
+            )
         )
         self.assertEqual(res1, "fco")
 
-        res2 = yield self.handler.check_device_registered(
-            user_id="@boris:foo",
-            device_id="fco",
-            initial_device_display_name="new display name",
+        res2 = self.get_success(
+            self.handler.check_device_registered(
+                user_id="@boris:foo",
+                device_id="fco",
+                initial_device_display_name="new display name",
+            )
         )
         self.assertEqual(res2, "fco")
 
-        dev = yield self.handler.store.get_device("@boris:foo", "fco")
+        dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
         self.assertEqual(dev["display_name"], "display name")
 
-    @defer.inlineCallbacks
     def test_device_id_is_made_up_if_unspecified(self):
-        device_id = yield self.handler.check_device_registered(
-            user_id="@theresa:foo",
-            device_id=None,
-            initial_device_display_name="display",
+        device_id = self.get_success(
+            self.handler.check_device_registered(
+                user_id="@theresa:foo",
+                device_id=None,
+                initial_device_display_name="display",
+            )
         )
 
-        dev = yield self.handler.store.get_device("@theresa:foo", device_id)
+        dev = self.get_success(self.handler.store.get_device("@theresa:foo", device_id))
         self.assertEqual(dev["display_name"], "display")
 
-    @defer.inlineCallbacks
     def test_get_devices_by_user(self):
-        yield self._record_users()
+        self._record_users()
+
+        res = self.get_success(self.handler.get_devices_by_user(user1))
 
-        res = yield self.handler.get_devices_by_user(user1)
         self.assertEqual(3, len(res))
         device_map = {d["device_id"]: d for d in res}
         self.assertDictContainsSubset(
@@ -119,11 +120,10 @@ class DeviceTestCase(unittest.TestCase):
             device_map["abc"],
         )
 
-    @defer.inlineCallbacks
     def test_get_device(self):
-        yield self._record_users()
+        self._record_users()
 
-        res = yield self.handler.get_device(user1, "abc")
+        res = self.get_success(self.handler.get_device(user1, "abc"))
         self.assertDictContainsSubset(
             {
                 "user_id": user1,
@@ -135,59 +135,66 @@ class DeviceTestCase(unittest.TestCase):
             res,
         )
 
-    @defer.inlineCallbacks
     def test_delete_device(self):
-        yield self._record_users()
+        self._record_users()
 
         # delete the device
-        yield self.handler.delete_device(user1, "abc")
+        self.get_success(self.handler.delete_device(user1, "abc"))
 
         # check the device was deleted
-        with self.assertRaises(synapse.api.errors.NotFoundError):
-            yield self.handler.get_device(user1, "abc")
+        res = self.handler.get_device(user1, "abc")
+        self.pump()
+        self.assertIsInstance(
+            self.failureResultOf(res).value, synapse.api.errors.NotFoundError
+        )
 
         # we'd like to check the access token was invalidated, but that's a
         # bit of a PITA.
 
-    @defer.inlineCallbacks
     def test_update_device(self):
-        yield self._record_users()
+        self._record_users()
 
         update = {"display_name": "new display"}
-        yield self.handler.update_device(user1, "abc", update)
+        self.get_success(self.handler.update_device(user1, "abc", update))
 
-        res = yield self.handler.get_device(user1, "abc")
+        res = self.get_success(self.handler.get_device(user1, "abc"))
         self.assertEqual(res["display_name"], "new display")
 
-    @defer.inlineCallbacks
     def test_update_unknown_device(self):
         update = {"display_name": "new_display"}
-        with self.assertRaises(synapse.api.errors.NotFoundError):
-            yield self.handler.update_device("user_id", "unknown_device_id", update)
+        res = self.handler.update_device("user_id", "unknown_device_id", update)
+        self.pump()
+        self.assertIsInstance(
+            self.failureResultOf(res).value, synapse.api.errors.NotFoundError
+        )
 
-    @defer.inlineCallbacks
     def _record_users(self):
         # check this works for both devices which have a recorded client_ip,
         # and those which don't.
-        yield self._record_user(user1, "xyz", "display 0")
-        yield self._record_user(user1, "fco", "display 1", "token1", "ip1")
-        yield self._record_user(user1, "abc", "display 2", "token2", "ip2")
-        yield self._record_user(user1, "abc", "display 2", "token3", "ip3")
+        self._record_user(user1, "xyz", "display 0")
+        self._record_user(user1, "fco", "display 1", "token1", "ip1")
+        self._record_user(user1, "abc", "display 2", "token2", "ip2")
+        self._record_user(user1, "abc", "display 2", "token3", "ip3")
+
+        self._record_user(user2, "def", "dispkay", "token4", "ip4")
 
-        yield self._record_user(user2, "def", "dispkay", "token4", "ip4")
+        self.reactor.advance(10000)
 
-    @defer.inlineCallbacks
     def _record_user(
         self, user_id, device_id, display_name, access_token=None, ip=None
     ):
-        device_id = yield self.handler.check_device_registered(
-            user_id=user_id,
-            device_id=device_id,
-            initial_device_display_name=display_name,
+        device_id = self.get_success(
+            self.handler.check_device_registered(
+                user_id=user_id,
+                device_id=device_id,
+                initial_device_display_name=display_name,
+            )
         )
 
         if ip is not None:
-            yield self.store.insert_client_ip(
-                user_id, access_token, ip, "user_agent", device_id
+            self.get_success(
+                self.store.insert_client_ip(
+                    user_id, access_token, ip, "user_agent", device_id
+                )
             )
-            self.clock.advance_time(1000)
+            self.reactor.advance(1000)
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index ad58073a14..c2d951b45f 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -33,7 +33,7 @@ from ..utils import (
 )
 
 
-def _expect_edu(destination, edu_type, content, origin="test"):
+def _expect_edu_transaction(edu_type, content, origin="test"):
     return {
         "origin": origin,
         "origin_server_ts": 1000000,
@@ -42,8 +42,8 @@ def _expect_edu(destination, edu_type, content, origin="test"):
     }
 
 
-def _make_edu_json(origin, edu_type, content):
-    return json.dumps(_expect_edu("test", edu_type, content, origin=origin)).encode(
+def _make_edu_transaction_json(edu_type, content):
+    return json.dumps(_expect_edu_transaction(edu_type, content)).encode(
         'utf8'
     )
 
@@ -190,8 +190,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
             call(
                 "farm",
                 path="/_matrix/federation/v1/send/1000000/",
-                data=_expect_edu(
-                    "farm",
+                data=_expect_edu_transaction(
                     "m.typing",
                     content={
                         "room_id": self.room_id,
@@ -221,11 +220,10 @@ class TypingNotificationsTestCase(unittest.TestCase):
 
         self.assertEquals(self.event_source.get_current_key(), 0)
 
-        yield self.mock_federation_resource.trigger(
+        (code, response) = yield self.mock_federation_resource.trigger(
             "PUT",
             "/_matrix/federation/v1/send/1000000/",
-            _make_edu_json(
-                "farm",
+            _make_edu_transaction_json(
                 "m.typing",
                 content={
                     "room_id": self.room_id,
@@ -233,7 +231,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
                     "typing": True,
                 },
             ),
-            federation_auth=True,
+            federation_auth_origin=b'farm',
         )
 
         self.on_new_event.assert_has_calls(
@@ -264,8 +262,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
             call(
                 "farm",
                 path="/_matrix/federation/v1/send/1000000/",
-                data=_expect_edu(
-                    "farm",
+                data=_expect_edu_transaction(
                     "m.typing",
                     content={
                         "room_id": self.room_id,
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 65df116efc..089cecfbee 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -1,4 +1,5 @@
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -11,89 +12,91 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import tempfile
 
 from mock import Mock, NonCallableMock
 
-from twisted.internet import defer, reactor
-from twisted.internet.defer import Deferred
+import attr
 
 from synapse.replication.tcp.client import (
     ReplicationClientFactory,
     ReplicationClientHandler,
 )
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
 
 from tests import unittest
-from tests.utils import setup_test_homeserver
 
 
-class TestReplicationClientHandler(ReplicationClientHandler):
-    """Overrides on_rdata so that we can wait for it to happen"""
+class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
 
-    def __init__(self, store):
-        super(TestReplicationClientHandler, self).__init__(store)
-        self._rdata_awaiters = []
-
-    def await_replication(self):
-        d = Deferred()
-        self._rdata_awaiters.append(d)
-        return make_deferred_yieldable(d)
-
-    def on_rdata(self, stream_name, token, rows):
-        awaiters = self._rdata_awaiters
-        self._rdata_awaiters = []
-        super(TestReplicationClientHandler, self).on_rdata(stream_name, token, rows)
-        with PreserveLoggingContext():
-            for a in awaiters:
-                a.callback(None)
-
-
-class BaseSlavedStoreTestCase(unittest.TestCase):
-    @defer.inlineCallbacks
-    def setUp(self):
-        self.hs = yield setup_test_homeserver(
-            self.addCleanup,
+        hs = self.setup_test_homeserver(
             "blue",
-            http_client=None,
             federation_client=Mock(),
             ratelimiter=NonCallableMock(spec_set=["send_message"]),
         )
-        self.hs.get_ratelimiter().send_message.return_value = (True, 0)
+
+        hs.get_ratelimiter().send_message.return_value = (True, 0)
+
+        return hs
+
+    def prepare(self, reactor, clock, hs):
 
         self.master_store = self.hs.get_datastore()
         self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs)
         self.event_id = 0
 
         server_factory = ReplicationStreamProtocolFactory(self.hs)
-        # XXX: mktemp is unsafe and should never be used. but we're just a test.
-        path = tempfile.mktemp(prefix="base_slaved_store_test_case_socket")
-        listener = reactor.listenUNIX(path, server_factory)
-        self.addCleanup(listener.stopListening)
         self.streamer = server_factory.streamer
 
-        self.replication_handler = TestReplicationClientHandler(self.slaved_store)
+        self.replication_handler = ReplicationClientHandler(self.slaved_store)
         client_factory = ReplicationClientFactory(
             self.hs, "client_name", self.replication_handler
         )
-        client_connector = reactor.connectUNIX(path, client_factory)
-        self.addCleanup(client_factory.stopTrying)
-        self.addCleanup(client_connector.disconnect)
+
+        server = server_factory.buildProtocol(None)
+        client = client_factory.buildProtocol(None)
+
+        @attr.s
+        class FakeTransport(object):
+
+            other = attr.ib()
+            disconnecting = False
+            buffer = attr.ib(default=b'')
+
+            def registerProducer(self, producer, streaming):
+
+                self.producer = producer
+
+                def _produce():
+                    self.producer.resumeProducing()
+                    reactor.callLater(0.1, _produce)
+
+                reactor.callLater(0.0, _produce)
+
+            def write(self, byt):
+                self.buffer = self.buffer + byt
+
+                if getattr(self.other, "transport") is not None:
+                    self.other.dataReceived(self.buffer)
+                    self.buffer = b""
+
+            def writeSequence(self, seq):
+                for x in seq:
+                    self.write(x)
+
+        client.makeConnection(FakeTransport(server))
+        server.makeConnection(FakeTransport(client))
 
     def replicate(self):
         """Tell the master side of replication that something has happened, and then
         wait for the replication to occur.
         """
-        # xxx: should we be more specific in what we wait for?
-        d = self.replication_handler.await_replication()
         self.streamer.on_notifier_poke()
-        return d
+        self.pump(0.1)
 
-    @defer.inlineCallbacks
     def check(self, method, args, expected_result=None):
-        master_result = yield getattr(self.master_store, method)(*args)
-        slaved_result = yield getattr(self.slaved_store, method)(*args)
+        master_result = self.get_success(getattr(self.master_store, method)(*args))
+        slaved_result = self.get_success(getattr(self.slaved_store, method)(*args))
         if expected_result is not None:
             self.assertEqual(master_result, expected_result)
             self.assertEqual(slaved_result, expected_result)
diff --git a/tests/replication/slave/storage/test_account_data.py b/tests/replication/slave/storage/test_account_data.py
index 87cc2b2fba..43e3248703 100644
--- a/tests/replication/slave/storage/test_account_data.py
+++ b/tests/replication/slave/storage/test_account_data.py
@@ -12,9 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from twisted.internet import defer
-
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 
 from ._base import BaseSlavedStoreTestCase
@@ -27,16 +24,19 @@ class SlavedAccountDataStoreTestCase(BaseSlavedStoreTestCase):
 
     STORE_TYPE = SlavedAccountDataStore
 
-    @defer.inlineCallbacks
     def test_user_account_data(self):
-        yield self.master_store.add_account_data_for_user(USER_ID, TYPE, {"a": 1})
-        yield self.replicate()
-        yield self.check(
+        self.get_success(
+            self.master_store.add_account_data_for_user(USER_ID, TYPE, {"a": 1})
+        )
+        self.replicate()
+        self.check(
             "get_global_account_data_by_type_for_user", [TYPE, USER_ID], {"a": 1}
         )
 
-        yield self.master_store.add_account_data_for_user(USER_ID, TYPE, {"a": 2})
-        yield self.replicate()
-        yield self.check(
+        self.get_success(
+            self.master_store.add_account_data_for_user(USER_ID, TYPE, {"a": 2})
+        )
+        self.replicate()
+        self.check(
             "get_global_account_data_by_type_for_user", [TYPE, USER_ID], {"a": 2}
         )
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 2ba80ccdcf..db44d33c68 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
 from synapse.events import FrozenEvent, _EventInternalMetadata
 from synapse.events.snapshot import EventContext
 from synapse.replication.slave.storage.events import SlavedEventStore
@@ -55,70 +53,66 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
     def tearDown(self):
         [unpatch() for unpatch in self.unpatches]
 
-    @defer.inlineCallbacks
     def test_get_latest_event_ids_in_room(self):
-        create = yield self.persist(type="m.room.create", key="", creator=USER_ID)
-        yield self.replicate()
-        yield self.check("get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id])
+        create = self.persist(type="m.room.create", key="", creator=USER_ID)
+        self.replicate()
+        self.check("get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id])
 
-        join = yield self.persist(
+        join = self.persist(
             type="m.room.member",
             key=USER_ID,
             membership="join",
             prev_events=[(create.event_id, {})],
         )
-        yield self.replicate()
-        yield self.check("get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id])
+        self.replicate()
+        self.check("get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id])
 
-    @defer.inlineCallbacks
     def test_redactions(self):
-        yield self.persist(type="m.room.create", key="", creator=USER_ID)
-        yield self.persist(type="m.room.member", key=USER_ID, membership="join")
+        self.persist(type="m.room.create", key="", creator=USER_ID)
+        self.persist(type="m.room.member", key=USER_ID, membership="join")
 
-        msg = yield self.persist(type="m.room.message", msgtype="m.text", body="Hello")
-        yield self.replicate()
-        yield self.check("get_event", [msg.event_id], msg)
+        msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
+        self.replicate()
+        self.check("get_event", [msg.event_id], msg)
 
-        redaction = yield self.persist(type="m.room.redaction", redacts=msg.event_id)
-        yield self.replicate()
+        redaction = self.persist(type="m.room.redaction", redacts=msg.event_id)
+        self.replicate()
 
         msg_dict = msg.get_dict()
         msg_dict["content"] = {}
         msg_dict["unsigned"]["redacted_by"] = redaction.event_id
         msg_dict["unsigned"]["redacted_because"] = redaction
         redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
-        yield self.check("get_event", [msg.event_id], redacted)
+        self.check("get_event", [msg.event_id], redacted)
 
-    @defer.inlineCallbacks
     def test_backfilled_redactions(self):
-        yield self.persist(type="m.room.create", key="", creator=USER_ID)
-        yield self.persist(type="m.room.member", key=USER_ID, membership="join")
+        self.persist(type="m.room.create", key="", creator=USER_ID)
+        self.persist(type="m.room.member", key=USER_ID, membership="join")
 
-        msg = yield self.persist(type="m.room.message", msgtype="m.text", body="Hello")
-        yield self.replicate()
-        yield self.check("get_event", [msg.event_id], msg)
+        msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
+        self.replicate()
+        self.check("get_event", [msg.event_id], msg)
 
-        redaction = yield self.persist(
+        redaction = self.persist(
             type="m.room.redaction", redacts=msg.event_id, backfill=True
         )
-        yield self.replicate()
+        self.replicate()
 
         msg_dict = msg.get_dict()
         msg_dict["content"] = {}
         msg_dict["unsigned"]["redacted_by"] = redaction.event_id
         msg_dict["unsigned"]["redacted_because"] = redaction
         redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
-        yield self.check("get_event", [msg.event_id], redacted)
+        self.check("get_event", [msg.event_id], redacted)
 
-    @defer.inlineCallbacks
     def test_invites(self):
-        yield self.persist(type="m.room.create", key="", creator=USER_ID)
-        yield self.check("get_invited_rooms_for_user", [USER_ID_2], [])
-        event = yield self.persist(
-            type="m.room.member", key=USER_ID_2, membership="invite"
-        )
-        yield self.replicate()
-        yield self.check(
+        self.persist(type="m.room.create", key="", creator=USER_ID)
+        self.check("get_invited_rooms_for_user", [USER_ID_2], [])
+        event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
+
+        self.replicate()
+
+        self.check(
             "get_invited_rooms_for_user",
             [USER_ID_2],
             [
@@ -132,37 +126,34 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             ],
         )
 
-    @defer.inlineCallbacks
     def test_push_actions_for_user(self):
-        yield self.persist(type="m.room.create", key="", creator=USER_ID)
-        yield self.persist(type="m.room.join", key=USER_ID, membership="join")
-        yield self.persist(
+        self.persist(type="m.room.create", key="", creator=USER_ID)
+        self.persist(type="m.room.join", key=USER_ID, membership="join")
+        self.persist(
             type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"
         )
-        event1 = yield self.persist(
-            type="m.room.message", msgtype="m.text", body="hello"
-        )
-        yield self.replicate()
-        yield self.check(
+        event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
+        self.replicate()
+        self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2, event1.event_id],
             {"highlight_count": 0, "notify_count": 0},
         )
 
-        yield self.persist(
+        self.persist(
             type="m.room.message",
             msgtype="m.text",
             body="world",
             push_actions=[(USER_ID_2, ["notify"])],
         )
-        yield self.replicate()
-        yield self.check(
+        self.replicate()
+        self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2, event1.event_id],
             {"highlight_count": 0, "notify_count": 1},
         )
 
-        yield self.persist(
+        self.persist(
             type="m.room.message",
             msgtype="m.text",
             body="world",
@@ -170,8 +161,8 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
                 (USER_ID_2, ["notify", {"set_tweak": "highlight", "value": True}])
             ],
         )
-        yield self.replicate()
-        yield self.check(
+        self.replicate()
+        self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2, event1.event_id],
             {"highlight_count": 1, "notify_count": 2},
@@ -179,7 +170,6 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
 
     event_id = 0
 
-    @defer.inlineCallbacks
     def persist(
         self,
         sender=USER_ID,
@@ -206,8 +196,8 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             depth = self.event_id
 
         if not prev_events:
-            latest_event_ids = yield self.master_store.get_latest_event_ids_in_room(
-                room_id
+            latest_event_ids = self.get_success(
+                self.master_store.get_latest_event_ids_in_room(room_id)
             )
             prev_events = [(ev_id, {}) for ev_id in latest_event_ids]
 
@@ -240,19 +230,23 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             )
         else:
             state_handler = self.hs.get_state_handler()
-            context = yield state_handler.compute_event_context(event)
+            context = self.get_success(state_handler.compute_event_context(event))
 
-        yield self.master_store.add_push_actions_to_staging(
+        self.master_store.add_push_actions_to_staging(
             event.event_id, {user_id: actions for user_id, actions in push_actions}
         )
 
         ordering = None
         if backfill:
-            yield self.master_store.persist_events([(event, context)], backfilled=True)
+            self.get_success(
+                self.master_store.persist_events([(event, context)], backfilled=True)
+            )
         else:
-            ordering, _ = yield self.master_store.persist_event(event, context)
+            ordering, _ = self.get_success(
+                self.master_store.persist_event(event, context)
+            )
 
         if ordering:
             event.internal_metadata.stream_ordering = ordering
 
-        defer.returnValue(event)
+        return event
diff --git a/tests/replication/slave/storage/test_receipts.py b/tests/replication/slave/storage/test_receipts.py
index ae1adeded1..f47d94f690 100644
--- a/tests/replication/slave/storage/test_receipts.py
+++ b/tests/replication/slave/storage/test_receipts.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 
 from ._base import BaseSlavedStoreTestCase
@@ -27,13 +25,10 @@ class SlavedReceiptTestCase(BaseSlavedStoreTestCase):
 
     STORE_TYPE = SlavedReceiptsStore
 
-    @defer.inlineCallbacks
     def test_receipt(self):
-        yield self.check("get_receipts_for_user", [USER_ID, "m.read"], {})
-        yield self.master_store.insert_receipt(
-            ROOM_ID, "m.read", USER_ID, [EVENT_ID], {}
-        )
-        yield self.replicate()
-        yield self.check(
-            "get_receipts_for_user", [USER_ID, "m.read"], {ROOM_ID: EVENT_ID}
+        self.check("get_receipts_for_user", [USER_ID, "m.read"], {})
+        self.get_success(
+            self.master_store.insert_receipt(ROOM_ID, "m.read", USER_ID, [EVENT_ID], {})
         )
+        self.replicate()
+        self.check("get_receipts_for_user", [USER_ID, "m.read"], {ROOM_ID: EVENT_ID})
diff --git a/tests/server.py b/tests/server.py
index 7dbdb7f8ea..615bba1b59 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -232,6 +232,7 @@ def setup_test_homeserver(cleanup_func, *args, **kwargs):
 
     clock.threadpool = ThreadPool()
     pool.threadpool = ThreadPool()
+    pool.running = True
     return d
 
 
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index c893990454..3f0083831b 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -37,18 +37,14 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def setUp(self):
         self.as_yaml_files = []
-        config = Mock(
-            app_service_config_files=self.as_yaml_files,
-            event_cache_size=1,
-            password_providers=[],
-        )
         hs = yield setup_test_homeserver(
-            self.addCleanup,
-            config=config,
-            federation_sender=Mock(),
-            federation_client=Mock(),
+            self.addCleanup, federation_sender=Mock(), federation_client=Mock()
         )
 
+        hs.config.app_service_config_files = self.as_yaml_files
+        hs.config.event_cache_size = 1
+        hs.config.password_providers = []
+
         self.as_token = "token1"
         self.as_url = "some_url"
         self.as_id = "as1"
@@ -58,7 +54,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
         self._add_appservice("token2", "as2", "some_url", "some_hs_token", "bob")
         self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob")
         # must be done after inserts
-        self.store = ApplicationServiceStore(None, hs)
+        self.store = ApplicationServiceStore(hs.get_db_conn(), hs)
 
     def tearDown(self):
         # TODO: suboptimal that we need to create files for tests!
@@ -105,18 +101,16 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
     def setUp(self):
         self.as_yaml_files = []
 
-        config = Mock(
-            app_service_config_files=self.as_yaml_files,
-            event_cache_size=1,
-            password_providers=[],
-        )
         hs = yield setup_test_homeserver(
-            self.addCleanup,
-            config=config,
-            federation_sender=Mock(),
-            federation_client=Mock(),
+            self.addCleanup, federation_sender=Mock(), federation_client=Mock()
         )
+
+        hs.config.app_service_config_files = self.as_yaml_files
+        hs.config.event_cache_size = 1
+        hs.config.password_providers = []
+
         self.db_pool = hs.get_db_pool()
+        self.engine = hs.database_engine
 
         self.as_list = [
             {"token": "token1", "url": "https://matrix-as.org", "id": "id_1"},
@@ -129,7 +123,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
 
         self.as_yaml_files = []
 
-        self.store = TestTransactionStore(None, hs)
+        self.store = TestTransactionStore(hs.get_db_conn(), hs)
 
     def _add_service(self, url, as_token, id):
         as_yaml = dict(
@@ -146,29 +140,35 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
             self.as_yaml_files.append(as_token)
 
     def _set_state(self, id, state, txn=None):
-        return self.db_pool.runQuery(
-            "INSERT INTO application_services_state(as_id, state, last_txn) "
-            "VALUES(?,?,?)",
+        return self.db_pool.runOperation(
+            self.engine.convert_param_style(
+                "INSERT INTO application_services_state(as_id, state, last_txn) "
+                "VALUES(?,?,?)"
+            ),
             (id, state, txn),
         )
 
     def _insert_txn(self, as_id, txn_id, events):
-        return self.db_pool.runQuery(
-            "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
-            "VALUES(?,?,?)",
+        return self.db_pool.runOperation(
+            self.engine.convert_param_style(
+                "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
+                "VALUES(?,?,?)"
+            ),
             (as_id, txn_id, json.dumps([e.event_id for e in events])),
         )
 
     def _set_last_txn(self, as_id, txn_id):
-        return self.db_pool.runQuery(
-            "INSERT INTO application_services_state(as_id, last_txn, state) "
-            "VALUES(?,?,?)",
+        return self.db_pool.runOperation(
+            self.engine.convert_param_style(
+                "INSERT INTO application_services_state(as_id, last_txn, state) "
+                "VALUES(?,?,?)"
+            ),
             (as_id, txn_id, ApplicationServiceState.UP),
         )
 
     @defer.inlineCallbacks
     def test_get_appservice_state_none(self):
-        service = Mock(id=999)
+        service = Mock(id="999")
         state = yield self.store.get_appservice_state(service)
         self.assertEquals(None, state)
 
@@ -200,7 +200,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         service = Mock(id=self.as_list[1]["id"])
         yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
         rows = yield self.db_pool.runQuery(
-            "SELECT as_id FROM application_services_state WHERE state=?",
+            self.engine.convert_param_style(
+                "SELECT as_id FROM application_services_state WHERE state=?"
+            ),
             (ApplicationServiceState.DOWN,),
         )
         self.assertEquals(service.id, rows[0][0])
@@ -212,7 +214,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
         yield self.store.set_appservice_state(service, ApplicationServiceState.UP)
         rows = yield self.db_pool.runQuery(
-            "SELECT as_id FROM application_services_state WHERE state=?",
+            self.engine.convert_param_style(
+                "SELECT as_id FROM application_services_state WHERE state=?"
+            ),
             (ApplicationServiceState.UP,),
         )
         self.assertEquals(service.id, rows[0][0])
@@ -279,14 +283,19 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         yield self.store.complete_appservice_txn(txn_id=txn_id, service=service)
 
         res = yield self.db_pool.runQuery(
-            "SELECT last_txn FROM application_services_state WHERE as_id=?",
+            self.engine.convert_param_style(
+                "SELECT last_txn FROM application_services_state WHERE as_id=?"
+            ),
             (service.id,),
         )
         self.assertEquals(1, len(res))
         self.assertEquals(txn_id, res[0][0])
 
         res = yield self.db_pool.runQuery(
-            "SELECT * FROM application_services_txns WHERE txn_id=?", (txn_id,)
+            self.engine.convert_param_style(
+                "SELECT * FROM application_services_txns WHERE txn_id=?"
+            ),
+            (txn_id,),
         )
         self.assertEquals(0, len(res))
 
@@ -300,7 +309,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         yield self.store.complete_appservice_txn(txn_id=txn_id, service=service)
 
         res = yield self.db_pool.runQuery(
-            "SELECT last_txn, state FROM application_services_state WHERE " "as_id=?",
+            self.engine.convert_param_style(
+                "SELECT last_txn, state FROM application_services_state WHERE as_id=?"
+            ),
             (service.id,),
         )
         self.assertEquals(1, len(res))
@@ -308,7 +319,10 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         self.assertEquals(ApplicationServiceState.UP, res[0][1])
 
         res = yield self.db_pool.runQuery(
-            "SELECT * FROM application_services_txns WHERE txn_id=?", (txn_id,)
+            self.engine.convert_param_style(
+                "SELECT * FROM application_services_txns WHERE txn_id=?"
+            ),
+            (txn_id,),
         )
         self.assertEquals(0, len(res))
 
@@ -394,37 +408,31 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
         f1 = self._write_config(suffix="1")
         f2 = self._write_config(suffix="2")
 
-        config = Mock(
-            app_service_config_files=[f1, f2], event_cache_size=1, password_providers=[]
-        )
         hs = yield setup_test_homeserver(
-            self.addCleanup,
-            config=config,
-            datastore=Mock(),
-            federation_sender=Mock(),
-            federation_client=Mock(),
+            self.addCleanup, federation_sender=Mock(), federation_client=Mock()
         )
 
-        ApplicationServiceStore(None, hs)
+        hs.config.app_service_config_files = [f1, f2]
+        hs.config.event_cache_size = 1
+        hs.config.password_providers = []
+
+        ApplicationServiceStore(hs.get_db_conn(), hs)
 
     @defer.inlineCallbacks
     def test_duplicate_ids(self):
         f1 = self._write_config(id="id", suffix="1")
         f2 = self._write_config(id="id", suffix="2")
 
-        config = Mock(
-            app_service_config_files=[f1, f2], event_cache_size=1, password_providers=[]
-        )
         hs = yield setup_test_homeserver(
-            self.addCleanup,
-            config=config,
-            datastore=Mock(),
-            federation_sender=Mock(),
-            federation_client=Mock(),
+            self.addCleanup, federation_sender=Mock(), federation_client=Mock()
         )
 
+        hs.config.app_service_config_files = [f1, f2]
+        hs.config.event_cache_size = 1
+        hs.config.password_providers = []
+
         with self.assertRaises(ConfigError) as cm:
-            ApplicationServiceStore(None, hs)
+            ApplicationServiceStore(hs.get_db_conn(), hs)
 
         e = cm.exception
         self.assertIn(f1, str(e))
@@ -436,19 +444,16 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
         f1 = self._write_config(as_token="as_token", suffix="1")
         f2 = self._write_config(as_token="as_token", suffix="2")
 
-        config = Mock(
-            app_service_config_files=[f1, f2], event_cache_size=1, password_providers=[]
-        )
         hs = yield setup_test_homeserver(
-            self.addCleanup,
-            config=config,
-            datastore=Mock(),
-            federation_sender=Mock(),
-            federation_client=Mock(),
+            self.addCleanup, federation_sender=Mock(), federation_client=Mock()
         )
 
+        hs.config.app_service_config_files = [f1, f2]
+        hs.config.event_cache_size = 1
+        hs.config.password_providers = []
+
         with self.assertRaises(ConfigError) as cm:
-            ApplicationServiceStore(None, hs)
+            ApplicationServiceStore(hs.get_db_conn(), hs)
 
         e = cm.exception
         self.assertIn(f1, str(e))
diff --git a/tests/storage/test_directory.py b/tests/storage/test_directory.py
index b4510c1c8d..4e128e1047 100644
--- a/tests/storage/test_directory.py
+++ b/tests/storage/test_directory.py
@@ -16,7 +16,6 @@
 
 from twisted.internet import defer
 
-from synapse.storage.directory import DirectoryStore
 from synapse.types import RoomAlias, RoomID
 
 from tests import unittest
@@ -28,7 +27,7 @@ class DirectoryStoreTestCase(unittest.TestCase):
     def setUp(self):
         hs = yield setup_test_homeserver(self.addCleanup)
 
-        self.store = DirectoryStore(None, hs)
+        self.store = hs.get_datastore()
 
         self.room = RoomID.from_string("!abcde:test")
         self.alias = RoomAlias.from_string("#my-room:test")
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 2fdf34fdf6..0d4e74d637 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -37,10 +37,10 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
                 (
                     "INSERT INTO events ("
                     "   room_id, event_id, type, depth, topological_ordering,"
-                    "   content, processed, outlier) "
-                    "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)"
+                    "   content, processed, outlier, stream_ordering) "
+                    "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?, ?)"
                 ),
-                (room_id, event_id, i, i, True, False),
+                (room_id, event_id, i, i, True, False, i),
             )
 
             txn.execute(
diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py
index f2ed866ae7..2036287288 100644
--- a/tests/storage/test_monthly_active_users.py
+++ b/tests/storage/test_monthly_active_users.py
@@ -13,25 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
-import tests.unittest
-import tests.utils
-from tests.utils import setup_test_homeserver
+from tests.unittest import HomeserverTestCase
 
 FORTY_DAYS = 40 * 24 * 60 * 60
 
 
-class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
-    def __init__(self, *args, **kwargs):
-        super(MonthlyActiveUsersTestCase, self).__init__(*args, **kwargs)
+class MonthlyActiveUsersTestCase(HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+
+        hs = self.setup_test_homeserver()
+        self.store = hs.get_datastore()
+
+        # Advance the clock a bit
+        reactor.advance(FORTY_DAYS)
 
-    @defer.inlineCallbacks
-    def setUp(self):
-        self.hs = yield setup_test_homeserver(self.addCleanup)
-        self.store = self.hs.get_datastore()
+        return hs
 
-    @defer.inlineCallbacks
     def test_initialise_reserved_users(self):
         self.hs.config.max_mau_value = 5
         user1 = "@user1:server"
@@ -44,88 +41,101 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
         ]
         user_num = len(threepids)
 
-        yield self.store.register(user_id=user1, token="123", password_hash=None)
-
-        yield self.store.register(user_id=user2, token="456", password_hash=None)
+        self.store.register(user_id=user1, token="123", password_hash=None)
+        self.store.register(user_id=user2, token="456", password_hash=None)
+        self.pump()
 
         now = int(self.hs.get_clock().time_msec())
-        yield self.store.user_add_threepid(user1, "email", user1_email, now, now)
-        yield self.store.user_add_threepid(user2, "email", user2_email, now, now)
-        yield self.store.initialise_reserved_users(threepids)
+        self.store.user_add_threepid(user1, "email", user1_email, now, now)
+        self.store.user_add_threepid(user2, "email", user2_email, now, now)
+        self.store.initialise_reserved_users(threepids)
+        self.pump()
 
-        active_count = yield self.store.get_monthly_active_count()
+        active_count = self.store.get_monthly_active_count()
 
         # Test total counts
-        self.assertEquals(active_count, user_num)
+        self.assertEquals(self.get_success(active_count), user_num)
 
         # Test user is marked as active
-
-        timestamp = yield self.store.user_last_seen_monthly_active(user1)
-        self.assertTrue(timestamp)
-        timestamp = yield self.store.user_last_seen_monthly_active(user2)
-        self.assertTrue(timestamp)
+        timestamp = self.store.user_last_seen_monthly_active(user1)
+        self.assertTrue(self.get_success(timestamp))
+        timestamp = self.store.user_last_seen_monthly_active(user2)
+        self.assertTrue(self.get_success(timestamp))
 
         # Test that users are never removed from the db.
         self.hs.config.max_mau_value = 0
 
-        self.hs.get_clock().advance_time(FORTY_DAYS)
+        self.reactor.advance(FORTY_DAYS)
 
-        yield self.store.reap_monthly_active_users()
+        self.store.reap_monthly_active_users()
+        self.pump()
 
-        active_count = yield self.store.get_monthly_active_count()
-        self.assertEquals(active_count, user_num)
+        active_count = self.store.get_monthly_active_count()
+        self.assertEquals(self.get_success(active_count), user_num)
 
         # Test that regalar users are removed from the db
         ru_count = 2
-        yield self.store.upsert_monthly_active_user("@ru1:server")
-        yield self.store.upsert_monthly_active_user("@ru2:server")
-        active_count = yield self.store.get_monthly_active_count()
+        self.store.upsert_monthly_active_user("@ru1:server")
+        self.store.upsert_monthly_active_user("@ru2:server")
+        self.pump()
 
-        self.assertEqual(active_count, user_num + ru_count)
+        active_count = self.store.get_monthly_active_count()
+        self.assertEqual(self.get_success(active_count), user_num + ru_count)
         self.hs.config.max_mau_value = user_num
-        yield self.store.reap_monthly_active_users()
+        self.store.reap_monthly_active_users()
+        self.pump()
 
-        active_count = yield self.store.get_monthly_active_count()
-        self.assertEquals(active_count, user_num)
+        active_count = self.store.get_monthly_active_count()
+        self.assertEquals(self.get_success(active_count), user_num)
 
-    @defer.inlineCallbacks
     def test_can_insert_and_count_mau(self):
-        count = yield self.store.get_monthly_active_count()
-        self.assertEqual(0, count)
+        count = self.store.get_monthly_active_count()
+        self.assertEqual(0, self.get_success(count))
 
-        yield self.store.upsert_monthly_active_user("@user:server")
-        count = yield self.store.get_monthly_active_count()
+        self.store.upsert_monthly_active_user("@user:server")
+        self.pump()
 
-        self.assertEqual(1, count)
+        count = self.store.get_monthly_active_count()
+        self.assertEqual(1, self.get_success(count))
 
-    @defer.inlineCallbacks
     def test_user_last_seen_monthly_active(self):
         user_id1 = "@user1:server"
         user_id2 = "@user2:server"
         user_id3 = "@user3:server"
 
-        result = yield self.store.user_last_seen_monthly_active(user_id1)
-        self.assertFalse(result == 0)
-        yield self.store.upsert_monthly_active_user(user_id1)
-        yield self.store.upsert_monthly_active_user(user_id2)
-        result = yield self.store.user_last_seen_monthly_active(user_id1)
-        self.assertTrue(result > 0)
-        result = yield self.store.user_last_seen_monthly_active(user_id3)
-        self.assertFalse(result == 0)
+        result = self.store.user_last_seen_monthly_active(user_id1)
+        self.assertFalse(self.get_success(result) == 0)
+
+        self.store.upsert_monthly_active_user(user_id1)
+        self.store.upsert_monthly_active_user(user_id2)
+        self.pump()
+
+        result = self.store.user_last_seen_monthly_active(user_id1)
+        self.assertGreater(self.get_success(result), 0)
+
+        result = self.store.user_last_seen_monthly_active(user_id3)
+        self.assertNotEqual(self.get_success(result), 0)
 
-    @defer.inlineCallbacks
     def test_reap_monthly_active_users(self):
         self.hs.config.max_mau_value = 5
         initial_users = 10
         for i in range(initial_users):
-            yield self.store.upsert_monthly_active_user("@user%d:server" % i)
-        count = yield self.store.get_monthly_active_count()
-        self.assertTrue(count, initial_users)
-        yield self.store.reap_monthly_active_users()
-        count = yield self.store.get_monthly_active_count()
-        self.assertEquals(count, initial_users - self.hs.config.max_mau_value)
-
-        self.hs.get_clock().advance_time(FORTY_DAYS)
-        yield self.store.reap_monthly_active_users()
-        count = yield self.store.get_monthly_active_count()
-        self.assertEquals(count, 0)
+            self.store.upsert_monthly_active_user("@user%d:server" % i)
+        self.pump()
+
+        count = self.store.get_monthly_active_count()
+        self.assertTrue(self.get_success(count), initial_users)
+
+        self.store.reap_monthly_active_users()
+        self.pump()
+        count = self.store.get_monthly_active_count()
+        self.assertEquals(
+            self.get_success(count), initial_users - self.hs.config.max_mau_value
+        )
+
+        self.reactor.advance(FORTY_DAYS)
+        self.store.reap_monthly_active_users()
+        self.pump()
+
+        count = self.store.get_monthly_active_count()
+        self.assertEquals(self.get_success(count), 0)
diff --git a/tests/storage/test_presence.py b/tests/storage/test_presence.py
index b5b58ff660..c7a63f39b9 100644
--- a/tests/storage/test_presence.py
+++ b/tests/storage/test_presence.py
@@ -16,19 +16,18 @@
 
 from twisted.internet import defer
 
-from synapse.storage.presence import PresenceStore
 from synapse.types import UserID
 
 from tests import unittest
-from tests.utils import MockClock, setup_test_homeserver
+from tests.utils import setup_test_homeserver
 
 
 class PresenceStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def setUp(self):
-        hs = yield setup_test_homeserver(self.addCleanup, clock=MockClock())
+        hs = yield setup_test_homeserver(self.addCleanup)
 
-        self.store = PresenceStore(None, hs)
+        self.store = hs.get_datastore()
 
         self.u_apple = UserID.from_string("@apple:test")
         self.u_banana = UserID.from_string("@banana:test")
diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py
index a1f6618bf9..45824bd3b2 100644
--- a/tests/storage/test_profile.py
+++ b/tests/storage/test_profile.py
@@ -28,7 +28,7 @@ class ProfileStoreTestCase(unittest.TestCase):
     def setUp(self):
         hs = yield setup_test_homeserver(self.addCleanup)
 
-        self.store = ProfileStore(None, hs)
+        self.store = ProfileStore(hs.get_db_conn(), hs)
 
         self.u_frank = UserID.from_string("@frank:test")
 
diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py
index b46e0ea7e2..0dde1ab2fe 100644
--- a/tests/storage/test_user_directory.py
+++ b/tests/storage/test_user_directory.py
@@ -30,7 +30,7 @@ class UserDirectoryStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def setUp(self):
         self.hs = yield setup_test_homeserver(self.addCleanup)
-        self.store = UserDirectoryStore(None, self.hs)
+        self.store = UserDirectoryStore(self.hs.get_db_conn(), self.hs)
 
         # alice and bob are both in !room_id. bobby is not but shares
         # a homeserver with alice.
diff --git a/tests/test_visibility.py b/tests/test_visibility.py
index 8d8ce0cab9..2eea3b098b 100644
--- a/tests/test_visibility.py
+++ b/tests/test_visibility.py
@@ -96,7 +96,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
         events_to_filter.append(evt)
 
         # the erasey user gets erased
-        self.hs.get_datastore().mark_user_erased("@erased:local_hs")
+        yield self.hs.get_datastore().mark_user_erased("@erased:local_hs")
 
         # ... and the filtering happens.
         filtered = yield filter_events_for_server(
diff --git a/tests/unittest.py b/tests/unittest.py
index 8b513bb32b..a3d39920db 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -22,6 +22,7 @@ from canonicaljson import json
 
 import twisted
 import twisted.logger
+from twisted.internet.defer import Deferred
 from twisted.trial import unittest
 
 from synapse.http.server import JsonResource
@@ -281,12 +282,14 @@ class HomeserverTestCase(TestCase):
         kwargs.update(self._hs_args)
         return setup_test_homeserver(self.addCleanup, *args, **kwargs)
 
-    def pump(self):
+    def pump(self, by=0.0):
         """
         Pump the reactor enough that Deferreds will fire.
         """
-        self.reactor.pump([0.0] * 100)
+        self.reactor.pump([by] * 100)
 
     def get_success(self, d):
+        if not isinstance(d, Deferred):
+            return d
         self.pump()
         return self.successResultOf(d)
diff --git a/tests/utils.py b/tests/utils.py
index 63e30dc6c0..b85017d279 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -30,8 +30,8 @@ from synapse.config.server import ServerConfig
 from synapse.federation.transport import server
 from synapse.http.server import HttpServer
 from synapse.server import HomeServer
-from synapse.storage import DataStore, PostgresEngine
-from synapse.storage.engines import create_engine
+from synapse.storage import DataStore
+from synapse.storage.engines import PostgresEngine, create_engine
 from synapse.storage.prepare_database import (
     _get_or_create_schema_state,
     _setup_new_database,
@@ -42,6 +42,7 @@ from synapse.util.ratelimitutils import FederationRateLimiter
 
 # set this to True to run the tests against postgres instead of sqlite.
 USE_POSTGRES_FOR_TESTS = os.environ.get("SYNAPSE_POSTGRES", False)
+LEAVE_DB = os.environ.get("SYNAPSE_LEAVE_DB", False)
 POSTGRES_USER = os.environ.get("SYNAPSE_POSTGRES_USER", "postgres")
 POSTGRES_BASE_DB = "_synapse_unit_tests_base_%s" % (os.getpid(),)
 
@@ -244,8 +245,9 @@ def setup_test_homeserver(
                 cur.close()
                 db_conn.close()
 
-            # Register the cleanup hook
-            cleanup_func(cleanup)
+            if not LEAVE_DB:
+                # Register the cleanup hook
+                cleanup_func(cleanup)
 
         hs.setup()
     else:
@@ -319,7 +321,10 @@ class MockHttpResource(HttpServer):
 
     @patch('twisted.web.http.Request')
     @defer.inlineCallbacks
-    def trigger(self, http_method, path, content, mock_request, federation_auth=False):
+    def trigger(
+        self, http_method, path, content, mock_request,
+        federation_auth_origin=None,
+    ):
         """ Fire an HTTP event.
 
         Args:
@@ -328,6 +333,7 @@ class MockHttpResource(HttpServer):
             content : The HTTP body
             mock_request : Mocked request to pass to the event so it can get
                            content.
+            federation_auth_origin (bytes|None): domain to authenticate as, for federation
         Returns:
             A tuple of (code, response)
         Raises:
@@ -348,8 +354,10 @@ class MockHttpResource(HttpServer):
         mock_request.getClientIP.return_value = "-"
 
         headers = {}
-        if federation_auth:
-            headers[b"Authorization"] = [b"X-Matrix origin=test,key=,sig="]
+        if federation_auth_origin is not None:
+            headers[b"Authorization"] = [
+                b"X-Matrix origin=%s,key=,sig=" % (federation_auth_origin, )
+            ]
         mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers)
 
         # return the right path if the event requires it