-rw-r--r--  .gitignore                                |   2
-rw-r--r--  .travis.yml                               |   5
-rw-r--r--  CONTRIBUTING.rst                          |  38
-rw-r--r--  MANIFEST.in                               |   1
-rw-r--r--  README.rst                                |  38
-rw-r--r--  changelog.d/3699.misc                     |   2
-rw-r--r--  changelog.d/3873.misc                     |   2
-rw-r--r--  changelog.d/3894.feature                  |   1
-rw-r--r--  changelog.d/3899.bugfix                   |   1
-rw-r--r--  changelog.d/3903.misc                     |   1
-rw-r--r--  changelog.d/3904.misc                     |   1
-rw-r--r--  changelog.d/3906.misc                     |   1
-rw-r--r--  changelog.d/3907.bugfix                   |   1
-rw-r--r--  changelog.d/3908.bugfix                   |   1
-rw-r--r--  changelog.d/3909.misc                     |   1
-rw-r--r--  changelog.d/3910.bugfix                   |   1
-rw-r--r--  changelog.d/3912.misc                     |   1
-rw-r--r--  changelog.d/3914.bugfix                   |   1
-rw-r--r--  docker/Dockerfile-pgtests                 |  12
-rwxr-xr-x  docker/run_pg_tests.sh                    |  20
-rwxr-xr-x  synapse/app/homeserver.py                 |   4
-rw-r--r--  synapse/handlers/federation.py            | 195
-rw-r--r--  synapse/handlers/room_member.py           |   5
-rw-r--r--  synapse/http/client.py                    |   4
-rw-r--r--  synapse/http/endpoint.py                  |  13
-rw-r--r--  synapse/http/matrixfederationclient.py    | 406
-rw-r--r--  synapse/http/site.py                      |   6
-rw-r--r--  synapse/notifier.py                       |  21
-rw-r--r--  synapse/storage/client_ips.py             |  34
-rw-r--r--  synapse/storage/transactions.py           |   8
-rw-r--r--  synapse/util/async_helpers.py             |  88
-rw-r--r--  synapse/util/retryutils.py                |   2
-rwxr-xr-x  test_postgresql.sh                        |  12
-rw-r--r--  tests/http/test_fedclient.py              |  43
-rw-r--r--  tests/replication/slave/storage/_base.py  |  35
-rw-r--r--  tests/server.py                           |  89
-rw-r--r--  tests/storage/test_client_ips.py          | 202
-rw-r--r--  tests/test_server.py                      |  74
-rw-r--r--  tests/unittest.py                         |  10
-rw-r--r--  tests/utils.py                            |  27
-rw-r--r--  tox.ini                                   |  12
41 files changed, 979 insertions(+), 442 deletions(-)
diff --git a/.gitignore b/.gitignore
index 1718185384..3b2252ad8a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,9 +1,11 @@
 *.pyc
 .*.swp
 *~
+*.lock
 
 .DS_Store
 _trial_temp/
+_trial_temp*/
 logs/
 dbs/
 *.egg
diff --git a/.travis.yml b/.travis.yml
index b3ee66da8f..b6faca4b92 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,6 +32,11 @@ matrix:
     env: TOX_ENV=py36
 
   - python: 3.6
+    env: TOX_ENV=py36-postgres TRIAL_FLAGS="-j 4"
+    services:
+      - postgresql
+
+  - python: 3.6
     env: TOX_ENV=check_isort
 
   - python: 3.6
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index f9de78a460..6ef7d48dc7 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -30,12 +30,28 @@ use github's pull request workflow to review the contribution, and either ask
 you to make any refinements needed or merge it and make them ourselves. The
 changes will then land on master when we next do a release.
 
-We use `Jenkins <http://matrix.org/jenkins>`_ and
-`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
-integration. All pull requests to synapse get automatically tested by Travis;
-the Jenkins builds require an adminstrator to start them. If your change
-breaks the build, this will be shown in github, so please keep an eye on the
-pull request for feedback.
+We use `CircleCI <https://circleci.com/gh/matrix-org>`_ and `Travis CI 
+<https://travis-ci.org/matrix-org/synapse>`_ for continuous integration. All
+pull requests to synapse get automatically tested by Travis and CircleCI.
+If your change breaks the build, this will be shown in GitHub, so please
+keep an eye on the pull request for feedback.
+
+To run unit tests in a local development environment, you can use:
+
+- ``tox -e py27`` (requires tox to be installed by ``pip install tox``) for
+  SQLite-backed Synapse on Python 2.7.
+- ``tox -e py35`` for SQLite-backed Synapse on Python 3.5.
+- ``tox -e py36`` for SQLite-backed Synapse on Python 3.6.
+- ``tox -e py27-postgres`` for PostgreSQL-backed Synapse on Python 2.7
+  (requires a running local PostgreSQL with access to create databases).
+- ``./test_postgresql.sh`` for PostgreSQL-backed Synapse on Python 2.7
+  (requires Docker). Entirely self-contained, recommended if you don't want to
+  set up PostgreSQL yourself.
+
+Docker images are available for running the integration tests (SyTest) locally,
+see the `documentation in the SyTest repo
+<https://github.com/matrix-org/sytest/blob/develop/docker/README.md>`_ for more
+information.
 
 Code style
 ~~~~~~~~~~
@@ -77,7 +93,8 @@ AUTHORS.rst file for the project in question. Please feel free to include a
 change to AUTHORS.rst in your pull request to list yourself and a short
 description of the area(s) you've worked on. Also, we sometimes have swag to
 give away to contributors - if you feel that Matrix-branded apparel is missing
-from your life, please mail us your shipping address to matrix at matrix.org and we'll try to fix it :)
+from your life, please mail us your shipping address to matrix at matrix.org and
+we'll try to fix it :)
 
 Sign off
 ~~~~~~~~
@@ -144,4 +161,9 @@ flag to ``git commit``, which uses the name and email set in your
 Conclusion
 ~~~~~~~~~~
 
-That's it!  Matrix is a very open and collaborative project as you might expect given our obsession with open communication.  If we're going to successfully matrix together all the fragmented communication technologies out there we are reliant on contributions and collaboration from the community to do so.  So please get involved - and we hope you have as much fun hacking on Matrix as we do!
+That's it!  Matrix is a very open and collaborative project as you might expect
+given our obsession with open communication.  If we're going to successfully
+matrix together all the fragmented communication technologies out there we are
+reliant on contributions and collaboration from the community to do so.  So
+please get involved - and we hope you have as much fun hacking on Matrix as we
+do!
diff --git a/MANIFEST.in b/MANIFEST.in
index e0826ba544..47ae5a77b9 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -28,6 +28,7 @@ exclude jenkins*.sh
 exclude jenkins*
 exclude Dockerfile
 exclude .dockerignore
+exclude test_postgresql.sh
 recursive-exclude jenkins *.sh
 
 include pyproject.toml
diff --git a/README.rst b/README.rst
index 9c0f9c09c8..5547f617ba 100644
--- a/README.rst
+++ b/README.rst
@@ -157,7 +157,7 @@ if you prefer.
 
 In case of problems, please see the _`Troubleshooting` section below.
 
-There is an offical synapse image available at 
+There is an official synapse image available at
 https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with
 the docker-compose file available at `contrib/docker <contrib/docker>`_. Further information on
 this including configuration options is available in the README on
@@ -459,37 +459,13 @@ https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/matrix-
 
 Windows Install
 ---------------
-Synapse can be installed on Cygwin. It requires the following Cygwin packages:
-
-- gcc
-- git
-- libffi-devel
-- openssl (and openssl-devel, python-openssl)
-- python
-- python-setuptools
-
-The content repository requires additional packages and will be unable to process
-uploads without them:
-
-- libjpeg8
-- libjpeg8-devel
-- zlib
-
-If you choose to install Synapse without these packages, you will need to reinstall
-``pillow`` for changes to be applied, e.g. ``pip uninstall pillow`` ``pip install
-pillow --user``
-
-Troubleshooting:
-
-- You may need to upgrade ``setuptools`` to get this to work correctly:
-  ``pip install setuptools --upgrade``.
-- You may encounter errors indicating that ``ffi.h`` is missing, even with
-  ``libffi-devel`` installed. If you do, copy the ``.h`` files:
-  ``cp /usr/lib/libffi-3.0.13/include/*.h /usr/include``
-- You may need to install libsodium from source in order to install PyNacl. If
-  you do, you may need to create a symlink to ``libsodium.a`` so ``ld`` can find
-  it: ``ln -s /usr/local/lib/libsodium.a /usr/lib/libsodium.a``
 
+If you wish to run or develop Synapse on Windows, the Windows Subsystem For
+Linux provides a Linux environment on Windows 10 which is capable of using the
+Debian, Fedora, or source installation methods. More information about WSL can
+be found at https://docs.microsoft.com/en-us/windows/wsl/install-win10 for
+Windows 10 and https://docs.microsoft.com/en-us/windows/wsl/install-on-server
+for Windows Server.
 
 Troubleshooting
 ===============
diff --git a/changelog.d/3699.misc b/changelog.d/3699.misc
new file mode 100644
index 0000000000..437efbd98f
--- /dev/null
+++ b/changelog.d/3699.misc
@@ -0,0 +1,2 @@
+Unit tests can now be run under PostgreSQL in Docker using 
+``test_postgresql.sh``.
diff --git a/changelog.d/3873.misc b/changelog.d/3873.misc
new file mode 100644
index 0000000000..8104b5c085
--- /dev/null
+++ b/changelog.d/3873.misc
@@ -0,0 +1,2 @@
+Remove documentation regarding installation on Cygwin, the use of WSL is 
+recommended instead.
diff --git a/changelog.d/3894.feature b/changelog.d/3894.feature
new file mode 100644
index 0000000000..1ed0cccdb2
--- /dev/null
+++ b/changelog.d/3894.feature
@@ -0,0 +1 @@
+Report "python_version" in the phone home stats
diff --git a/changelog.d/3899.bugfix b/changelog.d/3899.bugfix
new file mode 100644
index 0000000000..5120e3a823
--- /dev/null
+++ b/changelog.d/3899.bugfix
@@ -0,0 +1 @@
+When we join a room, always try the server we used for the alias lookup first, to avoid unresponsive and out-of-date servers.
diff --git a/changelog.d/3903.misc b/changelog.d/3903.misc
new file mode 100644
index 0000000000..49b64bf333
--- /dev/null
+++ b/changelog.d/3903.misc
@@ -0,0 +1 @@
+Increase the timeout when filling missing events in federation requests
\ No newline at end of file
diff --git a/changelog.d/3904.misc b/changelog.d/3904.misc
new file mode 100644
index 0000000000..1e3c8e1706
--- /dev/null
+++ b/changelog.d/3904.misc
@@ -0,0 +1 @@
+Improve the logging when handling a federation transaction
\ No newline at end of file
diff --git a/changelog.d/3906.misc b/changelog.d/3906.misc
new file mode 100644
index 0000000000..11709186d3
--- /dev/null
+++ b/changelog.d/3906.misc
@@ -0,0 +1 @@
+Improve logging of outbound federation requests
\ No newline at end of file
diff --git a/changelog.d/3907.bugfix b/changelog.d/3907.bugfix
new file mode 100644
index 0000000000..45e010c052
--- /dev/null
+++ b/changelog.d/3907.bugfix
@@ -0,0 +1 @@
+Fix incorrect server-name indication for outgoing federation requests
\ No newline at end of file
diff --git a/changelog.d/3908.bugfix b/changelog.d/3908.bugfix
new file mode 100644
index 0000000000..518aee6c4d
--- /dev/null
+++ b/changelog.d/3908.bugfix
@@ -0,0 +1 @@
+Fix adding client IPs to the database failing on Python 3.
\ No newline at end of file
diff --git a/changelog.d/3909.misc b/changelog.d/3909.misc
new file mode 100644
index 0000000000..11709186d3
--- /dev/null
+++ b/changelog.d/3909.misc
@@ -0,0 +1 @@
+Improve logging of outbound federation requests
\ No newline at end of file
diff --git a/changelog.d/3910.bugfix b/changelog.d/3910.bugfix
new file mode 100644
index 0000000000..22ec2adc33
--- /dev/null
+++ b/changelog.d/3910.bugfix
@@ -0,0 +1 @@
+Fix bug where things occasionally were not being timed out correctly.
diff --git a/changelog.d/3912.misc b/changelog.d/3912.misc
new file mode 100644
index 0000000000..87d73697ea
--- /dev/null
+++ b/changelog.d/3912.misc
@@ -0,0 +1 @@
+Add a regression test for logging failed HTTP requests on Python 3.
\ No newline at end of file
diff --git a/changelog.d/3914.bugfix b/changelog.d/3914.bugfix
new file mode 100644
index 0000000000..27e6bad590
--- /dev/null
+++ b/changelog.d/3914.bugfix
@@ -0,0 +1 @@
+Fix bug where outbound federation would stop talking to some servers when using workers
diff --git a/docker/Dockerfile-pgtests b/docker/Dockerfile-pgtests
new file mode 100644
index 0000000000..7da8eeb9eb
--- /dev/null
+++ b/docker/Dockerfile-pgtests
@@ -0,0 +1,12 @@
+# Use the Sytest image that comes with a lot of the build dependencies
+# pre-installed
+FROM matrixdotorg/sytest:latest
+
+# The Sytest image doesn't come with python, so install that
+RUN apt-get -qq install -y python python-dev python-pip
+
+# We need tox to run the tests in run_pg_tests.sh
+RUN pip install tox
+
+ADD run_pg_tests.sh /pg_tests.sh
+ENTRYPOINT /pg_tests.sh
diff --git a/docker/run_pg_tests.sh b/docker/run_pg_tests.sh
new file mode 100755
index 0000000000..e77424c41a
--- /dev/null
+++ b/docker/run_pg_tests.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+# This script runs the PostgreSQL tests inside a Docker container. It expects
+# the relevant source files to be mounted into /src (done automatically by the
+# caller script). It will set up the database, run it, and then use the tox
+# configuration to run the tests.
+
+set -e
+
+# Set PGUSER so Synapse's tests know what user to connect to the database with
+export PGUSER=postgres
+
+# Initialise & start the database
+su -c '/usr/lib/postgresql/9.6/bin/initdb -D /var/lib/postgresql/data -E "UTF-8" --lc-collate="en_US.UTF-8" --lc-ctype="en_US.UTF-8" --username=postgres' postgres
+su -c '/usr/lib/postgresql/9.6/bin/pg_ctl -w -D /var/lib/postgresql/data start' postgres
+
+# Run the tests
+cd /src
+export TRIAL_FLAGS="-j 4"
+tox --workdir=/tmp -e py27-postgres
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index ac97e19649..3241ded188 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -457,6 +457,10 @@ def run(hs):
         stats["homeserver"] = hs.config.server_name
         stats["timestamp"] = now
         stats["uptime_seconds"] = uptime
+        version = sys.version_info
+        stats["python_version"] = "{}.{}.{}".format(
+            version.major, version.minor, version.micro
+        )
         stats["total_users"] = yield hs.get_datastore().count_all_users()
 
         total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
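
Note on the homeserver.py hunk above: the new stat is just `sys.version_info` rendered as `major.minor.micro`. A minimal standalone sketch of the same formatting:

    import sys

    # Format the interpreter version the same way the patched run() does.
    version = sys.version_info
    stats = {}
    stats["python_version"] = "{}.{}.{}".format(
        version.major, version.minor, version.micro
    )
    print(stats["python_version"])  # e.g. "3.6.5"
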
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0c68e8a472..8d6bd7976d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,6 +69,27 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
+def shortstr(iterable, maxitems=5):
+    """If iterable has maxitems or fewer, return the stringification of a list
+    containing those items.
+
+    Otherwise, return the stringification of a list with the first maxitems items,
+    followed by "...".
+
+    Args:
+        iterable (Iterable): iterable to truncate
+        maxitems (int): number of items to return before truncating
+
+    Returns:
+        unicode
+    """
+
+    items = list(itertools.islice(iterable, maxitems + 1))
+    if len(items) <= maxitems:
+        return str(items)
+    return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
@@ -114,7 +135,6 @@ class FederationHandler(BaseHandler):
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
     @defer.inlineCallbacks
-    @log_function
     def on_receive_pdu(
             self, origin, pdu, get_missing=True, sent_to_us_directly=False,
     ):
@@ -130,9 +150,17 @@ class FederationHandler(BaseHandler):
         Returns (Deferred): completes with None
         """
 
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
+        logger.info(
+            "[%s %s] handling received PDU: %s",
+            room_id, event_id, pdu,
+        )
+
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self.store.get_event(
-            pdu.event_id,
+            event_id,
             allow_none=True,
             allow_rejected=True,
         )
@@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
             )
         )
         if already_seen:
-            logger.debug("Already seen pdu %s", pdu.event_id)
+            logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
             return
 
         # do some initial sanity-checking of the event. In particular, make
@@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
         try:
             self._sanity_check_event(pdu)
         except SynapseError as err:
+            logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
             raise FederationError(
                 "ERROR",
                 err.code,
@@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
 
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
-        if pdu.room_id in self.room_queues:
-            logger.info("Ignoring PDU %s for room %s from %s for now; join "
-                        "in progress", pdu.event_id, pdu.room_id, origin)
-            self.room_queues[pdu.room_id].append((pdu, origin))
+        if room_id in self.room_queues:
+            logger.info(
+                "[%s %s] Queuing PDU from %s for now: join in progress",
+                room_id, event_id, origin,
+            )
+            self.room_queues[room_id].append((pdu, origin))
             return
 
         # If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
         # we should check if we *are* in fact in the room. If we are then we
         # can magically rejoin the room.
         is_in_room = yield self.auth.check_host_in_room(
-            pdu.room_id,
+            room_id,
             self.server_name
         )
         if not is_in_room:
@@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
             )
             if was_in_room:
                 logger.info(
-                    "Ignoring PDU %s for room %s from %s as we've left the room!",
-                    pdu.event_id, pdu.room_id, origin,
+                    "[%s %s] Ignoring PDU from %s as we've left the room",
+                    room_id, event_id, origin,
                 )
                 defer.returnValue(None)
 
@@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
             )
 
             logger.debug(
-                "_handle_new_pdu min_depth for %s: %d",
-                pdu.room_id, min_depth
+                "[%s %s] min_depth: %d",
+                room_id, event_id, min_depth,
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
                 # send to the clients.
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
-                if get_missing and prevs - seen:
+                missing_prevs = prevs - seen
+                if get_missing and missing_prevs:
                     # If we're missing stuff, ensure we only fetch stuff one
                     # at a time.
                     logger.info(
-                        "Acquiring lock for room %r to fetch %d missing events: %r...",
-                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                        "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
                     with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
                         logger.info(
-                            "Acquired lock for room %r to fetch %d missing events",
-                            pdu.room_id, len(prevs - seen),
+                            "[%s %s] Acquired room lock to fetch %d missing prev_events",
+                            room_id, event_id, len(missing_prevs),
                         )
 
                         yield self._get_missing_events_for_pdu(
@@ -241,19 +273,23 @@ class FederationHandler(BaseHandler):
 
                         if not prevs - seen:
                             logger.info(
-                                "Found all missing prev events for %s", pdu.event_id
+                                "[%s %s] Found all missing prev_events",
+                                room_id, event_id,
                             )
-                elif prevs - seen:
+                elif missing_prevs:
                     logger.info(
-                        "Not fetching %d missing events for room %r,event %s: %r...",
-                        len(prevs - seen), pdu.room_id, pdu.event_id,
-                        list(prevs - seen)[:5],
+                        "[%s %s] Not recursively fetching %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
 
             if sent_to_us_directly and prevs - seen:
                 # If they have sent it to us directly, and the server
                 # isn't telling us about the auth events that it's
                 # made a message referencing, we explode
+                logger.warn(
+                    "[%s %s] Failed to fetch %d prev events: rejecting",
+                    room_id, event_id, len(prevs - seen),
+                )
                 raise FederationError(
                     "ERROR",
                     403,
@@ -270,15 +306,19 @@ class FederationHandler(BaseHandler):
                 auth_chains = set()
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+                    ours = yield self.store.get_state_groups(room_id, list(seen))
                     state_groups.append(ours)
 
                     # Ask the remote server for the states we don't
                     # know about
                     for p in prevs - seen:
+                        logger.info(
+                            "[%s %s] Requesting state at missing prev_event %s",
+                            room_id, event_id, p,
+                        )
                         state, got_auth_chain = (
                             yield self.federation_client.get_state_for_room(
-                                origin, pdu.room_id, p
+                                origin, room_id, p,
                             )
                         )
                         auth_chains.update(got_auth_chain)
@@ -291,19 +331,24 @@ class FederationHandler(BaseHandler):
                             ev_ids, get_prev_content=False, check_redacted=False
                         )
 
-                    room_version = yield self.store.get_room_version(pdu.room_id)
+                    room_version = yield self.store.get_room_version(room_id)
                     state_map = yield resolve_events_with_factory(
-                        room_version, state_groups, {pdu.event_id: pdu}, fetch
+                        room_version, state_groups, {event_id: pdu}, fetch
                     )
 
                     state = (yield self.store.get_events(state_map.values())).values()
                     auth_chain = list(auth_chains)
                 except Exception:
+                    logger.warn(
+                        "[%s %s] Error attempting to resolve state at missing "
+                        "prev_events",
+                        room_id, event_id, exc_info=True,
+                    )
                     raise FederationError(
                         "ERROR",
                         403,
                         "We can't get valid state history.",
-                        affected=pdu.event_id,
+                        affected=event_id,
                     )
 
         yield self._process_received_pdu(
@@ -322,15 +367,16 @@ class FederationHandler(BaseHandler):
             prevs (set(str)): List of event ids which we are missing
             min_depth (int): Minimum depth of events to return.
         """
-        # We recalculate seen, since it may have changed.
+
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
         seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
 
-        latest = yield self.store.get_latest_event_ids_in_room(
-            pdu.room_id
-        )
+        latest = yield self.store.get_latest_event_ids_in_room(room_id)
 
         # We add the prev events that we have seen to the latest
         # list to ensure the remote server doesn't give them to us
@@ -338,8 +384,8 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "Missing %d events for room %r pdu %s: %r...",
-            len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+            "[%s %s]: Requesting %d prev_events: %s",
+            room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
         )
 
         # XXX: we set timeout to 10s to help workaround
@@ -360,49 +406,87 @@ class FederationHandler(BaseHandler):
         # apparently.
         #
         # see https://github.com/matrix-org/synapse/pull/1744
+        #
+        # ----
+        #
+        # Update richvdh 2018/09/18: There are a number of problems with timing this
+        # request out aggressively on the client side:
+        #
+        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+        #   if you send too many requests at once, so you end up with the server carefully
+        #   working through the backlog of your requests, which you have already timed
+        #   out.
+        #
+        # - for this request in particular, we now (as of
+        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+        #   server can't produce a plausible-looking set of prev_events - so we become
+        #   much more likely to reject the event.
+        #
+        # - contrary to what it says above, we do *not* fall back to fetching fresh state
+        #   for the room if get_missing_events times out. Rather, we give up processing
+        #   the PDU whose prevs we are missing, which then makes it much more likely that
+        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
+        #   problem.
+        #
+        # - the aggressive 10s timeout was introduced to deal with incoming federation
+        #   requests taking 8 hours to process. It's not entirely clear why that was going
+        #   on; certainly there were other issues causing traffic storms which are now
+        #   resolved, and I think in any case we may be more sensible about our locking
+        #   now. We're *certainly* more sensible about our logging.
+        #
+        # All that said: Let's try increasing the timeout to 60s and see what happens.
 
         missing_events = yield self.federation_client.get_missing_events(
             origin,
-            pdu.room_id,
+            room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
             limit=10,
             min_depth=min_depth,
-            timeout=10000,
+            timeout=60000,
         )
 
         logger.info(
-            "Got %d events: %r...",
-            len(missing_events), [e.event_id for e in missing_events[:5]]
+            "[%s %s]: Got %d prev_events: %s",
+            room_id, event_id, len(missing_events), shortstr(missing_events),
         )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
-        for e in missing_events:
-            logger.info("Handling found event %s", e.event_id)
+        for ev in missing_events:
+            logger.info(
+                "[%s %s] Handling received prev_event %s",
+                room_id, event_id, ev.event_id,
+            )
             try:
                 yield self.on_receive_pdu(
                     origin,
-                    e,
+                    ev,
                     get_missing=False
                 )
             except FederationError as e:
                 if e.code == 403:
-                    logger.warn("Event %s failed history check.")
+                    logger.warn(
+                        "[%s %s] Received prev_event %s failed history check.",
+                        room_id, event_id, ev.event_id,
+                    )
                 else:
                     raise
 
-    @log_function
     @defer.inlineCallbacks
-    def _process_received_pdu(self, origin, pdu, state, auth_chain):
+    def _process_received_pdu(self, origin, event, state, auth_chain):
         """ Called when we have a new pdu. We need to do auth checks and put it
         through the StateHandler.
         """
-        event = pdu
+        room_id = event.room_id
+        event_id = event.event_id
 
-        logger.debug("Processing event: %s", event)
+        logger.debug(
+            "[%s %s] Processing event: %s",
+            room_id, event_id, event,
+        )
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -411,15 +495,16 @@ class FederationHandler(BaseHandler):
         # event.
         if state and auth_chain and not event.internal_metadata.is_outlier():
             is_in_room = yield self.auth.check_host_in_room(
-                event.room_id,
+                room_id,
                 self.server_name
             )
         else:
             is_in_room = True
+
         if not is_in_room:
             logger.info(
-                "Got event for room we're not in: %r %r",
-                event.room_id, event.event_id
+                "[%s %s] Got event for room we're not in",
+                room_id, event_id,
             )
 
             try:
@@ -431,7 +516,7 @@ class FederationHandler(BaseHandler):
                     "ERROR",
                     e.code,
                     e.msg,
-                    affected=event.event_id,
+                    affected=event_id,
                 )
 
         else:
@@ -480,12 +565,12 @@ class FederationHandler(BaseHandler):
                     affected=event.event_id,
                 )
 
-        room = yield self.store.get_room(event.room_id)
+        room = yield self.store.get_room(room_id)
 
         if not room:
             try:
                 yield self.store.store_room(
-                    room_id=event.room_id,
+                    room_id=room_id,
                     room_creator_user_id="",
                     is_public=False,
                 )
@@ -513,7 +598,7 @@ class FederationHandler(BaseHandler):
 
                 if newly_joined:
                     user = UserID.from_string(event.state_key)
-                    yield self.user_joined_room(user, event.room_id)
+                    yield self.user_joined_room(user, room_id)
 
     @log_function
     @defer.inlineCallbacks
@@ -1430,12 +1515,10 @@ class FederationHandler(BaseHandler):
         else:
             defer.returnValue(None)
 
-    @log_function
     def get_min_depth_for_context(self, context):
         return self.store.get_min_depth(context)
 
     @defer.inlineCallbacks
-    @log_function
     def _handle_new_event(self, origin, event, state=None, auth_events=None,
                           backfilled=False):
         context = yield self._prep_event(
@@ -1635,8 +1718,8 @@ class FederationHandler(BaseHandler):
             )
         except AuthError as e:
             logger.warn(
-                "Rejecting %s because %s",
-                event.event_id, e.msg
+                "[%s %s] Rejecting: %s",
+                event.room_id, event.event_id, e.msg
             )
 
             context.rejected = RejectedReason.AUTH_ERROR
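
Note: the `shortstr` helper added at the top of this file is self-contained; copying it verbatim, its truncation behaviour looks like this:

    import itertools

    def shortstr(iterable, maxitems=5):
        # Stringify up to maxitems items; longer iterables are truncated
        # with a trailing "..." so log lines stay bounded.
        items = list(itertools.islice(iterable, maxitems + 1))
        if len(items) <= maxitems:
            return str(items)
        return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"

    print(shortstr(["$ev1", "$ev2"]))
    # ['$ev1', '$ev2']
    print(shortstr("$ev%d" % i for i in range(100)))
    # ['$ev0', '$ev1', '$ev2', '$ev3', '$ev4', ...]
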
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f643619047..07fd3e82fc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -583,6 +583,11 @@ class RoomMemberHandler(object):
         room_id = mapping["room_id"]
         servers = mapping["servers"]
 
+        # put the server which owns the alias at the front of the server list.
+        if room_alias.domain in servers:
+            servers.remove(room_alias.domain)
+        servers.insert(0, room_alias.domain)
+
         defer.returnValue((RoomID.from_string(room_id), servers))
 
     @defer.inlineCallbacks
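
The room_member.py change is small enough to sketch in isolation. A hedged standalone version (the function name here is ours, not Synapse's): given the alias's domain and the server list from the directory response, move or insert that domain at the front:

    def servers_with_alias_owner_first(alias_domain, servers):
        # The server that owns the alias answered the alias lookup for us, so
        # it is the most likely to be responsive and up to date: try it first.
        servers = list(servers)  # avoid mutating the caller's list
        if alias_domain in servers:
            servers.remove(alias_domain)
        servers.insert(0, alias_domain)
        return servers

    # e.g. joining #room:example.org
    print(servers_with_alias_owner_first(
        "example.org", ["other.net", "example.org", "third.org"],
    ))
    # ['example.org', 'other.net', 'third.org']
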
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ec339a92ad..3d05f83b8c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
 from synapse.http import cancelled_to_request_timed_out_error, redact_uri
 from synapse.http.endpoint import SpiderEndpoint
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -99,7 +99,7 @@ class SimpleHttpClient(object):
             request_deferred = treq.request(
                 method, uri, agent=self.agent, data=data, headers=headers
             )
-            add_timeout_to_deferred(
+            request_deferred = timeout_deferred(
                 request_deferred, 60, self.hs.get_reactor(),
                 cancelled_to_request_timed_out_error,
             )
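
Unlike the old `add_timeout_to_deferred`, `timeout_deferred` returns a *new* Deferred which the caller must use from then on - hence the `request_deferred = ...` rebinding above. A rough, self-contained sketch of such a wrapper (an illustration of the pattern only, not Synapse's implementation in `async_helpers`, which also cancels the underlying Deferred and handles logcontexts):

    from twisted.internet import defer, task
    from twisted.python import failure

    def timeout_deferred(deferred, timeout, reactor):
        # Return a new Deferred that errbacks with defer.TimeoutError if
        # `deferred` has not fired within `timeout` seconds.
        new_d = defer.Deferred()

        def on_timeout():
            if not new_d.called:
                new_d.errback(defer.TimeoutError("%ss elapsed" % timeout))

        delayed_call = reactor.callLater(timeout, on_timeout)

        def relay(result):
            if delayed_call.active():
                delayed_call.cancel()
            if not new_d.called:
                if isinstance(result, failure.Failure):
                    new_d.errback(result)
                else:
                    new_d.callback(result)

        deferred.addBoth(relay)
        return new_d

    # Demo with a simulated reactor clock: the wrapped Deferred times out.
    clock = task.Clock()
    d = timeout_deferred(defer.Deferred(), 60, clock)
    d.addErrback(lambda f: print("got:", f.value))
    clock.advance(61)
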
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index b0c9369519..91025037a3 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
 
     Args:
         reactor: Twisted reactor.
-        destination (bytes): The name of the server to connect to.
+        destination (unicode): The name of the server to connect to.
         tls_client_options_factory
             (synapse.crypto.context_factory.ClientTLSOptionsFactory):
             Factory which generates TLS options for client connections.
@@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
         transport_endpoint = HostnameEndpoint
         default_port = 8008
     else:
+        # the SNI string should be the same as the Host header, minus the port.
+        # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
+        # the Host header and SNI should therefore be the server_name of the remote
+        # server.
+        tls_options = tls_client_options_factory.get_options(domain)
+
         def transport_endpoint(reactor, host, port, timeout):
             return wrapClientTLS(
-                tls_client_options_factory.get_options(host),
-                HostnameEndpoint(reactor, host, port, timeout=timeout))
+                tls_options,
+                HostnameEndpoint(reactor, host, port, timeout=timeout),
+            )
         default_port = 8448
 
     if port is None:
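
For illustration, the SNI rule the new comment describes - use the remote server_name, minus any explicit port, for both SNI and the Host header - amounts to this (the `split_server_name` helper is hypothetical and ignores IPv6 literals):

    def split_server_name(destination):
        # "example.org:8448" -> ("example.org", 8448); no port -> (name, None)
        if ":" in destination:
            host, _, port = destination.rpartition(":")
            return host, int(port)
        return destination, None

    domain, port = split_server_name("matrix.example.org:8448")
    sni = domain  # SNI is the server_name without the port
    print(sni, port)  # matrix.example.org 8448
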
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 083484a687..14b12cd1c4 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,10 +17,12 @@ import cgi
 import logging
 import random
 import sys
+from io import BytesIO
 
 from six import PY3, string_types
 from six.moves import urllib
 
+import attr
 import treq
 from canonicaljson import encode_canonical_json
 from prometheus_client import Counter
@@ -28,8 +30,9 @@ from signedjson.sign import sign_json
 
 from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
+from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool
+from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -41,13 +44,11 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util import logcontext
-from synapse.util.async_helpers import timeout_no_seriously
+from synapse.util.async_helpers import timeout_deferred
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
-outbound_logger = logging.getLogger("synapse.http.outbound")
 
 outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
                                     "", ["method"])
@@ -78,6 +79,99 @@ class MatrixFederationEndpointFactory(object):
         )
 
 
+_next_id = 1
+
+
+@attr.s
+class MatrixFederationRequest(object):
+    method = attr.ib()
+    """HTTP method
+    :type: str
+    """
+
+    path = attr.ib()
+    """HTTP path
+    :type: str
+    """
+
+    destination = attr.ib()
+    """The remote server to send the HTTP request to.
+    :type: str"""
+
+    json = attr.ib(default=None)
+    """JSON to send in the body.
+    :type: dict|None
+    """
+
+    json_callback = attr.ib(default=None)
+    """A callback to generate the JSON.
+    :type: func|None
+    """
+
+    query = attr.ib(default=None)
+    """Query arguments.
+    :type: dict|None
+    """
+
+    txn_id = attr.ib(default=None)
+    """Unique ID for this request (for logging)
+    :type: str|None
+    """
+
+    def __attrs_post_init__(self):
+        global _next_id
+        self.txn_id = "%s-O-%s" % (self.method, _next_id)
+        _next_id = (_next_id + 1) % (MAXINT - 1)
+
+    def get_json(self):
+        if self.json_callback:
+            return self.json_callback()
+        return self.json
+
+
+@defer.inlineCallbacks
+def _handle_json_response(reactor, timeout_sec, request, response):
+    """
+    Reads the JSON body of a response, with a timeout
+
+    Args:
+        reactor (IReactor): twisted reactor, for the timeout
+        timeout_sec (float): number of seconds to wait for response to complete
+        request (MatrixFederationRequest): the request that triggered the response
+        response (IResponse): response to the request
+
+    Returns:
+        dict: parsed JSON response
+    """
+    try:
+        check_content_type_is_json(response.headers)
+
+        d = treq.json_content(response)
+        d = timeout_deferred(
+            d,
+            timeout=timeout_sec,
+            reactor=reactor,
+        )
+
+        body = yield make_deferred_yieldable(d)
+    except Exception as e:
+        logger.warn(
+            "{%s} [%s] Error reading response: %s",
+            request.txn_id,
+            request.destination,
+            e,
+        )
+        raise
+    logger.info(
+        "{%s} [%s] Completed: %d %s",
+        request.txn_id,
+        request.destination,
+        response.code,
+        response.phrase.decode('ascii', errors='replace'),
+    )
+    defer.returnValue(body)
+
+
 class MatrixFederationHttpClient(object):
     """HTTP client used to talk to other homeservers over the federation
     protocol. Send client certificates and signs requests.
@@ -102,34 +196,35 @@ class MatrixFederationHttpClient(object):
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
         self.version_string = hs.version_string.encode('ascii')
-        self._next_id = 1
         self.default_timeout = 60
 
-    def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
-        return urllib.parse.urlunparse(
-            (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
-        )
+        def schedule(x):
+            reactor.callLater(_EPSILON, x)
+
+        self._cooperator = Cooperator(scheduler=schedule)
 
     @defer.inlineCallbacks
-    def _request(self, destination, method, path,
-                 json=None, json_callback=None,
-                 param_bytes=b"",
-                 query=None, retry_on_dns_fail=True,
-                 timeout=None, long_retries=False,
-                 ignore_backoff=False,
-                 backoff_on_404=False):
+    def _send_request(
+        self,
+        request,
+        retry_on_dns_fail=True,
+        timeout=None,
+        long_retries=False,
+        ignore_backoff=False,
+        backoff_on_404=False
+    ):
         """
-        Creates and sends a request to the given server.
+        Sends a request to the given server.
 
         Args:
-            destination (str): The remote server to send the HTTP request to.
-            method (str): HTTP method
-            path (str): The HTTP path
-            json (dict or None): JSON to send in the body.
-            json_callback (func or None): A callback to generate the JSON.
-            query (dict or None): Query arguments.
+            request (MatrixFederationRequest): details of request to be sent
+
+            timeout (int|None): number of milliseconds to wait for the response headers
+                (including connecting to the server). 60s by default.
+
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
+
             backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
@@ -154,38 +249,32 @@ class MatrixFederationHttpClient(object):
 
         if (
             self.hs.config.federation_domain_whitelist is not None and
-            destination not in self.hs.config.federation_domain_whitelist
+            request.destination not in self.hs.config.federation_domain_whitelist
         ):
-            raise FederationDeniedError(destination)
+            raise FederationDeniedError(request.destination)
 
         limiter = yield synapse.util.retryutils.get_retry_limiter(
-            destination,
+            request.destination,
             self.clock,
             self._store,
             backoff_on_404=backoff_on_404,
             ignore_backoff=ignore_backoff,
         )
 
-        headers_dict = {}
-        path_bytes = path.encode("ascii")
-        if query:
-            query_bytes = encode_query_args(query)
+        method = request.method
+        destination = request.destination
+        path_bytes = request.path.encode("ascii")
+        if request.query:
+            query_bytes = encode_query_args(request.query)
         else:
             query_bytes = b""
 
         headers_dict = {
             "User-Agent": [self.version_string],
-            "Host": [destination],
+            "Host": [request.destination],
         }
 
         with limiter:
-            url = self._create_url(
-                destination.encode("ascii"), path_bytes, param_bytes, query_bytes
-            ).decode('ascii')
-
-            txn_id = "%s-O-%s" % (method, self._next_id)
-            self._next_id = (self._next_id + 1) % (MAXINT - 1)
-
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
             if long_retries:
@@ -193,16 +282,19 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            http_url = urllib.parse.urlunparse(
-                (b"", b"", path_bytes, param_bytes, query_bytes, b"")
-            ).decode('ascii')
+            url = urllib.parse.urlunparse((
+                b"matrix", destination.encode("ascii"),
+                path_bytes, None, query_bytes, b"",
+            )).decode('ascii')
+
+            http_url = urllib.parse.urlunparse((
+                b"", b"",
+                path_bytes, None, query_bytes, b"",
+            )).decode('ascii')
 
-            log_result = None
             while True:
                 try:
-                    if json_callback:
-                        json = json_callback()
-
+                    json = request.get_json()
                     if json:
                         data = encode_canonical_json(json)
                         headers_dict["Content-Type"] = ["application/json"]
@@ -213,29 +305,32 @@ class MatrixFederationHttpClient(object):
                         data = None
                         self.sign_request(destination, method, http_url, headers_dict)
 
-                    outbound_logger.info(
+                    logger.info(
                         "{%s} [%s] Sending request: %s %s",
-                        txn_id, destination, method, url
+                        request.txn_id, destination, method, url
                     )
 
+                    if data:
+                        producer = FileBodyProducer(
+                            BytesIO(data),
+                            cooperator=self._cooperator
+                        )
+                    else:
+                        producer = None
+
                     request_deferred = treq.request(
                         method,
                         url,
                         headers=Headers(headers_dict),
-                        data=data,
+                        data=producer,
                         agent=self.agent,
                         reactor=self.hs.get_reactor(),
                         unbuffered=True
                     )
-                    request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
 
-                    # Sometimes the timeout above doesn't work, so lets hack yet
-                    # another layer of timeouts in in the vain hope that at some
-                    # point the world made sense and this really really really
-                    # should work.
-                    request_deferred = timeout_no_seriously(
+                    request_deferred = timeout_deferred(
                         request_deferred,
-                        timeout=_sec_timeout * 2,
+                        timeout=_sec_timeout,
                         reactor=self.hs.get_reactor(),
                     )
 
@@ -244,33 +339,19 @@ class MatrixFederationHttpClient(object):
                             request_deferred,
                         )
 
-                    log_result = "%d %s" % (
-                        response.code,
-                        response.phrase.decode('ascii', errors='replace'),
-                    )
                     break
                 except Exception as e:
-                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                        logger.warn(
-                            "DNS Lookup failed to %s with %s",
-                            destination,
-                            e
-                        )
-                        log_result = "DNS Lookup failed to %s with %s" % (
-                            destination, e
-                        )
-                        raise
-
                     logger.warn(
-                        "{%s} Sending request failed to %s: %s %s: %s",
-                        txn_id,
+                        "{%s} [%s] Request failed: %s %s: %s",
+                        request.txn_id,
                         destination,
                         method,
                         url,
                         _flatten_response_never_received(e),
                     )
 
-                    log_result = _flatten_response_never_received(e)
+                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                        raise
 
                     if retries_left and not timeout:
                         if long_retries:
@@ -283,33 +364,37 @@ class MatrixFederationHttpClient(object):
                             delay *= random.uniform(0.8, 1.4)
 
                         logger.debug(
-                            "{%s} Waiting %s before sending to %s...",
-                            txn_id,
+                            "{%s} [%s] Waiting %ss before re-sending...",
+                            request.txn_id,
+                            destination,
                             delay,
-                            destination
                         )
 
                         yield self.clock.sleep(delay)
                         retries_left -= 1
                     else:
                         raise
-                finally:
-                    outbound_logger.info(
-                        "{%s} [%s] Result: %s",
-                        txn_id,
-                        destination,
-                        log_result,
-                    )
+
+            logger.info(
+                "{%s} [%s] Got response headers: %d %s",
+                request.txn_id,
+                destination,
+                response.code,
+                response.phrase.decode('ascii', errors='replace'),
+            )
 
             if 200 <= response.code < 300:
                 pass
             else:
                 # :'(
                 # Update transactions table?
-                with logcontext.PreserveLoggingContext():
-                    d = treq.content(response)
-                    d.addTimeout(_sec_timeout, self.hs.get_reactor())
-                    body = yield make_deferred_yieldable(d)
+                d = treq.content(response)
+                d = timeout_deferred(
+                    d,
+                    timeout=_sec_timeout,
+                    reactor=self.hs.get_reactor(),
+                )
+                body = yield make_deferred_yieldable(d)
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -403,29 +488,26 @@ class MatrixFederationHttpClient(object):
             is not on our federation whitelist
         """
 
-        if not json_data_callback:
-            json_data_callback = lambda: data
-
-        response = yield self._request(
-            destination,
-            "PUT",
-            path,
-            json_callback=json_data_callback,
+        request = MatrixFederationRequest(
+            method="PUT",
+            destination=destination,
+            path=path,
             query=args,
+            json_callback=json_data_callback,
+            json=data,
+        )
+
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
             backoff_on_404=backoff_on_404,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            d = treq.json_content(response)
-            d.addTimeout(self.default_timeout, self.hs.get_reactor())
-            body = yield make_deferred_yieldable(d)
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -459,31 +541,30 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-        response = yield self._request(
-            destination,
-            "POST",
-            path,
+
+        request = MatrixFederationRequest(
+            method="POST",
+            destination=destination,
+            path=path,
             query=args,
             json=data,
+        )
+
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            d = treq.json_content(response)
-            if timeout:
-                _sec_timeout = timeout / 1000
-            else:
-                _sec_timeout = self.default_timeout
-
-            d.addTimeout(_sec_timeout, self.hs.get_reactor())
-            body = yield make_deferred_yieldable(d)
+        if timeout:
+            _sec_timeout = timeout / 1000
+        else:
+            _sec_timeout = self.default_timeout
 
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), _sec_timeout, request, response,
+        )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -519,25 +600,23 @@ class MatrixFederationHttpClient(object):
 
         logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
 
-        response = yield self._request(
-            destination,
-            "GET",
-            path,
+        request = MatrixFederationRequest(
+            method="GET",
+            destination=destination,
+            path=path,
             query=args,
+        )
+
+        response = yield self._send_request(
+            request,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            d = treq.json_content(response)
-            d.addTimeout(self.default_timeout, self.hs.get_reactor())
-            body = yield make_deferred_yieldable(d)
-
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -568,25 +647,23 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-        response = yield self._request(
-            destination,
-            "DELETE",
-            path,
+        request = MatrixFederationRequest(
+            method="DELETE",
+            destination=destination,
+            path=path,
             query=args,
+        )
+
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            d = treq.json_content(response)
-            d.addTimeout(self.default_timeout, self.hs.get_reactor())
-            body = yield make_deferred_yieldable(d)
-
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -614,11 +691,15 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-        response = yield self._request(
-            destination,
-            "GET",
-            path,
+        request = MatrixFederationRequest(
+            method="GET",
+            destination=destination,
+            path=path,
             query=args,
+        )
+
+        response = yield self._send_request(
+            request,
             retry_on_dns_fail=retry_on_dns_fail,
             ignore_backoff=ignore_backoff,
         )
@@ -626,14 +707,25 @@ class MatrixFederationHttpClient(object):
         headers = dict(response.headers.getAllRawHeaders())
 
         try:
-            with logcontext.PreserveLoggingContext():
-                d = _readBodyToFile(response, output_stream, max_size)
-                d.addTimeout(self.default_timeout, self.hs.get_reactor())
-                length = yield make_deferred_yieldable(d)
-        except Exception:
-            logger.exception("Failed to download body")
+            d = _readBodyToFile(response, output_stream, max_size)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            length = yield make_deferred_yieldable(d)
+        except Exception as e:
+            logger.warn(
+                "{%s} [%s] Error reading response: %s",
+                request.txn_id,
+                request.destination,
+                e,
+            )
             raise
-
+        logger.info(
+            "{%s} [%s] Completed: %d %s [%d bytes]",
+            request.txn_id,
+            request.destination,
+            response.code,
+            response.phrase.decode('ascii', errors='replace'),
+            length,
+        )
         defer.returnValue((length, headers))
 
 
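
To make the refactoring concrete: each sprawling `_request(destination, method, path, ...)` call site becomes construction of a `MatrixFederationRequest` followed by `_send_request`. A trimmed, standalone sketch of the value object (needs the `attrs` package; the plain `_next_id += 1` simplifies the modular arithmetic the real code uses):

    import attr

    _next_id = 1

    @attr.s
    class MatrixFederationRequest(object):
        method = attr.ib()       # HTTP method, e.g. "GET"
        path = attr.ib()         # HTTP path
        destination = attr.ib()  # remote server_name
        json = attr.ib(default=None)
        json_callback = attr.ib(default=None)
        query = attr.ib(default=None)
        txn_id = attr.ib(default=None)  # filled in below, used for logging

        def __attrs_post_init__(self):
            global _next_id
            self.txn_id = "%s-O-%s" % (self.method, _next_id)
            _next_id += 1

        def get_json(self):
            if self.json_callback:
                return self.json_callback()
            return self.json

    request = MatrixFederationRequest(
        method="GET",
        destination="remote.example.org",           # illustrative values
        path="/_matrix/federation/v1/version",
    )
    print(request.txn_id)  # GET-O-1
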
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 9579e8cd0d..50be2de3bb 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -75,9 +75,9 @@ class SynapseRequest(Request):
         return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
             self.__class__.__name__,
             id(self),
-            self.method,
+            self.method.decode('ascii', errors='replace'),
             self.get_redacted_uri(),
-            self.clientproto,
+            self.clientproto.decode('ascii', errors='replace'),
             self.site.site_tag,
         )
 
@@ -308,7 +308,7 @@ class XForwardedForRequest(SynapseRequest):
             C{b"-"}.
         """
         return self.requestHeaders.getRawHeaders(
-            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
 
 
 class SynapseRequestFactory(object):
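
The `decode('ascii', errors='replace')` calls matter on Python 3, where
`Request.method` and `clientproto` are bytes: interpolating them directly
leaks the `b'...'` repr into the log line. A minimal standalone illustration
(plain Python, no Synapse imports):

    method = b"GET"

    # Without decoding, the bytes repr leaks into the output on Python 3:
    print('<SynapseRequest method=%r>' % (method,))
    # <SynapseRequest method=b'GET'>

    # Decoding first gives the intended output; errors='replace' ensures
    # the decode itself can never raise on non-ASCII junk.
    print('<SynapseRequest method=%r>' % (method.decode('ascii', errors='replace'),))
    # <SynapseRequest method='GET'>
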
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 82f391481c..f1d92c1395 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -25,11 +25,7 @@ from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 from synapse.metrics import LaterGauge
 from synapse.types import StreamToken
-from synapse.util.async_helpers import (
-    DeferredTimeoutError,
-    ObservableDeferred,
-    add_timeout_to_deferred,
-)
+from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.logcontext import PreserveLoggingContext, run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -337,7 +333,7 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
-                    add_timeout_to_deferred(
+                    listener.deferred = timeout_deferred(
                         listener.deferred,
                         (end_time - now) / 1000.,
                         self.hs.get_reactor(),
@@ -354,7 +350,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except DeferredTimeoutError:
+                except defer.TimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -559,15 +555,16 @@ class Notifier(object):
             if end_time <= now:
                 break
 
-            add_timeout_to_deferred(
-                listener.deferred.addTimeout,
-                (end_time - now) / 1000.,
-                self.hs.get_reactor(),
+            listener.deferred = timeout_deferred(
+                listener.deferred,
+                timeout=(end_time - now) / 1000.,
+                reactor=self.hs.get_reactor(),
             )
+
             try:
                 with PreserveLoggingContext():
                     yield listener.deferred
-            except DeferredTimeoutError:
+            except defer.TimeoutError:
                 break
             except defer.CancelledError:
                 break
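
Note the shape of the new calls: unlike the old `add_timeout_to_deferred`,
`timeout_deferred` does not mutate its argument but returns a fresh deferred,
so the result must be rebound (`listener.deferred = ...`). A minimal usage
sketch; the 30-second timeout is an arbitrary example:

    from twisted.internet import defer, reactor

    from synapse.util.async_helpers import timeout_deferred

    @defer.inlineCallbacks
    def wait_for_listener(listener_deferred):
        # Rebinding is essential: the original deferred is left untouched,
        # and waiting on it directly would bypass the timeout entirely.
        listener_deferred = timeout_deferred(
            listener_deferred, timeout=30.0, reactor=reactor,
        )
        try:
            result = yield listener_deferred
        except defer.TimeoutError:
            result = None  # timed out; fall back to an empty result
        defer.returnValue(result)
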
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 8fc678fa67..9ad17b7c25 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -119,21 +119,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         for entry in iteritems(to_update):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
-            self._simple_upsert_txn(
-                txn,
-                table="user_ips",
-                keyvalues={
-                    "user_id": user_id,
-                    "access_token": access_token,
-                    "ip": ip,
-                    "user_agent": user_agent,
-                    "device_id": device_id,
-                },
-                values={
-                    "last_seen": last_seen,
-                },
-                lock=False,
-            )
+            try:
+                self._simple_upsert_txn(
+                    txn,
+                    table="user_ips",
+                    keyvalues={
+                        "user_id": user_id,
+                        "access_token": access_token,
+                        "ip": ip,
+                        "user_agent": user_agent,
+                        "device_id": device_id,
+                    },
+                    values={
+                        "last_seen": last_seen,
+                    },
+                    lock=False,
+                )
+            except Exception as e:
+                # Failed to upsert; log and continue with the rest of the batch
+                logger.error("Failed to insert client IP %r: %r", entry, e)
 
     @defer.inlineCallbacks
     def get_last_client_ip_by_device(self, user_id, device_id):
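
The pattern here, wrapping each row's upsert so a single bad entry is logged
and skipped rather than aborting the whole batch, generalises. A minimal
sketch of the same idea with hypothetical names (`upsert_batch`, `upsert_one`):

    import logging

    logger = logging.getLogger(__name__)

    def upsert_batch(txn, rows, upsert_one):
        """Apply upsert_one to each row, logging and skipping failures.

        Note that on some databases a failed statement still poisons the
        enclosing transaction, so this is only safe where individual
        statement failures are recoverable.
        """
        for row in rows:
            try:
                upsert_one(txn, row)
            except Exception as e:
                logger.error("Failed to upsert %r: %r", row, e)
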
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 0c42bd3322..baf0379a68 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
@@ -156,7 +155,6 @@ class TransactionStore(SQLBaseStore):
         """
         pass
 
-    @cached(max_entries=10000)
     def get_destination_retry_timings(self, destination):
         """Gets the current retry timings (if any) for a given destination.
 
@@ -198,8 +196,6 @@ class TransactionStore(SQLBaseStore):
             retry_interval (int) - how long until next retry in ms
         """
 
-        # XXX: we could chose to not bother persisting this if our cache thinks
-        # this is a NOOP
         return self.runInteraction(
             "set_destination_retry_timings",
             self._set_destination_retry_timings,
@@ -212,10 +208,6 @@ class TransactionStore(SQLBaseStore):
                                        retry_last_ts, retry_interval):
         self.database_engine.lock_table(txn, "destinations")
 
-        self._invalidate_cache_and_stream(
-            txn, self.get_destination_retry_timings, (destination,)
-        )
-
         # We need to be careful here as the data may have changed from under us
         # due to a worker setting the timings.
 
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 40c2946129..ec7b2c9672 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -374,29 +374,25 @@ class ReadWriteLock(object):
         defer.returnValue(_ctx_manager())
 
 
-class DeferredTimeoutError(Exception):
-    """
-    This error is raised by default when a L{Deferred} times out.
-    """
-
+def _cancelled_to_timed_out_error(value, timeout):
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise defer.TimeoutError(timeout, "Deferred")
+    return value
 
-def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
-    """
-    Add a timeout to a deferred by scheduling it to be cancelled after
-    timeout seconds.
 
-    This is essentially a backport of deferred.addTimeout, which was introduced
-    in twisted 16.5.
+def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
+    """The in built twisted `Deferred.addTimeout` fails to time out deferreds
+    that have a canceller that throws exceptions. This method creates a new
+    deferred that wraps and times out the given deferred, correctly handling
+    the case where the given deferred's canceller throws.
 
-    If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
-    unless a cancelable function was passed to its initialization or unless
-    a different on_timeout_cancel callable is provided.
+    NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred.
 
     Args:
-        deferred (defer.Deferred): deferred to be timed out
-        timeout (Number): seconds to time out after
-        reactor (twisted.internet.reactor): the Twisted reactor to use
-
+        deferred (Deferred)
+        timeout (float): Timeout in seconds
+        reactor (twisted.internet.reactor): The twisted reactor to use
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is
             otherwise cancelled before the timeout.
@@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
             the timeout.
 
             The default callable (if none is provided) will translate a
-            CancelledError Failure into a DeferredTimeoutError.
-    """
-    timed_out = [False]
-
-    def time_it_out():
-        timed_out[0] = True
-        deferred.cancel()
-
-    delayed_call = reactor.callLater(timeout, time_it_out)
-
-    def convert_cancelled(value):
-        if timed_out[0]:
-            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
-            return to_call(value, timeout)
-        return value
-
-    deferred.addBoth(convert_cancelled)
+            CancelledError Failure into a defer.TimeoutError.
 
-    def cancel_timeout(result):
-        # stop the pending call to cancel the deferred if it's been fired
-        if delayed_call.active():
-            delayed_call.cancel()
-        return result
-
-    deferred.addBoth(cancel_timeout)
-
-
-def _cancelled_to_timed_out_error(value, timeout):
-    if isinstance(value, failure.Failure):
-        value.trap(CancelledError)
-        raise DeferredTimeoutError(timeout, "Deferred")
-    return value
-
-
-def timeout_no_seriously(deferred, timeout, reactor):
-    """The in build twisted deferred addTimeout (and the method above)
-    completely fail to time things out under some unknown circumstances.
-
-    Lets try a different way of timing things out and maybe that will make
-    things work?!
-
-    TODO: Kill this with fire.
+    Returns:
+        Deferred
     """
 
     new_d = defer.Deferred()
@@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
     def time_it_out():
         timed_out[0] = True
 
-        if not new_d.called:
-            new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
+        try:
+            deferred.cancel()
+        except:   # noqa: E722, if we throw any exception it'll break timeouts
+            logger.exception("Canceller failed during timeout")
 
-        deferred.cancel()
+        if not new_d.called:
+            new_d.errback(defer.TimeoutError(timeout, "Deferred"))
 
     delayed_call = reactor.callLater(timeout, time_it_out)
 
     def convert_cancelled(value):
         if timed_out[0]:
-            return _cancelled_to_timed_out_error(value, timeout)
+            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+            return to_call(value, timeout)
         return value
 
     deferred.addBoth(convert_cancelled)
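
For context on why the wrapper deferred is needed: `Deferred.addTimeout`
cancels the deferred it is attached to, and if that deferred's canceller
raises, the timeout machinery dies with it. A minimal sketch of the failure
mode the docstring describes, using a throwing canceller and a fake clock as
the reactor:

    from twisted.internet import defer
    from twisted.internet.task import Clock

    from synapse.util.async_helpers import timeout_deferred

    def bad_canceller(d):
        raise Exception("canceller blew up")

    clock = Clock()
    d = defer.Deferred(canceller=bad_canceller)

    # timeout_deferred returns a *new* deferred, which times out even
    # though cancelling the wrapped deferred raises.
    new_d = timeout_deferred(d, timeout=1.0, reactor=clock)

    failures = []
    new_d.addErrback(failures.append)

    clock.advance(2.0)
    assert failures and failures[0].check(defer.TimeoutError)
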
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8a3a06fd74..26cce7d197 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
             else:
                 self.retry_interval = self.min_retry_interval
 
-            logger.debug(
+            logger.info(
                 "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                 self.destination, exc_type, exc_val, self.retry_interval
             )
diff --git a/test_postgresql.sh b/test_postgresql.sh
new file mode 100755
index 0000000000..1ffcaabd31
--- /dev/null
+++ b/test_postgresql.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+# This script builds the Docker image to run the PostgreSQL tests, and then runs
+# the tests.
+
+set -e
+
+# Build, and tag
+docker build docker/ -f docker/Dockerfile-pgtests -t synapsepgtests
+
+# Run, mounting the current directory into /src
+docker run --rm -it -v "$(pwd):/src" synapsepgtests
diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py
index 1c46c9cfeb..66c09f63b6 100644
--- a/tests/http/test_fedclient.py
+++ b/tests/http/test_fedclient.py
@@ -18,9 +18,14 @@ from mock import Mock
 from twisted.internet.defer import TimeoutError
 from twisted.internet.error import ConnectingCancelledError, DNSLookupError
 from twisted.web.client import ResponseNeverReceived
+from twisted.web.http import HTTPChannel
 
-from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.http.matrixfederationclient import (
+    MatrixFederationHttpClient,
+    MatrixFederationRequest,
+)
 
+from tests.server import FakeTransport
 from tests.unittest import HomeserverTestCase
 
 
@@ -40,7 +45,7 @@ class FederationClientTests(HomeserverTestCase):
         """
         If the DNS lookup returns an error, it will bubble up.
         """
-        d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000)
+        d = self.cl.get_json("testserv2:8008", "foo/bar", timeout=10000)
         self.pump()
 
         f = self.failureResultOf(d)
@@ -51,7 +56,7 @@ class FederationClientTests(HomeserverTestCase):
         If the HTTP request is not connected and is timed out, it'll give a
         ConnectingCancelledError.
         """
-        d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
+        d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
 
         self.pump()
 
@@ -78,7 +83,7 @@ class FederationClientTests(HomeserverTestCase):
         If the HTTP request is connected, but gets no response before being
         timed out, it'll give a ResponseNeverReceived.
         """
-        d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
+        d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
 
         self.pump()
 
@@ -108,7 +113,12 @@ class FederationClientTests(HomeserverTestCase):
         """
         Once the client gets the headers, the request returns successfully.
         """
-        d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
+        request = MatrixFederationRequest(
+            method="GET",
+            destination="testserv:8008",
+            path="foo/bar",
+        )
+        d = self.cl._send_request(request, timeout=10000)
 
         self.pump()
 
@@ -155,3 +165,26 @@ class FederationClientTests(HomeserverTestCase):
         f = self.failureResultOf(d)
 
         self.assertIsInstance(f.value, TimeoutError)
+
+    def test_client_sends_body(self):
+        self.cl.post_json(
+            "testserv:8008", "foo/bar", timeout=10000,
+            data={"a": "b"}
+        )
+
+        self.pump()
+
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        client = clients[0][2].buildProtocol(None)
+        server = HTTPChannel()
+
+        client.makeConnection(FakeTransport(server, self.reactor))
+        server.makeConnection(FakeTransport(client, self.reactor))
+
+        self.pump(0.1)
+
+        self.assertEqual(len(server.requests), 1)
+        request = server.requests[0]
+        content = request.content.read()
+        self.assertEqual(content, b'{"a":"b"}')
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 089cecfbee..9e9fbbfe93 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -15,8 +15,6 @@
 
 from mock import Mock, NonCallableMock
 
-import attr
-
 from synapse.replication.tcp.client import (
     ReplicationClientFactory,
     ReplicationClientHandler,
@@ -24,6 +22,7 @@ from synapse.replication.tcp.client import (
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 
 from tests import unittest
+from tests.server import FakeTransport
 
 
 class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
@@ -56,36 +55,8 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
         server = server_factory.buildProtocol(None)
         client = client_factory.buildProtocol(None)
 
-        @attr.s
-        class FakeTransport(object):
-
-            other = attr.ib()
-            disconnecting = False
-            buffer = attr.ib(default=b'')
-
-            def registerProducer(self, producer, streaming):
-
-                self.producer = producer
-
-                def _produce():
-                    self.producer.resumeProducing()
-                    reactor.callLater(0.1, _produce)
-
-                reactor.callLater(0.0, _produce)
-
-            def write(self, byt):
-                self.buffer = self.buffer + byt
-
-                if getattr(self.other, "transport") is not None:
-                    self.other.dataReceived(self.buffer)
-                    self.buffer = b""
-
-            def writeSequence(self, seq):
-                for x in seq:
-                    self.write(x)
-
-        client.makeConnection(FakeTransport(server))
-        server.makeConnection(FakeTransport(client))
+        client.makeConnection(FakeTransport(server, reactor))
+        server.makeConnection(FakeTransport(client, reactor))
 
     def replicate(self):
         """Tell the master side of replication that something has happened, and then
diff --git a/tests/server.py b/tests/server.py
index 420ec4e088..7bee58dff1 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -98,7 +98,7 @@ class FakeSite:
         return FakeLogger()
 
 
-def make_request(method, path, content=b"", access_token=None):
+def make_request(method, path, content=b"", access_token=None, request=SynapseRequest):
     """
     Make a web request using the given method and path, feed it the
     content, and return the Request and the Channel underneath.
@@ -120,14 +120,16 @@ def make_request(method, path, content=b"", access_token=None):
     site = FakeSite()
     channel = FakeChannel()
 
-    req = SynapseRequest(site, channel)
+    req = request(site, channel)
     req.process = lambda: b""
     req.content = BytesIO(content)
 
     if access_token:
         req.requestHeaders.addRawHeader(b"Authorization", b"Bearer " + access_token)
 
-    req.requestHeaders.addRawHeader(b"X-Forwarded-For", b"127.0.0.1")
+    if content:
+        req.requestHeaders.addRawHeader(b"Content-Type", b"application/json")
+
     req.requestReceived(method, path, b"1.1")
 
     return req, channel
@@ -280,3 +282,84 @@ def get_clock():
     clock = ThreadedMemoryReactorClock()
     hs_clock = Clock(clock)
     return (clock, hs_clock)
+
+
+@attr.s
+class FakeTransport(object):
+    """
+    A twisted.internet.interfaces.ITransport implementation which sends all its data
+    straight into an IProtocol object: it exists to connect two IProtocols together.
+
+    To use it, instantiate it with the receiving IProtocol, and then pass it to the
+    sending IProtocol's makeConnection method:
+
+        server = HTTPChannel()
+        client.makeConnection(FakeTransport(server, self.reactor))
+
+    If you want bidirectional communication, you'll need two instances.
+    """
+
+    other = attr.ib()
+    """The Protocol object which will receive any data written to this transport.
+
+    :type: twisted.internet.interfaces.IProtocol
+    """
+
+    _reactor = attr.ib()
+    """Test reactor
+
+    :type: twisted.internet.interfaces.IReactorTime
+    """
+
+    disconnecting = False
+    buffer = attr.ib(default=b'')
+    producer = attr.ib(default=None)
+
+    def getPeer(self):
+        return None
+
+    def getHost(self):
+        return None
+
+    def loseConnection(self):
+        self.disconnecting = True
+
+    def abortConnection(self):
+        self.disconnecting = True
+
+    def pauseProducing(self):
+        self.producer.pauseProducing()
+
+    def unregisterProducer(self):
+        if not self.producer:
+            return
+
+        self.producer = None
+
+    def registerProducer(self, producer, streaming):
+        self.producer = producer
+        self.producerStreaming = streaming
+
+        def _produce():
+            d = self.producer.resumeProducing()
+            d.addCallback(lambda x: self._reactor.callLater(0.1, _produce))
+
+        if not streaming:
+            self._reactor.callLater(0.0, _produce)
+
+    def write(self, byt):
+        self.buffer = self.buffer + byt
+
+        def _write():
+            if getattr(self.other, "transport") is not None:
+                self.other.dataReceived(self.buffer)
+                self.buffer = b""
+                return
+
+            self._reactor.callLater(0.0, _write)
+
+        _write()
+
+    def writeSequence(self, seq):
+        for x in seq:
+            self.write(x)
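
As the docstring says, each FakeTransport carries data in one direction only,
so bidirectional communication takes two instances. A minimal wiring sketch,
assuming the ThreadedMemoryReactorClock defined earlier in this file:

    from twisted.internet.protocol import Protocol
    from twisted.web.http import HTTPChannel

    from tests.server import FakeTransport, ThreadedMemoryReactorClock

    reactor = ThreadedMemoryReactorClock()

    client = Protocol()      # stand-in for a real client-side protocol
    server = HTTPChannel()

    # client -> server and server -> client each need their own transport
    client.makeConnection(FakeTransport(server, reactor))
    server.makeConnection(FakeTransport(client, reactor))

    # writes that arrive before the peer is connected are retried via
    # callLater(0), so pump the clock to flush anything pending
    client.transport.write(b"GET / HTTP/1.0\r\n\r\n")
    reactor.advance(0)
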
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index c9b02a062b..2ffbb9f14f 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,35 +13,45 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import hashlib
+import hmac
+import json
+
 from mock import Mock
 
 from twisted.internet import defer
 
-import tests.unittest
-import tests.utils
+from synapse.http.site import XForwardedForRequest
+from synapse.rest.client.v1 import admin, login
+
+from tests import unittest
 
 
-class ClientIpStoreTestCase(tests.unittest.TestCase):
-    def __init__(self, *args, **kwargs):
-        super(ClientIpStoreTestCase, self).__init__(*args, **kwargs)
-        self.store = None  # type: synapse.storage.DataStore
-        self.clock = None  # type: tests.utils.MockClock
+class ClientIpStoreTestCase(unittest.HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        hs = self.setup_test_homeserver()
+        return hs
 
-    @defer.inlineCallbacks
-    def setUp(self):
-        self.hs = yield tests.utils.setup_test_homeserver(self.addCleanup)
+    def prepare(self, reactor, clock, hs):
         self.store = self.hs.get_datastore()
-        self.clock = self.hs.get_clock()
 
-    @defer.inlineCallbacks
     def test_insert_new_client_ip(self):
-        self.clock.now = 12345678
+        self.reactor.advance(12345678)
+
         user_id = "@user:id"
-        yield self.store.insert_client_ip(
-            user_id, "access_token", "ip", "user_agent", "device_id"
+        self.get_success(
+            self.store.insert_client_ip(
+                user_id, "access_token", "ip", "user_agent", "device_id"
+            )
         )
 
-        result = yield self.store.get_last_client_ip_by_device(user_id, "device_id")
+        # Trigger the storage loop
+        self.reactor.advance(10)
+
+        result = self.get_success(
+            self.store.get_last_client_ip_by_device(user_id, "device_id")
+        )
 
         r = result[(user_id, "device_id")]
         self.assertDictContainsSubset(
@@ -55,18 +66,18 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
             r,
         )
 
-    @defer.inlineCallbacks
     def test_disabled_monthly_active_user(self):
         self.hs.config.limit_usage_by_mau = False
         self.hs.config.max_mau_value = 50
         user_id = "@user:server"
-        yield self.store.insert_client_ip(
-            user_id, "access_token", "ip", "user_agent", "device_id"
+        self.get_success(
+            self.store.insert_client_ip(
+                user_id, "access_token", "ip", "user_agent", "device_id"
+            )
         )
-        active = yield self.store.user_last_seen_monthly_active(user_id)
+        active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
         self.assertFalse(active)
 
-    @defer.inlineCallbacks
     def test_adding_monthly_active_user_when_full(self):
         self.hs.config.limit_usage_by_mau = True
         self.hs.config.max_mau_value = 50
@@ -76,38 +87,159 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
         self.store.get_monthly_active_count = Mock(
             return_value=defer.succeed(lots_of_users)
         )
-        yield self.store.insert_client_ip(
-            user_id, "access_token", "ip", "user_agent", "device_id"
+        self.get_success(
+            self.store.insert_client_ip(
+                user_id, "access_token", "ip", "user_agent", "device_id"
+            )
         )
-        active = yield self.store.user_last_seen_monthly_active(user_id)
+        active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
         self.assertFalse(active)
 
-    @defer.inlineCallbacks
     def test_adding_monthly_active_user_when_space(self):
         self.hs.config.limit_usage_by_mau = True
         self.hs.config.max_mau_value = 50
         user_id = "@user:server"
-        active = yield self.store.user_last_seen_monthly_active(user_id)
+        active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
         self.assertFalse(active)
 
-        yield self.store.insert_client_ip(
-            user_id, "access_token", "ip", "user_agent", "device_id"
+        # Trigger the saving loop
+        self.reactor.advance(10)
+
+        self.get_success(
+            self.store.insert_client_ip(
+                user_id, "access_token", "ip", "user_agent", "device_id"
+            )
         )
-        active = yield self.store.user_last_seen_monthly_active(user_id)
+        active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
         self.assertTrue(active)
 
-    @defer.inlineCallbacks
     def test_updating_monthly_active_user_when_space(self):
         self.hs.config.limit_usage_by_mau = True
         self.hs.config.max_mau_value = 50
         user_id = "@user:server"
-        yield self.store.register(user_id=user_id, token="123", password_hash=None)
+        self.get_success(
+            self.store.register(user_id=user_id, token="123", password_hash=None)
+        )
 
-        active = yield self.store.user_last_seen_monthly_active(user_id)
+        active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
         self.assertFalse(active)
 
-        yield self.store.insert_client_ip(
-            user_id, "access_token", "ip", "user_agent", "device_id"
+        # Trigger the saving loop
+        self.reactor.advance(10)
+
+        self.get_success(
+            self.store.insert_client_ip(
+                user_id, "access_token", "ip", "user_agent", "device_id"
+            )
         )
-        active = yield self.store.user_last_seen_monthly_active(user_id)
+        active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
         self.assertTrue(active)
+
+
+class ClientIpAuthTestCase(unittest.HomeserverTestCase):
+
+    servlets = [admin.register_servlets, login.register_servlets]
+
+    def make_homeserver(self, reactor, clock):
+        hs = self.setup_test_homeserver()
+        return hs
+
+    def prepare(self, reactor, clock, hs):
+        self.hs.config.registration_shared_secret = u"shared"
+        self.store = self.hs.get_datastore()
+
+        # Create the user
+        request, channel = self.make_request("GET", "/_matrix/client/r0/admin/register")
+        self.render(request)
+        nonce = channel.json_body["nonce"]
+
+        want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
+        want_mac.update(nonce.encode('ascii') + b"\x00bob\x00abc123\x00admin")
+        want_mac = want_mac.hexdigest()
+
+        body = json.dumps(
+            {
+                "nonce": nonce,
+                "username": "bob",
+                "password": "abc123",
+                "admin": True,
+                "mac": want_mac,
+            }
+        )
+        request, channel = self.make_request(
+            "POST", "/_matrix/client/r0/admin/register", body.encode('utf8')
+        )
+        self.render(request)
+
+        self.assertEqual(channel.code, 200)
+        self.user_id = channel.json_body["user_id"]
+
+    def test_request_with_xforwarded(self):
+        """
+        The IP in X-Forwarded-For is entered into the client IPs table.
+        """
+        self._runtest(
+            {b"X-Forwarded-For": b"127.9.0.1"},
+            "127.9.0.1",
+            {"request": XForwardedForRequest},
+        )
+
+    def test_request_from_getPeer(self):
+        """
+        The IP returned by getPeer is entered into the client IPs table, if
+        there's no X-Forwarded-For header.
+        """
+        self._runtest({}, "127.0.0.1", {})
+
+    def _runtest(self, headers, expected_ip, make_request_args):
+        device_id = "bleb"
+
+        body = json.dumps(
+            {
+                "type": "m.login.password",
+                "user": "bob",
+                "password": "abc123",
+                "device_id": device_id,
+            }
+        )
+        request, channel = self.make_request(
+            "POST", "/_matrix/client/r0/login", body.encode('utf8'), **make_request_args
+        )
+        self.render(request)
+        self.assertEqual(channel.code, 200)
+        access_token = channel.json_body["access_token"].encode('ascii')
+
+        # Advance to a known time
+        self.reactor.advance(123456 - self.reactor.seconds())
+
+        request, channel = self.make_request(
+            "GET",
+            "/_matrix/client/r0/admin/users/" + self.user_id,
+            body.encode('utf8'),
+            access_token=access_token,
+            **make_request_args
+        )
+        request.requestHeaders.addRawHeader(b"User-Agent", b"Mozzila pizza")
+
+        # Add the optional headers
+        for h, v in headers.items():
+            request.requestHeaders.addRawHeader(h, v)
+        self.render(request)
+
+        # Advance so the save loop occurs
+        self.reactor.advance(100)
+
+        result = self.get_success(
+            self.store.get_last_client_ip_by_device(self.user_id, device_id)
+        )
+        r = result[(self.user_id, device_id)]
+        self.assertDictContainsSubset(
+            {
+                "user_id": self.user_id,
+                "device_id": device_id,
+                "ip": expected_ip,
+                "user_agent": "Mozzila pizza",
+                "last_seen": 123456100,
+            },
+            r,
+        )
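
The registration step in prepare() exercises Synapse's shared-secret admin
registration: the MAC is an HMAC-SHA1, keyed with
registration_shared_secret, over the NUL-separated nonce, username, password
and admin flag. Pulled out as a standalone helper (hypothetical name,
mirroring the computation in the test above):

    import hashlib
    import hmac

    def admin_register_mac(shared_secret, nonce, username, password, admin=False):
        """Compute the MAC for /_matrix/client/r0/admin/register."""
        mac = hmac.new(key=shared_secret, digestmod=hashlib.sha1)
        mac.update(
            nonce.encode('ascii')
            + b"\x00" + username.encode('utf8')
            + b"\x00" + password.encode('utf8')
            + b"\x00" + (b"admin" if admin else b"notadmin")
        )
        return mac.hexdigest()

    # e.g. admin_register_mac(b"shared", nonce, "bob", "abc123", admin=True)
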
diff --git a/tests/test_server.py b/tests/test_server.py
index ef74544e93..4045fdadc3 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -1,14 +1,35 @@
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
 import re
 
+from six import StringIO
+
 from twisted.internet.defer import Deferred
-from twisted.test.proto_helpers import MemoryReactorClock
+from twisted.python.failure import Failure
+from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
 
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.server import JsonResource
+from synapse.http.site import SynapseSite, logger
 from synapse.util import Clock
 
 from tests import unittest
-from tests.server import make_request, render, setup_test_homeserver
+from tests.server import FakeTransport, make_request, render, setup_test_homeserver
 
 
 class JsonResourceTests(unittest.TestCase):
@@ -121,3 +142,52 @@ class JsonResourceTests(unittest.TestCase):
         self.assertEqual(channel.result["code"], b'400')
         self.assertEqual(channel.json_body["error"], "Unrecognized request")
         self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED")
+
+
+class SiteTestCase(unittest.HomeserverTestCase):
+    def test_lose_connection(self):
+        """
+        We log the correctly-redacted URI when we lose the connection.
+        """
+
+        class HangingResource(Resource):
+            """
+            A Resource that strategically hangs, as if it were still
+            processing a response.
+            """
+
+            def render(self, request):
+                return NOT_DONE_YET
+
+        # Set up a logging handler that we can inspect afterwards
+        output = StringIO()
+        handler = logging.StreamHandler(output)
+        logger.addHandler(handler)
+        old_level = logger.level
+        logger.setLevel(10)
+        self.addCleanup(logger.setLevel, old_level)
+        self.addCleanup(logger.removeHandler, handler)
+
+        # Make a resource and a Site, the resource will hang and allow us to
+        # time out the request while it's 'processing'
+        base_resource = Resource()
+        base_resource.putChild(b'', HangingResource())
+        site = SynapseSite("test", "site_tag", {}, base_resource, "1.0")
+
+        server = site.buildProtocol(None)
+        client = AccumulatingProtocol()
+        client.makeConnection(FakeTransport(server, self.reactor))
+        server.makeConnection(FakeTransport(client, self.reactor))
+
+        # Send a request with an access token that will get redacted
+        server.dataReceived(b"GET /?access_token=bar HTTP/1.0\r\n\r\n")
+        self.pump()
+
+        # Lose the connection
+        e = Failure(Exception("Failed123"))
+        server.connectionLost(e)
+        handler.flush()
+
+        # Our access token is redacted and the failure reason is logged.
+        self.assertIn("/?access_token=<redacted>", output.getvalue())
+        self.assertIn("Failed123", output.getvalue())
diff --git a/tests/unittest.py b/tests/unittest.py
index a3d39920db..ef905e6389 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -26,6 +26,7 @@ from twisted.internet.defer import Deferred
 from twisted.trial import unittest
 
 from synapse.http.server import JsonResource
+from synapse.http.site import SynapseRequest
 from synapse.server import HomeServer
 from synapse.types import UserID, create_requester
 from synapse.util.logcontext import LoggingContextFilter
@@ -219,7 +220,8 @@ class HomeserverTestCase(TestCase):
 
         Function to be overridden in subclasses.
         """
-        raise NotImplementedError()
+        hs = self.setup_test_homeserver()
+        return hs
 
     def prepare(self, reactor, clock, homeserver):
         """
@@ -236,7 +238,9 @@ class HomeserverTestCase(TestCase):
         Function to optionally be overridden in subclasses.
         """
 
-    def make_request(self, method, path, content=b""):
+    def make_request(
+        self, method, path, content=b"", access_token=None, request=SynapseRequest
+    ):
         """
         Create a SynapseRequest at the path using the method and containing the
         given content.
@@ -254,7 +258,7 @@ class HomeserverTestCase(TestCase):
         if isinstance(content, dict):
             content = json.dumps(content).encode('utf8')
 
-        return make_request(method, path, content)
+        return make_request(method, path, content, access_token, request)
 
     def render(self, request):
         """
diff --git a/tests/utils.py b/tests/utils.py
index 215226debf..aaed1149c3 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -16,7 +16,9 @@
 import atexit
 import hashlib
 import os
+import time
 import uuid
+import warnings
 from inspect import getcallargs
 
 from mock import Mock, patch
@@ -237,20 +239,42 @@ def setup_test_homeserver(
         else:
             # We need to do cleanup on PostgreSQL
             def cleanup():
+                import psycopg2
+
                 # Close all the db pools
                 hs.get_db_pool().close()
 
+                dropped = False
+
                 # Drop the test database
                 db_conn = db_engine.module.connect(
                     database=POSTGRES_BASE_DB, user=POSTGRES_USER
                 )
                 db_conn.autocommit = True
                 cur = db_conn.cursor()
-                cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
-                db_conn.commit()
+
+                # Try a few times to drop the DB. Some things may hold on to the
+                # database for a few more seconds due to flakiness, preventing
+                # us from dropping it when the test is over. If we can't drop
+                # it, warn and move on.
+                for x in range(5):
+                    try:
+                        cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
+                        db_conn.commit()
+                        dropped = True
+                    except psycopg2.OperationalError as e:
+                        warnings.warn(
+                            "Couldn't drop old db: " + str(e), category=UserWarning
+                        )
+                        time.sleep(0.5)
+
                 cur.close()
                 db_conn.close()
 
+                if not dropped:
+                    warnings.warn("Failed to drop old DB.", category=UserWarning)
+
             if not LEAVE_DB:
                 # Register the cleanup hook
                 cleanup_func(cleanup)
diff --git a/tox.ini b/tox.ini
index 80ac9324df..e4db563b4b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,7 +4,7 @@ envlist = packaging, py27, py36, pep8, check_isort
 [base]
 deps =
     coverage
-    Twisted>=15.1
+    Twisted>=17.1
     mock
     python-subunit
     junitxml
@@ -70,6 +70,16 @@ usedevelop=true
 [testenv:py36]
 usedevelop=true
 
+[testenv:py36-postgres]
+usedevelop=true
+deps =
+    {[base]deps}
+    psycopg2
+setenv =
+    {[base]setenv}
+    SYNAPSE_POSTGRES = 1
+
+
 [testenv:packaging]
 deps =
     check-manifest