diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index ac97e19649..3241ded188 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -457,6 +457,10 @@ def run(hs):
stats["homeserver"] = hs.config.server_name
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
+ version = sys.version_info
+ stats["python_version"] = "{}.{}.{}".format(
+ version.major, version.minor, version.micro
+ )
stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0c68e8a472..8d6bd7976d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,6 +69,27 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+def shortstr(iterable, maxitems=5):
+ """If iterable has maxitems or fewer, return the stringification of a list
+ containing those items.
+
+ Otherwise, return the stringification of a list with the first maxitems items,
+ followed by "...".
+
+ Args:
+ iterable (Iterable): iterable to truncate
+ maxitems (int): number of items to return before truncating
+
+ Returns:
+ unicode
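+
+ For example (illustrative)::
+
+ shortstr(range(10), maxitems=3) == u"[0, 1, 2, ...]"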
+ """
+
+ items = list(itertools.islice(iterable, maxitems + 1))
+ if len(items) <= maxitems:
+ return str(items)
+ return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
@@ -114,7 +135,6 @@ class FederationHandler(BaseHandler):
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@defer.inlineCallbacks
- @log_function
def on_receive_pdu(
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
):
@@ -130,9 +150,17 @@ class FederationHandler(BaseHandler):
Returns (Deferred): completes with None
"""
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
+ logger.info(
+ "[%s %s] handling received PDU: %s",
+ room_id, event_id, pdu,
+ )
+
# We reprocess pdus when we have seen them only as outliers
existing = yield self.store.get_event(
- pdu.event_id,
+ event_id,
allow_none=True,
allow_rejected=True,
)
@@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
)
)
if already_seen:
- logger.debug("Already seen pdu %s", pdu.event_id)
+ logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
return
# do some initial sanity-checking of the event. In particular, make
@@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
try:
self._sanity_check_event(pdu)
except SynapseError as err:
+ logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
raise FederationError(
"ERROR",
err.code,
@@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
- if pdu.room_id in self.room_queues:
- logger.info("Ignoring PDU %s for room %s from %s for now; join "
- "in progress", pdu.event_id, pdu.room_id, origin)
- self.room_queues[pdu.room_id].append((pdu, origin))
+ if room_id in self.room_queues:
+ logger.info(
+ "[%s %s] Queuing PDU from %s for now: join in progress",
+ room_id, event_id, origin,
+ )
+ self.room_queues[room_id].append((pdu, origin))
return
# If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
# we should check if we *are* in fact in the room. If we are then we
# can magically rejoin the room.
is_in_room = yield self.auth.check_host_in_room(
- pdu.room_id,
+ room_id,
self.server_name
)
if not is_in_room:
@@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
)
if was_in_room:
logger.info(
- "Ignoring PDU %s for room %s from %s as we've left the room!",
- pdu.event_id, pdu.room_id, origin,
+ "[%s %s] Ignoring PDU from %s as we've left the room",
+ room_id, event_id, origin,
)
defer.returnValue(None)
@@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
)
logger.debug(
- "_handle_new_pdu min_depth for %s: %d",
- pdu.room_id, min_depth
+ "[%s %s] min_depth: %d",
+ room_id, event_id, min_depth,
)
prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
- if get_missing and prevs - seen:
+ missing_prevs = prevs - seen
+ if get_missing and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
- "Acquiring lock for room %r to fetch %d missing events: %r...",
- pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+ "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
- "Acquired lock for room %r to fetch %d missing events",
- pdu.room_id, len(prevs - seen),
+ "[%s %s] Acquired room lock to fetch %d missing prev_events",
+ room_id, event_id, len(missing_prevs),
)
yield self._get_missing_events_for_pdu(
@@ -241,19 +273,23 @@ class FederationHandler(BaseHandler):
if not prevs - seen:
logger.info(
- "Found all missing prev events for %s", pdu.event_id
+ "[%s %s] Found all missing prev_events",
+ room_id, event_id,
)
- elif prevs - seen:
+ elif missing_prevs:
logger.info(
- "Not fetching %d missing events for room %r,event %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id,
- list(prevs - seen)[:5],
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
if sent_to_us_directly and prevs - seen:
# If they have sent it to us directly, and the server
# isn't telling us about the auth events that it's
# made a message referencing, we explode
+ logger.warn(
+ "[%s %s] Failed to fetch %d prev events: rejecting",
+ room_id, event_id, len(prevs - seen),
+ )
raise FederationError(
"ERROR",
403,
@@ -270,15 +306,19 @@ class FederationHandler(BaseHandler):
auth_chains = set()
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ ours = yield self.store.get_state_groups(room_id, list(seen))
state_groups.append(ours)
# Ask the remote server for the states we don't
# know about
for p in prevs - seen:
+ logger.info(
+ "[%s %s] Requesting state at missing prev_event %s",
+ room_id, event_id, p,
+ )
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
- origin, pdu.room_id, p
+ origin, room_id, p,
)
)
auth_chains.update(got_auth_chain)
@@ -291,19 +331,24 @@ class FederationHandler(BaseHandler):
ev_ids, get_prev_content=False, check_redacted=False
)
- room_version = yield self.store.get_room_version(pdu.room_id)
+ room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
- room_version, state_groups, {pdu.event_id: pdu}, fetch
+ room_version, state_groups, {event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
auth_chain = list(auth_chains)
except Exception:
+ logger.warn(
+ "[%s %s] Error attempting to resolve state at missing "
+ "prev_events",
+ room_id, event_id, exc_info=True,
+ )
raise FederationError(
"ERROR",
403,
"We can't get valid state history.",
- affected=pdu.event_id,
+ affected=event_id,
)
yield self._process_received_pdu(
@@ -322,15 +367,16 @@ class FederationHandler(BaseHandler):
prevs (set(str)): List of event ids which we are missing
min_depth (int): Minimum depth of events to return.
"""
- # We recalculate seen, since it may have changed.
+
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
- latest = yield self.store.get_latest_event_ids_in_room(
- pdu.room_id
- )
+ latest = yield self.store.get_latest_event_ids_in_room(room_id)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
@@ -338,8 +384,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
- "Missing %d events for room %r pdu %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+ "[%s %s]: Requesting %d prev_events: %s",
+ room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
# XXX: we set timeout to 10s to help workaround
@@ -360,49 +406,87 @@ class FederationHandler(BaseHandler):
# apparently.
#
# see https://github.com/matrix-org/synapse/pull/1744
+ #
+ # ----
+ #
+ # Update richvdh 2018/09/18: There are a number of problems with timing this
+ # request out aggressively on the client side:
+ #
+ # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+ # if you send too many requests at once, so you end up with the server carefully
+ # working through the backlog of your requests, which you have already timed
+ # out.
+ #
+ # - for this request in particular, we now (as of
+ # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+ # server can't produce a plausible-looking set of prev_events - so we become
+ # much more likely to reject the event.
+ #
+ # - contrary to what it says above, we do *not* fall back to fetching fresh state
+ # for the room if get_missing_events times out. Rather, we give up processing
+ # the PDU whose prevs we are missing, which then makes it much more likely that
+ # we'll end up back here for the *next* PDU in the list, which exacerbates the
+ # problem.
+ #
+ # - the aggressive 10s timeout was introduced to deal with incoming federation
+ # requests taking 8 hours to process. It's not entirely clear why that was going
+ # on; certainly there were other issues causing traffic storms which are now
+ # resolved, and I think in any case we may be more sensible about our locking
+ # now. We're *certainly* more sensible about our logging.
+ #
+ # All that said: Let's try increasing the timeout to 60s and see what happens.
missing_events = yield self.federation_client.get_missing_events(
origin,
- pdu.room_id,
+ room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
- timeout=10000,
+ timeout=60000,
)
logger.info(
- "Got %d events: %r...",
- len(missing_events), [e.event_id for e in missing_events[:5]]
+ "[%s %s]: Got %d prev_events: %s",
+ room_id, event_id, len(missing_events), shortstr(missing_events),
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
- for e in missing_events:
- logger.info("Handling found event %s", e.event_id)
+ for ev in missing_events:
+ logger.info(
+ "[%s %s] Handling received prev_event %s",
+ room_id, event_id, ev.event_id,
+ )
try:
yield self.on_receive_pdu(
origin,
- e,
+ ev,
get_missing=False
)
except FederationError as e:
if e.code == 403:
- logger.warn("Event %s failed history check.")
+ logger.warn(
+ "[%s %s] Received prev_event %s failed history check.",
+ room_id, event_id, ev.event_id,
+ )
else:
raise
- @log_function
@defer.inlineCallbacks
- def _process_received_pdu(self, origin, pdu, state, auth_chain):
+ def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
"""
- event = pdu
+ room_id = event.room_id
+ event_id = event.event_id
- logger.debug("Processing event: %s", event)
+ logger.debug(
+ "[%s %s] Processing event: %s",
+ room_id, event_id, event,
+ )
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -411,15 +495,16 @@ class FederationHandler(BaseHandler):
# event.
if state and auth_chain and not event.internal_metadata.is_outlier():
is_in_room = yield self.auth.check_host_in_room(
- event.room_id,
+ room_id,
self.server_name
)
else:
is_in_room = True
+
if not is_in_room:
logger.info(
- "Got event for room we're not in: %r %r",
- event.room_id, event.event_id
+ "[%s %s] Got event for room we're not in",
+ room_id, event_id,
)
try:
@@ -431,7 +516,7 @@ class FederationHandler(BaseHandler):
"ERROR",
e.code,
e.msg,
- affected=event.event_id,
+ affected=event_id,
)
else:
@@ -480,12 +565,12 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- room = yield self.store.get_room(event.room_id)
+ room = yield self.store.get_room(room_id)
if not room:
try:
yield self.store.store_room(
- room_id=event.room_id,
+ room_id=room_id,
room_creator_user_id="",
is_public=False,
)
@@ -513,7 +598,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield self.user_joined_room(user, event.room_id)
+ yield self.user_joined_room(user, room_id)
@log_function
@defer.inlineCallbacks
@@ -1430,12 +1515,10 @@ class FederationHandler(BaseHandler):
else:
defer.returnValue(None)
- @log_function
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
@defer.inlineCallbacks
- @log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False):
context = yield self._prep_event(
@@ -1635,8 +1718,8 @@ class FederationHandler(BaseHandler):
)
except AuthError as e:
logger.warn(
- "Rejecting %s because %s",
- event.event_id, e.msg
+ "[%s %s] Rejecting: %s",
+ event.room_id, event.event_id, e.msg
)
context.rejected = RejectedReason.AUTH_ERROR
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f643619047..07fd3e82fc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -583,6 +583,11 @@ class RoomMemberHandler(object):
room_id = mapping["room_id"]
servers = mapping["servers"]
+ # put the server which owns the alias at the front of the server list.
+ if room_alias.domain in servers:
+ servers.remove(room_alias.domain)
+ servers.insert(0, room_alias.domain)
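+ # Illustrative: for #room:example.com with servers ["other.org", "example.com"],
+ # this reorders the list to ["example.com", "other.org"].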
+
defer.returnValue((RoomID.from_string(room_id), servers))
@defer.inlineCallbacks
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ec339a92ad..3d05f83b8c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@@ -99,7 +99,7 @@ class SimpleHttpClient(object):
request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers
)
- add_timeout_to_deferred(
+ request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index b0c9369519..91025037a3 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
Args:
reactor: Twisted reactor.
- destination (bytes): The name of the server to connect to.
+ destination (unicode): The name of the server to connect to.
tls_client_options_factory
(synapse.crypto.context_factory.ClientTLSOptionsFactory):
Factory which generates TLS options for client connections.
@@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
transport_endpoint = HostnameEndpoint
default_port = 8008
else:
+ # the SNI string should be the same as the Host header, minus the port.
+ # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
+ # the Host header and SNI should therefore be the server_name of the remote
+ # server.
+ tls_options = tls_client_options_factory.get_options(domain)
+
def transport_endpoint(reactor, host, port, timeout):
return wrapClientTLS(
- tls_client_options_factory.get_options(host),
- HostnameEndpoint(reactor, host, port, timeout=timeout))
+ tls_options,
+ HostnameEndpoint(reactor, host, port, timeout=timeout),
+ )
default_port = 8448
if port is None:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 083484a687..14b12cd1c4 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,10 +17,12 @@ import cgi
import logging
import random
import sys
+from io import BytesIO
from six import PY3, string_types
from six.moves import urllib
+import attr
import treq
from canonicaljson import encode_canonical_json
from prometheus_client import Counter
@@ -28,8 +30,9 @@ from signedjson.sign import sign_json
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
+from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool
+from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
from twisted.web.http_headers import Headers
import synapse.metrics
@@ -41,13 +44,11 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util import logcontext
-from synapse.util.async_helpers import timeout_no_seriously
+from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-outbound_logger = logging.getLogger("synapse.http.outbound")
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"])
@@ -78,6 +79,99 @@ class MatrixFederationEndpointFactory(object):
)
+_next_id = 1
+
+
+@attr.s
+class MatrixFederationRequest(object):
+ method = attr.ib()
+ """HTTP method
+ :type: str
+ """
+
+ path = attr.ib()
+ """HTTP path
+ :type: str
+ """
+
+ destination = attr.ib()
+ """The remote server to send the HTTP request to.
+ :type: str"""
+
+ json = attr.ib(default=None)
+ """JSON to send in the body.
+ :type: dict|None
+ """
+
+ json_callback = attr.ib(default=None)
+ """A callback to generate the JSON.
+ :type: func|None
+ """
+
+ query = attr.ib(default=None)
+ """Query arguments.
+ :type: dict|None
+ """
+
+ txn_id = attr.ib(default=None)
+ """Unique ID for this request (for logging)
+ :type: str|None
+ """
+
+ def __attrs_post_init__(self):
+ global _next_id
+ self.txn_id = "%s-O-%s" % (self.method, _next_id)
+ _next_id = (_next_id + 1) % (MAXINT - 1)
+
+ def get_json(self):
+ if self.json_callback:
+ return self.json_callback()
+ return self.json
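+
+ # Illustrative construction (example values only):
+ # MatrixFederationRequest(
+ # method="GET", destination="remote.example.com",
+ # path="/_matrix/federation/v1/version",
+ # )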
+
+
+@defer.inlineCallbacks
+def _handle_json_response(reactor, timeout_sec, request, response):
+ """
+ Reads the JSON body of a response, with a timeout
+
+ Args:
+ reactor (IReactor): twisted reactor, for the timeout
+ timeout_sec (float): number of seconds to wait for response to complete
+ request (MatrixFederationRequest): the request that triggered the response
+ response (IResponse): response to the request
+
+ Returns:
+ dict: parsed JSON response
+ """
+ try:
+ check_content_type_is_json(response.headers)
+
+ d = treq.json_content(response)
+ d = timeout_deferred(
+ d,
+ timeout=timeout_sec,
+ reactor=reactor,
+ )
+
+ body = yield make_deferred_yieldable(d)
+ except Exception as e:
+ logger.warn(
+ "{%s} [%s] Error reading response: %s",
+ request.txn_id,
+ request.destination,
+ e,
+ )
+ raise
+ logger.info(
+ "{%s} [%s] Completed: %d %s",
+ request.txn_id,
+ request.destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ )
+ defer.returnValue(body)
+
+
class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation
protocol. Send client certificates and signs requests.
@@ -102,34 +196,35 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii')
- self._next_id = 1
self.default_timeout = 60
- def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
- return urllib.parse.urlunparse(
- (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
- )
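+ # Run Cooperator iterations via short callLater calls; this cooperator
+ # drives FileBodyProducer when streaming request bodies (see _send_request).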
+ def schedule(x):
+ reactor.callLater(_EPSILON, x)
+
+ self._cooperator = Cooperator(scheduler=schedule)
@defer.inlineCallbacks
- def _request(self, destination, method, path,
- json=None, json_callback=None,
- param_bytes=b"",
- query=None, retry_on_dns_fail=True,
- timeout=None, long_retries=False,
- ignore_backoff=False,
- backoff_on_404=False):
+ def _send_request(
+ self,
+ request,
+ retry_on_dns_fail=True,
+ timeout=None,
+ long_retries=False,
+ ignore_backoff=False,
+ backoff_on_404=False
+ ):
"""
- Creates and sends a request to the given server.
+ Sends a request to the given server.
Args:
- destination (str): The remote server to send the HTTP request to.
- method (str): HTTP method
- path (str): The HTTP path
- json (dict or None): JSON to send in the body.
- json_callback (func or None): A callback to generate the JSON.
- query (dict or None): Query arguments.
+ request (MatrixFederationRequest): details of request to be sent
+
+ timeout (int|None): number of milliseconds to wait for the response headers
+ (including connecting to the server). 60s by default.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
+
backoff_on_404 (bool): Back off if we get a 404
Returns:
@@ -154,38 +249,32 @@ class MatrixFederationHttpClient(object):
if (
self.hs.config.federation_domain_whitelist is not None and
- destination not in self.hs.config.federation_domain_whitelist
+ request.destination not in self.hs.config.federation_domain_whitelist
):
- raise FederationDeniedError(destination)
+ raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter(
- destination,
+ request.destination,
self.clock,
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
)
- headers_dict = {}
- path_bytes = path.encode("ascii")
- if query:
- query_bytes = encode_query_args(query)
+ method = request.method
+ destination = request.destination
+ path_bytes = request.path.encode("ascii")
+ if request.query:
+ query_bytes = encode_query_args(request.query)
else:
query_bytes = b""
headers_dict = {
"User-Agent": [self.version_string],
- "Host": [destination],
+ "Host": [request.destination],
}
with limiter:
- url = self._create_url(
- destination.encode("ascii"), path_bytes, param_bytes, query_bytes
- ).decode('ascii')
-
- txn_id = "%s-O-%s" % (method, self._next_id)
- self._next_id = (self._next_id + 1) % (MAXINT - 1)
-
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
@@ -193,16 +282,19 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- http_url = urllib.parse.urlunparse(
- (b"", b"", path_bytes, param_bytes, query_bytes, b"")
- ).decode('ascii')
+ url = urllib.parse.urlunparse((
+ b"matrix", destination.encode("ascii"),
+ path_bytes, None, query_bytes, b"",
+ )).decode('ascii')
+
+ http_url = urllib.parse.urlunparse((
+ b"", b"",
+ path_bytes, None, query_bytes, b"",
+ )).decode('ascii')
- log_result = None
while True:
try:
- if json_callback:
- json = json_callback()
-
+ json = request.get_json()
if json:
data = encode_canonical_json(json)
headers_dict["Content-Type"] = ["application/json"]
@@ -213,29 +305,32 @@ class MatrixFederationHttpClient(object):
data = None
self.sign_request(destination, method, http_url, headers_dict)
- outbound_logger.info(
+ logger.info(
"{%s} [%s] Sending request: %s %s",
- txn_id, destination, method, url
+ request.txn_id, destination, method, url
)
+ if data:
+ producer = FileBodyProducer(
+ BytesIO(data),
+ cooperator=self._cooperator
+ )
+ else:
+ producer = None
+
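+ # `producer` streams the body from the in-memory buffer through
+ # self._cooperator, instead of handing treq the raw bytes.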
request_deferred = treq.request(
method,
url,
headers=Headers(headers_dict),
- data=data,
+ data=producer,
agent=self.agent,
reactor=self.hs.get_reactor(),
unbuffered=True
)
- request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
- # Sometimes the timeout above doesn't work, so lets hack yet
- # another layer of timeouts in in the vain hope that at some
- # point the world made sense and this really really really
- # should work.
- request_deferred = timeout_no_seriously(
+ request_deferred = timeout_deferred(
request_deferred,
- timeout=_sec_timeout * 2,
+ timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
@@ -244,33 +339,19 @@ class MatrixFederationHttpClient(object):
request_deferred,
)
- log_result = "%d %s" % (
- response.code,
- response.phrase.decode('ascii', errors='replace'),
- )
break
except Exception as e:
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
- logger.warn(
- "DNS Lookup failed to %s with %s",
- destination,
- e
- )
- log_result = "DNS Lookup failed to %s with %s" % (
- destination, e
- )
- raise
-
logger.warn(
- "{%s} Sending request failed to %s: %s %s: %s",
- txn_id,
+ "{%s} [%s] Request failed: %s %s: %s",
+ request.txn_id,
destination,
method,
url,
_flatten_response_never_received(e),
)
- log_result = _flatten_response_never_received(e)
+ if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+ raise
if retries_left and not timeout:
if long_retries:
@@ -283,33 +364,37 @@ class MatrixFederationHttpClient(object):
delay *= random.uniform(0.8, 1.4)
logger.debug(
- "{%s} Waiting %s before sending to %s...",
- txn_id,
+ "{%s} [%s] Waiting %ss before re-sending...",
+ request.txn_id,
+ destination,
delay,
- destination
)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
- finally:
- outbound_logger.info(
- "{%s} [%s] Result: %s",
- txn_id,
- destination,
- log_result,
- )
+
+ logger.info(
+ "{%s} [%s] Got response headers: %d %s",
+ request.txn_id,
+ destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ )
if 200 <= response.code < 300:
pass
else:
# :'(
# Update transactions table?
- with logcontext.PreserveLoggingContext():
- d = treq.content(response)
- d.addTimeout(_sec_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
+ d = treq.content(response)
+ d = timeout_deferred(
+ d,
+ timeout=_sec_timeout,
+ reactor=self.hs.get_reactor(),
+ )
+ body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
)
@@ -403,29 +488,26 @@ class MatrixFederationHttpClient(object):
is not on our federation whitelist
"""
- if not json_data_callback:
- json_data_callback = lambda: data
-
- response = yield self._request(
- destination,
- "PUT",
- path,
- json_callback=json_data_callback,
+ request = MatrixFederationRequest(
+ method="PUT",
+ destination=destination,
+ path=path,
query=args,
+ json_callback=json_data_callback,
+ json=data,
+ )
+
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- d = treq.json_content(response)
- d.addTimeout(self.default_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
defer.returnValue(body)
@defer.inlineCallbacks
@@ -459,31 +541,30 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
- response = yield self._request(
- destination,
- "POST",
- path,
+
+ request = MatrixFederationRequest(
+ method="POST",
+ destination=destination,
+ path=path,
query=args,
json=data,
+ )
+
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- d = treq.json_content(response)
- if timeout:
- _sec_timeout = timeout / 1000
- else:
- _sec_timeout = self.default_timeout
-
- d.addTimeout(_sec_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
+ if timeout:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), _sec_timeout, request, response,
+ )
defer.returnValue(body)
@defer.inlineCallbacks
@@ -519,25 +600,23 @@ class MatrixFederationHttpClient(object):
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
- response = yield self._request(
- destination,
- "GET",
- path,
+ request = MatrixFederationRequest(
+ method="GET",
+ destination=destination,
+ path=path,
query=args,
+ )
+
+ response = yield self._send_request(
+ request,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- d = treq.json_content(response)
- d.addTimeout(self.default_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
-
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
defer.returnValue(body)
@defer.inlineCallbacks
@@ -568,25 +647,23 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
- response = yield self._request(
- destination,
- "DELETE",
- path,
+ request = MatrixFederationRequest(
+ method="DELETE",
+ destination=destination,
+ path=path,
query=args,
+ )
+
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- d = treq.json_content(response)
- d.addTimeout(self.default_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
-
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
defer.returnValue(body)
@defer.inlineCallbacks
@@ -614,11 +691,15 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
- response = yield self._request(
- destination,
- "GET",
- path,
+ request = MatrixFederationRequest(
+ method="GET",
+ destination=destination,
+ path=path,
query=args,
+ )
+
+ response = yield self._send_request(
+ request,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
)
@@ -626,14 +707,25 @@ class MatrixFederationHttpClient(object):
headers = dict(response.headers.getAllRawHeaders())
try:
- with logcontext.PreserveLoggingContext():
- d = _readBodyToFile(response, output_stream, max_size)
- d.addTimeout(self.default_timeout, self.hs.get_reactor())
- length = yield make_deferred_yieldable(d)
- except Exception:
- logger.exception("Failed to download body")
+ d = _readBodyToFile(response, output_stream, max_size)
+ d.addTimeout(self.default_timeout, self.hs.get_reactor())
+ length = yield make_deferred_yieldable(d)
+ except Exception as e:
+ logger.warn(
+ "{%s} [%s] Error reading response: %s",
+ request.txn_id,
+ request.destination,
+ e,
+ )
raise
-
+ logger.info(
+ "{%s} [%s] Completed: %d %s [%d bytes]",
+ request.txn_id,
+ request.destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ length,
+ )
defer.returnValue((length, headers))
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 9579e8cd0d..50be2de3bb 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -75,9 +75,9 @@ class SynapseRequest(Request):
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
self.__class__.__name__,
id(self),
- self.method,
+ self.method.decode('ascii', errors='replace'),
self.get_redacted_uri(),
- self.clientproto,
+ self.clientproto.decode('ascii', errors='replace'),
self.site.site_tag,
)
@@ -308,7 +308,7 @@ class XForwardedForRequest(SynapseRequest):
C{b"-"}.
"""
return self.requestHeaders.getRawHeaders(
- b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+ b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
class SynapseRequestFactory(object):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 82f391481c..f1d92c1395 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -25,11 +25,7 @@ from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
-from synapse.util.async_helpers import (
- DeferredTimeoutError,
- ObservableDeferred,
- add_timeout_to_deferred,
-)
+from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -337,7 +333,7 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
- add_timeout_to_deferred(
+ listener.deferred = timeout_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
@@ -354,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
- except DeferredTimeoutError:
+ except defer.TimeoutError:
break
except defer.CancelledError:
break
@@ -559,15 +555,16 @@ class Notifier(object):
if end_time <= now:
break
- add_timeout_to_deferred(
- listener.deferred.addTimeout,
- (end_time - now) / 1000.,
- self.hs.get_reactor(),
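+ # NB: timeout_deferred returns a *new* deferred, so we must reassign
+ # listener.deferred rather than expecting in-place modification.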
+ listener.deferred = timeout_deferred(
+ listener.deferred,
+ timeout=(end_time - now) / 1000.,
+ reactor=self.hs.get_reactor(),
)
+
try:
with PreserveLoggingContext():
yield listener.deferred
- except DeferredTimeoutError:
+ except defer.TimeoutError:
break
except defer.CancelledError:
break
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 8fc678fa67..9ad17b7c25 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -119,21 +119,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
- self._simple_upsert_txn(
- txn,
- table="user_ips",
- keyvalues={
- "user_id": user_id,
- "access_token": access_token,
- "ip": ip,
- "user_agent": user_agent,
- "device_id": device_id,
- },
- values={
- "last_seen": last_seen,
- },
- lock=False,
- )
+ try:
+ self._simple_upsert_txn(
+ txn,
+ table="user_ips",
+ keyvalues={
+ "user_id": user_id,
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ "device_id": device_id,
+ },
+ values={
+ "last_seen": last_seen,
+ },
+ lock=False,
+ )
+ except Exception as e:
+ # Failed to upsert; log and continue.
+ logger.error("Failed to insert client IP %r: %r", entry, e)
@defer.inlineCallbacks
def get_last_client_ip_by_device(self, user_id, device_id):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 0c42bd3322..baf0379a68 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
@@ -156,7 +155,6 @@ class TransactionStore(SQLBaseStore):
"""
pass
- @cached(max_entries=10000)
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
@@ -198,8 +196,6 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms
"""
- # XXX: we could chose to not bother persisting this if our cache thinks
- # this is a NOOP
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
@@ -212,10 +208,6 @@ class TransactionStore(SQLBaseStore):
retry_last_ts, retry_interval):
self.database_engine.lock_table(txn, "destinations")
- self._invalidate_cache_and_stream(
- txn, self.get_destination_retry_timings, (destination,)
- )
-
# We need to be careful here as the data may have changed from under us
# due to a worker setting the timings.
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 40c2946129..ec7b2c9672 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -374,29 +374,25 @@ class ReadWriteLock(object):
defer.returnValue(_ctx_manager())
-class DeferredTimeoutError(Exception):
- """
- This error is raised by default when a L{Deferred} times out.
- """
-
+def _cancelled_to_timed_out_error(value, timeout):
+ if isinstance(value, failure.Failure):
+ value.trap(CancelledError)
+ raise defer.TimeoutError(timeout, "Deferred")
+ return value
-def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
- """
- Add a timeout to a deferred by scheduling it to be cancelled after
- timeout seconds.
- This is essentially a backport of deferred.addTimeout, which was introduced
- in twisted 16.5.
+def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
+ """The in built twisted `Deferred.addTimeout` fails to time out deferreds
+ that have a canceller that throws exceptions. This method creates a new
+ deferred that wraps and times out the given deferred, correctly handling
+ the case where the given deferred's canceller throws.
- If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
- unless a cancelable function was passed to its initialization or unless
- a different on_timeout_cancel callable is provided.
+ NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
Args:
- deferred (defer.Deferred): deferred to be timed out
- timeout (Number): seconds to time out after
- reactor (twisted.internet.reactor): the Twisted reactor to use
-
+ deferred (Deferred)
+ timeout (float): Timeout in seconds
+ reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
@@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
the timeout.
The default callable (if none is provided) will translate a
- CancelledError Failure into a DeferredTimeoutError.
- """
- timed_out = [False]
-
- def time_it_out():
- timed_out[0] = True
- deferred.cancel()
-
- delayed_call = reactor.callLater(timeout, time_it_out)
-
- def convert_cancelled(value):
- if timed_out[0]:
- to_call = on_timeout_cancel or _cancelled_to_timed_out_error
- return to_call(value, timeout)
- return value
-
- deferred.addBoth(convert_cancelled)
+ CancelledError Failure into a defer.TimeoutError.
- def cancel_timeout(result):
- # stop the pending call to cancel the deferred if it's been fired
- if delayed_call.active():
- delayed_call.cancel()
- return result
-
- deferred.addBoth(cancel_timeout)
-
-
-def _cancelled_to_timed_out_error(value, timeout):
- if isinstance(value, failure.Failure):
- value.trap(CancelledError)
- raise DeferredTimeoutError(timeout, "Deferred")
- return value
-
-
-def timeout_no_seriously(deferred, timeout, reactor):
- """The in build twisted deferred addTimeout (and the method above)
- completely fail to time things out under some unknown circumstances.
-
- Lets try a different way of timing things out and maybe that will make
- things work?!
-
- TODO: Kill this with fire.
+ Returns:
+ Deferred
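+
+ Example (illustrative)::
+
+ d = timeout_deferred(d, timeout=30, reactor=reactor)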
"""
new_d = defer.Deferred()
@@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
def time_it_out():
timed_out[0] = True
- if not new_d.called:
- new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
+ try:
+ deferred.cancel()
+ except: # noqa: E722, if we throw any exception it'll break timeouts
+ logger.exception("Canceller failed during timeout")
- deferred.cancel()
+ if not new_d.called:
+ new_d.errback(defer.TimeoutError(timeout, "Deferred"))
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
- return _cancelled_to_timed_out_error(value, timeout)
+ to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+ return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8a3a06fd74..26cce7d197 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
else:
self.retry_interval = self.min_retry_interval
- logger.debug(
+ logger.info(
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
self.destination, exc_type, exc_val, self.retry_interval
)
|