Diffstat (limited to 'synapse')
-rw-r--r--   synapse/api/auth.py                        |  15
-rw-r--r--   synapse/app/_base.py                       |   3
-rw-r--r--   synapse/app/appservice.py                  |   3
-rw-r--r--   synapse/app/client_reader.py               |   3
-rw-r--r--   synapse/app/event_creator.py               |   3
-rw-r--r--   synapse/app/federation_reader.py           |   3
-rw-r--r--   synapse/app/federation_sender.py           |   3
-rw-r--r--   synapse/app/frontend_proxy.py              |   3
-rwxr-xr-x   synapse/app/homeserver.py                  |   3
-rw-r--r--   synapse/app/media_repository.py            |   3
-rw-r--r--   synapse/app/pusher.py                      |   3
-rw-r--r--   synapse/app/synchrotron.py                 |   3
-rw-r--r--   synapse/app/user_dir.py                    |   3
-rw-r--r--   synapse/config/logger.py                   |  81
-rw-r--r--   synapse/config/server.py                   |   2
-rw-r--r--   synapse/config/tracer.py                   |  63
-rw-r--r--   synapse/config/workers.py                  |   6
-rw-r--r--   synapse/events/__init__.py                 |  11
-rw-r--r--   synapse/events/utils.py                    |  16
-rw-r--r--   synapse/federation/transport/server.py     | 430
-rw-r--r--   synapse/handlers/message.py                |  33
-rw-r--r--   synapse/logging/opentracing.py             |  45
-rw-r--r--   synapse/logging/scopecontextmanager.py     |   4
-rw-r--r--   synapse/metrics/__init__.py                |  17
-rw-r--r--   synapse/metrics/_exposition.py             | 258
-rw-r--r--   synapse/metrics/resource.py                |  20
-rw-r--r--   synapse/python_dependencies.py             |   4
-rw-r--r--   synapse/rest/client/v2_alpha/relations.py  |  75
-rw-r--r--   synapse/static/index.html                  |   4
-rw-r--r--   synapse/storage/events_worker.py           | 376
30 files changed, 877 insertions, 619 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index d9e943c39c..7ce6540bdd 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -606,21 +606,6 @@ class Auth(object):
 
         defer.returnValue(auth_ids)
 
-    def check_redaction(self, room_version, event, auth_events):
-        """Check whether the event sender is allowed to redact the target event.
-
-        Returns:
-            True if the the sender is allowed to redact the target event if the
-                target event was created by them.
-            False if the sender is allowed to redact the target event with no
-                further checks.
-
-        Raises:
-            AuthError if the event sender is definitely not allowed to redact
-                the target event.
-        """
-        return event_auth.check_redaction(room_version, event, auth_events)
-
     @defer.inlineCallbacks
     def check_can_change_room_list(self, room_id, user):
         """Check if the user is allowed to edit the room's entry in the
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 807f320b46..540dbd9236 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -149,8 +149,7 @@ def listen_metrics(bind_addresses, port):
     """
     Start Prometheus metrics server.
     """
-    from synapse.metrics import RegistryProxy
-    from prometheus_client import start_http_server
+    from synapse.metrics import RegistryProxy, start_http_server
 
     for host in bind_addresses:
         logger.info("Starting metrics listener on %s:%d", host, port)
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index be44249ed6..e01f3e5f3b 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -27,8 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ff11beca82..29bddc4823 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -28,8 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index cacad25eac..042cfd04af 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -28,8 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 11e80dbae0..76a97f8f32 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -29,8 +29,7 @@ from synapse.config.logger import setup_logging
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 97da7bdcbf..fec49d5092 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,9 +28,8 @@ from synapse.config.logger import setup_logging
 from synapse.federation import send_queue
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.events import SlavedEventStore
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 417a10bbd2..1f1f1df78e 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -30,8 +30,7 @@ from synapse.http.server import JsonResource
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 639b1429c0..0c075cb3f1 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -55,9 +55,8 @@ from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import RootRedirect
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.module_api import ModuleApi
 from synapse.python_dependencies import check_requirements
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index f23b9b6eda..d70780e9d5 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -28,8 +28,7 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 4f929edf86..070de7d0b0 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -27,8 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage._base import __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index de4797fddc..315c030694 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -32,8 +32,7 @@ from synapse.handlers.presence import PresenceHandler, get_interested_parties
 from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 1177ddd72e..03ef21bd01 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -29,8 +29,7 @@ from synapse.config.logger import setup_logging
 from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 40502a5798..d321d00b80 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import logging
 import logging.config
 import os
@@ -75,10 +76,8 @@ root:
 
 class LoggingConfig(Config):
     def read_config(self, config, **kwargs):
-        self.verbosity = config.get("verbose", 0)
-        self.no_redirect_stdio = config.get("no_redirect_stdio", False)
         self.log_config = self.abspath(config.get("log_config"))
-        self.log_file = self.abspath(config.get("log_file"))
+        self.no_redirect_stdio = config.get("no_redirect_stdio", False)
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         log_config = os.path.join(config_dir_path, server_name + ".log.config")
@@ -94,39 +93,13 @@ class LoggingConfig(Config):
         )
 
     def read_arguments(self, args):
-        if args.verbose is not None:
-            self.verbosity = args.verbose
         if args.no_redirect_stdio is not None:
             self.no_redirect_stdio = args.no_redirect_stdio
-        if args.log_config is not None:
-            self.log_config = args.log_config
-        if args.log_file is not None:
-            self.log_file = args.log_file
 
     @staticmethod
     def add_arguments(parser):
         logging_group = parser.add_argument_group("logging")
         logging_group.add_argument(
-            "-v",
-            "--verbose",
-            dest="verbose",
-            action="count",
-            help="The verbosity level. Specify multiple times to increase "
-            "verbosity. (Ignored if --log-config is specified.)",
-        )
-        logging_group.add_argument(
-            "-f",
-            "--log-file",
-            dest="log_file",
-            help="File to log to. (Ignored if --log-config is specified.)",
-        )
-        logging_group.add_argument(
-            "--log-config",
-            dest="log_config",
-            default=None,
-            help="Python logging config file",
-        )
-        logging_group.add_argument(
             "-n",
             "--no-redirect-stdio",
             action="store_true",
@@ -153,58 +126,29 @@ def setup_logging(config, use_worker_options=False):
         config (LoggingConfig | synapse.config.workers.WorkerConfig):
             configuration data
 
-        use_worker_options (bool): True to use 'worker_log_config' and
-            'worker_log_file' options instead of 'log_config' and 'log_file'.
+        use_worker_options (bool): True to use the 'worker_log_config' option
+            instead of 'log_config'.
 
         register_sighup (func | None): Function to call to register a
             sighup handler.
     """
     log_config = config.worker_log_config if use_worker_options else config.log_config
-    log_file = config.worker_log_file if use_worker_options else config.log_file
-
-    log_format = (
-        "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
-        " - %(message)s"
-    )
 
     if log_config is None:
-        # We don't have a logfile, so fall back to the 'verbosity' param from
-        # the config or cmdline. (Note that we generate a log config for new
-        # installs, so this will be an unusual case)
-        level = logging.INFO
-        level_for_storage = logging.INFO
-        if config.verbosity:
-            level = logging.DEBUG
-            if config.verbosity > 1:
-                level_for_storage = logging.DEBUG
+        log_format = (
+            "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
+            " - %(message)s"
+        )
 
         logger = logging.getLogger("")
-        logger.setLevel(level)
-
-        logging.getLogger("synapse.storage.SQL").setLevel(level_for_storage)
+        logger.setLevel(logging.INFO)
+        logging.getLogger("synapse.storage.SQL").setLevel(logging.INFO)
 
         formatter = logging.Formatter(log_format)
-        if log_file:
-            # TODO: Customisable file size / backup count
-            handler = logging.handlers.RotatingFileHandler(
-                log_file, maxBytes=(1000 * 1000 * 100), backupCount=3, encoding="utf8"
-            )
-
-            def sighup(signum, stack):
-                logger.info("Closing log file due to SIGHUP")
-                handler.doRollover()
-                logger.info("Opened new log file due to SIGHUP")
-
-        else:
-            handler = logging.StreamHandler()
-
-            def sighup(*args):
-                pass
 
+        handler = logging.StreamHandler()
         handler.setFormatter(formatter)
-
         handler.addFilter(LoggingContextFilter(request=""))
-
         logger.addHandler(handler)
     else:
 
@@ -218,8 +162,7 @@ def setup_logging(config, use_worker_options=False):
             logging.info("Reloaded log config from %s due to SIGHUP", log_config)
 
         load_log_config()
-
-    appbase.register_sighup(sighup)
+        appbase.register_sighup(sighup)
 
     # make sure that the first thing we log is a thing we can grep backwards
     # for
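
With the log_file and verbosity options gone, file logging is expressed entirely through the log config that setup_logging() loads. A minimal dictConfig equivalent of the removed RotatingFileHandler behaviour is sketched below; the handler name, file name and sizes are illustrative assumptions, not Synapse's shipped template:

    import logging.config

    LOG_CONFIG = {
        "version": 1,
        "formatters": {
            "precise": {
                # Synapse's real template also injects %(request)s via a
                # logging context filter; omitted to keep this standalone.
                "format": "%(asctime)s - %(name)s - %(lineno)d"
                " - %(levelname)s - %(message)s"
            }
        },
        "handlers": {
            "file": {
                "class": "logging.handlers.RotatingFileHandler",
                "formatter": "precise",
                "filename": "homeserver.log",
                "maxBytes": 100 * 1000 * 1000,
                "backupCount": 3,
                "encoding": "utf8",
            }
        },
        "root": {"level": "INFO", "handlers": ["file"]},
    }

    logging.config.dictConfig(LOG_CONFIG)
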
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 080d0630bd..00170f1393 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -136,7 +136,7 @@ class ServerConfig(Config):
 
         # Whether to enable experimental MSC1849 (aka relations) support
         self.experimental_msc1849_support_enabled = config.get(
-            "experimental_msc1849_support_enabled", False
+            "experimental_msc1849_support_enabled", True
         )
 
         # Options to control access by tracking MAU
diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py
index 63a637984a..a2ce9ab3f6 100644
--- a/synapse/config/tracer.py
+++ b/synapse/config/tracer.py
@@ -18,33 +18,52 @@ from ._base import Config, ConfigError
 
 class TracerConfig(Config):
     def read_config(self, config, **kwargs):
-        self.tracer_config = config.get("opentracing")
+        opentracing_config = config.get("opentracing")
+        if opentracing_config is None:
+            opentracing_config = {}
 
-        self.tracer_config = config.get("opentracing", {"tracer_enabled": False})
+        self.opentracer_enabled = opentracing_config.get("enabled", False)
+        if not self.opentracer_enabled:
+            return
 
-        if self.tracer_config.get("tracer_enabled", False):
-            # The tracer is enabled so sanitize the config
-            # If no whitelists are given
-            self.tracer_config.setdefault("homeserver_whitelist", [])
+        # The tracer is enabled so sanitize the config
 
-            if not isinstance(self.tracer_config.get("homeserver_whitelist"), list):
-                raise ConfigError("Tracer homesererver_whitelist config is malformed")
+        self.opentracer_whitelist = opentracing_config.get("homeserver_whitelist", [])
+        if not isinstance(self.opentracer_whitelist, list):
+            raise ConfigError("Tracer homeserver_whitelist config is malformed")
 
     def generate_config_section(cls, **kwargs):
         return """\
         ## Opentracing ##
-        # These settings enable opentracing which implements distributed tracing
-        # This allows you to observe the causal chain of events across servers
-        # including requests, key lookups etc. across any server running
-        # synapse or any other other services which supports opentracing.
-        # (specifically those implemented with jaeger)
-
-        #opentracing:
-        #  # Enable / disable tracer
-        #  tracer_enabled: false
-        #  # The list of homeservers we wish to expose our current traces to.
-        #  # The list is a list of regexes which are matched against the
-        #  # servername of the homeserver
-        #  homeserver_whitelist:
-        #    - ".*"
+
+        # These settings enable opentracing, which implements distributed tracing.
+        # This allows you to observe the causal chains of events across servers
+        # including requests, key lookups etc., across any server running
+        # synapse or any other services which support opentracing
+        # (specifically those implemented with Jaeger).
+        #
+        opentracing:
+            # Tracing is disabled by default. Uncomment the following line to enable it.
+            #
+            #enabled: true
+
+            # The list of homeservers with which we wish to send and receive span contexts and span baggage.
+            #
+            # Though it's mostly safe to send and receive span contexts to and from
+            # untrusted users, since span contexts are usually opaque IDs, it can lead to
+            # two problems, namely:
+            # - If the span context is marked as sampled by the sending homeserver, the receiver
+            # will sample it. Therefore two homeservers with wildly disparate sampling policies
+            # could incur higher sampling counts than intended.
+            # - Span baggage can be arbitrary data. For safety this has been disabled in Synapse,
+            # but that doesn't prevent another server from sending you baggage, which will be
+            # logged in the opentracing logs.
+            #
+            # This is a list of regexes which are matched against the server_name of the
+            # homeserver.
+            #
+            # By default, it is empty, so no servers are matched.
+            #
+            #homeserver_whitelist:
+            #  - ".*"
         """
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 3b75471d85..246d72cd61 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -31,8 +31,6 @@ class WorkerConfig(Config):
         self.worker_listeners = config.get("worker_listeners", [])
         self.worker_daemonize = config.get("worker_daemonize")
         self.worker_pid_file = config.get("worker_pid_file")
-        self.worker_log_file = config.get("worker_log_file")
-        self.worker_log_config = config.get("worker_log_config")
 
         # The host used to connect to the main synapse
         self.worker_replication_host = config.get("worker_replication_host", None)
@@ -78,9 +76,5 @@ class WorkerConfig(Config):
 
         if args.daemonize is not None:
             self.worker_daemonize = args.daemonize
-        if args.log_config is not None:
-            self.worker_log_config = args.log_config
-        if args.log_file is not None:
-            self.worker_log_file = args.log_file
         if args.manhole is not None:
             self.worker_manhole = args.worker_manhole
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index d3de70e671..88ed6d764f 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -104,6 +104,17 @@ class _EventInternalMetadata(object):
         """
         return getattr(self, "proactively_send", True)
 
+    def is_redacted(self):
+        """Whether the event has been redacted.
+
+        This is used for efficiently checking whether an event has been
+        marked as redacted without needing to make another database call.
+
+        Returns:
+            bool
+        """
+        return getattr(self, "redacted", False)
+
 
 def _event_dict_property(key):
     # We want to be able to use hasattr with the event dict properties.
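
To illustrate the flag's lifecycle: prune_event (see synapse/events/utils.py below) sets internal_metadata.redacted on the pruned copy, so is_redacted needs no further database call. A standalone sketch using a trimmed-down stand-in class:

    class _EventInternalMetadata(object):
        # Trimmed-down stand-in, for illustration only.
        def is_redacted(self):
            # Defaults to False until prune_event marks the pruned copy.
            return getattr(self, "redacted", False)

    meta = _EventInternalMetadata()
    assert not meta.is_redacted()  # fresh event: not redacted

    meta.redacted = True  # what prune_event now does to the pruned copy
    assert meta.is_redacted()  # checked without another database call
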
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 987de5cab7..9487a886f5 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -52,10 +52,15 @@ def prune_event(event):
 
     from . import event_type_from_format_version
 
-    return event_type_from_format_version(event.format_version)(
+    pruned_event = event_type_from_format_version(event.format_version)(
         pruned_event_dict, event.internal_metadata.get_dict()
     )
 
+    # Mark the event as redacted
+    pruned_event.internal_metadata.redacted = True
+
+    return pruned_event
+
 
 def prune_event_dict(event_dict):
     """Redacts the event_dict in the same way as `prune_event`, except it
@@ -360,9 +365,12 @@ class EventClientSerializer(object):
         event_id = event.event_id
         serialized_event = serialize_event(event, time_now, **kwargs)
 
-        # If MSC1849 is enabled then we need to look if thre are any relations
-        # we need to bundle in with the event
-        if self.experimental_msc1849_support_enabled and bundle_aggregations:
+        # If MSC1849 is enabled then we need to check whether there are any
+        # relations to bundle in with the event.
+        # Do not bundle relations if the event has been redacted.
+        if not event.internal_metadata.is_redacted() and (
+            self.experimental_msc1849_support_enabled and bundle_aggregations
+        ):
             annotations = yield self.store.get_aggregation_groups_for_event(event_id)
             references = yield self.store.get_relations_for_event(
                 event_id, RelationTypes.REFERENCE, direction="f"
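
Restated as pure boolean logic, the new bundling gate looks like this (a sketch with hypothetical parameter names):

    def should_bundle(is_redacted, msc1849_enabled, bundle_aggregations):
        # Relations are bundled only when the feature is enabled, the caller
        # asked for bundling, and the event has not been redacted.
        return (not is_redacted) and msc1849_enabled and bundle_aggregations

    assert should_bundle(False, True, True)
    assert not should_bundle(True, True, True)  # redacted: skip bundling
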
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index c45d458d94..663264dec4 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 # Copyright 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,8 +19,6 @@ import functools
 import logging
 import re
 
-from twisted.internet import defer
-
 import synapse
 import synapse.logging.opentracing as opentracing
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
@@ -103,8 +102,7 @@ class Authenticator(object):
         self.federation_domain_whitelist = hs.config.federation_domain_whitelist
 
     # A method just so we can pass 'self' as the authenticator to the Servlets
-    @defer.inlineCallbacks
-    def authenticate_request(self, request, content):
+    async def authenticate_request(self, request, content):
         now = self._clock.time_msec()
         json_request = {
             "method": request.method.decode("ascii"),
@@ -142,7 +140,7 @@ class Authenticator(object):
                 401, "Missing Authorization headers", Codes.UNAUTHORIZED
             )
 
-        yield self.keyring.verify_json_for_server(
+        await self.keyring.verify_json_for_server(
             origin, json_request, now, "Incoming request"
         )
 
@@ -151,17 +149,16 @@ class Authenticator(object):
 
         # If we get a valid signed request from the other side, its probably
         # alive
-        retry_timings = yield self.store.get_destination_retry_timings(origin)
+        retry_timings = await self.store.get_destination_retry_timings(origin)
         if retry_timings and retry_timings["retry_last_ts"]:
             run_in_background(self._reset_retry_timings, origin)
 
-        defer.returnValue(origin)
+        return origin
 
-    @defer.inlineCallbacks
-    def _reset_retry_timings(self, origin):
+    async def _reset_retry_timings(self, origin):
         try:
             logger.info("Marking origin %r as up", origin)
-            yield self.store.set_destination_retry_timings(origin, 0, 0)
+            await self.store.set_destination_retry_timings(origin, 0, 0)
         except Exception:
             logger.exception("Error resetting retry timings on %s", origin)
 
@@ -215,7 +212,8 @@ class BaseFederationServlet(object):
     match against the request path (excluding the /federation/v1 prefix).
 
     The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
-    the appropriate HTTP method. These methods have the signature:
+    the appropriate HTTP method. These methods must be *asynchronous* and have the
+    signature:
 
         on_<METHOD>(self, origin, content, query, **kwargs)
 
@@ -235,7 +233,7 @@ class BaseFederationServlet(object):
                 components as specified in the path match regexp.
 
         Returns:
-            Deferred[(int, object)|None]: either (response code, response object) to
+            Optional[Tuple[int, object]]: either (response code, response object) to
                  return a JSON response, or None if the request has already been handled.
 
         Raises:
@@ -258,10 +256,9 @@ class BaseFederationServlet(object):
         authenticator = self.authenticator
         ratelimiter = self.ratelimiter
 
-        @defer.inlineCallbacks
         @functools.wraps(func)
-        def new_func(request, *args, **kwargs):
-            """ A callback which can be passed to HttpServer.RegisterPaths
+        async def new_func(request, *args, **kwargs):
+            """A callback which can be passed to HttpServer.RegisterPaths
 
             Args:
                 request (twisted.web.http.Request):
@@ -270,8 +267,8 @@ class BaseFederationServlet(object):
                     components as specified in the path match regexp.
 
             Returns:
-                Deferred[(int, object)|None]: (response code, response object) as returned
-                    by the callback method. None if the request has already been handled.
+                Tuple[int, object]|None: (response code, response object) as returned by
+                    the callback method. None if the request has already been handled.
             """
             content = None
             if request.method in [b"PUT", b"POST"]:
@@ -279,7 +276,7 @@ class BaseFederationServlet(object):
                 content = parse_json_object_from_request(request)
 
             try:
-                origin = yield authenticator.authenticate_request(request, content)
+                origin = await authenticator.authenticate_request(request, content)
             except NoAuthenticationError:
                 origin = None
                 if self.REQUIRE_AUTH:
@@ -304,16 +301,16 @@ class BaseFederationServlet(object):
             ):
                 if origin:
                     with ratelimiter.ratelimit(origin) as d:
-                        yield d
-                        response = yield func(
+                        await d
+                        response = await func(
                             origin, content, request.args, *args, **kwargs
                         )
                 else:
-                    response = yield func(
+                    response = await func(
                         origin, content, request.args, *args, **kwargs
                     )
 
-            defer.returnValue(response)
+            return response
 
         # Extra logic that functools.wraps() doesn't finish
         new_func.__self__ = func.__self__
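
Since new_func is now a coroutine function, whatever registers it with the HTTP server has to bridge back into Twisted's Deferred world. A sketch of that bridging, assuming the call site uses defer.ensureDeferred (the actual wrapping code is outside this diff):

    from twisted.internet import defer

    def wrap_async_handler(handler):
        # Hypothetical bridge: the coroutine returned by the async handler
        # is wrapped in a Deferred that Twisted's machinery can drive.
        def wrapped(request, *args, **kwargs):
            return defer.ensureDeferred(handler(request, *args, **kwargs))

        return wrapped
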
@@ -341,8 +338,7 @@ class FederationSendServlet(BaseFederationServlet):
         self.server_name = server_name
 
     # This is when someone is trying to send us a bunch of data.
-    @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, transaction_id):
+    async def on_PUT(self, origin, content, query, transaction_id):
         """ Called on PUT /send/<transaction_id>/
 
         Args:
@@ -351,7 +347,7 @@ class FederationSendServlet(BaseFederationServlet):
                 request. This is *not* None.
 
         Returns:
-            Deferred: Results in a tuple of `(code, response)`, where
+            Tuple of `(code, response)`, where
             `response` is a python dict to be converted into JSON that is
             used as the response body.
         """
@@ -380,34 +376,33 @@ class FederationSendServlet(BaseFederationServlet):
 
         except Exception as e:
             logger.exception(e)
-            defer.returnValue((400, {"error": "Invalid transaction"}))
-            return
+            return 400, {"error": "Invalid transaction"}
 
         try:
-            code, response = yield self.handler.on_incoming_transaction(
+            code, response = await self.handler.on_incoming_transaction(
                 origin, transaction_data
             )
         except Exception:
             logger.exception("on_incoming_transaction failed")
             raise
 
-        defer.returnValue((code, response))
+        return code, response
 
 
 class FederationEventServlet(BaseFederationServlet):
     PATH = "/event/(?P<event_id>[^/]*)/?"
 
     # This is when someone asks for a data item for a given server data_id pair.
-    def on_GET(self, origin, content, query, event_id):
-        return self.handler.on_pdu_request(origin, event_id)
+    async def on_GET(self, origin, content, query, event_id):
+        return await self.handler.on_pdu_request(origin, event_id)
 
 
 class FederationStateServlet(BaseFederationServlet):
     PATH = "/state/(?P<context>[^/]*)/?"
 
     # This is when someone asks for all data for a given context.
-    def on_GET(self, origin, content, query, context):
-        return self.handler.on_context_state_request(
+    async def on_GET(self, origin, content, query, context):
+        return await self.handler.on_context_state_request(
             origin,
             context,
             parse_string_from_args(query, "event_id", None, required=True),
@@ -417,8 +412,8 @@ class FederationStateServlet(BaseFederationServlet):
 class FederationStateIdsServlet(BaseFederationServlet):
     PATH = "/state_ids/(?P<room_id>[^/]*)/?"
 
-    def on_GET(self, origin, content, query, room_id):
-        return self.handler.on_state_ids_request(
+    async def on_GET(self, origin, content, query, room_id):
+        return await self.handler.on_state_ids_request(
             origin,
             room_id,
             parse_string_from_args(query, "event_id", None, required=True),
@@ -428,22 +423,22 @@ class FederationStateIdsServlet(BaseFederationServlet):
 class FederationBackfillServlet(BaseFederationServlet):
     PATH = "/backfill/(?P<context>[^/]*)/?"
 
-    def on_GET(self, origin, content, query, context):
+    async def on_GET(self, origin, content, query, context):
         versions = [x.decode("ascii") for x in query[b"v"]]
         limit = parse_integer_from_args(query, "limit", None)
 
         if not limit:
-            return defer.succeed((400, {"error": "Did not include limit param"}))
+            return 400, {"error": "Did not include limit param"}
 
-        return self.handler.on_backfill_request(origin, context, versions, limit)
+        return await self.handler.on_backfill_request(origin, context, versions, limit)
 
 
 class FederationQueryServlet(BaseFederationServlet):
     PATH = "/query/(?P<query_type>[^/]*)"
 
     # This is when we receive a server-server Query
-    def on_GET(self, origin, content, query, query_type):
-        return self.handler.on_query_request(
+    async def on_GET(self, origin, content, query, query_type):
+        return await self.handler.on_query_request(
             query_type,
             {k.decode("utf8"): v[0].decode("utf-8") for k, v in query.items()},
         )
@@ -452,8 +447,7 @@ class FederationQueryServlet(BaseFederationServlet):
 class FederationMakeJoinServlet(BaseFederationServlet):
     PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, _content, query, context, user_id):
+    async def on_GET(self, origin, _content, query, context, user_id):
         """
         Args:
             origin (unicode): The authenticated server_name of the calling server
@@ -466,8 +460,7 @@ class FederationMakeJoinServlet(BaseFederationServlet):
                 components as specified in the path match regexp.
 
         Returns:
-            Deferred[(int, object)|None]: either (response code, response object) to
-                 return a JSON response, or None if the request has already been handled.
+            Tuple[int, object]: (response code, response object)
         """
         versions = query.get(b"ver")
         if versions is not None:
@@ -475,64 +468,60 @@ class FederationMakeJoinServlet(BaseFederationServlet):
         else:
             supported_versions = ["1"]
 
-        content = yield self.handler.on_make_join_request(
+        content = await self.handler.on_make_join_request(
             origin, context, user_id, supported_versions=supported_versions
         )
-        defer.returnValue((200, content))
+        return 200, content
 
 
 class FederationMakeLeaveServlet(BaseFederationServlet):
     PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, context, user_id):
-        content = yield self.handler.on_make_leave_request(origin, context, user_id)
-        defer.returnValue((200, content))
+    async def on_GET(self, origin, content, query, context, user_id):
+        content = await self.handler.on_make_leave_request(origin, context, user_id)
+        return 200, content
 
 
 class FederationSendLeaveServlet(BaseFederationServlet):
     PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, room_id, event_id):
-        content = yield self.handler.on_send_leave_request(origin, content, room_id)
-        defer.returnValue((200, content))
+    async def on_PUT(self, origin, content, query, room_id, event_id):
+        content = await self.handler.on_send_leave_request(origin, content, room_id)
+        return 200, content
 
 
 class FederationEventAuthServlet(BaseFederationServlet):
     PATH = "/event_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
 
-    def on_GET(self, origin, content, query, context, event_id):
-        return self.handler.on_event_auth(origin, context, event_id)
+    async def on_GET(self, origin, content, query, context, event_id):
+        return await self.handler.on_event_auth(origin, context, event_id)
 
 
 class FederationSendJoinServlet(BaseFederationServlet):
     PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, context, event_id):
+    async def on_PUT(self, origin, content, query, context, event_id):
         # TODO(paul): assert that context/event_id parsed from path actually
         #   match those given in content
-        content = yield self.handler.on_send_join_request(origin, content, context)
-        defer.returnValue((200, content))
+        content = await self.handler.on_send_join_request(origin, content, context)
+        return 200, content
 
 
 class FederationV1InviteServlet(BaseFederationServlet):
     PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, context, event_id):
+    async def on_PUT(self, origin, content, query, context, event_id):
         # We don't get a room version, so we have to assume its EITHER v1 or
         # v2. This is "fine" as the only difference between V1 and V2 is the
         # state resolution algorithm, and we don't use that for processing
         # invites
-        content = yield self.handler.on_invite_request(
+        content = await self.handler.on_invite_request(
             origin, content, room_version=RoomVersions.V1.identifier
         )
 
         # V1 federation API is defined to return a content of `[200, {...}]`
         # due to a historical bug.
-        defer.returnValue((200, (200, content)))
+        return 200, (200, content)
 
 
 class FederationV2InviteServlet(BaseFederationServlet):
@@ -540,8 +529,7 @@ class FederationV2InviteServlet(BaseFederationServlet):
 
     PREFIX = FEDERATION_V2_PREFIX
 
-    @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, context, event_id):
+    async def on_PUT(self, origin, content, query, context, event_id):
         # TODO(paul): assert that context/event_id parsed from path actually
         #   match those given in content
 
@@ -554,69 +542,65 @@ class FederationV2InviteServlet(BaseFederationServlet):
 
         event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state
 
-        content = yield self.handler.on_invite_request(
+        content = await self.handler.on_invite_request(
             origin, event, room_version=room_version
         )
-        defer.returnValue((200, content))
+        return 200, content
 
 
 class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
     PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, room_id):
-        content = yield self.handler.on_exchange_third_party_invite_request(
+    async def on_PUT(self, origin, content, query, room_id):
+        content = await self.handler.on_exchange_third_party_invite_request(
             origin, room_id, content
         )
-        defer.returnValue((200, content))
+        return 200, content
 
 
 class FederationClientKeysQueryServlet(BaseFederationServlet):
     PATH = "/user/keys/query"
 
-    def on_POST(self, origin, content, query):
-        return self.handler.on_query_client_keys(origin, content)
+    async def on_POST(self, origin, content, query):
+        return await self.handler.on_query_client_keys(origin, content)
 
 
 class FederationUserDevicesQueryServlet(BaseFederationServlet):
     PATH = "/user/devices/(?P<user_id>[^/]*)"
 
-    def on_GET(self, origin, content, query, user_id):
-        return self.handler.on_query_user_devices(origin, user_id)
+    async def on_GET(self, origin, content, query, user_id):
+        return await self.handler.on_query_user_devices(origin, user_id)
 
 
 class FederationClientKeysClaimServlet(BaseFederationServlet):
     PATH = "/user/keys/claim"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query):
-        response = yield self.handler.on_claim_client_keys(origin, content)
-        defer.returnValue((200, response))
+    async def on_POST(self, origin, content, query):
+        response = await self.handler.on_claim_client_keys(origin, content)
+        return 200, response
 
 
 class FederationQueryAuthServlet(BaseFederationServlet):
     PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, context, event_id):
-        new_content = yield self.handler.on_query_auth_request(
+    async def on_POST(self, origin, content, query, context, event_id):
+        new_content = await self.handler.on_query_auth_request(
             origin, content, context, event_id
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGetMissingEventsServlet(BaseFederationServlet):
     # TODO(paul): Why does this path alone end with "/?" optional?
     PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, room_id):
+    async def on_POST(self, origin, content, query, room_id):
         limit = int(content.get("limit", 10))
         earliest_events = content.get("earliest_events", [])
         latest_events = content.get("latest_events", [])
 
-        content = yield self.handler.on_get_missing_events(
+        content = await self.handler.on_get_missing_events(
             origin,
             room_id=room_id,
             earliest_events=earliest_events,
@@ -624,7 +608,7 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
             limit=limit,
         )
 
-        defer.returnValue((200, content))
+        return 200, content
 
 
 class On3pidBindServlet(BaseFederationServlet):
@@ -632,8 +616,7 @@ class On3pidBindServlet(BaseFederationServlet):
 
     REQUIRE_AUTH = False
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query):
+    async def on_POST(self, origin, content, query):
         if "invites" in content:
             last_exception = None
             for invite in content["invites"]:
@@ -645,7 +628,7 @@ class On3pidBindServlet(BaseFederationServlet):
                         )
                         logger.info(message)
                         raise SynapseError(400, message)
-                    yield self.handler.exchange_third_party_invite(
+                    await self.handler.exchange_third_party_invite(
                         invite["sender"],
                         invite["mxid"],
                         invite["room_id"],
@@ -655,7 +638,7 @@ class On3pidBindServlet(BaseFederationServlet):
                     last_exception = e
             if last_exception:
                 raise last_exception
-        defer.returnValue((200, {}))
+        return 200, {}
 
 
 class OpenIdUserInfo(BaseFederationServlet):
@@ -679,29 +662,26 @@ class OpenIdUserInfo(BaseFederationServlet):
 
     REQUIRE_AUTH = False
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query):
+    async def on_GET(self, origin, content, query):
         token = query.get(b"access_token", [None])[0]
         if token is None:
-            defer.returnValue(
-                (401, {"errcode": "M_MISSING_TOKEN", "error": "Access Token required"})
+            return (
+                401,
+                {"errcode": "M_MISSING_TOKEN", "error": "Access Token required"},
             )
-            return
 
-        user_id = yield self.handler.on_openid_userinfo(token.decode("ascii"))
+        user_id = await self.handler.on_openid_userinfo(token.decode("ascii"))
 
         if user_id is None:
-            defer.returnValue(
-                (
-                    401,
-                    {
-                        "errcode": "M_UNKNOWN_TOKEN",
-                        "error": "Access Token unknown or expired",
-                    },
-                )
+            return (
+                401,
+                {
+                    "errcode": "M_UNKNOWN_TOKEN",
+                    "error": "Access Token unknown or expired",
+                },
             )
 
-        defer.returnValue((200, {"sub": user_id}))
+        return 200, {"sub": user_id}
 
 
 class PublicRoomList(BaseFederationServlet):
@@ -743,8 +723,7 @@ class PublicRoomList(BaseFederationServlet):
         )
         self.allow_access = allow_access
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query):
+    async def on_GET(self, origin, content, query):
         if not self.allow_access:
             raise FederationDeniedError(origin)
 
@@ -764,10 +743,10 @@ class PublicRoomList(BaseFederationServlet):
         else:
             network_tuple = ThirdPartyInstanceID(None, None)
 
-        data = yield self.handler.get_local_public_room_list(
+        data = await self.handler.get_local_public_room_list(
             limit, since_token, network_tuple=network_tuple, from_federation=True
         )
-        defer.returnValue((200, data))
+        return 200, data
 
 
 class FederationVersionServlet(BaseFederationServlet):
@@ -775,12 +754,10 @@ class FederationVersionServlet(BaseFederationServlet):
 
     REQUIRE_AUTH = False
 
-    def on_GET(self, origin, content, query):
-        return defer.succeed(
-            (
-                200,
-                {"server": {"name": "Synapse", "version": get_version_string(synapse)}},
-            )
+    async def on_GET(self, origin, content, query):
+        return (
+            200,
+            {"server": {"name": "Synapse", "version": get_version_string(synapse)}},
         )
 
 
@@ -790,41 +767,38 @@ class FederationGroupsProfileServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/profile"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id):
+    async def on_GET(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.get_group_profile(group_id, requester_user_id)
+        new_content = await self.handler.get_group_profile(group_id, requester_user_id)
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id):
+    async def on_POST(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.update_group_profile(
+        new_content = await self.handler.update_group_profile(
             group_id, requester_user_id, content
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsSummaryServlet(BaseFederationServlet):
     PATH = "/groups/(?P<group_id>[^/]*)/summary"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id):
+    async def on_GET(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.get_group_summary(group_id, requester_user_id)
+        new_content = await self.handler.get_group_summary(group_id, requester_user_id)
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsRoomsServlet(BaseFederationServlet):
@@ -833,15 +807,14 @@ class FederationGroupsRoomsServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/rooms"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id):
+    async def on_GET(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.get_rooms_in_group(group_id, requester_user_id)
+        new_content = await self.handler.get_rooms_in_group(group_id, requester_user_id)
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsAddRoomsServlet(BaseFederationServlet):
@@ -850,29 +823,27 @@ class FederationGroupsAddRoomsServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, room_id):
+    async def on_POST(self, origin, content, query, group_id, room_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.add_room_to_group(
+        new_content = await self.handler.add_room_to_group(
             group_id, requester_user_id, room_id, content
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
-    @defer.inlineCallbacks
-    def on_DELETE(self, origin, content, query, group_id, room_id):
+    async def on_DELETE(self, origin, content, query, group_id, room_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.remove_room_from_group(
+        new_content = await self.handler.remove_room_from_group(
             group_id, requester_user_id, room_id
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
@@ -884,17 +855,16 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
         "/config/(?P<config_key>[^/]*)"
     )
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, room_id, config_key):
+    async def on_POST(self, origin, content, query, group_id, room_id, config_key):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        result = yield self.groups_handler.update_room_in_group(
+        result = await self.groups_handler.update_room_in_group(
             group_id, requester_user_id, room_id, config_key, content
         )
 
-        defer.returnValue((200, result))
+        return 200, result
 
 
 class FederationGroupsUsersServlet(BaseFederationServlet):
@@ -903,15 +873,14 @@ class FederationGroupsUsersServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/users"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id):
+    async def on_GET(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.get_users_in_group(group_id, requester_user_id)
+        new_content = await self.handler.get_users_in_group(group_id, requester_user_id)
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsInvitedUsersServlet(BaseFederationServlet):
@@ -920,17 +889,16 @@ class FederationGroupsInvitedUsersServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/invited_users"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id):
+    async def on_GET(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.get_invited_users_in_group(
+        new_content = await self.handler.get_invited_users_in_group(
             group_id, requester_user_id
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsInviteServlet(BaseFederationServlet):
@@ -939,17 +907,16 @@ class FederationGroupsInviteServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, user_id):
+    async def on_POST(self, origin, content, query, group_id, user_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.invite_to_group(
+        new_content = await self.handler.invite_to_group(
             group_id, user_id, requester_user_id, content
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
@@ -958,14 +925,13 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, user_id):
+    async def on_POST(self, origin, content, query, group_id, user_id):
         if get_domain_from_id(user_id) != origin:
             raise SynapseError(403, "user_id doesn't match origin")
 
-        new_content = yield self.handler.accept_invite(group_id, user_id, content)
+        new_content = await self.handler.accept_invite(group_id, user_id, content)
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsJoinServlet(BaseFederationServlet):
@@ -974,14 +940,13 @@ class FederationGroupsJoinServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, user_id):
+    async def on_POST(self, origin, content, query, group_id, user_id):
         if get_domain_from_id(user_id) != origin:
             raise SynapseError(403, "user_id doesn't match origin")
 
-        new_content = yield self.handler.join_group(group_id, user_id, content)
+        new_content = await self.handler.join_group(group_id, user_id, content)
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsRemoveUserServlet(BaseFederationServlet):
@@ -990,17 +955,16 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, user_id):
+    async def on_POST(self, origin, content, query, group_id, user_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.remove_user_from_group(
+        new_content = await self.handler.remove_user_from_group(
             group_id, user_id, requester_user_id, content
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsLocalInviteServlet(BaseFederationServlet):
@@ -1009,14 +973,13 @@ class FederationGroupsLocalInviteServlet(BaseFederationServlet):
 
     PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, user_id):
+    async def on_POST(self, origin, content, query, group_id, user_id):
         if get_domain_from_id(group_id) != origin:
             raise SynapseError(403, "group_id doesn't match origin")
 
-        new_content = yield self.handler.on_invite(group_id, user_id, content)
+        new_content = await self.handler.on_invite(group_id, user_id, content)
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
@@ -1025,16 +988,15 @@ class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
 
     PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, user_id):
+    async def on_POST(self, origin, content, query, group_id, user_id):
         if get_domain_from_id(group_id) != origin:
             raise SynapseError(403, "user_id doesn't match origin")
 
-        new_content = yield self.handler.user_removed_from_group(
+        new_content = await self.handler.user_removed_from_group(
             group_id, user_id, content
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
@@ -1043,15 +1005,14 @@ class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, user_id):
+    async def on_POST(self, origin, content, query, group_id, user_id):
         # We don't need to check auth here as we check the attestation signatures
 
-        new_content = yield self.handler.on_renew_attestation(
+        new_content = await self.handler.on_renew_attestation(
             group_id, user_id, content
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
@@ -1068,8 +1029,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
         "/rooms/(?P<room_id>[^/]*)"
     )
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, category_id, room_id):
+    async def on_POST(self, origin, content, query, group_id, category_id, room_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1077,7 +1037,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
         if category_id == "":
             raise SynapseError(400, "category_id cannot be empty string")
 
-        resp = yield self.handler.update_group_summary_room(
+        resp = await self.handler.update_group_summary_room(
             group_id,
             requester_user_id,
             room_id=room_id,
@@ -1085,10 +1045,9 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
             content=content,
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
-    @defer.inlineCallbacks
-    def on_DELETE(self, origin, content, query, group_id, category_id, room_id):
+    async def on_DELETE(self, origin, content, query, group_id, category_id, room_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1096,11 +1055,11 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
         if category_id == "":
             raise SynapseError(400, "category_id cannot be empty string")
 
-        resp = yield self.handler.delete_group_summary_room(
+        resp = await self.handler.delete_group_summary_room(
             group_id, requester_user_id, room_id=room_id, category_id=category_id
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
 
 class FederationGroupsCategoriesServlet(BaseFederationServlet):
@@ -1109,15 +1068,14 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/categories/?"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id):
+    async def on_GET(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        resp = yield self.handler.get_group_categories(group_id, requester_user_id)
+        resp = await self.handler.get_group_categories(group_id, requester_user_id)
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
 
 class FederationGroupsCategoryServlet(BaseFederationServlet):
@@ -1126,20 +1084,18 @@ class FederationGroupsCategoryServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id, category_id):
+    async def on_GET(self, origin, content, query, group_id, category_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        resp = yield self.handler.get_group_category(
+        resp = await self.handler.get_group_category(
             group_id, requester_user_id, category_id
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, category_id):
+    async def on_POST(self, origin, content, query, group_id, category_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1147,14 +1103,13 @@ class FederationGroupsCategoryServlet(BaseFederationServlet):
         if category_id == "":
             raise SynapseError(400, "category_id cannot be empty string")
 
-        resp = yield self.handler.upsert_group_category(
+        resp = await self.handler.upsert_group_category(
             group_id, requester_user_id, category_id, content
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
-    @defer.inlineCallbacks
-    def on_DELETE(self, origin, content, query, group_id, category_id):
+    async def on_DELETE(self, origin, content, query, group_id, category_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1162,11 +1117,11 @@ class FederationGroupsCategoryServlet(BaseFederationServlet):
         if category_id == "":
             raise SynapseError(400, "category_id cannot be empty string")
 
-        resp = yield self.handler.delete_group_category(
+        resp = await self.handler.delete_group_category(
             group_id, requester_user_id, category_id
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
 
 class FederationGroupsRolesServlet(BaseFederationServlet):
@@ -1175,15 +1130,14 @@ class FederationGroupsRolesServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/roles/?"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id):
+    async def on_GET(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        resp = yield self.handler.get_group_roles(group_id, requester_user_id)
+        resp = await self.handler.get_group_roles(group_id, requester_user_id)
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
 
 class FederationGroupsRoleServlet(BaseFederationServlet):
@@ -1192,18 +1146,16 @@ class FederationGroupsRoleServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)"
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, group_id, role_id):
+    async def on_GET(self, origin, content, query, group_id, role_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        resp = yield self.handler.get_group_role(group_id, requester_user_id, role_id)
+        resp = await self.handler.get_group_role(group_id, requester_user_id, role_id)
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, role_id):
+    async def on_POST(self, origin, content, query, group_id, role_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1211,14 +1163,13 @@ class FederationGroupsRoleServlet(BaseFederationServlet):
         if role_id == "":
             raise SynapseError(400, "role_id cannot be empty string")
 
-        resp = yield self.handler.update_group_role(
+        resp = await self.handler.update_group_role(
             group_id, requester_user_id, role_id, content
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
-    @defer.inlineCallbacks
-    def on_DELETE(self, origin, content, query, group_id, role_id):
+    async def on_DELETE(self, origin, content, query, group_id, role_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1226,11 +1177,11 @@ class FederationGroupsRoleServlet(BaseFederationServlet):
         if role_id == "":
             raise SynapseError(400, "role_id cannot be empty string")
 
-        resp = yield self.handler.delete_group_role(
+        resp = await self.handler.delete_group_role(
             group_id, requester_user_id, role_id
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
 
 class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
@@ -1247,8 +1198,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
         "/users/(?P<user_id>[^/]*)"
     )
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query, group_id, role_id, user_id):
+    async def on_POST(self, origin, content, query, group_id, role_id, user_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1256,7 +1206,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
         if role_id == "":
             raise SynapseError(400, "role_id cannot be empty string")
 
-        resp = yield self.handler.update_group_summary_user(
+        resp = await self.handler.update_group_summary_user(
             group_id,
             requester_user_id,
             user_id=user_id,
@@ -1264,10 +1214,9 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
             content=content,
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
-    @defer.inlineCallbacks
-    def on_DELETE(self, origin, content, query, group_id, role_id, user_id):
+    async def on_DELETE(self, origin, content, query, group_id, role_id, user_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
@@ -1275,11 +1224,11 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
         if role_id == "":
             raise SynapseError(400, "role_id cannot be empty string")
 
-        resp = yield self.handler.delete_group_summary_user(
+        resp = await self.handler.delete_group_summary_user(
             group_id, requester_user_id, user_id=user_id, role_id=role_id
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
 
 class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
@@ -1288,13 +1237,12 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
 
     PATH = "/get_groups_publicised"
 
-    @defer.inlineCallbacks
-    def on_POST(self, origin, content, query):
-        resp = yield self.handler.bulk_get_publicised_groups(
+    async def on_POST(self, origin, content, query):
+        resp = await self.handler.bulk_get_publicised_groups(
             content["user_ids"], proxy=False
         )
 
-        defer.returnValue((200, resp))
+        return 200, resp
 
 
 class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
@@ -1303,17 +1251,16 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
 
     PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy"
 
-    @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, group_id):
+    async def on_PUT(self, origin, content, query, group_id):
         requester_user_id = parse_string_from_args(query, "requester_user_id")
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        new_content = yield self.handler.set_group_join_policy(
+        new_content = await self.handler.set_group_join_policy(
             group_id, requester_user_id, content
         )
 
-        defer.returnValue((200, new_content))
+        return 200, new_content
 
 
 class RoomComplexityServlet(BaseFederationServlet):
@@ -1325,18 +1272,17 @@ class RoomComplexityServlet(BaseFederationServlet):
     PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
     PREFIX = FEDERATION_UNSTABLE_PREFIX
 
-    @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, room_id):
+    async def on_GET(self, origin, content, query, room_id):
 
         store = self.handler.hs.get_datastore()
 
-        is_public = yield store.is_room_world_readable_or_publicly_joinable(room_id)
+        is_public = await store.is_room_world_readable_or_publicly_joinable(room_id)
 
         if not is_public:
             raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
 
-        complexity = yield store.get_room_complexity(room_id)
-        defer.returnValue((200, complexity))
+        complexity = await store.get_room_complexity(room_id)
+        return 200, complexity
 
 
 FEDERATION_SERVLET_CLASSES = (
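
The servlet changes above are a mechanical @defer.inlineCallbacks-to-async conversion: each yield on a Deferred becomes an await, and defer.returnValue((200, body)) becomes a plain "return 200, body". A minimal sketch of the pattern follows, with a hypothetical handler object standing in for self.handler:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def get_users_old(handler, group_id, requester_user_id):
        # Old style: a generator that yields Deferreds, with the result
        # signalled through defer.returnValue.
        new_content = yield handler.get_users_in_group(group_id, requester_user_id)
        defer.returnValue((200, new_content))

    async def get_users_new(handler, group_id, requester_user_id):
        # New style: a native coroutine; await replaces yield, and a plain
        # return replaces defer.returnValue.
        new_content = await handler.get_users_in_group(group_id, requester_user_id)
        return 200, new_content

    # Twisted can drive the coroutine once it is wrapped in a Deferred:
    #     d = defer.ensureDeferred(get_users_new(handler, group_id, user_id))
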
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index eaeda7a5cb..6d7a987f13 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from twisted.internet.defer import succeed
 
+from synapse import event_auth
 from synapse.api.constants import EventTypes, Membership, RelationTypes
 from synapse.api.errors import (
     AuthError,
@@ -784,6 +785,20 @@ class EventCreationHandler(object):
                     event.signatures.update(returned_invite.signatures)
 
         if event.type == EventTypes.Redaction:
+            original_event = yield self.store.get_event(
+                event.redacts,
+                check_redacted=False,
+                get_prev_content=False,
+                allow_rejected=False,
+                allow_none=True,
+                check_room_id=event.room_id,
+            )
+
+            # we can make some additional checks now if we have the original event.
+            if original_event:
+                if original_event.type == EventTypes.Create:
+                    raise AuthError(403, "Redacting create events is not permitted")
+
             prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=True
@@ -791,18 +806,18 @@ class EventCreationHandler(object):
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
             room_version = yield self.store.get_room_version(event.room_id)
-            if self.auth.check_redaction(room_version, event, auth_events=auth_events):
-                original_event = yield self.store.get_event(
-                    event.redacts,
-                    check_redacted=False,
-                    get_prev_content=False,
-                    allow_rejected=False,
-                    allow_none=False,
-                )
+
+            if event_auth.check_redaction(room_version, event, auth_events=auth_events):
+                # this user doesn't have 'redact' rights, so we need to do some more
+                # checks on the original event. Let's start by checking the original
+                # event exists.
+                if not original_event:
+                    raise NotFoundError("Could not find event %s" % (event.redacts,))
+
                 if event.user_id != original_event.user_id:
                     raise AuthError(403, "You don't have permission to redact events")
 
-                # We've already checked.
+                # all the checks are done.
                 event.internal_metadata.recheck_redaction = False
 
         if event.type == EventTypes.Create:
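
As the comments in the hunk note, a True return from event_auth.check_redaction means the sender lacks blanket 'redact' rights and may only redact their own events, so the extra checks are required; a False return means the power-level check has already authorised the redaction outright. A condensed sketch of that contract, using only names that appear in the hunk above:

    needs_sender_match = event_auth.check_redaction(
        room_version, event, auth_events=auth_events
    )
    if needs_sender_match:
        # The sender may only redact their own events, so the target must
        # exist and share a sender with the redaction.
        if not original_event:
            raise NotFoundError("Could not find event %s" % (event.redacts,))
        if event.user_id != original_event.user_id:
            raise AuthError(403, "You don't have permission to redact events")
    # Either way the redaction is now fully validated, so readers need not
    # recheck it.
    event.internal_metadata.recheck_redaction = False
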
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index f0ceea2a64..56d900080b 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2019 The Matrix.org Foundation C.I.C.d
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -24,6 +24,15 @@
 # this move the methods work very similarly to opentracing's and it should only
 # be a matter of a few regexes to move over to opentracing's access patterns proper.
 
+import contextlib
+import logging
+import re
+from functools import wraps
+
+from twisted.internet import defer
+
+from synapse.config import ConfigError
+
 try:
     import opentracing
 except ImportError:
@@ -35,12 +44,6 @@ except ImportError:
     JaegerConfig = None
     LogContextScopeManager = None
 
-import contextlib
-import logging
-import re
-from functools import wraps
-
-from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
@@ -91,7 +94,8 @@ def only_if_tracing(func):
     return _only_if_tracing_inner
 
 
-# Block everything by default
+# A regex which matches the server_names to expose traces for.
+# None means 'block everything'.
 _homeserver_whitelist = None
 
 tags = _DumTagNames
@@ -101,31 +105,24 @@ def init_tracer(config):
     """Set the whitelists and initialise the JaegerClient tracer
 
     Args:
-        config (Config)
-        The config used by the homeserver. Here it's used to set the service
-        name to the homeserver's.
+        config (HomeserverConfig): The config used by the homeserver
     """
     global opentracing
-    if not config.tracer_config.get("tracer_enabled", False):
+    if not config.opentracer_enabled:
         # We don't have a tracer
         opentracing = None
         return
 
-    if not opentracing:
-        logger.error(
-            "The server has been configure to use opentracing but opentracing is not installed."
-        )
-        raise ModuleNotFoundError("opentracing")
-
-    if not JaegerConfig:
-        logger.error(
-            "The server has been configure to use opentracing but opentracing is not installed."
+    if not opentracing or not JaegerConfig:
+        raise ConfigError(
+            "The server has been configured to use opentracing but opentracing is not "
+            "installed."
         )
 
     # Include the worker name
     name = config.worker_name if config.worker_name else "master"
 
-    set_homeserver_whitelist(config.tracer_config["homeserver_whitelist"])
+    set_homeserver_whitelist(config.opentracer_whitelist)
     jaeger_config = JaegerConfig(
         config={"sampler": {"type": "const", "param": 1}, "logging": True},
         service_name="{} {}".format(config.server_name, name),
@@ -232,7 +229,6 @@ def whitelisted_homeserver(destination):
     """Checks if a destination matches the whitelist
     Args:
         destination (String)"""
-    global _homeserver_whitelist
     if _homeserver_whitelist:
         return _homeserver_whitelist.match(destination)
     return False
@@ -344,8 +340,7 @@ def trace_servlet(servlet_name, func):
     @wraps(func)
     @defer.inlineCallbacks
     def _trace_servlet_inner(request, *args, **kwargs):
-        with start_active_span_from_context(
-            request.requestHeaders,
+        with start_active_span(
             "incoming-client-request",
             tags={
                 "request_id": request.get_request_id(),
diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py
index 91e14462f3..8c661302c9 100644
--- a/synapse/logging/scopecontextmanager.py
+++ b/synapse/logging/scopecontextmanager.py
@@ -34,9 +34,7 @@ class LogContextScopeManager(ScopeManager):
     """
 
     def __init__(self, config):
-        # Set the whitelists
-        logger.info(config.tracer_config)
-        self._homeserver_whitelist = config.tracer_config["homeserver_whitelist"]
+        pass
 
     @property
     def active(self):
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index eaf0aaa86e..488280b4a6 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -29,8 +29,16 @@ from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricF
 
 from twisted.internet import reactor
 
+from synapse.metrics._exposition import (
+    MetricsResource,
+    generate_latest,
+    start_http_server,
+)
+
 logger = logging.getLogger(__name__)
 
+METRICS_PREFIX = "/_synapse/metrics"
+
 running_on_pypy = platform.python_implementation() == "PyPy"
 all_metrics = []
 all_collectors = []
@@ -470,3 +478,12 @@ try:
         gc.disable()
 except AttributeError:
     pass
+
+__all__ = [
+    "MetricsResource",
+    "generate_latest",
+    "start_http_server",
+    "LaterGauge",
+    "InFlightGauge",
+    "BucketCollector",
+]
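
With METRICS_PREFIX moved here and the _exposition helpers re-exported via __all__, synapse.metrics becomes a single import point for metrics consumers. A minimal usage sketch; the port is arbitrary:

    from prometheus_client import REGISTRY

    from synapse.metrics import generate_latest, start_http_server

    # Serve metrics from a daemonised thread-per-request HTTP server...
    start_http_server(9092)

    # ...or render the current scrape payload (bytes) directly:
    payload = generate_latest(REGISTRY)
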
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
new file mode 100644
index 0000000000..1933ecd3e3
--- /dev/null
+++ b/synapse/metrics/_exposition.py
@@ -0,0 +1,258 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015-2019 Prometheus Python Client Developers
+# Copyright 2019 Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This code is based off `prometheus_client/exposition.py` from version 0.7.1.
+
+Due to the renaming of metrics in prometheus_client 0.4.0, this customised
+vendoring of the code will emit both the old versions that Synapse dashboards
+expect, and the newer "best practice" version of the up-to-date official client.
+"""
+
+import math
+import threading
+from collections import namedtuple
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from socketserver import ThreadingMixIn
+from urllib.parse import parse_qs, urlparse
+
+from prometheus_client import REGISTRY
+
+from twisted.web.resource import Resource
+
+try:
+    from prometheus_client.samples import Sample
+except ImportError:
+    Sample = namedtuple("Sample", ["name", "labels", "value", "timestamp", "exemplar"])
+
+
+CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8")
+
+
+INF = float("inf")
+MINUS_INF = float("-inf")
+
+
+def floatToGoString(d):
+    d = float(d)
+    if d == INF:
+        return "+Inf"
+    elif d == MINUS_INF:
+        return "-Inf"
+    elif math.isnan(d):
+        return "NaN"
+    else:
+        s = repr(d)
+        dot = s.find(".")
+        # Go switches to exponents sooner than Python.
+        # We only need to care about positive values for le/quantile.
+        if d > 0 and dot > 6:
+            mantissa = "{0}.{1}{2}".format(s[0], s[1:dot], s[dot + 1 :]).rstrip("0.")
+            return "{0}e+0{1}".format(mantissa, dot - 1)
+        return s
+
+
+def sample_line(line, name):
+    if line.labels:
+        labelstr = "{{{0}}}".format(
+            ",".join(
+                [
+                    '{0}="{1}"'.format(
+                        k,
+                        v.replace("\\", r"\\").replace("\n", r"\n").replace('"', r"\""),
+                    )
+                    for k, v in sorted(line.labels.items())
+                ]
+            )
+        )
+    else:
+        labelstr = ""
+    timestamp = ""
+    if line.timestamp is not None:
+        # Convert to milliseconds.
+        timestamp = " {0:d}".format(int(float(line.timestamp) * 1000))
+    return "{0}{1} {2}{3}\n".format(
+        name, labelstr, floatToGoString(line.value), timestamp
+    )
+
+
+def nameify_sample(sample):
+    """
+    If we get a prometheus_client<0.4.0 sample as a tuple, transform it into a
+    namedtuple which has the names we expect.
+    """
+    if not isinstance(sample, Sample):
+        sample = Sample(*sample, None, None)
+
+    return sample
+
+
+def generate_latest(registry, emit_help=False):
+    output = []
+
+    for metric in registry.collect():
+
+        if metric.name.startswith("__unused"):
+            continue
+
+        if not metric.samples:
+            # No samples, don't bother.
+            continue
+
+        mname = metric.name
+        mnewname = metric.name
+        mtype = metric.type
+
+        # OpenMetrics -> Prometheus
+        if mtype == "counter":
+            mnewname = mnewname + "_total"
+        elif mtype == "info":
+            mtype = "gauge"
+            mnewname = mnewname + "_info"
+        elif mtype == "stateset":
+            mtype = "gauge"
+        elif mtype == "gaugehistogram":
+            mtype = "histogram"
+        elif mtype == "unknown":
+            mtype = "untyped"
+
+        # Output in the old format for compatibility.
+        if emit_help:
+            output.append(
+                "# HELP {0} {1}\n".format(
+                    mname,
+                    metric.documentation.replace("\\", r"\\").replace("\n", r"\n"),
+                )
+            )
+        output.append("# TYPE {0} {1}\n".format(mname, mtype))
+        for sample in map(nameify_sample, metric.samples):
+            # Get rid of the OpenMetrics specific samples
+            for suffix in ["_created", "_gsum", "_gcount"]:
+                if sample.name.endswith(suffix):
+                    break
+            else:
+                newname = sample.name.replace(mnewname, mname)
+                if ":" in newname and newname.endswith("_total"):
+                    newname = newname[: -len("_total")]
+                output.append(sample_line(sample, newname))
+
+        # Get rid of the weird colon things while we're at it
+        if mtype == "counter":
+            mnewname = mnewname.replace(":total", "")
+        mnewname = mnewname.replace(":", "_")
+
+        if mname == mnewname:
+            continue
+
+        # Also output in the new format, if it's different.
+        if emit_help:
+            output.append(
+                "# HELP {0} {1}\n".format(
+                    mnewname,
+                    metric.documentation.replace("\\", r"\\").replace("\n", r"\n"),
+                )
+            )
+        output.append("# TYPE {0} {1}\n".format(mnewname, mtype))
+        for sample in map(nameify_sample, metric.samples):
+            # Get rid of the OpenMetrics specific samples
+            for suffix in ["_created", "_gsum", "_gcount"]:
+                if sample.name.endswith(suffix):
+                    break
+            else:
+                output.append(
+                    sample_line(
+                        sample, sample.name.replace(":total", "").replace(":", "_")
+                    )
+                )
+
+    return "".join(output).encode("utf-8")
+
+
+class MetricsHandler(BaseHTTPRequestHandler):
+    """HTTP handler that gives metrics from ``REGISTRY``."""
+
+    registry = REGISTRY
+
+    def do_GET(self):
+        registry = self.registry
+        params = parse_qs(urlparse(self.path).query)
+
+        if "help" in params:
+            emit_help = True
+        else:
+            emit_help = False
+
+        try:
+            output = generate_latest(registry, emit_help=emit_help)
+        except Exception:
+            self.send_error(500, "error generating metric output")
+            raise
+        self.send_response(200)
+        self.send_header("Content-Type", CONTENT_TYPE_LATEST)
+        self.end_headers()
+        self.wfile.write(output)
+
+    def log_message(self, format, *args):
+        """Log nothing."""
+
+    @classmethod
+    def factory(cls, registry):
+        """Returns a dynamic MetricsHandler class tied
+           to the passed registry.
+        """
+        # This implementation relies on MetricsHandler.registry
+        #  (defined above and defaulted to REGISTRY).
+
+        # As we have unicode_literals, we need to create a str()
+        #  object for type().
+        cls_name = str(cls.__name__)
+        MyMetricsHandler = type(cls_name, (cls, object), {"registry": registry})
+        return MyMetricsHandler
+
+
+class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
+    """Thread per request HTTP server."""
+
+    # Make worker threads "fire and forget". Beginning with Python 3.7 this
+    # prevents a memory leak because ``ThreadingMixIn`` starts to gather all
+    # non-daemon threads in a list in order to join on them at server close.
+    # Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
+    # same as Python 3.7's ``ThreadingHTTPServer``.
+    daemon_threads = True
+
+
+def start_http_server(port, addr="", registry=REGISTRY):
+    """Starts an HTTP server for prometheus metrics as a daemon thread"""
+    CustomMetricsHandler = MetricsHandler.factory(registry)
+    httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler)
+    t = threading.Thread(target=httpd.serve_forever)
+    t.daemon = True
+    t.start()
+
+
+class MetricsResource(Resource):
+    """
+    Twisted ``Resource`` that serves prometheus metrics.
+    """
+
+    isLeaf = True
+
+    def __init__(self, registry=REGISTRY):
+        self.registry = registry
+
+    def render_GET(self, request):
+        request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
+        return generate_latest(self.registry)
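
A sketch of mounting the new MetricsResource in a Twisted site, assuming the default prometheus_client REGISTRY; the path, port, and interface are arbitrary:

    from twisted.internet import reactor
    from twisted.web.resource import Resource
    from twisted.web.server import Site

    from synapse.metrics._exposition import MetricsResource

    root = Resource()
    # MetricsResource is a leaf resource, so it can be mounted at any path.
    root.putChild(b"metrics", MetricsResource())
    reactor.listenTCP(9092, Site(root), interface="127.0.0.1")
    # reactor.run()
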
diff --git a/synapse/metrics/resource.py b/synapse/metrics/resource.py
deleted file mode 100644
index 9789359077..0000000000
--- a/synapse/metrics/resource.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from prometheus_client.twisted import MetricsResource
-
-METRICS_PREFIX = "/_synapse/metrics"
-
-__all__ = ["MetricsResource", "METRICS_PREFIX"]
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index e7618057be..c6465c0386 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -65,9 +65,7 @@ REQUIREMENTS = [
     "msgpack>=0.5.2",
     "phonenumbers>=8.2.0",
     "six>=1.10",
-    # prometheus_client 0.4.0 changed the format of counter metrics
-    # (cf https://github.com/matrix-org/synapse/issues/4001)
-    "prometheus_client>=0.0.18,<0.4.0",
+    "prometheus_client>=0.0.18,<0.8.0",
     # we use attr.s(slots), which arrived in 16.0.0
     # Twisted 18.7.0 requires attrs>=17.4.0
     "attrs>=17.4.0",
diff --git a/synapse/rest/client/v2_alpha/relations.py b/synapse/rest/client/v2_alpha/relations.py
index 7ce485b471..6e52f6d284 100644
--- a/synapse/rest/client/v2_alpha/relations.py
+++ b/synapse/rest/client/v2_alpha/relations.py
@@ -34,6 +34,7 @@ from synapse.http.servlet import (
 from synapse.rest.client.transactions import HttpTransactionCache
 from synapse.storage.relations import (
     AggregationPaginationToken,
+    PaginationChunk,
     RelationPaginationToken,
 )
 
@@ -153,23 +154,28 @@ class RelationPaginationServlet(RestServlet):
         from_token = parse_string(request, "from")
         to_token = parse_string(request, "to")
 
-        if from_token:
-            from_token = RelationPaginationToken.from_string(from_token)
-
-        if to_token:
-            to_token = RelationPaginationToken.from_string(to_token)
-
-        result = yield self.store.get_relations_for_event(
-            event_id=parent_id,
-            relation_type=relation_type,
-            event_type=event_type,
-            limit=limit,
-            from_token=from_token,
-            to_token=to_token,
-        )
+        if event.internal_metadata.is_redacted():
+            # If the event is redacted, return an empty list of relations
+            pagination_chunk = PaginationChunk(chunk=[])
+        else:
+            # Return the relations
+            if from_token:
+                from_token = RelationPaginationToken.from_string(from_token)
+
+            if to_token:
+                to_token = RelationPaginationToken.from_string(to_token)
+
+            pagination_chunk = yield self.store.get_relations_for_event(
+                event_id=parent_id,
+                relation_type=relation_type,
+                event_type=event_type,
+                limit=limit,
+                from_token=from_token,
+                to_token=to_token,
+            )
 
         events = yield self.store.get_events_as_list(
-            [c["event_id"] for c in result.chunk]
+            [c["event_id"] for c in pagination_chunk.chunk]
         )
 
         now = self.clock.time_msec()
@@ -186,7 +192,7 @@ class RelationPaginationServlet(RestServlet):
             events, now, bundle_aggregations=False
         )
 
-        return_value = result.to_dict()
+        return_value = pagination_chunk.to_dict()
         return_value["chunk"] = events
         return_value["original_event"] = original_event
 
@@ -234,7 +240,7 @@ class RelationAggregationPaginationServlet(RestServlet):
 
         # This checks that a) the event exists and b) the user is allowed to
         # view it.
-        yield self.event_handler.get_event(requester.user, room_id, parent_id)
+        event = yield self.event_handler.get_event(requester.user, room_id, parent_id)
 
         if relation_type not in (RelationTypes.ANNOTATION, None):
             raise SynapseError(400, "Relation type must be 'annotation'")
@@ -243,21 +249,26 @@ class RelationAggregationPaginationServlet(RestServlet):
         from_token = parse_string(request, "from")
         to_token = parse_string(request, "to")
 
-        if from_token:
-            from_token = AggregationPaginationToken.from_string(from_token)
-
-        if to_token:
-            to_token = AggregationPaginationToken.from_string(to_token)
-
-        res = yield self.store.get_aggregation_groups_for_event(
-            event_id=parent_id,
-            event_type=event_type,
-            limit=limit,
-            from_token=from_token,
-            to_token=to_token,
-        )
-
-        defer.returnValue((200, res.to_dict()))
+        if event.internal_metadata.is_redacted():
+            # If the event is redacted, return an empty list of relations
+            pagination_chunk = PaginationChunk(chunk=[])
+        else:
+            # Return the relations
+            if from_token:
+                from_token = AggregationPaginationToken.from_string(from_token)
+
+            if to_token:
+                to_token = AggregationPaginationToken.from_string(to_token)
+
+            pagination_chunk = yield self.store.get_aggregation_groups_for_event(
+                event_id=parent_id,
+                event_type=event_type,
+                limit=limit,
+                from_token=from_token,
+                to_token=to_token,
+            )
+
+        defer.returnValue((200, pagination_chunk.to_dict()))
 
 
 class RelationAggregationGroupPaginationServlet(RestServlet):
diff --git a/synapse/static/index.html b/synapse/static/index.html
index d3f1c7dce0..bf46df9097 100644
--- a/synapse/static/index.html
+++ b/synapse/static/index.html
@@ -48,13 +48,13 @@
     </div>
     <h1>It works! Synapse is running</h1>
     <p>Your Synapse server is listening on this port and is ready for messages.</p>
-    <p>To use this server you'll need <a href="https://matrix.org/docs/projects/try-matrix-now.html#clients" target="_blank">a Matrix client</a>.
+    <p>To use this server you'll need <a href="https://matrix.org/docs/projects/try-matrix-now.html#clients" target="_blank" rel="noopener noreferrer">a Matrix client</a>.
     </p>
     <p>Welcome to the Matrix universe :)</p>
     <hr>
     <p>
       <small>
-        <a href="https://matrix.org" target="_blank">
+        <a href="https://matrix.org" target="_blank" rel="noopener noreferrer">
           matrix.org
         </a>
       </small>
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 874d0a56bc..858fc755a1 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -37,6 +37,7 @@ from synapse.logging.context import (
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
+from synapse.util import batch_iter
 from synapse.util.metrics import Measure
 
 from ._base import SQLBaseStore
@@ -218,9 +219,108 @@ class EventsWorkerStore(SQLBaseStore):
         if not event_ids:
             defer.returnValue([])
 
-        event_id_list = event_ids
-        event_ids = set(event_ids)
+        # there may be duplicates so we cast the list to a set
+        event_entry_map = yield self._get_events_from_cache_or_db(
+            set(event_ids), allow_rejected=allow_rejected
+        )
+
+        events = []
+        for event_id in event_ids:
+            entry = event_entry_map.get(event_id, None)
+            if not entry:
+                continue
+
+            if not allow_rejected:
+                assert not entry.event.rejected_reason, (
+                    "rejected event returned from _get_events_from_cache_or_db despite "
+                    "allow_rejected=False"
+                )
+
+            # We may not have had the original event when we received a redaction, so
+            # we have to recheck auth now.
+
+            if not allow_rejected and entry.event.type == EventTypes.Redaction:
+                redacted_event_id = entry.event.redacts
+                event_map = yield self._get_events_from_cache_or_db([redacted_event_id])
+                original_event_entry = event_map.get(redacted_event_id)
+                if not original_event_entry:
+                    # we don't have the redacted event (or it was rejected).
+                    #
+                    # We assume that the redaction isn't authorized for now; if the
+                    # redacted event later turns up, the redaction will be re-checked,
+                    # and if it is found valid, the original will get redacted before it
+                    # is served to the client.
+                    logger.debug(
+                        "Withholding redaction event %s since we don't (yet) have the "
+                        "original %s",
+                        event_id,
+                        redacted_event_id,
+                    )
+                    continue
+
+                original_event = original_event_entry.event
+                if original_event.type == EventTypes.Create:
+                    # we never serve redactions of Creates to clients.
+                    logger.info(
+                        "Withholding redaction %s of create event %s",
+                        event_id,
+                        redacted_event_id,
+                    )
+                    continue
+
+                if entry.event.internal_metadata.need_to_check_redaction():
+                    original_domain = get_domain_from_id(original_event.sender)
+                    redaction_domain = get_domain_from_id(entry.event.sender)
+                    if original_domain != redaction_domain:
+                        # the senders don't match, so this is forbidden
+                        logger.info(
+                            "Withholding redaction %s whose sender domain %s doesn't "
+                            "match that of redacted event %s %s",
+                            event_id,
+                            redaction_domain,
+                            redacted_event_id,
+                            original_domain,
+                        )
+                        continue
+
+                    # Update the cache to save doing the checks again.
+                    entry.event.internal_metadata.recheck_redaction = False
+
+            if check_redacted and entry.redacted_event:
+                event = entry.redacted_event
+            else:
+                event = entry.event
+
+            events.append(event)
+
+            if get_prev_content:
+                if "replaces_state" in event.unsigned:
+                    prev = yield self.get_event(
+                        event.unsigned["replaces_state"],
+                        get_prev_content=False,
+                        allow_none=True,
+                    )
+                    if prev:
+                        event.unsigned = dict(event.unsigned)
+                        event.unsigned["prev_content"] = prev.content
+                        event.unsigned["prev_sender"] = prev.sender
+
+        defer.returnValue(events)
+
+    @defer.inlineCallbacks
+    def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
+        """Fetch a bunch of events from the cache or the database.
+
+        If events are pulled from the database, they will be cached for future lookups.
 
+        Args:
+            event_ids (Iterable[str]): The event_ids of the events to fetch
+            allow_rejected (bool): Whether to include rejected events
+
+        Returns:
+            Deferred[Dict[str, _EventCacheEntry]]:
+                map from event id to result
+        """
         event_entry_map = self._get_events_from_cache(
             event_ids, allow_rejected=allow_rejected
         )
@@ -243,81 +343,7 @@ class EventsWorkerStore(SQLBaseStore):
 
             event_entry_map.update(missing_events)
 
-        events = []
-        for event_id in event_id_list:
-            entry = event_entry_map.get(event_id, None)
-            if not entry:
-                continue
-
-            # Starting in room version v3, some redactions need to be rechecked if we
-            # didn't have the redacted event at the time, so we recheck on read
-            # instead.
-            if not allow_rejected and entry.event.type == EventTypes.Redaction:
-                if entry.event.internal_metadata.need_to_check_redaction():
-                    # XXX: we need to avoid calling get_event here.
-                    #
-                    # The problem is that we end up at this point when an event
-                    # which has been redacted is pulled out of the database by
-                    # _enqueue_events, because _enqueue_events needs to check
-                    # the redaction before it can cache the redacted event. So
-                    # obviously, calling get_event to get the redacted event out
-                    # of the database gives us an infinite loop.
-                    #
-                    # For now (quick hack to fix during 0.99 release cycle), we
-                    # just go and fetch the relevant row from the db, but it
-                    # would be nice to think about how we can cache this rather
-                    # than hit the db every time we access a redaction event.
-                    #
-                    # One thought on how to do this:
-                    #  1. split get_events_as_list up so that it is divided into
-                    #     (a) get the rawish event from the db/cache, (b) do the
-                    #     redaction/rejection filtering
-                    #  2. have _get_event_from_row just call the first half of
-                    #     that
-
-                    orig_sender = yield self._simple_select_one_onecol(
-                        table="events",
-                        keyvalues={"event_id": entry.event.redacts},
-                        retcol="sender",
-                        allow_none=True,
-                    )
-
-                    expected_domain = get_domain_from_id(entry.event.sender)
-                    if (
-                        orig_sender
-                        and get_domain_from_id(orig_sender) == expected_domain
-                    ):
-                        # This redaction event is allowed. Mark as not needing a
-                        # recheck.
-                        entry.event.internal_metadata.recheck_redaction = False
-                    else:
-                        # We don't have the event that is being redacted, so we
-                        # assume that the event isn't authorized for now. (If we
-                        # later receive the event, then we will always redact
-                        # it anyway, since we have this redaction)
-                        continue
-
-            if allow_rejected or not entry.event.rejected_reason:
-                if check_redacted and entry.redacted_event:
-                    event = entry.redacted_event
-                else:
-                    event = entry.event
-
-                events.append(event)
-
-                if get_prev_content:
-                    if "replaces_state" in event.unsigned:
-                        prev = yield self.get_event(
-                            event.unsigned["replaces_state"],
-                            get_prev_content=False,
-                            allow_none=True,
-                        )
-                        if prev:
-                            event.unsigned = dict(event.unsigned)
-                            event.unsigned["prev_content"] = prev.content
-                            event.unsigned["prev_sender"] = prev.sender
-
-        defer.returnValue(events)
+        return event_entry_map
 
     def _invalidate_get_event_cache(self, event_id):
         self._get_event_cache.invalidate((event_id,))
@@ -326,7 +352,7 @@ class EventsWorkerStore(SQLBaseStore):
         """Fetch events from the caches
 
         Args:
-            events (list(str)): list of event_ids to fetch
+            events (Iterable[str]): list of event_ids to fetch
             allow_rejected (bool): Whether to return events that were rejected
             update_metrics (bool): Whether to update the cache hit ratio metrics
 
@@ -384,19 +410,16 @@ class EventsWorkerStore(SQLBaseStore):
                 The fetch requests. Each entry consists of a list of event
                 ids to be fetched, and a deferred to be completed once the
                 events have been fetched.
-
         """
         with Measure(self._clock, "_fetch_event_list"):
             try:
                 event_id_lists = list(zip(*event_list))[0]
                 event_ids = [item for sublist in event_id_lists for item in sublist]
 
-                rows = self._new_transaction(
+                row_dict = self._new_transaction(
                     conn, "do_fetch", [], [], self._fetch_event_rows, event_ids
                 )
 
-                row_dict = {r["event_id"]: r for r in rows}
-
                 # We only want to resolve deferreds from the main thread
                 def fire(lst, res):
                     for ids, d in lst:
@@ -454,7 +477,7 @@ class EventsWorkerStore(SQLBaseStore):
         logger.debug("Loaded %d events (%d rows)", len(events), len(rows))
 
         if not allow_rejected:
-            rows[:] = [r for r in rows if not r["rejects"]]
+            rows[:] = [r for r in rows if r["rejected_reason"] is None]
 
         res = yield make_deferred_yieldable(
             defer.gatherResults(
@@ -463,8 +486,8 @@ class EventsWorkerStore(SQLBaseStore):
                         self._get_event_from_row,
                         row["internal_metadata"],
                         row["json"],
-                        row["redacts"],
-                        rejected_reason=row["rejects"],
+                        row["redactions"],
+                        rejected_reason=row["rejected_reason"],
                         format_version=row["format_version"],
                     )
                     for row in rows
@@ -475,49 +498,98 @@ class EventsWorkerStore(SQLBaseStore):
 
         defer.returnValue({e.event.event_id: e for e in res if e})
 
-    def _fetch_event_rows(self, txn, events):
-        rows = []
-        N = 200
-        for i in range(1 + len(events) // N):
-            evs = events[i * N : (i + 1) * N]
-            if not evs:
-                break
+    def _fetch_event_rows(self, txn, event_ids):
+        """Fetch event rows from the database
+
+        Events which are not found are omitted from the result.
+
+        The returned per-event dicts contain the following keys:
+
+         * event_id (str)
+
+         * json (str): json-encoded event structure
+
+         * internal_metadata (str): json-encoded internal metadata dict
+
+         * format_version (int|None): The format of the event. Hopefully one
+           of EventFormatVersions. 'None' means the event predates
+           EventFormatVersions (so the event is format V1).
+
+         * rejected_reason (str|None): if the event was rejected, the reason
+           why.
 
+         * redactions (List[str]): a list of event-ids which (claim to) redact
+           this event.
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection):
+            event_ids (Iterable[str]): event IDs to fetch
+
+        Returns:
+            Dict[str, Dict]: a map from event id to event info.
+        """
+        event_dict = {}
+        for evs in batch_iter(event_ids, 200):
             sql = (
                 "SELECT "
-                " e.event_id as event_id, "
+                " e.event_id, "
                 " e.internal_metadata,"
                 " e.json,"
                 " e.format_version, "
-                " r.redacts as redacts,"
-                " rej.event_id as rejects "
+                " rej.reason "
                 " FROM event_json as e"
                 " LEFT JOIN rejections as rej USING (event_id)"
-                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                 " WHERE e.event_id IN (%s)"
             ) % (",".join(["?"] * len(evs)),)
 
             txn.execute(sql, evs)
-            rows.extend(self.cursor_to_dict(txn))
 
-        return rows
+            for row in txn:
+                event_id = row[0]
+                event_dict[event_id] = {
+                    "event_id": event_id,
+                    "internal_metadata": row[1],
+                    "json": row[2],
+                    "format_version": row[3],
+                    "rejected_reason": row[4],
+                    "redactions": [],
+                }
+
+            # check for redactions
+            redactions_sql = (
+                "SELECT event_id, redacts FROM redactions WHERE redacts IN (%s)"
+            ) % (",".join(["?"] * len(evs)),)
+
+            txn.execute(redactions_sql, evs)
+
+            for (redacter, redacted) in txn:
+                d = event_dict.get(redacted)
+                if d:
+                    d["redactions"].append(redacter)
+
+        return event_dict
 
     @defer.inlineCallbacks
     def _get_event_from_row(
-        self, internal_metadata, js, redacted, format_version, rejected_reason=None
+        self, internal_metadata, js, redactions, format_version, rejected_reason=None
     ):
+        """Parse an event row which has been read from the database
+
+        Args:
+            internal_metadata (str): json-encoded internal_metadata column
+            js (str): json-encoded event body from event_json
+            redactions (list[str]): a list of the events which claim to have redacted
+                this event, from the redactions table
+            format_version (int|None): the 'format_version' column
+            rejected_reason (str|None): the reason this event was rejected, if any
+
+        Returns:
+            _EventCacheEntry
+        """
         with Measure(self._clock, "_get_event_from_row"):
             d = json.loads(js)
             internal_metadata = json.loads(internal_metadata)
 
-            if rejected_reason:
-                rejected_reason = yield self._simple_select_one_onecol(
-                    table="rejections",
-                    keyvalues={"event_id": rejected_reason},
-                    retcol="reason",
-                    desc="_get_event_from_row_rejected_reason",
-                )
-
             if format_version is None:
                 # This means that we stored the event before we had the concept
                 # of an event format version, so it must be a V1 event.
@@ -529,41 +601,7 @@ class EventsWorkerStore(SQLBaseStore):
                 rejected_reason=rejected_reason,
             )
 
-            redacted_event = None
-            if redacted:
-                redacted_event = prune_event(original_ev)
-
-                redaction_id = yield self._simple_select_one_onecol(
-                    table="redactions",
-                    keyvalues={"redacts": redacted_event.event_id},
-                    retcol="event_id",
-                    desc="_get_event_from_row_redactions",
-                )
-
-                redacted_event.unsigned["redacted_by"] = redaction_id
-                # Get the redaction event.
-
-                because = yield self.get_event(
-                    redaction_id, check_redacted=False, allow_none=True
-                )
-
-                if because:
-                    # It's fine to do add the event directly, since get_pdu_json
-                    # will serialise this field correctly
-                    redacted_event.unsigned["redacted_because"] = because
-
-                    # Starting in room version v3, some redactions need to be
-                    # rechecked if we didn't have the redacted event at the
-                    # time, so we recheck on read instead.
-                    if because.internal_metadata.need_to_check_redaction():
-                        expected_domain = get_domain_from_id(original_ev.sender)
-                        if get_domain_from_id(because.sender) == expected_domain:
-                            # This redaction event is allowed. Mark as not needing a
-                            # recheck.
-                            because.internal_metadata.recheck_redaction = False
-                        else:
-                            # Senders don't match, so the event isn't actually redacted
-                            redacted_event = None
+            redacted_event = yield self._maybe_redact_event_row(original_ev, redactions)
 
             cache_entry = _EventCacheEntry(
                 event=original_ev, redacted_event=redacted_event
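After the refactor, _get_event_from_row always assembles the original event and delegates the redaction decision to _maybe_redact_event_row (added in the hunk below); the original and the pruned variant are then cached together. A rough sketch of that pairing, where _EventCacheEntry mirrors the namedtuple used by this module and pick_event is a hypothetical accessor added purely for illustration:

from collections import namedtuple

# the cache pairs the original event with its pruned form (or None if no
# valid redaction was found)
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))


def pick_event(entry, check_redacted=True):
    # hypothetical accessor: return the view of the event a caller should see
    if check_redacted and entry.redacted_event is not None:
        # a valid redaction exists, so hand back the pruned copy
        return entry.redacted_event
    return entry.event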
@@ -574,6 +612,60 @@ class EventsWorkerStore(SQLBaseStore):
         defer.returnValue(cache_entry)
 
     @defer.inlineCallbacks
+    def _maybe_redact_event_row(self, original_ev, redactions):
+        """Given an event object and a list of possible redacting event ids,
+        determine whether to honour any of those redactions and if so return a redacted
+        event.
+
+        Args:
+            original_ev (EventBase):
+            redactions (iterable[str]): list of event ids of potential redaction events
+
+        Returns:
+            Deferred[EventBase|None]: if the event should be redacted, a pruned
+                event object. Otherwise, None.
+        """
+        if original_ev.type == "m.room.create":
+            # we choose to ignore redactions of m.room.create events.
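+            # (redacting the create event would strip the content that
+            # defines the room itself)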
+            return None
+
+        redaction_map = yield self._get_events_from_cache_or_db(redactions)
+
+        for redaction_id in redactions:
+            redaction_entry = redaction_map.get(redaction_id)
+            if not redaction_entry:
+                # we don't have the redaction event, or the redaction event was not
+                # authorized.
+                continue
+
+            redaction_event = redaction_entry.event
+
+            # Starting in room version v3, some redactions need to be
+            # rechecked if we didn't have the redacted event at the
+            # time, so we recheck on read instead.
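+            # The recheck is an origin comparison: the redaction only takes
+            # effect if it was sent from the same server as the event it
+            # claims to redact.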
+            if redaction_event.internal_metadata.need_to_check_redaction():
+                expected_domain = get_domain_from_id(original_ev.sender)
+                if get_domain_from_id(redaction_event.sender) == expected_domain:
+                    # This redaction event is allowed. Mark as not needing a recheck.
+                    redaction_event.internal_metadata.recheck_redaction = False
+                else:
+                    # Senders don't match, so the event isn't actually redacted
+                    continue
+
+            # we found a good redaction event. Redact!
+            redacted_event = prune_event(original_ev)
+            redacted_event.unsigned["redacted_by"] = redaction_id
+
+            # It's fine to add the event directly, since get_pdu_json
+            # will serialise this field correctly
+            redacted_event.unsigned["redacted_because"] = redaction_event
+
+            return redacted_event
+
+        # no valid redaction found for this event
+        return None
+
+    @defer.inlineCallbacks
     def have_events_in_timeline(self, event_ids):
         """Given a list of event ids, check if we have already processed and
         stored them as non-outliers.