author     Brendan Abolivier <babolivier@matrix.org>  2019-08-13 15:54:01 +0100
committer  Brendan Abolivier <babolivier@matrix.org>  2019-08-13 15:54:01 +0100
commit     a7efbc541601927904196bcf220a99e666a9a4a4 (patch)
tree       ff065ad5d403adfb72eb00160696205823299b54
parent     Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parent     Merge pull request #5809 from matrix-org/erikj/handle_pusher_stop (diff)
download   synapse-a7efbc541601927904196bcf220a99e666a9a4a4.tar.xz
Merge branch 'release-v1.3.0' of github.com:matrix-org/synapse into matrix-org-hotfixes
-rw-r--r--  .gitignore                                             |   1
-rw-r--r--  changelog.d/5754.feature                               |   1
-rw-r--r--  changelog.d/5809.misc                                  |   1
-rw-r--r--  changelog.d/5836.misc                                  |   1
-rw-r--r--  docs/sample_config.yaml                                |   7
-rw-r--r--  docs/workers.rst                                       |   7
-rw-r--r--  synapse/app/media_repository.py                        |   9
-rw-r--r--  synapse/config/repository.py                           |  20
-rw-r--r--  synapse/http/federation/matrix_federation_agent.py    | 162
-rw-r--r--  synapse/http/federation/well_known_resolver.py         | 187
-rw-r--r--  synapse/push/emailpusher.py                            |  18
-rw-r--r--  synapse/push/httppusher.py                             |  27
-rw-r--r--  synapse/rest/admin/__init__.py                         | 102
-rw-r--r--  synapse/rest/admin/_base.py                            |  25
-rw-r--r--  synapse/rest/admin/media.py                            | 101
-rw-r--r--  synapse/rest/media/v1/media_repository.py              |   6
-rw-r--r--  synapse/storage/pusher.py                              |  30
-rw-r--r--  tests/http/federation/test_matrix_federation_agent.py  |  43
18 files changed, 465 insertions, 283 deletions
diff --git a/.gitignore b/.gitignore
index a84c41b0c9..f6168a8819 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,7 @@ _trial_temp*/
 /*.log
 /*.log.config
 /*.pid
+/.python-version
 /*.signing.key
 /env/
 /homeserver*.yaml
diff --git a/changelog.d/5754.feature b/changelog.d/5754.feature
new file mode 100644
index 0000000000..c1a09a4dce
--- /dev/null
+++ b/changelog.d/5754.feature
@@ -0,0 +1 @@
+Synapse will no longer serve any media repo admin endpoints when `enable_media_repo` is set to False in the configuration. If a media repo worker is used, the admin APIs relating to the media repo will be served from it instead.
\ No newline at end of file
diff --git a/changelog.d/5809.misc b/changelog.d/5809.misc
new file mode 100644
index 0000000000..82a812480e
--- /dev/null
+++ b/changelog.d/5809.misc
@@ -0,0 +1 @@
+Handle pusher being deleted during processing rather than logging an exception.
diff --git a/changelog.d/5836.misc b/changelog.d/5836.misc
new file mode 100644
index 0000000000..18f2488201
--- /dev/null
+++ b/changelog.d/5836.misc
@@ -0,0 +1 @@
+Add a lower bound to well-known lookup cache time to avoid repeated lookups.
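
In effect, a cache period computed from the response headers is now clamped
on both sides before being stored; the clamp and its constants appear in the
new well_known_resolver.py later in this diff::

    cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)  # 48 hours
    cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)  # 5 minutes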
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 1b206fe6bf..0c6be30e51 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -565,6 +565,13 @@ log_config: "CONFDIR/SERVERNAME.log.config"
 
 
 
+## Media Store ##
+
+# Enable the media store service in the Synapse master. Uncomment the
+# following if you are using a separate media store worker.
+#
+#enable_media_repo: false
+
 # Directory where uploaded images and attachments are stored.
 #
 media_store_path: "DATADIR/media_store"
diff --git a/docs/workers.rst b/docs/workers.rst
index 7b2d2db533..e11e117418 100644
--- a/docs/workers.rst
+++ b/docs/workers.rst
@@ -206,6 +206,13 @@ Handles the media repository. It can handle all endpoints starting with::
 
     /_matrix/media/
 
+And the following regular expressions matching media-specific administration
+APIs::
+
+    ^/_synapse/admin/v1/purge_media_cache$
+    ^/_synapse/admin/v1/room/.*/media$
+    ^/_synapse/admin/v1/quarantine_media/.*$
+
 You should also set ``enable_media_repo: False`` in the shared configuration
 file to stop the main synapse running background jobs related to managing the
 media repository.
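
As a rough illustration of how a routing layer might apply the rules above,
here is a hypothetical Python helper — the function name and example path are
placeholders, but the regular expressions are exactly the ones listed above::

    import re

    # The media worker's admin routes, as documented above.
    MEDIA_ADMIN_PATTERNS = [
        re.compile(r"^/_synapse/admin/v1/purge_media_cache$"),
        re.compile(r"^/_synapse/admin/v1/room/.*/media$"),
        re.compile(r"^/_synapse/admin/v1/quarantine_media/.*$"),
    ]

    def routes_to_media_worker(path):
        """Hypothetical check: should this request go to the media worker?"""
        if path.startswith("/_matrix/media/"):
            return True
        return any(p.match(path) for p in MEDIA_ADMIN_PATTERNS)

    assert routes_to_media_worker("/_synapse/admin/v1/room/!abc:example.com/media")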
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index ea26f29acb..3a168577c7 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -26,6 +26,7 @@ from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
@@ -35,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -71,6 +73,12 @@ class MediaRepositoryServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "media":
                     media_repo = self.get_media_repository_resource()
+
+                    # We need to serve the admin servlets for media on the
+                    # worker.
+                    admin_resource = JsonResource(self, canonical_json=False)
+                    register_servlets_for_media_repo(self, admin_resource)
+
                     resources.update(
                         {
                             MEDIA_PREFIX: media_repo,
@@ -78,6 +86,7 @@ class MediaRepositoryServer(HomeServer):
                             CONTENT_REPO_PREFIX: ContentRepoResource(
                                 self, self.config.uploads_path
                             ),
+                            "/_synapse/admin": admin_resource,
                         }
                     )
 
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 80a628d9b0..db39697e45 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import os
 from collections import namedtuple
 
@@ -87,6 +88,18 @@ def parse_thumbnail_requirements(thumbnail_sizes):
 
 class ContentRepositoryConfig(Config):
     def read_config(self, config, **kwargs):
+
+        # Only enable the media repo if either the media repo is enabled or the
+        # current worker app is the media repo.
+        if (
+            self.enable_media_repo is False
+            and config.worker_app != "synapse.app.media_repository"
+        ):
+            self.can_load_media_repo = False
+            return
+        else:
+            self.can_load_media_repo = True
+
         self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
         self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
         self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
@@ -202,6 +215,13 @@ class ContentRepositoryConfig(Config):
 
         return (
             r"""
+        ## Media Store ##
+
+        # Enable the media store service in the Synapse master. Uncomment the
+        # following if you are using a separate media store worker.
+        #
+        #enable_media_repo: false
+
         # Directory where uploaded images and attachments are stored.
         #
         media_store_path: "%(media_store)s"
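
The read_config gating added above reduces to a single predicate; a minimal
standalone sketch, assuming enable_media_repo is a plain boolean::

    def can_load_media_repo(enable_media_repo, worker_app):
        # The media repo loads on the master when enable_media_repo is true,
        # and always on a dedicated media_repository worker.
        return enable_media_repo or worker_app == "synapse.app.media_repository"

    assert can_load_media_repo(True, None)
    assert can_load_media_repo(False, "synapse.app.media_repository")
    assert not can_load_media_repo(False, None)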
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index a0d5139839..71a15f434d 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -12,10 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
+
 import logging
-import random
-import time
 
 import attr
 from netaddr import IPAddress
@@ -24,31 +22,16 @@ from zope.interface import implementer
 from twisted.internet import defer
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.internet.interfaces import IStreamClientEndpoint
-from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
-from twisted.web.http import stringToDatetime
+from twisted.web.client import URI, Agent, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IAgent
 
 from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.http.federation.well_known_resolver import WellKnownResolver
 from synapse.logging.context import make_deferred_yieldable
 from synapse.util import Clock
-from synapse.util.caches.ttlcache import TTLCache
-from synapse.util.metrics import Measure
-
-# period to cache .well-known results for by default
-WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
-
-# jitter to add to the .well-known default cache ttl
-WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
-
-# period to cache failure to fetch .well-known for
-WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
-
-# cap for .well-known cache period
-WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
 
 logger = logging.getLogger(__name__)
-well_known_cache = TTLCache("well-known")
 
 
 @implementer(IAgent)
@@ -78,7 +61,7 @@ class MatrixFederationAgent(object):
         reactor,
         tls_client_options_factory,
         _srv_resolver=None,
-        _well_known_cache=well_known_cache,
+        _well_known_cache=None,
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)
@@ -93,20 +76,15 @@ class MatrixFederationAgent(object):
         self._pool.maxPersistentPerHost = 5
         self._pool.cachedConnectionTimeout = 2 * 60
 
-        _well_known_agent = RedirectAgent(
-            Agent(
+        self._well_known_resolver = WellKnownResolver(
+            self._reactor,
+            agent=Agent(
                 self._reactor,
                 pool=self._pool,
                 contextFactory=tls_client_options_factory,
-            )
+            ),
+            well_known_cache=_well_known_cache,
         )
-        self._well_known_agent = _well_known_agent
-
-        # our cache of .well-known lookup results, mapping from server name
-        # to delegated name. The values can be:
-        #   `bytes`:     a valid server-name
-        #   `None`:      there is no (valid) .well-known here
-        self._well_known_cache = _well_known_cache
 
     @defer.inlineCallbacks
     def request(self, method, uri, headers=None, bodyProducer=None):
@@ -217,7 +195,10 @@ class MatrixFederationAgent(object):
 
         if lookup_well_known:
             # try a .well-known lookup
-            well_known_server = yield self._get_well_known(parsed_uri.host)
+            well_known_result = yield self._well_known_resolver.get_well_known(
+                parsed_uri.host
+            )
+            well_known_server = well_known_result.delegated_server
 
             if well_known_server:
                 # if we found a .well-known, start again, but don't do another
@@ -280,85 +261,6 @@ class MatrixFederationAgent(object):
             target_port=port,
         )
 
-    @defer.inlineCallbacks
-    def _get_well_known(self, server_name):
-        """Attempt to fetch and parse a .well-known file for the given server
-
-        Args:
-            server_name (bytes): name of the server, from the requested url
-
-        Returns:
-            Deferred[bytes|None]: either the new server name, from the .well-known, or
-                None if there was no .well-known file.
-        """
-        try:
-            result = self._well_known_cache[server_name]
-        except KeyError:
-            # TODO: should we linearise so that we don't end up doing two .well-known
-            # requests for the same server in parallel?
-            with Measure(self._clock, "get_well_known"):
-                result, cache_period = yield self._do_get_well_known(server_name)
-
-            if cache_period > 0:
-                self._well_known_cache.set(server_name, result, cache_period)
-
-        return result
-
-    @defer.inlineCallbacks
-    def _do_get_well_known(self, server_name):
-        """Actually fetch and parse a .well-known, without checking the cache
-
-        Args:
-            server_name (bytes): name of the server, from the requested url
-
-        Returns:
-            Deferred[Tuple[bytes|None|object],int]:
-                result, cache period, where result is one of:
-                 - the new server name from the .well-known (as a `bytes`)
-                 - None if there was no .well-known file.
-                 - INVALID_WELL_KNOWN if the .well-known was invalid
-        """
-        uri = b"https://%s/.well-known/matrix/server" % (server_name,)
-        uri_str = uri.decode("ascii")
-        logger.info("Fetching %s", uri_str)
-        try:
-            response = yield make_deferred_yieldable(
-                self._well_known_agent.request(b"GET", uri)
-            )
-            body = yield make_deferred_yieldable(readBody(response))
-            if response.code != 200:
-                raise Exception("Non-200 response %s" % (response.code,))
-
-            parsed_body = json.loads(body.decode("utf-8"))
-            logger.info("Response from .well-known: %s", parsed_body)
-            if not isinstance(parsed_body, dict):
-                raise Exception("not a dict")
-            if "m.server" not in parsed_body:
-                raise Exception("Missing key 'm.server'")
-        except Exception as e:
-            logger.info("Error fetching %s: %s", uri_str, e)
-
-            # add some randomness to the TTL to avoid a stampeding herd every hour
-            # after startup
-            cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
-            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
-            return (None, cache_period)
-
-        result = parsed_body["m.server"].encode("ascii")
-
-        cache_period = _cache_period_from_headers(
-            response.headers, time_now=self._reactor.seconds
-        )
-        if cache_period is None:
-            cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
-            # add some randomness to the TTL to avoid a stampeding herd every 24 hours
-            # after startup
-            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
-        else:
-            cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
-
-        return (result, cache_period)
-
 
 @implementer(IStreamClientEndpoint)
 class LoggingHostnameEndpoint(object):
@@ -374,44 +276,6 @@ class LoggingHostnameEndpoint(object):
         return self.ep.connect(protocol_factory)
 
 
-def _cache_period_from_headers(headers, time_now=time.time):
-    cache_controls = _parse_cache_control(headers)
-
-    if b"no-store" in cache_controls:
-        return 0
-
-    if b"max-age" in cache_controls:
-        try:
-            max_age = int(cache_controls[b"max-age"])
-            return max_age
-        except ValueError:
-            pass
-
-    expires = headers.getRawHeaders(b"expires")
-    if expires is not None:
-        try:
-            expires_date = stringToDatetime(expires[-1])
-            return expires_date - time_now()
-        except ValueError:
-            # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
-            # especially the value "0", as representing a time in the past (i.e.,
-            # "already expired").
-            return 0
-
-    return None
-
-
-def _parse_cache_control(headers):
-    cache_controls = {}
-    for hdr in headers.getRawHeaders(b"cache-control", []):
-        for directive in hdr.split(b","):
-            splits = [x.strip() for x in directive.split(b"=", 1)]
-            k = splits[0].lower()
-            v = splits[1] if len(splits) > 1 else None
-            cache_controls[k] = v
-    return cache_controls
-
-
 @attr.s
 class _RoutingResult(object):
     """The result returned by `_route_matrix_uri`.
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
new file mode 100644
index 0000000000..d2866ff67d
--- /dev/null
+++ b/synapse/http/federation/well_known_resolver.py
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import random
+import time
+
+import attr
+
+from twisted.internet import defer
+from twisted.web.client import RedirectAgent, readBody
+from twisted.web.http import stringToDatetime
+
+from synapse.logging.context import make_deferred_yieldable
+from synapse.util import Clock
+from synapse.util.caches.ttlcache import TTLCache
+from synapse.util.metrics import Measure
+
+# period to cache .well-known results for by default
+WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
+
+# jitter to add to the .well-known default cache ttl
+WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
+
+# period to cache failure to fetch .well-known for
+WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
+
+# cap for .well-known cache period
+WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
+
+# lower bound for .well-known cache period
+WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
+
+logger = logging.getLogger(__name__)
+
+
+_well_known_cache = TTLCache("well-known")
+
+
+@attr.s(slots=True, frozen=True)
+class WellKnownLookupResult(object):
+    delegated_server = attr.ib()
+
+
+class WellKnownResolver(object):
+    """Handles well-known lookups for matrix servers.
+    """
+
+    def __init__(self, reactor, agent, well_known_cache=None):
+        self._reactor = reactor
+        self._clock = Clock(reactor)
+
+        if well_known_cache is None:
+            well_known_cache = _well_known_cache
+
+        self._well_known_cache = well_known_cache
+        self._well_known_agent = RedirectAgent(agent)
+
+    @defer.inlineCallbacks
+    def get_well_known(self, server_name):
+        """Attempt to fetch and parse a .well-known file for the given server
+
+        Args:
+            server_name (bytes): name of the server, from the requested url
+
+        Returns:
+            Deferred[WellKnownLookupResult]: The result of the lookup
+        """
+        try:
+            result = self._well_known_cache[server_name]
+        except KeyError:
+            # TODO: should we linearise so that we don't end up doing two .well-known
+            # requests for the same server in parallel?
+            with Measure(self._clock, "get_well_known"):
+                result, cache_period = yield self._do_get_well_known(server_name)
+
+            if cache_period > 0:
+                self._well_known_cache.set(server_name, result, cache_period)
+
+        return WellKnownLookupResult(delegated_server=result)
+
+    @defer.inlineCallbacks
+    def _do_get_well_known(self, server_name):
+        """Actually fetch and parse a .well-known, without checking the cache
+
+        Args:
+            server_name (bytes): name of the server, from the requested url
+
+        Returns:
+            Deferred[Tuple[bytes|None|object, int]]:
+                result, cache period, where result is one of:
+                 - the new server name from the .well-known (as a `bytes`)
+                 - None if there was no .well-known file.
+                 - INVALID_WELL_KNOWN if the .well-known was invalid
+        """
+        uri = b"https://%s/.well-known/matrix/server" % (server_name,)
+        uri_str = uri.decode("ascii")
+        logger.info("Fetching %s", uri_str)
+        try:
+            response = yield make_deferred_yieldable(
+                self._well_known_agent.request(b"GET", uri)
+            )
+            body = yield make_deferred_yieldable(readBody(response))
+            if response.code != 200:
+                raise Exception("Non-200 response %s" % (response.code,))
+
+            parsed_body = json.loads(body.decode("utf-8"))
+            logger.info("Response from .well-known: %s", parsed_body)
+            if not isinstance(parsed_body, dict):
+                raise Exception("not a dict")
+            if "m.server" not in parsed_body:
+                raise Exception("Missing key 'm.server'")
+        except Exception as e:
+            logger.info("Error fetching %s: %s", uri_str, e)
+
+            # add some randomness to the TTL to avoid a stampeding herd every hour
+            # after startup
+            cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
+            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+            return (None, cache_period)
+
+        result = parsed_body["m.server"].encode("ascii")
+
+        cache_period = _cache_period_from_headers(
+            response.headers, time_now=self._reactor.seconds
+        )
+        if cache_period is None:
+            cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
+            # add some randomness to the TTL to avoid a stampeding herd every 24 hours
+            # after startup
+            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+        else:
+            cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
+            cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
+
+        return (result, cache_period)
+
+
+def _cache_period_from_headers(headers, time_now=time.time):
+    cache_controls = _parse_cache_control(headers)
+
+    if b"no-store" in cache_controls:
+        return 0
+
+    if b"max-age" in cache_controls:
+        try:
+            max_age = int(cache_controls[b"max-age"])
+            return max_age
+        except ValueError:
+            pass
+
+    expires = headers.getRawHeaders(b"expires")
+    if expires is not None:
+        try:
+            expires_date = stringToDatetime(expires[-1])
+            return expires_date - time_now()
+        except ValueError:
+            # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
+            # especially the value "0", as representing a time in the past (i.e.,
+            # "already expired").
+            return 0
+
+    return None
+
+
+def _parse_cache_control(headers):
+    cache_controls = {}
+    for hdr in headers.getRawHeaders(b"cache-control", []):
+        for directive in hdr.split(b","):
+            splits = [x.strip() for x in directive.split(b"=", 1)]
+            k = splits[0].lower()
+            v = splits[1] if len(splits) > 1 else None
+            cache_controls[k] = v
+    return cache_controls
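
The header parsing at the bottom of this new file can be exercised directly;
a short sketch using Twisted's Headers class, which is what response.headers
is (the import path matches the test changes below)::

    from twisted.web.http_headers import Headers

    from synapse.http.federation.well_known_resolver import (
        _cache_period_from_headers,
    )

    # max-age wins when present...
    headers = Headers({b"Cache-Control": [b"max-age=3600"]})
    assert _cache_period_from_headers(headers) == 3600

    # ...and no-store means "do not cache at all".
    headers = Headers({b"Cache-Control": [b"no-store"]})
    assert _cache_period_from_headers(headers) == 0

Note that the clamping to [WELL_KNOWN_MIN_CACHE_PERIOD,
WELL_KNOWN_MAX_CACHE_PERIOD] happens in the caller, _do_get_well_known, not
in this function.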
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 424ffa8b68..42e5b0c0a5 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -234,13 +234,19 @@ class EmailPusher(object):
             return
 
         self.last_stream_ordering = last_stream_ordering
-        yield self.store.update_pusher_last_stream_ordering_and_success(
-            self.app_id,
-            self.email,
-            self.user_id,
-            last_stream_ordering,
-            self.clock.time_msec(),
+        pusher_still_exists = (
+            yield self.store.update_pusher_last_stream_ordering_and_success(
+                self.app_id,
+                self.email,
+                self.user_id,
+                last_stream_ordering,
+                self.clock.time_msec(),
+            )
         )
+        if not pusher_still_exists:
+            # The pusher has been deleted while we were processing, so
+            # let's just stop and return.
+            self.on_stop()
 
     def seconds_until(self, ts_msec):
         secs = (ts_msec - self.clock.time_msec()) / 1000
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bfe3b36d52..bf65cfa21a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -203,13 +203,21 @@ class HttpPusher(object):
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
-                yield self.store.update_pusher_last_stream_ordering_and_success(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.last_stream_ordering,
-                    self.clock.time_msec(),
+                pusher_still_exists = (
+                    yield self.store.update_pusher_last_stream_ordering_and_success(
+                        self.app_id,
+                        self.pushkey,
+                        self.user_id,
+                        self.last_stream_ordering,
+                        self.clock.time_msec(),
+                    )
                 )
+                if not pusher_still_exists:
+                    # The pusher has been deleted while we were processing, so
+                    # let's just stop and return.
+                    self.on_stop()
+                    return
+
                 if self.failing_since:
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
@@ -238,12 +246,17 @@ class HttpPusher(object):
                     )
                     self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                     self.last_stream_ordering = push_action["stream_ordering"]
-                    yield self.store.update_pusher_last_stream_ordering(
+                    pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
                         self.app_id,
                         self.pushkey,
                         self.user_id,
                         self.last_stream_ordering,
                     )
+                    if not pusher_still_exists:
+                        # The pusher has been deleted while we were processing, so
+                        # let's just stop and return.
+                        self.on_stop()
+                        return
 
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 0a7d9b81b2..5720cab425 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -27,7 +27,7 @@ from twisted.internet import defer
 
 import synapse
 from synapse.api.constants import Membership, UserTypes
-from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
     RestServlet,
@@ -36,7 +36,12 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
-from synapse.rest.admin._base import assert_requester_is_admin, assert_user_is_admin
+from synapse.rest.admin._base import (
+    assert_requester_is_admin,
+    assert_user_is_admin,
+    historical_admin_path_patterns,
+)
+from synapse.rest.admin.media import register_servlets_for_media_repo
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
 from synapse.types import UserID, create_requester
 from synapse.util.versionstring import get_version_string
@@ -44,28 +49,6 @@ from synapse.util.versionstring import get_version_string
 logger = logging.getLogger(__name__)
 
 
-def historical_admin_path_patterns(path_regex):
-    """Returns the list of patterns for an admin endpoint, including historical ones
-
-    This is a backwards-compatibility hack. Previously, the Admin API was exposed at
-    various paths under /_matrix/client. This function returns a list of patterns
-    matching those paths (as well as the new one), so that existing scripts which rely
-    on the endpoints being available there are not broken.
-
-    Note that this should only be used for existing endpoints: new ones should just
-    register for the /_synapse/admin path.
-    """
-    return list(
-        re.compile(prefix + path_regex)
-        for prefix in (
-            "^/_synapse/admin/v1",
-            "^/_matrix/client/api/v1/admin",
-            "^/_matrix/client/unstable/admin",
-            "^/_matrix/client/r0/admin",
-        )
-    )
-
-
 class UsersRestServlet(RestServlet):
     PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)")
 
@@ -255,25 +238,6 @@ class WhoisRestServlet(RestServlet):
         return (200, ret)
 
 
-class PurgeMediaCacheRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/purge_media_cache")
-
-    def __init__(self, hs):
-        self.media_repository = hs.get_media_repository()
-        self.auth = hs.get_auth()
-
-    @defer.inlineCallbacks
-    def on_POST(self, request):
-        yield assert_requester_is_admin(self.auth, request)
-
-        before_ts = parse_integer(request, "before_ts", required=True)
-        logger.info("before_ts: %r", before_ts)
-
-        ret = yield self.media_repository.delete_old_remote_media(before_ts)
-
-        return (200, ret)
-
-
 class PurgeHistoryRestServlet(RestServlet):
     PATTERNS = historical_admin_path_patterns(
         "/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
@@ -542,50 +506,6 @@ class ShutdownRoomRestServlet(RestServlet):
         )
 
 
-class QuarantineMediaInRoom(RestServlet):
-    """Quarantines all media in a room so that no one can download it via
-    this server.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-        self.auth = hs.get_auth()
-
-    @defer.inlineCallbacks
-    def on_POST(self, request, room_id):
-        requester = yield self.auth.get_user_by_req(request)
-        yield assert_user_is_admin(self.auth, requester.user)
-
-        num_quarantined = yield self.store.quarantine_media_ids_in_room(
-            room_id, requester.user.to_string()
-        )
-
-        return (200, {"num_quarantined": num_quarantined})
-
-
-class ListMediaInRoom(RestServlet):
-    """Lists all of the media in a given room.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-
-    @defer.inlineCallbacks
-    def on_GET(self, request, room_id):
-        requester = yield self.auth.get_user_by_req(request)
-        is_admin = yield self.auth.is_server_admin(requester.user)
-        if not is_admin:
-            raise AuthError(403, "You are not a server admin")
-
-        local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
-
-        return (200, {"local": local_mxcs, "remote": remote_mxcs})
-
-
 class ResetPasswordRestServlet(RestServlet):
     """Post request to allow an administrator reset password for a user.
     This needs user to have administrator access in Synapse.
@@ -825,7 +745,6 @@ def register_servlets(hs, http_server):
 def register_servlets_for_client_rest_resource(hs, http_server):
     """Register only the servlets which need to be exposed on /_matrix/client/xxx"""
     WhoisRestServlet(hs).register(http_server)
-    PurgeMediaCacheRestServlet(hs).register(http_server)
     PurgeHistoryStatusRestServlet(hs).register(http_server)
     DeactivateAccountRestServlet(hs).register(http_server)
     PurgeHistoryRestServlet(hs).register(http_server)
@@ -834,10 +753,13 @@ def register_servlets_for_client_rest_resource(hs, http_server):
     GetUsersPaginatedRestServlet(hs).register(http_server)
     SearchUsersRestServlet(hs).register(http_server)
     ShutdownRoomRestServlet(hs).register(http_server)
-    QuarantineMediaInRoom(hs).register(http_server)
-    ListMediaInRoom(hs).register(http_server)
     UserRegisterServlet(hs).register(http_server)
     DeleteGroupAdminRestServlet(hs).register(http_server)
     AccountValidityRenewServlet(hs).register(http_server)
+
+    # Load the media repo ones if we're using them.
+    if hs.config.can_load_media_repo:
+        register_servlets_for_media_repo(hs, http_server)
+
     # don't add more things here: new servlets should only be exposed on
     # /_synapse/admin so should not go here. Instead register them in AdminRestResource.
diff --git a/synapse/rest/admin/_base.py b/synapse/rest/admin/_base.py
index 881d67b89c..5a9b08d3ef 100644
--- a/synapse/rest/admin/_base.py
+++ b/synapse/rest/admin/_base.py
@@ -12,11 +12,36 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import re
+
 from twisted.internet import defer
 
 from synapse.api.errors import AuthError
 
 
+def historical_admin_path_patterns(path_regex):
+    """Returns the list of patterns for an admin endpoint, including historical ones
+
+    This is a backwards-compatibility hack. Previously, the Admin API was exposed at
+    various paths under /_matrix/client. This function returns a list of patterns
+    matching those paths (as well as the new one), so that existing scripts which rely
+    on the endpoints being available there are not broken.
+
+    Note that this should only be used for existing endpoints: new ones should just
+    register for the /_synapse/admin path.
+    """
+    return list(
+        re.compile(prefix + path_regex)
+        for prefix in (
+            "^/_synapse/admin/v1",
+            "^/_matrix/client/api/v1/admin",
+            "^/_matrix/client/unstable/admin",
+            "^/_matrix/client/r0/admin",
+        )
+    )
+
+
 @defer.inlineCallbacks
 def assert_requester_is_admin(auth, request):
     """Verify that the requester is an admin user
diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
new file mode 100644
index 0000000000..824df919f2
--- /dev/null
+++ b/synapse/rest/admin/media.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018-2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import AuthError
+from synapse.http.servlet import RestServlet, parse_integer
+from synapse.rest.admin._base import (
+    assert_requester_is_admin,
+    assert_user_is_admin,
+    historical_admin_path_patterns,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class QuarantineMediaInRoom(RestServlet):
+    """Quarantines all media in a room so that no one can download it via
+    this server.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+        yield assert_user_is_admin(self.auth, requester.user)
+
+        num_quarantined = yield self.store.quarantine_media_ids_in_room(
+            room_id, requester.user.to_string()
+        )
+
+        return (200, {"num_quarantined": num_quarantined})
+
+
+class ListMediaInRoom(RestServlet):
+    """Lists all of the media in a given room.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
+
+        return (200, {"local": local_mxcs, "remote": remote_mxcs})
+
+
+class PurgeMediaCacheRestServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/purge_media_cache")
+
+    def __init__(self, hs):
+        self.media_repository = hs.get_media_repository()
+        self.auth = hs.get_auth()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        yield assert_requester_is_admin(self.auth, request)
+
+        before_ts = parse_integer(request, "before_ts", required=True)
+        logger.info("before_ts: %r", before_ts)
+
+        ret = yield self.media_repository.delete_old_remote_media(before_ts)
+
+        return (200, ret)
+
+
+def register_servlets_for_media_repo(hs, http_server):
+    """
+    Media repo specific APIs.
+    """
+    PurgeMediaCacheRestServlet(hs).register(http_server)
+    QuarantineMediaInRoom(hs).register(http_server)
+    ListMediaInRoom(hs).register(http_server)
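
Taken together, the three servlets expose roughly the following HTTP
surface; the host, port, and token in this sketch are placeholders, not
values from this commit::

    import requests

    base = "http://media-worker.example.com:8083/_synapse/admin/v1"
    auth = {"Authorization": "Bearer <admin_access_token>"}

    # PurgeMediaCacheRestServlet: "before_ts" is a required integer parameter.
    requests.post(
        base + "/purge_media_cache", params={"before_ts": 1565000000000}, headers=auth
    )

    # ListMediaInRoom: responds with {"local": [...], "remote": [...]}.
    requests.get(base + "/room/!room:example.com/media", headers=auth)

    # QuarantineMediaInRoom: responds with {"num_quarantined": <count>}.
    requests.post(base + "/quarantine_media/!room:example.com", headers=auth)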
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 92beefa176..cf5759e9a6 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
+from synapse.config._base import ConfigError
 from synapse.logging.context import defer_to_thread
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async_helpers import Linearizer
@@ -753,8 +754,11 @@ class MediaRepositoryResource(Resource):
     """
 
     def __init__(self, hs):
-        Resource.__init__(self)
+        # If we're not configured to use it, raise if we somehow got here.
+        if not hs.config.can_load_media_repo:
+            raise ConfigError("Synapse is not configured to use a media repo.")
 
+        super().__init__()
         media_repo = hs.get_media_repository()
 
         self.putChild(b"upload", UploadResource(hs, media_repo))
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index be3d4d9ded..b431d24b8a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
     def update_pusher_last_stream_ordering_and_success(
         self, app_id, pushkey, user_id, last_stream_ordering, last_success
     ):
-        yield self._simple_update_one(
-            "pushers",
-            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
-            {
+        """Update the last stream ordering position we've processed up to for
+        the given pusher.
+
+        Args:
+            app_id (str)
+            pushkey (str)
+            user_id (str)
+            last_stream_ordering (int)
+            last_success (int)
+
+        Returns:
+            Deferred[bool]: True if the pusher still exists; False if it has been deleted.
+        """
+        updated = yield self._simple_update(
+            table="pushers",
+            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            updatevalues={
                 "last_stream_ordering": last_stream_ordering,
                 "last_success": last_success,
             },
             desc="update_pusher_last_stream_ordering_and_success",
         )
 
+        return bool(updated)
+
     @defer.inlineCallbacks
     def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
-        yield self._simple_update_one(
-            "pushers",
-            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
-            {"failing_since": failing_since},
+        yield self._simple_update(
+            table="pushers",
+            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            updatevalues={"failing_since": failing_since},
             desc="update_pusher_failing_since",
         )
 
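The behavioural change above hinges on swapping _simple_update_one, which
treats a missing row as an error, for _simple_update, which simply reports
how many rows it touched. A generic, self-contained sketch of that
distinction — not Synapse's actual helper::

    import sqlite3

    def simple_update(conn, table, keyvalues, updatevalues):
        """Generic UPDATE helper: returns the number of rows changed (may be 0)."""
        sets = ", ".join("%s = ?" % k for k in updatevalues)
        where = " AND ".join("%s = ?" % k for k in keyvalues)
        cur = conn.execute(
            "UPDATE %s SET %s WHERE %s" % (table, sets, where),
            list(updatevalues.values()) + list(keyvalues.values()),
        )
        return cur.rowcount

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE pushers (app_id TEXT, last_success INTEGER)")

    # Pusher already deleted: zero rows match, which becomes the False that
    # tells EmailPusher/HttpPusher to call on_stop() instead of raising.
    assert simple_update(conn, "pushers", {"app_id": "gone"}, {"last_success": 1}) == 0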
diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py
index 4255add097..1435baede2 100644
--- a/tests/http/federation/test_matrix_federation_agent.py
+++ b/tests/http/federation/test_matrix_federation_agent.py
@@ -25,17 +25,19 @@ from twisted.internet._sslverify import ClientTLSOptions, OpenSSLCertificateOpti
 from twisted.internet.protocol import Factory
 from twisted.protocols.tls import TLSMemoryBIOFactory
 from twisted.web._newclient import ResponseNeverReceived
+from twisted.web.client import Agent
 from twisted.web.http import HTTPChannel
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IPolicyForHTTPS
 
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto.context_factory import ClientTLSOptionsFactory
-from synapse.http.federation.matrix_federation_agent import (
-    MatrixFederationAgent,
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.federation.srv_resolver import Server
+from synapse.http.federation.well_known_resolver import (
+    WellKnownResolver,
     _cache_period_from_headers,
 )
-from synapse.http.federation.srv_resolver import Server
 from synapse.logging.context import LoggingContext
 from synapse.util.caches.ttlcache import TTLCache
 
@@ -79,9 +81,10 @@ class MatrixFederationAgentTests(TestCase):
         self._config = config = HomeServerConfig()
         config.parse_config_dict(config_dict, "", "")
 
+        self.tls_factory = ClientTLSOptionsFactory(config)
         self.agent = MatrixFederationAgent(
             reactor=self.reactor,
-            tls_client_options_factory=ClientTLSOptionsFactory(config),
+            tls_client_options_factory=self.tls_factory,
             _srv_resolver=self.mock_resolver,
             _well_known_cache=self.well_known_cache,
         )
@@ -928,20 +931,16 @@ class MatrixFederationAgentTests(TestCase):
         self.reactor.pump((0.1,))
         self.successResultOf(test_d)
 
-    @defer.inlineCallbacks
-    def do_get_well_known(self, serv):
-        try:
-            result = yield self.agent._get_well_known(serv)
-            logger.info("Result from well-known fetch: %s", result)
-        except Exception as e:
-            logger.warning("Error fetching well-known: %s", e)
-            raise
-        return result
-
     def test_well_known_cache(self):
+        well_known_resolver = WellKnownResolver(
+            self.reactor,
+            Agent(self.reactor, contextFactory=self.tls_factory),
+            well_known_cache=self.well_known_cache,
+        )
+
         self.reactor.lookups["testserv"] = "1.2.3.4"
 
-        fetch_d = self.do_get_well_known(b"testserv")
+        fetch_d = well_known_resolver.get_well_known(b"testserv")
 
         # there should be an attempt to connect on port 443 for the .well-known
         clients = self.reactor.tcpClients
@@ -953,26 +952,26 @@ class MatrixFederationAgentTests(TestCase):
         well_known_server = self._handle_well_known_connection(
             client_factory,
             expected_sni=b"testserv",
-            response_headers={b"Cache-Control": b"max-age=10"},
+            response_headers={b"Cache-Control": b"max-age=1000"},
             content=b'{ "m.server": "target-server" }',
         )
 
         r = self.successResultOf(fetch_d)
-        self.assertEqual(r, b"target-server")
+        self.assertEqual(r.delegated_server, b"target-server")
 
         # close the tcp connection
         well_known_server.loseConnection()
 
         # repeat the request: it should hit the cache
-        fetch_d = self.do_get_well_known(b"testserv")
+        fetch_d = well_known_resolver.get_well_known(b"testserv")
         r = self.successResultOf(fetch_d)
-        self.assertEqual(r, b"target-server")
+        self.assertEqual(r.delegated_server, b"target-server")
 
         # expire the cache
-        self.reactor.pump((10.0,))
+        self.reactor.pump((1000.0,))
 
         # now it should connect again
-        fetch_d = self.do_get_well_known(b"testserv")
+        fetch_d = well_known_resolver.get_well_known(b"testserv")
 
         self.assertEqual(len(clients), 1)
         (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
@@ -986,7 +985,7 @@ class MatrixFederationAgentTests(TestCase):
         )
 
         r = self.successResultOf(fetch_d)
-        self.assertEqual(r, b"other-server")
+        self.assertEqual(r.delegated_server, b"other-server")
 
 
 class TestCachePeriodFromHeaders(TestCase):