diff options
-rw-r--r-- | AUTHORS.rst | 4 | ||||
-rw-r--r-- | synapse/api/auth.py | 2 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 106 | ||||
-rw-r--r-- | synapse/config/_base.py | 4 | ||||
-rw-r--r-- | synapse/config/repository.py | 5 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 1 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 52 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 28 | ||||
-rw-r--r-- | synapse/http/client.py | 18 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 128 | ||||
-rw-r--r-- | synapse/http/server.py | 78 | ||||
-rw-r--r-- | synapse/rest/client/v1/transactions.py | 6 | ||||
-rw-r--r-- | tests/handlers/test_appservice.py | 43 |
14 files changed, 310 insertions, 167 deletions
diff --git a/AUTHORS.rst b/AUTHORS.rst index 3a457cd9fc..d7224ff5de 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -38,3 +38,7 @@ Brabo <brabo at riseup.net> Ivan Shapovalov <intelfx100 at gmail.com> * contrib/systemd: a sample systemd unit file and a logger configuration + +Eric Myhre <hash at exultant.us> + * Fix bug where ``media_store_path`` config option was ignored by v0 content + repository API. diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d5bf0be85c..4da62e5d8d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -370,6 +370,8 @@ class Auth(object): user_agent=user_agent ) + request.authenticated_entity = user.to_string() + defer.returnValue((user, ClientInfo(device_id, token_id))) except KeyError: raise AuthError( diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 95e9122d3e..49e27c9e11 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -35,7 +35,6 @@ from twisted.enterprise import adbapi from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File from twisted.web.server import Site, GzipEncoderFactory, Request -from twisted.web.http import proxiedLogFormatter, combinedLogFormatter from synapse.http.server import JsonResource, RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource @@ -61,10 +60,13 @@ import twisted.manhole.telnet import synapse +import contextlib import logging import os +import re import resource import subprocess +import time logger = logging.getLogger("synapse.app.homeserver") @@ -112,7 +114,7 @@ class SynapseHomeServer(HomeServer): def build_resource_for_content_repo(self): return ContentRepoResource( - self, self.upload_dir, self.auth, self.content_addr + self, self.config.uploads_path, self.auth, self.content_addr ) def build_resource_for_media_repository(self): @@ -142,6 +144,7 @@ class SynapseHomeServer(HomeServer): port = listener_config["port"] bind_address = listener_config.get("bind_address", "") tls = listener_config.get("tls", False) + site_tag = listener_config.get("tag", port) if tls and config.no_tls: return @@ -197,7 +200,8 @@ class SynapseHomeServer(HomeServer): reactor.listenSSL( port, SynapseSite( - "synapse.access.https", + "synapse.access.https.%s" % (site_tag,), + site_tag, listener_config, root_resource, ), @@ -208,7 +212,8 @@ class SynapseHomeServer(HomeServer): reactor.listenTCP( port, SynapseSite( - "synapse.access.https", + "synapse.access.http.%s" % (site_tag,), + site_tag, listener_config, root_resource, ), @@ -375,7 +380,6 @@ def setup(config_options): hs = SynapseHomeServer( config.server_name, - upload_dir=os.path.abspath("uploads"), db_config=config.database_config, tls_context_factory=tls_context_factory, config=config, @@ -433,9 +437,70 @@ class SynapseService(service.Service): return self._port.stopListening() -class XForwardedForRequest(Request): - def __init__(self, *args, **kw): +class SynapseRequest(Request): + def __init__(self, site, *args, **kw): Request.__init__(self, *args, **kw) + self.site = site + self.authenticated_entity = None + self.start_time = 0 + + def __repr__(self): + # We overwrite this so that we don't log ``access_token`` + return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( + self.__class__.__name__, + id(self), + self.method, + self.get_redacted_uri(), + self.clientproto, + self.site.site_tag, + ) + + def get_redacted_uri(self): + return re.sub( + r'(\?.*access_token=)[^&]*(.*)$', + r'\1<redacted>\2', + self.uri + ) + + def get_user_agent(self): + return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + + def started_processing(self): + self.site.access_logger.info( + "%s - %s - Received request: %s %s", + self.getClientIP(), + self.site.site_tag, + self.method, + self.get_redacted_uri() + ) + self.start_time = int(time.time() * 1000) + + def finished_processing(self): + self.site.access_logger.info( + "%s - %s - {%s}" + " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", + self.getClientIP(), + self.site.site_tag, + self.authenticated_entity, + int(time.time() * 1000) - self.start_time, + self.sentLength, + self.code, + self.method, + self.get_redacted_uri(), + self.clientproto, + self.get_user_agent(), + ) + + @contextlib.contextmanager + def processing(self): + self.started_processing() + yield + self.finished_processing() + + +class XForwardedForRequest(SynapseRequest): + def __init__(self, *args, **kw): + SynapseRequest.__init__(self, *args, **kw) """ Add a layer on top of another request that only uses the value of an @@ -451,8 +516,16 @@ class XForwardedForRequest(Request): b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() -def XForwardedFactory(*args, **kwargs): - return XForwardedForRequest(*args, **kwargs) +class SynapseRequestFactory(object): + def __init__(self, site, x_forwarded_for): + self.site = site + self.x_forwarded_for = x_forwarded_for + + def __call__(self, *args, **kwargs): + if self.x_forwarded_for: + return XForwardedForRequest(self.site, *args, **kwargs) + else: + return SynapseRequest(self.site, *args, **kwargs) class SynapseSite(Site): @@ -460,18 +533,17 @@ class SynapseSite(Site): Subclass of a twisted http Site that does access logging with python's standard logging """ - def __init__(self, logger_name, config, resource, *args, **kwargs): + def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) - if config.get("x_forwarded", False): - self.requestFactory = XForwardedFactory - self._log_formatter = proxiedLogFormatter - else: - self._log_formatter = combinedLogFormatter + + self.site_tag = site_tag + + proxied = config.get("x_forwarded", False) + self.requestFactory = SynapseRequestFactory(self, proxied) self.access_logger = logging.getLogger(logger_name) def log(self, request): - line = self._log_formatter(self._logDateTime, request) - self.access_logger.info(line) + pass def create_resource_tree(desired_tree, redirect_root_to_web_client=True): diff --git a/synapse/config/_base.py b/synapse/config/_base.py index d4163d6272..d483c67c6a 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -148,7 +148,7 @@ class Config(object): if not config_args.config_path: config_parser.error( "Must supply a config file.\nA config file can be automatically" - " generated using \"--generate-config -h SERVER_NAME" + " generated using \"--generate-config -H SERVER_NAME" " -c CONFIG-FILE\"" ) @@ -209,7 +209,7 @@ class Config(object): if not config_args.config_path: config_parser.error( "Must supply a config file.\nA config file can be automatically" - " generated using \"--generate-config -h SERVER_NAME" + " generated using \"--generate-config -H SERVER_NAME" " -c CONFIG-FILE\"" ) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index adaf4e4bb2..6891abd71d 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -21,13 +21,18 @@ class ContentRepositoryConfig(Config): self.max_upload_size = self.parse_size(config["max_upload_size"]) self.max_image_pixels = self.parse_size(config["max_image_pixels"]) self.media_store_path = self.ensure_directory(config["media_store_path"]) + self.uploads_path = self.ensure_directory(config["uploads_path"]) def default_config(self, config_dir_path, server_name): media_store = self.default_path("media_store") + uploads_path = self.default_path("uploads") return """ # Directory where uploaded images and attachments are stored. media_store_path: "%(media_store)s" + # Directory where in-progress uploads are stored. + uploads_path: "%(uploads_path)s" + # The largest allowed upload size in bytes max_upload_size: "10M" diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 31190e700a..bad93c6b2f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -94,6 +94,7 @@ class TransportLayerServer(object): yield self.keyring.verify_json_for_server(origin, json_request) logger.info("Request from %s", origin) + request.authenticated_entity = origin defer.returnValue((origin, content)) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8269482e47..1240e51649 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -177,7 +177,7 @@ class ApplicationServicesHandler(object): return user_info = yield self.store.get_user_by_id(user_id) - if not user_info: + if user_info: defer.returnValue(False) return diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 867fdbefb0..e324662f18 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -380,15 +380,6 @@ class MessageHandler(BaseHandler): if limit is None: limit = 10 - messages, token = yield self.store.get_recent_events_for_room( - room_id, - limit=limit, - end_token=now_token.room_key, - ) - - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) - room_members = [ m for m in current_state.values() if m.type == EventTypes.Member @@ -396,19 +387,38 @@ class MessageHandler(BaseHandler): ] presence_handler = self.hs.get_handlers().presence_handler - presence = [] - for m in room_members: - try: - member_presence = yield presence_handler.get_state( - target_user=UserID.from_string(m.user_id), - auth_user=auth_user, - as_event=True, - ) - presence.append(member_presence) - except SynapseError: - logger.exception( - "Failed to get member presence of %r", m.user_id + + @defer.inlineCallbacks + def get_presence(): + presence_defs = yield defer.DeferredList( + [ + presence_handler.get_state( + target_user=UserID.from_string(m.user_id), + auth_user=auth_user, + as_event=True, + check_auth=False, + ) + for m in room_members + ], + consumeErrors=True, + ) + + defer.returnValue([p for success, p in presence_defs if success]) + + presence, (messages, token) = yield defer.gatherResults( + [ + get_presence(), + self.store.get_recent_events_for_room( + room_id, + limit=limit, + end_token=now_token.room_key, ) + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 023ad33ab0..7c03198313 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -191,24 +191,24 @@ class PresenceHandler(BaseHandler): defer.returnValue(False) @defer.inlineCallbacks - def get_state(self, target_user, auth_user, as_event=False): + def get_state(self, target_user, auth_user, as_event=False, check_auth=True): if self.hs.is_mine(target_user): - visible = yield self.is_presence_visible( - observer_user=auth_user, - observed_user=target_user - ) + if check_auth: + visible = yield self.is_presence_visible( + observer_user=auth_user, + observed_user=target_user + ) - if not visible: - raise SynapseError(404, "Presence information not visible") - state = yield self.store.get_presence_state(target_user.localpart) - if "mtime" in state: - del state["mtime"] - state["presence"] = state.pop("state") + if not visible: + raise SynapseError(404, "Presence information not visible") if target_user in self._user_cachemap: - cached_state = self._user_cachemap[target_user].get_state() - if "last_active" in cached_state: - state["last_active"] = cached_state["last_active"] + state = self._user_cachemap[target_user].get_state() + else: + state = yield self.store.get_presence_state(target_user.localpart) + if "mtime" in state: + del state["mtime"] + state["presence"] = state.pop("state") else: # TODO(paul): Have remote server send us permissions set state = self._get_or_offline_usercache(target_user).get_state() diff --git a/synapse/http/client.py b/synapse/http/client.py index e746f2416e..9091ae2d38 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -61,21 +61,31 @@ class SimpleHttpClient(object): self.agent = Agent(reactor, pool=pool) self.version_string = hs.version_string - def request(self, method, *args, **kwargs): + def request(self, method, uri, *args, **kwargs): # A small wrapper around self.agent.request() so we can easily attach # counters to it outgoing_requests_counter.inc(method) d = preserve_context_over_fn( self.agent.request, - method, *args, **kwargs + method, uri, *args, **kwargs ) + logger.info("Sending request %s %s", method, uri) + def _cb(response): incoming_responses_counter.inc(method, response.code) + logger.info( + "Received response to %s %s: %s", + method, uri, response.code + ) return response def _eb(failure): incoming_responses_counter.inc(method, "ERR") + logger.info( + "Error sending request to %s %s: %s %s", + method, uri, failure.type, failure.getErrorMessage() + ) return failure d.addCallbacks(_cb, _eb) @@ -84,7 +94,9 @@ class SimpleHttpClient(object): @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}): + # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) + query_bytes = urllib.urlencode(args, True) response = yield self.request( @@ -105,7 +117,7 @@ class SimpleHttpClient(object): def post_json_get_json(self, uri, post_json): json_str = encode_canonical_json(post_json) - logger.info("HTTP POST %s -> %s", json_str, uri) + logger.debug("HTTP POST %s -> %s", json_str, uri) response = yield self.request( "POST", diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7f3d8fc884..1b90692731 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json import simplejson as json import logging +import sys import urllib import urlparse logger = logging.getLogger(__name__) +outbound_logger = logging.getLogger("synapse.http.outbound") metrics = synapse.metrics.get_metrics_for(__name__) @@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self.version_string = hs.version_string + self._next_id = 1 + @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", @@ -123,16 +127,12 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.info("Sending request to %s: %s %s", - destination, method, url_bytes) + txn_id = "%s-%s" % (method, self._next_id) + self._next_id = (self._next_id + 1) % (sys.maxint - 1) - logger.debug( - "Types: %s", - [ - type(destination), type(method), type(path_bytes), - type(param_bytes), - type(query_bytes) - ] + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url_bytes ) # XXX: Would be much nicer to retry only at the transaction-layer @@ -141,63 +141,71 @@ class MatrixFederationHttpClient(object): endpoint = self._getEndpoint(reactor, destination) - while True: - producer = None - if body_callback: - producer = body_callback(method, url_bytes, headers_dict) - - try: - request_deferred = preserve_context_over_fn( - self.agent.request, - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + log_result = None + try: + while True: + producer = None + if body_callback: + producer = body_callback(method, url_bytes, headers_dict) + + try: + request_deferred = preserve_context_over_fn( + self.agent.request, + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=timeout/1000. if timeout else 60, - ) + response = yield self.clock.time_bound_deferred( + request_deferred, + time_out=timeout/1000. if timeout else 60, + ) + + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise - logger.debug("Got response to %s", method) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): logger.warn( - "DNS Lookup failed to %s with %s", + "{%s} Sending request failed to %s: %s %s: %s - %s", + txn_id, destination, - e + method, + url_bytes, + type(e).__name__, + _flatten_response_never_received(e), ) - raise - - logger.warn( - "Sending request failed to %s: %s %s: %s - %s", - destination, - method, - url_bytes, - type(e).__name__, - _flatten_response_never_received(e), - ) - if retries_left and not timeout: - yield sleep(2 ** (5 - retries_left)) - retries_left -= 1 - else: - raise - - logger.info( - "Received response %d %s for %s: %s %s", - response.code, - response.phrase, - destination, - method, - url_bytes - ) + log_result = "%s - %s" % ( + type(e).__name__, _flatten_response_never_received(e), + ) + + if retries_left and not timeout: + yield sleep(2 ** (5 - retries_left)) + retries_left -= 1 + else: + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) if 200 <= response.code < 300: pass diff --git a/synapse/http/server.py b/synapse/http/server.py index ae8f3b3972..807ff95c65 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -79,53 +79,39 @@ def request_handler(request_handler): _next_request_id += 1 with LoggingContext(request_id) as request_context: request_context.request = request_id - code = None - start = self.clock.time_msec() - try: - logger.info( - "Received request: %s %s", - request.method, request.path - ) - d = request_handler(self, request) - with PreserveLoggingContext(): - yield d - code = request.code - except CodeMessageException as e: - code = e.code - if isinstance(e, SynapseError): - logger.info( - "%s SynapseError: %s - %s", request, code, e.msg + with request.processing(): + try: + d = request_handler(self, request) + with PreserveLoggingContext(): + yield d + except CodeMessageException as e: + code = e.code + if isinstance(e, SynapseError): + logger.info( + "%s SynapseError: %s - %s", request, code, e.msg + ) + else: + logger.exception(e) + outgoing_responses_counter.inc(request.method, str(code)) + respond_with_json( + request, code, cs_exception(e), send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + version_string=self.version_string, + ) + except: + logger.exception( + "Failed handle request %s.%s on %r: %r", + request_handler.__module__, + request_handler.__name__, + self, + request + ) + respond_with_json( + request, + 500, + {"error": "Internal server error"}, + send_cors=True ) - else: - logger.exception(e) - outgoing_responses_counter.inc(request.method, str(code)) - respond_with_json( - request, code, cs_exception(e), send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, - ) - except: - code = 500 - logger.exception( - "Failed handle request %s.%s on %r: %r", - request_handler.__module__, - request_handler.__name__, - self, - request - ) - respond_with_json( - request, - 500, - {"error": "Internal server error"}, - send_cors=True - ) - finally: - code = str(code) if code else "-" - end = self.clock.time_msec() - logger.info( - "Processed request: %dms %s %s %s", - end-start, code, request.method, request.path - ) return wrapped_request_handler diff --git a/synapse/rest/client/v1/transactions.py b/synapse/rest/client/v1/transactions.py index d933fea18a..b861069b89 100644 --- a/synapse/rest/client/v1/transactions.py +++ b/synapse/rest/client/v1/transactions.py @@ -39,10 +39,10 @@ class HttpTransactionStore(object): A tuple of (HTTP response code, response content) or None. """ try: - logger.debug("get_response Key: %s TxnId: %s", key, txn_id) + logger.debug("get_response TxnId: %s", txn_id) (last_txn_id, response) = self.transactions[key] if txn_id == last_txn_id: - logger.info("get_response: Returning a response for %s", key) + logger.info("get_response: Returning a response for %s", txn_id) return response except KeyError: pass @@ -58,7 +58,7 @@ class HttpTransactionStore(object): txn_id (str): The transaction ID for this request. response (tuple): A tuple of (HTTP response code, response content) """ - logger.debug("store_response Key: %s TxnId: %s", key, txn_id) + logger.debug("store_response TxnId: %s", txn_id) self.transactions[key] = (txn_id, response) def store_client_transaction(self, request, txn_id, response): diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 06cb1dd4cf..9e95d1e532 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -58,6 +58,49 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) @defer.inlineCallbacks + def test_query_user_exists_unknown_user(self): + user_id = "@someone:anywhere" + services = [self._mkservice(is_interested=True)] + services[0].is_interested_in_user = Mock(return_value=True) + self.mock_store.get_app_services = Mock(return_value=services) + self.mock_store.get_user_by_id = Mock(return_value=None) + + event = Mock( + sender=user_id, + type="m.room.message", + room_id="!foo:bar" + ) + self.mock_as_api.push = Mock() + self.mock_as_api.query_user = Mock() + yield self.handler.notify_interested_services(event) + self.mock_as_api.query_user.assert_called_once_with( + services[0], user_id + ) + + @defer.inlineCallbacks + def test_query_user_exists_known_user(self): + user_id = "@someone:anywhere" + services = [self._mkservice(is_interested=True)] + services[0].is_interested_in_user = Mock(return_value=True) + self.mock_store.get_app_services = Mock(return_value=services) + self.mock_store.get_user_by_id = Mock(return_value={ + "name": user_id + }) + + event = Mock( + sender=user_id, + type="m.room.message", + room_id="!foo:bar" + ) + self.mock_as_api.push = Mock() + self.mock_as_api.query_user = Mock() + yield self.handler.notify_interested_services(event) + self.assertFalse( + self.mock_as_api.query_user.called, + "query_user called when it shouldn't have been." + ) + + @defer.inlineCallbacks def test_query_room_alias_exists(self): room_alias_str = "#foo:bar" room_alias = Mock() |