summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/appservice/scheduler.py153
-rw-r--r--synapse/config/emailconfig.py2
-rw-r--r--synapse/config/key.py34
-rw-r--r--synapse/crypto/keyring.py23
-rw-r--r--synapse/federation/federation_server.py16
-rw-r--r--synapse/federation/sender/transaction_manager.py170
-rw-r--r--synapse/federation/transport/server.py43
-rw-r--r--synapse/federation/units.py3
-rw-r--r--synapse/handlers/admin.py10
-rw-r--r--synapse/handlers/devicemessage.py27
-rw-r--r--synapse/handlers/e2e_keys.py52
-rw-r--r--synapse/handlers/e2e_room_keys.py28
-rw-r--r--synapse/handlers/pagination.py17
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/sync.py22
-rw-r--r--synapse/http/federation/matrix_federation_agent.py26
-rw-r--r--synapse/http/federation/well_known_resolver.py126
-rw-r--r--synapse/http/servlet.py2
-rw-r--r--synapse/logging/opentracing.py237
-rw-r--r--synapse/replication/http/_base.py16
-rw-r--r--synapse/rest/admin/__init__.py4
-rw-r--r--synapse/rest/admin/purge_room_servlet.py57
-rw-r--r--synapse/rest/admin/users.py76
-rw-r--r--synapse/rest/client/v2_alpha/keys.py13
-rw-r--r--synapse/rest/client/v2_alpha/register.py57
-rw-r--r--synapse/rest/key/v2/remote_key_resource.py28
-rw-r--r--synapse/rest/well_known.py2
-rw-r--r--synapse/storage/devices.py39
-rw-r--r--synapse/storage/e2e_room_keys.py14
-rw-r--r--synapse/storage/end_to_end_keys.py38
-rw-r--r--synapse/storage/events.py151
-rw-r--r--synapse/storage/prepare_database.py24
-rw-r--r--synapse/storage/registration.py23
-rw-r--r--synapse/storage/room.py35
-rw-r--r--synapse/storage/schema/delta/56/add_spans_to_device_lists.sql20
-rw-r--r--synapse/storage/schema/delta/56/drop_unused_event_tables.sql20
-rw-r--r--synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql17
37 files changed, 1163 insertions, 464 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 42a350bff8..9998f822f1 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
         self.store = hs.get_datastore()
         self.as_api = hs.get_application_service_api()
 
-        def create_recoverer(service, callback):
-            return _Recoverer(self.clock, self.store, self.as_api, service, callback)
-
-        self.txn_ctrl = _TransactionController(
-            self.clock, self.store, self.as_api, create_recoverer
-        )
+        self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
         self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
 
     @defer.inlineCallbacks
     def start(self):
         logger.info("Starting appservice scheduler")
+
         # check for any DOWN ASes and start recoverers for them.
-        recoverers = yield _Recoverer.start(
-            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        services = yield self.store.get_appservices_by_state(
+            ApplicationServiceState.DOWN
         )
-        self.txn_ctrl.add_recoverers(recoverers)
+
+        for service in services:
+            self.txn_ctrl.start_recoverer(service)
 
     def submit_event_for_as(self, service, event):
         self.queuer.enqueue(service, event)
 
 
 class _ServiceQueuer(object):
-    """Queues events for the same application service together, sending
-    transactions as soon as possible. Once a transaction is sent successfully,
-    this schedules any other events in the queue to run.
+    """Queue of events waiting to be sent to appservices.
+
+    Groups events into transactions per-appservice, and sends them on to the
+    TransactionController. Makes sure that we only have one transaction in flight per
+    appservice at a given time.
     """
 
     def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
+
+        # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
@@ -136,13 +138,29 @@ class _ServiceQueuer(object):
 
 
 class _TransactionController(object):
-    def __init__(self, clock, store, as_api, recoverer_fn):
+    """Transaction manager.
+
+    Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
+    if a transaction fails.
+
+    (Note we have only have one of these in the homeserver.)
+
+    Args:
+        clock (synapse.util.Clock):
+        store (synapse.storage.DataStore):
+        as_api (synapse.appservice.api.ApplicationServiceApi):
+    """
+
+    def __init__(self, clock, store, as_api):
         self.clock = clock
         self.store = store
         self.as_api = as_api
-        self.recoverer_fn = recoverer_fn
-        # keep track of how many recoverers there are
-        self.recoverers = []
+
+        # map from service id to recoverer instance
+        self.recoverers = {}
+
+        # for UTs
+        self.RECOVERER_CLASS = _Recoverer
 
     @defer.inlineCallbacks
     def send(self, service, events):
@@ -154,42 +172,45 @@ class _TransactionController(object):
                 if sent:
                     yield txn.complete(self.store)
                 else:
-                    run_in_background(self._start_recoverer, service)
+                    run_in_background(self._on_txn_fail, service)
         except Exception:
             logger.exception("Error creating appservice transaction")
-            run_in_background(self._start_recoverer, service)
+            run_in_background(self._on_txn_fail, service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
-        self.recoverers.remove(recoverer)
         logger.info(
             "Successfully recovered application service AS ID %s", recoverer.service.id
         )
+        self.recoverers.pop(recoverer.service.id)
         logger.info("Remaining active recoverers: %s", len(self.recoverers))
         yield self.store.set_appservice_state(
             recoverer.service, ApplicationServiceState.UP
         )
 
-    def add_recoverers(self, recoverers):
-        for r in recoverers:
-            self.recoverers.append(r)
-        if len(recoverers) > 0:
-            logger.info("New active recoverers: %s", len(self.recoverers))
-
     @defer.inlineCallbacks
-    def _start_recoverer(self, service):
+    def _on_txn_fail(self, service):
         try:
             yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
-            logger.info(
-                "Application service falling behind. Starting recoverer. AS ID %s",
-                service.id,
-            )
-            recoverer = self.recoverer_fn(service, self.on_recovered)
-            self.add_recoverers([recoverer])
-            recoverer.recover()
+            self.start_recoverer(service)
         except Exception:
             logger.exception("Error starting AS recoverer")
 
+    def start_recoverer(self, service):
+        """Start a Recoverer for the given service
+
+        Args:
+            service (synapse.appservice.ApplicationService):
+        """
+        logger.info("Starting recoverer for AS ID %s", service.id)
+        assert service.id not in self.recoverers
+        recoverer = self.RECOVERER_CLASS(
+            self.clock, self.store, self.as_api, service, self.on_recovered
+        )
+        self.recoverers[service.id] = recoverer
+        recoverer.recover()
+        logger.info("Now %i active recoverers", len(self.recoverers))
+
     @defer.inlineCallbacks
     def _is_service_up(self, service):
         state = yield self.store.get_appservice_state(service)
@@ -197,18 +218,17 @@ class _TransactionController(object):
 
 
 class _Recoverer(object):
-    @staticmethod
-    @defer.inlineCallbacks
-    def start(clock, store, as_api, callback):
-        services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
-        recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
-        for r in recoverers:
-            logger.info(
-                "Starting recoverer for AS ID %s which was marked as " "DOWN",
-                r.service.id,
-            )
-            r.recover()
-        return recoverers
+    """Manages retries and backoff for a DOWN appservice.
+
+    We have one of these for each appservice which is currently considered DOWN.
+
+    Args:
+        clock (synapse.util.Clock):
+        store (synapse.storage.DataStore):
+        as_api (synapse.appservice.api.ApplicationServiceApi):
+        service (synapse.appservice.ApplicationService): the service we are managing
+        callback (callable[_Recoverer]): called once the service recovers.
+    """
 
     def __init__(self, clock, store, as_api, service, callback):
         self.clock = clock
@@ -224,7 +244,9 @@ class _Recoverer(object):
                 "as-recoverer-%s" % (self.service.id,), self.retry
             )
 
-        self.clock.call_later((2 ** self.backoff_counter), _retry)
+        delay = 2 ** self.backoff_counter
+        logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
+        self.clock.call_later(delay, _retry)
 
     def _backoff(self):
         # cap the backoff to be around 8.5min => (2^9) = 512 secs
@@ -234,25 +256,30 @@ class _Recoverer(object):
 
     @defer.inlineCallbacks
     def retry(self):
+        logger.info("Starting retries on %s", self.service.id)
         try:
-            txn = yield self.store.get_oldest_unsent_txn(self.service)
-            if txn:
+            while True:
+                txn = yield self.store.get_oldest_unsent_txn(self.service)
+                if not txn:
+                    # nothing left: we're done!
+                    self.callback(self)
+                    return
+
                 logger.info(
                     "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
                 )
                 sent = yield txn.send(self.as_api)
-                if sent:
-                    yield txn.complete(self.store)
-                    # reset the backoff counter and retry immediately
-                    self.backoff_counter = 1
-                    yield self.retry()
-                else:
-                    self._backoff()
-            else:
-                self._set_service_recovered()
-        except Exception as e:
-            logger.exception(e)
-            self._backoff()
-
-    def _set_service_recovered(self):
-        self.callback(self)
+                if not sent:
+                    break
+
+                yield txn.complete(self.store)
+
+                # reset the backoff counter and then process the next transaction
+                self.backoff_counter = 1
+
+        except Exception:
+            logger.exception("Unexpected error running retries")
+
+        # we didn't manage to send all of the transactions before we got an error of
+        # some flavour: reschedule the next retry.
+        self._backoff()
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 36d01a10af..f83c05df44 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -115,7 +115,7 @@ class EmailConfig(Config):
                     missing.append("email." + k)
 
             if config.get("public_baseurl") is None:
-                missing.append("public_base_url")
+                missing.append("public_baseurl")
 
             if len(missing) > 0:
                 raise RuntimeError(
diff --git a/synapse/config/key.py b/synapse/config/key.py
index fe8386985c..ba2199bceb 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -76,7 +76,7 @@ class KeyConfig(Config):
                     config_dir_path, config["server_name"] + ".signing.key"
                 )
 
-            self.signing_key = self.read_signing_key(signing_key_path)
+            self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
 
         self.old_signing_keys = self.read_old_signing_keys(
             config.get("old_signing_keys", {})
@@ -85,6 +85,14 @@ class KeyConfig(Config):
             config.get("key_refresh_interval", "1d")
         )
 
+        key_server_signing_keys_path = config.get("key_server_signing_keys_path")
+        if key_server_signing_keys_path:
+            self.key_server_signing_keys = self.read_signing_keys(
+                key_server_signing_keys_path, "key_server_signing_keys_path"
+            )
+        else:
+            self.key_server_signing_keys = list(self.signing_key)
+
         # if neither trusted_key_servers nor perspectives are given, use the default.
         if "perspectives" not in config and "trusted_key_servers" not in config:
             key_servers = [{"server_name": "matrix.org"}]
@@ -210,16 +218,34 @@ class KeyConfig(Config):
         #
         #trusted_key_servers:
         #  - server_name: "matrix.org"
+        #
+
+        # The signing keys to use when acting as a trusted key server. If not specified
+        # defaults to the server signing key.
+        #
+        # Can contain multiple keys, one per line.
+        #
+        #key_server_signing_keys_path: "key_server_signing_keys.key"
         """
             % locals()
         )
 
-    def read_signing_key(self, signing_key_path):
-        signing_keys = self.read_file(signing_key_path, "signing_key")
+    def read_signing_keys(self, signing_key_path, name):
+        """Read the signing keys in the given path.
+
+        Args:
+            signing_key_path (str)
+            name (str): Associated config key name
+
+        Returns:
+            list[SigningKey]
+        """
+
+        signing_keys = self.read_file(signing_key_path, name)
         try:
             return read_signing_keys(signing_keys.splitlines(True))
         except Exception as e:
-            raise ConfigError("Error reading signing_key: %s" % (str(e)))
+            raise ConfigError("Error reading %s: %s" % (name, str(e)))
 
     def read_old_signing_keys(self, old_signing_keys):
         keys = {}
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 6c3e885e72..7cfad192e8 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -18,7 +18,6 @@ import logging
 from collections import defaultdict
 
 import six
-from six import raise_from
 from six.moves import urllib
 
 import attr
@@ -30,7 +29,6 @@ from signedjson.key import (
 from signedjson.sign import (
     SignatureVerifyException,
     encode_canonical_json,
-    sign_json,
     signature_ids,
     verify_signed_json,
 )
@@ -540,13 +538,7 @@ class BaseV2KeyFetcher(object):
                     verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
                 )
 
-        # re-sign the json with our own key, so that it is ready if we are asked to
-        # give it out as a notary server
-        signed_key_json = sign_json(
-            response_json, self.config.server_name, self.config.signing_key[0]
-        )
-
-        signed_key_json_bytes = encode_canonical_json(signed_key_json)
+        key_json_bytes = encode_canonical_json(response_json)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -558,7 +550,7 @@ class BaseV2KeyFetcher(object):
                         from_server=from_server,
                         ts_now_ms=time_added_ms,
                         ts_expires_ms=ts_valid_until_ms,
-                        key_json_bytes=signed_key_json_bytes,
+                        key_json_bytes=key_json_bytes,
                     )
                     for key_id in verify_keys
                 ],
@@ -657,9 +649,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
                 },
             )
         except (NotRetryingDestination, RequestSendFailed) as e:
-            raise_from(KeyLookupError("Failed to connect to remote server"), e)
+            # these both have str() representations which we can't really improve upon
+            raise KeyLookupError(str(e))
         except HttpResponseException as e:
-            raise_from(KeyLookupError("Remote server returned an error"), e)
+            raise KeyLookupError("Remote server returned an error: %s" % (e,))
 
         keys = {}
         added_keys = []
@@ -821,9 +814,11 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
                     timeout=10000,
                 )
             except (NotRetryingDestination, RequestSendFailed) as e:
-                raise_from(KeyLookupError("Failed to connect to remote server"), e)
+                # these both have str() representations which we can't really improve
+                # upon
+                raise KeyLookupError(str(e))
             except HttpResponseException as e:
-                raise_from(KeyLookupError("Remote server returned an error"), e)
+                raise KeyLookupError("Remote server returned an error: %s" % (e,))
 
             if response["server_name"] != server_name:
                 raise KeyLookupError(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d216c46dfe..05fd49f3c1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
 from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
 from synapse.logging.utils import log_function
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
     def on_query_user_devices(self, origin, user_id):
         return self.on_query_request("user_devices", user_id)
 
+    @trace
     @defer.inlineCallbacks
     @log_function
     def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,7 @@ class FederationServer(FederationBase):
             for device_id, algorithm in device_keys.items():
                 query.append((user_id, device_id, algorithm))
 
+        log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
         results = yield self.store.claim_e2e_one_time_keys(query)
 
         json_result = {}
@@ -808,12 +811,13 @@ class FederationHandlerRegistry(object):
         if not handler:
             logger.warn("No handler registered for EDU type %s", edu_type)
 
-        try:
-            yield handler(origin, content)
-        except SynapseError as e:
-            logger.info("Failed to handle edu %r: %r", edu_type, e)
-        except Exception:
-            logger.exception("Failed to handle edu %r", edu_type)
+        with start_active_span_from_edu(content, "handle_edu"):
+            try:
+                yield handler(origin, content)
+            except SynapseError as e:
+                logger.info("Failed to handle edu %r: %r", edu_type, e)
+            except Exception:
+                logger.exception("Failed to handle edu %r", edu_type)
 
     def on_query(self, query_type, args):
         handler = self.query_handlers.get(query_type)
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 52706302f2..62ca6a3e87 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -14,11 +14,19 @@
 # limitations under the License.
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import HttpResponseException
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Transaction
+from synapse.logging.opentracing import (
+    extract_text_map,
+    set_tag,
+    start_active_span_follows_from,
+    tags,
+)
 from synapse.util.metrics import measure_func
 
 logger = logging.getLogger(__name__)
@@ -44,93 +52,109 @@ class TransactionManager(object):
     @defer.inlineCallbacks
     def send_new_transaction(self, destination, pending_pdus, pending_edus):
 
-        # Sort based on the order field
-        pending_pdus.sort(key=lambda t: t[1])
-        pdus = [x[0] for x in pending_pdus]
-        edus = pending_edus
+        # Make a transaction-sending opentracing span. This span follows on from
+        # all the edus in that transaction. This needs to be done since there is
+        # no active span here, so if the edus were not received by the remote the
+        # span would have no causality and it would be forgotten.
+        # The span_contexts is a generator so that it won't be evaluated if
+        # opentracing is disabled. (Yay speed!)
 
-        success = True
+        span_contexts = (
+            extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
+        )
 
-        logger.debug("TX [%s] _attempt_new_transaction", destination)
+        with start_active_span_follows_from("send_transaction", span_contexts):
 
-        txn_id = str(self._next_txn_id)
+            # Sort based on the order field
+            pending_pdus.sort(key=lambda t: t[1])
+            pdus = [x[0] for x in pending_pdus]
+            edus = pending_edus
 
-        logger.debug(
-            "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
-            destination,
-            txn_id,
-            len(pdus),
-            len(edus),
-        )
+            success = True
 
-        transaction = Transaction.create_new(
-            origin_server_ts=int(self.clock.time_msec()),
-            transaction_id=txn_id,
-            origin=self._server_name,
-            destination=destination,
-            pdus=pdus,
-            edus=edus,
-        )
+            logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-        self._next_txn_id += 1
+            txn_id = str(self._next_txn_id)
 
-        logger.info(
-            "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
-            destination,
-            txn_id,
-            transaction.transaction_id,
-            len(pdus),
-            len(edus),
-        )
+            logger.debug(
+                "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+                destination,
+                txn_id,
+                len(pdus),
+                len(edus),
+            )
 
-        # Actually send the transaction
-
-        # FIXME (erikj): This is a bit of a hack to make the Pdu age
-        # keys work
-        def json_data_cb():
-            data = transaction.get_dict()
-            now = int(self.clock.time_msec())
-            if "pdus" in data:
-                for p in data["pdus"]:
-                    if "age_ts" in p:
-                        unsigned = p.setdefault("unsigned", {})
-                        unsigned["age"] = now - int(p["age_ts"])
-                        del p["age_ts"]
-            return data
-
-        try:
-            response = yield self._transport_layer.send_transaction(
-                transaction, json_data_cb
+            transaction = Transaction.create_new(
+                origin_server_ts=int(self.clock.time_msec()),
+                transaction_id=txn_id,
+                origin=self._server_name,
+                destination=destination,
+                pdus=pdus,
+                edus=edus,
             )
-            code = 200
-        except HttpResponseException as e:
-            code = e.code
-            response = e.response
 
-            if e.code in (401, 404, 429) or 500 <= e.code:
-                logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
-                raise e
+            self._next_txn_id += 1
 
-        logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+            logger.info(
+                "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+                destination,
+                txn_id,
+                transaction.transaction_id,
+                len(pdus),
+                len(edus),
+            )
 
-        if code == 200:
-            for e_id, r in response.get("pdus", {}).items():
-                if "error" in r:
+            # Actually send the transaction
+
+            # FIXME (erikj): This is a bit of a hack to make the Pdu age
+            # keys work
+            def json_data_cb():
+                data = transaction.get_dict()
+                now = int(self.clock.time_msec())
+                if "pdus" in data:
+                    for p in data["pdus"]:
+                        if "age_ts" in p:
+                            unsigned = p.setdefault("unsigned", {})
+                            unsigned["age"] = now - int(p["age_ts"])
+                            del p["age_ts"]
+                return data
+
+            try:
+                response = yield self._transport_layer.send_transaction(
+                    transaction, json_data_cb
+                )
+                code = 200
+            except HttpResponseException as e:
+                code = e.code
+                response = e.response
+
+                if e.code in (401, 404, 429) or 500 <= e.code:
+                    logger.info(
+                        "TX [%s] {%s} got %d response", destination, txn_id, code
+                    )
+                    raise e
+
+            logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+
+            if code == 200:
+                for e_id, r in response.get("pdus", {}).items():
+                    if "error" in r:
+                        logger.warn(
+                            "TX [%s] {%s} Remote returned error for %s: %s",
+                            destination,
+                            txn_id,
+                            e_id,
+                            r,
+                        )
+            else:
+                for p in pdus:
                     logger.warn(
-                        "TX [%s] {%s} Remote returned error for %s: %s",
+                        "TX [%s] {%s} Failed to send event %s",
                         destination,
                         txn_id,
-                        e_id,
-                        r,
+                        p.event_id,
                     )
-        else:
-            for p in pdus:
-                logger.warn(
-                    "TX [%s] {%s} Failed to send event %s",
-                    destination,
-                    txn_id,
-                    p.event_id,
-                )
-            success = False
+                success = False
 
-        return success
+            set_tag(tags.ERROR, not success)
+            return success
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a17148fc3c..dc53b4b170 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -38,7 +38,12 @@ from synapse.http.servlet import (
     parse_string_from_args,
 )
 from synapse.logging.context import run_in_background
-from synapse.logging.opentracing import start_active_span_from_context, tags
+from synapse.logging.opentracing import (
+    start_active_span,
+    start_active_span_from_request,
+    tags,
+    whitelisted_homeserver,
+)
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
@@ -288,20 +293,28 @@ class BaseFederationServlet(object):
                 logger.warn("authenticate_request failed: %s", e)
                 raise
 
-            # Start an opentracing span
-            with start_active_span_from_context(
-                request.requestHeaders,
-                "incoming-federation-request",
-                tags={
-                    "request_id": request.get_request_id(),
-                    tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
-                    tags.HTTP_METHOD: request.get_method(),
-                    tags.HTTP_URL: request.get_redacted_uri(),
-                    tags.PEER_HOST_IPV6: request.getClientIP(),
-                    "authenticated_entity": origin,
-                    "servlet_name": request.request_metrics.name,
-                },
-            ):
+            request_tags = {
+                "request_id": request.get_request_id(),
+                tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
+                tags.HTTP_METHOD: request.get_method(),
+                tags.HTTP_URL: request.get_redacted_uri(),
+                tags.PEER_HOST_IPV6: request.getClientIP(),
+                "authenticated_entity": origin,
+                "servlet_name": request.request_metrics.name,
+            }
+
+            # Only accept the span context if the origin is authenticated
+            # and whitelisted
+            if origin and whitelisted_homeserver(origin):
+                scope = start_active_span_from_request(
+                    request, "incoming-federation-request", tags=request_tags
+                )
+            else:
+                scope = start_active_span(
+                    "incoming-federation-request", tags=request_tags
+                )
+
+            with scope:
                 if origin:
                     with ratelimiter.ratelimit(origin) as d:
                         await d
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 14aad8f09d..aa84621206 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
 
     internal_keys = ["origin", "destination"]
 
+    def get_context(self):
+        return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
+
 
 class Transaction(JsonEncodedObject):
     """ A transaction is a list of Pdus and Edus to be sent to a remote home
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 2f22f56ca4..d30a68b650 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -94,6 +94,16 @@ class AdminHandler(BaseHandler):
 
         return ret
 
+    def set_user_server_admin(self, user, admin):
+        """
+        Set the admin bit on a user.
+
+        Args:
+            user_id (UserID): the (necessarily local) user to manipulate
+            admin (bool): whether or not the user should be an admin of this server
+        """
+        return self.store.set_server_admin(user, admin)
+
     @defer.inlineCallbacks
     def export_user_data(self, user_id, writer):
         """Write all data we have on the user to the given writer.
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index e1ebb6346c..c7d56779b8 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,9 +15,17 @@
 
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    set_tag,
+    start_active_span,
+    whitelisted_homeserver,
+)
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
 
@@ -100,14 +108,21 @@ class DeviceMessageHandler(object):
 
         message_id = random_string(16)
 
+        context = get_active_span_text_map()
+
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            remote_edu_contents[destination] = {
-                "messages": messages,
-                "sender": sender_user_id,
-                "type": message_type,
-                "message_id": message_id,
-            }
+            with start_active_span("to_device_for_user"):
+                set_tag("destination", destination)
+                remote_edu_contents[destination] = {
+                    "messages": messages,
+                    "sender": sender_user_id,
+                    "type": message_type,
+                    "message_id": message_id,
+                    "org.matrix.opentracing_context": json.dumps(context)
+                    if whitelisted_homeserver(destination)
+                    else None,
+                }
 
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1f90b0d278..056fb97acb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,6 +24,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import CodeMessageException, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import unwrapFirstError
 from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
             "client_keys", self.on_federation_query_client_keys
         )
 
+    @trace
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         # First get local devices.
         failures = {}
         results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
+        @trace
         @defer.inlineCallbacks
         def do_remote_query(destination):
             """This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -198,6 +206,7 @@ class E2eKeysHandler(object):
 
         return {"device_keys": results, "failures": failures}
 
+    @trace
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                  map from user_id -> device_id -> device details
         """
+        set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
+                log_kv(
+                    {
+                        "message": "Requested a local key for a user which"
+                        " was not local to the homeserver",
+                        "user_id": user_id,
+                    }
+                )
+                set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
+        log_kv(results)
         return result_dict
 
     @defer.inlineCallbacks
@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}
 
+    @trace
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
                         key_id: json.loads(json_bytes)
                     }
 
+        @trace
         @defer.inlineCallbacks
         def claim_client_keys(destination):
+            set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
             ),
         )
 
+        log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
     @defer.inlineCallbacks
+    @tag_args
     def upload_keys_for_user(self, user_id, device_id, keys):
 
         time_now = self.clock.time_msec()
@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
                 user_id,
                 time_now,
             )
+            log_kv(
+                {
+                    "message": "Updating device_keys for user.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             # TODO: Sign the JSON with the server key
             changed = yield self.store.set_e2e_device_keys(
                 user_id, device_id, time_now, device_keys
@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
             if changed:
                 # Only notify about device updates *if* the keys actually changed
                 yield self.device_handler.notify_device_update(user_id, [device_id])
-
+        else:
+            log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
+            log_kv(
+                {
+                    "message": "Updating one_time_keys for device.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             yield self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
+        else:
+            log_kv(
+                {"message": "Did not update one_time_keys", "reason": "no keys given"}
+            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ class E2eKeysHandler(object):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
+        set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
     @defer.inlineCallbacks
@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
                     (algorithm, key_id, encode_canonical_json(key).decode("ascii"))
                 )
 
+        log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
         yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
 
 
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc59..a9d80f708c 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -26,6 +26,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.logging.opentracing import log_kv, trace
 from synapse.util.async_helpers import Linearizer
 
 logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
         # changed.
         self._upload_linearizer = Linearizer("upload_room_keys_lock")
 
+    @trace
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
                 user_id, version, room_id, session_id
             )
 
+            log_kv(results)
             return results
 
+    @trace
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+    @trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
             session_id(str): the session whose room_key we're setting
             room_key(dict): the room_key being set
         """
-
+        log_kv(
+            {
+                "message": "Trying to upload room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "user_id": user_id,
+            }
+        )
         # get the room_key for this particular row
         current_room_key = None
         try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
             )
         except StoreError as e:
             if e.code == 404:
-                pass
+                log_kv(
+                    {
+                        "message": "Room key not found.",
+                        "room_id": room_id,
+                        "user_id": user_id,
+                    }
+                )
             else:
                 raise
 
         if self._should_replace_room_key(current_room_key, room_key):
+            log_kv({"message": "Replacing room key."})
             yield self.store.set_e2e_room_key(
                 user_id, version, room_id, session_id, room_key
             )
+        else:
+            log_kv({"message": "Not replacing room_key."})
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
                 return False
         return True
 
+    @trace
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version.  This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
                     raise
             return res
 
+    @trace
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
                 else:
                     raise
 
+    @trace
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d83aab3f74..5744f4579d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -70,6 +70,7 @@ class PaginationHandler(object):
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
+        self._server_name = hs.hostname
 
         self.pagination_lock = ReadWriteLock()
         self._purges_in_progress_by_room = set()
@@ -153,6 +154,22 @@ class PaginationHandler(object):
         """
         return self._purges_by_id.get(purge_id)
 
+    async def purge_room(self, room_id):
+        """Purge the given room from the database"""
+        with (await self.pagination_lock.write(room_id)):
+            # check we know about the room
+            await self.store.get_room_version(room_id)
+
+            # first check that we have no users in this room
+            joined = await defer.maybeDeferred(
+                self.store.is_host_joined, room_id, self._server_name
+            )
+
+            if joined:
+                raise SynapseError(400, "Users are still joined to this room")
+
+            await self.store.purge_room(room_id)
+
     @defer.inlineCallbacks
     def get_messages(
         self,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 2cc237e6a5..8690f69d45 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -34,7 +34,7 @@ from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
-MAX_DISPLAYNAME_LEN = 100
+MAX_DISPLAYNAME_LEN = 256
 MAX_AVATAR_URL_LEN = 1000
 
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 98da2318a0..ef7f2ca980 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -786,9 +786,8 @@ class SyncHandler(object):
                         batch.events[0].event_id, state_filter=state_filter
                     )
                 else:
-                    # Its not clear how we get here, but empirically we do
-                    # (#5407). Logging has been added elsewhere to try and
-                    # figure out where this state comes from.
+                    # We can get here if the user has ignored the senders of all
+                    # the recent events.
                     state_at_timeline_start = yield self.get_state_at(
                         room_id, stream_position=now_token, state_filter=state_filter
                     )
@@ -1771,20 +1770,9 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
-        if not batch and batch.limited:
-            # This resulted in #5407, which is weird, so lets log! We do it
-            # here as we have the maximum amount of information.
-            user_id = sync_result_builder.sync_config.user.to_string()
-            logger.info(
-                "Issue #5407: Found limited batch with no events. user %s, room %s,"
-                " sync_config %s, newly_joined %s, events %s, batch %s.",
-                user_id,
-                room_id,
-                sync_config,
-                newly_joined,
-                events,
-                batch,
-            )
+        # Note: `batch` can be both empty and limited here in the case where
+        # `_load_filtered_recents` can't find any events the user should see
+        # (e.g. due to having ignored the sender of the last 50 events).
 
         if newly_joined:
             # debug for https://github.com/matrix-org/synapse/issues/4422
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 71a15f434d..64f62aaeec 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -51,9 +51,9 @@ class MatrixFederationAgent(object):
             SRVResolver impl to use for looking up SRV records. None to use a default
             implementation.
 
-        _well_known_cache (TTLCache|None):
-            TTLCache impl for storing cached well-known lookups. None to use a default
-            implementation.
+        _well_known_resolver (WellKnownResolver|None):
+            WellKnownResolver to use to perform well-known lookups. None to use a
+            default implementation.
     """
 
     def __init__(
@@ -61,7 +61,7 @@ class MatrixFederationAgent(object):
         reactor,
         tls_client_options_factory,
         _srv_resolver=None,
-        _well_known_cache=None,
+        _well_known_resolver=None,
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)
@@ -76,15 +76,17 @@ class MatrixFederationAgent(object):
         self._pool.maxPersistentPerHost = 5
         self._pool.cachedConnectionTimeout = 2 * 60
 
-        self._well_known_resolver = WellKnownResolver(
-            self._reactor,
-            agent=Agent(
+        if _well_known_resolver is None:
+            _well_known_resolver = WellKnownResolver(
                 self._reactor,
-                pool=self._pool,
-                contextFactory=tls_client_options_factory,
-            ),
-            well_known_cache=_well_known_cache,
-        )
+                agent=Agent(
+                    self._reactor,
+                    pool=self._pool,
+                    contextFactory=tls_client_options_factory,
+                ),
+            )
+
+        self._well_known_resolver = _well_known_resolver
 
     @defer.inlineCallbacks
     def request(self, method, uri, headers=None, bodyProducer=None):
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index bb250c6922..5e9b0befb0 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -32,12 +32,19 @@ from synapse.util.metrics import Measure
 # period to cache .well-known results for by default
 WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
 
-# jitter to add to the .well-known default cache ttl
-WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
+# jitter factor to add to the .well-known default cache ttls
+WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1
 
 # period to cache failure to fetch .well-known for
 WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
 
+# period to cache failure to fetch .well-known if there has recently been a
+# valid well-known for that domain.
+WELL_KNOWN_DOWN_CACHE_PERIOD = 2 * 60
+
+# period to remember there was a valid well-known after valid record expires
+WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID = 2 * 3600
+
 # cap for .well-known cache period
 WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
 
@@ -49,11 +56,16 @@ WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
 # we'll start trying to refetch 1 minute before it expires.
 WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2
 
+# Number of times we retry fetching a well-known for a domain we know recently
+# had a valid entry.
+WELL_KNOWN_RETRY_ATTEMPTS = 3
+
 
 logger = logging.getLogger(__name__)
 
 
 _well_known_cache = TTLCache("well-known")
+_had_valid_well_known_cache = TTLCache("had-valid-well-known")
 
 
 @attr.s(slots=True, frozen=True)
@@ -65,14 +77,20 @@ class WellKnownResolver(object):
     """Handles well-known lookups for matrix servers.
     """
 
-    def __init__(self, reactor, agent, well_known_cache=None):
+    def __init__(
+        self, reactor, agent, well_known_cache=None, had_well_known_cache=None
+    ):
         self._reactor = reactor
         self._clock = Clock(reactor)
 
         if well_known_cache is None:
             well_known_cache = _well_known_cache
 
+        if had_well_known_cache is None:
+            had_well_known_cache = _had_valid_well_known_cache
+
         self._well_known_cache = well_known_cache
+        self._had_valid_well_known_cache = had_well_known_cache
         self._well_known_agent = RedirectAgent(agent)
 
     @defer.inlineCallbacks
@@ -100,7 +118,7 @@ class WellKnownResolver(object):
         # requests for the same server in parallel?
         try:
             with Measure(self._clock, "get_well_known"):
-                result, cache_period = yield self._do_get_well_known(server_name)
+                result, cache_period = yield self._fetch_well_known(server_name)
 
         except _FetchWellKnownFailure as e:
             if prev_result and e.temporary:
@@ -111,10 +129,18 @@ class WellKnownResolver(object):
 
             result = None
 
-            # add some randomness to the TTL to avoid a stampeding herd every hour
-            # after startup
-            cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
-            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+            if self._had_valid_well_known_cache.get(server_name, False):
+                # We have recently seen a valid well-known record for this
+                # server, so we cache the lack of well-known for a shorter time.
+                cache_period = WELL_KNOWN_DOWN_CACHE_PERIOD
+            else:
+                cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
+
+            # add some randomness to the TTL to avoid a stampeding herd
+            cache_period *= random.uniform(
+                1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+                1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+            )
 
         if cache_period > 0:
             self._well_known_cache.set(server_name, result, cache_period)
@@ -122,7 +148,7 @@ class WellKnownResolver(object):
         return WellKnownLookupResult(delegated_server=result)
 
     @defer.inlineCallbacks
-    def _do_get_well_known(self, server_name):
+    def _fetch_well_known(self, server_name):
         """Actually fetch and parse a .well-known, without checking the cache
 
         Args:
@@ -134,24 +160,15 @@ class WellKnownResolver(object):
         Returns:
             Deferred[Tuple[bytes,int]]: The lookup result and cache period.
         """
-        uri = b"https://%s/.well-known/matrix/server" % (server_name,)
-        uri_str = uri.decode("ascii")
-        logger.info("Fetching %s", uri_str)
+
+        had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)
 
         # We do this in two steps to differentiate between possibly transient
         # errors (e.g. can't connect to host, 503 response) and more permenant
         # errors (such as getting a 404 response).
-        try:
-            response = yield make_deferred_yieldable(
-                self._well_known_agent.request(b"GET", uri)
-            )
-            body = yield make_deferred_yieldable(readBody(response))
-
-            if 500 <= response.code < 600:
-                raise Exception("Non-200 response %s" % (response.code,))
-        except Exception as e:
-            logger.info("Error fetching %s: %s", uri_str, e)
-            raise _FetchWellKnownFailure(temporary=True)
+        response, body = yield self._make_well_known_request(
+            server_name, retry=had_valid_well_known
+        )
 
         try:
             if response.code != 200:
@@ -161,8 +178,11 @@ class WellKnownResolver(object):
             logger.info("Response from .well-known: %s", parsed_body)
 
             result = parsed_body["m.server"].encode("ascii")
+        except defer.CancelledError:
+            # Bail if we've been cancelled
+            raise
         except Exception as e:
-            logger.info("Error fetching %s: %s", uri_str, e)
+            logger.info("Error parsing well-known for %s: %s", server_name, e)
             raise _FetchWellKnownFailure(temporary=False)
 
         cache_period = _cache_period_from_headers(
@@ -172,13 +192,69 @@ class WellKnownResolver(object):
             cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
             # add some randomness to the TTL to avoid a stampeding herd every 24 hours
             # after startup
-            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+            cache_period *= random.uniform(
+                1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+                1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+            )
         else:
             cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
             cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
 
+        # We got a success, mark as such in the cache
+        self._had_valid_well_known_cache.set(
+            server_name,
+            bool(result),
+            cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID,
+        )
+
         return (result, cache_period)
 
+    @defer.inlineCallbacks
+    def _make_well_known_request(self, server_name, retry):
+        """Make the well known request.
+
+        This will retry the request if requested and it fails (with unable
+        to connect or receives a 5xx error).
+
+        Args:
+            server_name (bytes)
+            retry (bool): Whether to retry the request if it fails.
+
+        Returns:
+            Deferred[tuple[IResponse, bytes]] Returns the response object and
+            body. Response may be a non-200 response.
+        """
+        uri = b"https://%s/.well-known/matrix/server" % (server_name,)
+        uri_str = uri.decode("ascii")
+
+        i = 0
+        while True:
+            i += 1
+
+            logger.info("Fetching %s", uri_str)
+            try:
+                response = yield make_deferred_yieldable(
+                    self._well_known_agent.request(b"GET", uri)
+                )
+                body = yield make_deferred_yieldable(readBody(response))
+
+                if 500 <= response.code < 600:
+                    raise Exception("Non-200 response %s" % (response.code,))
+
+                return response, body
+            except defer.CancelledError:
+                # Bail if we've been cancelled
+                raise
+            except Exception as e:
+                if not retry or i >= WELL_KNOWN_RETRY_ATTEMPTS:
+                    logger.info("Error fetching %s: %s", uri_str, e)
+                    raise _FetchWellKnownFailure(temporary=True)
+
+                logger.info("Error fetching %s: %s. Retrying", uri_str, e)
+
+            # Sleep briefly in the hopes that they come back up
+            yield self._clock.sleep(0.5)
+
 
 def _cache_period_from_headers(headers, time_now=time.time):
     cache_controls = _parse_cache_control(headers)
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index fd07bf7b8e..c186b31f59 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -300,7 +300,7 @@ class RestServlet(object):
                     http_server.register_paths(
                         method,
                         patterns,
-                        trace_servlet(servlet_classname, method_handler),
+                        trace_servlet(servlet_classname)(method_handler),
                         servlet_classname,
                     )
 
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index d2c209c471..dd296027a1 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -43,6 +43,9 @@ OpenTracing to be easily disabled in Synapse and thereby have OpenTracing as
 an optional dependency. This does however limit the number of modifiable spans
 at any point in the code to one. From here out references to `opentracing`
 in the code snippets refer to the Synapses module.
+Most methods provided in the module have a direct correlation to those provided
+by opentracing. Refer to docs there for a more in-depth documentation on some of
+the args and methods.
 
 Tracing
 -------
@@ -68,52 +71,62 @@ set a tag on the current active span.
 Tracing functions
 -----------------
 
-Functions can be easily traced using decorators. There is a decorator for
-'normal' function and for functions which are actually deferreds. The name of
+Functions can be easily traced using decorators. The name of
 the function becomes the operation name for the span.
 
 .. code-block:: python
 
-   from synapse.logging.opentracing import trace, trace_deferred
+   from synapse.logging.opentracing import trace
 
-   # Start a span using 'normal_function' as the operation name
+   # Start a span using 'interesting_function' as the operation name
    @trace
-   def normal_function(*args, **kwargs):
+   def interesting_function(*args, **kwargs):
        # Does all kinds of cool and expected things
        return something_usual_and_useful
 
-   # Start a span using 'deferred_function' as the operation name
-   @trace_deferred
-   @defer.inlineCallbacks
-   def deferred_function(*args, **kwargs):
-       # We start
-       yield we_wait
-       # we finish
-       return something_usual_and_useful
 
 Operation names can be explicitly set for functions by using
-``trace_using_operation_name`` and
-``trace_deferred_using_operation_name``
+``trace_using_operation_name``
 
 .. code-block:: python
 
-   from synapse.logging.opentracing import (
-       trace_using_operation_name,
-       trace_deferred_using_operation_name
-   )
+   from synapse.logging.opentracing import trace_using_operation_name
 
    @trace_using_operation_name("A *much* better operation name")
-   def normal_function(*args, **kwargs):
+   def interesting_badly_named_function(*args, **kwargs):
        # Does all kinds of cool and expected things
        return something_usual_and_useful
 
-   @trace_deferred_using_operation_name("Another exciting operation name!")
-   @defer.inlineCallbacks
-   def deferred_function(*args, **kwargs):
-       # We start
-       yield we_wait
-       # we finish
-       return something_usual_and_useful
+Setting Tags
+------------
+
+To set a tag on the active span do
+
+.. code-block:: python
+
+   from synapse.logging.opentracing import set_tag
+
+   set_tag(tag_name, tag_value)
+
+There's a convenient decorator to tag all the args of the method. It uses
+inspection in order to use the formal parameter names prefixed with 'ARG_' as
+tag names. It uses kwarg names as tag names without the prefix.
+
+.. code-block:: python
+
+   from synapse.logging.opentracing import tag_args
+
+   @tag_args
+   def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
+       pass
+
+   set_fates("the story", "the end", "the act")
+   # This will have the following tags
+   #  - ARG_clotho: "the story"
+   #  - ARG_lachesis: "the end"
+   #  - ARG_atropos: "the act"
+   #  - father: "Zues"
+   #  - mother: "Themis"
 
 Contexts and carriers
 ---------------------
@@ -136,6 +149,9 @@ unchartered waters will require the enforcement of the whitelist.
 ``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
 in a destination and compares it to the whitelist.
 
+Most injection methods take a 'destination' arg. The context will only be injected
+if the destination matches the whitelist or the destination is None.
+
 =======
 Gotchas
 =======
@@ -161,10 +177,48 @@ from twisted.internet import defer
 
 from synapse.config import ConfigError
 
+# Helper class
+
+
+class _DummyTagNames(object):
+    """wrapper of opentracings tags. We need to have them if we
+    want to reference them without opentracing around. Clearly they
+    should never actually show up in a trace. `set_tags` overwrites
+    these with the correct ones."""
+
+    INVALID_TAG = "invalid-tag"
+    COMPONENT = INVALID_TAG
+    DATABASE_INSTANCE = INVALID_TAG
+    DATABASE_STATEMENT = INVALID_TAG
+    DATABASE_TYPE = INVALID_TAG
+    DATABASE_USER = INVALID_TAG
+    ERROR = INVALID_TAG
+    HTTP_METHOD = INVALID_TAG
+    HTTP_STATUS_CODE = INVALID_TAG
+    HTTP_URL = INVALID_TAG
+    MESSAGE_BUS_DESTINATION = INVALID_TAG
+    PEER_ADDRESS = INVALID_TAG
+    PEER_HOSTNAME = INVALID_TAG
+    PEER_HOST_IPV4 = INVALID_TAG
+    PEER_HOST_IPV6 = INVALID_TAG
+    PEER_PORT = INVALID_TAG
+    PEER_SERVICE = INVALID_TAG
+    SAMPLING_PRIORITY = INVALID_TAG
+    SERVICE = INVALID_TAG
+    SPAN_KIND = INVALID_TAG
+    SPAN_KIND_CONSUMER = INVALID_TAG
+    SPAN_KIND_PRODUCER = INVALID_TAG
+    SPAN_KIND_RPC_CLIENT = INVALID_TAG
+    SPAN_KIND_RPC_SERVER = INVALID_TAG
+
+
 try:
     import opentracing
+
+    tags = opentracing.tags
 except ImportError:
     opentracing = None
+    tags = _DummyTagNames
 try:
     from jaeger_client import Config as JaegerConfig
     from synapse.logging.scopecontextmanager import LogContextScopeManager
@@ -239,10 +293,6 @@ def init_tracer(config):
         scope_manager=LogContextScopeManager(config),
     ).initialize_tracer()
 
-    # Set up tags to be opentracing's tags
-    global tags
-    tags = opentracing.tags
-
 
 # Whitelisting
 
@@ -321,8 +371,8 @@ def start_active_span_follows_from(operation_name, contexts):
         return scope
 
 
-def start_active_span_from_context(
-    headers,
+def start_active_span_from_request(
+    request,
     operation_name,
     references=None,
     tags=None,
@@ -331,9 +381,9 @@ def start_active_span_from_context(
     finish_on_close=True,
 ):
     """
-    Extracts a span context from Twisted Headers.
+    Extracts a span context from a Twisted Request.
     args:
-        headers (twisted.web.http_headers.Headers)
+        headers (twisted.web.http.Request)
 
         For the other args see opentracing.tracer
 
@@ -347,7 +397,9 @@ def start_active_span_from_context(
     if opentracing is None:
         return _noop_context_manager()
 
-    header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
+    header_dict = {
+        k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
+    }
     context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
 
     return opentracing.tracer.start_active_span(
@@ -435,7 +487,7 @@ def set_operation_name(operation_name):
 
 
 @only_if_tracing
-def inject_active_span_twisted_headers(headers, destination):
+def inject_active_span_twisted_headers(headers, destination, check_destination=True):
     """
     Injects a span context into twisted headers in-place
 
@@ -454,7 +506,7 @@ def inject_active_span_twisted_headers(headers, destination):
         https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
     """
 
-    if not whitelisted_homeserver(destination):
+    if check_destination and not whitelisted_homeserver(destination):
         return
 
     span = opentracing.tracer.active_span
@@ -466,7 +518,7 @@ def inject_active_span_twisted_headers(headers, destination):
 
 
 @only_if_tracing
-def inject_active_span_byte_dict(headers, destination):
+def inject_active_span_byte_dict(headers, destination, check_destination=True):
     """
     Injects a span context into a dict where the headers are encoded as byte
     strings
@@ -498,7 +550,7 @@ def inject_active_span_byte_dict(headers, destination):
 
 
 @only_if_tracing
-def inject_active_span_text_map(carrier, destination=None):
+def inject_active_span_text_map(carrier, destination, check_destination=True):
     """
     Injects a span context into a dict
 
@@ -519,7 +571,7 @@ def inject_active_span_text_map(carrier, destination=None):
         https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
     """
 
-    if destination and not whitelisted_homeserver(destination):
+    if check_destination and not whitelisted_homeserver(destination):
         return
 
     opentracing.tracer.inject(
@@ -527,6 +579,29 @@ def inject_active_span_text_map(carrier, destination=None):
     )
 
 
+def get_active_span_text_map(destination=None):
+    """
+    Gets a span context as a dict. This can be used instead of manually
+    injecting a span into an empty carrier.
+
+    Args:
+        destination (str): the name of the remote server.
+
+    Returns:
+        dict: the active span's context if opentracing is enabled, otherwise empty.
+    """
+
+    if not opentracing or (destination and not whitelisted_homeserver(destination)):
+        return {}
+
+    carrier = {}
+    opentracing.tracer.inject(
+        opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
+    )
+
+    return carrier
+
+
 def active_span_context_as_string():
     """
     Returns:
@@ -676,65 +751,43 @@ def tag_args(func):
     return _tag_args_inner
 
 
-def trace_servlet(servlet_name, func):
+def trace_servlet(servlet_name, extract_context=False):
     """Decorator which traces a serlet. It starts a span with some servlet specific
-    tags such as the servlet_name and request information"""
-    if not opentracing:
-        return func
+    tags such as the servlet_name and request information
 
-    @wraps(func)
-    @defer.inlineCallbacks
-    def _trace_servlet_inner(request, *args, **kwargs):
-        with start_active_span(
-            "incoming-client-request",
-            tags={
+    Args:
+        servlet_name (str): The name to be used for the span's operation_name
+        extract_context (bool): Whether to attempt to extract the opentracing
+            context from the request the servlet is handling.
+
+    """
+
+    def _trace_servlet_inner_1(func):
+        if not opentracing:
+            return func
+
+        @wraps(func)
+        @defer.inlineCallbacks
+        def _trace_servlet_inner(request, *args, **kwargs):
+            request_tags = {
                 "request_id": request.get_request_id(),
                 tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
                 tags.HTTP_METHOD: request.get_method(),
                 tags.HTTP_URL: request.get_redacted_uri(),
                 tags.PEER_HOST_IPV6: request.getClientIP(),
-                "servlet_name": servlet_name,
-            },
-        ):
-            result = yield defer.maybeDeferred(func, request, *args, **kwargs)
-            return result
+            }
 
-    return _trace_servlet_inner
-
-
-# Helper class
-
-
-class _DummyTagNames(object):
-    """wrapper of opentracings tags. We need to have them if we
-    want to reference them without opentracing around. Clearly they
-    should never actually show up in a trace. `set_tags` overwrites
-    these with the correct ones."""
+            if extract_context:
+                scope = start_active_span_from_request(
+                    request, servlet_name, tags=request_tags
+                )
+            else:
+                scope = start_active_span(servlet_name, tags=request_tags)
 
-    INVALID_TAG = "invalid-tag"
-    COMPONENT = INVALID_TAG
-    DATABASE_INSTANCE = INVALID_TAG
-    DATABASE_STATEMENT = INVALID_TAG
-    DATABASE_TYPE = INVALID_TAG
-    DATABASE_USER = INVALID_TAG
-    ERROR = INVALID_TAG
-    HTTP_METHOD = INVALID_TAG
-    HTTP_STATUS_CODE = INVALID_TAG
-    HTTP_URL = INVALID_TAG
-    MESSAGE_BUS_DESTINATION = INVALID_TAG
-    PEER_ADDRESS = INVALID_TAG
-    PEER_HOSTNAME = INVALID_TAG
-    PEER_HOST_IPV4 = INVALID_TAG
-    PEER_HOST_IPV6 = INVALID_TAG
-    PEER_PORT = INVALID_TAG
-    PEER_SERVICE = INVALID_TAG
-    SAMPLING_PRIORITY = INVALID_TAG
-    SERVICE = INVALID_TAG
-    SPAN_KIND = INVALID_TAG
-    SPAN_KIND_CONSUMER = INVALID_TAG
-    SPAN_KIND_PRODUCER = INVALID_TAG
-    SPAN_KIND_RPC_CLIENT = INVALID_TAG
-    SPAN_KIND_RPC_SERVER = INVALID_TAG
+            with scope:
+                result = yield defer.maybeDeferred(func, request, *args, **kwargs)
+                return result
 
+        return _trace_servlet_inner
 
-tags = _DummyTagNames
+    return _trace_servlet_inner_1
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2e0594e581..c4be9273f6 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -22,6 +22,7 @@ from six.moves import urllib
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import (
     CodeMessageException,
     HttpResponseException,
@@ -165,8 +166,12 @@ class ReplicationEndpoint(object):
                 # have a good idea that the request has either succeeded or failed on
                 # the master, and so whether we should clean up or not.
                 while True:
+                    headers = {}
+                    opentracing.inject_active_span_byte_dict(
+                        headers, None, check_destination=False
+                    )
                     try:
-                        result = yield request_func(uri, data)
+                        result = yield request_func(uri, data, headers=headers)
                         break
                     except CodeMessageException as e:
                         if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
@@ -205,7 +210,14 @@ class ReplicationEndpoint(object):
         args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
         pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
 
-        http_server.register_paths(method, [pattern], handler, self.__class__.__name__)
+        http_server.register_paths(
+            method,
+            [pattern],
+            opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
+                handler
+            ),
+            self.__class__.__name__,
+        )
 
     def _cached_handler(self, request, txn_id, **kwargs):
         """Called on new incoming requests when caching is enabled. Checks
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 5720cab425..9ab1c2c9e0 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -42,7 +42,9 @@ from synapse.rest.admin._base import (
     historical_admin_path_patterns,
 )
 from synapse.rest.admin.media import register_servlets_for_media_repo
+from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
+from synapse.rest.admin.users import UserAdminServlet
 from synapse.types import UserID, create_requester
 from synapse.util.versionstring import get_version_string
 
@@ -738,8 +740,10 @@ def register_servlets(hs, http_server):
     Register all the admin servlets.
     """
     register_servlets_for_client_rest_resource(hs, http_server)
+    PurgeRoomServlet(hs).register(http_server)
     SendServerNoticeServlet(hs).register(http_server)
     VersionServlet(hs).register(http_server)
+    UserAdminServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/admin/purge_room_servlet.py b/synapse/rest/admin/purge_room_servlet.py
new file mode 100644
index 0000000000..2922eb543e
--- /dev/null
+++ b/synapse/rest/admin/purge_room_servlet.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+
+from synapse.http.servlet import (
+    RestServlet,
+    assert_params_in_dict,
+    parse_json_object_from_request,
+)
+from synapse.rest.admin import assert_requester_is_admin
+
+
+class PurgeRoomServlet(RestServlet):
+    """Servlet which will remove all trace of a room from the database
+
+    POST /_synapse/admin/v1/purge_room
+    {
+        "room_id": "!room:id"
+    }
+
+    returns:
+
+    {}
+    """
+
+    PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),)
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.pagination_handler = hs.get_pagination_handler()
+
+    async def on_POST(self, request):
+        await assert_requester_is_admin(self.auth, request)
+
+        body = parse_json_object_from_request(request)
+        assert_params_in_dict(body, ("room_id",))
+
+        await self.pagination_handler.purge_room(body["room_id"])
+
+        return (200, {})
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
new file mode 100644
index 0000000000..b0fddb6898
--- /dev/null
+++ b/synapse/rest/admin/users.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import (
+    RestServlet,
+    assert_params_in_dict,
+    parse_json_object_from_request,
+)
+from synapse.rest.admin import assert_requester_is_admin
+from synapse.types import UserID
+
+
+class UserAdminServlet(RestServlet):
+    """
+    Set whether or not a user is a server administrator.
+
+    Note that only local users can be server administrators, and that an
+    administrator may not demote themselves.
+
+    Only server administrators can use this API.
+
+    Example:
+        PUT /_synapse/admin/v1/users/@reivilibre:librepush.net/admin
+        {
+            "admin": true
+        }
+    """
+
+    PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),)
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, user_id):
+        yield assert_requester_is_admin(self.auth, request)
+        requester = yield self.auth.get_user_by_req(request)
+        auth_user = requester.user
+
+        target_user = UserID.from_string(user_id)
+
+        body = parse_json_object_from_request(request)
+
+        assert_params_in_dict(body, ["admin"])
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Only local users can be admins of this homeserver")
+
+        set_admin_to = bool(body["admin"])
+
+        if target_user == auth_user and not set_admin_to:
+            raise SynapseError(400, "You may not demote yourself.")
+
+        yield self.handlers.admin_handler.set_user_server_admin(
+            target_user, set_admin_to
+        )
+
+        return (200, {})
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6008adec7c..b218a3f334 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -24,6 +24,7 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
+from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
 from synapse.types import StreamToken
 
 from ._base import client_patterns
@@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
+    @trace_using_operation_name("upload_keys")
     @defer.inlineCallbacks
     def on_POST(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
             # passing the device_id here is deprecated; however, we allow it
             # for now for compatibility with older clients.
             if requester.device_id is not None and device_id != requester.device_id:
+                set_tag("error", True)
+                log_kv(
+                    {
+                        "message": "Client uploading keys for a different device",
+                        "logged_in_id": requester.device_id,
+                        "key_being_uploaded": device_id,
+                    }
+                )
                 logger.warning(
                     "Client uploading keys for a different device "
                     "(logged in as %s, uploading for %s)",
@@ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
 
         from_token_string = parse_string(request, "from")
+        set_tag("from", from_token_string)
 
         # We want to enforce they do pass us one, but we ignore it and return
         # changes after the "to" as well as before.
-        parse_string(request, "to")
+        set_tag("to", parse_string(request, "to"))
 
         from_token = StreamToken.from_string(from_token_string)
 
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 05ea1459e3..9510a1e2b0 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -16,7 +16,6 @@
 
 import hmac
 import logging
-from hashlib import sha1
 
 from six import string_types
 
@@ -239,14 +238,12 @@ class RegisterRestServlet(RestServlet):
 
         # we do basic sanity checks here because the auth layer will store these
         # in sessions. Pull out the username/password provided to us.
-        desired_password = None
         if "password" in body:
             if (
                 not isinstance(body["password"], string_types)
                 or len(body["password"]) > 512
             ):
                 raise SynapseError(400, "Invalid password")
-            desired_password = body["password"]
 
         desired_username = None
         if "username" in body:
@@ -261,8 +258,8 @@ class RegisterRestServlet(RestServlet):
         if self.auth.has_access_token(request):
             appservice = yield self.auth.get_appservice_by_req(request)
 
-        # fork off as soon as possible for ASes and shared secret auth which
-        # have completely different registration flows to normal users
+        # fork off as soon as possible for ASes which have completely
+        # different registration flows to normal users
 
         # == Application Service Registration ==
         if appservice:
@@ -285,8 +282,8 @@ class RegisterRestServlet(RestServlet):
             return (200, result)  # we throw for non 200 responses
             return
 
-        # for either shared secret or regular registration, downcase the
-        # provided username before attempting to register it. This should mean
+        # for regular registration, downcase the provided username before
+        # attempting to register it. This should mean
         # that people who try to register with upper-case in their usernames
         # don't get a nasty surprise. (Note that we treat username
         # case-insenstively in login, so they are free to carry on imagining
@@ -294,16 +291,6 @@ class RegisterRestServlet(RestServlet):
         if desired_username is not None:
             desired_username = desired_username.lower()
 
-        # == Shared Secret Registration == (e.g. create new user scripts)
-        if "mac" in body:
-            # FIXME: Should we really be determining if this is shared secret
-            # auth based purely on the 'mac' key?
-            result = yield self._do_shared_secret_registration(
-                desired_username, desired_password, body
-            )
-            return (200, result)  # we throw for non 200 responses
-            return
-
         # == Normal User Registration == (everyone else)
         if not self.hs.config.enable_registration:
             raise SynapseError(403, "Registration has been disabled")
@@ -513,42 +500,6 @@ class RegisterRestServlet(RestServlet):
         return (yield self._create_registration_details(user_id, body))
 
     @defer.inlineCallbacks
-    def _do_shared_secret_registration(self, username, password, body):
-        if not self.hs.config.registration_shared_secret:
-            raise SynapseError(400, "Shared secret registration is not enabled")
-        if not username:
-            raise SynapseError(
-                400, "username must be specified", errcode=Codes.BAD_JSON
-            )
-
-        # use the username from the original request rather than the
-        # downcased one in `username` for the mac calculation
-        user = body["username"].encode("utf-8")
-
-        # str() because otherwise hmac complains that 'unicode' does not
-        # have the buffer interface
-        got_mac = str(body["mac"])
-
-        # FIXME this is different to the /v1/register endpoint, which
-        # includes the password and admin flag in the hashed text. Why are
-        # these different?
-        want_mac = hmac.new(
-            key=self.hs.config.registration_shared_secret.encode(),
-            msg=user,
-            digestmod=sha1,
-        ).hexdigest()
-
-        if not compare_digest(want_mac, got_mac):
-            raise SynapseError(403, "HMAC incorrect")
-
-        user_id = yield self.registration_handler.register_user(
-            localpart=username, password=password
-        )
-
-        result = yield self._create_registration_details(user_id, body)
-        return result
-
-    @defer.inlineCallbacks
     def _create_registration_details(self, user_id, params):
         """Complete registration of newly-registered user
 
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index 031a316693..55580bc59e 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -13,7 +13,9 @@
 # limitations under the License.
 
 import logging
-from io import BytesIO
+
+from canonicaljson import encode_canonical_json, json
+from signedjson.sign import sign_json
 
 from twisted.internet import defer
 
@@ -95,6 +97,7 @@ class RemoteKey(DirectServeResource):
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
         self.federation_domain_whitelist = hs.config.federation_domain_whitelist
+        self.config = hs.config
 
     @wrap_json_request_handler
     async def _async_render_GET(self, request):
@@ -214,15 +217,14 @@ class RemoteKey(DirectServeResource):
             yield self.fetcher.get_keys(cache_misses)
             yield self.query_keys(request, query, query_remote_on_cache_miss=False)
         else:
-            result_io = BytesIO()
-            result_io.write(b'{"server_keys":')
-            sep = b"["
-            for json_bytes in json_results:
-                result_io.write(sep)
-                result_io.write(json_bytes)
-                sep = b","
-            if sep == b"[":
-                result_io.write(sep)
-            result_io.write(b"]}")
-
-            respond_with_json_bytes(request, 200, result_io.getvalue())
+            signed_keys = []
+            for key_json in json_results:
+                key_json = json.loads(key_json)
+                for signing_key in self.config.key_server_signing_keys:
+                    key_json = sign_json(key_json, self.config.server_name, signing_key)
+
+                signed_keys.append(key_json)
+
+            results = {"server_keys": signed_keys}
+
+            respond_with_json_bytes(request, 200, encode_canonical_json(results))
diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py
index 5e8fda4b65..20177b44e7 100644
--- a/synapse/rest/well_known.py
+++ b/synapse/rest/well_known.py
@@ -34,7 +34,7 @@ class WellKnownBuilder(object):
         self._config = hs.config
 
     def get_well_known(self):
-        # if we don't have a public_base_url, we can't help much here.
+        # if we don't have a public_baseurl, we can't help much here.
         if self._config.public_baseurl is None:
             return None
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 8f72d92895..e11881161d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,11 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    trace,
+    whitelisted_homeserver,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import Cache, SQLBaseStore, db_to_json
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return {d["device_id"]: d for d in devices}
 
+    @trace
     @defer.inlineCallbacks
     def get_devices_by_remote(self, destination, from_stream_id, limit):
         """Get stream of updates to send to remote servers
@@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore):
         # (user_id, device_id) entries into a map, with the value being
         # the max stream_id across each set of duplicate entries
         #
-        # maps (user_id, device_id) -> stream_id
+        # maps (user_id, device_id) -> (stream_id, opentracing_context)
         # as long as their stream_id does not match that of the last row
+        #
+        # opentracing_context contains the opentracing metadata for the request
+        # that created the poke
+        #
+        # The most recent request's opentracing_context is used as the
+        # context which created the Edu.
+
         query_map = {}
         for update in updates:
             if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore):
                 break
 
             key = (update[0], update[1])
-            query_map[key] = max(query_map.get(key, 0), update[2])
+
+            update_context = update[3]
+            update_stream_id = update[2]
+
+            previous_update_stream_id, _ = query_map.get(key, (0, None))
+
+            if update_stream_id > previous_update_stream_id:
+                query_map[key] = (update_stream_id, update_context)
 
         # If we didn't find any updates with a stream_id lower than the cutoff, it
         # means that there are more than limit updates all of which have the same
@@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore):
             List: List of device updates
         """
         sql = """
-            SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+            SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
             ORDER BY stream_id
             LIMIT ?
@@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore):
         Args:
             destination (str): The host the device updates are intended for
             from_stream_id (int): The minimum stream_id to filter updates by, exclusive
-            query_map (Dict[(str, str): int]): Dictionary mapping
-                user_id/device_id to update stream_id
+            query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
+                user_id/device_id to update stream_id and the relevent json-encoded
+                opentracing context
 
         Returns:
             List[Dict]: List of objects representing an device update EDU
@@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore):
                 destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
-                stream_id = query_map[(user_id, device_id)]
+                stream_id, opentracing_context = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
                     "device_id": device_id,
                     "prev_id": [prev_id] if prev_id else [],
                     "stream_id": stream_id,
+                    "org.matrix.opentracing_context": opentracing_context,
                 }
 
                 prev_id = stream_id
@@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
             ],
         )
 
+        context = get_active_span_text_map()
+
         self._simple_insert_many_txn(
             txn,
             table="device_lists_outbound_pokes",
@@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
                     "device_id": device_id,
                     "sent": False,
                     "ts": now,
+                    "opentracing_context": json.dumps(context)
+                    if whitelisted_homeserver(destination)
+                    else None,
                 }
                 for destination in hosts
                 for device_id in device_ids
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index b1901404af..be2fe2bab6 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -18,6 +18,7 @@ import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.logging.opentracing import log_kv, trace
 
 from ._base import SQLBaseStore
 
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             },
             lock=False,
         )
+        log_kv(
+            {
+                "message": "Set room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "room_key": room_key,
+            }
+        )
 
+    @trace
     @defer.inlineCallbacks
     def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         return sessions
 
+    @trace
     @defer.inlineCallbacks
     def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
         )
 
+    @trace
     def create_e2e_room_keys_version(self, user_id, info):
         """Atomically creates a new version of this user's e2e_room_keys store
         with the given version info.
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
         )
 
+    @trace
     def update_e2e_room_keys_version(self, user_id, version, info):
         """Update a given backup version
 
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             desc="update_e2e_room_keys_version",
         )
 
+    @trace
     def delete_e2e_room_keys_version(self, user_id, version=None):
         """Delete a given backup version of the user's room keys.
         Doesn't delete their actual key data.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1e07474e70..33e3a84933 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
 
 from twisted.internet import defer
 
+from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
+    @trace
     @defer.inlineCallbacks
     def get_e2e_device_keys(
         self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
         """
+        set_tag("query_list", query_list)
         if not query_list:
             return {}
 
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return results
 
+    @trace
     def _get_e2e_device_keys_txn(
         self, txn, query_list, include_all_devices=False, include_deleted_devices=False
     ):
+        set_tag("include_all_devices", include_all_devices)
+        set_tag("include_deleted_devices", include_deleted_devices)
+
         query_clauses = []
         query_params = []
 
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             for user_id, device_id in deleted_devices:
                 result.setdefault(user_id, {})[device_id] = None
 
+        log_kv(result)
         return result
 
     @defer.inlineCallbacks
@@ -129,8 +137,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             keyvalues={"user_id": user_id, "device_id": device_id},
             desc="add_e2e_one_time_keys_check",
         )
-
-        return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+        result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+        log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
+        return result
 
     @defer.inlineCallbacks
     def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@@ -146,6 +155,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         """
 
         def _add_e2e_one_time_keys(txn):
+            set_tag("user_id", user_id)
+            set_tag("device_id", device_id)
+            set_tag("new_keys", new_keys)
             # We are protected from race between lookup and insertion due to
             # a unique constraint. If there is a race of two calls to
             # `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -202,6 +214,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         """
 
         def _set_e2e_device_keys_txn(txn):
+            set_tag("user_id", user_id)
+            set_tag("device_id", device_id)
+            set_tag("time_now", time_now)
+            set_tag("device_keys", device_keys)
+
             old_key_json = self._simple_select_one_onecol_txn(
                 txn,
                 table="e2e_device_keys_json",
@@ -215,6 +232,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             new_key_json = encode_canonical_json(device_keys).decode("utf-8")
 
             if old_key_json == new_key_json:
+                log_kv({"Message": "Device key already stored."})
                 return False
 
             self._simple_upsert_txn(
@@ -223,7 +241,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 keyvalues={"user_id": user_id, "device_id": device_id},
                 values={"ts_added_ms": time_now, "key_json": new_key_json},
             )
-
+            log_kv({"message": "Device keys stored."})
             return True
 
         return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
@@ -231,6 +249,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def claim_e2e_one_time_keys(self, query_list):
         """Take a list of one time keys out of the database"""
 
+        @trace
         def _claim_e2e_one_time_keys(txn):
             sql = (
                 "SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -252,7 +271,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 " AND key_id = ?"
             )
             for user_id, device_id, algorithm, key_id in delete:
+                log_kv(
+                    {
+                        "message": "Executing claim e2e_one_time_keys transaction on database."
+                    }
+                )
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
+                log_kv({"message": "finished executing and invalidating cache"})
                 self._invalidate_cache_and_stream(
                     txn, self.count_e2e_one_time_keys, (user_id, device_id)
                 )
@@ -262,6 +287,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
 
     def delete_e2e_keys_by_device(self, user_id, device_id):
         def delete_e2e_keys_by_device_txn(txn):
+            log_kv(
+                {
+                    "message": "Deleting keys for device",
+                    "device_id": device_id,
+                    "user_id": user_id,
+                }
+            )
             self._simple_delete_txn(
                 txn,
                 table="e2e_device_keys_json",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ac876287fc..5a95c36a8b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1302,15 +1302,11 @@ class EventsStore(
             "event_reference_hashes",
             "event_search",
             "event_to_state_groups",
-            "guest_access",
-            "history_visibility",
             "local_invites",
-            "room_names",
             "state_events",
             "rejections",
             "redactions",
             "room_memberships",
-            "topics",
         ):
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -1454,10 +1450,10 @@ class EventsStore(
 
         for event, _ in events_and_contexts:
             if event.type == EventTypes.Name:
-                # Insert into the room_names and event_search tables.
+                # Insert into the event_search table.
                 self._store_room_name_txn(txn, event)
             elif event.type == EventTypes.Topic:
-                # Insert into the topics table and event_search table.
+                # Insert into the event_search table.
                 self._store_room_topic_txn(txn, event)
             elif event.type == EventTypes.Message:
                 # Insert into the event_search table.
@@ -1465,12 +1461,6 @@ class EventsStore(
             elif event.type == EventTypes.Redaction:
                 # Insert into the redactions table.
                 self._store_redaction(txn, event)
-            elif event.type == EventTypes.RoomHistoryVisibility:
-                # Insert into the event_search table.
-                self._store_history_visibility_txn(txn, event)
-            elif event.type == EventTypes.GuestAccess:
-                # Insert into the event_search table.
-                self._store_guest_access_txn(txn, event)
 
             self._handle_event_relations(txn, event)
 
@@ -2191,6 +2181,143 @@ class EventsStore(
 
         return to_delete, to_dedelta
 
+    def purge_room(self, room_id):
+        """Deletes all record of a room
+
+        Args:
+            room_id (str):
+        """
+
+        return self.runInteraction("purge_room", self._purge_room_txn, room_id)
+
+    def _purge_room_txn(self, txn, room_id):
+        # first we have to delete the state groups states
+        logger.info("[purge] removing %s from state_groups_state", room_id)
+
+        txn.execute(
+            """
+            DELETE FROM state_groups_state WHERE state_group IN (
+              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+              WHERE events.room_id=?
+            )
+            """,
+            (room_id,),
+        )
+
+        # ... and the state group edges
+        logger.info("[purge] removing %s from state_group_edges", room_id)
+
+        txn.execute(
+            """
+            DELETE FROM state_group_edges WHERE state_group IN (
+              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+              WHERE events.room_id=?
+            )
+            """,
+            (room_id,),
+        )
+
+        # ... and the state groups
+        logger.info("[purge] removing %s from state_groups", room_id)
+
+        txn.execute(
+            """
+            DELETE FROM state_groups WHERE id IN (
+              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+              WHERE events.room_id=?
+            )
+            """,
+            (room_id,),
+        )
+
+        # and then tables which lack an index on room_id but have one on event_id
+        for table in (
+            "event_auth",
+            "event_edges",
+            "event_push_actions_staging",
+            "event_reference_hashes",
+            "event_relations",
+            "event_to_state_groups",
+            "redactions",
+            "rejections",
+            "state_events",
+        ):
+            logger.info("[purge] removing %s from %s", room_id, table)
+
+            txn.execute(
+                """
+                DELETE FROM %s WHERE event_id IN (
+                  SELECT event_id FROM events WHERE room_id=?
+                )
+                """
+                % (table,),
+                (room_id,),
+            )
+
+        # and finally, the tables with an index on room_id (or no useful index)
+        for table in (
+            "current_state_events",
+            "event_backward_extremities",
+            "event_forward_extremities",
+            "event_json",
+            "event_push_actions",
+            "event_search",
+            "events",
+            "group_rooms",
+            "public_room_list_stream",
+            "receipts_graph",
+            "receipts_linearized",
+            "room_aliases",
+            "room_depth",
+            "room_memberships",
+            "room_state",
+            "room_stats",
+            "room_stats_earliest_token",
+            "rooms",
+            "stream_ordering_to_exterm",
+            "topics",
+            "users_in_public_rooms",
+            "users_who_share_private_rooms",
+            # no useful index, but let's clear them anyway
+            "appservice_room_list",
+            "e2e_room_keys",
+            "event_push_summary",
+            "pusher_throttle",
+            "group_summary_rooms",
+            "local_invites",
+            "room_account_data",
+            "room_tags",
+        ):
+            logger.info("[purge] removing %s from %s", room_id, table)
+            txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
+
+        # Other tables we do NOT need to clear out:
+        #
+        #  - blocked_rooms
+        #    This is important, to make sure that we don't accidentally rejoin a blocked
+        #    room after it was purged
+        #
+        #  - user_directory
+        #    This has a room_id column, but it is unused
+        #
+
+        # Other tables that we might want to consider clearing out include:
+        #
+        #  - event_reports
+        #       Given that these are intended for abuse management my initial
+        #       inclination is to leave them in place.
+        #
+        #  - current_state_delta_stream
+        #  - ex_outlier_stream
+        #  - room_tags_revisions
+        #       The problem with these is that they are largeish and there is no room_id
+        #       index on them. In any case we should be clearing out 'stream' tables
+        #       periodically anyway (#5888)
+
+        # TODO: we could probably usefully do a bunch of cache invalidation here
+
+        logger.info("[purge] done")
+
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index d20eacda59..e96eed8a6d 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -238,6 +238,13 @@ def _upgrade_existing_database(
 
     logger.debug("applied_delta_files: %s", applied_delta_files)
 
+    if isinstance(database_engine, PostgresEngine):
+        specific_engine_extension = ".postgres"
+    else:
+        specific_engine_extension = ".sqlite"
+
+    specific_engine_extensions = (".sqlite", ".postgres")
+
     for v in range(start_ver, SCHEMA_VERSION + 1):
         logger.info("Upgrading schema to v%d", v)
 
@@ -274,15 +281,22 @@ def _upgrade_existing_database(
                 # Sometimes .pyc files turn up anyway even though we've
                 # disabled their generation; e.g. from distribution package
                 # installers. Silently skip it
-                pass
+                continue
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
                 logger.info("Applying schema %s", relative_path)
                 executescript(cur, absolute_path)
+            elif ext == specific_engine_extension and root_name.endswith(".sql"):
+                # A .sql file specific to our engine; just read and execute it
+                logger.info("Applying engine-specific schema %s", relative_path)
+                executescript(cur, absolute_path)
+            elif ext in specific_engine_extensions and root_name.endswith(".sql"):
+                # A .sql file for a different engine; skip it.
+                continue
             else:
                 # Not a valid delta file.
-                logger.warn(
-                    "Found directory entry that did not end in .py or" " .sql: %s",
+                logger.warning(
+                    "Found directory entry that did not end in .py or .sql: %s",
                     relative_path,
                 )
                 continue
@@ -290,7 +304,7 @@ def _upgrade_existing_database(
             # Mark as done.
             cur.execute(
                 database_engine.convert_param_style(
-                    "INSERT INTO applied_schema_deltas (version, file)" " VALUES (?,?)"
+                    "INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)"
                 ),
                 (v, relative_path),
             )
@@ -298,7 +312,7 @@ def _upgrade_existing_database(
             cur.execute("DELETE FROM schema_version")
             cur.execute(
                 database_engine.convert_param_style(
-                    "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)"
+                    "INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
                 ),
                 (v, True),
             )
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 55e4e84d71..9027b917c1 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -272,6 +272,14 @@ class RegistrationWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def is_server_admin(self, user):
+        """Determines if a user is an admin of this homeserver.
+
+        Args:
+            user (UserID): user ID of the user to test
+
+        Returns (bool):
+            true iff the user is a server admin, false otherwise.
+        """
         res = yield self._simple_select_one_onecol(
             table="users",
             keyvalues={"name": user.to_string()},
@@ -282,6 +290,21 @@ class RegistrationWorkerStore(SQLBaseStore):
 
         return res if res else False
 
+    def set_server_admin(self, user, admin):
+        """Sets whether a user is an admin of this homeserver.
+
+        Args:
+            user (UserID): user ID of the user to test
+            admin (bool): true iff the user is to be a server admin,
+                false otherwise.
+        """
+        return self._simple_update_one(
+            table="users",
+            keyvalues={"name": user.to_string()},
+            updatevalues={"admin": 1 if admin else 0},
+            desc="set_server_admin",
+        )
+
     def _query_for_auth(self, txn, token):
         sql = (
             "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index bc606292b8..08e13f3a3b 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -386,32 +386,12 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
     def _store_room_topic_txn(self, txn, event):
         if hasattr(event, "content") and "topic" in event.content:
-            self._simple_insert_txn(
-                txn,
-                "topics",
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "topic": event.content["topic"],
-                },
-            )
-
             self.store_event_search_txn(
                 txn, event, "content.topic", event.content["topic"]
             )
 
     def _store_room_name_txn(self, txn, event):
         if hasattr(event, "content") and "name" in event.content:
-            self._simple_insert_txn(
-                txn,
-                "room_names",
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "name": event.content["name"],
-                },
-            )
-
             self.store_event_search_txn(
                 txn, event, "content.name", event.content["name"]
             )
@@ -422,21 +402,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 txn, event, "content.body", event.content["body"]
             )
 
-    def _store_history_visibility_txn(self, txn, event):
-        self._store_content_index_txn(txn, event, "history_visibility")
-
-    def _store_guest_access_txn(self, txn, event):
-        self._store_content_index_txn(txn, event, "guest_access")
-
-    def _store_content_index_txn(self, txn, event, key):
-        if hasattr(event, "content") and key in event.content:
-            sql = (
-                "INSERT INTO %(key)s"
-                " (event_id, room_id, %(key)s)"
-                " VALUES (?, ?, ?)" % {"key": key}
-            )
-            txn.execute(sql, (event.event_id, event.room_id, event.content[key]))
-
     def add_event_report(
         self, room_id, event_id, user_id, reason, content, received_ts
     ):
diff --git a/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
new file mode 100644
index 0000000000..41807eb1e7
--- /dev/null
+++ b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Opentracing context data for inclusion in the device_list_update EDUs, as a
+ * json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination).
+ */
+ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT;
diff --git a/synapse/storage/schema/delta/56/drop_unused_event_tables.sql b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
new file mode 100644
index 0000000000..9f09922c67
--- /dev/null
+++ b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- these tables are never used.
+DROP TABLE IF EXISTS room_names;
+DROP TABLE IF EXISTS topics;
+DROP TABLE IF EXISTS history_visibility;
+DROP TABLE IF EXISTS guest_access;
diff --git a/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql
new file mode 100644
index 0000000000..149f8be8b6
--- /dev/null
+++ b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 Matrix.org Foundation CIC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this was apparently forgotten when the table was created back in delta 53.
+CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id);