diff options
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/__init__.py | 14 | ||||
-rw-r--r-- | synapse/http/client.py | 246 | ||||
-rw-r--r-- | synapse/http/endpoint.py | 171 | ||||
-rw-r--r-- | synapse/http/server.py | 181 |
4 files changed, 612 insertions, 0 deletions
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py new file mode 100644 index 0000000000..fe8a073cd3 --- /dev/null +++ b/synapse/http/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/http/client.py b/synapse/http/client.py new file mode 100644 index 0000000000..bb22b0ee9a --- /dev/null +++ b/synapse/http/client.py @@ -0,0 +1,246 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor +from twisted.web.client import _AgentBase, _URI, readBody +from twisted.web.http_headers import Headers + +from synapse.http.endpoint import matrix_endpoint +from synapse.util.async import sleep + +from syutil.jsonutil import encode_canonical_json + +from synapse.api.errors import CodeMessageException + +import json +import logging +import urllib + + +logger = logging.getLogger(__name__) + + +_destination_mappings = { + "red": "localhost:8080", + "blue": "localhost:8081", + "green": "localhost:8082", +} + + +class HttpClient(object): + """ Interface for talking json over http + """ + + def put_json(self, destination, path, data): + """ Sends the specifed json data using PUT + + Args: + destination (str): The remote server to send the HTTP request + to. + path (str): The HTTP path. + data (dict): A dict containing the data that will be used as + the request body. This will be encoded as JSON. + + Returns: + Deferred: Succeeds when we get *any* HTTP response. + + The result of the deferred is a tuple of `(code, response)`, + where `response` is a dict representing the decoded JSON body. + """ + pass + + def get_json(self, destination, path, args=None): + """ Get's some json from the given host homeserver and path + + Args: + destination (str): The remote server to send the HTTP request + to. + path (str): The HTTP path. + args (dict): A dictionary used to create query strings, defaults to + None. + **Note**: The value of each key is assumed to be an iterable + and *not* a string. + + Returns: + Deferred: Succeeds when we get *any* HTTP response. + + The result of the deferred is a tuple of `(code, response)`, + where `response` is a dict representing the decoded JSON body. + """ + pass + + +class MatrixHttpAgent(_AgentBase): + + def __init__(self, reactor, pool=None): + _AgentBase.__init__(self, reactor, pool) + + def request(self, destination, endpoint, method, path, params, query, + headers, body_producer): + + host = b"" + port = 0 + fragment = b"" + + parsed_URI = _URI(b"http", destination, host, port, path, params, + query, fragment) + + # Set the connection pool key to be the destination. + key = destination + + return self._requestWithEndpoint(key, endpoint, method, parsed_URI, + headers, body_producer, + parsed_URI.originForm) + + +class TwistedHttpClient(HttpClient): + """ Wrapper around the twisted HTTP client api. + + Attributes: + agent (twisted.web.client.Agent): The twisted Agent used to send the + requests. + """ + + def __init__(self): + self.agent = MatrixHttpAgent(reactor) + + @defer.inlineCallbacks + def put_json(self, destination, path, data): + if destination in _destination_mappings: + destination = _destination_mappings[destination] + + response = yield self._create_request( + destination.encode("ascii"), + "PUT", + path.encode("ascii"), + producer=_JsonProducer(data), + headers_dict={"Content-Type": ["application/json"]} + ) + + logger.debug("Getting resp body") + body = yield readBody(response) + logger.debug("Got resp body") + + defer.returnValue((response.code, body)) + + @defer.inlineCallbacks + def get_json(self, destination, path, args={}): + if destination in _destination_mappings: + destination = _destination_mappings[destination] + + logger.debug("get_json args: %s", args) + query_bytes = urllib.urlencode(args, True) + + response = yield self._create_request( + destination.encode("ascii"), + "GET", + path.encode("ascii"), + query_bytes + ) + + body = yield readBody(response) + + defer.returnValue(json.loads(body)) + + @defer.inlineCallbacks + def _create_request(self, destination, method, path_bytes, param_bytes=b"", + query_bytes=b"", producer=None, headers_dict={}): + """ Creates and sends a request to the given url + """ + headers_dict[b"User-Agent"] = [b"Synapse"] + headers_dict[b"Host"] = [destination] + + logger.debug("Sending request to %s: %s %s;%s?%s", + destination, method, path_bytes, param_bytes, query_bytes) + + logger.debug( + "Types: %s", + [ + type(destination), type(method), type(path_bytes), + type(param_bytes), + type(query_bytes) + ] + ) + + retries_left = 5 + + # TODO: setup and pass in an ssl_context to enable TLS + endpoint = matrix_endpoint(reactor, destination, timeout=10) + + while True: + try: + response = yield self.agent.request( + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) + + logger.debug("Got response to %s", method) + break + except Exception as e: + logger.exception("Got error in _create_request") + _print_ex(e) + + if retries_left: + yield sleep(2 ** (5 - retries_left)) + retries_left -= 1 + else: + raise + + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + pass + else: + # :'( + # Update transactions table? + logger.error( + "Got response %d %s", response.code, response.phrase + ) + raise CodeMessageException( + response.code, response.phrase + ) + + defer.returnValue(response) + + +def _print_ex(e): + if hasattr(e, "reasons") and e.reasons: + for ex in e.reasons: + _print_ex(ex) + else: + logger.exception(e) + + +class _JsonProducer(object): + """ Used by the twisted http client to create the HTTP body from json + """ + def __init__(self, jsn): + self.body = encode_canonical_json(jsn) + self.length = len(self.body) + + def startProducing(self, consumer): + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py new file mode 100644 index 0000000000..c4e6e63a80 --- /dev/null +++ b/synapse/http/endpoint.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint +from twisted.internet import defer +from twisted.internet.error import ConnectError +from twisted.names import client, dns +from twisted.names.error import DNSNameError + +import collections +import logging +import random + + +logger = logging.getLogger(__name__) + + +def matrix_endpoint(reactor, destination, ssl_context_factory=None, + timeout=None): + """Construct an endpoint for the given matrix destination. + + Args: + reactor: Twisted reactor. + destination (bytes): The name of the server to connect to. + ssl_context_factory (twisted.internet.ssl.ContextFactory): Factory + which generates SSL contexts to use for TLS. + timeout (int): connection timeout in seconds + """ + + domain_port = destination.split(":") + domain = domain_port[0] + port = int(domain_port[1]) if domain_port[1:] else None + + endpoint_kw_args = {} + + if timeout is not None: + endpoint_kw_args.update(timeout=timeout) + + if ssl_context_factory is None: + transport_endpoint = TCP4ClientEndpoint + default_port = 8080 + else: + transport_endpoint = SSL4ClientEndpoint + endpoint_kw_args.update(ssl_context_factory=ssl_context_factory) + default_port = 443 + + if port is None: + return SRVClientEndpoint( + reactor, "matrix", domain, protocol="tcp", + default_port=default_port, endpoint=transport_endpoint, + endpoint_kw_args=endpoint_kw_args + ) + else: + return transport_endpoint(reactor, domain, port, **endpoint_kw_args) + + +class SRVClientEndpoint(object): + """An endpoint which looks up SRV records for a service. + Cycles through the list of servers starting with each call to connect + picking the next server. + Implements twisted.internet.interfaces.IStreamClientEndpoint. + """ + + _Server = collections.namedtuple( + "_Server", "priority weight host port" + ) + + def __init__(self, reactor, service, domain, protocol="tcp", + default_port=None, endpoint=TCP4ClientEndpoint, + endpoint_kw_args={}): + self.reactor = reactor + self.service_name = "_%s._%s.%s" % (service, protocol, domain) + + if default_port is not None: + self.default_server = self._Server( + host=domain, + port=default_port, + priority=0, + weight=0 + ) + else: + self.default_server = None + + self.endpoint = endpoint + self.endpoint_kw_args = endpoint_kw_args + + self.servers = None + self.used_servers = None + + @defer.inlineCallbacks + def fetch_servers(self): + try: + answers, auth, add = yield client.lookupService(self.service_name) + except DNSNameError: + answers = [] + + if (len(answers) == 1 + and answers[0].type == dns.SRV + and answers[0].payload + and answers[0].payload.target == dns.Name('.')): + raise ConnectError("Service %s unavailable", self.service_name) + + self.servers = [] + self.used_servers = [] + + for answer in answers: + if answer.type != dns.SRV or not answer.payload: + continue + payload = answer.payload + self.servers.append(self._Server( + host=str(payload.target), + port=int(payload.port), + priority=int(payload.priority), + weight=int(payload.weight) + )) + + self.servers.sort() + + def pick_server(self): + if not self.servers: + if self.used_servers: + self.servers = self.used_servers + self.used_servers = [] + self.servers.sort() + elif self.default_server: + return self.default_server + else: + raise ConnectError( + "Not server available for %s", self.service_name + ) + + min_priority = self.servers[0].priority + weight_indexes = list( + (index, server.weight + 1) + for index, server in enumerate(self.servers) + if server.priority == min_priority + ) + + total_weight = sum(weight for index, weight in weight_indexes) + target_weight = random.randint(0, total_weight) + + for index, weight in weight_indexes: + target_weight -= weight + if target_weight <= 0: + server = self.servers[index] + del self.servers[index] + self.used_servers.append(server) + return server + + @defer.inlineCallbacks + def connect(self, protocolFactory): + if self.servers is None: + yield self.fetch_servers() + server = self.pick_server() + logger.info("Connecting to %s:%s", server.host, server.port) + endpoint = self.endpoint( + self.reactor, server.host, server.port, **self.endpoint_kw_args + ) + connection = yield endpoint.connect(protocolFactory) + defer.returnValue(connection) diff --git a/synapse/http/server.py b/synapse/http/server.py new file mode 100644 index 0000000000..8823aade78 --- /dev/null +++ b/synapse/http/server.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from syutil.jsonutil import ( + encode_canonical_json, encode_pretty_printed_json +) +from synapse.api.errors import cs_exception, CodeMessageException + +from twisted.internet import defer, reactor +from twisted.web import server, resource +from twisted.web.server import NOT_DONE_YET + +import collections +import logging + + +logger = logging.getLogger(__name__) + + +class HttpServer(object): + """ Interface for registering callbacks on a HTTP server + """ + + def register_path(self, method, path_pattern, callback): + """ Register a callback that get's fired if we receive a http request + with the given method for a path that matches the given regex. + + If the regex contains groups these get's passed to the calback via + an unpacked tuple. + + Args: + method (str): The method to listen to. + path_pattern (str): The regex used to match requests. + callback (function): The function to fire if we receive a matched + request. The first argument will be the request object and + subsequent arguments will be any matched groups from the regex. + This should return a tuple of (code, response). + """ + pass + + +# The actual HTTP server impl, using twisted http server +class TwistedHttpServer(HttpServer, resource.Resource): + """ This wraps the twisted HTTP server, and triggers the correct callbacks + on the transport_layer. + + Register callbacks via register_path() + """ + + isLeaf = True + + _PathEntry = collections.namedtuple("_PathEntry", ["pattern", "callback"]) + + def __init__(self): + resource.Resource.__init__(self) + + self.path_regexs = {} + + def register_path(self, method, path_pattern, callback): + self.path_regexs.setdefault(method, []).append( + self._PathEntry(path_pattern, callback) + ) + + def start_listening(self, port): + """ Registers the http server with the twisted reactor. + + Args: + port (int): The port to listen on. + + """ + reactor.listenTCP(port, server.Site(self)) + + # Gets called by twisted + def render(self, request): + """ This get's called by twisted every time someone sends us a request. + """ + self._async_render(request) + return server.NOT_DONE_YET + + @defer.inlineCallbacks + def _async_render(self, request): + """ This get's called by twisted every time someone sends us a request. + This checks if anyone has registered a callback for that method and + path. + """ + try: + # Loop through all the registered callbacks to check if the method + # and path regex match + for path_entry in self.path_regexs.get(request.method, []): + m = path_entry.pattern.match(request.path) + if m: + # We found a match! Trigger callback and then return the + # returned response. We pass both the request and any + # matched groups from the regex to the callback. + code, response = yield path_entry.callback( + request, + *m.groups() + ) + + self._send_response(request, code, response) + return + + # Huh. No one wanted to handle that? Fiiiiiine. Send 400. + self._send_response( + request, + 400, + {"error": "Unrecognized request"} + ) + except CodeMessageException as e: + logger.exception(e) + self._send_response( + request, + e.code, + cs_exception(e) + ) + except Exception as e: + logger.exception(e) + self._send_response( + request, + 500, + {"error": "Internal server error"} + ) + + def _send_response(self, request, code, response_json_object): + + if not self._request_user_agent_is_curl(request): + json_bytes = encode_canonical_json(response_json_object) + else: + json_bytes = encode_pretty_printed_json(response_json_object) + + # TODO: Only enable CORS for the requests that need it. + respond_with_json_bytes(request, code, json_bytes, send_cors=True) + + @staticmethod + def _request_user_agent_is_curl(request): + user_agents = request.requestHeaders.getRawHeaders( + "User-Agent", default=[] + ) + for user_agent in user_agents: + if "curl" in user_agent: + return True + return False + + +def respond_with_json_bytes(request, code, json_bytes, send_cors=False): + """Sends encoded JSON in response to the given request. + + Args: + request (twisted.web.http.Request): The http request to respond to. + code (int): The HTTP response code. + json_bytes (bytes): The json bytes to use as the response body. + send_cors (bool): Whether to send Cross-Origin Resource Sharing headers + http://www.w3.org/TR/cors/ + Returns: + twisted.web.server.NOT_DONE_YET""" + + request.setResponseCode(code) + request.setHeader(b"Content-Type", b"application/json") + + if send_cors: + request.setHeader("Access-Control-Allow-Origin", "*") + request.setHeader("Access-Control-Allow-Methods", + "GET, POST, PUT, DELETE, OPTIONS") + request.setHeader("Access-Control-Allow-Headers", + "Origin, X-Requested-With, Content-Type, Accept") + + request.write(json_bytes) + request.finish() + return NOT_DONE_YET |