From 279c48c8b442ec726fb5088e56ce9c1d2ed4bfb5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 2 Dec 2014 15:09:51 +0000 Subject: Write the upload portion of version 1 of the media repository --- synapse/http/content_repository.py | 212 ------------------------------------- synapse/http/server.py | 21 ++-- 2 files changed, 14 insertions(+), 219 deletions(-) delete mode 100644 synapse/http/content_repository.py (limited to 'synapse/http') diff --git a/synapse/http/content_repository.py b/synapse/http/content_repository.py deleted file mode 100644 index 64ecb5346e..0000000000 --- a/synapse/http/content_repository.py +++ /dev/null @@ -1,212 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .server import respond_with_json_bytes - -from synapse.util.stringutils import random_string -from synapse.api.errors import ( - cs_exception, SynapseError, CodeMessageException, Codes, cs_error -) - -from twisted.protocols.basic import FileSender -from twisted.web import server, resource -from twisted.internet import defer - -import base64 -import json -import logging -import os -import re - -logger = logging.getLogger(__name__) - - -class ContentRepoResource(resource.Resource): - """Provides file uploading and downloading. - - Uploads are POSTed to wherever this Resource is linked to. This resource - returns a "content token" which can be used to GET this content again. The - token is typically a path, but it may not be. Tokens can expire, be - one-time uses, etc. - - In this case, the token is a path to the file and contains 3 interesting - sections: - - User ID base64d (for namespacing content to each user) - - random 24 char string - - Content type base64d (so we can return it when clients GET it) - - """ - isLeaf = True - - def __init__(self, hs, directory, auth, external_addr): - resource.Resource.__init__(self) - self.hs = hs - self.directory = directory - self.auth = auth - self.external_addr = external_addr.rstrip('/') - self.max_upload_size = hs.config.max_upload_size - - if not os.path.isdir(self.directory): - os.mkdir(self.directory) - logger.info("ContentRepoResource : Created %s directory.", - self.directory) - - @defer.inlineCallbacks - def map_request_to_name(self, request): - # auth the user - auth_user = yield self.auth.get_user_by_req(request) - - # namespace all file uploads on the user - prefix = base64.urlsafe_b64encode( - auth_user.to_string() - ).replace('=', '') - - # use a random string for the main portion - main_part = random_string(24) - - # suffix with a file extension if we can make one. This is nice to - # provide a hint to clients on the file information. We will also reuse - # this info to spit back the content type to the client. - suffix = "" - if request.requestHeaders.hasHeader("Content-Type"): - content_type = request.requestHeaders.getRawHeaders( - "Content-Type")[0] - suffix = "." + base64.urlsafe_b64encode(content_type) - if (content_type.split("/")[0].lower() in - ["image", "video", "audio"]): - file_ext = content_type.split("/")[-1] - # be a little paranoid and only allow a-z - file_ext = re.sub("[^a-z]", "", file_ext) - suffix += "." + file_ext - - file_name = prefix + main_part + suffix - file_path = os.path.join(self.directory, file_name) - logger.info("User %s is uploading a file to path %s", - auth_user.to_string(), - file_path) - - # keep trying to make a non-clashing file, with a sensible max attempts - attempts = 0 - while os.path.exists(file_path): - main_part = random_string(24) - file_name = prefix + main_part + suffix - file_path = os.path.join(self.directory, file_name) - attempts += 1 - if attempts > 25: # really? Really? - raise SynapseError(500, "Unable to create file.") - - defer.returnValue(file_path) - - def render_GET(self, request): - # no auth here on purpose, to allow anyone to view, even across home - # servers. - - # TODO: A little crude here, we could do this better. - filename = request.path.split('/')[-1] - # be paranoid - filename = re.sub("[^0-9A-z.-_]", "", filename) - - file_path = self.directory + "/" + filename - - logger.debug("Searching for %s", file_path) - - if os.path.isfile(file_path): - # filename has the content type - base64_contentype = filename.split(".")[1] - content_type = base64.urlsafe_b64decode(base64_contentype) - logger.info("Sending file %s", file_path) - f = open(file_path, 'rb') - request.setHeader('Content-Type', content_type) - - # cache for at least a day. - # XXX: we might want to turn this off for data we don't want to - # recommend caching as it's sensitive or private - or at least - # select private. don't bother setting Expires as all our matrix - # clients are smart enough to be happy with Cache-Control (right?) - request.setHeader( - "Cache-Control", "public,max-age=86400,s-maxage=86400" - ) - - d = FileSender().beginFileTransfer(f, request) - - # after the file has been sent, clean up and finish the request - def cbFinished(ignored): - f.close() - request.finish() - d.addCallback(cbFinished) - else: - respond_with_json_bytes( - request, - 404, - json.dumps(cs_error("Not found", code=Codes.NOT_FOUND)), - send_cors=True) - - return server.NOT_DONE_YET - - def render_POST(self, request): - self._async_render(request) - return server.NOT_DONE_YET - - def render_OPTIONS(self, request): - respond_with_json_bytes(request, 200, {}, send_cors=True) - return server.NOT_DONE_YET - - @defer.inlineCallbacks - def _async_render(self, request): - try: - # TODO: The checks here are a bit late. The content will have - # already been uploaded to a tmp file at this point - content_length = request.getHeader("Content-Length") - if content_length is None: - raise SynapseError( - msg="Request must specify a Content-Length", code=400 - ) - if int(content_length) > self.max_upload_size: - raise SynapseError( - msg="Upload request body is too large", - code=413, - ) - - fname = yield self.map_request_to_name(request) - - # TODO I have a suspicious feeling this is just going to block - with open(fname, "wb") as f: - f.write(request.content.read()) - - # FIXME (erikj): These should use constants. - file_name = os.path.basename(fname) - # FIXME: we can't assume what the repo's public mounted path is - # ...plus self-signed SSL won't work to remote clients anyway - # ...and we can't assume that it's SSL anyway, as we might want to - # serve it via the non-SSL listener... - url = "%s/_matrix/content/%s" % ( - self.external_addr, file_name - ) - - respond_with_json_bytes(request, 200, - json.dumps({"content_token": url}), - send_cors=True) - - except CodeMessageException as e: - logger.exception(e) - respond_with_json_bytes(request, e.code, - json.dumps(cs_exception(e))) - except Exception as e: - logger.error("Failed to store file: %s" % e) - respond_with_json_bytes( - request, - 500, - json.dumps({"error": "Internal server error"}), - send_cors=True) diff --git a/synapse/http/server.py b/synapse/http/server.py index 8024ff5bde..046e230361 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -166,14 +166,10 @@ class JsonResource(HttpServer, resource.Resource): request) return - if not self._request_user_agent_is_curl(request): - json_bytes = encode_canonical_json(response_json_object) - else: - json_bytes = encode_pretty_printed_json(response_json_object) - # TODO: Only enable CORS for the requests that need it. - respond_with_json_bytes(request, code, json_bytes, send_cors=True, - response_code_message=response_code_message) + respond_with_json(request, code, response_json_object, send_cors=True, + response_code_message=response_code_message, + pretty_print=self._request_user_agent_is_curl) @staticmethod def _request_user_agent_is_curl(request): @@ -202,6 +198,17 @@ class RootRedirect(resource.Resource): return resource.Resource.getChild(self, name, request) +def respond_with_json(request, code, json_object, send_cors=False, + response_code_message=None, pretty_print=False): + if not pretty_print: + json_bytes = encode_pretty_printed_json(response_json_object) + else: + json_bytes = encode_canonical_json(response_json_object) + + return respond_with_json_bytes(request, code, json_bytes, send_cors, + response_code_message=response_code_message) + + def respond_with_json_bytes(request, code, json_bytes, send_cors=False, response_code_message=None): """Sends encoded JSON in response to the given request. -- cgit 1.4.1 From 5da65085d106e98cf7b762836cb300d01226bf92 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 2 Dec 2014 19:51:47 +0000 Subject: Get uploads working with new media repo --- synapse/api/urls.py | 1 + synapse/app/homeserver.py | 9 +++++++-- synapse/config/_base.py | 14 ++++++++++++++ synapse/config/repository.py | 4 ++++ synapse/http/server.py | 4 ++-- synapse/media/__init__.py | 0 synapse/media/v0/__init__.py | 0 synapse/media/v0/content_repository.py | 2 +- synapse/media/v1/__init__.py | 0 synapse/media/v1/media_repository.py | 23 +++++++---------------- synapse/media/v1/upload_resource.py | 14 ++++++++------ synapse/server.py | 1 + synapse/storage/__init__.py | 6 +++++- synapse/storage/media_repository.py | 7 ++++++- 14 files changed, 56 insertions(+), 29 deletions(-) create mode 100644 synapse/media/__init__.py create mode 100644 synapse/media/v0/__init__.py create mode 100644 synapse/media/v1/__init__.py (limited to 'synapse/http') diff --git a/synapse/api/urls.py b/synapse/api/urls.py index 6dc19305b7..d7625127f8 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -20,3 +20,4 @@ FEDERATION_PREFIX = "/_matrix/federation/v1" WEB_CLIENT_PREFIX = "/_matrix/client" CONTENT_REPO_PREFIX = "/_matrix/content" SERVER_KEY_PREFIX = "/_matrix/key/v1" +MEDIA_PREFIX = "/_matrix/media/v1" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 855fe8e170..a6e29c0860 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -24,12 +24,13 @@ from twisted.web.resource import Resource from twisted.web.static import File from twisted.web.server import Site from synapse.http.server import JsonResource, RootRedirect -from synapse.http.content_repository import ContentRepoResource +from synapse.media.v0.content_repository import ContentRepoResource +from synapse.media.v1.media_repository import MediaRepositoryResource from synapse.http.server_key_resource import LocalKey from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.api.urls import ( CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, + SERVER_KEY_PREFIX, MEDIA_PREFIX ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory @@ -69,6 +70,9 @@ class SynapseHomeServer(HomeServer): self, self.upload_dir, self.auth, self.content_addr ) + def build_resource_for_media_repository(self): + return MediaRepositoryResource(self) + def build_resource_for_server_key(self): return LocalKey(self) @@ -99,6 +103,7 @@ class SynapseHomeServer(HomeServer): (FEDERATION_PREFIX, self.get_resource_for_federation()), (CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()), (SERVER_KEY_PREFIX, self.get_resource_for_server_key()), + (MEDIA_PREFIX, self.get_resource_for_media_repository()), ] if web_client: logger.info("Adding the web client.") diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 6870af10e8..1426436dcb 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -50,12 +50,26 @@ class Config(object): ) return cls.abspath(file_path) + @staticmethod + def ensure_directory(dir_path): + if not os.path.exists(dir_path): + os.makedirs(dir_path) + if not os.path.isdir(dir_path): + raise ConfigError( + "%s is not a directory" % (dir_path,) + ) + return dir_path + @classmethod def read_file(cls, file_path, config_name): cls.check_file(file_path, config_name) with open(file_path) as file_stream: return file_stream.read() + @staticmethod + def default_path(name): + return os.path.abspath(os.path.join(os.path.curdir, name)) + @staticmethod def read_config_file(file_path): with open(file_path) as file_stream: diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 743bc26474..6eec930a03 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -20,6 +20,7 @@ class ContentRepositoryConfig(Config): def __init__(self, args): super(ContentRepositoryConfig, self).__init__(args) self.max_upload_size = self.parse_size(args.max_upload_size) + self.media_store_path = self.ensure_directory(args.media_store_path) def parse_size(self, string): sizes = {"K": 1024, "M": 1024 * 1024} @@ -37,3 +38,6 @@ class ContentRepositoryConfig(Config): db_group.add_argument( "--max-upload-size", default="1M" ) + db_group.add_argument( + "--media-store-path", default=cls.default_path("media_store") + ) diff --git a/synapse/http/server.py b/synapse/http/server.py index 046e230361..02277c4998 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -201,9 +201,9 @@ class RootRedirect(resource.Resource): def respond_with_json(request, code, json_object, send_cors=False, response_code_message=None, pretty_print=False): if not pretty_print: - json_bytes = encode_pretty_printed_json(response_json_object) + json_bytes = encode_pretty_printed_json(json_object) else: - json_bytes = encode_canonical_json(response_json_object) + json_bytes = encode_canonical_json(json_object) return respond_with_json_bytes(request, code, json_bytes, send_cors, response_code_message=response_code_message) diff --git a/synapse/media/__init__.py b/synapse/media/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/synapse/media/v0/__init__.py b/synapse/media/v0/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/synapse/media/v0/content_repository.py b/synapse/media/v0/content_repository.py index 64ecb5346e..ce5d3d153e 100644 --- a/synapse/media/v0/content_repository.py +++ b/synapse/media/v0/content_repository.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .server import respond_with_json_bytes +from synapse.http.server import respond_with_json_bytes from synapse.util.stringutils import random_string from synapse.api.errors import ( diff --git a/synapse/media/v1/__init__.py b/synapse/media/v1/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/synapse/media/v1/media_repository.py b/synapse/media/v1/media_repository.py index 9c36a8e933..0f4eeef278 100644 --- a/synapse/media/v1/media_repository.py +++ b/synapse/media/v1/media_repository.py @@ -13,27 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import respond_with_json_bytes +from .upload_resource import UploadResource +from .filepath import MediaFilePaths -from synapse.util.stringutils import random_string -from synapse.api.errors import ( - cs_exception, SynapseError, CodeMessageException, Codes, cs_error -) +from twisted.web.resource import Resource -from twisted.protocols.basic import FileSender -from twisted.web import server, resource -from twisted.internet import defer - -import base64 -import json import logging -import os -import re logger = logging.getLogger(__name__) -class MediaRepository(): +class MediaRepositoryResource(Resource): """Profiles file uploading and downloading. Uploads are POSTed to a resource which returns a token which is used to GET @@ -68,5 +58,6 @@ class MediaRepository(): """ def __init__(self, hs): - filepaths = MediaFilePaths - + Resource.__init__(self) + filepaths = MediaFilePaths(hs.config.media_store_path) + self.putChild("upload", UploadResource(hs, filepaths)) diff --git a/synapse/media/v1/upload_resource.py b/synapse/media/v1/upload_resource.py index 3721a0173d..d9d7825b2b 100644 --- a/synapse/media/v1/upload_resource.py +++ b/synapse/media/v1/upload_resource.py @@ -23,6 +23,8 @@ from synapse.api.errors import ( from twisted.web import server, resource from twisted.internet import defer +import os + import logging logger = logging.getLogger(__name__) @@ -31,8 +33,9 @@ class UploadResource(resource.Resource): def __init__(self, hs, filepaths): self.auth = hs.get_auth() + self.clock = hs.get_clock() self.store = hs.get_datastore() - self.max_upload_size = hs.config.max_upload_size() + self.max_upload_size = hs.config.max_upload_size self.filepaths = filepaths def render_POST(self, request): @@ -45,10 +48,8 @@ class UploadResource(resource.Resource): @defer.inlineCallbacks def _async_render_POST(self, request): - - auth_user = yield self.auth.get_user_by_req(request) - try: + auth_user = yield self.auth.get_user_by_req(request) # TODO: The checks here are a bit late. The content will have # already been uploaded to a tmp file at this point content_length = request.getHeader("Content-Length") @@ -62,7 +63,7 @@ class UploadResource(resource.Resource): code=413, ) - headers = request.requestHeaders() + headers = request.requestHeaders if headers.hasHeader("Content-Type"): media_type = headers.getRawHeaders("Content-Type")[0] @@ -78,7 +79,8 @@ class UploadResource(resource.Resource): media_id = random_string(24) - fname = self.filepaths.local_media_file_path(media_id) + fname = self.filepaths.local_media_filepath(media_id) + os.makedirs(os.path.dirname(fname)) # This shouldn't block for very long because the content will have # already been uploaded at this point. diff --git a/synapse/server.py b/synapse/server.py index da0a44433a..7eb15270fc 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -78,6 +78,7 @@ class BaseHomeServer(object): 'resource_for_web_client', 'resource_for_content_repo', 'resource_for_server_key', + 'resource_for_media_repository', 'event_sources', 'ratelimiter', 'keyring', diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 1231794de0..f6811a8117 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,6 +33,7 @@ from .stream import StreamStore from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore +from .media_repository import MediaRepositoryStore from .state import StateStore from .signatures import SignatureStore @@ -62,6 +63,7 @@ SCHEMAS = [ "state", "event_edges", "event_signatures", + "media_repository", ] @@ -81,7 +83,9 @@ class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, - EventFederationStore, ): + EventFederationStore, + MediaRepositoryStore, + ): def __init__(self, hs): super(DataStore, self).__init__(hs) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 73ceba3f2c..db03619a80 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -20,10 +20,15 @@ class MediaRepositoryStore(SQLBaseStore): """Persistence for attachments and avatars""" def get_local_media(self, media_id): + """Get the metadata for a local piece of media + Returns: + None if the media_id doesn't exist. + """ return self._simple_select_one( "local_media_repository", {"media_id": media_id}, ("media_type", "media_length", "upload_name", "created_ts"), + True, ) def store_local_media(self, media_id, media_type, time_now_ms, upload_name, @@ -36,7 +41,7 @@ class MediaRepositoryStore(SQLBaseStore): "created_ts": time_now_ms, "upload_name": upload_name, "media_length": media_length, - "user_id": user_id, + "user_id": user_id.to_string(), } ) -- cgit 1.4.1 From c01fd5573c92c7c6da258bac7ff377a91cbebfd1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 4 Dec 2014 14:22:31 +0000 Subject: Implement download support for media_repository --- synapse/http/matrixfederationclient.py | 73 ++++++++++++- synapse/media/v1/download_resource.py | 194 +++++++++++++++++++++++++++++++++ synapse/media/v1/media_repository.py | 2 + synapse/media/v1/upload_resource.py | 11 +- synapse/storage/media_repository.py | 10 +- 5 files changed, 278 insertions(+), 12 deletions(-) create mode 100644 synapse/media/v1/download_resource.py (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 510f07dd7b..c7082b83a7 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -14,10 +14,11 @@ # limitations under the License. -from twisted.internet import defer, reactor +from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, _AgentBase, _URI from twisted.web.http_headers import Headers +from twisted.web._newclient import ResponseDone from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.async import sleep @@ -227,7 +228,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True): - """ Get's some json from the given host homeserver and path + """ GETs some json from the given host homeserver and path Args: destination (str): The remote server to send the HTTP request @@ -235,9 +236,6 @@ class MatrixFederationHttpClient(object): path (str): The HTTP path. args (dict): A dictionary used to create query strings, defaults to None. - **Note**: The value of each key is assumed to be an iterable - and *not* a string. - Returns: Deferred: Succeeds when we get *any* HTTP response. @@ -272,6 +270,48 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) + @defer.inlineCallbacks + def get_file(self, destination, path, output_stream, args={}, + retry_on_dns_fail=True): + """GETs a file from a given homeserver + Args: + destination (str): The remote server to send the HTTP request to. + path (str): The HTTP path to GET. + output_stream (file): File to write the response body to. + args (dict): Optional dictionary used to create the query string. + Returns: + A (int,dict) tuple of the file length and a dict of the response + headers. + """ + + encoded_args = {} + for k, vs in args.items(): + if isinstance(vs, basestring): + vs = [vs] + encoded_args[k] = [v.encode("UTF-8") for v in vs] + + query_bytes = urllib.urlencode(encoded_args, True) + logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) + + def body_callback(method, url_bytes, headers_dict): + self.sign_request(destination, method, url_bytes, headers_dict) + return None + + response = yield self._create_request( + destination.encode("ascii"), + "GET", + path.encode("ascii"), + query_bytes=query_bytes, + body_callback=body_callback, + retry_on_dns_fail=retry_on_dns_fail + ) + + headers = dict(response.headers.getAllRawHeaders()) + + length = yield _readBodyToFile(response, output_stream) + + defer.returnValue((length, headers)) + def _getEndpoint(self, reactor, destination): return matrix_federation_endpoint( reactor, destination, timeout=10, @@ -279,6 +319,29 @@ class MatrixFederationHttpClient(object): ) +class _ReadBodyToFileProtocol(protocol.Protocol): + def __init__(self, stream, deferred): + self.stream = stream + self.deferred = deferred + self.length = 0 + + def dataReceived(self, data): + self.stream.write(data) + self.length += len(data) + + def connectionLost(self, reason): + if reason.check(ResponseDone): + self.deferred.callback(self.length) + else: + self.deferred.errback(reason) + + +def _readBodyToFile(response, stream): + d = defer.Deferred() + response.deliverBody(_ReadBodyToFileProtocol(stream, d)) + return d + + def _print_ex(e): if hasattr(e, "reasons") and e.reasons: for ex in e.reasons: diff --git a/synapse/media/v1/download_resource.py b/synapse/media/v1/download_resource.py new file mode 100644 index 0000000000..c243f16a74 --- /dev/null +++ b/synapse/media/v1/download_resource.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json +from synapse.util.stringutils import random_string +from synapse.api.errors import ( + cs_exception, CodeMessageException, cs_error, Codes +) + +from twisted.protocols.basic import FileSender +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + +import os + +import logging + +logger = logging.getLogger(__name__) + + +class DownloadResource(Resource): + isLeaf = True + + def __init__(self, hs, filepaths): + Resource.__init__(self) + self.client = hs.get_http_client() + self.clock = hs.get_clock() + self.server_name = hs.hostname + self.store = hs.get_datastore() + self.filepaths = filepaths + + def render_GET(self, request): + self._async_render_GET(request) + return NOT_DONE_YET + + def _respond_404(self, request): + respond_with_json( + request, 404, + cs_error( + "Not found %r" % (request.postpath,), + code=Codes.NOT_FOUND, + ), + send_cors=True + ) + + @defer.inlineCallbacks + def _async_render_GET(self, request): + + try: + server_name, media_id = request.postpath + except: + self._respond_404(request) + return + + try: + if server_name == self.server_name: + yield self._respond_local_file(request, media_id) + else: + yield self._respond_remote_file(request, server_name, media_id) + except CodeMessageException as e: + logger.exception(e) + respond_with_json(request, e.code, cs_exception(e), send_cors=True) + except: + logger.exception("Failed to serve file") + respond_with_json( + request, + 500, + {"error": "Internal server error"}, + send_cors=True + ) + + @defer.inlineCallbacks + def _download_remote_file(self, server_name, media_id): + filesystem_id = random_string(24) + + fname = self.filepaths.remote_media_filepath( + server_name, filesystem_id + ) + os.makedirs(os.path.dirname(fname)) + + try: + with open(fname, "wb") as f: + length, headers = yield self.client.get_file( + server_name, + "/".join(( + "/_matrix/media/v1/download", server_name, media_id, + )), + output_stream=f, + ) + except: + os.remove(fname) + raise + + media_type = headers["Content-Type"][0] + time_now_ms = self.clock.time_msec() + + yield self.store.store_cached_remote_media( + origin=server_name, + media_id=media_id, + media_type=media_type, + time_now_ms=self.clock.time_msec(), + upload_name=None, + media_length=length, + filesystem_id=filesystem_id, + ) + + defer.returnValue({ + "media_type": media_type, + "media_length": length, + "upload_name": None, + "created_ts": time_now_ms, + "filesystem_id": filesystem_id, + }) + + @defer.inlineCallbacks + def _respond_remote_file(self, request, server_name, media_id): + media_info = yield self.store.get_cached_remote_media( + server_name, media_id + ) + + if not media_info: + media_info = yield self._download_remote_file( + server_name, media_id + ) + + filesystem_id = media_info["filesystem_id"] + + file_path = self.filepaths.remote_media_filepath( + server_name, filesystem_id + ) + + if os.path.isfile(file_path): + media_type = media_info["media_type"] + request.setHeader(b"Content-Type", media_type.encode("UTF-8")) + + # cache for at least a day. + # XXX: we might want to turn this off for data we don't want to + # recommend caching as it's sensitive or private - or at least + # select private. don't bother setting Expires as all our + # clients are smart enough to be happy with Cache-Control + request.setHeader( + b"Cache-Control", b"public,max-age=86400,s-maxage=86400" + ) + + with open(file_path, "rb") as f: + yield FileSender().beginFileTransfer(f, request) + + request.finish() + else: + self._respond_404() + + @defer.inlineCallbacks + def _respond_local_file(self, request, media_id): + media_info = yield self.store.get_local_media(media_id) + if not media_info: + self._respond_404() + return + + file_path = self.filepaths.local_media_filepath(media_id) + + logger.debug("Searching for %s", file_path) + + if os.path.isfile(file_path): + media_type = media_info["media_type"] + request.setHeader(b"Content-Type", media_type.encode("UTF-8")) + + # cache for at least a day. + # XXX: we might want to turn this off for data we don't want to + # recommend caching as it's sensitive or private - or at least + # select private. don't bother setting Expires as all our + # clients are smart enough to be happy with Cache-Control + request.setHeader( + b"Cache-Control", b"public,max-age=86400,s-maxage=86400" + ) + + with open(file_path, "rb") as f: + yield FileSender().beginFileTransfer(f, request) + + request.finish() + else: + self._respond_404() diff --git a/synapse/media/v1/media_repository.py b/synapse/media/v1/media_repository.py index afd92874cf..e0a4cd01ee 100644 --- a/synapse/media/v1/media_repository.py +++ b/synapse/media/v1/media_repository.py @@ -14,6 +14,7 @@ # limitations under the License. from .upload_resource import UploadResource +from .download_resource import DownloadResource from .filepath import MediaFilePaths from twisted.web.resource import Resource @@ -62,3 +63,4 @@ class MediaRepositoryResource(Resource): Resource.__init__(self) filepaths = MediaFilePaths(hs.config.media_store_path) self.putChild("upload", UploadResource(hs, filepaths)) + self.putChild("download", DownloadResource(hs, filepaths)) diff --git a/synapse/media/v1/upload_resource.py b/synapse/media/v1/upload_resource.py index 2919fee12f..91bcc5caff 100644 --- a/synapse/media/v1/upload_resource.py +++ b/synapse/media/v1/upload_resource.py @@ -20,7 +20,8 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException ) -from twisted.web import server, resource +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET from twisted.internet import defer import os @@ -30,9 +31,11 @@ import logging logger = logging.getLogger(__name__) -class UploadResource(resource.Resource): +class UploadResource(Resource): + isLeaf = True def __init__(self, hs, filepaths): + Resource.__init__(self) self.auth = hs.get_auth() self.clock = hs.get_clock() self.store = hs.get_datastore() @@ -41,11 +44,11 @@ class UploadResource(resource.Resource): def render_POST(self, request): self._async_render_POST(request) - return server.NOT_DONE_YET + return NOT_DONE_YET def render_OPTIONS(self, request): respond_with_json(request, 200, {}, send_cors=True) - return server.NOT_DONE_YET + return NOT_DONE_YET @defer.inlineCallbacks def _async_render_POST(self, request): diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index eda191ad5b..2d3a2d1ccb 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -22,13 +22,13 @@ class MediaRepositoryStore(SQLBaseStore): def get_local_media(self, media_id): """Get the metadata for a local piece of media Returns: - None if the media_id doesn't exist. + None if the meia_id doesn't exist. """ return self._simple_select_one( "local_media_repository", {"media_id": media_id}, ("media_type", "media_length", "upload_name", "created_ts"), - True, + allow_none=True, ) def store_local_media(self, media_id, media_type, time_now_ms, upload_name, @@ -73,7 +73,11 @@ class MediaRepositoryStore(SQLBaseStore): return self._simple_select_one( "remote_media_cache", {"media_origin": origin, "media_id": media_id}, - ("media_type", "media_length", "upload_name", "created_ts"), + ( + "media_type", "media_length", "upload_name", "created_ts", + "filesystem_id", + ), + allow_none=True, ) def store_cached_remote_media(self, origin, media_id, media_type, -- cgit 1.4.1 From aed62a35832a3ec1c7425ecc99cab06a781263ba Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 7 Dec 2014 02:26:07 +0000 Subject: track replication destination health, and perform exponential back-off when sending transactions. does *not* yet retry transactions, but drops them on the floor if waiting for a server to recover. --- synapse/federation/replication.py | 43 +++++++++++++++--- synapse/federation/transport.py | 2 +- synapse/http/matrixfederationclient.py | 16 ++++--- synapse/rest/transactions.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/schema/delta/v9.sql | 23 ++++++++++ synapse/storage/schema/transactions.sql | 6 +++ synapse/storage/transactions.py | 78 ++++++++++++++++++++++++++++++++- 9 files changed, 156 insertions(+), 18 deletions(-) create mode 100644 synapse/storage/schema/delta/v9.sql (limited to 'synapse/http') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 01f87fe423..f9c05b5ea3 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -723,6 +723,8 @@ class _TransactionQueue(object): deferreds = [] for destination in destinations: + # XXX: why don't we specify an errback for this deferred + # like we do for EDUs? --matthew deferred = defer.Deferred() self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, deferred, order) @@ -738,6 +740,9 @@ class _TransactionQueue(object): # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination + + if destination == self.server_name: + return deferred = defer.Deferred() self.pending_edus_by_dest.setdefault(destination, []).append( @@ -766,14 +771,23 @@ class _TransactionQueue(object): ) yield deferred - + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): + + (retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info("TX [%s] not ready for retry yet - dropping transaction for now") + return + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending request + # at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these requests return - # list of (pending_pdu, deferred, order) + # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) @@ -781,7 +795,8 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction", destination) + logger.debug("TX [%s] Attempting new transaction (pdus: %d, edus: %d, failures: %d)", + destination, len(pending_pdus), len(pending_edus), len(pending_failures)) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -814,7 +829,7 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.debug("TX [%s] Sending transaction...", destination) + logger.info("TX [%s] Sending transaction [%s]", destination, transaction.transaction_id) # Actually send the transaction @@ -835,6 +850,8 @@ class _TransactionQueue(object): transaction, json_data_cb ) + logger.info("TX [%s] got %d response", destination, code) + logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) @@ -849,6 +866,7 @@ class _TransactionQueue(object): if code == 200: deferred.callback(None) else: + start_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -861,12 +879,12 @@ class _TransactionQueue(object): logger.debug("TX [%s] Yielded to callbacks", destination) except Exception as e: - logger.error("TX Problem in _attempt_transaction") - # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception(e) + logger.exception("TX [%s] Problem in _attempt_transaction: %s", destination, e) + start_retrying(destination, retry_interval) + for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -877,3 +895,14 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) + +def start_retrying(destination, retry_interval): + # track that this destination is having problems and we should + # give it a chance to recover before trying it again + if retry_interval: + retry_interval *= 2 + else: + retry_interval = 2 # try again at first after 2 seconds + self.store.set_destination_retry_timings(destination, + int(self._clock.time_msec()), retry_interval) + \ No newline at end of file diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 8d86152085..0f11c6d491 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -155,7 +155,7 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function def send_transaction(self, transaction, json_data_callback=None): - """ Sends the given Transaction to it's destination + """ Sends the given Transaction to its destination Args: transaction (Transaction) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 510f07dd7b..3edc59dbab 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -89,7 +89,7 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.debug("Sending request to %s: %s %s", + logger.info("Sending request to %s: %s %s", destination, method, url_bytes) logger.debug( @@ -101,7 +101,10 @@ class MatrixFederationHttpClient(object): ] ) - retries_left = 5 + # was 5; for now, let's only try once at the HTTP layer and then + # rely on transaction-layer retries for exponential backoff and + # getting the message through. + retries_left = 0 endpoint = self._getEndpoint(reactor, destination) @@ -131,7 +134,8 @@ class MatrixFederationHttpClient(object): e) raise SynapseError(400, "Domain specified not found.") - logger.exception("Got error in _create_request") + logger.exception("Sending request failed to %s: %s %s : %s", + destination, method, url_bytes, e) _print_ex(e) if retries_left: @@ -140,15 +144,15 @@ class MatrixFederationHttpClient(object): else: raise + logger.info("Received response %d %s for %s: %s %s", + response.code, response.phrase, destination, method, url_bytes) + if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - logger.error( - "Got response %d %s", response.code, response.phrase - ) raise CodeMessageException( response.code, response.phrase ) diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py index 93c0122f30..8c41ab4edb 100644 --- a/synapse/rest/transactions.py +++ b/synapse/rest/transactions.py @@ -19,7 +19,7 @@ import logging logger = logging.getLogger(__name__) - +# FIXME: elsewhere we use FooStore to indicate something in the storage layer... class HttpTransactionStore(object): def __init__(self): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f15e3dfe62..04ab39341d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -67,7 +67,7 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 8 +SCHEMA_VERSION = 9 class _RollbackButIsFineException(Exception): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4881f03368..e72200e2f7 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -650,7 +650,7 @@ class JoinHelper(object): to dump the results into. Attributes: - taples (list): List of `Table` classes + tables (list): List of `Table` classes EntryType (type) """ diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql new file mode 100644 index 0000000000..ad680c64da --- /dev/null +++ b/synapse/storage/schema/delta/v9.sql @@ -0,0 +1,23 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); + +PRAGMA user_version = 9; \ No newline at end of file diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql index 88e3e4e04d..de461bfa15 100644 --- a/synapse/storage/schema/transactions.sql +++ b/synapse/storage/schema/transactions.sql @@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 00d0f48082..47b73f7458 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -114,7 +114,7 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): - # First we find out what the prev_txs should be. + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. query = "%s ORDER BY id DESC LIMIT 1" % ( @@ -205,6 +205,71 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) + def get_destination_retry_timings(self, destination): + """Gets the current retry timings (if any) for a given destination. + + Args: + destination (str) + + Returns: + None if not retrying + tuple: (retry_last_ts, retry_interval) + retry_ts: time of last retry attempt in unix epoch ms + retry_interval: how long until next retry in ms + """ + return self.runInteraction( + "get_destination_retry_timings", + self._get_destination_retry_timings, destination) + + def _get_destination_retry_timings(cls, txn, destination): + query = DestinationsTable.select_statement("destination = ?") + txn.execute(query, (destination,)) + result = DestinationsTable.decode_single_result(txn.fetchone()) + if result and result[0] > 0: + return result + else: + return None + + def set_destination_retry_timings(self, destination): + """Sets the current retry timings for a given destination. + Both timings should be zero if retrying is no longer occuring. + + Args: + destination (str) + retry_last_ts (int) - time of last retry attempt in unix epoch ms + retry_interval (int) - how long until next retry in ms + """ + return self.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings, destination, retry_last_ts, retry_interval) + + def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval): + + query = ( + "INSERT OR REPLACE INTO %s " + "(retry_last_ts, retry_interval) " + "VALUES (?, ?) " + "WHERE destination = ?" + ) % DestinationsTable.table_name + + txn.execute(query, (retry_last_ts, retry_interval, destination)) + + def get_destinations_needing_retry(self): + """Get all destinations which are due a retry for sending a transaction. + + Returns: + list: A list of `DestinationsTable.EntryType` + """ + return self.runInteraction( + "get_destinations_needing_retry", + self._get_destinations_needing_retry + ) + + def _get_destinations_needing_retry(cls, txn): + where = "retry_last_ts > 0 and retry_next_ts < now()" + query = DestinationsTable.select_statement(where) + txn.execute(query) + return DestinationsTable.decode_results(txn.fetchall()) class ReceivedTransactionsTable(Table): table_name = "received_transactions" @@ -247,3 +312,14 @@ class TransactionsToPduTable(Table): ] EntryType = namedtuple("TransactionsToPduEntry", fields) + +class DestinationsTable(Table): + table_name = "destinations" + + fields = [ + "destination", + "retry_last_ts", + "retry_interval", + ] + + EntryType = namedtuple("DestinationsEntry", fields) \ No newline at end of file -- cgit 1.4.1 From 2b1acb7671e33baeb01be2f0facd20cd6ea7e3b5 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 10 Dec 2014 00:03:55 +0000 Subject: squidge to 79 columns as per pep8 --- synapse/federation/replication.py | 30 ++++++++++++++++++++---------- synapse/http/matrixfederationclient.py | 7 ++++--- synapse/storage/transactions.py | 18 ++++++++++++------ 3 files changed, 36 insertions(+), 19 deletions(-) (limited to 'synapse/http') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c4c6667b62..c242488483 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -778,21 +778,25 @@ class _TransactionQueue(object): def _attempt_new_transaction(self, destination): (retry_last_ts, retry_interval) = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings(destination) + retry_timings = yield self.store.get_destination_retry_timings( + destination + ) if retry_timings: (retry_last_ts, retry_interval) = ( retry_timings.retry_last_ts, retry_timings.retry_interval ) if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination) + logger.info("TX [%s] not ready for retry yet - " + "dropping transaction for now", destination) return else: logger.info("TX [%s] is ready for retry", destination) if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending request - # at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these requests + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests return # list of (pending_pdu, deferred, order) @@ -803,8 +807,10 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction (pdus: %d, edus: %d, failures: %d)", - destination, len(pending_pdus), len(pending_edus), len(pending_failures)) + logger.debug("TX [%s] Attempting new transaction " + "(pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), len(pending_edus), len(pending_failures)) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -837,7 +843,8 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.info("TX [%s] Sending transaction [%s]", destination, transaction.transaction_id) + logger.info("TX [%s] Sending transaction [%s]", destination, + transaction.transaction_id) # Actually send the transaction @@ -874,7 +881,9 @@ class _TransactionQueue(object): if code == 200: if retry_last_ts: # this host is alive! reset retry schedule - yield self.store.set_destination_retry_timings(destination, 0, 0) + yield self.store.set_destination_retry_timings( + destination, 0, 0 + ) deferred.callback(None) else: self.start_retrying(destination, retry_interval) @@ -892,7 +901,8 @@ class _TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception("TX [%s] Problem in _attempt_transaction: %s", destination, e) + logger.exception("TX [%s] Problem in _attempt_transaction: %s", + destination, e) self.start_retrying(destination, retry_interval) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 3edc59dbab..c76990904d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -90,7 +90,7 @@ class MatrixFederationHttpClient(object): ) logger.info("Sending request to %s: %s %s", - destination, method, url_bytes) + destination, method, url_bytes) logger.debug( "Types: %s", @@ -135,7 +135,7 @@ class MatrixFederationHttpClient(object): raise SynapseError(400, "Domain specified not found.") logger.exception("Sending request failed to %s: %s %s : %s", - destination, method, url_bytes, e) + destination, method, url_bytes, e) _print_ex(e) if retries_left: @@ -145,7 +145,8 @@ class MatrixFederationHttpClient(object): raise logger.info("Received response %d %s for %s: %s %s", - response.code, response.phrase, destination, method, url_bytes) + response.code, response.phrase, + destination, method, url_bytes) if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 237b024451..2b16787695 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -28,7 +28,8 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - # a write-through cache of DestinationsTable.EntryType indexed by destination string + # a write-through cache of DestinationsTable.EntryType indexed by + # destination string destination_retry_cache = {} def get_received_txn_response(self, transaction_id, origin): @@ -238,7 +239,8 @@ class TransactionStore(SQLBaseStore): else: return None - def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): + def set_destination_retry_timings(self, destination, + retry_last_ts, retry_interval): """Sets the current retry timings for a given destination. Both timings should be zero if retrying is no longer occuring. @@ -249,15 +251,19 @@ class TransactionStore(SQLBaseStore): """ self.destination_retry_cache[destination] = ( - DestinationsTable.EntryType(destination, retry_last_ts, retry_interval) + DestinationsTable.EntryType(destination, + retry_last_ts, retry_interval) ) - # xxx: we could chose to not bother persisting this if our cache things this is a NOOP + # XXX: we could chose to not bother persisting this if our cache thinks + # this is a NOOP return self.runInteraction( "set_destination_retry_timings", - self._set_destination_retry_timings, destination, retry_last_ts, retry_interval) + self._set_destination_retry_timings, destination, + retry_last_ts, retry_interval) - def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval): + def _set_destination_retry_timings(cls, txn, destination, + retry_last_ts, retry_interval): query = ( "INSERT OR REPLACE INTO %s " -- cgit 1.4.1 From faf12b64f81627d92cb1ac49b6eb58f9d3f4837d Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 10 Dec 2014 00:12:51 +0000 Subject: add errbacks to enqueue_pdu deferreds; change logging for failed federation sends to warn rather than exception --- synapse/federation/replication.py | 16 ++++++++++------ synapse/http/matrixfederationclient.py | 4 ++-- 2 files changed, 12 insertions(+), 8 deletions(-) (limited to 'synapse/http') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c242488483..346b5f04c0 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -334,7 +334,7 @@ class ReplicationLayer(object): defer.returnValue(response) return - logger.debug("[%s] Transacition is new", transaction.transaction_id) + logger.debug("[%s] Transaction is new", transaction.transaction_id) with PreserveLoggingContext(): dl = [] @@ -724,15 +724,19 @@ class _TransactionQueue(object): deferreds = [] for destination in destinations: - # XXX: why don't we specify an errback for this deferred - # like we do for EDUs? --matthew deferred = defer.Deferred() self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, deferred, order) ) + + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send pdu", failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination) + self._attempt_new_transaction(destination).addErrback(eb) deferreds.append(deferred) @@ -754,7 +758,7 @@ class _TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.exception("Failed to send edu", failure) + logger.warn("Failed to send edu", failure) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -901,7 +905,7 @@ class _TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception("TX [%s] Problem in _attempt_transaction: %s", + logger.warn("TX [%s] Problem in _attempt_transaction: %s", destination, e) self.start_retrying(destination, retry_interval) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c76990904d..8fc6bf8f97 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -134,7 +134,7 @@ class MatrixFederationHttpClient(object): e) raise SynapseError(400, "Domain specified not found.") - logger.exception("Sending request failed to %s: %s %s : %s", + logger.warn("Sending request failed to %s: %s %s : %s", destination, method, url_bytes, e) _print_ex(e) @@ -289,7 +289,7 @@ def _print_ex(e): for ex in e.reasons: _print_ex(ex) else: - logger.exception(e) + logger.warn(e) class _JsonProducer(object): -- cgit 1.4.1 From b8d30899b1296347a75d5a59e32d73a5236e6ea2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 10:16:09 +0000 Subject: Code style. --- synapse/federation/replication.py | 52 +++++++++++++++++++++++----------- synapse/http/matrixfederationclient.py | 29 +++++++++++++------ synapse/storage/transactions.py | 50 ++++++++++++++++++-------------- 3 files changed, 85 insertions(+), 46 deletions(-) (limited to 'synapse/http') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 589a3f581b..0cb632fb08 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -728,7 +728,7 @@ class _TransactionQueue(object): self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, deferred, order) ) - + def eb(failure): if not deferred.called: deferred.errback(failure) @@ -745,7 +745,7 @@ class _TransactionQueue(object): # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination - + if destination == self.server_name: return @@ -776,7 +776,7 @@ class _TransactionQueue(object): ) yield deferred - + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): @@ -790,12 +790,15 @@ class _TransactionQueue(object): retry_timings.retry_last_ts, retry_timings.retry_interval ) if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info("TX [%s] not ready for retry yet - " - "dropping transaction for now", destination) + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) return else: logger.info("TX [%s] is ready for retry", destination) - + if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -811,10 +814,14 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction " - "(pdus: %d, edus: %d, failures: %d)", + logger.debug( + "TX [%s] Attempting new transaction " + "(pdus: %d, edus: %d, failures: %d)", destination, - len(pending_pdus), len(pending_edus), len(pending_failures)) + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -847,8 +854,11 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.info("TX [%s] Sending transaction [%s]", destination, - transaction.transaction_id) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) # Actually send the transaction @@ -905,11 +915,14 @@ class _TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.warn("TX [%s] Problem in _attempt_transaction: %s", - destination, e) + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) self.set_retrying(destination, retry_interval) - + for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -925,12 +938,17 @@ class _TransactionQueue(object): def set_retrying(self, destination, retry_interval): # track that this destination is having problems and we should # give it a chance to recover before trying it again + if retry_interval: retry_interval *= 2 # plateau at hourly retries for now if retry_interval >= 60 * 60 * 1000: retry_interval = 60 * 60 * 1000 else: - retry_interval = 2000 # try again at first after 2 seconds - yield self.store.set_destination_retry_timings(destination, - int(self._clock.time_msec()), retry_interval) + retry_interval = 2000 # try again at first after 2 seconds + + yield self.store.set_destination_retry_timings( + destination, + int(self._clock.time_msec()), + retry_interval + ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 8fc6bf8f97..16fb2adab5 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -130,12 +130,20 @@ class MatrixFederationHttpClient(object): break except Exception as e: if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn("DNS Lookup failed to %s with %s", destination, - e) + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) raise SynapseError(400, "Domain specified not found.") - logger.warn("Sending request failed to %s: %s %s : %s", - destination, method, url_bytes, e) + logger.warn( + "Sending request failed to %s: %s %s : %s", + destination, + method, + url_bytes, + e + ) _print_ex(e) if retries_left: @@ -144,10 +152,15 @@ class MatrixFederationHttpClient(object): else: raise - logger.info("Received response %d %s for %s: %s %s", - response.code, response.phrase, - destination, method, url_bytes) - + logger.info( + "Received response %d %s for %s: %s %s", + response.code, + response.phrase, + destination, + method, + url_bytes + ) + if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? pass diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 2b16787695..423cc3f02a 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - + # a write-through cache of DestinationsTable.EntryType indexed by # destination string destination_retry_cache = {} @@ -213,21 +213,21 @@ class TransactionStore(SQLBaseStore): def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. - + Args: destination (str) - + Returns: None if not retrying Otherwise a DestinationsTable.EntryType for the retry scheme """ if destination in self.destination_retry_cache: return defer.succeed(self.destination_retry_cache[destination]) - + return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) - + def _get_destination_retry_timings(cls, txn, destination): query = DestinationsTable.select_statement("destination = ?") txn.execute(query, (destination,)) @@ -238,30 +238,36 @@ class TransactionStore(SQLBaseStore): return result else: return None - + def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): """Sets the current retry timings for a given destination. Both timings should be zero if retrying is no longer occuring. - + Args: destination (str) retry_last_ts (int) - time of last retry attempt in unix epoch ms retry_interval (int) - how long until next retry in ms """ - + self.destination_retry_cache[destination] = ( - DestinationsTable.EntryType(destination, - retry_last_ts, retry_interval) + DestinationsTable.EntryType( + destination, + retry_last_ts, + retry_interval + ) ) - + # XXX: we could chose to not bother persisting this if our cache thinks # this is a NOOP return self.runInteraction( "set_destination_retry_timings", - self._set_destination_retry_timings, destination, - retry_last_ts, retry_interval) - + self._set_destination_retry_timings, + destination, + retry_last_ts, + retry_interval, + ) + def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval): @@ -275,21 +281,22 @@ class TransactionStore(SQLBaseStore): def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. - + Returns: list: A list of `DestinationsTable.EntryType` """ - + return self.runInteraction( "get_destinations_needing_retry", self._get_destinations_needing_retry ) - + def _get_destinations_needing_retry(cls, txn): where = "retry_last_ts > 0 and retry_next_ts < now()" query = DestinationsTable.select_statement(where) txn.execute(query) - return DestinationsTable.decode_results(txn.fetchall()) + return DestinationsTable.decode_results(txn.fetchall()) + class ReceivedTransactionsTable(Table): table_name = "received_transactions" @@ -332,14 +339,15 @@ class TransactionsToPduTable(Table): ] EntryType = namedtuple("TransactionsToPduEntry", fields) - + + class DestinationsTable(Table): table_name = "destinations" - + fields = [ "destination", "retry_last_ts", "retry_interval", ] - EntryType = namedtuple("DestinationsEntry", fields) \ No newline at end of file + EntryType = namedtuple("DestinationsEntry", fields) -- cgit 1.4.1 From 0f4dcab238b029407080cb02cc2cf14e22d8fe89 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 10 Dec 2014 10:28:27 +0000 Subject: turn back on per-request transaction retries, so that every time we try to hit a dead server we actually end up hammering 5 times :| --- synapse/http/matrixfederationclient.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 16fb2adab5..fc5b5ab809 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -101,10 +101,9 @@ class MatrixFederationHttpClient(object): ] ) - # was 5; for now, let's only try once at the HTTP layer and then - # rely on transaction-layer retries for exponential backoff and - # getting the message through. - retries_left = 0 + # XXX: Would be much nicer to retry only at the transaction-layer + # (once we have reliable transactions in place) + retries_left = 5 endpoint = self._getEndpoint(reactor, destination) -- cgit 1.4.1 From d80d505b1f70eae128990ce1a9517e5c5edead73 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 11 Dec 2014 14:19:32 +0000 Subject: Limit the size of images that are thumbnailed serverside. Limit the size of file that a server will download from a remote server --- synapse/api/errors.py | 1 + synapse/config/repository.py | 5 +++++ synapse/http/matrixfederationclient.py | 25 +++++++++++++++++++------ synapse/media/v1/base_resource.py | 18 ++++++++++++++++++ 4 files changed, 43 insertions(+), 6 deletions(-) (limited to 'synapse/http') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 581439ceb3..e250b9b211 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -34,6 +34,7 @@ class Codes(object): LIMIT_EXCEEDED = "M_LIMIT_EXCEEDED" CAPTCHA_NEEDED = "M_CAPTCHA_NEEDED" CAPTCHA_INVALID = "M_CAPTCHA_INVALID" + TOO_LARGE = "M_TOO_LARGE" class CodeMessageException(Exception): diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 6eec930a03..f1b7b1b74e 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -20,6 +20,7 @@ class ContentRepositoryConfig(Config): def __init__(self, args): super(ContentRepositoryConfig, self).__init__(args) self.max_upload_size = self.parse_size(args.max_upload_size) + self.max_image_pixels = self.parse_size(args.max_image_pixels) self.media_store_path = self.ensure_directory(args.media_store_path) def parse_size(self, string): @@ -41,3 +42,7 @@ class ContentRepositoryConfig(Config): db_group.add_argument( "--media-store-path", default=cls.default_path("media_store") ) + db_group.add_argument( + "--max-image-pixels", default="32M", + help="Maximum number of pixels that will be thumbnailed" + ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index f05269cdfb..8f4db59c75 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -26,7 +26,7 @@ from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json -from synapse.api.errors import CodeMessageException, SynapseError +from synapse.api.errors import CodeMessageException, SynapseError, Codes from syutil.crypto.jsonsign import sign_json @@ -289,7 +289,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def get_file(self, destination, path, output_stream, args={}, - retry_on_dns_fail=True): + retry_on_dns_fail=True, max_size=None): """GETs a file from a given homeserver Args: destination (str): The remote server to send the HTTP request to. @@ -325,7 +325,11 @@ class MatrixFederationHttpClient(object): headers = dict(response.headers.getAllRawHeaders()) - length = yield _readBodyToFile(response, output_stream) + try: + length = yield _readBodyToFile(response, output_stream, max_size) + except: + logger.exception("Failed to download body") + raise defer.returnValue((length, headers)) @@ -337,14 +341,23 @@ class MatrixFederationHttpClient(object): class _ReadBodyToFileProtocol(protocol.Protocol): - def __init__(self, stream, deferred): + def __init__(self, stream, deferred, max_size): self.stream = stream self.deferred = deferred self.length = 0 + self.max_size = max_size def dataReceived(self, data): self.stream.write(data) self.length += len(data) + if self.max_size is not None and self.length >= self.max_size: + self.deferred.errback(SynapseError( + 502, + "Requested file is too large > %r bytes" % (self.max_size,), + Codes.TOO_LARGE, + )) + self.deferred = defer.Deferred() + self.transport.loseConnection() def connectionLost(self, reason): if reason.check(ResponseDone): @@ -353,9 +366,9 @@ class _ReadBodyToFileProtocol(protocol.Protocol): self.deferred.errback(reason) -def _readBodyToFile(response, stream): +def _readBodyToFile(response, stream, max_size): d = defer.Deferred() - response.deliverBody(_ReadBodyToFileProtocol(stream, d)) + response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size)) return d diff --git a/synapse/media/v1/base_resource.py b/synapse/media/v1/base_resource.py index 8c62ecd597..77b05c6548 100644 --- a/synapse/media/v1/base_resource.py +++ b/synapse/media/v1/base_resource.py @@ -43,6 +43,7 @@ class BaseMediaResource(Resource): self.server_name = hs.hostname self.store = hs.get_datastore() self.max_upload_size = hs.config.max_upload_size + self.max_image_pixels = hs.config.max_image_pixels self.filepaths = filepaths @staticmethod @@ -143,6 +144,7 @@ class BaseMediaResource(Resource): )) length, headers = yield self.client.get_file( server_name, request_path, output_stream=f, + max_size=self.max_upload_size, ) media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() @@ -226,6 +228,14 @@ class BaseMediaResource(Resource): thumbnailer = Thumbnailer(input_path) m_width = thumbnailer.width m_height = thumbnailer.height + + if m_width * m_height >= self.max_image_pixels: + logger.info( + "Image too large to thumbnail %r x %r > %r" + m_width, m_height, self.max_image_pixels + ) + return + scales = set() crops = set() for r_width, r_height, r_method, r_type in requirements: @@ -281,6 +291,14 @@ class BaseMediaResource(Resource): thumbnailer = Thumbnailer(input_path) m_width = thumbnailer.width m_height = thumbnailer.height + + if m_width * m_height >= self.max_image_pixels: + logger.info( + "Image too large to thumbnail %r x %r > %r" + m_width, m_height, self.max_image_pixels + ) + return + scales = set() crops = set() for r_width, r_height, r_method, r_type in requirements: -- cgit 1.4.1 From 7b43a503f31e47b0eae9fe2b12fbea5e7fd280f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Dec 2014 15:05:37 +0000 Subject: Consistently url decode and decode as utf 8 the URL parts --- synapse/http/server.py | 12 +++++++++++- synapse/rest/directory.py | 12 +++--------- synapse/rest/presence.py | 4 ---- synapse/rest/profile.py | 5 ----- synapse/rest/room.py | 45 +++++++++++++++++++++------------------------ 5 files changed, 35 insertions(+), 43 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/server.py b/synapse/http/server.py index 02277c4998..2d5d71c8aa 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -29,6 +29,7 @@ from twisted.web.util import redirectTo import collections import logging +import urllib logger = logging.getLogger(__name__) @@ -122,9 +123,18 @@ class JsonResource(HttpServer, resource.Resource): # We found a match! Trigger callback and then return the # returned response. We pass both the request and any # matched groups from the regex to the callback. + + logger.debug("url things: %r", m.groups()) + + args = [ + urllib.unquote(u).decode("UTF-8") for u in m.groups() + ] + + logger.debug("url things args: %r", args) + code, response = yield path_entry.callback( request, - *m.groups() + *args ) self._send_response(request, code, response) diff --git a/synapse/rest/directory.py b/synapse/rest/directory.py index 35300c6a6f..7b2b18dbcd 100644 --- a/synapse/rest/directory.py +++ b/synapse/rest/directory.py @@ -36,9 +36,7 @@ class ClientDirectoryServer(RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_alias): - room_alias = self.hs.parse_roomalias( - urllib.unquote(room_alias).decode("utf-8") - ) + room_alias = self.hs.parse_roomalias(room_alias) dir_handler = self.handlers.directory_handler res = yield dir_handler.get_association(room_alias) @@ -56,9 +54,7 @@ class ClientDirectoryServer(RestServlet): logger.debug("Got content: %s", content) - room_alias = self.hs.parse_roomalias( - urllib.unquote(room_alias).decode("utf-8") - ) + room_alias = self.hs.parse_roomalias(room_alias) logger.debug("Got room name: %s", room_alias.to_string()) @@ -97,9 +93,7 @@ class ClientDirectoryServer(RestServlet): dir_handler = self.handlers.directory_handler - room_alias = self.hs.parse_roomalias( - urllib.unquote(room_alias).decode("utf-8") - ) + room_alias = self.hs.parse_roomalias(room_alias) yield dir_handler.delete_association( user.to_string(), room_alias diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py index 062c895595..4bcd7ef635 100644 --- a/synapse/rest/presence.py +++ b/synapse/rest/presence.py @@ -33,7 +33,6 @@ class PresenceStatusRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) state = yield self.handlers.presence_handler.get_state( @@ -44,7 +43,6 @@ class PresenceStatusRestServlet(RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) state = {} @@ -80,7 +78,6 @@ class PresenceListRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) if not self.hs.is_mine(user): @@ -101,7 +98,6 @@ class PresenceListRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) if not self.hs.is_mine(user): diff --git a/synapse/rest/profile.py b/synapse/rest/profile.py index 72e02d8dd8..fa1be2c289 100644 --- a/synapse/rest/profile.py +++ b/synapse/rest/profile.py @@ -27,7 +27,6 @@ class ProfileDisplaynameRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) displayname = yield self.handlers.profile_handler.get_displayname( @@ -39,7 +38,6 @@ class ProfileDisplaynameRestServlet(RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) try: @@ -62,7 +60,6 @@ class ProfileAvatarURLRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) avatar_url = yield self.handlers.profile_handler.get_avatar_url( @@ -74,7 +71,6 @@ class ProfileAvatarURLRestServlet(RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) try: @@ -97,7 +93,6 @@ class ProfileRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - user_id = urllib.unquote(user_id) user = self.hs.parse_userid(user_id) displayname = yield self.handlers.profile_handler.get_displayname( diff --git a/synapse/rest/room.py b/synapse/rest/room.py index c526e9bc72..7fb5aca0a7 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -129,9 +129,9 @@ class RoomStateEventRestServlet(RestServlet): msg_handler = self.handlers.message_handler data = yield msg_handler.get_room_data( user_id=user.to_string(), - room_id=urllib.unquote(room_id), - event_type=urllib.unquote(event_type), - state_key=urllib.unquote(state_key), + room_id=room_id, + event_type=event_type, + state_key=state_key, ) if not data: @@ -143,19 +143,18 @@ class RoomStateEventRestServlet(RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, state_key): user = yield self.auth.get_user_by_req(request) - event_type = urllib.unquote(event_type) content = _parse_json(request) event_dict = { "type": event_type, "content": content, - "room_id": urllib.unquote(room_id), + "room_id": room_id, "sender": user.to_string(), } if state_key is not None: - event_dict["state_key"] = urllib.unquote(state_key) + event_dict["state_key"] = state_key msg_handler = self.handlers.message_handler yield msg_handler.handle_event(event_dict) @@ -179,9 +178,9 @@ class RoomSendEventRestServlet(RestServlet): msg_handler = self.handlers.message_handler event = yield msg_handler.handle_event( { - "type": urllib.unquote(event_type), + "type": event_type, "content": content, - "room_id": urllib.unquote(room_id), + "room_id": room_id, "sender": user.to_string(), } ) @@ -225,14 +224,10 @@ class JoinRoomAliasServlet(RestServlet): identifier = None is_room_alias = False try: - identifier = self.hs.parse_roomalias( - urllib.unquote(room_identifier) - ) + identifier = self.hs.parse_roomalias(room_identifier) is_room_alias = True except SynapseError: - identifier = self.hs.parse_roomid( - urllib.unquote(room_identifier) - ) + identifier = self.hs.parse_roomid(room_identifier) # TODO: Support for specifying the home server to join with? @@ -246,7 +241,7 @@ class JoinRoomAliasServlet(RestServlet): { "type": RoomMemberEvent.TYPE, "content": {"membership": Membership.JOIN}, - "room_id": urllib.unquote(identifier.to_string()), + "room_id": identifier.to_string(), "sender": user.to_string(), "state_key": user.to_string(), } @@ -290,7 +285,7 @@ class RoomMemberListRestServlet(RestServlet): user = yield self.auth.get_user_by_req(request) handler = self.handlers.room_member_handler members = yield handler.get_room_members_as_pagination_chunk( - room_id=urllib.unquote(room_id), + room_id=room_id, user_id=user.to_string()) for event in members["chunk"]: @@ -322,7 +317,7 @@ class RoomMessageListRestServlet(RestServlet): with_feedback = "feedback" in request.args handler = self.handlers.message_handler msgs = yield handler.get_messages( - room_id=urllib.unquote(room_id), + room_id=room_id, user_id=user.to_string(), pagin_config=pagination_config, feedback=with_feedback) @@ -340,7 +335,7 @@ class RoomStateRestServlet(RestServlet): handler = self.handlers.message_handler # Get all the current state for this room events = yield handler.get_state_events( - room_id=urllib.unquote(room_id), + room_id=room_id, user_id=user.to_string(), ) defer.returnValue((200, events)) @@ -355,7 +350,7 @@ class RoomInitialSyncRestServlet(RestServlet): user = yield self.auth.get_user_by_req(request) pagination_config = PaginationConfig.from_request(request) content = yield self.handlers.message_handler.room_initial_sync( - room_id=urllib.unquote(room_id), + room_id=room_id, user_id=user.to_string(), pagin_config=pagination_config, ) @@ -367,8 +362,10 @@ class RoomTriggerBackfill(RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): - remote_server = urllib.unquote(request.args["remote"][0]) - room_id = urllib.unquote(room_id) + remote_server = urllib.unquote( + request.args["remote"][0] + ).decode("UTF-8") + limit = int(request.args["limit"][0]) handler = self.handlers.federation_handler @@ -408,7 +405,7 @@ class RoomMembershipRestServlet(RestServlet): { "type": RoomMemberEvent.TYPE, "content": {"membership": unicode(membership_action)}, - "room_id": urllib.unquote(room_id), + "room_id": room_id, "sender": user.to_string(), "state_key": state_key, } @@ -446,9 +443,9 @@ class RoomRedactEventRestServlet(RestServlet): { "type": RoomRedactionEvent.TYPE, "content": content, - "room_id": urllib.unquote(room_id), + "room_id": room_id, "sender": user.to_string(), - "redacts": urllib.unquote(event_id), + "redacts": event_id, } ) -- cgit 1.4.1 From 1fc2a0e33ef6e88a79dbf4325468d8eb77db0f65 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Dec 2014 15:08:29 +0000 Subject: Fix tests and remove debug logging --- synapse/http/server.py | 4 ---- tests/utils.py | 9 +++++++-- 2 files changed, 7 insertions(+), 6 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/server.py b/synapse/http/server.py index 2d5d71c8aa..f33859cf76 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -124,14 +124,10 @@ class JsonResource(HttpServer, resource.Resource): # returned response. We pass both the request and any # matched groups from the regex to the callback. - logger.debug("url things: %r", m.groups()) - args = [ urllib.unquote(u).decode("UTF-8") for u in m.groups() ] - logger.debug("url things args: %r", args) - code, response = yield path_entry.callback( request, *args diff --git a/tests/utils.py b/tests/utils.py index f9a34748cd..70a221550c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -29,7 +29,7 @@ from twisted.enterprise.adbapi import ConnectionPool from collections import namedtuple from mock import patch, Mock -import json +import urllib import urlparse from inspect import getcallargs @@ -103,9 +103,14 @@ class MockHttpResource(HttpServer): matcher = pattern.match(path) if matcher: try: + args = [ + urllib.unquote(u).decode("UTF-8") + for u in matcher.groups() + ] + (code, response) = yield func( mock_request, - *matcher.groups() + *args ) defer.returnValue((code, response)) except CodeMessageException as e: -- cgit 1.4.1