summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py10
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/api/errors.py3
-rw-r--r--synapse/app/frontend_proxy.py2
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/appservice/__init__.py4
-rw-r--r--synapse/appservice/api.py11
-rw-r--r--synapse/config/_base.py16
-rw-r--r--synapse/config/appservice.py8
-rw-r--r--synapse/crypto/keyring.py8
-rw-r--r--synapse/federation/federation_base.py21
-rw-r--r--synapse/federation/federation_client.py2
-rw-r--r--synapse/federation/federation_server.py57
-rw-r--r--synapse/federation/send_queue.py14
-rw-r--r--synapse/federation/transaction_queue.py73
-rw-r--r--synapse/federation/transport/server.py12
-rw-r--r--synapse/handlers/appservice.py37
-rw-r--r--synapse/handlers/federation.py109
-rw-r--r--synapse/handlers/message.py188
-rw-r--r--synapse/handlers/register.py36
-rw-r--r--synapse/handlers/room_list.py43
-rw-r--r--synapse/handlers/room_member.py21
-rw-r--r--synapse/handlers/sync.py25
-rw-r--r--synapse/http/endpoint.py103
-rw-r--r--synapse/http/server.py4
-rw-r--r--synapse/http/site.py6
-rw-r--r--synapse/metrics/__init__.py43
-rw-r--r--synapse/metrics/metric.py32
-rw-r--r--synapse/notifier.py1
-rw-r--r--synapse/python_dependencies.py20
-rw-r--r--synapse/replication/http/send_event.py20
-rw-r--r--synapse/rest/client/v1/logout.py5
-rw-r--r--synapse/rest/client/v1/register.py4
-rw-r--r--synapse/rest/client/v1/room.py7
-rw-r--r--synapse/rest/client/v2_alpha/register.py32
-rw-r--r--synapse/rest/media/v1/media_storage.py4
-rw-r--r--synapse/storage/__init__.py4
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/engines/__init__.py5
-rw-r--r--synapse/storage/event_federation.py63
-rw-r--r--synapse/storage/events.py52
-rw-r--r--synapse/storage/events_worker.py20
-rw-r--r--synapse/storage/room.py6
-rw-r--r--synapse/storage/search.py2
-rw-r--r--synapse/types.py2
-rw-r--r--synapse/util/caches/response_cache.py106
-rw-r--r--synapse/util/file_consumer.py4
-rw-r--r--synapse/util/logcontext.py1
49 files changed, 824 insertions, 433 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 0c1c16b9a4..f31cb9a3cb 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.27.4"
+__version__ = "0.28.1"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index ac0a3655a5..f17fda6315 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -204,8 +204,8 @@ class Auth(object):
 
             ip_addr = self.hs.get_ip_from_request(request)
             user_agent = request.requestHeaders.getRawHeaders(
-                "User-Agent",
-                default=[""]
+                b"User-Agent",
+                default=[b""]
             )[0]
             if user and access_token and ip_addr:
                 self.store.insert_client_ip(
@@ -672,7 +672,7 @@ def has_access_token(request):
         bool: False if no access_token was given, True otherwise.
     """
     query_params = request.args.get("access_token")
-    auth_headers = request.requestHeaders.getRawHeaders("Authorization")
+    auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
     return bool(query_params) or bool(auth_headers)
 
 
@@ -692,8 +692,8 @@ def get_access_token_from_request(request, token_not_found_http_status=401):
         AuthError: If there isn't an access_token in the request.
     """
 
-    auth_headers = request.requestHeaders.getRawHeaders("Authorization")
-    query_params = request.args.get("access_token")
+    auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
+    query_params = request.args.get(b"access_token")
     if auth_headers:
         # Try the get the access_token from a "Authorization: Bearer"
         # header
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 489efb7f86..5baba43966 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -16,6 +16,9 @@
 
 """Contains constants from the specification."""
 
+# the "depth" field on events is limited to 2**63 - 1
+MAX_DEPTH = 2**63 - 1
+
 
 class Membership(object):
 
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index bee59e80dd..a9ff5576f3 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -18,6 +18,7 @@
 import logging
 
 import simplejson as json
+from six import iteritems
 
 logger = logging.getLogger(__name__)
 
@@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
         A dict representing the error response JSON.
     """
     err = {"error": msg, "errcode": code}
-    for key, value in kwargs.iteritems():
+    for key, value in iteritems(kwargs):
         err[key] = value
     return err
 
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index de889357c3..b349e3e3ce 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -90,7 +90,7 @@ class KeyUploadServlet(RestServlet):
             # They're actually trying to upload something, proxy to main synapse.
             # Pass through the auth headers, if any, in case the access token
             # is there.
-            auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+            auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
             headers = {
                 "Authorization": auth_headers,
             }
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 508b66613d..2fddcd935a 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -58,6 +58,8 @@ from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
 from twisted.web.resource import NoResource
 
+from six import iteritems
+
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
@@ -211,7 +213,7 @@ class SynchrotronPresence(object):
 
     def get_currently_syncing_users(self):
         return [
-            user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+            user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
             if count > 0
         ]
 
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index d5a7a5ce2f..5fdb579723 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -21,6 +21,8 @@ from twisted.internet import defer
 import logging
 import re
 
+from six import string_types
+
 logger = logging.getLogger(__name__)
 
 
@@ -146,7 +148,7 @@ class ApplicationService(object):
                         )
 
                 regex = regex_obj.get("regex")
-                if isinstance(regex, basestring):
+                if isinstance(regex, string_types):
                     regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
                 else:
                     raise ValueError(
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 40c433d7ae..00efff1464 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
 from synapse.http.client import SimpleHttpClient
 from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -73,7 +72,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         super(ApplicationServiceApi, self).__init__(hs)
         self.clock = hs.get_clock()
 
-        self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
+        self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
+                                                 timeout_ms=HOUR_IN_MS)
 
     @defer.inlineCallbacks
     def query_user(self, service, user_id):
@@ -193,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 defer.returnValue(None)
 
         key = (service.id, protocol)
-        result = self.protocol_meta_cache.get(key)
-        if not result:
-            result = self.protocol_meta_cache.set(
-                key, preserve_fn(_get)()
-            )
-        return make_deferred_yieldable(result)
+        return self.protocol_meta_cache.wrap(key, _get)
 
     @defer.inlineCallbacks
     def push_bulk(self, service, events, txn_id=None):
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index fa105bce72..32b439d20a 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -19,6 +19,8 @@ import os
 import yaml
 from textwrap import dedent
 
+from six import integer_types
+
 
 class ConfigError(Exception):
     pass
@@ -49,7 +51,7 @@ Missing mandatory `server_name` config option.
 class Config(object):
     @staticmethod
     def parse_size(value):
-        if isinstance(value, int) or isinstance(value, long):
+        if isinstance(value, integer_types):
             return value
         sizes = {"K": 1024, "M": 1024 * 1024}
         size = 1
@@ -61,7 +63,7 @@ class Config(object):
 
     @staticmethod
     def parse_duration(value):
-        if isinstance(value, int) or isinstance(value, long):
+        if isinstance(value, integer_types):
             return value
         second = 1000
         minute = 60 * second
@@ -288,22 +290,22 @@ class Config(object):
                     )
                     obj.invoke_all("generate_files", config)
                     config_file.write(config_bytes)
-                print (
+                print((
                     "A config file has been generated in %r for server name"
                     " %r with corresponding SSL keys and self-signed"
                     " certificates. Please review this file and customise it"
                     " to your needs."
-                ) % (config_path, server_name)
-                print (
+                ) % (config_path, server_name))
+                print(
                     "If this server name is incorrect, you will need to"
                     " regenerate the SSL certificates"
                 )
                 return
             else:
-                print (
+                print((
                     "Config file %r already exists. Generating any missing key"
                     " files."
-                ) % (config_path,)
+                ) % (config_path,))
                 generate_keys = True
 
         parser = argparse.ArgumentParser(
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index aba0aec6e8..9a2359b6fd 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -21,6 +21,8 @@ import urllib
 import yaml
 import logging
 
+from six import string_types
+
 logger = logging.getLogger(__name__)
 
 
@@ -89,14 +91,14 @@ def _load_appservice(hostname, as_info, config_filename):
         "id", "as_token", "hs_token", "sender_localpart"
     ]
     for field in required_string_fields:
-        if not isinstance(as_info.get(field), basestring):
+        if not isinstance(as_info.get(field), string_types):
             raise KeyError("Required string field: '%s' (%s)" % (
                 field, config_filename,
             ))
 
     # 'url' must either be a string or explicitly null, not missing
     # to avoid accidentally turning off push for ASes.
-    if (not isinstance(as_info.get("url"), basestring) and
+    if (not isinstance(as_info.get("url"), string_types) and
             as_info.get("url", "") is not None):
         raise KeyError(
             "Required string field or explicit null: 'url' (%s)" % (config_filename,)
@@ -128,7 +130,7 @@ def _load_appservice(hostname, as_info, config_filename):
                         "Expected namespace entry in %s to be an object,"
                         " but got %s", ns, regex_obj
                     )
-                if not isinstance(regex_obj.get("regex"), basestring):
+                if not isinstance(regex_obj.get("regex"), string_types):
                     raise ValueError(
                         "Missing/bad type 'regex' key in %s", regex_obj
                     )
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 35f810b07b..fce83d445f 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -352,7 +352,7 @@ class Keyring(object):
                 logger.exception(
                     "Unable to get key from %r: %s %s",
                     perspective_name,
-                    type(e).__name__, str(e.message),
+                    type(e).__name__, str(e),
                 )
                 defer.returnValue({})
 
@@ -384,7 +384,7 @@ class Keyring(object):
                 logger.info(
                     "Unable to get key %r for %r directly: %s %s",
                     key_ids, server_name,
-                    type(e).__name__, str(e.message),
+                    type(e).__name__, str(e),
                 )
 
             if not keys:
@@ -734,7 +734,7 @@ def _handle_key_deferred(verify_request):
     except IOError as e:
         logger.warn(
             "Got IOError when downloading keys for %s: %s %s",
-            server_name, type(e).__name__, str(e.message),
+            server_name, type(e).__name__, str(e),
         )
         raise SynapseError(
             502,
@@ -744,7 +744,7 @@ def _handle_key_deferred(verify_request):
     except Exception as e:
         logger.exception(
             "Got Exception when downloading keys for %s: %s %s",
-            server_name, type(e).__name__, str(e.message),
+            server_name, type(e).__name__, str(e),
         )
         raise SynapseError(
             401,
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 79eaa31031..4cc98a3fe8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -14,7 +14,10 @@
 # limitations under the License.
 import logging
 
-from synapse.api.errors import SynapseError
+import six
+
+from synapse.api.constants import MAX_DEPTH
+from synapse.api.errors import SynapseError, Codes
 from synapse.crypto.event_signing import check_event_content_hash
 from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
@@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False):
         FrozenEvent
 
     Raises:
-        SynapseError: if the pdu is missing required fields
+        SynapseError: if the pdu is missing required fields or is otherwise
+            not a valid matrix event
     """
     # we could probably enforce a bunch of other fields here (room_id, sender,
     # origin, etc etc)
-    assert_params_in_request(pdu_json, ('event_id', 'type'))
+    assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
+
+    depth = pdu_json['depth']
+    if not isinstance(depth, six.integer_types):
+        raise SynapseError(400, "Depth %r not an intger" % (depth, ),
+                           Codes.BAD_JSON)
+
+    if depth < 0:
+        raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
+    elif depth > MAX_DEPTH:
+        raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
+
     event = FrozenEvent(
         pdu_json
     )
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 38440da5b5..8e2c0c4cd2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -394,7 +394,7 @@ class FederationClient(FederationBase):
             seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
             signed_events = seen_events.values()
         else:
-            seen_events = yield self.store.have_events(event_ids)
+            seen_events = yield self.store.have_seen_events(event_ids)
             signed_events = []
 
         failed_to_fetch = set()
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bea7fd0b71..247ddc89d5 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -30,9 +31,10 @@ import synapse.metrics
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.logutils import log_function
 
+from six import iteritems
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -65,7 +67,7 @@ class FederationServer(FederationBase):
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
-        self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
+        self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
 
     @defer.inlineCallbacks
     @log_function
@@ -212,16 +214,17 @@ class FederationServer(FederationBase):
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
-        result = self._state_resp_cache.get((room_id, event_id))
-        if not result:
-            with (yield self._server_linearizer.queue((origin, room_id))):
-                d = self._state_resp_cache.set(
-                    (room_id, event_id),
-                    preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
-                )
-                resp = yield make_deferred_yieldable(d)
-        else:
-            resp = yield make_deferred_yieldable(result)
+        # we grab the linearizer to protect ourselves from servers which hammer
+        # us. In theory we might already have the response to this query
+        # in the cache so we could return it without waiting for the linearizer
+        # - but that's non-trivial to get right, and anyway somewhat defeats
+        # the point of the linearizer.
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            resp = yield self._state_resp_cache.wrap(
+                (room_id, event_id),
+                self._on_context_state_request_compute,
+                room_id, event_id,
+            )
 
         defer.returnValue((200, resp))
 
@@ -425,9 +428,9 @@ class FederationServer(FederationBase):
             "Claimed one-time-keys: %s",
             ",".join((
                 "%s for %s:%s" % (key_id, user_id, device_id)
-                for user_id, user_keys in json_result.iteritems()
-                for device_id, device_keys in user_keys.iteritems()
-                for key_id, _ in device_keys.iteritems()
+                for user_id, user_keys in iteritems(json_result)
+                for device_id, device_keys in iteritems(user_keys)
+                for key_id, _ in iteritems(device_keys)
             )),
         )
 
@@ -494,13 +497,33 @@ class FederationServer(FederationBase):
     def _handle_received_pdu(self, origin, pdu):
         """ Process a PDU received in a federation /send/ transaction.
 
+        If the event is invalid, then this method throws a FederationError.
+        (The error will then be logged and sent back to the sender (which
+        probably won't do anything with it), and other events in the
+        transaction will be processed as normal).
+
+        It is likely that we'll then receive other events which refer to
+        this rejected_event in their prev_events, etc.  When that happens,
+        we'll attempt to fetch the rejected event again, which will presumably
+        fail, so those second-generation events will also get rejected.
+
+        Eventually, we get to the point where there are more than 10 events
+        between any new events and the original rejected event. Since we
+        only try to backfill 10 events deep on received pdu, we then accept the
+        new event, possibly introducing a discontinuity in the DAG, with new
+        forward extremities, so normal service is approximately returned,
+        until we try to backfill across the discontinuity.
+
         Args:
             origin (str): server which sent the pdu
             pdu (FrozenEvent): received pdu
 
         Returns (Deferred): completes with None
-        Raises: FederationError if the signatures / hash do not match
-    """
+
+        Raises: FederationError if the signatures / hash do not match, or
+            if the event was unacceptable for any other reason (eg, too large,
+            too many prev_events, couldn't find the prev_events)
+        """
         # check that it's actually being sent from a valid destination to
         # workaround bug #1753 in 0.18.5 and 0.18.6
         if origin != get_domain_from_id(pdu.event_id):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5acebc1..0f0c687b37 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -40,6 +40,8 @@ from collections import namedtuple
 
 import logging
 
+from six import itervalues, iteritems
+
 logger = logging.getLogger(__name__)
 
 
@@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object):
 
             user_ids = set(
                 user_id
-                for uids in self.presence_changed.itervalues()
+                for uids in itervalues(self.presence_changed)
                 for user_id in uids
             )
 
@@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object):
         # stream position.
         keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
 
-        for ((destination, edu_key), pos) in keyed_edus.iteritems():
+        for ((destination, edu_key), pos) in iteritems(keyed_edus):
             rows.append((pos, KeyedEduRow(
                 key=edu_key,
                 edu=self.keyed_edu[(destination, edu_key)],
@@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object):
         j = keys.bisect_right(to_token) + 1
         device_messages = {self.device_messages[k]: k for k in keys[i:j]}
 
-        for (destination, pos) in device_messages.iteritems():
+        for (destination, pos) in iteritems(device_messages):
             rows.append((pos, DeviceRow(
                 destination=destination,
             )))
@@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows):
     if buff.presence:
         transaction_queue.send_presence(buff.presence)
 
-    for destination, edu_map in buff.keyed_edus.iteritems():
+    for destination, edu_map in iteritems(buff.keyed_edus):
         for key, edu in edu_map.items():
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=key,
             )
 
-    for destination, edu_list in buff.edus.iteritems():
+    for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=None,
             )
 
-    for destination, failure_list in buff.failures.iteritems():
+    for destination, failure_list in iteritems(buff.failures):
         for failure in failure_list:
             transaction_queue.send_failure(destination, failure)
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a141ec9953..963d938edd 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -169,7 +169,7 @@ class TransactionQueue(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=20,
+                    last_token, self._last_poked_id, limit=100,
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,24 +177,33 @@ class TransactionQueue(object):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                for event in events:
+                @defer.inlineCallbacks
+                def handle_event(event):
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.event_id)
                     if not is_mine and send_on_behalf_of is None:
-                        continue
-
-                    # Get the state from before the event.
-                    # We need to make sure that this is the state from before
-                    # the event and not from after it.
-                    # Otherwise if the last member on a server in a room is
-                    # banned then it won't receive the event because it won't
-                    # be in the room after the ban.
-                    destinations = yield self.state.get_current_hosts_in_room(
-                        event.room_id, latest_event_ids=[
-                            prev_id for prev_id, _ in event.prev_events
-                        ],
-                    )
+                        return
+
+                    try:
+                        # Get the state from before the event.
+                        # We need to make sure that this is the state from before
+                        # the event and not from after it.
+                        # Otherwise if the last member on a server in a room is
+                        # banned then it won't receive the event because it won't
+                        # be in the room after the ban.
+                        destinations = yield self.state.get_current_hosts_in_room(
+                            event.room_id, latest_event_ids=[
+                                prev_id for prev_id, _ in event.prev_events
+                            ],
+                        )
+                    except Exception:
+                        logger.exception(
+                            "Failed to calculate hosts in room for event: %s",
+                            event.event_id,
+                        )
+                        return
+
                     destinations = set(destinations)
 
                     if send_on_behalf_of is not None:
@@ -207,12 +216,44 @@ class TransactionQueue(object):
 
                     self._send_pdu(event, destinations)
 
-                events_processed_counter.inc_by(len(events))
+                @defer.inlineCallbacks
+                def handle_room_events(events):
+                    for event in events:
+                        yield handle_event(event)
+
+                events_by_room = {}
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                yield logcontext.make_deferred_yieldable(defer.gatherResults(
+                    [
+                        logcontext.run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ],
+                    consumeErrors=True
+                ))
 
                 yield self.store.update_federation_out_pos(
                     "events", next_token
                 )
 
+                if events:
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_lag.set(
+                        now - ts, "federation_sender",
+                    )
+                    synapse.metrics.event_processing_last_ts.set(
+                        ts, "federation_sender",
+                    )
+
+                events_processed_counter.inc_by(len(events))
+
+                synapse.metrics.event_processing_positions.set(
+                    next_token, "federation_sender",
+                )
+
         finally:
             self._is_processing = False
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 4c94d5a36c..ff0656df3e 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -94,12 +94,6 @@ class Authenticator(object):
             "signatures": {},
         }
 
-        if (
-            self.federation_domain_whitelist is not None and
-            self.server_name not in self.federation_domain_whitelist
-        ):
-            raise FederationDeniedError(self.server_name)
-
         if content is not None:
             json_request["content"] = content
 
@@ -138,6 +132,12 @@ class Authenticator(object):
                 json_request["origin"] = origin
                 json_request["signatures"].setdefault(origin, {})[key] = sig
 
+        if (
+            self.federation_domain_whitelist is not None and
+            origin not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(origin)
+
         if not json_request["signatures"]:
             raise NoAuthenticationError(
                 401, "Missing Authorization headers", Codes.UNAUTHORIZED,
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a27..0245197c02 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
 import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+    make_deferred_yieldable, preserve_fn, run_in_background,
+)
 
 import logging
 
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
                     if not events:
                         break
 
+                    events_by_room = {}
                     for event in events:
+                        events_by_room.setdefault(event.room_id, []).append(event)
+
+                    @defer.inlineCallbacks
+                    def handle_event(event):
                         # Gather interested services
                         services = yield self._get_services_for_event(event)
                         if len(services) == 0:
-                            continue  # no services need notifying
+                            return  # no services need notifying
 
                         # Do we know this user exists? If not, poke the user
                         # query API for all services which match that user regex.
@@ -108,9 +115,33 @@ class ApplicationServicesHandler(object):
                                 service, event
                             )
 
-                    events_processed_counter.inc_by(len(events))
+                    @defer.inlineCallbacks
+                    def handle_room_events(events):
+                        for event in events:
+                            yield handle_event(event)
+
+                    yield make_deferred_yieldable(defer.gatherResults([
+                        run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ], consumeErrors=True))
 
                     yield self.store.set_appservice_last_pos(upper_bound)
+
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_positions.set(
+                        upper_bound, "appservice_sender",
+                    )
+
+                    events_processed_counter.inc_by(len(events))
+
+                    synapse.metrics.event_processing_lag.set(
+                        now - ts, "appservice_sender",
+                    )
+                    synapse.metrics.event_processing_last_ts.set(
+                        ts, "appservice_sender",
+                    )
             finally:
                 self.is_processing = False
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d71..ae7e0d6da2 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -15,8 +15,14 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+
+import httplib
+import itertools
+import logging
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
+from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler
@@ -43,10 +49,6 @@ from synapse.util.retryutils import NotRetryingDestination
 
 from synapse.util.distributor import user_joined_room
 
-from twisted.internet import defer
-
-import itertools
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -115,6 +117,19 @@ class FederationHandler(BaseHandler):
             logger.debug("Already seen pdu %s", pdu.event_id)
             return
 
+        # do some initial sanity-checking of the event. In particular, make
+        # sure it doesn't have hundreds of prev_events or auth_events, which
+        # could cause a huge state resolution or cascade of event fetches.
+        try:
+            self._sanity_check_event(pdu)
+        except SynapseError as err:
+            raise FederationError(
+                "ERROR",
+                err.code,
+                err.msg,
+                affected=pdu.event_id,
+            )
+
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if pdu.room_id in self.room_queues:
@@ -149,10 +164,6 @@ class FederationHandler(BaseHandler):
 
         auth_chain = []
 
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-
         fetch_state = False
 
         # Get missing pdus if necessary.
@@ -168,7 +179,7 @@ class FederationHandler(BaseHandler):
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
+            seen = yield self.store.have_seen_events(prevs)
 
             if min_depth and pdu.depth < min_depth:
                 # This is so that we don't notify the user about this
@@ -196,8 +207,7 @@ class FederationHandler(BaseHandler):
 
                         # Update the set of things we've seen after trying to
                         # fetch the missing stuff
-                        have_seen = yield self.store.have_events(prevs)
-                        seen = set(have_seen.iterkeys())
+                        seen = yield self.store.have_seen_events(prevs)
 
                         if not prevs - seen:
                             logger.info(
@@ -248,8 +258,7 @@ class FederationHandler(BaseHandler):
             min_depth (int): Minimum depth of events to return.
         """
         # We recalculate seen, since it may have changed.
-        have_seen = yield self.store.have_events(prevs)
-        seen = set(have_seen.keys())
+        seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
@@ -361,9 +370,7 @@ class FederationHandler(BaseHandler):
             if auth_chain:
                 event_ids |= {e.event_id for e in auth_chain}
 
-            seen_ids = set(
-                (yield self.store.have_events(event_ids)).keys()
-            )
+            seen_ids = yield self.store.have_seen_events(event_ids)
 
             if state and auth_chain is not None:
                 # If we have any state or auth_chain given to us by the replication
@@ -527,9 +534,16 @@ class FederationHandler(BaseHandler):
     def backfill(self, dest, room_id, limit, extremities):
         """ Trigger a backfill request to `dest` for the given `room_id`
 
-        This will attempt to get more events from the remote. This may return
-        be successfull and still return no events if the other side has no new
-        events to offer.
+        This will attempt to get more events from the remote. If the other side
+        has no new events to offer, this will return an empty list.
+
+        As the events are received, we check their signatures, and also do some
+        sanity-checking on them. If any of the backfilled events are invalid,
+        this method throws a SynapseError.
+
+        TODO: make this more useful to distinguish failures of the remote
+        server from invalid events (there is probably no point in trying to
+        re-fetch invalid events from every other HS in the room.)
         """
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +555,16 @@ class FederationHandler(BaseHandler):
             extremities=extremities,
         )
 
+        # ideally we'd sanity check the events here for excess prev_events etc,
+        # but it's hard to reject events at this point without completely
+        # breaking backfill in the same way that it is currently broken by
+        # events whose signature we cannot verify (#3121).
+        #
+        # So for now we accept the events anyway. #3124 tracks this.
+        #
+        # for ev in events:
+        #     self._sanity_check_event(ev)
+
         # Don't bother processing events we already have.
         seen_events = yield self.store.have_events_in_timeline(
             set(e.event_id for e in events)
@@ -633,7 +657,7 @@ class FederationHandler(BaseHandler):
 
                 failed_to_fetch = missing_auth - set(auth_events)
 
-        seen_events = yield self.store.have_events(
+        seen_events = yield self.store.have_seen_events(
             set(auth_events.keys()) | set(state_events.keys())
         )
 
@@ -843,6 +867,38 @@ class FederationHandler(BaseHandler):
 
         defer.returnValue(False)
 
+    def _sanity_check_event(self, ev):
+        """
+        Do some early sanity checks of a received event
+
+        In particular, checks it doesn't have an excessive number of
+        prev_events or auth_events, which could cause a huge state resolution
+        or cascade of event fetches.
+
+        Args:
+            ev (synapse.events.EventBase): event to be checked
+
+        Returns: None
+
+        Raises:
+            SynapseError if the event does not pass muster
+        """
+        if len(ev.prev_events) > 20:
+            logger.warn("Rejecting event %s which has %i prev_events",
+                        ev.event_id, len(ev.prev_events))
+            raise SynapseError(
+                httplib.BAD_REQUEST,
+                "Too many prev_events",
+            )
+
+        if len(ev.auth_events) > 10:
+            logger.warn("Rejecting event %s which has %i auth_events",
+                        ev.event_id, len(ev.auth_events))
+            raise SynapseError(
+                httplib.BAD_REQUEST,
+                "Too many auth_events",
+            )
+
     @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -1736,7 +1792,8 @@ class FederationHandler(BaseHandler):
             event_key = None
 
         if event_auth_events - current_state:
-            have_events = yield self.store.have_events(
+            # TODO: can we use store.have_seen_events here instead?
+            have_events = yield self.store.get_seen_events_with_rejections(
                 event_auth_events - current_state
             )
         else:
@@ -1759,12 +1816,12 @@ class FederationHandler(BaseHandler):
                     origin, event.room_id, event.event_id
                 )
 
-                seen_remotes = yield self.store.have_events(
+                seen_remotes = yield self.store.have_seen_events(
                     [e.event_id for e in remote_auth_chain]
                 )
 
                 for e in remote_auth_chain:
-                    if e.event_id in seen_remotes.keys():
+                    if e.event_id in seen_remotes:
                         continue
 
                     if e.event_id == event.event_id:
@@ -1791,7 +1848,7 @@ class FederationHandler(BaseHandler):
                     except AuthError:
                         pass
 
-                have_events = yield self.store.have_events(
+                have_events = yield self.store.get_seen_events_with_rejections(
                     [e_id for e_id, _ in event.auth_events]
                 )
                 seen_events = set(have_events.keys())
@@ -1876,13 +1933,13 @@ class FederationHandler(BaseHandler):
                         local_auth_chain,
                     )
 
-                    seen_remotes = yield self.store.have_events(
+                    seen_remotes = yield self.store.have_seen_events(
                         [e.event_id for e in result["auth_chain"]]
                     )
 
                     # 3. Process any remote auth chain events we haven't seen.
                     for ev in result["auth_chain"]:
-                        if ev.event_id in seen_remotes.keys():
+                        if ev.event_id in seen_remotes:
                             continue
 
                         if ev.event_id == event.event_id:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6de6e13b7b..53beb2b9ab 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
 from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
@@ -37,7 +37,6 @@ from ._base import BaseHandler
 from canonicaljson import encode_canonical_json
 
 import logging
-import random
 import simplejson
 
 logger = logging.getLogger(__name__)
@@ -433,7 +432,7 @@ class EventCreationHandler(object):
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_event_ids=None):
+                     prev_events_and_hashes=None):
         """
         Given a dict from a client, create a new event.
 
@@ -447,47 +446,52 @@ class EventCreationHandler(object):
             event_dict (dict): An entire event
             token_id (str)
             txn_id (str)
-            prev_event_ids (list): The prev event ids to use when creating the event
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
 
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        with (yield self.limiter.queue(builder.room_id)):
-            self.validator.validate_new(builder)
-
-            if builder.type == EventTypes.Member:
-                membership = builder.content.get("membership", None)
-                target = UserID.from_string(builder.state_key)
-
-                if membership in {Membership.JOIN, Membership.INVITE}:
-                    # If event doesn't include a display name, add one.
-                    profile = self.profile_handler
-                    content = builder.content
-
-                    try:
-                        if "displayname" not in content:
-                            content["displayname"] = yield profile.get_displayname(target)
-                        if "avatar_url" not in content:
-                            content["avatar_url"] = yield profile.get_avatar_url(target)
-                    except Exception as e:
-                        logger.info(
-                            "Failed to get profile information for %r: %s",
-                            target, e
-                        )
+        self.validator.validate_new(builder)
+
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            target = UserID.from_string(builder.state_key)
+
+            if membership in {Membership.JOIN, Membership.INVITE}:
+                # If event doesn't include a display name, add one.
+                profile = self.profile_handler
+                content = builder.content
+
+                try:
+                    if "displayname" not in content:
+                        content["displayname"] = yield profile.get_displayname(target)
+                    if "avatar_url" not in content:
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                except Exception as e:
+                    logger.info(
+                        "Failed to get profile information for %r: %s",
+                        target, e
+                    )
 
-            if token_id is not None:
-                builder.internal_metadata.token_id = token_id
+        if token_id is not None:
+            builder.internal_metadata.token_id = token_id
 
-            if txn_id is not None:
-                builder.internal_metadata.txn_id = txn_id
+        if txn_id is not None:
+            builder.internal_metadata.txn_id = txn_id
 
-            event, context = yield self.create_new_client_event(
-                builder=builder,
-                requester=requester,
-                prev_event_ids=prev_event_ids,
-            )
+        event, context = yield self.create_new_client_event(
+            builder=builder,
+            requester=requester,
+            prev_events_and_hashes=prev_events_and_hashes,
+        )
 
         defer.returnValue((event, context))
 
@@ -557,64 +561,80 @@ class EventCreationHandler(object):
 
         See self.create_event and self.send_nonmember_event.
         """
-        event, context = yield self.create_event(
-            requester,
-            event_dict,
-            token_id=requester.access_token_id,
-            txn_id=txn_id
-        )
 
-        spam_error = self.spam_checker.check_event_for_spam(event)
-        if spam_error:
-            if not isinstance(spam_error, basestring):
-                spam_error = "Spam is not permitted here"
-            raise SynapseError(
-                403, spam_error, Codes.FORBIDDEN
+        # We limit the number of concurrent event sends in a room so that we
+        # don't fork the DAG too much. If we don't limit then we can end up in
+        # a situation where event persistence can't keep up, causing
+        # extremities to pile up, which in turn leads to state resolution
+        # taking longer.
+        with (yield self.limiter.queue(event_dict["room_id"])):
+            event, context = yield self.create_event(
+                requester,
+                event_dict,
+                token_id=requester.access_token_id,
+                txn_id=txn_id
             )
 
-        yield self.send_nonmember_event(
-            requester,
-            event,
-            context,
-            ratelimit=ratelimit,
-        )
+            spam_error = self.spam_checker.check_event_for_spam(event)
+            if spam_error:
+                if not isinstance(spam_error, basestring):
+                    spam_error = "Spam is not permitted here"
+                raise SynapseError(
+                    403, spam_error, Codes.FORBIDDEN
+                )
+
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+            )
         defer.returnValue(event)
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
-            )
+    def create_new_client_event(self, builder, requester=None,
+                                prev_events_and_hashes=None):
+        """Create a new event for a local client
 
-            # We want to limit the max number of prev events we point to in our
-            # new event
-            if len(latest_ret) > 10:
-                # Sort by reverse depth, so we point to the most recent.
-                latest_ret.sort(key=lambda a: -a[2])
-                new_latest_ret = latest_ret[:5]
-
-                # We also randomly point to some of the older events, to make
-                # sure that we don't completely ignore the older events.
-                if latest_ret[5:]:
-                    sample_size = min(5, len(latest_ret[5:]))
-                    new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
-                latest_ret = new_latest_ret
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
+        Args:
+            builder (EventBuilder):
+
+            requester (synapse.types.Requester|None):
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
+
+        Returns:
+            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+        """
+
+        if prev_events_and_hashes is not None:
+            assert len(prev_events_and_hashes) <= 10, \
+                "Attempting to create an event with %i prev_events" % (
+                    len(prev_events_and_hashes),
+            )
+        else:
+            prev_events_and_hashes = \
+                yield self.store.get_prev_events_for_room(builder.room_id)
+
+        if prev_events_and_hashes:
+            depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+            # we cap depth of generated events, to ensure that they are not
+            # rejected by other servers (and so that they can be persisted in
+            # the db)
+            depth = min(depth, MAX_DEPTH)
+        else:
+            depth = 1
 
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
+        prev_events = [
+            (event_id, prev_hashes)
+            for event_id, prev_hashes, _ in prev_events_and_hashes
+        ]
 
         builder.prev_events = prev_events
         builder.depth = depth
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index dd03705279..f83c6b3cf8 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,7 +23,7 @@ from synapse.api.errors import (
 )
 from synapse.http.client import CaptchaServerHttpClient
 from synapse import types
-from synapse.types import UserID
+from synapse.types import UserID, create_requester, RoomID, RoomAlias
 from synapse.util.async import run_on_reactor, Linearizer
 from synapse.util.threepids import check_3pid_allowed
 from ._base import BaseHandler
@@ -205,10 +205,17 @@ class RegistrationHandler(BaseHandler):
                     token = None
                     attempts += 1
 
+        # auto-join the user to any rooms we're supposed to dump them into
+        fake_requester = create_requester(user_id)
+        for r in self.hs.config.auto_join_rooms:
+            try:
+                yield self._join_user_to_room(fake_requester, r)
+            except Exception as e:
+                logger.error("Failed to join new user to %r: %r", r, e)
+
         # We used to generate default identicons here, but nowadays
         # we want clients to generate their own as part of their branding
         # rather than there being consistent matrix-wide ones, so we don't.
-
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
@@ -483,3 +490,28 @@ class RegistrationHandler(BaseHandler):
         )
 
         defer.returnValue((user_id, access_token))
+
+    @defer.inlineCallbacks
+    def _join_user_to_room(self, requester, room_identifier):
+        room_id = None
+        room_member_handler = self.hs.get_room_member_handler()
+        if RoomID.is_valid(room_identifier):
+            room_id = room_identifier
+        elif RoomAlias.is_valid(room_identifier):
+            room_alias = RoomAlias.from_string(room_identifier)
+            room_id, remote_room_hosts = (
+                yield room_member_handler.lookup_room_alias(room_alias)
+            )
+            room_id = room_id.to_string()
+        else:
+            raise SynapseError(400, "%s was not legal room ID or room alias" % (
+                room_identifier,
+            ))
+
+        yield room_member_handler.update_membership(
+            requester=requester,
+            target=requester.user,
+            room_id=room_id,
+            remote_room_hosts=remote_room_hosts,
+            action="join",
+        )
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 5d81f59b44..add3f9b009 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -20,7 +20,6 @@ from ._base import BaseHandler
 from synapse.api.constants import (
     EventTypes, JoinRules,
 )
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
@@ -44,8 +43,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs)
-        self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(hs, "room_list")
+        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
+                                                   timeout_ms=30 * 1000)
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
@@ -77,18 +77,11 @@ class RoomListHandler(BaseHandler):
             )
 
         key = (limit, since_token, network_tuple)
-        result = self.response_cache.get(key)
-        if not result:
-            logger.info("No cached result, calculating one.")
-            result = self.response_cache.set(
-                key,
-                preserve_fn(self._get_public_room_list)(
-                    limit, since_token, network_tuple=network_tuple
-                )
-            )
-        else:
-            logger.info("Using cached deferred result.")
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            key,
+            self._get_public_room_list,
+            limit, since_token, network_tuple=network_tuple,
+        )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
@@ -422,18 +415,14 @@ class RoomListHandler(BaseHandler):
             server_name, limit, since_token, include_all_networks,
             third_party_instance_id,
         )
-        result = self.remote_response_cache.get(key)
-        if not result:
-            result = self.remote_response_cache.set(
-                key,
-                repl_layer.get_public_rooms(
-                    server_name, limit=limit, since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            )
-        return result
+        return self.remote_response_cache.wrap(
+            key,
+            repl_layer.get_public_rooms,
+            server_name, limit=limit, since_token=since_token,
+            search_filter=search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
+        )
 
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 9977be8831..714583f1d5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
     @defer.inlineCallbacks
     def _local_membership_update(
         self, requester, target, room_id, membership,
-        prev_event_ids,
+        prev_events_and_hashes,
         txn_id=None,
         ratelimit=True,
         content=None,
@@ -175,7 +175,7 @@ class RoomMemberHandler(object):
             },
             token_id=requester.access_token_id,
             txn_id=txn_id,
-            prev_event_ids=prev_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
         )
 
         # Check if this event matches the previous membership event for the user.
@@ -314,7 +314,12 @@ class RoomMemberHandler(object):
                     403, "Invites have been disabled on this server",
                 )
 
-        latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+            room_id,
+        )
+        latest_event_ids = (
+            event_id for (event_id, _, _) in prev_events_and_hashes
+        )
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
         )
@@ -403,7 +408,7 @@ class RoomMemberHandler(object):
             membership=effective_membership_state,
             txn_id=txn_id,
             ratelimit=ratelimit,
-            prev_event_ids=latest_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
             content=content,
         )
         defer.returnValue(res)
@@ -852,6 +857,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
         """Implements RoomMemberHandler._remote_join
         """
+        # filter ourselves out of remote_room_hosts: do_invite_join ignores it
+        # and if it is the only entry we'd like to return a 404 rather than a
+        # 500.
+
+        remote_room_hosts = [
+            host for host in remote_room_hosts if host != self.hs.hostname
+        ]
+
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0f713ce038..b52e4c2aff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@
 
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
         to tell if room needs to be part of the sync result.
         """
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
             # nb the notification count does not, er, count: if there's nothing
             # else in the result, we don't need to send it.
         )
+    __bool__ = __nonzero__  # python3
 
 
 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
             or self.state
             or self.account_data
         )
+    __bool__ = __nonzero__  # python3
 
 
 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
     def __nonzero__(self):
         """Invited rooms should always be reported to the client"""
         return True
+    __bool__ = __nonzero__  # python3
 
 
 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
 
     def __nonzero__(self):
         return bool(self.join or self.invite or self.leave)
+    __bool__ = __nonzero__  # python3
 
 
 class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
 
     def __nonzero__(self):
         return bool(self.changed or self.left)
+    __bool__ = __nonzero__  # python3
 
 
 class SyncResult(collections.namedtuple("SyncResult", [
@@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
             self.device_lists or
             self.groups
         )
+    __bool__ = __nonzero__  # python3
 
 
 class SyncHandler(object):
@@ -169,7 +176,7 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs)
+        self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -180,15 +187,11 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        result = self.response_cache.get(sync_config.request_key)
-        if not result:
-            result = self.response_cache.set(
-                sync_config.request_key,
-                preserve_fn(self._wait_for_sync_for_user)(
-                    sync_config, since_token, timeout, full_state
-                )
-            )
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            sync_config.request_key,
+            self._wait_for_sync_for_user,
+            sync_config, since_token, timeout, full_state,
+        )
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 87639b9151..00572c2897 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -12,8 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import socket
-
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.internet import defer, reactor
 from twisted.internet.error import ConnectError
@@ -33,7 +31,7 @@ SERVER_CACHE = {}
 
 # our record of an individual server which can be tried to reach a destination.
 #
-# "host" is actually a dotted-quad or ipv6 address string. Except when there's
+# "host" is the hostname acquired from the SRV record. Except when there's
 # no SRV record, in which case it is the original hostname.
 _Server = collections.namedtuple(
     "_Server", "priority weight host port expires"
@@ -297,20 +295,13 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
 
             payload = answer.payload
 
-            hosts = yield _get_hosts_for_srv_record(
-                dns_client, str(payload.target)
-            )
-
-            for (ip, ttl) in hosts:
-                host_ttl = min(answer.ttl, ttl)
-
-                servers.append(_Server(
-                    host=ip,
-                    port=int(payload.port),
-                    priority=int(payload.priority),
-                    weight=int(payload.weight),
-                    expires=int(clock.time()) + host_ttl,
-                ))
+            servers.append(_Server(
+                host=str(payload.target),
+                port=int(payload.port),
+                priority=int(payload.priority),
+                weight=int(payload.weight),
+                expires=int(clock.time()) + answer.ttl,
+            ))
 
         servers.sort()
         cache[service_name] = list(servers)
@@ -328,81 +319,3 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
             raise e
 
     defer.returnValue(servers)
-
-
-@defer.inlineCallbacks
-def _get_hosts_for_srv_record(dns_client, host):
-    """Look up each of the hosts in a SRV record
-
-    Args:
-        dns_client (twisted.names.dns.IResolver):
-        host (basestring): host to look up
-
-    Returns:
-        Deferred[list[(str, int)]]: a list of (host, ttl) pairs
-
-    """
-    ip4_servers = []
-    ip6_servers = []
-
-    def cb(res):
-        # lookupAddress and lookupIP6Address return a three-tuple
-        # giving the answer, authority, and additional sections of the
-        # response.
-        #
-        # we only care about the answers.
-
-        return res[0]
-
-    def eb(res, record_type):
-        if res.check(DNSNameError):
-            return []
-        logger.warn("Error looking up %s for %s: %s", record_type, host, res)
-        return res
-
-    # no logcontexts here, so we can safely fire these off and gatherResults
-    d1 = dns_client.lookupAddress(host).addCallbacks(
-        cb, eb, errbackArgs=("A", ))
-    d2 = dns_client.lookupIPV6Address(host).addCallbacks(
-        cb, eb, errbackArgs=("AAAA", ))
-    results = yield defer.DeferredList(
-        [d1, d2], consumeErrors=True)
-
-    # if all of the lookups failed, raise an exception rather than blowing out
-    # the cache with an empty result.
-    if results and all(s == defer.FAILURE for (s, _) in results):
-        defer.returnValue(results[0][1])
-
-    for (success, result) in results:
-        if success == defer.FAILURE:
-            continue
-
-        for answer in result:
-            if not answer.payload:
-                continue
-
-            try:
-                if answer.type == dns.A:
-                    ip = answer.payload.dottedQuad()
-                    ip4_servers.append((ip, answer.ttl))
-                elif answer.type == dns.AAAA:
-                    ip = socket.inet_ntop(
-                        socket.AF_INET6, answer.payload.address,
-                    )
-                    ip6_servers.append((ip, answer.ttl))
-                else:
-                    # the most likely candidate here is a CNAME record.
-                    # rfc2782 says srvs may not point to aliases.
-                    logger.warn(
-                        "Ignoring unexpected DNS record type %s for %s",
-                        answer.type, host,
-                    )
-                    continue
-            except Exception as e:
-                logger.warn("Ignoring invalid DNS response for %s: %s",
-                            host, e)
-                continue
-
-    # keep the ipv4 results before the ipv6 results, mostly to match historical
-    # behaviour.
-    defer.returnValue(ip4_servers + ip6_servers)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 64e083ebfc..8d632290de 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -329,7 +329,7 @@ class JsonResource(HttpServer, resource.Resource):
                 register_paths, so will return (possibly via Deferred) either
                 None, or a tuple of (http code, response body).
         """
-        if request.method == "OPTIONS":
+        if request.method == b"OPTIONS":
             return _options_handler, {}
 
         # Loop through all the registered callbacks to check if the method
@@ -543,7 +543,7 @@ def finish_request(request):
 
 def _request_user_agent_is_curl(request):
     user_agents = request.requestHeaders.getRawHeaders(
-        "User-Agent", default=[]
+        b"User-Agent", default=[]
     )
     for user_agent in user_agents:
         if "curl" in user_agent:
diff --git a/synapse/http/site.py b/synapse/http/site.py
index e422c8dfae..c8b46e1af2 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -20,7 +20,7 @@ import logging
 import re
 import time
 
-ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
 
 
 class SynapseRequest(Request):
@@ -43,12 +43,12 @@ class SynapseRequest(Request):
 
     def get_redacted_uri(self):
         return ACCESS_TOKEN_RE.sub(
-            r'\1<redacted>\3',
+            br'\1<redacted>\3',
             self.uri
         )
 
     def get_user_agent(self):
-        return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
+        return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
 
     def started_processing(self):
         self.site.access_logger.info(
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 50d99d7a5c..e3b831db67 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -17,12 +17,13 @@ import logging
 import functools
 import time
 import gc
+import platform
 
 from twisted.internet import reactor
 
 from .metric import (
     CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
-    MemoryUsageMetric,
+    MemoryUsageMetric, GaugeMetric,
 )
 from .process_collector import register_process_collector
 
@@ -30,6 +31,7 @@ from .process_collector import register_process_collector
 logger = logging.getLogger(__name__)
 
 
+running_on_pypy = platform.python_implementation() == 'PyPy'
 all_metrics = []
 all_collectors = []
 
@@ -63,6 +65,13 @@ class Metrics(object):
         """
         return self._register(CounterMetric, *args, **kwargs)
 
+    def register_gauge(self, *args, **kwargs):
+        """
+        Returns:
+            GaugeMetric
+        """
+        return self._register(GaugeMetric, *args, **kwargs)
+
     def register_callback(self, *args, **kwargs):
         """
         Returns:
@@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
 tick_time = reactor_metrics.register_distribution("tick_time")
 pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
 
+synapse_metrics = get_metrics_for("synapse")
+
+# Used to track where various components have processed in the event stream,
+# e.g. federation sending, appservice sending, etc.
+event_processing_positions = synapse_metrics.register_gauge(
+    "event_processing_positions", labels=["name"],
+)
+
+# Used to track the current max events stream position
+event_persisted_position = synapse_metrics.register_gauge(
+    "event_persisted_position",
+)
+
+# Used to track the received_ts of the last event processed by various
+# components
+event_processing_last_ts = synapse_metrics.register_gauge(
+    "event_processing_last_ts", labels=["name"],
+)
+
+# Used to track the lag processing events. This is the time difference
+# between the last processed event's received_ts and the time it was
+# finished being processed.
+event_processing_lag = synapse_metrics.register_gauge(
+    "event_processing_lag", labels=["name"],
+)
+
 
 def runUntilCurrentTimer(func):
 
@@ -174,6 +209,9 @@ def runUntilCurrentTimer(func):
         tick_time.inc_by(end - start)
         pending_calls_metric.inc_by(num_pending)
 
+        if running_on_pypy:
+            return ret
+
         # Check if we need to do a manual GC (since its been disabled), and do
         # one if necessary.
         threshold = gc.get_threshold()
@@ -206,6 +244,7 @@ try:
 
     # We manually run the GC each reactor tick so that we can get some metrics
     # about time spent doing GC,
-    gc.disable()
+    if not running_on_pypy:
+        gc.disable()
 except AttributeError:
     pass
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index ff5aa8c0e1..89bd47c3f7 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
         # dict[list[str]]: value for each set of label values. the keys are the
         # label values, in the same order as the labels in self.labels.
         #
-        # (if the metric is a scalar, the (single) key is the empty list).
+        # (if the metric is a scalar, the (single) key is the empty tuple).
         self.counts = {}
 
         # Scalar metrics are never empty
@@ -145,6 +145,36 @@ class CounterMetric(BaseMetric):
         )
 
 
+class GaugeMetric(BaseMetric):
+    """A metric that can go up or down
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(GaugeMetric, self).__init__(*args, **kwargs)
+
+        # dict[list[str]]: value for each set of label values. the keys are the
+        # label values, in the same order as the labels in self.labels.
+        #
+        # (if the metric is a scalar, the (single) key is the empty tuple).
+        self.guages = {}
+
+    def set(self, v, *values):
+        if len(values) != self.dimension():
+            raise ValueError(
+                "Expected as many values to inc() as labels (%d)" % (self.dimension())
+            )
+
+        # TODO: should assert that the tag values are all strings
+
+        self.guages[values] = v
+
+    def render(self):
+        return flatten(
+            self._render_for_labels(k, self.guages[k])
+            for k in sorted(self.guages.keys())
+        )
+
+
 class CallbackMetric(BaseMetric):
     """A metric that returns the numeric value returned by a callback whenever
     it is rendered. Typically this is used to implement gauges that yield the
diff --git a/synapse/notifier.py b/synapse/notifier.py
index ef042681bc..0e40a4aad6 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -144,6 +144,7 @@ class _NotifierUserStream(object):
 class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
     def __nonzero__(self):
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class Notifier(object):
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 5cabf7dabe..711cbb6c50 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -1,5 +1,6 @@
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,6 +19,18 @@ from distutils.version import LooseVersion
 
 logger = logging.getLogger(__name__)
 
+# this dict maps from python package name to a list of modules we expect it to
+# provide.
+#
+# the key is a "requirement specifier", as used as a parameter to `pip
+# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
+#
+# the value is a sequence of strings; each entry should be the name of the
+# python module, optionally followed by a version assertion which can be either
+# ">=<ver>" or "==<ver>".
+#
+# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
+# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
 REQUIREMENTS = {
     "jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
     "frozendict>=0.4": ["frozendict"],
@@ -26,7 +39,11 @@ REQUIREMENTS = {
     "signedjson>=1.0.0": ["signedjson>=1.0.0"],
     "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
     "service_identity>=1.0.0": ["service_identity>=1.0.0"],
-    "Twisted>=16.0.0": ["twisted>=16.0.0"],
+
+    # we break under Twisted 18.4
+    # (https://github.com/matrix-org/synapse/issues/3135)
+    "Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
+
     "pyopenssl>=0.14": ["OpenSSL>=0.14"],
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
@@ -39,6 +56,7 @@ REQUIREMENTS = {
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
+    "six": ["six"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index bbe2f967b7..a9baa2c1c3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.util.async import sleep
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import Requester, UserID
 
@@ -115,20 +114,15 @@ class ReplicationSendEventRestServlet(RestServlet):
         self.clock = hs.get_clock()
 
         # The responses are tiny, so we may as well cache them for a while
-        self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
+        self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
 
     def on_PUT(self, request, event_id):
-        result = self.response_cache.get(event_id)
-        if not result:
-            result = self.response_cache.set(
-                event_id,
-                self._handle_request(request)
-            )
-        else:
-            logger.warn("Returning cached response")
-        return make_deferred_yieldable(result)
-
-    @preserve_fn
+        return self.response_cache.wrap(
+            event_id,
+            self._handle_request,
+            request
+        )
+
     @defer.inlineCallbacks
     def _handle_request(self, request):
         with Measure(self.clock, "repl_send_event_parse"):
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index ca49955935..e092158cb7 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -44,7 +44,10 @@ class LogoutRestServlet(ClientV1RestServlet):
             requester = yield self.auth.get_user_by_req(request)
         except AuthError:
             # this implies the access token has already been deleted.
-            pass
+            defer.returnValue((401, {
+                "errcode": "M_UNKNOWN_TOKEN",
+                "error": "Access Token unknown or expired"
+            }))
         else:
             if requester.device_id is None:
                 # the acccess token wasn't associated with a device.
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 5c5fa8f7ab..8a82097178 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -348,9 +348,9 @@ class RegisterRestServlet(ClientV1RestServlet):
         admin = register_json.get("admin", None)
 
         # Its important to check as we use null bytes as HMAC field separators
-        if "\x00" in user:
+        if b"\x00" in user:
             raise SynapseError(400, "Invalid user")
-        if "\x00" in password:
+        if b"\x00" in password:
             raise SynapseError(400, "Invalid password")
 
         # str() because otherwise hmac complains that 'unicode' does not
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index d06cbdc35e..2ad0e5943b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
                 content=content,
             )
         else:
-            event, context = yield self.event_creation_hander.create_event(
+            event = yield self.event_creation_hander.create_and_send_nonmember_event(
                 requester,
                 event_dict,
-                token_id=requester.access_token_id,
                 txn_id=txn_id,
             )
 
-            yield self.event_creation_hander.send_nonmember_event(
-                requester, event, context,
-            )
-
         ret = {}
         if event:
             ret = {"event_id": event.event_id}
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 0ba62bddc1..f317c919dc 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -20,7 +20,6 @@ import synapse
 import synapse.types
 from synapse.api.auth import get_access_token_from_request, has_access_token
 from synapse.api.constants import LoginType
-from synapse.types import RoomID, RoomAlias
 from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
 from synapse.http.servlet import (
     RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
@@ -405,14 +404,6 @@ class RegisterRestServlet(RestServlet):
                 generate_token=False,
             )
 
-            # auto-join the user to any rooms we're supposed to dump them into
-            fake_requester = synapse.types.create_requester(registered_user_id)
-            for r in self.hs.config.auto_join_rooms:
-                try:
-                    yield self._join_user_to_room(fake_requester, r)
-                except Exception as e:
-                    logger.error("Failed to join new user to %r: %r", r, e)
-
             # remember that we've now registered that user account, and with
             #  what user ID (since the user may not have specified)
             self.auth_handler.set_session_data(
@@ -446,29 +437,6 @@ class RegisterRestServlet(RestServlet):
         return 200, {}
 
     @defer.inlineCallbacks
-    def _join_user_to_room(self, requester, room_identifier):
-        room_id = None
-        if RoomID.is_valid(room_identifier):
-            room_id = room_identifier
-        elif RoomAlias.is_valid(room_identifier):
-            room_alias = RoomAlias.from_string(room_identifier)
-            room_id, remote_room_hosts = (
-                yield self.room_member_handler.lookup_room_alias(room_alias)
-            )
-            room_id = room_id.to_string()
-        else:
-            raise SynapseError(400, "%s was not legal room ID or room alias" % (
-                room_identifier,
-            ))
-
-        yield self.room_member_handler.update_membership(
-            requester=requester,
-            target=requester.user,
-            room_id=room_id,
-            action="join",
-        )
-
-    @defer.inlineCallbacks
     def _do_appservice_registration(self, username, as_token, body):
         user_id = yield self.registration_handler.appservice_register(
             username, as_token
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 83471b3173..7f263db239 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -16,6 +16,8 @@
 from twisted.internet import defer, threads
 from twisted.protocols.basic import FileSender
 
+import six
+
 from ._base import Responder
 
 from synapse.util.file_consumer import BackgroundFileConsumer
@@ -119,7 +121,7 @@ class MediaStorage(object):
                 os.remove(fname)
             except Exception:
                 pass
-            raise t, v, tb
+            six.reraise(t, v, tb)
 
         if not finished_called:
             raise Exception("Finished callback not called")
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index eacd49d6a5..8cdfd50f90 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore,
     def count_r30_users(self):
         """
         Counts the number of 30 day retained users, defined as:-
-         * Users who have created their accounts more than 30 days
+         * Users who have created their accounts more than 30 days ago
          * Where last seen at most 30 days ago
-         * Where account creation and last_seen are > 30 days
+         * Where account creation and last_seen are > 30 days apart
 
          Returns counts globaly for a given user as well as breaking
          by platform
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2fbebd4907..2262776ab2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -376,7 +376,7 @@ class SQLBaseStore(object):
         Returns:
             A list of dicts where the key is the column header.
         """
-        col_headers = list(intern(column[0]) for column in cursor.description)
+        col_headers = list(intern(str(column[0])) for column in cursor.description)
         results = list(
             dict(zip(col_headers, row)) for row in cursor
         )
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 338b495611..8c868ece75 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,6 +18,7 @@ from .postgres import PostgresEngine
 from .sqlite3 import Sqlite3Engine
 
 import importlib
+import platform
 
 
 SUPPORTED_MODULE = {
@@ -31,6 +32,10 @@ def create_engine(database_config):
     engine_class = SUPPORTED_MODULE.get(name, None)
 
     if engine_class:
+        # pypy requires psycopg2cffi rather than psycopg2
+        if (name == "psycopg2" and
+                platform.python_implementation() == "PyPy"):
+            name = "psycopg2cffi"
         module = importlib.import_module(name)
         return engine_class(module, database_config)
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 00ee82d300..8fbf7ffba7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import random
 
 from twisted.internet import defer
 
@@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
 
 import logging
-from Queue import PriorityQueue, Empty
+from six.moves.queue import PriorityQueue, Empty
+
+from six.moves import range
 
 
 logger = logging.getLogger(__name__)
@@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             front_list = list(front)
             chunks = [
                 front_list[x:x + 100]
-                for x in xrange(0, len(front), 100)
+                for x in range(0, len(front), 100)
             ]
             for chunk in chunks:
                 txn.execute(
@@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             retcol="event_id",
         )
 
+    @defer.inlineCallbacks
+    def get_prev_events_for_room(self, room_id):
+        """
+        Gets a subset of the current forward extremities in the given room.
+
+        Limits the result to 10 extremities, so that we can avoid creating
+        events which refer to hundreds of prev_events.
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+        res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
+        if len(res) > 10:
+            # Sort by reverse depth, so we point to the most recent.
+            res.sort(key=lambda a: -a[2])
+
+            # we use half of the limit for the actual most recent events, and
+            # the other half to randomly point to some of the older events, to
+            # make sure that we don't completely ignore the older events.
+            res = res[0:5] + random.sample(res[5:], 5)
+
+        defer.returnValue(res)
+
     def get_latest_event_ids_and_hashes_in_room(self, room_id):
+        """
+        Gets the current forward extremities in the given room
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+
         return self.runInteraction(
             "get_latest_event_ids_and_hashes_in_room",
             self._get_latest_event_ids_and_hashes_in_room,
@@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             room_id,
         )
 
-    @defer.inlineCallbacks
-    def get_max_depth_of_events(self, event_ids):
-        sql = (
-            "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
-        ) % (",".join(["?"] * len(event_ids)),)
-
-        rows = yield self._execute(
-            "get_max_depth_of_events", None,
-            sql, *event_ids
-        )
-
-        if rows:
-            defer.returnValue(rows[0][0])
-        else:
-            defer.returnValue(1)
-
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
             txn,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ece5e6c41f..5fe4a0e56c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -16,6 +16,7 @@
 
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps
+import itertools
 import logging
 
 import simplejson as json
@@ -444,6 +445,9 @@ class EventsStore(EventsWorkerStore):
                     new_forward_extremeties=new_forward_extremeties,
                 )
                 persist_event_counter.inc_by(len(chunk))
+                synapse.metrics.event_persisted_position.set(
+                    chunk[-1][0].internal_metadata.stream_ordering,
+                )
                 for event, context in chunk:
                     if context.app_service:
                         origin_type = "local"
@@ -1317,13 +1321,49 @@ class EventsStore(EventsWorkerStore):
 
         defer.returnValue(set(r["event_id"] for r in rows))
 
-    def have_events(self, event_ids):
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
         """Given a list of event ids, check if we have already processed them.
 
+        Args:
+            event_ids (iterable[str]):
+
+        Returns:
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
         Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-            the rejected reason string if we rejected the event, else maps to
-            None.
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
         """
         if not event_ids:
             return defer.succeed({})
@@ -1345,9 +1385,7 @@ class EventsStore(EventsWorkerStore):
 
             return res
 
-        return self.runInteraction(
-            "have_events", f,
-        )
+        return self.runInteraction("get_rejection_reasons", f)
 
     @defer.inlineCallbacks
     def count_daily_messages(self):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 2e23dd78ba..a937b9bceb 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
 class EventsWorkerStore(SQLBaseStore):
+    def get_received_ts(self, event_id):
+        """Get received_ts (when it was persisted) for the event.
+
+        Raises an exception for unknown events.
+
+        Args:
+            event_id (str)
+
+        Returns:
+            Deferred[int|None]: Timestamp in milliseconds, or None for events
+            that were persisted before received_ts was implemented.
+        """
+        return self._simple_select_one_onecol(
+            table="events",
+            keyvalues={
+                "event_id": event_id,
+            },
+            retcol="received_ts",
+            desc="get_received_ts",
+        )
 
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 740c036975..ea6a189185 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
             # Convert the IDs to MXC URIs
             for media_id in local_mxcs:
-                local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+                local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
             for hostname, media_id in remote_mxcs:
                 remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
 
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         while next_token:
             sql = """
                 SELECT stream_ordering, json FROM events
-                JOIN event_json USING (event_id)
+                JOIN event_json USING (room_id, event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                     if matches:
                         hostname = matches.group(1)
                         media_id = matches.group(2)
-                        if hostname == self.hostname:
+                        if hostname == self.hs.hostname:
                             local_media_mxcs.append(media_id)
                         else:
                             remote_media_mxcs.append((hostname, media_id))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 426cbe6e1a..6ba3e59889 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
             sql = (
                 "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
-                " JOIN event_json USING (event_id)"
+                " JOIN event_json USING (room_id, event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
diff --git a/synapse/types.py b/synapse/types.py
index 7cb24cecb2..cc7c182a78 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -169,7 +169,7 @@ class DomainSpecificString(
         except Exception:
             return False
 
-    __str__ = to_string
+    __repr__ = to_string
 
 
 class UserID(DomainSpecificString):
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af539880..7f79333e96 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,8 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
+from synapse.util.caches import metrics as cache_metrics
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
 
 
 class ResponseCache(object):
@@ -24,20 +31,68 @@ class ResponseCache(object):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self, hs, timeout_ms=0):
+    def __init__(self, hs, name, timeout_ms=0):
         self.pending_result_cache = {}  # Requests that haven't finished yet.
 
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._name = name
+        self._metrics = cache_metrics.register_cache(
+            "response_cache",
+            size_callback=lambda: self.size(),
+            cache_name=name,
+        )
+
+    def size(self):
+        return len(self.pending_result_cache)
+
     def get(self, key):
+        """Look up the given key.
+
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if the request has completed, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        If there is no entry for the key, returns None. It is worth noting that
+        this means there is no way to distinguish a completed result of None
+        from an absent cache entry.
+
+        Args:
+            key (hashable):
+
+        Returns:
+            twisted.internet.defer.Deferred|None|E: None if there is no entry
+            for this key; otherwise either a deferred result or the result
+            itself.
+        """
         result = self.pending_result_cache.get(key)
         if result is not None:
+            self._metrics.inc_hits()
             return result.observe()
         else:
+            self._metrics.inc_misses()
             return None
 
     def set(self, key, deferred):
+        """Set the entry for the given key to the given deferred.
+
+        *deferred* should run its callbacks in the sentinel logcontext (ie,
+        you should wrap normal synapse deferreds with
+        logcontext.run_in_background).
+
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if *deferred* was already complete, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        Args:
+            key (hashable):
+            deferred (twisted.internet.defer.Deferred[T):
+
+        Returns:
+            twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
+                result.
+        """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
 
@@ -53,3 +108,52 @@ class ResponseCache(object):
 
         result.addBoth(remove)
         return result.observe()
+
+    def wrap(self, key, callback, *args, **kwargs):
+        """Wrap together a *get* and *set* call, taking care of logcontexts
+
+        First looks up the key in the cache, and if it is present makes it
+        follow the synapse logcontext rules and returns it.
+
+        Otherwise, makes a call to *callback(*args, **kwargs)*, which should
+        follow the synapse logcontext rules, and adds the result to the cache.
+
+        Example usage:
+
+            @defer.inlineCallbacks
+            def handle_request(request):
+                # etc
+                defer.returnValue(result)
+
+            result = yield response_cache.wrap(
+                key,
+                handle_request,
+                request,
+            )
+
+        Args:
+            key (hashable): key to get/set in the cache
+
+            callback (callable): function to call if the key is not found in
+                the cache
+
+            *args: positional parameters to pass to the callback, if it is used
+
+            **kwargs: named paramters to pass to the callback, if it is used
+
+        Returns:
+            twisted.internet.defer.Deferred: yieldable result
+        """
+        result = self.get(key)
+        if not result:
+            logger.info("[%s]: no cached result for [%s], calculating new one",
+                        self._name, key)
+            d = run_in_background(callback, *args, **kwargs)
+            result = self.set(key, d)
+        elif not isinstance(result, defer.Deferred) or result.called:
+            logger.info("[%s]: using completed cached result for [%s]",
+                        self._name, key)
+        else:
+            logger.info("[%s]: using incomplete cached result for [%s]",
+                        self._name, key)
+        return make_deferred_yieldable(result)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 90a2608d6f..3c8a165331 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -17,7 +17,7 @@ from twisted.internet import threads, reactor
 
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 
-import Queue
+from six.moves import queue
 
 
 class BackgroundFileConsumer(object):
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
 
         # Queue of slices of bytes to be written. When producer calls
         # unregister a final None is sent.
-        self._bytes_queue = Queue.Queue()
+        self._bytes_queue = queue.Queue()
 
         # Deferred that is resolved when finished writing
         self._finished_deferred = None
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d660ec785b..d59adc236e 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -92,6 +92,7 @@ class LoggingContext(object):
 
         def __nonzero__(self):
             return False
+        __bool__ = __nonzero__  # python3
 
     sentinel = Sentinel()