summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py4
-rw-r--r--synapse/handlers/federation.py195
-rw-r--r--synapse/handlers/room_member.py5
-rw-r--r--synapse/http/client.py4
-rw-r--r--synapse/http/endpoint.py13
-rw-r--r--synapse/http/matrixfederationclient.py406
-rw-r--r--synapse/http/site.py6
-rw-r--r--synapse/notifier.py21
-rw-r--r--synapse/storage/client_ips.py34
-rw-r--r--synapse/storage/transactions.py8
-rw-r--r--synapse/util/async_helpers.py88
-rw-r--r--synapse/util/retryutils.py2
12 files changed, 466 insertions, 320 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index ac97e19649..3241ded188 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -457,6 +457,10 @@ def run(hs):
         stats["homeserver"] = hs.config.server_name
         stats["timestamp"] = now
         stats["uptime_seconds"] = uptime
+        version = sys.version_info
+        stats["python_version"] = "{}.{}.{}".format(
+            version.major, version.minor, version.micro
+        )
         stats["total_users"] = yield hs.get_datastore().count_all_users()
 
         total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0c68e8a472..8d6bd7976d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,6 +69,27 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
+def shortstr(iterable, maxitems=5):
+    """If iterable has maxitems or fewer, return the stringification of a list
+    containing those items.
+
+    Otherwise, return the stringification of a a list with the first maxitems items,
+    followed by "...".
+
+    Args:
+        iterable (Iterable): iterable to truncate
+        maxitems (int): number of items to return before truncating
+
+    Returns:
+        unicode
+    """
+
+    items = list(itertools.islice(iterable, maxitems + 1))
+    if len(items) <= maxitems:
+        return str(items)
+    return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
@@ -114,7 +135,6 @@ class FederationHandler(BaseHandler):
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
     @defer.inlineCallbacks
-    @log_function
     def on_receive_pdu(
             self, origin, pdu, get_missing=True, sent_to_us_directly=False,
     ):
@@ -130,9 +150,17 @@ class FederationHandler(BaseHandler):
         Returns (Deferred): completes with None
         """
 
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
+        logger.info(
+            "[%s %s] handling received PDU: %s",
+            room_id, event_id, pdu,
+        )
+
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self.store.get_event(
-            pdu.event_id,
+            event_id,
             allow_none=True,
             allow_rejected=True,
         )
@@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
             )
         )
         if already_seen:
-            logger.debug("Already seen pdu %s", pdu.event_id)
+            logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
             return
 
         # do some initial sanity-checking of the event. In particular, make
@@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
         try:
             self._sanity_check_event(pdu)
         except SynapseError as err:
+            logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
             raise FederationError(
                 "ERROR",
                 err.code,
@@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
 
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
-        if pdu.room_id in self.room_queues:
-            logger.info("Ignoring PDU %s for room %s from %s for now; join "
-                        "in progress", pdu.event_id, pdu.room_id, origin)
-            self.room_queues[pdu.room_id].append((pdu, origin))
+        if room_id in self.room_queues:
+            logger.info(
+                "[%s %s] Queuing PDU from %s for now: join in progress",
+                room_id, event_id, origin,
+            )
+            self.room_queues[room_id].append((pdu, origin))
             return
 
         # If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
         # we should check if we *are* in fact in the room. If we are then we
         # can magically rejoin the room.
         is_in_room = yield self.auth.check_host_in_room(
-            pdu.room_id,
+            room_id,
             self.server_name
         )
         if not is_in_room:
@@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
             )
             if was_in_room:
                 logger.info(
-                    "Ignoring PDU %s for room %s from %s as we've left the room!",
-                    pdu.event_id, pdu.room_id, origin,
+                    "[%s %s] Ignoring PDU from %s as we've left the room",
+                    room_id, event_id, origin,
                 )
                 defer.returnValue(None)
 
@@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
             )
 
             logger.debug(
-                "_handle_new_pdu min_depth for %s: %d",
-                pdu.room_id, min_depth
+                "[%s %s] min_depth: %d",
+                room_id, event_id, min_depth,
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
                 # send to the clients.
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
-                if get_missing and prevs - seen:
+                missing_prevs = prevs - seen
+                if get_missing and missing_prevs:
                     # If we're missing stuff, ensure we only fetch stuff one
                     # at a time.
                     logger.info(
-                        "Acquiring lock for room %r to fetch %d missing events: %r...",
-                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                        "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
                     with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
                         logger.info(
-                            "Acquired lock for room %r to fetch %d missing events",
-                            pdu.room_id, len(prevs - seen),
+                            "[%s %s] Acquired room lock to fetch %d missing prev_events",
+                            room_id, event_id, len(missing_prevs),
                         )
 
                         yield self._get_missing_events_for_pdu(
@@ -241,19 +273,23 @@ class FederationHandler(BaseHandler):
 
                         if not prevs - seen:
                             logger.info(
-                                "Found all missing prev events for %s", pdu.event_id
+                                "[%s %s] Found all missing prev_events",
+                                room_id, event_id,
                             )
-                elif prevs - seen:
+                elif missing_prevs:
                     logger.info(
-                        "Not fetching %d missing events for room %r,event %s: %r...",
-                        len(prevs - seen), pdu.room_id, pdu.event_id,
-                        list(prevs - seen)[:5],
+                        "[%s %s] Not recursively fetching %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
 
             if sent_to_us_directly and prevs - seen:
                 # If they have sent it to us directly, and the server
                 # isn't telling us about the auth events that it's
                 # made a message referencing, we explode
+                logger.warn(
+                    "[%s %s] Failed to fetch %d prev events: rejecting",
+                    room_id, event_id, len(prevs - seen),
+                )
                 raise FederationError(
                     "ERROR",
                     403,
@@ -270,15 +306,19 @@ class FederationHandler(BaseHandler):
                 auth_chains = set()
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+                    ours = yield self.store.get_state_groups(room_id, list(seen))
                     state_groups.append(ours)
 
                     # Ask the remote server for the states we don't
                     # know about
                     for p in prevs - seen:
+                        logger.info(
+                            "[%s %s] Requesting state at missing prev_event %s",
+                            room_id, event_id, p,
+                        )
                         state, got_auth_chain = (
                             yield self.federation_client.get_state_for_room(
-                                origin, pdu.room_id, p
+                                origin, room_id, p,
                             )
                         )
                         auth_chains.update(got_auth_chain)
@@ -291,19 +331,24 @@ class FederationHandler(BaseHandler):
                             ev_ids, get_prev_content=False, check_redacted=False
                         )
 
-                    room_version = yield self.store.get_room_version(pdu.room_id)
+                    room_version = yield self.store.get_room_version(room_id)
                     state_map = yield resolve_events_with_factory(
-                        room_version, state_groups, {pdu.event_id: pdu}, fetch
+                        room_version, state_groups, {event_id: pdu}, fetch
                     )
 
                     state = (yield self.store.get_events(state_map.values())).values()
                     auth_chain = list(auth_chains)
                 except Exception:
+                    logger.warn(
+                        "[%s %s] Error attempting to resolve state at missing "
+                        "prev_events",
+                        room_id, event_id, exc_info=True,
+                    )
                     raise FederationError(
                         "ERROR",
                         403,
                         "We can't get valid state history.",
-                        affected=pdu.event_id,
+                        affected=event_id,
                     )
 
         yield self._process_received_pdu(
@@ -322,15 +367,16 @@ class FederationHandler(BaseHandler):
             prevs (set(str)): List of event ids which we are missing
             min_depth (int): Minimum depth of events to return.
         """
-        # We recalculate seen, since it may have changed.
+
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
         seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
 
-        latest = yield self.store.get_latest_event_ids_in_room(
-            pdu.room_id
-        )
+        latest = yield self.store.get_latest_event_ids_in_room(room_id)
 
         # We add the prev events that we have seen to the latest
         # list to ensure the remote server doesn't give them to us
@@ -338,8 +384,8 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "Missing %d events for room %r pdu %s: %r...",
-            len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+            "[%s %s]: Requesting %d prev_events: %s",
+            room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
         )
 
         # XXX: we set timeout to 10s to help workaround
@@ -360,49 +406,87 @@ class FederationHandler(BaseHandler):
         # apparently.
         #
         # see https://github.com/matrix-org/synapse/pull/1744
+        #
+        # ----
+        #
+        # Update richvdh 2018/09/18: There are a number of problems with timing this
+        # request out agressively on the client side:
+        #
+        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+        #   if you send too many requests at once, so you end up with the server carefully
+        #   working through the backlog of your requests, which you have already timed
+        #   out.
+        #
+        # - for this request in particular, we now (as of
+        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+        #   server can't produce a plausible-looking set of prev_events - so we becone
+        #   much more likely to reject the event.
+        #
+        # - contrary to what it says above, we do *not* fall back to fetching fresh state
+        #   for the room if get_missing_events times out. Rather, we give up processing
+        #   the PDU whose prevs we are missing, which then makes it much more likely that
+        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
+        #   problem.
+        #
+        # - the agressive 10s timeout was introduced to deal with incoming federation
+        #   requests taking 8 hours to process. It's not entirely clear why that was going
+        #   on; certainly there were other issues causing traffic storms which are now
+        #   resolved, and I think in any case we may be more sensible about our locking
+        #   now. We're *certainly* more sensible about our logging.
+        #
+        # All that said: Let's try increasing the timout to 60s and see what happens.
 
         missing_events = yield self.federation_client.get_missing_events(
             origin,
-            pdu.room_id,
+            room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
             limit=10,
             min_depth=min_depth,
-            timeout=10000,
+            timeout=60000,
         )
 
         logger.info(
-            "Got %d events: %r...",
-            len(missing_events), [e.event_id for e in missing_events[:5]]
+            "[%s %s]: Got %d prev_events: %s",
+            room_id, event_id, len(missing_events), shortstr(missing_events),
         )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
-        for e in missing_events:
-            logger.info("Handling found event %s", e.event_id)
+        for ev in missing_events:
+            logger.info(
+                "[%s %s] Handling received prev_event %s",
+                room_id, event_id, ev.event_id,
+            )
             try:
                 yield self.on_receive_pdu(
                     origin,
-                    e,
+                    ev,
                     get_missing=False
                 )
             except FederationError as e:
                 if e.code == 403:
-                    logger.warn("Event %s failed history check.")
+                    logger.warn(
+                        "[%s %s] Received prev_event %s failed history check.",
+                        room_id, event_id, ev.event_id,
+                    )
                 else:
                     raise
 
-    @log_function
     @defer.inlineCallbacks
-    def _process_received_pdu(self, origin, pdu, state, auth_chain):
+    def _process_received_pdu(self, origin, event, state, auth_chain):
         """ Called when we have a new pdu. We need to do auth checks and put it
         through the StateHandler.
         """
-        event = pdu
+        room_id = event.room_id
+        event_id = event.event_id
 
-        logger.debug("Processing event: %s", event)
+        logger.debug(
+            "[%s %s] Processing event: %s",
+            room_id, event_id, event,
+        )
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -411,15 +495,16 @@ class FederationHandler(BaseHandler):
         # event.
         if state and auth_chain and not event.internal_metadata.is_outlier():
             is_in_room = yield self.auth.check_host_in_room(
-                event.room_id,
+                room_id,
                 self.server_name
             )
         else:
             is_in_room = True
+
         if not is_in_room:
             logger.info(
-                "Got event for room we're not in: %r %r",
-                event.room_id, event.event_id
+                "[%s %s] Got event for room we're not in",
+                room_id, event_id,
             )
 
             try:
@@ -431,7 +516,7 @@ class FederationHandler(BaseHandler):
                     "ERROR",
                     e.code,
                     e.msg,
-                    affected=event.event_id,
+                    affected=event_id,
                 )
 
         else:
@@ -480,12 +565,12 @@ class FederationHandler(BaseHandler):
                     affected=event.event_id,
                 )
 
-        room = yield self.store.get_room(event.room_id)
+        room = yield self.store.get_room(room_id)
 
         if not room:
             try:
                 yield self.store.store_room(
-                    room_id=event.room_id,
+                    room_id=room_id,
                     room_creator_user_id="",
                     is_public=False,
                 )
@@ -513,7 +598,7 @@ class FederationHandler(BaseHandler):
 
                 if newly_joined:
                     user = UserID.from_string(event.state_key)
-                    yield self.user_joined_room(user, event.room_id)
+                    yield self.user_joined_room(user, room_id)
 
     @log_function
     @defer.inlineCallbacks
@@ -1430,12 +1515,10 @@ class FederationHandler(BaseHandler):
         else:
             defer.returnValue(None)
 
-    @log_function
     def get_min_depth_for_context(self, context):
         return self.store.get_min_depth(context)
 
     @defer.inlineCallbacks
-    @log_function
     def _handle_new_event(self, origin, event, state=None, auth_events=None,
                           backfilled=False):
         context = yield self._prep_event(
@@ -1635,8 +1718,8 @@ class FederationHandler(BaseHandler):
             )
         except AuthError as e:
             logger.warn(
-                "Rejecting %s because %s",
-                event.event_id, e.msg
+                "[%s %s] Rejecting: %s",
+                event.room_id, event.event_id, e.msg
             )
 
             context.rejected = RejectedReason.AUTH_ERROR
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f643619047..07fd3e82fc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -583,6 +583,11 @@ class RoomMemberHandler(object):
         room_id = mapping["room_id"]
         servers = mapping["servers"]
 
+        # put the server which owns the alias at the front of the server list.
+        if room_alias.domain in servers:
+            servers.remove(room_alias.domain)
+        servers.insert(0, room_alias.domain)
+
         defer.returnValue((RoomID.from_string(room_id), servers))
 
     @defer.inlineCallbacks
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ec339a92ad..3d05f83b8c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
 from synapse.http import cancelled_to_request_timed_out_error, redact_uri
 from synapse.http.endpoint import SpiderEndpoint
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -99,7 +99,7 @@ class SimpleHttpClient(object):
             request_deferred = treq.request(
                 method, uri, agent=self.agent, data=data, headers=headers
             )
-            add_timeout_to_deferred(
+            request_deferred = timeout_deferred(
                 request_deferred, 60, self.hs.get_reactor(),
                 cancelled_to_request_timed_out_error,
             )
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index b0c9369519..91025037a3 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
 
     Args:
         reactor: Twisted reactor.
-        destination (bytes): The name of the server to connect to.
+        destination (unicode): The name of the server to connect to.
         tls_client_options_factory
             (synapse.crypto.context_factory.ClientTLSOptionsFactory):
             Factory which generates TLS options for client connections.
@@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
         transport_endpoint = HostnameEndpoint
         default_port = 8008
     else:
+        # the SNI string should be the same as the Host header, minus the port.
+        # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
+        # the Host header and SNI should therefore be the server_name of the remote
+        # server.
+        tls_options = tls_client_options_factory.get_options(domain)
+
         def transport_endpoint(reactor, host, port, timeout):
             return wrapClientTLS(
-                tls_client_options_factory.get_options(host),
-                HostnameEndpoint(reactor, host, port, timeout=timeout))
+                tls_options,
+                HostnameEndpoint(reactor, host, port, timeout=timeout),
+            )
         default_port = 8448
 
     if port is None:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 083484a687..14b12cd1c4 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,10 +17,12 @@ import cgi
 import logging
 import random
 import sys
+from io import BytesIO
 
 from six import PY3, string_types
 from six.moves import urllib
 
+import attr
 import treq
 from canonicaljson import encode_canonical_json
 from prometheus_client import Counter
@@ -28,8 +30,9 @@ from signedjson.sign import sign_json
 
 from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
+from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool
+from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -41,13 +44,11 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util import logcontext
-from synapse.util.async_helpers import timeout_no_seriously
+from synapse.util.async_helpers import timeout_deferred
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
-outbound_logger = logging.getLogger("synapse.http.outbound")
 
 outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
                                     "", ["method"])
@@ -78,6 +79,99 @@ class MatrixFederationEndpointFactory(object):
         )
 
 
+_next_id = 1
+
+
+@attr.s
+class MatrixFederationRequest(object):
+    method = attr.ib()
+    """HTTP method
+    :type: str
+    """
+
+    path = attr.ib()
+    """HTTP path
+    :type: str
+    """
+
+    destination = attr.ib()
+    """The remote server to send the HTTP request to.
+    :type: str"""
+
+    json = attr.ib(default=None)
+    """JSON to send in the body.
+    :type: dict|None
+    """
+
+    json_callback = attr.ib(default=None)
+    """A callback to generate the JSON.
+    :type: func|None
+    """
+
+    query = attr.ib(default=None)
+    """Query arguments.
+    :type: dict|None
+    """
+
+    txn_id = attr.ib(default=None)
+    """Unique ID for this request (for logging)
+    :type: str|None
+    """
+
+    def __attrs_post_init__(self):
+        global _next_id
+        self.txn_id = "%s-O-%s" % (self.method, _next_id)
+        _next_id = (_next_id + 1) % (MAXINT - 1)
+
+    def get_json(self):
+        if self.json_callback:
+            return self.json_callback()
+        return self.json
+
+
+@defer.inlineCallbacks
+def _handle_json_response(reactor, timeout_sec, request, response):
+    """
+    Reads the JSON body of a response, with a timeout
+
+    Args:
+        reactor (IReactor): twisted reactor, for the timeout
+        timeout_sec (float): number of seconds to wait for response to complete
+        request (MatrixFederationRequest): the request that triggered the response
+        response (IResponse): response to the request
+
+    Returns:
+        dict: parsed JSON response
+    """
+    try:
+        check_content_type_is_json(response.headers)
+
+        d = treq.json_content(response)
+        d = timeout_deferred(
+            d,
+            timeout=timeout_sec,
+            reactor=reactor,
+        )
+
+        body = yield make_deferred_yieldable(d)
+    except Exception as e:
+        logger.warn(
+            "{%s} [%s] Error reading response: %s",
+            request.txn_id,
+            request.destination,
+            e,
+        )
+        raise
+    logger.info(
+        "{%s} [%s] Completed: %d %s",
+        request.txn_id,
+        request.destination,
+        response.code,
+        response.phrase.decode('ascii', errors='replace'),
+    )
+    defer.returnValue(body)
+
+
 class MatrixFederationHttpClient(object):
     """HTTP client used to talk to other homeservers over the federation
     protocol. Send client certificates and signs requests.
@@ -102,34 +196,35 @@ class MatrixFederationHttpClient(object):
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
         self.version_string = hs.version_string.encode('ascii')
-        self._next_id = 1
         self.default_timeout = 60
 
-    def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
-        return urllib.parse.urlunparse(
-            (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
-        )
+        def schedule(x):
+            reactor.callLater(_EPSILON, x)
+
+        self._cooperator = Cooperator(scheduler=schedule)
 
     @defer.inlineCallbacks
-    def _request(self, destination, method, path,
-                 json=None, json_callback=None,
-                 param_bytes=b"",
-                 query=None, retry_on_dns_fail=True,
-                 timeout=None, long_retries=False,
-                 ignore_backoff=False,
-                 backoff_on_404=False):
+    def _send_request(
+        self,
+        request,
+        retry_on_dns_fail=True,
+        timeout=None,
+        long_retries=False,
+        ignore_backoff=False,
+        backoff_on_404=False
+    ):
         """
-        Creates and sends a request to the given server.
+        Sends a request to the given server.
 
         Args:
-            destination (str): The remote server to send the HTTP request to.
-            method (str): HTTP method
-            path (str): The HTTP path
-            json (dict or None): JSON to send in the body.
-            json_callback (func or None): A callback to generate the JSON.
-            query (dict or None): Query arguments.
+            request (MatrixFederationRequest): details of request to be sent
+
+            timeout (int|None): number of milliseconds to wait for the response headers
+                (including connecting to the server). 60s by default.
+
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
+
             backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
@@ -154,38 +249,32 @@ class MatrixFederationHttpClient(object):
 
         if (
             self.hs.config.federation_domain_whitelist is not None and
-            destination not in self.hs.config.federation_domain_whitelist
+            request.destination not in self.hs.config.federation_domain_whitelist
         ):
-            raise FederationDeniedError(destination)
+            raise FederationDeniedError(request.destination)
 
         limiter = yield synapse.util.retryutils.get_retry_limiter(
-            destination,
+            request.destination,
             self.clock,
             self._store,
             backoff_on_404=backoff_on_404,
             ignore_backoff=ignore_backoff,
         )
 
-        headers_dict = {}
-        path_bytes = path.encode("ascii")
-        if query:
-            query_bytes = encode_query_args(query)
+        method = request.method
+        destination = request.destination
+        path_bytes = request.path.encode("ascii")
+        if request.query:
+            query_bytes = encode_query_args(request.query)
         else:
             query_bytes = b""
 
         headers_dict = {
             "User-Agent": [self.version_string],
-            "Host": [destination],
+            "Host": [request.destination],
         }
 
         with limiter:
-            url = self._create_url(
-                destination.encode("ascii"), path_bytes, param_bytes, query_bytes
-            ).decode('ascii')
-
-            txn_id = "%s-O-%s" % (method, self._next_id)
-            self._next_id = (self._next_id + 1) % (MAXINT - 1)
-
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
             if long_retries:
@@ -193,16 +282,19 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            http_url = urllib.parse.urlunparse(
-                (b"", b"", path_bytes, param_bytes, query_bytes, b"")
-            ).decode('ascii')
+            url = urllib.parse.urlunparse((
+                b"matrix", destination.encode("ascii"),
+                path_bytes, None, query_bytes, b"",
+            )).decode('ascii')
+
+            http_url = urllib.parse.urlunparse((
+                b"", b"",
+                path_bytes, None, query_bytes, b"",
+            )).decode('ascii')
 
-            log_result = None
             while True:
                 try:
-                    if json_callback:
-                        json = json_callback()
-
+                    json = request.get_json()
                     if json:
                         data = encode_canonical_json(json)
                         headers_dict["Content-Type"] = ["application/json"]
@@ -213,29 +305,32 @@ class MatrixFederationHttpClient(object):
                         data = None
                         self.sign_request(destination, method, http_url, headers_dict)
 
-                    outbound_logger.info(
+                    logger.info(
                         "{%s} [%s] Sending request: %s %s",
-                        txn_id, destination, method, url
+                        request.txn_id, destination, method, url
                     )
 
+                    if data:
+                        producer = FileBodyProducer(
+                            BytesIO(data),
+                            cooperator=self._cooperator
+                        )
+                    else:
+                        producer = None
+
                     request_deferred = treq.request(
                         method,
                         url,
                         headers=Headers(headers_dict),
-                        data=data,
+                        data=producer,
                         agent=self.agent,
                         reactor=self.hs.get_reactor(),
                         unbuffered=True
                     )
-                    request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
 
-                    # Sometimes the timeout above doesn't work, so lets hack yet
-                    # another layer of timeouts in in the vain hope that at some
-                    # point the world made sense and this really really really
-                    # should work.
-                    request_deferred = timeout_no_seriously(
+                    request_deferred = timeout_deferred(
                         request_deferred,
-                        timeout=_sec_timeout * 2,
+                        timeout=_sec_timeout,
                         reactor=self.hs.get_reactor(),
                     )
 
@@ -244,33 +339,19 @@ class MatrixFederationHttpClient(object):
                             request_deferred,
                         )
 
-                    log_result = "%d %s" % (
-                        response.code,
-                        response.phrase.decode('ascii', errors='replace'),
-                    )
                     break
                 except Exception as e:
-                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                        logger.warn(
-                            "DNS Lookup failed to %s with %s",
-                            destination,
-                            e
-                        )
-                        log_result = "DNS Lookup failed to %s with %s" % (
-                            destination, e
-                        )
-                        raise
-
                     logger.warn(
-                        "{%s} Sending request failed to %s: %s %s: %s",
-                        txn_id,
+                        "{%s} [%s] Request failed: %s %s: %s",
+                        request.txn_id,
                         destination,
                         method,
                         url,
                         _flatten_response_never_received(e),
                     )
 
-                    log_result = _flatten_response_never_received(e)
+                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                        raise
 
                     if retries_left and not timeout:
                         if long_retries:
@@ -283,33 +364,37 @@ class MatrixFederationHttpClient(object):
                             delay *= random.uniform(0.8, 1.4)
 
                         logger.debug(
-                            "{%s} Waiting %s before sending to %s...",
-                            txn_id,
+                            "{%s} [%s] Waiting %ss before re-sending...",
+                            request.txn_id,
+                            destination,
                             delay,
-                            destination
                         )
 
                         yield self.clock.sleep(delay)
                         retries_left -= 1
                     else:
                         raise
-                finally:
-                    outbound_logger.info(
-                        "{%s} [%s] Result: %s",
-                        txn_id,
-                        destination,
-                        log_result,
-                    )
+
+            logger.info(
+                "{%s} [%s] Got response headers: %d %s",
+                request.txn_id,
+                destination,
+                response.code,
+                response.phrase.decode('ascii', errors='replace'),
+            )
 
             if 200 <= response.code < 300:
                 pass
             else:
                 # :'(
                 # Update transactions table?
-                with logcontext.PreserveLoggingContext():
-                    d = treq.content(response)
-                    d.addTimeout(_sec_timeout, self.hs.get_reactor())
-                    body = yield make_deferred_yieldable(d)
+                d = treq.content(response)
+                d = timeout_deferred(
+                    d,
+                    timeout=_sec_timeout,
+                    reactor=self.hs.get_reactor(),
+                )
+                body = yield make_deferred_yieldable(d)
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -403,29 +488,26 @@ class MatrixFederationHttpClient(object):
             is not on our federation whitelist
         """
 
-        if not json_data_callback:
-            json_data_callback = lambda: data
-
-        response = yield self._request(
-            destination,
-            "PUT",
-            path,
-            json_callback=json_data_callback,
+        request = MatrixFederationRequest(
+            method="PUT",
+            destination=destination,
+            path=path,
             query=args,
+            json_callback=json_data_callback,
+            json=data,
+        )
+
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
             backoff_on_404=backoff_on_404,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            d = treq.json_content(response)
-            d.addTimeout(self.default_timeout, self.hs.get_reactor())
-            body = yield make_deferred_yieldable(d)
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -459,31 +541,30 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-        response = yield self._request(
-            destination,
-            "POST",
-            path,
+
+        request = MatrixFederationRequest(
+            method="POST",
+            destination=destination,
+            path=path,
             query=args,
             json=data,
+        )
+
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            d = treq.json_content(response)
-            if timeout:
-                _sec_timeout = timeout / 1000
-            else:
-                _sec_timeout = self.default_timeout
-
-            d.addTimeout(_sec_timeout, self.hs.get_reactor())
-            body = yield make_deferred_yieldable(d)
+        if timeout:
+            _sec_timeout = timeout / 1000
+        else:
+            _sec_timeout = self.default_timeout
 
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), _sec_timeout, request, response,
+        )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -519,25 +600,23 @@ class MatrixFederationHttpClient(object):
 
         logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
 
-        response = yield self._request(
-            destination,
-            "GET",
-            path,
+        request = MatrixFederationRequest(
+            method="GET",
+            destination=destination,
+            path=path,
             query=args,
+        )
+
+        response = yield self._send_request(
+            request,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            d = treq.json_content(response)
-            d.addTimeout(self.default_timeout, self.hs.get_reactor())
-            body = yield make_deferred_yieldable(d)
-
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -568,25 +647,23 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-        response = yield self._request(
-            destination,
-            "DELETE",
-            path,
+        request = MatrixFederationRequest(
+            method="DELETE",
+            destination=destination,
+            path=path,
             query=args,
+        )
+
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            d = treq.json_content(response)
-            d.addTimeout(self.default_timeout, self.hs.get_reactor())
-            body = yield make_deferred_yieldable(d)
-
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -614,11 +691,15 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-        response = yield self._request(
-            destination,
-            "GET",
-            path,
+        request = MatrixFederationRequest(
+            method="GET",
+            destination=destination,
+            path=path,
             query=args,
+        )
+
+        response = yield self._send_request(
+            request,
             retry_on_dns_fail=retry_on_dns_fail,
             ignore_backoff=ignore_backoff,
         )
@@ -626,14 +707,25 @@ class MatrixFederationHttpClient(object):
         headers = dict(response.headers.getAllRawHeaders())
 
         try:
-            with logcontext.PreserveLoggingContext():
-                d = _readBodyToFile(response, output_stream, max_size)
-                d.addTimeout(self.default_timeout, self.hs.get_reactor())
-                length = yield make_deferred_yieldable(d)
-        except Exception:
-            logger.exception("Failed to download body")
+            d = _readBodyToFile(response, output_stream, max_size)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            length = yield make_deferred_yieldable(d)
+        except Exception as e:
+            logger.warn(
+                "{%s} [%s] Error reading response: %s",
+                request.txn_id,
+                request.destination,
+                e,
+            )
             raise
-
+        logger.info(
+            "{%s} [%s] Completed: %d %s [%d bytes]",
+            request.txn_id,
+            request.destination,
+            response.code,
+            response.phrase.decode('ascii', errors='replace'),
+            length,
+        )
         defer.returnValue((length, headers))
 
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 9579e8cd0d..50be2de3bb 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -75,9 +75,9 @@ class SynapseRequest(Request):
         return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
             self.__class__.__name__,
             id(self),
-            self.method,
+            self.method.decode('ascii', errors='replace'),
             self.get_redacted_uri(),
-            self.clientproto,
+            self.clientproto.decode('ascii', errors='replace'),
             self.site.site_tag,
         )
 
@@ -308,7 +308,7 @@ class XForwardedForRequest(SynapseRequest):
             C{b"-"}.
         """
         return self.requestHeaders.getRawHeaders(
-            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
 
 
 class SynapseRequestFactory(object):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 82f391481c..f1d92c1395 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -25,11 +25,7 @@ from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 from synapse.metrics import LaterGauge
 from synapse.types import StreamToken
-from synapse.util.async_helpers import (
-    DeferredTimeoutError,
-    ObservableDeferred,
-    add_timeout_to_deferred,
-)
+from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.logcontext import PreserveLoggingContext, run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -337,7 +333,7 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
-                    add_timeout_to_deferred(
+                    listener.deferred = timeout_deferred(
                         listener.deferred,
                         (end_time - now) / 1000.,
                         self.hs.get_reactor(),
@@ -354,7 +350,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except DeferredTimeoutError:
+                except defer.TimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -559,15 +555,16 @@ class Notifier(object):
             if end_time <= now:
                 break
 
-            add_timeout_to_deferred(
-                listener.deferred.addTimeout,
-                (end_time - now) / 1000.,
-                self.hs.get_reactor(),
+            listener.deferred = timeout_deferred(
+                listener.deferred,
+                timeout=(end_time - now) / 1000.,
+                reactor=self.hs.get_reactor(),
             )
+
             try:
                 with PreserveLoggingContext():
                     yield listener.deferred
-            except DeferredTimeoutError:
+            except defer.TimeoutError:
                 break
             except defer.CancelledError:
                 break
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 8fc678fa67..9ad17b7c25 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -119,21 +119,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         for entry in iteritems(to_update):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
-            self._simple_upsert_txn(
-                txn,
-                table="user_ips",
-                keyvalues={
-                    "user_id": user_id,
-                    "access_token": access_token,
-                    "ip": ip,
-                    "user_agent": user_agent,
-                    "device_id": device_id,
-                },
-                values={
-                    "last_seen": last_seen,
-                },
-                lock=False,
-            )
+            try:
+                self._simple_upsert_txn(
+                    txn,
+                    table="user_ips",
+                    keyvalues={
+                        "user_id": user_id,
+                        "access_token": access_token,
+                        "ip": ip,
+                        "user_agent": user_agent,
+                        "device_id": device_id,
+                    },
+                    values={
+                        "last_seen": last_seen,
+                    },
+                    lock=False,
+                )
+            except Exception as e:
+                # Failed to upsert, log and continue
+                logger.error("Failed to insert client IP %r: %r", entry, e)
 
     @defer.inlineCallbacks
     def get_last_client_ip_by_device(self, user_id, device_id):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 0c42bd3322..baf0379a68 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
@@ -156,7 +155,6 @@ class TransactionStore(SQLBaseStore):
         """
         pass
 
-    @cached(max_entries=10000)
     def get_destination_retry_timings(self, destination):
         """Gets the current retry timings (if any) for a given destination.
 
@@ -198,8 +196,6 @@ class TransactionStore(SQLBaseStore):
             retry_interval (int) - how long until next retry in ms
         """
 
-        # XXX: we could chose to not bother persisting this if our cache thinks
-        # this is a NOOP
         return self.runInteraction(
             "set_destination_retry_timings",
             self._set_destination_retry_timings,
@@ -212,10 +208,6 @@ class TransactionStore(SQLBaseStore):
                                        retry_last_ts, retry_interval):
         self.database_engine.lock_table(txn, "destinations")
 
-        self._invalidate_cache_and_stream(
-            txn, self.get_destination_retry_timings, (destination,)
-        )
-
         # We need to be careful here as the data may have changed from under us
         # due to a worker setting the timings.
 
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 40c2946129..ec7b2c9672 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -374,29 +374,25 @@ class ReadWriteLock(object):
         defer.returnValue(_ctx_manager())
 
 
-class DeferredTimeoutError(Exception):
-    """
-    This error is raised by default when a L{Deferred} times out.
-    """
-
+def _cancelled_to_timed_out_error(value, timeout):
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise defer.TimeoutError(timeout, "Deferred")
+    return value
 
-def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
-    """
-    Add a timeout to a deferred by scheduling it to be cancelled after
-    timeout seconds.
 
-    This is essentially a backport of deferred.addTimeout, which was introduced
-    in twisted 16.5.
+def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
+    """The in built twisted `Deferred.addTimeout` fails to time out deferreds
+    that have a canceller that throws exceptions. This method creates a new
+    deferred that wraps and times out the given deferred, correctly handling
+    the case where the given deferred's canceller throws.
 
-    If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
-    unless a cancelable function was passed to its initialization or unless
-    a different on_timeout_cancel callable is provided.
+    NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
 
     Args:
-        deferred (defer.Deferred): deferred to be timed out
-        timeout (Number): seconds to time out after
-        reactor (twisted.internet.reactor): the Twisted reactor to use
-
+        deferred (Deferred)
+        timeout (float): Timeout in seconds
+        reactor (twisted.internet.reactor): The twisted reactor to use
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is
             otherwise cancelled before the timeout.
@@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
             the timeout.
 
             The default callable (if none is provided) will translate a
-            CancelledError Failure into a DeferredTimeoutError.
-    """
-    timed_out = [False]
-
-    def time_it_out():
-        timed_out[0] = True
-        deferred.cancel()
-
-    delayed_call = reactor.callLater(timeout, time_it_out)
-
-    def convert_cancelled(value):
-        if timed_out[0]:
-            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
-            return to_call(value, timeout)
-        return value
-
-    deferred.addBoth(convert_cancelled)
+            CancelledError Failure into a defer.TimeoutError.
 
-    def cancel_timeout(result):
-        # stop the pending call to cancel the deferred if it's been fired
-        if delayed_call.active():
-            delayed_call.cancel()
-        return result
-
-    deferred.addBoth(cancel_timeout)
-
-
-def _cancelled_to_timed_out_error(value, timeout):
-    if isinstance(value, failure.Failure):
-        value.trap(CancelledError)
-        raise DeferredTimeoutError(timeout, "Deferred")
-    return value
-
-
-def timeout_no_seriously(deferred, timeout, reactor):
-    """The in build twisted deferred addTimeout (and the method above)
-    completely fail to time things out under some unknown circumstances.
-
-    Lets try a different way of timing things out and maybe that will make
-    things work?!
-
-    TODO: Kill this with fire.
+    Returns:
+        Deferred
     """
 
     new_d = defer.Deferred()
@@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
     def time_it_out():
         timed_out[0] = True
 
-        if not new_d.called:
-            new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
+        try:
+            deferred.cancel()
+        except:   # noqa: E722, if we throw any exception it'll break time outs
+            logger.exception("Canceller failed during timeout")
 
-        deferred.cancel()
+        if not new_d.called:
+            new_d.errback(defer.TimeoutError(timeout, "Deferred"))
 
     delayed_call = reactor.callLater(timeout, time_it_out)
 
     def convert_cancelled(value):
         if timed_out[0]:
-            return _cancelled_to_timed_out_error(value, timeout)
+            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+            return to_call(value, timeout)
         return value
 
     deferred.addBoth(convert_cancelled)
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8a3a06fd74..26cce7d197 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
             else:
                 self.retry_interval = self.min_retry_interval
 
-            logger.debug(
+            logger.info(
                 "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                 self.destination, exc_type, exc_val, self.retry_interval
             )