diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index c488b10d3c..bfebb0f644 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 27e5190224..fdd90b1c3c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 4ae45f136d..4775f6707d 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@ from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
from twisted.internet import defer
from twisted.internet.error import ConnectError
from twisted.names import client, dns
-from twisted.names.error import DNSNameError
+from twisted.names.error import DNSNameError, DomainError
import collections
import logging
@@ -27,6 +27,14 @@ import random
logger = logging.getLogger(__name__)
+SERVER_CACHE = {}
+
+
+_Server = collections.namedtuple(
+ "_Server", "priority weight host port"
+)
+
+
def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
timeout=None):
"""Construct an endpoint for the given matrix destination.
@@ -73,10 +81,6 @@ class SRVClientEndpoint(object):
Implements twisted.internet.interfaces.IStreamClientEndpoint.
"""
- _Server = collections.namedtuple(
- "_Server", "priority weight host port"
- )
-
def __init__(self, reactor, service, domain, protocol="tcp",
default_port=None, endpoint=TCP4ClientEndpoint,
endpoint_kw_args={}):
@@ -84,7 +88,7 @@ class SRVClientEndpoint(object):
self.service_name = "_%s._%s.%s" % (service, protocol, domain)
if default_port is not None:
- self.default_server = self._Server(
+ self.default_server = _Server(
host=domain,
port=default_port,
priority=0,
@@ -101,32 +105,8 @@ class SRVClientEndpoint(object):
@defer.inlineCallbacks
def fetch_servers(self):
- try:
- answers, auth, add = yield client.lookupService(self.service_name)
- except DNSNameError:
- answers = []
-
- if (len(answers) == 1
- and answers[0].type == dns.SRV
- and answers[0].payload
- and answers[0].payload.target == dns.Name('.')):
- raise ConnectError("Service %s unavailable", self.service_name)
-
- self.servers = []
self.used_servers = []
-
- for answer in answers:
- if answer.type != dns.SRV or not answer.payload:
- continue
- payload = answer.payload
- self.servers.append(self._Server(
- host=str(payload.target),
- port=int(payload.port),
- priority=int(payload.priority),
- weight=int(payload.weight)
- ))
-
- self.servers.sort()
+ self.servers = yield resolve_service(self.service_name)
def pick_server(self):
if not self.servers:
@@ -170,3 +150,64 @@ class SRVClientEndpoint(object):
)
connection = yield endpoint.connect(protocolFactory)
defer.returnValue(connection)
+
+
+@defer.inlineCallbacks
+def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE):
+ servers = []
+
+ try:
+ try:
+ answers, _, _ = yield dns_client.lookupService(service_name)
+ except DNSNameError:
+ defer.returnValue([])
+
+ if (len(answers) == 1
+ and answers[0].type == dns.SRV
+ and answers[0].payload
+ and answers[0].payload.target == dns.Name('.')):
+ raise ConnectError("Service %s unavailable", service_name)
+
+ for answer in answers:
+ if answer.type != dns.SRV or not answer.payload:
+ continue
+
+ payload = answer.payload
+
+ host = str(payload.target)
+
+ try:
+ answers, _, _ = yield dns_client.lookupAddress(host)
+ except DNSNameError:
+ continue
+
+ ips = [
+ answer.payload.dottedQuad()
+ for answer in answers
+ if answer.type == dns.A and answer.payload
+ ]
+
+ for ip in ips:
+ servers.append(_Server(
+ host=ip,
+ port=int(payload.port),
+ priority=int(payload.priority),
+ weight=int(payload.weight)
+ ))
+
+ servers.sort()
+ cache[service_name] = list(servers)
+ except DomainError as e:
+ # We failed to resolve the name (other than a NameError)
+ # Try something in the cache, else rereaise
+ cache_entry = cache.get(service_name, None)
+ if cache_entry:
+ logger.warn(
+ "Failed to resolve %r, falling back to cache. %r",
+ service_name, e
+ )
+ servers = list(cache_entry)
+ else:
+ raise e
+
+ defer.returnValue(servers)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b7b7c2cce8..c3589534f8 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -152,7 +152,7 @@ class MatrixFederationHttpClient(object):
return self.clock.time_bound_deferred(
request_deferred,
- time_out=timeout/1000. if timeout else 60,
+ time_out=timeout / 1000. if timeout else 60,
)
response = yield preserve_context_over_fn(
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 682b6b379b..a90e2e1125 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ metrics = synapse.metrics.get_metrics_for(__name__)
incoming_requests_counter = metrics.register_counter(
"requests",
- labels=["method", "servlet"],
+ labels=["method", "servlet", "tag"],
)
outgoing_responses_counter = metrics.register_counter(
"responses",
@@ -50,23 +50,23 @@ outgoing_responses_counter = metrics.register_counter(
response_timer = metrics.register_distribution(
"response_time",
- labels=["method", "servlet"]
+ labels=["method", "servlet", "tag"]
)
response_ru_utime = metrics.register_distribution(
- "response_ru_utime", labels=["method", "servlet"]
+ "response_ru_utime", labels=["method", "servlet", "tag"]
)
response_ru_stime = metrics.register_distribution(
- "response_ru_stime", labels=["method", "servlet"]
+ "response_ru_stime", labels=["method", "servlet", "tag"]
)
response_db_txn_count = metrics.register_distribution(
- "response_db_txn_count", labels=["method", "servlet"]
+ "response_db_txn_count", labels=["method", "servlet", "tag"]
)
response_db_txn_duration = metrics.register_distribution(
- "response_db_txn_duration", labels=["method", "servlet"]
+ "response_db_txn_duration", labels=["method", "servlet", "tag"]
)
@@ -99,9 +99,8 @@ def request_handler(request_handler):
request_context.request = request_id
with request.processing():
try:
- d = request_handler(self, request)
- with PreserveLoggingContext():
- yield d
+ with PreserveLoggingContext(request_context):
+ yield request_handler(self, request)
except CodeMessageException as e:
code = e.code
if isinstance(e, SynapseError):
@@ -208,6 +207,9 @@ class JsonResource(HttpServer, resource.Resource):
if request.method == "OPTIONS":
self._send_response(request, 200, {})
return
+
+ start_context = LoggingContext.current_context()
+
# Loop through all the registered callbacks to check if the method
# and path regex match
for path_entry in self.path_regexs.get(request.method, []):
@@ -226,7 +228,6 @@ class JsonResource(HttpServer, resource.Resource):
servlet_classname = servlet_instance.__class__.__name__
else:
servlet_classname = "%r" % callback
- incoming_requests_counter.inc(request.method, servlet_classname)
args = [
urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups()
@@ -237,21 +238,40 @@ class JsonResource(HttpServer, resource.Resource):
code, response = callback_return
self._send_response(request, code, response)
- response_timer.inc_by(
- self.clock.time_msec() - start, request.method, servlet_classname
- )
-
try:
context = LoggingContext.current_context()
+
+ tag = ""
+ if context:
+ tag = context.tag
+
+ if context != start_context:
+ logger.warn(
+ "Context have unexpectedly changed %r, %r",
+ context, self.start_context
+ )
+ return
+
+ incoming_requests_counter.inc(request.method, servlet_classname, tag)
+
+ response_timer.inc_by(
+ self.clock.time_msec() - start, request.method,
+ servlet_classname, tag
+ )
+
ru_utime, ru_stime = context.get_resource_usage()
- response_ru_utime.inc_by(ru_utime, request.method, servlet_classname)
- response_ru_stime.inc_by(ru_stime, request.method, servlet_classname)
+ response_ru_utime.inc_by(
+ ru_utime, request.method, servlet_classname, tag
+ )
+ response_ru_stime.inc_by(
+ ru_stime, request.method, servlet_classname, tag
+ )
response_db_txn_count.inc_by(
- context.db_txn_count, request.method, servlet_classname
+ context.db_txn_count, request.method, servlet_classname, tag
)
response_db_txn_duration.inc_by(
- context.db_txn_duration, request.method, servlet_classname
+ context.db_txn_duration, request.method, servlet_classname, tag
)
except:
pass
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 32b6d6cd72..7bd87940b4 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
|