summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/emailconfig.py2
-rw-r--r--synapse/crypto/keyring.py12
-rw-r--r--synapse/federation/federation_server.py16
-rw-r--r--synapse/federation/sender/transaction_manager.py170
-rw-r--r--synapse/federation/transport/server.py43
-rw-r--r--synapse/federation/units.py3
-rw-r--r--synapse/handlers/admin.py10
-rw-r--r--synapse/handlers/devicemessage.py27
-rw-r--r--synapse/handlers/e2e_keys.py52
-rw-r--r--synapse/handlers/e2e_room_keys.py28
-rw-r--r--synapse/handlers/pagination.py17
-rw-r--r--synapse/http/federation/matrix_federation_agent.py26
-rw-r--r--synapse/http/federation/well_known_resolver.py126
-rw-r--r--synapse/http/servlet.py2
-rw-r--r--synapse/logging/opentracing.py170
-rw-r--r--synapse/replication/http/_base.py16
-rw-r--r--synapse/rest/admin/__init__.py4
-rw-r--r--synapse/rest/admin/purge_room_servlet.py57
-rw-r--r--synapse/rest/admin/users.py76
-rw-r--r--synapse/rest/client/v2_alpha/keys.py13
-rw-r--r--synapse/rest/client/v2_alpha/register.py57
-rw-r--r--synapse/rest/well_known.py2
-rw-r--r--synapse/storage/devices.py39
-rw-r--r--synapse/storage/e2e_room_keys.py14
-rw-r--r--synapse/storage/end_to_end_keys.py38
-rw-r--r--synapse/storage/events.py151
-rw-r--r--synapse/storage/registration.py23
-rw-r--r--synapse/storage/room.py35
-rw-r--r--synapse/storage/schema/delta/56/add_spans_to_device_lists.sql20
-rw-r--r--synapse/storage/schema/delta/56/drop_unused_event_tables.sql20
-rw-r--r--synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql17
31 files changed, 961 insertions, 325 deletions
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 36d01a10af..f83c05df44 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -115,7 +115,7 @@ class EmailConfig(Config):
                     missing.append("email." + k)
 
             if config.get("public_baseurl") is None:
-                missing.append("public_base_url")
+                missing.append("public_baseurl")
 
             if len(missing) > 0:
                 raise RuntimeError(
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 2d7434fb2f..7cfad192e8 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -18,7 +18,6 @@ import logging
 from collections import defaultdict
 
 import six
-from six import raise_from
 from six.moves import urllib
 
 import attr
@@ -650,9 +649,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
                 },
             )
         except (NotRetryingDestination, RequestSendFailed) as e:
-            raise_from(KeyLookupError("Failed to connect to remote server"), e)
+            # these both have str() representations which we can't really improve upon
+            raise KeyLookupError(str(e))
         except HttpResponseException as e:
-            raise_from(KeyLookupError("Remote server returned an error"), e)
+            raise KeyLookupError("Remote server returned an error: %s" % (e,))
 
         keys = {}
         added_keys = []
@@ -814,9 +814,11 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
                     timeout=10000,
                 )
             except (NotRetryingDestination, RequestSendFailed) as e:
-                raise_from(KeyLookupError("Failed to connect to remote server"), e)
+                # these both have str() representations which we can't really improve
+                # upon
+                raise KeyLookupError(str(e))
             except HttpResponseException as e:
-                raise_from(KeyLookupError("Remote server returned an error"), e)
+                raise KeyLookupError("Remote server returned an error: %s" % (e,))
 
             if response["server_name"] != server_name:
                 raise KeyLookupError(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d216c46dfe..05fd49f3c1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
 from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
 from synapse.logging.utils import log_function
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
     def on_query_user_devices(self, origin, user_id):
         return self.on_query_request("user_devices", user_id)
 
+    @trace
     @defer.inlineCallbacks
     @log_function
     def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,7 @@ class FederationServer(FederationBase):
             for device_id, algorithm in device_keys.items():
                 query.append((user_id, device_id, algorithm))
 
+        log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
         results = yield self.store.claim_e2e_one_time_keys(query)
 
         json_result = {}
@@ -808,12 +811,13 @@ class FederationHandlerRegistry(object):
         if not handler:
             logger.warn("No handler registered for EDU type %s", edu_type)
 
-        try:
-            yield handler(origin, content)
-        except SynapseError as e:
-            logger.info("Failed to handle edu %r: %r", edu_type, e)
-        except Exception:
-            logger.exception("Failed to handle edu %r", edu_type)
+        with start_active_span_from_edu(content, "handle_edu"):
+            try:
+                yield handler(origin, content)
+            except SynapseError as e:
+                logger.info("Failed to handle edu %r: %r", edu_type, e)
+            except Exception:
+                logger.exception("Failed to handle edu %r", edu_type)
 
     def on_query(self, query_type, args):
         handler = self.query_handlers.get(query_type)
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 52706302f2..62ca6a3e87 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -14,11 +14,19 @@
 # limitations under the License.
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import HttpResponseException
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Transaction
+from synapse.logging.opentracing import (
+    extract_text_map,
+    set_tag,
+    start_active_span_follows_from,
+    tags,
+)
 from synapse.util.metrics import measure_func
 
 logger = logging.getLogger(__name__)
@@ -44,93 +52,109 @@ class TransactionManager(object):
     @defer.inlineCallbacks
     def send_new_transaction(self, destination, pending_pdus, pending_edus):
 
-        # Sort based on the order field
-        pending_pdus.sort(key=lambda t: t[1])
-        pdus = [x[0] for x in pending_pdus]
-        edus = pending_edus
+        # Make a transaction-sending opentracing span. This span follows on from
+        # all the edus in that transaction. This needs to be done since there is
+        # no active span here, so if the edus were not received by the remote the
+        # span would have no causality and it would be forgotten.
+        # The span_contexts is a generator so that it won't be evaluated if
+        # opentracing is disabled. (Yay speed!)
 
-        success = True
+        span_contexts = (
+            extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
+        )
 
-        logger.debug("TX [%s] _attempt_new_transaction", destination)
+        with start_active_span_follows_from("send_transaction", span_contexts):
 
-        txn_id = str(self._next_txn_id)
+            # Sort based on the order field
+            pending_pdus.sort(key=lambda t: t[1])
+            pdus = [x[0] for x in pending_pdus]
+            edus = pending_edus
 
-        logger.debug(
-            "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
-            destination,
-            txn_id,
-            len(pdus),
-            len(edus),
-        )
+            success = True
 
-        transaction = Transaction.create_new(
-            origin_server_ts=int(self.clock.time_msec()),
-            transaction_id=txn_id,
-            origin=self._server_name,
-            destination=destination,
-            pdus=pdus,
-            edus=edus,
-        )
+            logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-        self._next_txn_id += 1
+            txn_id = str(self._next_txn_id)
 
-        logger.info(
-            "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
-            destination,
-            txn_id,
-            transaction.transaction_id,
-            len(pdus),
-            len(edus),
-        )
+            logger.debug(
+                "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+                destination,
+                txn_id,
+                len(pdus),
+                len(edus),
+            )
 
-        # Actually send the transaction
-
-        # FIXME (erikj): This is a bit of a hack to make the Pdu age
-        # keys work
-        def json_data_cb():
-            data = transaction.get_dict()
-            now = int(self.clock.time_msec())
-            if "pdus" in data:
-                for p in data["pdus"]:
-                    if "age_ts" in p:
-                        unsigned = p.setdefault("unsigned", {})
-                        unsigned["age"] = now - int(p["age_ts"])
-                        del p["age_ts"]
-            return data
-
-        try:
-            response = yield self._transport_layer.send_transaction(
-                transaction, json_data_cb
+            transaction = Transaction.create_new(
+                origin_server_ts=int(self.clock.time_msec()),
+                transaction_id=txn_id,
+                origin=self._server_name,
+                destination=destination,
+                pdus=pdus,
+                edus=edus,
             )
-            code = 200
-        except HttpResponseException as e:
-            code = e.code
-            response = e.response
 
-            if e.code in (401, 404, 429) or 500 <= e.code:
-                logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
-                raise e
+            self._next_txn_id += 1
 
-        logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+            logger.info(
+                "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+                destination,
+                txn_id,
+                transaction.transaction_id,
+                len(pdus),
+                len(edus),
+            )
 
-        if code == 200:
-            for e_id, r in response.get("pdus", {}).items():
-                if "error" in r:
+            # Actually send the transaction
+
+            # FIXME (erikj): This is a bit of a hack to make the Pdu age
+            # keys work
+            def json_data_cb():
+                data = transaction.get_dict()
+                now = int(self.clock.time_msec())
+                if "pdus" in data:
+                    for p in data["pdus"]:
+                        if "age_ts" in p:
+                            unsigned = p.setdefault("unsigned", {})
+                            unsigned["age"] = now - int(p["age_ts"])
+                            del p["age_ts"]
+                return data
+
+            try:
+                response = yield self._transport_layer.send_transaction(
+                    transaction, json_data_cb
+                )
+                code = 200
+            except HttpResponseException as e:
+                code = e.code
+                response = e.response
+
+                if e.code in (401, 404, 429) or 500 <= e.code:
+                    logger.info(
+                        "TX [%s] {%s} got %d response", destination, txn_id, code
+                    )
+                    raise e
+
+            logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+
+            if code == 200:
+                for e_id, r in response.get("pdus", {}).items():
+                    if "error" in r:
+                        logger.warn(
+                            "TX [%s] {%s} Remote returned error for %s: %s",
+                            destination,
+                            txn_id,
+                            e_id,
+                            r,
+                        )
+            else:
+                for p in pdus:
                     logger.warn(
-                        "TX [%s] {%s} Remote returned error for %s: %s",
+                        "TX [%s] {%s} Failed to send event %s",
                         destination,
                         txn_id,
-                        e_id,
-                        r,
+                        p.event_id,
                     )
-        else:
-            for p in pdus:
-                logger.warn(
-                    "TX [%s] {%s} Failed to send event %s",
-                    destination,
-                    txn_id,
-                    p.event_id,
-                )
-            success = False
+                success = False
 
-        return success
+            set_tag(tags.ERROR, not success)
+            return success
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a17148fc3c..dc53b4b170 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -38,7 +38,12 @@ from synapse.http.servlet import (
     parse_string_from_args,
 )
 from synapse.logging.context import run_in_background
-from synapse.logging.opentracing import start_active_span_from_context, tags
+from synapse.logging.opentracing import (
+    start_active_span,
+    start_active_span_from_request,
+    tags,
+    whitelisted_homeserver,
+)
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
@@ -288,20 +293,28 @@ class BaseFederationServlet(object):
                 logger.warn("authenticate_request failed: %s", e)
                 raise
 
-            # Start an opentracing span
-            with start_active_span_from_context(
-                request.requestHeaders,
-                "incoming-federation-request",
-                tags={
-                    "request_id": request.get_request_id(),
-                    tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
-                    tags.HTTP_METHOD: request.get_method(),
-                    tags.HTTP_URL: request.get_redacted_uri(),
-                    tags.PEER_HOST_IPV6: request.getClientIP(),
-                    "authenticated_entity": origin,
-                    "servlet_name": request.request_metrics.name,
-                },
-            ):
+            request_tags = {
+                "request_id": request.get_request_id(),
+                tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
+                tags.HTTP_METHOD: request.get_method(),
+                tags.HTTP_URL: request.get_redacted_uri(),
+                tags.PEER_HOST_IPV6: request.getClientIP(),
+                "authenticated_entity": origin,
+                "servlet_name": request.request_metrics.name,
+            }
+
+            # Only accept the span context if the origin is authenticated
+            # and whitelisted
+            if origin and whitelisted_homeserver(origin):
+                scope = start_active_span_from_request(
+                    request, "incoming-federation-request", tags=request_tags
+                )
+            else:
+                scope = start_active_span(
+                    "incoming-federation-request", tags=request_tags
+                )
+
+            with scope:
                 if origin:
                     with ratelimiter.ratelimit(origin) as d:
                         await d
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 14aad8f09d..aa84621206 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
 
     internal_keys = ["origin", "destination"]
 
+    def get_context(self):
+        return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
+
 
 class Transaction(JsonEncodedObject):
     """ A transaction is a list of Pdus and Edus to be sent to a remote home
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 2f22f56ca4..d30a68b650 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -94,6 +94,16 @@ class AdminHandler(BaseHandler):
 
         return ret
 
+    def set_user_server_admin(self, user, admin):
+        """
+        Set the admin bit on a user.
+
+        Args:
+            user_id (UserID): the (necessarily local) user to manipulate
+            admin (bool): whether or not the user should be an admin of this server
+        """
+        return self.store.set_server_admin(user, admin)
+
     @defer.inlineCallbacks
     def export_user_data(self, user_id, writer):
         """Write all data we have on the user to the given writer.
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index e1ebb6346c..c7d56779b8 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,9 +15,17 @@
 
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    set_tag,
+    start_active_span,
+    whitelisted_homeserver,
+)
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
 
@@ -100,14 +108,21 @@ class DeviceMessageHandler(object):
 
         message_id = random_string(16)
 
+        context = get_active_span_text_map()
+
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            remote_edu_contents[destination] = {
-                "messages": messages,
-                "sender": sender_user_id,
-                "type": message_type,
-                "message_id": message_id,
-            }
+            with start_active_span("to_device_for_user"):
+                set_tag("destination", destination)
+                remote_edu_contents[destination] = {
+                    "messages": messages,
+                    "sender": sender_user_id,
+                    "type": message_type,
+                    "message_id": message_id,
+                    "org.matrix.opentracing_context": json.dumps(context)
+                    if whitelisted_homeserver(destination)
+                    else None,
+                }
 
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1f90b0d278..056fb97acb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,6 +24,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import CodeMessageException, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import unwrapFirstError
 from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
             "client_keys", self.on_federation_query_client_keys
         )
 
+    @trace
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         # First get local devices.
         failures = {}
         results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
+        @trace
         @defer.inlineCallbacks
         def do_remote_query(destination):
             """This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -198,6 +206,7 @@ class E2eKeysHandler(object):
 
         return {"device_keys": results, "failures": failures}
 
+    @trace
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                  map from user_id -> device_id -> device details
         """
+        set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
+                log_kv(
+                    {
+                        "message": "Requested a local key for a user which"
+                        " was not local to the homeserver",
+                        "user_id": user_id,
+                    }
+                )
+                set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
+        log_kv(results)
         return result_dict
 
     @defer.inlineCallbacks
@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}
 
+    @trace
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
                         key_id: json.loads(json_bytes)
                     }
 
+        @trace
         @defer.inlineCallbacks
         def claim_client_keys(destination):
+            set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
             ),
         )
 
+        log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
     @defer.inlineCallbacks
+    @tag_args
     def upload_keys_for_user(self, user_id, device_id, keys):
 
         time_now = self.clock.time_msec()
@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
                 user_id,
                 time_now,
             )
+            log_kv(
+                {
+                    "message": "Updating device_keys for user.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             # TODO: Sign the JSON with the server key
             changed = yield self.store.set_e2e_device_keys(
                 user_id, device_id, time_now, device_keys
@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
             if changed:
                 # Only notify about device updates *if* the keys actually changed
                 yield self.device_handler.notify_device_update(user_id, [device_id])
-
+        else:
+            log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
+            log_kv(
+                {
+                    "message": "Updating one_time_keys for device.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             yield self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
+        else:
+            log_kv(
+                {"message": "Did not update one_time_keys", "reason": "no keys given"}
+            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ class E2eKeysHandler(object):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
+        set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
     @defer.inlineCallbacks
@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
                     (algorithm, key_id, encode_canonical_json(key).decode("ascii"))
                 )
 
+        log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
         yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
 
 
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc59..a9d80f708c 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -26,6 +26,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.logging.opentracing import log_kv, trace
 from synapse.util.async_helpers import Linearizer
 
 logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
         # changed.
         self._upload_linearizer = Linearizer("upload_room_keys_lock")
 
+    @trace
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
                 user_id, version, room_id, session_id
             )
 
+            log_kv(results)
             return results
 
+    @trace
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+    @trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
             session_id(str): the session whose room_key we're setting
             room_key(dict): the room_key being set
         """
-
+        log_kv(
+            {
+                "message": "Trying to upload room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "user_id": user_id,
+            }
+        )
         # get the room_key for this particular row
         current_room_key = None
         try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
             )
         except StoreError as e:
             if e.code == 404:
-                pass
+                log_kv(
+                    {
+                        "message": "Room key not found.",
+                        "room_id": room_id,
+                        "user_id": user_id,
+                    }
+                )
             else:
                 raise
 
         if self._should_replace_room_key(current_room_key, room_key):
+            log_kv({"message": "Replacing room key."})
             yield self.store.set_e2e_room_key(
                 user_id, version, room_id, session_id, room_key
             )
+        else:
+            log_kv({"message": "Not replacing room_key."})
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
                 return False
         return True
 
+    @trace
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version.  This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
                     raise
             return res
 
+    @trace
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
                 else:
                     raise
 
+    @trace
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d83aab3f74..5744f4579d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -70,6 +70,7 @@ class PaginationHandler(object):
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
+        self._server_name = hs.hostname
 
         self.pagination_lock = ReadWriteLock()
         self._purges_in_progress_by_room = set()
@@ -153,6 +154,22 @@ class PaginationHandler(object):
         """
         return self._purges_by_id.get(purge_id)
 
+    async def purge_room(self, room_id):
+        """Purge the given room from the database"""
+        with (await self.pagination_lock.write(room_id)):
+            # check we know about the room
+            await self.store.get_room_version(room_id)
+
+            # first check that we have no users in this room
+            joined = await defer.maybeDeferred(
+                self.store.is_host_joined, room_id, self._server_name
+            )
+
+            if joined:
+                raise SynapseError(400, "Users are still joined to this room")
+
+            await self.store.purge_room(room_id)
+
     @defer.inlineCallbacks
     def get_messages(
         self,
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 71a15f434d..64f62aaeec 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -51,9 +51,9 @@ class MatrixFederationAgent(object):
             SRVResolver impl to use for looking up SRV records. None to use a default
             implementation.
 
-        _well_known_cache (TTLCache|None):
-            TTLCache impl for storing cached well-known lookups. None to use a default
-            implementation.
+        _well_known_resolver (WellKnownResolver|None):
+            WellKnownResolver to use to perform well-known lookups. None to use a
+            default implementation.
     """
 
     def __init__(
@@ -61,7 +61,7 @@ class MatrixFederationAgent(object):
         reactor,
         tls_client_options_factory,
         _srv_resolver=None,
-        _well_known_cache=None,
+        _well_known_resolver=None,
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)
@@ -76,15 +76,17 @@ class MatrixFederationAgent(object):
         self._pool.maxPersistentPerHost = 5
         self._pool.cachedConnectionTimeout = 2 * 60
 
-        self._well_known_resolver = WellKnownResolver(
-            self._reactor,
-            agent=Agent(
+        if _well_known_resolver is None:
+            _well_known_resolver = WellKnownResolver(
                 self._reactor,
-                pool=self._pool,
-                contextFactory=tls_client_options_factory,
-            ),
-            well_known_cache=_well_known_cache,
-        )
+                agent=Agent(
+                    self._reactor,
+                    pool=self._pool,
+                    contextFactory=tls_client_options_factory,
+                ),
+            )
+
+        self._well_known_resolver = _well_known_resolver
 
     @defer.inlineCallbacks
     def request(self, method, uri, headers=None, bodyProducer=None):
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index bb250c6922..5e9b0befb0 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -32,12 +32,19 @@ from synapse.util.metrics import Measure
 # period to cache .well-known results for by default
 WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
 
-# jitter to add to the .well-known default cache ttl
-WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
+# jitter factor to add to the .well-known default cache ttls
+WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1
 
 # period to cache failure to fetch .well-known for
 WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
 
+# period to cache failure to fetch .well-known if there has recently been a
+# valid well-known for that domain.
+WELL_KNOWN_DOWN_CACHE_PERIOD = 2 * 60
+
+# period to remember there was a valid well-known after valid record expires
+WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID = 2 * 3600
+
 # cap for .well-known cache period
 WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
 
@@ -49,11 +56,16 @@ WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
 # we'll start trying to refetch 1 minute before it expires.
 WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2
 
+# Number of times we retry fetching a well-known for a domain we know recently
+# had a valid entry.
+WELL_KNOWN_RETRY_ATTEMPTS = 3
+
 
 logger = logging.getLogger(__name__)
 
 
 _well_known_cache = TTLCache("well-known")
+_had_valid_well_known_cache = TTLCache("had-valid-well-known")
 
 
 @attr.s(slots=True, frozen=True)
@@ -65,14 +77,20 @@ class WellKnownResolver(object):
     """Handles well-known lookups for matrix servers.
     """
 
-    def __init__(self, reactor, agent, well_known_cache=None):
+    def __init__(
+        self, reactor, agent, well_known_cache=None, had_well_known_cache=None
+    ):
         self._reactor = reactor
         self._clock = Clock(reactor)
 
         if well_known_cache is None:
             well_known_cache = _well_known_cache
 
+        if had_well_known_cache is None:
+            had_well_known_cache = _had_valid_well_known_cache
+
         self._well_known_cache = well_known_cache
+        self._had_valid_well_known_cache = had_well_known_cache
         self._well_known_agent = RedirectAgent(agent)
 
     @defer.inlineCallbacks
@@ -100,7 +118,7 @@ class WellKnownResolver(object):
         # requests for the same server in parallel?
         try:
             with Measure(self._clock, "get_well_known"):
-                result, cache_period = yield self._do_get_well_known(server_name)
+                result, cache_period = yield self._fetch_well_known(server_name)
 
         except _FetchWellKnownFailure as e:
             if prev_result and e.temporary:
@@ -111,10 +129,18 @@ class WellKnownResolver(object):
 
             result = None
 
-            # add some randomness to the TTL to avoid a stampeding herd every hour
-            # after startup
-            cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
-            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+            if self._had_valid_well_known_cache.get(server_name, False):
+                # We have recently seen a valid well-known record for this
+                # server, so we cache the lack of well-known for a shorter time.
+                cache_period = WELL_KNOWN_DOWN_CACHE_PERIOD
+            else:
+                cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
+
+            # add some randomness to the TTL to avoid a stampeding herd
+            cache_period *= random.uniform(
+                1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+                1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+            )
 
         if cache_period > 0:
             self._well_known_cache.set(server_name, result, cache_period)
@@ -122,7 +148,7 @@ class WellKnownResolver(object):
         return WellKnownLookupResult(delegated_server=result)
 
     @defer.inlineCallbacks
-    def _do_get_well_known(self, server_name):
+    def _fetch_well_known(self, server_name):
         """Actually fetch and parse a .well-known, without checking the cache
 
         Args:
@@ -134,24 +160,15 @@ class WellKnownResolver(object):
         Returns:
             Deferred[Tuple[bytes,int]]: The lookup result and cache period.
         """
-        uri = b"https://%s/.well-known/matrix/server" % (server_name,)
-        uri_str = uri.decode("ascii")
-        logger.info("Fetching %s", uri_str)
+
+        had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)
 
         # We do this in two steps to differentiate between possibly transient
         # errors (e.g. can't connect to host, 503 response) and more permenant
         # errors (such as getting a 404 response).
-        try:
-            response = yield make_deferred_yieldable(
-                self._well_known_agent.request(b"GET", uri)
-            )
-            body = yield make_deferred_yieldable(readBody(response))
-
-            if 500 <= response.code < 600:
-                raise Exception("Non-200 response %s" % (response.code,))
-        except Exception as e:
-            logger.info("Error fetching %s: %s", uri_str, e)
-            raise _FetchWellKnownFailure(temporary=True)
+        response, body = yield self._make_well_known_request(
+            server_name, retry=had_valid_well_known
+        )
 
         try:
             if response.code != 200:
@@ -161,8 +178,11 @@ class WellKnownResolver(object):
             logger.info("Response from .well-known: %s", parsed_body)
 
             result = parsed_body["m.server"].encode("ascii")
+        except defer.CancelledError:
+            # Bail if we've been cancelled
+            raise
         except Exception as e:
-            logger.info("Error fetching %s: %s", uri_str, e)
+            logger.info("Error parsing well-known for %s: %s", server_name, e)
             raise _FetchWellKnownFailure(temporary=False)
 
         cache_period = _cache_period_from_headers(
@@ -172,13 +192,69 @@ class WellKnownResolver(object):
             cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
             # add some randomness to the TTL to avoid a stampeding herd every 24 hours
             # after startup
-            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+            cache_period *= random.uniform(
+                1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+                1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+            )
         else:
             cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
             cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
 
+        # We got a success, mark as such in the cache
+        self._had_valid_well_known_cache.set(
+            server_name,
+            bool(result),
+            cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID,
+        )
+
         return (result, cache_period)
 
+    @defer.inlineCallbacks
+    def _make_well_known_request(self, server_name, retry):
+        """Make the well known request.
+
+        This will retry the request if requested and it fails (with unable
+        to connect or receives a 5xx error).
+
+        Args:
+            server_name (bytes)
+            retry (bool): Whether to retry the request if it fails.
+
+        Returns:
+            Deferred[tuple[IResponse, bytes]] Returns the response object and
+            body. Response may be a non-200 response.
+        """
+        uri = b"https://%s/.well-known/matrix/server" % (server_name,)
+        uri_str = uri.decode("ascii")
+
+        i = 0
+        while True:
+            i += 1
+
+            logger.info("Fetching %s", uri_str)
+            try:
+                response = yield make_deferred_yieldable(
+                    self._well_known_agent.request(b"GET", uri)
+                )
+                body = yield make_deferred_yieldable(readBody(response))
+
+                if 500 <= response.code < 600:
+                    raise Exception("Non-200 response %s" % (response.code,))
+
+                return response, body
+            except defer.CancelledError:
+                # Bail if we've been cancelled
+                raise
+            except Exception as e:
+                if not retry or i >= WELL_KNOWN_RETRY_ATTEMPTS:
+                    logger.info("Error fetching %s: %s", uri_str, e)
+                    raise _FetchWellKnownFailure(temporary=True)
+
+                logger.info("Error fetching %s: %s. Retrying", uri_str, e)
+
+            # Sleep briefly in the hopes that they come back up
+            yield self._clock.sleep(0.5)
+
 
 def _cache_period_from_headers(headers, time_now=time.time):
     cache_controls = _parse_cache_control(headers)
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index fd07bf7b8e..c186b31f59 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -300,7 +300,7 @@ class RestServlet(object):
                     http_server.register_paths(
                         method,
                         patterns,
-                        trace_servlet(servlet_classname, method_handler),
+                        trace_servlet(servlet_classname)(method_handler),
                         servlet_classname,
                     )
 
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 6b706e1892..dd296027a1 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -149,6 +149,9 @@ unchartered waters will require the enforcement of the whitelist.
 ``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
 in a destination and compares it to the whitelist.
 
+Most injection methods take a 'destination' arg. The context will only be injected
+if the destination matches the whitelist or the destination is None.
+
 =======
 Gotchas
 =======
@@ -174,10 +177,48 @@ from twisted.internet import defer
 
 from synapse.config import ConfigError
 
+# Helper class
+
+
+class _DummyTagNames(object):
+    """wrapper of opentracings tags. We need to have them if we
+    want to reference them without opentracing around. Clearly they
+    should never actually show up in a trace. `set_tags` overwrites
+    these with the correct ones."""
+
+    INVALID_TAG = "invalid-tag"
+    COMPONENT = INVALID_TAG
+    DATABASE_INSTANCE = INVALID_TAG
+    DATABASE_STATEMENT = INVALID_TAG
+    DATABASE_TYPE = INVALID_TAG
+    DATABASE_USER = INVALID_TAG
+    ERROR = INVALID_TAG
+    HTTP_METHOD = INVALID_TAG
+    HTTP_STATUS_CODE = INVALID_TAG
+    HTTP_URL = INVALID_TAG
+    MESSAGE_BUS_DESTINATION = INVALID_TAG
+    PEER_ADDRESS = INVALID_TAG
+    PEER_HOSTNAME = INVALID_TAG
+    PEER_HOST_IPV4 = INVALID_TAG
+    PEER_HOST_IPV6 = INVALID_TAG
+    PEER_PORT = INVALID_TAG
+    PEER_SERVICE = INVALID_TAG
+    SAMPLING_PRIORITY = INVALID_TAG
+    SERVICE = INVALID_TAG
+    SPAN_KIND = INVALID_TAG
+    SPAN_KIND_CONSUMER = INVALID_TAG
+    SPAN_KIND_PRODUCER = INVALID_TAG
+    SPAN_KIND_RPC_CLIENT = INVALID_TAG
+    SPAN_KIND_RPC_SERVER = INVALID_TAG
+
+
 try:
     import opentracing
+
+    tags = opentracing.tags
 except ImportError:
     opentracing = None
+    tags = _DummyTagNames
 try:
     from jaeger_client import Config as JaegerConfig
     from synapse.logging.scopecontextmanager import LogContextScopeManager
@@ -252,10 +293,6 @@ def init_tracer(config):
         scope_manager=LogContextScopeManager(config),
     ).initialize_tracer()
 
-    # Set up tags to be opentracing's tags
-    global tags
-    tags = opentracing.tags
-
 
 # Whitelisting
 
@@ -334,8 +371,8 @@ def start_active_span_follows_from(operation_name, contexts):
         return scope
 
 
-def start_active_span_from_context(
-    headers,
+def start_active_span_from_request(
+    request,
     operation_name,
     references=None,
     tags=None,
@@ -344,9 +381,9 @@ def start_active_span_from_context(
     finish_on_close=True,
 ):
     """
-    Extracts a span context from Twisted Headers.
+    Extracts a span context from a Twisted Request.
     args:
-        headers (twisted.web.http_headers.Headers)
+        headers (twisted.web.http.Request)
 
         For the other args see opentracing.tracer
 
@@ -360,7 +397,9 @@ def start_active_span_from_context(
     if opentracing is None:
         return _noop_context_manager()
 
-    header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
+    header_dict = {
+        k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
+    }
     context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
 
     return opentracing.tracer.start_active_span(
@@ -448,7 +487,7 @@ def set_operation_name(operation_name):
 
 
 @only_if_tracing
-def inject_active_span_twisted_headers(headers, destination):
+def inject_active_span_twisted_headers(headers, destination, check_destination=True):
     """
     Injects a span context into twisted headers in-place
 
@@ -467,7 +506,7 @@ def inject_active_span_twisted_headers(headers, destination):
         https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
     """
 
-    if not whitelisted_homeserver(destination):
+    if check_destination and not whitelisted_homeserver(destination):
         return
 
     span = opentracing.tracer.active_span
@@ -479,7 +518,7 @@ def inject_active_span_twisted_headers(headers, destination):
 
 
 @only_if_tracing
-def inject_active_span_byte_dict(headers, destination):
+def inject_active_span_byte_dict(headers, destination, check_destination=True):
     """
     Injects a span context into a dict where the headers are encoded as byte
     strings
@@ -511,7 +550,7 @@ def inject_active_span_byte_dict(headers, destination):
 
 
 @only_if_tracing
-def inject_active_span_text_map(carrier, destination=None):
+def inject_active_span_text_map(carrier, destination, check_destination=True):
     """
     Injects a span context into a dict
 
@@ -532,7 +571,7 @@ def inject_active_span_text_map(carrier, destination=None):
         https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
     """
 
-    if destination and not whitelisted_homeserver(destination):
+    if check_destination and not whitelisted_homeserver(destination):
         return
 
     opentracing.tracer.inject(
@@ -540,6 +579,29 @@ def inject_active_span_text_map(carrier, destination=None):
     )
 
 
+def get_active_span_text_map(destination=None):
+    """
+    Gets a span context as a dict. This can be used instead of manually
+    injecting a span into an empty carrier.
+
+    Args:
+        destination (str): the name of the remote server.
+
+    Returns:
+        dict: the active span's context if opentracing is enabled, otherwise empty.
+    """
+
+    if not opentracing or (destination and not whitelisted_homeserver(destination)):
+        return {}
+
+    carrier = {}
+    opentracing.tracer.inject(
+        opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
+    )
+
+    return carrier
+
+
 def active_span_context_as_string():
     """
     Returns:
@@ -689,65 +751,43 @@ def tag_args(func):
     return _tag_args_inner
 
 
-def trace_servlet(servlet_name, func):
+def trace_servlet(servlet_name, extract_context=False):
     """Decorator which traces a serlet. It starts a span with some servlet specific
-    tags such as the servlet_name and request information"""
-    if not opentracing:
-        return func
+    tags such as the servlet_name and request information
 
-    @wraps(func)
-    @defer.inlineCallbacks
-    def _trace_servlet_inner(request, *args, **kwargs):
-        with start_active_span(
-            "incoming-client-request",
-            tags={
+    Args:
+        servlet_name (str): The name to be used for the span's operation_name
+        extract_context (bool): Whether to attempt to extract the opentracing
+            context from the request the servlet is handling.
+
+    """
+
+    def _trace_servlet_inner_1(func):
+        if not opentracing:
+            return func
+
+        @wraps(func)
+        @defer.inlineCallbacks
+        def _trace_servlet_inner(request, *args, **kwargs):
+            request_tags = {
                 "request_id": request.get_request_id(),
                 tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
                 tags.HTTP_METHOD: request.get_method(),
                 tags.HTTP_URL: request.get_redacted_uri(),
                 tags.PEER_HOST_IPV6: request.getClientIP(),
-                "servlet_name": servlet_name,
-            },
-        ):
-            result = yield defer.maybeDeferred(func, request, *args, **kwargs)
-            return result
-
-    return _trace_servlet_inner
-
-
-# Helper class
+            }
 
+            if extract_context:
+                scope = start_active_span_from_request(
+                    request, servlet_name, tags=request_tags
+                )
+            else:
+                scope = start_active_span(servlet_name, tags=request_tags)
 
-class _DummyTagNames(object):
-    """wrapper of opentracings tags. We need to have them if we
-    want to reference them without opentracing around. Clearly they
-    should never actually show up in a trace. `set_tags` overwrites
-    these with the correct ones."""
-
-    INVALID_TAG = "invalid-tag"
-    COMPONENT = INVALID_TAG
-    DATABASE_INSTANCE = INVALID_TAG
-    DATABASE_STATEMENT = INVALID_TAG
-    DATABASE_TYPE = INVALID_TAG
-    DATABASE_USER = INVALID_TAG
-    ERROR = INVALID_TAG
-    HTTP_METHOD = INVALID_TAG
-    HTTP_STATUS_CODE = INVALID_TAG
-    HTTP_URL = INVALID_TAG
-    MESSAGE_BUS_DESTINATION = INVALID_TAG
-    PEER_ADDRESS = INVALID_TAG
-    PEER_HOSTNAME = INVALID_TAG
-    PEER_HOST_IPV4 = INVALID_TAG
-    PEER_HOST_IPV6 = INVALID_TAG
-    PEER_PORT = INVALID_TAG
-    PEER_SERVICE = INVALID_TAG
-    SAMPLING_PRIORITY = INVALID_TAG
-    SERVICE = INVALID_TAG
-    SPAN_KIND = INVALID_TAG
-    SPAN_KIND_CONSUMER = INVALID_TAG
-    SPAN_KIND_PRODUCER = INVALID_TAG
-    SPAN_KIND_RPC_CLIENT = INVALID_TAG
-    SPAN_KIND_RPC_SERVER = INVALID_TAG
+            with scope:
+                result = yield defer.maybeDeferred(func, request, *args, **kwargs)
+                return result
 
+        return _trace_servlet_inner
 
-tags = _DummyTagNames
+    return _trace_servlet_inner_1
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2e0594e581..c4be9273f6 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -22,6 +22,7 @@ from six.moves import urllib
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import (
     CodeMessageException,
     HttpResponseException,
@@ -165,8 +166,12 @@ class ReplicationEndpoint(object):
                 # have a good idea that the request has either succeeded or failed on
                 # the master, and so whether we should clean up or not.
                 while True:
+                    headers = {}
+                    opentracing.inject_active_span_byte_dict(
+                        headers, None, check_destination=False
+                    )
                     try:
-                        result = yield request_func(uri, data)
+                        result = yield request_func(uri, data, headers=headers)
                         break
                     except CodeMessageException as e:
                         if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
@@ -205,7 +210,14 @@ class ReplicationEndpoint(object):
         args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
         pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
 
-        http_server.register_paths(method, [pattern], handler, self.__class__.__name__)
+        http_server.register_paths(
+            method,
+            [pattern],
+            opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
+                handler
+            ),
+            self.__class__.__name__,
+        )
 
     def _cached_handler(self, request, txn_id, **kwargs):
         """Called on new incoming requests when caching is enabled. Checks
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 5720cab425..9ab1c2c9e0 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -42,7 +42,9 @@ from synapse.rest.admin._base import (
     historical_admin_path_patterns,
 )
 from synapse.rest.admin.media import register_servlets_for_media_repo
+from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
+from synapse.rest.admin.users import UserAdminServlet
 from synapse.types import UserID, create_requester
 from synapse.util.versionstring import get_version_string
 
@@ -738,8 +740,10 @@ def register_servlets(hs, http_server):
     Register all the admin servlets.
     """
     register_servlets_for_client_rest_resource(hs, http_server)
+    PurgeRoomServlet(hs).register(http_server)
     SendServerNoticeServlet(hs).register(http_server)
     VersionServlet(hs).register(http_server)
+    UserAdminServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/admin/purge_room_servlet.py b/synapse/rest/admin/purge_room_servlet.py
new file mode 100644
index 0000000000..2922eb543e
--- /dev/null
+++ b/synapse/rest/admin/purge_room_servlet.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+
+from synapse.http.servlet import (
+    RestServlet,
+    assert_params_in_dict,
+    parse_json_object_from_request,
+)
+from synapse.rest.admin import assert_requester_is_admin
+
+
+class PurgeRoomServlet(RestServlet):
+    """Servlet which will remove all trace of a room from the database
+
+    POST /_synapse/admin/v1/purge_room
+    {
+        "room_id": "!room:id"
+    }
+
+    returns:
+
+    {}
+    """
+
+    PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),)
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.pagination_handler = hs.get_pagination_handler()
+
+    async def on_POST(self, request):
+        await assert_requester_is_admin(self.auth, request)
+
+        body = parse_json_object_from_request(request)
+        assert_params_in_dict(body, ("room_id",))
+
+        await self.pagination_handler.purge_room(body["room_id"])
+
+        return (200, {})
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
new file mode 100644
index 0000000000..b0fddb6898
--- /dev/null
+++ b/synapse/rest/admin/users.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import (
+    RestServlet,
+    assert_params_in_dict,
+    parse_json_object_from_request,
+)
+from synapse.rest.admin import assert_requester_is_admin
+from synapse.types import UserID
+
+
+class UserAdminServlet(RestServlet):
+    """
+    Set whether or not a user is a server administrator.
+
+    Note that only local users can be server administrators, and that an
+    administrator may not demote themselves.
+
+    Only server administrators can use this API.
+
+    Example:
+        PUT /_synapse/admin/v1/users/@reivilibre:librepush.net/admin
+        {
+            "admin": true
+        }
+    """
+
+    PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),)
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, user_id):
+        yield assert_requester_is_admin(self.auth, request)
+        requester = yield self.auth.get_user_by_req(request)
+        auth_user = requester.user
+
+        target_user = UserID.from_string(user_id)
+
+        body = parse_json_object_from_request(request)
+
+        assert_params_in_dict(body, ["admin"])
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Only local users can be admins of this homeserver")
+
+        set_admin_to = bool(body["admin"])
+
+        if target_user == auth_user and not set_admin_to:
+            raise SynapseError(400, "You may not demote yourself.")
+
+        yield self.handlers.admin_handler.set_user_server_admin(
+            target_user, set_admin_to
+        )
+
+        return (200, {})
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6008adec7c..b218a3f334 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -24,6 +24,7 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
+from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
 from synapse.types import StreamToken
 
 from ._base import client_patterns
@@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
+    @trace_using_operation_name("upload_keys")
     @defer.inlineCallbacks
     def on_POST(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
             # passing the device_id here is deprecated; however, we allow it
             # for now for compatibility with older clients.
             if requester.device_id is not None and device_id != requester.device_id:
+                set_tag("error", True)
+                log_kv(
+                    {
+                        "message": "Client uploading keys for a different device",
+                        "logged_in_id": requester.device_id,
+                        "key_being_uploaded": device_id,
+                    }
+                )
                 logger.warning(
                     "Client uploading keys for a different device "
                     "(logged in as %s, uploading for %s)",
@@ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
 
         from_token_string = parse_string(request, "from")
+        set_tag("from", from_token_string)
 
         # We want to enforce they do pass us one, but we ignore it and return
         # changes after the "to" as well as before.
-        parse_string(request, "to")
+        set_tag("to", parse_string(request, "to"))
 
         from_token = StreamToken.from_string(from_token_string)
 
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 05ea1459e3..9510a1e2b0 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -16,7 +16,6 @@
 
 import hmac
 import logging
-from hashlib import sha1
 
 from six import string_types
 
@@ -239,14 +238,12 @@ class RegisterRestServlet(RestServlet):
 
         # we do basic sanity checks here because the auth layer will store these
         # in sessions. Pull out the username/password provided to us.
-        desired_password = None
         if "password" in body:
             if (
                 not isinstance(body["password"], string_types)
                 or len(body["password"]) > 512
             ):
                 raise SynapseError(400, "Invalid password")
-            desired_password = body["password"]
 
         desired_username = None
         if "username" in body:
@@ -261,8 +258,8 @@ class RegisterRestServlet(RestServlet):
         if self.auth.has_access_token(request):
             appservice = yield self.auth.get_appservice_by_req(request)
 
-        # fork off as soon as possible for ASes and shared secret auth which
-        # have completely different registration flows to normal users
+        # fork off as soon as possible for ASes which have completely
+        # different registration flows to normal users
 
         # == Application Service Registration ==
         if appservice:
@@ -285,8 +282,8 @@ class RegisterRestServlet(RestServlet):
             return (200, result)  # we throw for non 200 responses
             return
 
-        # for either shared secret or regular registration, downcase the
-        # provided username before attempting to register it. This should mean
+        # for regular registration, downcase the provided username before
+        # attempting to register it. This should mean
         # that people who try to register with upper-case in their usernames
         # don't get a nasty surprise. (Note that we treat username
         # case-insenstively in login, so they are free to carry on imagining
@@ -294,16 +291,6 @@ class RegisterRestServlet(RestServlet):
         if desired_username is not None:
             desired_username = desired_username.lower()
 
-        # == Shared Secret Registration == (e.g. create new user scripts)
-        if "mac" in body:
-            # FIXME: Should we really be determining if this is shared secret
-            # auth based purely on the 'mac' key?
-            result = yield self._do_shared_secret_registration(
-                desired_username, desired_password, body
-            )
-            return (200, result)  # we throw for non 200 responses
-            return
-
         # == Normal User Registration == (everyone else)
         if not self.hs.config.enable_registration:
             raise SynapseError(403, "Registration has been disabled")
@@ -513,42 +500,6 @@ class RegisterRestServlet(RestServlet):
         return (yield self._create_registration_details(user_id, body))
 
     @defer.inlineCallbacks
-    def _do_shared_secret_registration(self, username, password, body):
-        if not self.hs.config.registration_shared_secret:
-            raise SynapseError(400, "Shared secret registration is not enabled")
-        if not username:
-            raise SynapseError(
-                400, "username must be specified", errcode=Codes.BAD_JSON
-            )
-
-        # use the username from the original request rather than the
-        # downcased one in `username` for the mac calculation
-        user = body["username"].encode("utf-8")
-
-        # str() because otherwise hmac complains that 'unicode' does not
-        # have the buffer interface
-        got_mac = str(body["mac"])
-
-        # FIXME this is different to the /v1/register endpoint, which
-        # includes the password and admin flag in the hashed text. Why are
-        # these different?
-        want_mac = hmac.new(
-            key=self.hs.config.registration_shared_secret.encode(),
-            msg=user,
-            digestmod=sha1,
-        ).hexdigest()
-
-        if not compare_digest(want_mac, got_mac):
-            raise SynapseError(403, "HMAC incorrect")
-
-        user_id = yield self.registration_handler.register_user(
-            localpart=username, password=password
-        )
-
-        result = yield self._create_registration_details(user_id, body)
-        return result
-
-    @defer.inlineCallbacks
     def _create_registration_details(self, user_id, params):
         """Complete registration of newly-registered user
 
diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py
index 5e8fda4b65..20177b44e7 100644
--- a/synapse/rest/well_known.py
+++ b/synapse/rest/well_known.py
@@ -34,7 +34,7 @@ class WellKnownBuilder(object):
         self._config = hs.config
 
     def get_well_known(self):
-        # if we don't have a public_base_url, we can't help much here.
+        # if we don't have a public_baseurl, we can't help much here.
         if self._config.public_baseurl is None:
             return None
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 8f72d92895..e11881161d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,11 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    trace,
+    whitelisted_homeserver,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import Cache, SQLBaseStore, db_to_json
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return {d["device_id"]: d for d in devices}
 
+    @trace
     @defer.inlineCallbacks
     def get_devices_by_remote(self, destination, from_stream_id, limit):
         """Get stream of updates to send to remote servers
@@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore):
         # (user_id, device_id) entries into a map, with the value being
         # the max stream_id across each set of duplicate entries
         #
-        # maps (user_id, device_id) -> stream_id
+        # maps (user_id, device_id) -> (stream_id, opentracing_context)
         # as long as their stream_id does not match that of the last row
+        #
+        # opentracing_context contains the opentracing metadata for the request
+        # that created the poke
+        #
+        # The most recent request's opentracing_context is used as the
+        # context which created the Edu.
+
         query_map = {}
         for update in updates:
             if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore):
                 break
 
             key = (update[0], update[1])
-            query_map[key] = max(query_map.get(key, 0), update[2])
+
+            update_context = update[3]
+            update_stream_id = update[2]
+
+            previous_update_stream_id, _ = query_map.get(key, (0, None))
+
+            if update_stream_id > previous_update_stream_id:
+                query_map[key] = (update_stream_id, update_context)
 
         # If we didn't find any updates with a stream_id lower than the cutoff, it
         # means that there are more than limit updates all of which have the same
@@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore):
             List: List of device updates
         """
         sql = """
-            SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+            SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
             ORDER BY stream_id
             LIMIT ?
@@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore):
         Args:
             destination (str): The host the device updates are intended for
             from_stream_id (int): The minimum stream_id to filter updates by, exclusive
-            query_map (Dict[(str, str): int]): Dictionary mapping
-                user_id/device_id to update stream_id
+            query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
+                user_id/device_id to update stream_id and the relevent json-encoded
+                opentracing context
 
         Returns:
             List[Dict]: List of objects representing an device update EDU
@@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore):
                 destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
-                stream_id = query_map[(user_id, device_id)]
+                stream_id, opentracing_context = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
                     "device_id": device_id,
                     "prev_id": [prev_id] if prev_id else [],
                     "stream_id": stream_id,
+                    "org.matrix.opentracing_context": opentracing_context,
                 }
 
                 prev_id = stream_id
@@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
             ],
         )
 
+        context = get_active_span_text_map()
+
         self._simple_insert_many_txn(
             txn,
             table="device_lists_outbound_pokes",
@@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
                     "device_id": device_id,
                     "sent": False,
                     "ts": now,
+                    "opentracing_context": json.dumps(context)
+                    if whitelisted_homeserver(destination)
+                    else None,
                 }
                 for destination in hosts
                 for device_id in device_ids
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index b1901404af..be2fe2bab6 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -18,6 +18,7 @@ import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.logging.opentracing import log_kv, trace
 
 from ._base import SQLBaseStore
 
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             },
             lock=False,
         )
+        log_kv(
+            {
+                "message": "Set room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "room_key": room_key,
+            }
+        )
 
+    @trace
     @defer.inlineCallbacks
     def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         return sessions
 
+    @trace
     @defer.inlineCallbacks
     def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
         )
 
+    @trace
     def create_e2e_room_keys_version(self, user_id, info):
         """Atomically creates a new version of this user's e2e_room_keys store
         with the given version info.
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
         )
 
+    @trace
     def update_e2e_room_keys_version(self, user_id, version, info):
         """Update a given backup version
 
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             desc="update_e2e_room_keys_version",
         )
 
+    @trace
     def delete_e2e_room_keys_version(self, user_id, version=None):
         """Delete a given backup version of the user's room keys.
         Doesn't delete their actual key data.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1e07474e70..33e3a84933 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
 
 from twisted.internet import defer
 
+from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
+    @trace
     @defer.inlineCallbacks
     def get_e2e_device_keys(
         self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
         """
+        set_tag("query_list", query_list)
         if not query_list:
             return {}
 
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return results
 
+    @trace
     def _get_e2e_device_keys_txn(
         self, txn, query_list, include_all_devices=False, include_deleted_devices=False
     ):
+        set_tag("include_all_devices", include_all_devices)
+        set_tag("include_deleted_devices", include_deleted_devices)
+
         query_clauses = []
         query_params = []
 
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             for user_id, device_id in deleted_devices:
                 result.setdefault(user_id, {})[device_id] = None
 
+        log_kv(result)
         return result
 
     @defer.inlineCallbacks
@@ -129,8 +137,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             keyvalues={"user_id": user_id, "device_id": device_id},
             desc="add_e2e_one_time_keys_check",
         )
-
-        return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+        result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+        log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
+        return result
 
     @defer.inlineCallbacks
     def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@@ -146,6 +155,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         """
 
         def _add_e2e_one_time_keys(txn):
+            set_tag("user_id", user_id)
+            set_tag("device_id", device_id)
+            set_tag("new_keys", new_keys)
             # We are protected from race between lookup and insertion due to
             # a unique constraint. If there is a race of two calls to
             # `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -202,6 +214,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         """
 
         def _set_e2e_device_keys_txn(txn):
+            set_tag("user_id", user_id)
+            set_tag("device_id", device_id)
+            set_tag("time_now", time_now)
+            set_tag("device_keys", device_keys)
+
             old_key_json = self._simple_select_one_onecol_txn(
                 txn,
                 table="e2e_device_keys_json",
@@ -215,6 +232,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             new_key_json = encode_canonical_json(device_keys).decode("utf-8")
 
             if old_key_json == new_key_json:
+                log_kv({"Message": "Device key already stored."})
                 return False
 
             self._simple_upsert_txn(
@@ -223,7 +241,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 keyvalues={"user_id": user_id, "device_id": device_id},
                 values={"ts_added_ms": time_now, "key_json": new_key_json},
             )
-
+            log_kv({"message": "Device keys stored."})
             return True
 
         return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
@@ -231,6 +249,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def claim_e2e_one_time_keys(self, query_list):
         """Take a list of one time keys out of the database"""
 
+        @trace
         def _claim_e2e_one_time_keys(txn):
             sql = (
                 "SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -252,7 +271,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 " AND key_id = ?"
             )
             for user_id, device_id, algorithm, key_id in delete:
+                log_kv(
+                    {
+                        "message": "Executing claim e2e_one_time_keys transaction on database."
+                    }
+                )
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
+                log_kv({"message": "finished executing and invalidating cache"})
                 self._invalidate_cache_and_stream(
                     txn, self.count_e2e_one_time_keys, (user_id, device_id)
                 )
@@ -262,6 +287,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
 
     def delete_e2e_keys_by_device(self, user_id, device_id):
         def delete_e2e_keys_by_device_txn(txn):
+            log_kv(
+                {
+                    "message": "Deleting keys for device",
+                    "device_id": device_id,
+                    "user_id": user_id,
+                }
+            )
             self._simple_delete_txn(
                 txn,
                 table="e2e_device_keys_json",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ac876287fc..5a95c36a8b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1302,15 +1302,11 @@ class EventsStore(
             "event_reference_hashes",
             "event_search",
             "event_to_state_groups",
-            "guest_access",
-            "history_visibility",
             "local_invites",
-            "room_names",
             "state_events",
             "rejections",
             "redactions",
             "room_memberships",
-            "topics",
         ):
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -1454,10 +1450,10 @@ class EventsStore(
 
         for event, _ in events_and_contexts:
             if event.type == EventTypes.Name:
-                # Insert into the room_names and event_search tables.
+                # Insert into the event_search table.
                 self._store_room_name_txn(txn, event)
             elif event.type == EventTypes.Topic:
-                # Insert into the topics table and event_search table.
+                # Insert into the event_search table.
                 self._store_room_topic_txn(txn, event)
             elif event.type == EventTypes.Message:
                 # Insert into the event_search table.
@@ -1465,12 +1461,6 @@ class EventsStore(
             elif event.type == EventTypes.Redaction:
                 # Insert into the redactions table.
                 self._store_redaction(txn, event)
-            elif event.type == EventTypes.RoomHistoryVisibility:
-                # Insert into the event_search table.
-                self._store_history_visibility_txn(txn, event)
-            elif event.type == EventTypes.GuestAccess:
-                # Insert into the event_search table.
-                self._store_guest_access_txn(txn, event)
 
             self._handle_event_relations(txn, event)
 
@@ -2191,6 +2181,143 @@ class EventsStore(
 
         return to_delete, to_dedelta
 
+    def purge_room(self, room_id):
+        """Deletes all record of a room
+
+        Args:
+            room_id (str):
+        """
+
+        return self.runInteraction("purge_room", self._purge_room_txn, room_id)
+
+    def _purge_room_txn(self, txn, room_id):
+        # first we have to delete the state groups states
+        logger.info("[purge] removing %s from state_groups_state", room_id)
+
+        txn.execute(
+            """
+            DELETE FROM state_groups_state WHERE state_group IN (
+              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+              WHERE events.room_id=?
+            )
+            """,
+            (room_id,),
+        )
+
+        # ... and the state group edges
+        logger.info("[purge] removing %s from state_group_edges", room_id)
+
+        txn.execute(
+            """
+            DELETE FROM state_group_edges WHERE state_group IN (
+              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+              WHERE events.room_id=?
+            )
+            """,
+            (room_id,),
+        )
+
+        # ... and the state groups
+        logger.info("[purge] removing %s from state_groups", room_id)
+
+        txn.execute(
+            """
+            DELETE FROM state_groups WHERE id IN (
+              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+              WHERE events.room_id=?
+            )
+            """,
+            (room_id,),
+        )
+
+        # and then tables which lack an index on room_id but have one on event_id
+        for table in (
+            "event_auth",
+            "event_edges",
+            "event_push_actions_staging",
+            "event_reference_hashes",
+            "event_relations",
+            "event_to_state_groups",
+            "redactions",
+            "rejections",
+            "state_events",
+        ):
+            logger.info("[purge] removing %s from %s", room_id, table)
+
+            txn.execute(
+                """
+                DELETE FROM %s WHERE event_id IN (
+                  SELECT event_id FROM events WHERE room_id=?
+                )
+                """
+                % (table,),
+                (room_id,),
+            )
+
+        # and finally, the tables with an index on room_id (or no useful index)
+        for table in (
+            "current_state_events",
+            "event_backward_extremities",
+            "event_forward_extremities",
+            "event_json",
+            "event_push_actions",
+            "event_search",
+            "events",
+            "group_rooms",
+            "public_room_list_stream",
+            "receipts_graph",
+            "receipts_linearized",
+            "room_aliases",
+            "room_depth",
+            "room_memberships",
+            "room_state",
+            "room_stats",
+            "room_stats_earliest_token",
+            "rooms",
+            "stream_ordering_to_exterm",
+            "topics",
+            "users_in_public_rooms",
+            "users_who_share_private_rooms",
+            # no useful index, but let's clear them anyway
+            "appservice_room_list",
+            "e2e_room_keys",
+            "event_push_summary",
+            "pusher_throttle",
+            "group_summary_rooms",
+            "local_invites",
+            "room_account_data",
+            "room_tags",
+        ):
+            logger.info("[purge] removing %s from %s", room_id, table)
+            txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
+
+        # Other tables we do NOT need to clear out:
+        #
+        #  - blocked_rooms
+        #    This is important, to make sure that we don't accidentally rejoin a blocked
+        #    room after it was purged
+        #
+        #  - user_directory
+        #    This has a room_id column, but it is unused
+        #
+
+        # Other tables that we might want to consider clearing out include:
+        #
+        #  - event_reports
+        #       Given that these are intended for abuse management my initial
+        #       inclination is to leave them in place.
+        #
+        #  - current_state_delta_stream
+        #  - ex_outlier_stream
+        #  - room_tags_revisions
+        #       The problem with these is that they are largeish and there is no room_id
+        #       index on them. In any case we should be clearing out 'stream' tables
+        #       periodically anyway (#5888)
+
+        # TODO: we could probably usefully do a bunch of cache invalidation here
+
+        logger.info("[purge] done")
+
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 55e4e84d71..9027b917c1 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -272,6 +272,14 @@ class RegistrationWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def is_server_admin(self, user):
+        """Determines if a user is an admin of this homeserver.
+
+        Args:
+            user (UserID): user ID of the user to test
+
+        Returns (bool):
+            true iff the user is a server admin, false otherwise.
+        """
         res = yield self._simple_select_one_onecol(
             table="users",
             keyvalues={"name": user.to_string()},
@@ -282,6 +290,21 @@ class RegistrationWorkerStore(SQLBaseStore):
 
         return res if res else False
 
+    def set_server_admin(self, user, admin):
+        """Sets whether a user is an admin of this homeserver.
+
+        Args:
+            user (UserID): user ID of the user to test
+            admin (bool): true iff the user is to be a server admin,
+                false otherwise.
+        """
+        return self._simple_update_one(
+            table="users",
+            keyvalues={"name": user.to_string()},
+            updatevalues={"admin": 1 if admin else 0},
+            desc="set_server_admin",
+        )
+
     def _query_for_auth(self, txn, token):
         sql = (
             "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index bc606292b8..08e13f3a3b 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -386,32 +386,12 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
     def _store_room_topic_txn(self, txn, event):
         if hasattr(event, "content") and "topic" in event.content:
-            self._simple_insert_txn(
-                txn,
-                "topics",
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "topic": event.content["topic"],
-                },
-            )
-
             self.store_event_search_txn(
                 txn, event, "content.topic", event.content["topic"]
             )
 
     def _store_room_name_txn(self, txn, event):
         if hasattr(event, "content") and "name" in event.content:
-            self._simple_insert_txn(
-                txn,
-                "room_names",
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "name": event.content["name"],
-                },
-            )
-
             self.store_event_search_txn(
                 txn, event, "content.name", event.content["name"]
             )
@@ -422,21 +402,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 txn, event, "content.body", event.content["body"]
             )
 
-    def _store_history_visibility_txn(self, txn, event):
-        self._store_content_index_txn(txn, event, "history_visibility")
-
-    def _store_guest_access_txn(self, txn, event):
-        self._store_content_index_txn(txn, event, "guest_access")
-
-    def _store_content_index_txn(self, txn, event, key):
-        if hasattr(event, "content") and key in event.content:
-            sql = (
-                "INSERT INTO %(key)s"
-                " (event_id, room_id, %(key)s)"
-                " VALUES (?, ?, ?)" % {"key": key}
-            )
-            txn.execute(sql, (event.event_id, event.room_id, event.content[key]))
-
     def add_event_report(
         self, room_id, event_id, user_id, reason, content, received_ts
     ):
diff --git a/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
new file mode 100644
index 0000000000..41807eb1e7
--- /dev/null
+++ b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Opentracing context data for inclusion in the device_list_update EDUs, as a
+ * json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination).
+ */
+ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT;
diff --git a/synapse/storage/schema/delta/56/drop_unused_event_tables.sql b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
new file mode 100644
index 0000000000..9f09922c67
--- /dev/null
+++ b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- these tables are never used.
+DROP TABLE IF EXISTS room_names;
+DROP TABLE IF EXISTS topics;
+DROP TABLE IF EXISTS history_visibility;
+DROP TABLE IF EXISTS guest_access;
diff --git a/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql
new file mode 100644
index 0000000000..149f8be8b6
--- /dev/null
+++ b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 Matrix.org Foundation CIC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this was apparently forgotten when the table was created back in delta 53.
+CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id);