diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index ea26f29acb..3a168577c7 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -26,6 +26,7 @@ from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
@@ -35,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -71,6 +73,12 @@ class MediaRepositoryServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "media":
media_repo = self.get_media_repository_resource()
+
+ # We need to serve the admin servlets for media on the
+ # worker.
+ admin_resource = JsonResource(self, canonical_json=False)
+ register_servlets_for_media_repo(self, admin_resource)
+
resources.update(
{
MEDIA_PREFIX: media_repo,
@@ -78,6 +86,7 @@ class MediaRepositoryServer(HomeServer):
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
+ "/_synapse/admin": admin_resource,
}
)
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 80a628d9b0..db39697e45 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import os
from collections import namedtuple
@@ -87,6 +88,18 @@ def parse_thumbnail_requirements(thumbnail_sizes):
class ContentRepositoryConfig(Config):
def read_config(self, config, **kwargs):
+
+ # Only enable the media repo if either the media repo is enabled or the
+ # current worker app is the media repo.
+ if (
+ self.enable_media_repo is False
+ and config.worker_app != "synapse.app.media_repository"
+ ):
+ self.can_load_media_repo = False
+ return
+ else:
+ self.can_load_media_repo = True
+
self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
@@ -202,6 +215,13 @@ class ContentRepositoryConfig(Config):
return (
r"""
+ ## Media Store ##
+
+ # Enable the media store service in the Synapse master. Uncomment the
+ # following if you are using a separate media store worker.
+ #
+ #enable_media_repo: false
+
# Directory where uploaded images and attachments are stored.
#
media_store_path: "%(media_store)s"
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index a0d5139839..71a15f434d 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -12,10 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
+
import logging
-import random
-import time
import attr
from netaddr import IPAddress
@@ -24,31 +22,16 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import IStreamClientEndpoint
-from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
-from twisted.web.http import stringToDatetime
+from twisted.web.client import URI, Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
-from synapse.util.caches.ttlcache import TTLCache
-from synapse.util.metrics import Measure
-
-# period to cache .well-known results for by default
-WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
-
-# jitter to add to the .well-known default cache ttl
-WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
-
-# period to cache failure to fetch .well-known for
-WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
-
-# cap for .well-known cache period
-WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
logger = logging.getLogger(__name__)
-well_known_cache = TTLCache("well-known")
@implementer(IAgent)
@@ -78,7 +61,7 @@ class MatrixFederationAgent(object):
reactor,
tls_client_options_factory,
_srv_resolver=None,
- _well_known_cache=well_known_cache,
+ _well_known_cache=None,
):
self._reactor = reactor
self._clock = Clock(reactor)
@@ -93,20 +76,15 @@ class MatrixFederationAgent(object):
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60
- _well_known_agent = RedirectAgent(
- Agent(
+ self._well_known_resolver = WellKnownResolver(
+ self._reactor,
+ agent=Agent(
self._reactor,
pool=self._pool,
contextFactory=tls_client_options_factory,
- )
+ ),
+ well_known_cache=_well_known_cache,
)
- self._well_known_agent = _well_known_agent
-
- # our cache of .well-known lookup results, mapping from server name
- # to delegated name. The values can be:
- # `bytes`: a valid server-name
- # `None`: there is no (valid) .well-known here
- self._well_known_cache = _well_known_cache
@defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None):
@@ -217,7 +195,10 @@ class MatrixFederationAgent(object):
if lookup_well_known:
# try a .well-known lookup
- well_known_server = yield self._get_well_known(parsed_uri.host)
+ well_known_result = yield self._well_known_resolver.get_well_known(
+ parsed_uri.host
+ )
+ well_known_server = well_known_result.delegated_server
if well_known_server:
# if we found a .well-known, start again, but don't do another
@@ -280,85 +261,6 @@ class MatrixFederationAgent(object):
target_port=port,
)
- @defer.inlineCallbacks
- def _get_well_known(self, server_name):
- """Attempt to fetch and parse a .well-known file for the given server
-
- Args:
- server_name (bytes): name of the server, from the requested url
-
- Returns:
- Deferred[bytes|None]: either the new server name, from the .well-known, or
- None if there was no .well-known file.
- """
- try:
- result = self._well_known_cache[server_name]
- except KeyError:
- # TODO: should we linearise so that we don't end up doing two .well-known
- # requests for the same server in parallel?
- with Measure(self._clock, "get_well_known"):
- result, cache_period = yield self._do_get_well_known(server_name)
-
- if cache_period > 0:
- self._well_known_cache.set(server_name, result, cache_period)
-
- return result
-
- @defer.inlineCallbacks
- def _do_get_well_known(self, server_name):
- """Actually fetch and parse a .well-known, without checking the cache
-
- Args:
- server_name (bytes): name of the server, from the requested url
-
- Returns:
- Deferred[Tuple[bytes|None|object],int]:
- result, cache period, where result is one of:
- - the new server name from the .well-known (as a `bytes`)
- - None if there was no .well-known file.
- - INVALID_WELL_KNOWN if the .well-known was invalid
- """
- uri = b"https://%s/.well-known/matrix/server" % (server_name,)
- uri_str = uri.decode("ascii")
- logger.info("Fetching %s", uri_str)
- try:
- response = yield make_deferred_yieldable(
- self._well_known_agent.request(b"GET", uri)
- )
- body = yield make_deferred_yieldable(readBody(response))
- if response.code != 200:
- raise Exception("Non-200 response %s" % (response.code,))
-
- parsed_body = json.loads(body.decode("utf-8"))
- logger.info("Response from .well-known: %s", parsed_body)
- if not isinstance(parsed_body, dict):
- raise Exception("not a dict")
- if "m.server" not in parsed_body:
- raise Exception("Missing key 'm.server'")
- except Exception as e:
- logger.info("Error fetching %s: %s", uri_str, e)
-
- # add some randomness to the TTL to avoid a stampeding herd every hour
- # after startup
- cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
- cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
- return (None, cache_period)
-
- result = parsed_body["m.server"].encode("ascii")
-
- cache_period = _cache_period_from_headers(
- response.headers, time_now=self._reactor.seconds
- )
- if cache_period is None:
- cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
- # add some randomness to the TTL to avoid a stampeding herd every 24 hours
- # after startup
- cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
- else:
- cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
-
- return (result, cache_period)
-
@implementer(IStreamClientEndpoint)
class LoggingHostnameEndpoint(object):
@@ -374,44 +276,6 @@ class LoggingHostnameEndpoint(object):
return self.ep.connect(protocol_factory)
-def _cache_period_from_headers(headers, time_now=time.time):
- cache_controls = _parse_cache_control(headers)
-
- if b"no-store" in cache_controls:
- return 0
-
- if b"max-age" in cache_controls:
- try:
- max_age = int(cache_controls[b"max-age"])
- return max_age
- except ValueError:
- pass
-
- expires = headers.getRawHeaders(b"expires")
- if expires is not None:
- try:
- expires_date = stringToDatetime(expires[-1])
- return expires_date - time_now()
- except ValueError:
- # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
- # especially the value "0", as representing a time in the past (i.e.,
- # "already expired").
- return 0
-
- return None
-
-
-def _parse_cache_control(headers):
- cache_controls = {}
- for hdr in headers.getRawHeaders(b"cache-control", []):
- for directive in hdr.split(b","):
- splits = [x.strip() for x in directive.split(b"=", 1)]
- k = splits[0].lower()
- v = splits[1] if len(splits) > 1 else None
- cache_controls[k] = v
- return cache_controls
-
-
@attr.s
class _RoutingResult(object):
"""The result returned by `_route_matrix_uri`.
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
new file mode 100644
index 0000000000..d2866ff67d
--- /dev/null
+++ b/synapse/http/federation/well_known_resolver.py
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import random
+import time
+
+import attr
+
+from twisted.internet import defer
+from twisted.web.client import RedirectAgent, readBody
+from twisted.web.http import stringToDatetime
+
+from synapse.logging.context import make_deferred_yieldable
+from synapse.util import Clock
+from synapse.util.caches.ttlcache import TTLCache
+from synapse.util.metrics import Measure
+
+# period to cache .well-known results for by default
+WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
+
+# jitter to add to the .well-known default cache ttl
+WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
+
+# period to cache failure to fetch .well-known for
+WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
+
+# cap for .well-known cache period
+WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
+
+# lower bound for .well-known cache period
+WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
+
+logger = logging.getLogger(__name__)
+
+
+_well_known_cache = TTLCache("well-known")
+
+
+@attr.s(slots=True, frozen=True)
+class WellKnownLookupResult(object):
+ delegated_server = attr.ib()
+
+
+class WellKnownResolver(object):
+ """Handles well-known lookups for matrix servers.
+ """
+
+ def __init__(self, reactor, agent, well_known_cache=None):
+ self._reactor = reactor
+ self._clock = Clock(reactor)
+
+ if well_known_cache is None:
+ well_known_cache = _well_known_cache
+
+ self._well_known_cache = well_known_cache
+ self._well_known_agent = RedirectAgent(agent)
+
+ @defer.inlineCallbacks
+ def get_well_known(self, server_name):
+ """Attempt to fetch and parse a .well-known file for the given server
+
+ Args:
+ server_name (bytes): name of the server, from the requested url
+
+ Returns:
+ Deferred[WellKnownLookupResult]: The result of the lookup
+ """
+ try:
+ result = self._well_known_cache[server_name]
+ except KeyError:
+ # TODO: should we linearise so that we don't end up doing two .well-known
+ # requests for the same server in parallel?
+ with Measure(self._clock, "get_well_known"):
+ result, cache_period = yield self._do_get_well_known(server_name)
+
+ if cache_period > 0:
+ self._well_known_cache.set(server_name, result, cache_period)
+
+ return WellKnownLookupResult(delegated_server=result)
+
+ @defer.inlineCallbacks
+ def _do_get_well_known(self, server_name):
+ """Actually fetch and parse a .well-known, without checking the cache
+
+ Args:
+ server_name (bytes): name of the server, from the requested url
+
+ Returns:
+ Deferred[Tuple[bytes|None|object],int]:
+ result, cache period, where result is one of:
+ - the new server name from the .well-known (as a `bytes`)
+ - None if there was no .well-known file.
+ - INVALID_WELL_KNOWN if the .well-known was invalid
+ """
+ uri = b"https://%s/.well-known/matrix/server" % (server_name,)
+ uri_str = uri.decode("ascii")
+ logger.info("Fetching %s", uri_str)
+ try:
+ response = yield make_deferred_yieldable(
+ self._well_known_agent.request(b"GET", uri)
+ )
+ body = yield make_deferred_yieldable(readBody(response))
+ if response.code != 200:
+ raise Exception("Non-200 response %s" % (response.code,))
+
+ parsed_body = json.loads(body.decode("utf-8"))
+ logger.info("Response from .well-known: %s", parsed_body)
+ if not isinstance(parsed_body, dict):
+ raise Exception("not a dict")
+ if "m.server" not in parsed_body:
+ raise Exception("Missing key 'm.server'")
+ except Exception as e:
+ logger.info("Error fetching %s: %s", uri_str, e)
+
+ # add some randomness to the TTL to avoid a stampeding herd every hour
+ # after startup
+ cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
+ cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+ return (None, cache_period)
+
+ result = parsed_body["m.server"].encode("ascii")
+
+ cache_period = _cache_period_from_headers(
+ response.headers, time_now=self._reactor.seconds
+ )
+ if cache_period is None:
+ cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
+ # add some randomness to the TTL to avoid a stampeding herd every 24 hours
+ # after startup
+ cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+ else:
+ cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
+ cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
+
+ return (result, cache_period)
+
+
+def _cache_period_from_headers(headers, time_now=time.time):
+ cache_controls = _parse_cache_control(headers)
+
+ if b"no-store" in cache_controls:
+ return 0
+
+ if b"max-age" in cache_controls:
+ try:
+ max_age = int(cache_controls[b"max-age"])
+ return max_age
+ except ValueError:
+ pass
+
+ expires = headers.getRawHeaders(b"expires")
+ if expires is not None:
+ try:
+ expires_date = stringToDatetime(expires[-1])
+ return expires_date - time_now()
+ except ValueError:
+ # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
+ # especially the value "0", as representing a time in the past (i.e.,
+ # "already expired").
+ return 0
+
+ return None
+
+
+def _parse_cache_control(headers):
+ cache_controls = {}
+ for hdr in headers.getRawHeaders(b"cache-control", []):
+ for directive in hdr.split(b","):
+ splits = [x.strip() for x in directive.split(b"=", 1)]
+ k = splits[0].lower()
+ v = splits[1] if len(splits) > 1 else None
+ cache_controls[k] = v
+ return cache_controls
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 424ffa8b68..42e5b0c0a5 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -234,13 +234,19 @@ class EmailPusher(object):
return
self.last_stream_ordering = last_stream_ordering
- yield self.store.update_pusher_last_stream_ordering_and_success(
- self.app_id,
- self.email,
- self.user_id,
- last_stream_ordering,
- self.clock.time_msec(),
+ pusher_still_exists = (
+ yield self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id,
+ self.email,
+ self.user_id,
+ last_stream_ordering,
+ self.clock.time_msec(),
+ )
)
+ if not pusher_still_exists:
+ # The pusher has been deleted while we were processing, so
+ # lets just stop and return.
+ self.on_stop()
def seconds_until(self, ts_msec):
secs = (ts_msec - self.clock.time_msec()) / 1000
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bfe3b36d52..bf65cfa21a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -203,13 +203,21 @@ class HttpPusher(object):
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
- yield self.store.update_pusher_last_stream_ordering_and_success(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_stream_ordering,
- self.clock.time_msec(),
+ pusher_still_exists = (
+ yield self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_stream_ordering,
+ self.clock.time_msec(),
+ )
)
+ if not pusher_still_exists:
+ # The pusher has been deleted while we were processing, so
+ # lets just stop and return.
+ self.on_stop()
+ return
+
if self.failing_since:
self.failing_since = None
yield self.store.update_pusher_failing_since(
@@ -238,12 +246,17 @@ class HttpPusher(object):
)
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
- yield self.store.update_pusher_last_stream_ordering(
+ pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
)
+ if not pusher_still_exists:
+ # The pusher has been deleted while we were processing, so
+ # lets just stop and return.
+ self.on_stop()
+ return
self.failing_since = None
yield self.store.update_pusher_failing_since(
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 0a7d9b81b2..5720cab425 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -27,7 +27,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import Membership, UserTypes
-from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.server import JsonResource
from synapse.http.servlet import (
RestServlet,
@@ -36,7 +36,12 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
-from synapse.rest.admin._base import assert_requester_is_admin, assert_user_is_admin
+from synapse.rest.admin._base import (
+ assert_requester_is_admin,
+ assert_user_is_admin,
+ historical_admin_path_patterns,
+)
+from synapse.rest.admin.media import register_servlets_for_media_repo
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.types import UserID, create_requester
from synapse.util.versionstring import get_version_string
@@ -44,28 +49,6 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
-def historical_admin_path_patterns(path_regex):
- """Returns the list of patterns for an admin endpoint, including historical ones
-
- This is a backwards-compatibility hack. Previously, the Admin API was exposed at
- various paths under /_matrix/client. This function returns a list of patterns
- matching those paths (as well as the new one), so that existing scripts which rely
- on the endpoints being available there are not broken.
-
- Note that this should only be used for existing endpoints: new ones should just
- register for the /_synapse/admin path.
- """
- return list(
- re.compile(prefix + path_regex)
- for prefix in (
- "^/_synapse/admin/v1",
- "^/_matrix/client/api/v1/admin",
- "^/_matrix/client/unstable/admin",
- "^/_matrix/client/r0/admin",
- )
- )
-
-
class UsersRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)")
@@ -255,25 +238,6 @@ class WhoisRestServlet(RestServlet):
return (200, ret)
-class PurgeMediaCacheRestServlet(RestServlet):
- PATTERNS = historical_admin_path_patterns("/purge_media_cache")
-
- def __init__(self, hs):
- self.media_repository = hs.get_media_repository()
- self.auth = hs.get_auth()
-
- @defer.inlineCallbacks
- def on_POST(self, request):
- yield assert_requester_is_admin(self.auth, request)
-
- before_ts = parse_integer(request, "before_ts", required=True)
- logger.info("before_ts: %r", before_ts)
-
- ret = yield self.media_repository.delete_old_remote_media(before_ts)
-
- return (200, ret)
-
-
class PurgeHistoryRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns(
"/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
@@ -542,50 +506,6 @@ class ShutdownRoomRestServlet(RestServlet):
)
-class QuarantineMediaInRoom(RestServlet):
- """Quarantines all media in a room so that no one can download it via
- this server.
- """
-
- PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
-
- def __init__(self, hs):
- self.store = hs.get_datastore()
- self.auth = hs.get_auth()
-
- @defer.inlineCallbacks
- def on_POST(self, request, room_id):
- requester = yield self.auth.get_user_by_req(request)
- yield assert_user_is_admin(self.auth, requester.user)
-
- num_quarantined = yield self.store.quarantine_media_ids_in_room(
- room_id, requester.user.to_string()
- )
-
- return (200, {"num_quarantined": num_quarantined})
-
-
-class ListMediaInRoom(RestServlet):
- """Lists all of the media in a given room.
- """
-
- PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
-
- def __init__(self, hs):
- self.store = hs.get_datastore()
-
- @defer.inlineCallbacks
- def on_GET(self, request, room_id):
- requester = yield self.auth.get_user_by_req(request)
- is_admin = yield self.auth.is_server_admin(requester.user)
- if not is_admin:
- raise AuthError(403, "You are not a server admin")
-
- local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
-
- return (200, {"local": local_mxcs, "remote": remote_mxcs})
-
-
class ResetPasswordRestServlet(RestServlet):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
@@ -825,7 +745,6 @@ def register_servlets(hs, http_server):
def register_servlets_for_client_rest_resource(hs, http_server):
"""Register only the servlets which need to be exposed on /_matrix/client/xxx"""
WhoisRestServlet(hs).register(http_server)
- PurgeMediaCacheRestServlet(hs).register(http_server)
PurgeHistoryStatusRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PurgeHistoryRestServlet(hs).register(http_server)
@@ -834,10 +753,13 @@ def register_servlets_for_client_rest_resource(hs, http_server):
GetUsersPaginatedRestServlet(hs).register(http_server)
SearchUsersRestServlet(hs).register(http_server)
ShutdownRoomRestServlet(hs).register(http_server)
- QuarantineMediaInRoom(hs).register(http_server)
- ListMediaInRoom(hs).register(http_server)
UserRegisterServlet(hs).register(http_server)
DeleteGroupAdminRestServlet(hs).register(http_server)
AccountValidityRenewServlet(hs).register(http_server)
+
+ # Load the media repo ones if we're using them.
+ if hs.config.can_load_media_repo:
+ register_servlets_for_media_repo(hs, http_server)
+
# don't add more things here: new servlets should only be exposed on
# /_synapse/admin so should not go here. Instead register them in AdminRestResource.
diff --git a/synapse/rest/admin/_base.py b/synapse/rest/admin/_base.py
index 881d67b89c..5a9b08d3ef 100644
--- a/synapse/rest/admin/_base.py
+++ b/synapse/rest/admin/_base.py
@@ -12,11 +12,36 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import re
+
from twisted.internet import defer
from synapse.api.errors import AuthError
+def historical_admin_path_patterns(path_regex):
+ """Returns the list of patterns for an admin endpoint, including historical ones
+
+ This is a backwards-compatibility hack. Previously, the Admin API was exposed at
+ various paths under /_matrix/client. This function returns a list of patterns
+ matching those paths (as well as the new one), so that existing scripts which rely
+ on the endpoints being available there are not broken.
+
+ Note that this should only be used for existing endpoints: new ones should just
+ register for the /_synapse/admin path.
+ """
+ return list(
+ re.compile(prefix + path_regex)
+ for prefix in (
+ "^/_synapse/admin/v1",
+ "^/_matrix/client/api/v1/admin",
+ "^/_matrix/client/unstable/admin",
+ "^/_matrix/client/r0/admin",
+ )
+ )
+
+
@defer.inlineCallbacks
def assert_requester_is_admin(auth, request):
"""Verify that the requester is an admin user
diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
new file mode 100644
index 0000000000..824df919f2
--- /dev/null
+++ b/synapse/rest/admin/media.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018-2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import AuthError
+from synapse.http.servlet import RestServlet, parse_integer
+from synapse.rest.admin._base import (
+ assert_requester_is_admin,
+ assert_user_is_admin,
+ historical_admin_path_patterns,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class QuarantineMediaInRoom(RestServlet):
+ """Quarantines all media in a room so that no one can download it via
+ this server.
+ """
+
+ PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ yield assert_user_is_admin(self.auth, requester.user)
+
+ num_quarantined = yield self.store.quarantine_media_ids_in_room(
+ room_id, requester.user.to_string()
+ )
+
+ return (200, {"num_quarantined": num_quarantined})
+
+
+class ListMediaInRoom(RestServlet):
+ """Lists all of the media in a given room.
+ """
+
+ PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ is_admin = yield self.auth.is_server_admin(requester.user)
+ if not is_admin:
+ raise AuthError(403, "You are not a server admin")
+
+ local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
+
+ return (200, {"local": local_mxcs, "remote": remote_mxcs})
+
+
+class PurgeMediaCacheRestServlet(RestServlet):
+ PATTERNS = historical_admin_path_patterns("/purge_media_cache")
+
+ def __init__(self, hs):
+ self.media_repository = hs.get_media_repository()
+ self.auth = hs.get_auth()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ yield assert_requester_is_admin(self.auth, request)
+
+ before_ts = parse_integer(request, "before_ts", required=True)
+ logger.info("before_ts: %r", before_ts)
+
+ ret = yield self.media_repository.delete_old_remote_media(before_ts)
+
+ return (200, ret)
+
+
+def register_servlets_for_media_repo(hs, http_server):
+ """
+ Media repo specific APIs.
+ """
+ PurgeMediaCacheRestServlet(hs).register(http_server)
+ QuarantineMediaInRoom(hs).register(http_server)
+ ListMediaInRoom(hs).register(http_server)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 92beefa176..cf5759e9a6 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
+from synapse.config._base import ConfigError
from synapse.logging.context import defer_to_thread
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async_helpers import Linearizer
@@ -753,8 +754,11 @@ class MediaRepositoryResource(Resource):
"""
def __init__(self, hs):
- Resource.__init__(self)
+ # If we're not configured to use it, raise if we somehow got here.
+ if not hs.config.can_load_media_repo:
+ raise ConfigError("Synapse is not configured to use a media repo.")
+ super().__init__()
media_repo = hs.get_media_repository()
self.putChild(b"upload", UploadResource(hs, media_repo))
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index be3d4d9ded..b431d24b8a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
def update_pusher_last_stream_ordering_and_success(
self, app_id, pushkey, user_id, last_stream_ordering, last_success
):
- yield self._simple_update_one(
- "pushers",
- {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
- {
+ """Update the last stream ordering position we've processed up to for
+ the given pusher.
+
+ Args:
+ app_id (str)
+ pushkey (str)
+ last_stream_ordering (int)
+ last_success (int)
+
+ Returns:
+ Deferred[bool]: True if the pusher still exists; False if it has been deleted.
+ """
+ updated = yield self._simple_update(
+ table="pushers",
+ keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ updatevalues={
"last_stream_ordering": last_stream_ordering,
"last_success": last_success,
},
desc="update_pusher_last_stream_ordering_and_success",
)
+ return bool(updated)
+
@defer.inlineCallbacks
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
- yield self._simple_update_one(
- "pushers",
- {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
- {"failing_since": failing_since},
+ yield self._simple_update(
+ table="pushers",
+ keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ updatevalues={"failing_since": failing_since},
desc="update_pusher_failing_since",
)
|