diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2019-08-13 15:54:01 +0100 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-08-13 15:54:01 +0100 |
commit | a7efbc541601927904196bcf220a99e666a9a4a4 (patch) | |
tree | ff065ad5d403adfb72eb00160696205823299b54 /synapse | |
parent | Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff) | |
parent | Merge pull request #5809 from matrix-org/erikj/handle_pusher_stop (diff) | |
download | synapse-a7efbc541601927904196bcf220a99e666a9a4a4.tar.xz |
Merge branch 'release-v1.3.0' of github.com:matrix-org/synapse into matrix-org-hotfixes
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/media_repository.py | 9 | ||||
-rw-r--r-- | synapse/config/repository.py | 20 | ||||
-rw-r--r-- | synapse/http/federation/matrix_federation_agent.py | 162 | ||||
-rw-r--r-- | synapse/http/federation/well_known_resolver.py | 187 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 18 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 27 | ||||
-rw-r--r-- | synapse/rest/admin/__init__.py | 102 | ||||
-rw-r--r-- | synapse/rest/admin/_base.py | 25 | ||||
-rw-r--r-- | synapse/rest/admin/media.py | 101 | ||||
-rw-r--r-- | synapse/rest/media/v1/media_repository.py | 6 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 30 |
11 files changed, 426 insertions, 261 deletions
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index ea26f29acb..3a168577c7 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -26,6 +26,7 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy @@ -35,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -71,6 +73,12 @@ class MediaRepositoryServer(HomeServer): resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "media": media_repo = self.get_media_repository_resource() + + # We need to serve the admin servlets for media on the + # worker. + admin_resource = JsonResource(self, canonical_json=False) + register_servlets_for_media_repo(self, admin_resource) + resources.update( { MEDIA_PREFIX: media_repo, @@ -78,6 +86,7 @@ class MediaRepositoryServer(HomeServer): CONTENT_REPO_PREFIX: ContentRepoResource( self, self.config.uploads_path ), + "/_synapse/admin": admin_resource, } ) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 80a628d9b0..db39697e45 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import os from collections import namedtuple @@ -87,6 +88,18 @@ def parse_thumbnail_requirements(thumbnail_sizes): class ContentRepositoryConfig(Config): def read_config(self, config, **kwargs): + + # Only enable the media repo if either the media repo is enabled or the + # current worker app is the media repo. + if ( + self.enable_media_repo is False + and config.worker_app != "synapse.app.media_repository" + ): + self.can_load_media_repo = False + return + else: + self.can_load_media_repo = True + self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M")) self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M")) self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M")) @@ -202,6 +215,13 @@ class ContentRepositoryConfig(Config): return ( r""" + ## Media Store ## + + # Enable the media store service in the Synapse master. Uncomment the + # following if you are using a separate media store worker. + # + #enable_media_repo: false + # Directory where uploaded images and attachments are stored. # media_store_path: "%(media_store)s" diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index a0d5139839..71a15f434d 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -12,10 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import json + import logging -import random -import time import attr from netaddr import IPAddress @@ -24,31 +22,16 @@ from zope.interface import implementer from twisted.internet import defer from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.internet.interfaces import IStreamClientEndpoint -from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody -from twisted.web.http import stringToDatetime +from twisted.web.client import URI, Agent, HTTPConnectionPool from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list +from synapse.http.federation.well_known_resolver import WellKnownResolver from synapse.logging.context import make_deferred_yieldable from synapse.util import Clock -from synapse.util.caches.ttlcache import TTLCache -from synapse.util.metrics import Measure - -# period to cache .well-known results for by default -WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600 - -# jitter to add to the .well-known default cache ttl -WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60 - -# period to cache failure to fetch .well-known for -WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600 - -# cap for .well-known cache period -WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600 logger = logging.getLogger(__name__) -well_known_cache = TTLCache("well-known") @implementer(IAgent) @@ -78,7 +61,7 @@ class MatrixFederationAgent(object): reactor, tls_client_options_factory, _srv_resolver=None, - _well_known_cache=well_known_cache, + _well_known_cache=None, ): self._reactor = reactor self._clock = Clock(reactor) @@ -93,20 +76,15 @@ class MatrixFederationAgent(object): self._pool.maxPersistentPerHost = 5 self._pool.cachedConnectionTimeout = 2 * 60 - _well_known_agent = RedirectAgent( - Agent( + self._well_known_resolver = WellKnownResolver( + self._reactor, + agent=Agent( self._reactor, pool=self._pool, contextFactory=tls_client_options_factory, - ) + ), + well_known_cache=_well_known_cache, ) - self._well_known_agent = _well_known_agent - - # our cache of .well-known lookup results, mapping from server name - # to delegated name. The values can be: - # `bytes`: a valid server-name - # `None`: there is no (valid) .well-known here - self._well_known_cache = _well_known_cache @defer.inlineCallbacks def request(self, method, uri, headers=None, bodyProducer=None): @@ -217,7 +195,10 @@ class MatrixFederationAgent(object): if lookup_well_known: # try a .well-known lookup - well_known_server = yield self._get_well_known(parsed_uri.host) + well_known_result = yield self._well_known_resolver.get_well_known( + parsed_uri.host + ) + well_known_server = well_known_result.delegated_server if well_known_server: # if we found a .well-known, start again, but don't do another @@ -280,85 +261,6 @@ class MatrixFederationAgent(object): target_port=port, ) - @defer.inlineCallbacks - def _get_well_known(self, server_name): - """Attempt to fetch and parse a .well-known file for the given server - - Args: - server_name (bytes): name of the server, from the requested url - - Returns: - Deferred[bytes|None]: either the new server name, from the .well-known, or - None if there was no .well-known file. - """ - try: - result = self._well_known_cache[server_name] - except KeyError: - # TODO: should we linearise so that we don't end up doing two .well-known - # requests for the same server in parallel? - with Measure(self._clock, "get_well_known"): - result, cache_period = yield self._do_get_well_known(server_name) - - if cache_period > 0: - self._well_known_cache.set(server_name, result, cache_period) - - return result - - @defer.inlineCallbacks - def _do_get_well_known(self, server_name): - """Actually fetch and parse a .well-known, without checking the cache - - Args: - server_name (bytes): name of the server, from the requested url - - Returns: - Deferred[Tuple[bytes|None|object],int]: - result, cache period, where result is one of: - - the new server name from the .well-known (as a `bytes`) - - None if there was no .well-known file. - - INVALID_WELL_KNOWN if the .well-known was invalid - """ - uri = b"https://%s/.well-known/matrix/server" % (server_name,) - uri_str = uri.decode("ascii") - logger.info("Fetching %s", uri_str) - try: - response = yield make_deferred_yieldable( - self._well_known_agent.request(b"GET", uri) - ) - body = yield make_deferred_yieldable(readBody(response)) - if response.code != 200: - raise Exception("Non-200 response %s" % (response.code,)) - - parsed_body = json.loads(body.decode("utf-8")) - logger.info("Response from .well-known: %s", parsed_body) - if not isinstance(parsed_body, dict): - raise Exception("not a dict") - if "m.server" not in parsed_body: - raise Exception("Missing key 'm.server'") - except Exception as e: - logger.info("Error fetching %s: %s", uri_str, e) - - # add some randomness to the TTL to avoid a stampeding herd every hour - # after startup - cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD - cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER) - return (None, cache_period) - - result = parsed_body["m.server"].encode("ascii") - - cache_period = _cache_period_from_headers( - response.headers, time_now=self._reactor.seconds - ) - if cache_period is None: - cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD - # add some randomness to the TTL to avoid a stampeding herd every 24 hours - # after startup - cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER) - else: - cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD) - - return (result, cache_period) - @implementer(IStreamClientEndpoint) class LoggingHostnameEndpoint(object): @@ -374,44 +276,6 @@ class LoggingHostnameEndpoint(object): return self.ep.connect(protocol_factory) -def _cache_period_from_headers(headers, time_now=time.time): - cache_controls = _parse_cache_control(headers) - - if b"no-store" in cache_controls: - return 0 - - if b"max-age" in cache_controls: - try: - max_age = int(cache_controls[b"max-age"]) - return max_age - except ValueError: - pass - - expires = headers.getRawHeaders(b"expires") - if expires is not None: - try: - expires_date = stringToDatetime(expires[-1]) - return expires_date - time_now() - except ValueError: - # RFC7234 says 'A cache recipient MUST interpret invalid date formats, - # especially the value "0", as representing a time in the past (i.e., - # "already expired"). - return 0 - - return None - - -def _parse_cache_control(headers): - cache_controls = {} - for hdr in headers.getRawHeaders(b"cache-control", []): - for directive in hdr.split(b","): - splits = [x.strip() for x in directive.split(b"=", 1)] - k = splits[0].lower() - v = splits[1] if len(splits) > 1 else None - cache_controls[k] = v - return cache_controls - - @attr.s class _RoutingResult(object): """The result returned by `_route_matrix_uri`. diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py new file mode 100644 index 0000000000..d2866ff67d --- /dev/null +++ b/synapse/http/federation/well_known_resolver.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import random +import time + +import attr + +from twisted.internet import defer +from twisted.web.client import RedirectAgent, readBody +from twisted.web.http import stringToDatetime + +from synapse.logging.context import make_deferred_yieldable +from synapse.util import Clock +from synapse.util.caches.ttlcache import TTLCache +from synapse.util.metrics import Measure + +# period to cache .well-known results for by default +WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600 + +# jitter to add to the .well-known default cache ttl +WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60 + +# period to cache failure to fetch .well-known for +WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600 + +# cap for .well-known cache period +WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600 + +# lower bound for .well-known cache period +WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60 + +logger = logging.getLogger(__name__) + + +_well_known_cache = TTLCache("well-known") + + +@attr.s(slots=True, frozen=True) +class WellKnownLookupResult(object): + delegated_server = attr.ib() + + +class WellKnownResolver(object): + """Handles well-known lookups for matrix servers. + """ + + def __init__(self, reactor, agent, well_known_cache=None): + self._reactor = reactor + self._clock = Clock(reactor) + + if well_known_cache is None: + well_known_cache = _well_known_cache + + self._well_known_cache = well_known_cache + self._well_known_agent = RedirectAgent(agent) + + @defer.inlineCallbacks + def get_well_known(self, server_name): + """Attempt to fetch and parse a .well-known file for the given server + + Args: + server_name (bytes): name of the server, from the requested url + + Returns: + Deferred[WellKnownLookupResult]: The result of the lookup + """ + try: + result = self._well_known_cache[server_name] + except KeyError: + # TODO: should we linearise so that we don't end up doing two .well-known + # requests for the same server in parallel? + with Measure(self._clock, "get_well_known"): + result, cache_period = yield self._do_get_well_known(server_name) + + if cache_period > 0: + self._well_known_cache.set(server_name, result, cache_period) + + return WellKnownLookupResult(delegated_server=result) + + @defer.inlineCallbacks + def _do_get_well_known(self, server_name): + """Actually fetch and parse a .well-known, without checking the cache + + Args: + server_name (bytes): name of the server, from the requested url + + Returns: + Deferred[Tuple[bytes|None|object],int]: + result, cache period, where result is one of: + - the new server name from the .well-known (as a `bytes`) + - None if there was no .well-known file. + - INVALID_WELL_KNOWN if the .well-known was invalid + """ + uri = b"https://%s/.well-known/matrix/server" % (server_name,) + uri_str = uri.decode("ascii") + logger.info("Fetching %s", uri_str) + try: + response = yield make_deferred_yieldable( + self._well_known_agent.request(b"GET", uri) + ) + body = yield make_deferred_yieldable(readBody(response)) + if response.code != 200: + raise Exception("Non-200 response %s" % (response.code,)) + + parsed_body = json.loads(body.decode("utf-8")) + logger.info("Response from .well-known: %s", parsed_body) + if not isinstance(parsed_body, dict): + raise Exception("not a dict") + if "m.server" not in parsed_body: + raise Exception("Missing key 'm.server'") + except Exception as e: + logger.info("Error fetching %s: %s", uri_str, e) + + # add some randomness to the TTL to avoid a stampeding herd every hour + # after startup + cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD + cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER) + return (None, cache_period) + + result = parsed_body["m.server"].encode("ascii") + + cache_period = _cache_period_from_headers( + response.headers, time_now=self._reactor.seconds + ) + if cache_period is None: + cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD + # add some randomness to the TTL to avoid a stampeding herd every 24 hours + # after startup + cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER) + else: + cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD) + cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD) + + return (result, cache_period) + + +def _cache_period_from_headers(headers, time_now=time.time): + cache_controls = _parse_cache_control(headers) + + if b"no-store" in cache_controls: + return 0 + + if b"max-age" in cache_controls: + try: + max_age = int(cache_controls[b"max-age"]) + return max_age + except ValueError: + pass + + expires = headers.getRawHeaders(b"expires") + if expires is not None: + try: + expires_date = stringToDatetime(expires[-1]) + return expires_date - time_now() + except ValueError: + # RFC7234 says 'A cache recipient MUST interpret invalid date formats, + # especially the value "0", as representing a time in the past (i.e., + # "already expired"). + return 0 + + return None + + +def _parse_cache_control(headers): + cache_controls = {} + for hdr in headers.getRawHeaders(b"cache-control", []): + for directive in hdr.split(b","): + splits = [x.strip() for x in directive.split(b"=", 1)] + k = splits[0].lower() + v = splits[1] if len(splits) > 1 else None + cache_controls[k] = v + return cache_controls diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 424ffa8b68..42e5b0c0a5 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -234,13 +234,19 @@ class EmailPusher(object): return self.last_stream_ordering = last_stream_ordering - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.email, - self.user_id, - last_stream_ordering, - self.clock.time_msec(), + pusher_still_exists = ( + yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.email, + self.user_id, + last_stream_ordering, + self.clock.time_msec(), + ) ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() def seconds_until(self, ts_msec): secs = (ts_msec - self.clock.time_msec()) / 1000 diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index bfe3b36d52..bf65cfa21a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -203,13 +203,21 @@ class HttpPusher(object): http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.pushkey, - self.user_id, - self.last_stream_ordering, - self.clock.time_msec(), + pusher_still_exists = ( + yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.pushkey, + self.user_id, + self.last_stream_ordering, + self.clock.time_msec(), + ) ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() + return + if self.failing_since: self.failing_since = None yield self.store.update_pusher_failing_since( @@ -238,12 +246,17 @@ class HttpPusher(object): ) self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - yield self.store.update_pusher_last_stream_ordering( + pusher_still_exists = yield self.store.update_pusher_last_stream_ordering( self.app_id, self.pushkey, self.user_id, self.last_stream_ordering, ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() + return self.failing_since = None yield self.store.update_pusher_failing_since( diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 0a7d9b81b2..5720cab425 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -27,7 +27,7 @@ from twisted.internet import defer import synapse from synapse.api.constants import Membership, UserTypes -from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError +from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.http.server import JsonResource from synapse.http.servlet import ( RestServlet, @@ -36,7 +36,12 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, ) -from synapse.rest.admin._base import assert_requester_is_admin, assert_user_is_admin +from synapse.rest.admin._base import ( + assert_requester_is_admin, + assert_user_is_admin, + historical_admin_path_patterns, +) +from synapse.rest.admin.media import register_servlets_for_media_repo from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet from synapse.types import UserID, create_requester from synapse.util.versionstring import get_version_string @@ -44,28 +49,6 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger(__name__) -def historical_admin_path_patterns(path_regex): - """Returns the list of patterns for an admin endpoint, including historical ones - - This is a backwards-compatibility hack. Previously, the Admin API was exposed at - various paths under /_matrix/client. This function returns a list of patterns - matching those paths (as well as the new one), so that existing scripts which rely - on the endpoints being available there are not broken. - - Note that this should only be used for existing endpoints: new ones should just - register for the /_synapse/admin path. - """ - return list( - re.compile(prefix + path_regex) - for prefix in ( - "^/_synapse/admin/v1", - "^/_matrix/client/api/v1/admin", - "^/_matrix/client/unstable/admin", - "^/_matrix/client/r0/admin", - ) - ) - - class UsersRestServlet(RestServlet): PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)") @@ -255,25 +238,6 @@ class WhoisRestServlet(RestServlet): return (200, ret) -class PurgeMediaCacheRestServlet(RestServlet): - PATTERNS = historical_admin_path_patterns("/purge_media_cache") - - def __init__(self, hs): - self.media_repository = hs.get_media_repository() - self.auth = hs.get_auth() - - @defer.inlineCallbacks - def on_POST(self, request): - yield assert_requester_is_admin(self.auth, request) - - before_ts = parse_integer(request, "before_ts", required=True) - logger.info("before_ts: %r", before_ts) - - ret = yield self.media_repository.delete_old_remote_media(before_ts) - - return (200, ret) - - class PurgeHistoryRestServlet(RestServlet): PATTERNS = historical_admin_path_patterns( "/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?" @@ -542,50 +506,6 @@ class ShutdownRoomRestServlet(RestServlet): ) -class QuarantineMediaInRoom(RestServlet): - """Quarantines all media in a room so that no one can download it via - this server. - """ - - PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)") - - def __init__(self, hs): - self.store = hs.get_datastore() - self.auth = hs.get_auth() - - @defer.inlineCallbacks - def on_POST(self, request, room_id): - requester = yield self.auth.get_user_by_req(request) - yield assert_user_is_admin(self.auth, requester.user) - - num_quarantined = yield self.store.quarantine_media_ids_in_room( - room_id, requester.user.to_string() - ) - - return (200, {"num_quarantined": num_quarantined}) - - -class ListMediaInRoom(RestServlet): - """Lists all of the media in a given room. - """ - - PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media") - - def __init__(self, hs): - self.store = hs.get_datastore() - - @defer.inlineCallbacks - def on_GET(self, request, room_id): - requester = yield self.auth.get_user_by_req(request) - is_admin = yield self.auth.is_server_admin(requester.user) - if not is_admin: - raise AuthError(403, "You are not a server admin") - - local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id) - - return (200, {"local": local_mxcs, "remote": remote_mxcs}) - - class ResetPasswordRestServlet(RestServlet): """Post request to allow an administrator reset password for a user. This needs user to have administrator access in Synapse. @@ -825,7 +745,6 @@ def register_servlets(hs, http_server): def register_servlets_for_client_rest_resource(hs, http_server): """Register only the servlets which need to be exposed on /_matrix/client/xxx""" WhoisRestServlet(hs).register(http_server) - PurgeMediaCacheRestServlet(hs).register(http_server) PurgeHistoryStatusRestServlet(hs).register(http_server) DeactivateAccountRestServlet(hs).register(http_server) PurgeHistoryRestServlet(hs).register(http_server) @@ -834,10 +753,13 @@ def register_servlets_for_client_rest_resource(hs, http_server): GetUsersPaginatedRestServlet(hs).register(http_server) SearchUsersRestServlet(hs).register(http_server) ShutdownRoomRestServlet(hs).register(http_server) - QuarantineMediaInRoom(hs).register(http_server) - ListMediaInRoom(hs).register(http_server) UserRegisterServlet(hs).register(http_server) DeleteGroupAdminRestServlet(hs).register(http_server) AccountValidityRenewServlet(hs).register(http_server) + + # Load the media repo ones if we're using them. + if hs.config.can_load_media_repo: + register_servlets_for_media_repo(hs, http_server) + # don't add more things here: new servlets should only be exposed on # /_synapse/admin so should not go here. Instead register them in AdminRestResource. diff --git a/synapse/rest/admin/_base.py b/synapse/rest/admin/_base.py index 881d67b89c..5a9b08d3ef 100644 --- a/synapse/rest/admin/_base.py +++ b/synapse/rest/admin/_base.py @@ -12,11 +12,36 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import re + from twisted.internet import defer from synapse.api.errors import AuthError +def historical_admin_path_patterns(path_regex): + """Returns the list of patterns for an admin endpoint, including historical ones + + This is a backwards-compatibility hack. Previously, the Admin API was exposed at + various paths under /_matrix/client. This function returns a list of patterns + matching those paths (as well as the new one), so that existing scripts which rely + on the endpoints being available there are not broken. + + Note that this should only be used for existing endpoints: new ones should just + register for the /_synapse/admin path. + """ + return list( + re.compile(prefix + path_regex) + for prefix in ( + "^/_synapse/admin/v1", + "^/_matrix/client/api/v1/admin", + "^/_matrix/client/unstable/admin", + "^/_matrix/client/r0/admin", + ) + ) + + @defer.inlineCallbacks def assert_requester_is_admin(auth, request): """Verify that the requester is an admin user diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py new file mode 100644 index 0000000000..824df919f2 --- /dev/null +++ b/synapse/rest/admin/media.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018-2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.errors import AuthError +from synapse.http.servlet import RestServlet, parse_integer +from synapse.rest.admin._base import ( + assert_requester_is_admin, + assert_user_is_admin, + historical_admin_path_patterns, +) + +logger = logging.getLogger(__name__) + + +class QuarantineMediaInRoom(RestServlet): + """Quarantines all media in a room so that no one can download it via + this server. + """ + + PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)") + + def __init__(self, hs): + self.store = hs.get_datastore() + self.auth = hs.get_auth() + + @defer.inlineCallbacks + def on_POST(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + yield assert_user_is_admin(self.auth, requester.user) + + num_quarantined = yield self.store.quarantine_media_ids_in_room( + room_id, requester.user.to_string() + ) + + return (200, {"num_quarantined": num_quarantined}) + + +class ListMediaInRoom(RestServlet): + """Lists all of the media in a given room. + """ + + PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media") + + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + if not is_admin: + raise AuthError(403, "You are not a server admin") + + local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id) + + return (200, {"local": local_mxcs, "remote": remote_mxcs}) + + +class PurgeMediaCacheRestServlet(RestServlet): + PATTERNS = historical_admin_path_patterns("/purge_media_cache") + + def __init__(self, hs): + self.media_repository = hs.get_media_repository() + self.auth = hs.get_auth() + + @defer.inlineCallbacks + def on_POST(self, request): + yield assert_requester_is_admin(self.auth, request) + + before_ts = parse_integer(request, "before_ts", required=True) + logger.info("before_ts: %r", before_ts) + + ret = yield self.media_repository.delete_old_remote_media(before_ts) + + return (200, ret) + + +def register_servlets_for_media_repo(hs, http_server): + """ + Media repo specific APIs. + """ + PurgeMediaCacheRestServlet(hs).register(http_server) + QuarantineMediaInRoom(hs).register(http_server) + ListMediaInRoom(hs).register(http_server) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 92beefa176..cf5759e9a6 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -33,6 +33,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) +from synapse.config._base import ConfigError from synapse.logging.context import defer_to_thread from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.async_helpers import Linearizer @@ -753,8 +754,11 @@ class MediaRepositoryResource(Resource): """ def __init__(self, hs): - Resource.__init__(self) + # If we're not configured to use it, raise if we somehow got here. + if not hs.config.can_load_media_repo: + raise ConfigError("Synapse is not configured to use a media repo.") + super().__init__() media_repo = hs.get_media_repository() self.putChild(b"upload", UploadResource(hs, media_repo)) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index be3d4d9ded..b431d24b8a 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore): def update_pusher_last_stream_ordering_and_success( self, app_id, pushkey, user_id, last_stream_ordering, last_success ): - yield self._simple_update_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - { + """Update the last stream ordering position we've processed up to for + the given pusher. + + Args: + app_id (str) + pushkey (str) + last_stream_ordering (int) + last_success (int) + + Returns: + Deferred[bool]: True if the pusher still exists; False if it has been deleted. + """ + updated = yield self._simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={ "last_stream_ordering": last_stream_ordering, "last_success": last_success, }, desc="update_pusher_last_stream_ordering_and_success", ) + return bool(updated) + @defer.inlineCallbacks def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self._simple_update_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - {"failing_since": failing_since}, + yield self._simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={"failing_since": failing_since}, desc="update_pusher_failing_since", ) |