diff options
Diffstat (limited to 'synapse')
-rwxr-xr-x | synapse/app/homeserver.py | 4 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 195 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 5 | ||||
-rw-r--r-- | synapse/http/client.py | 4 | ||||
-rw-r--r-- | synapse/http/endpoint.py | 13 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 406 | ||||
-rw-r--r-- | synapse/http/site.py | 6 | ||||
-rw-r--r-- | synapse/notifier.py | 21 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 34 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 8 | ||||
-rw-r--r-- | synapse/util/async_helpers.py | 88 | ||||
-rw-r--r-- | synapse/util/retryutils.py | 2 |
12 files changed, 466 insertions, 320 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index ac97e19649..3241ded188 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -457,6 +457,10 @@ def run(hs): stats["homeserver"] = hs.config.server_name stats["timestamp"] = now stats["uptime_seconds"] = uptime + version = sys.version_info + stats["python_version"] = "{}.{}.{}".format( + version.major, version.minor, version.micro + ) stats["total_users"] = yield hs.get_datastore().count_all_users() total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0c68e8a472..8d6bd7976d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -69,6 +69,27 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) +def shortstr(iterable, maxitems=5): + """If iterable has maxitems or fewer, return the stringification of a list + containing those items. + + Otherwise, return the stringification of a a list with the first maxitems items, + followed by "...". + + Args: + iterable (Iterable): iterable to truncate + maxitems (int): number of items to return before truncating + + Returns: + unicode + """ + + items = list(itertools.islice(iterable, maxitems + 1)) + if len(items) <= maxitems: + return str(items) + return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]" + + class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -114,7 +135,6 @@ class FederationHandler(BaseHandler): self._room_pdu_linearizer = Linearizer("fed_room_pdu") @defer.inlineCallbacks - @log_function def on_receive_pdu( self, origin, pdu, get_missing=True, sent_to_us_directly=False, ): @@ -130,9 +150,17 @@ class FederationHandler(BaseHandler): Returns (Deferred): completes with None """ + room_id = pdu.room_id + event_id = pdu.event_id + + logger.info( + "[%s %s] handling received PDU: %s", + room_id, event_id, pdu, + ) + # We reprocess pdus when we have seen them only as outliers existing = yield self.store.get_event( - pdu.event_id, + event_id, allow_none=True, allow_rejected=True, ) @@ -147,7 +175,7 @@ class FederationHandler(BaseHandler): ) ) if already_seen: - logger.debug("Already seen pdu %s", pdu.event_id) + logger.debug("[%s %s]: Already seen pdu", room_id, event_id) return # do some initial sanity-checking of the event. In particular, make @@ -156,6 +184,7 @@ class FederationHandler(BaseHandler): try: self._sanity_check_event(pdu) except SynapseError as err: + logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id) raise FederationError( "ERROR", err.code, @@ -165,10 +194,12 @@ class FederationHandler(BaseHandler): # If we are currently in the process of joining this room, then we # queue up events for later processing. - if pdu.room_id in self.room_queues: - logger.info("Ignoring PDU %s for room %s from %s for now; join " - "in progress", pdu.event_id, pdu.room_id, origin) - self.room_queues[pdu.room_id].append((pdu, origin)) + if room_id in self.room_queues: + logger.info( + "[%s %s] Queuing PDU from %s for now: join in progress", + room_id, event_id, origin, + ) + self.room_queues[room_id].append((pdu, origin)) return # If we're no longer in the room just ditch the event entirely. This @@ -179,7 +210,7 @@ class FederationHandler(BaseHandler): # we should check if we *are* in fact in the room. If we are then we # can magically rejoin the room. is_in_room = yield self.auth.check_host_in_room( - pdu.room_id, + room_id, self.server_name ) if not is_in_room: @@ -188,8 +219,8 @@ class FederationHandler(BaseHandler): ) if was_in_room: logger.info( - "Ignoring PDU %s for room %s from %s as we've left the room!", - pdu.event_id, pdu.room_id, origin, + "[%s %s] Ignoring PDU from %s as we've left the room", + room_id, event_id, origin, ) defer.returnValue(None) @@ -204,8 +235,8 @@ class FederationHandler(BaseHandler): ) logger.debug( - "_handle_new_pdu min_depth for %s: %d", - pdu.room_id, min_depth + "[%s %s] min_depth: %d", + room_id, event_id, min_depth, ) prevs = {e_id for e_id, _ in pdu.prev_events} @@ -218,17 +249,18 @@ class FederationHandler(BaseHandler): # send to the clients. pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: - if get_missing and prevs - seen: + missing_prevs = prevs - seen + if get_missing and missing_prevs: # If we're missing stuff, ensure we only fetch stuff one # at a time. logger.info( - "Acquiring lock for room %r to fetch %d missing events: %r...", - pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], + "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s", + room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) with (yield self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( - "Acquired lock for room %r to fetch %d missing events", - pdu.room_id, len(prevs - seen), + "[%s %s] Acquired room lock to fetch %d missing prev_events", + room_id, event_id, len(missing_prevs), ) yield self._get_missing_events_for_pdu( @@ -241,19 +273,23 @@ class FederationHandler(BaseHandler): if not prevs - seen: logger.info( - "Found all missing prev events for %s", pdu.event_id + "[%s %s] Found all missing prev_events", + room_id, event_id, ) - elif prevs - seen: + elif missing_prevs: logger.info( - "Not fetching %d missing events for room %r,event %s: %r...", - len(prevs - seen), pdu.room_id, pdu.event_id, - list(prevs - seen)[:5], + "[%s %s] Not recursively fetching %d missing prev_events: %s", + room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) if sent_to_us_directly and prevs - seen: # If they have sent it to us directly, and the server # isn't telling us about the auth events that it's # made a message referencing, we explode + logger.warn( + "[%s %s] Failed to fetch %d prev events: rejecting", + room_id, event_id, len(prevs - seen), + ) raise FederationError( "ERROR", 403, @@ -270,15 +306,19 @@ class FederationHandler(BaseHandler): auth_chains = set() try: # Get the state of the events we know about - ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) + ours = yield self.store.get_state_groups(room_id, list(seen)) state_groups.append(ours) # Ask the remote server for the states we don't # know about for p in prevs - seen: + logger.info( + "[%s %s] Requesting state at missing prev_event %s", + room_id, event_id, p, + ) state, got_auth_chain = ( yield self.federation_client.get_state_for_room( - origin, pdu.room_id, p + origin, room_id, p, ) ) auth_chains.update(got_auth_chain) @@ -291,19 +331,24 @@ class FederationHandler(BaseHandler): ev_ids, get_prev_content=False, check_redacted=False ) - room_version = yield self.store.get_room_version(pdu.room_id) + room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_factory( - room_version, state_groups, {pdu.event_id: pdu}, fetch + room_version, state_groups, {event_id: pdu}, fetch ) state = (yield self.store.get_events(state_map.values())).values() auth_chain = list(auth_chains) except Exception: + logger.warn( + "[%s %s] Error attempting to resolve state at missing " + "prev_events", + room_id, event_id, exc_info=True, + ) raise FederationError( "ERROR", 403, "We can't get valid state history.", - affected=pdu.event_id, + affected=event_id, ) yield self._process_received_pdu( @@ -322,15 +367,16 @@ class FederationHandler(BaseHandler): prevs (set(str)): List of event ids which we are missing min_depth (int): Minimum depth of events to return. """ - # We recalculate seen, since it may have changed. + + room_id = pdu.room_id + event_id = pdu.event_id + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) + latest = yield self.store.get_latest_event_ids_in_room(room_id) # We add the prev events that we have seen to the latest # list to ensure the remote server doesn't give them to us @@ -338,8 +384,8 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "Missing %d events for room %r pdu %s: %r...", - len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5] + "[%s %s]: Requesting %d prev_events: %s", + room_id, event_id, len(prevs - seen), shortstr(prevs - seen) ) # XXX: we set timeout to 10s to help workaround @@ -360,49 +406,87 @@ class FederationHandler(BaseHandler): # apparently. # # see https://github.com/matrix-org/synapse/pull/1744 + # + # ---- + # + # Update richvdh 2018/09/18: There are a number of problems with timing this + # request out agressively on the client side: + # + # - it plays badly with the server-side rate-limiter, which starts tarpitting you + # if you send too many requests at once, so you end up with the server carefully + # working through the backlog of your requests, which you have already timed + # out. + # + # - for this request in particular, we now (as of + # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the + # server can't produce a plausible-looking set of prev_events - so we becone + # much more likely to reject the event. + # + # - contrary to what it says above, we do *not* fall back to fetching fresh state + # for the room if get_missing_events times out. Rather, we give up processing + # the PDU whose prevs we are missing, which then makes it much more likely that + # we'll end up back here for the *next* PDU in the list, which exacerbates the + # problem. + # + # - the agressive 10s timeout was introduced to deal with incoming federation + # requests taking 8 hours to process. It's not entirely clear why that was going + # on; certainly there were other issues causing traffic storms which are now + # resolved, and I think in any case we may be more sensible about our locking + # now. We're *certainly* more sensible about our logging. + # + # All that said: Let's try increasing the timout to 60s and see what happens. missing_events = yield self.federation_client.get_missing_events( origin, - pdu.room_id, + room_id, earliest_events_ids=list(latest), latest_events=[pdu], limit=10, min_depth=min_depth, - timeout=10000, + timeout=60000, ) logger.info( - "Got %d events: %r...", - len(missing_events), [e.event_id for e in missing_events[:5]] + "[%s %s]: Got %d prev_events: %s", + room_id, event_id, len(missing_events), shortstr(missing_events), ) # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) - for e in missing_events: - logger.info("Handling found event %s", e.event_id) + for ev in missing_events: + logger.info( + "[%s %s] Handling received prev_event %s", + room_id, event_id, ev.event_id, + ) try: yield self.on_receive_pdu( origin, - e, + ev, get_missing=False ) except FederationError as e: if e.code == 403: - logger.warn("Event %s failed history check.") + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) else: raise - @log_function @defer.inlineCallbacks - def _process_received_pdu(self, origin, pdu, state, auth_chain): + def _process_received_pdu(self, origin, event, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ - event = pdu + room_id = event.room_id + event_id = event.event_id - logger.debug("Processing event: %s", event) + logger.debug( + "[%s %s] Processing event: %s", + room_id, event_id, event, + ) # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work @@ -411,15 +495,16 @@ class FederationHandler(BaseHandler): # event. if state and auth_chain and not event.internal_metadata.is_outlier(): is_in_room = yield self.auth.check_host_in_room( - event.room_id, + room_id, self.server_name ) else: is_in_room = True + if not is_in_room: logger.info( - "Got event for room we're not in: %r %r", - event.room_id, event.event_id + "[%s %s] Got event for room we're not in", + room_id, event_id, ) try: @@ -431,7 +516,7 @@ class FederationHandler(BaseHandler): "ERROR", e.code, e.msg, - affected=event.event_id, + affected=event_id, ) else: @@ -480,12 +565,12 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - room = yield self.store.get_room(event.room_id) + room = yield self.store.get_room(room_id) if not room: try: yield self.store.store_room( - room_id=event.room_id, + room_id=room_id, room_creator_user_id="", is_public=False, ) @@ -513,7 +598,7 @@ class FederationHandler(BaseHandler): if newly_joined: user = UserID.from_string(event.state_key) - yield self.user_joined_room(user, event.room_id) + yield self.user_joined_room(user, room_id) @log_function @defer.inlineCallbacks @@ -1430,12 +1515,10 @@ class FederationHandler(BaseHandler): else: defer.returnValue(None) - @log_function def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) @defer.inlineCallbacks - @log_function def _handle_new_event(self, origin, event, state=None, auth_events=None, backfilled=False): context = yield self._prep_event( @@ -1635,8 +1718,8 @@ class FederationHandler(BaseHandler): ) except AuthError as e: logger.warn( - "Rejecting %s because %s", - event.event_id, e.msg + "[%s %s] Rejecting: %s", + event.room_id, event.event_id, e.msg ) context.rejected = RejectedReason.AUTH_ERROR diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index f643619047..07fd3e82fc 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -583,6 +583,11 @@ class RoomMemberHandler(object): room_id = mapping["room_id"] servers = mapping["servers"] + # put the server which owns the alias at the front of the server list. + if room_alias.domain in servers: + servers.remove(room_alias.domain) + servers.insert(0, room_alias.domain) + defer.returnValue((RoomID.from_string(room_id), servers)) @defer.inlineCallbacks diff --git a/synapse/http/client.py b/synapse/http/client.py index ec339a92ad..3d05f83b8c 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import cancelled_to_request_timed_out_error, redact_uri from synapse.http.endpoint import SpiderEndpoint -from synapse.util.async_helpers import add_timeout_to_deferred +from synapse.util.async_helpers import timeout_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable @@ -99,7 +99,7 @@ class SimpleHttpClient(object): request_deferred = treq.request( method, uri, agent=self.agent, data=data, headers=headers ) - add_timeout_to_deferred( + request_deferred = timeout_deferred( request_deferred, 60, self.hs.get_reactor(), cancelled_to_request_timed_out_error, ) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index b0c9369519..91025037a3 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory= Args: reactor: Twisted reactor. - destination (bytes): The name of the server to connect to. + destination (unicode): The name of the server to connect to. tls_client_options_factory (synapse.crypto.context_factory.ClientTLSOptionsFactory): Factory which generates TLS options for client connections. @@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory= transport_endpoint = HostnameEndpoint default_port = 8008 else: + # the SNI string should be the same as the Host header, minus the port. + # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777, + # the Host header and SNI should therefore be the server_name of the remote + # server. + tls_options = tls_client_options_factory.get_options(domain) + def transport_endpoint(reactor, host, port, timeout): return wrapClientTLS( - tls_client_options_factory.get_options(host), - HostnameEndpoint(reactor, host, port, timeout=timeout)) + tls_options, + HostnameEndpoint(reactor, host, port, timeout=timeout), + ) default_port = 8448 if port is None: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 083484a687..14b12cd1c4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -17,10 +17,12 @@ import cgi import logging import random import sys +from io import BytesIO from six import PY3, string_types from six.moves import urllib +import attr import treq from canonicaljson import encode_canonical_json from prometheus_client import Counter @@ -28,8 +30,9 @@ from signedjson.sign import sign_json from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError +from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, HTTPConnectionPool +from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool from twisted.web.http_headers import Headers import synapse.metrics @@ -41,13 +44,11 @@ from synapse.api.errors import ( SynapseError, ) from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util import logcontext -from synapse.util.async_helpers import timeout_no_seriously +from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure logger = logging.getLogger(__name__) -outbound_logger = logging.getLogger("synapse.http.outbound") outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", "", ["method"]) @@ -78,6 +79,99 @@ class MatrixFederationEndpointFactory(object): ) +_next_id = 1 + + +@attr.s +class MatrixFederationRequest(object): + method = attr.ib() + """HTTP method + :type: str + """ + + path = attr.ib() + """HTTP path + :type: str + """ + + destination = attr.ib() + """The remote server to send the HTTP request to. + :type: str""" + + json = attr.ib(default=None) + """JSON to send in the body. + :type: dict|None + """ + + json_callback = attr.ib(default=None) + """A callback to generate the JSON. + :type: func|None + """ + + query = attr.ib(default=None) + """Query arguments. + :type: dict|None + """ + + txn_id = attr.ib(default=None) + """Unique ID for this request (for logging) + :type: str|None + """ + + def __attrs_post_init__(self): + global _next_id + self.txn_id = "%s-O-%s" % (self.method, _next_id) + _next_id = (_next_id + 1) % (MAXINT - 1) + + def get_json(self): + if self.json_callback: + return self.json_callback() + return self.json + + +@defer.inlineCallbacks +def _handle_json_response(reactor, timeout_sec, request, response): + """ + Reads the JSON body of a response, with a timeout + + Args: + reactor (IReactor): twisted reactor, for the timeout + timeout_sec (float): number of seconds to wait for response to complete + request (MatrixFederationRequest): the request that triggered the response + response (IResponse): response to the request + + Returns: + dict: parsed JSON response + """ + try: + check_content_type_is_json(response.headers) + + d = treq.json_content(response) + d = timeout_deferred( + d, + timeout=timeout_sec, + reactor=reactor, + ) + + body = yield make_deferred_yieldable(d) + except Exception as e: + logger.warn( + "{%s} [%s] Error reading response: %s", + request.txn_id, + request.destination, + e, + ) + raise + logger.info( + "{%s} [%s] Completed: %d %s", + request.txn_id, + request.destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + ) + defer.returnValue(body) + + class MatrixFederationHttpClient(object): """HTTP client used to talk to other homeservers over the federation protocol. Send client certificates and signs requests. @@ -102,34 +196,35 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self._store = hs.get_datastore() self.version_string = hs.version_string.encode('ascii') - self._next_id = 1 self.default_timeout = 60 - def _create_url(self, destination, path_bytes, param_bytes, query_bytes): - return urllib.parse.urlunparse( - (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"") - ) + def schedule(x): + reactor.callLater(_EPSILON, x) + + self._cooperator = Cooperator(scheduler=schedule) @defer.inlineCallbacks - def _request(self, destination, method, path, - json=None, json_callback=None, - param_bytes=b"", - query=None, retry_on_dns_fail=True, - timeout=None, long_retries=False, - ignore_backoff=False, - backoff_on_404=False): + def _send_request( + self, + request, + retry_on_dns_fail=True, + timeout=None, + long_retries=False, + ignore_backoff=False, + backoff_on_404=False + ): """ - Creates and sends a request to the given server. + Sends a request to the given server. Args: - destination (str): The remote server to send the HTTP request to. - method (str): HTTP method - path (str): The HTTP path - json (dict or None): JSON to send in the body. - json_callback (func or None): A callback to generate the JSON. - query (dict or None): Query arguments. + request (MatrixFederationRequest): details of request to be sent + + timeout (int|None): number of milliseconds to wait for the response headers + (including connecting to the server). 60s by default. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. + backoff_on_404 (bool): Back off if we get a 404 Returns: @@ -154,38 +249,32 @@ class MatrixFederationHttpClient(object): if ( self.hs.config.federation_domain_whitelist is not None and - destination not in self.hs.config.federation_domain_whitelist + request.destination not in self.hs.config.federation_domain_whitelist ): - raise FederationDeniedError(destination) + raise FederationDeniedError(request.destination) limiter = yield synapse.util.retryutils.get_retry_limiter( - destination, + request.destination, self.clock, self._store, backoff_on_404=backoff_on_404, ignore_backoff=ignore_backoff, ) - headers_dict = {} - path_bytes = path.encode("ascii") - if query: - query_bytes = encode_query_args(query) + method = request.method + destination = request.destination + path_bytes = request.path.encode("ascii") + if request.query: + query_bytes = encode_query_args(request.query) else: query_bytes = b"" headers_dict = { "User-Agent": [self.version_string], - "Host": [destination], + "Host": [request.destination], } with limiter: - url = self._create_url( - destination.encode("ascii"), path_bytes, param_bytes, query_bytes - ).decode('ascii') - - txn_id = "%s-O-%s" % (method, self._next_id) - self._next_id = (self._next_id + 1) % (MAXINT - 1) - # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: @@ -193,16 +282,19 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - http_url = urllib.parse.urlunparse( - (b"", b"", path_bytes, param_bytes, query_bytes, b"") - ).decode('ascii') + url = urllib.parse.urlunparse(( + b"matrix", destination.encode("ascii"), + path_bytes, None, query_bytes, b"", + )).decode('ascii') + + http_url = urllib.parse.urlunparse(( + b"", b"", + path_bytes, None, query_bytes, b"", + )).decode('ascii') - log_result = None while True: try: - if json_callback: - json = json_callback() - + json = request.get_json() if json: data = encode_canonical_json(json) headers_dict["Content-Type"] = ["application/json"] @@ -213,29 +305,32 @@ class MatrixFederationHttpClient(object): data = None self.sign_request(destination, method, http_url, headers_dict) - outbound_logger.info( + logger.info( "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url + request.txn_id, destination, method, url ) + if data: + producer = FileBodyProducer( + BytesIO(data), + cooperator=self._cooperator + ) + else: + producer = None + request_deferred = treq.request( method, url, headers=Headers(headers_dict), - data=data, + data=producer, agent=self.agent, reactor=self.hs.get_reactor(), unbuffered=True ) - request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) - # Sometimes the timeout above doesn't work, so lets hack yet - # another layer of timeouts in in the vain hope that at some - # point the world made sense and this really really really - # should work. - request_deferred = timeout_no_seriously( + request_deferred = timeout_deferred( request_deferred, - timeout=_sec_timeout * 2, + timeout=_sec_timeout, reactor=self.hs.get_reactor(), ) @@ -244,33 +339,19 @@ class MatrixFederationHttpClient(object): request_deferred, ) - log_result = "%d %s" % ( - response.code, - response.phrase.decode('ascii', errors='replace'), - ) break except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn( - "DNS Lookup failed to %s with %s", - destination, - e - ) - log_result = "DNS Lookup failed to %s with %s" % ( - destination, e - ) - raise - logger.warn( - "{%s} Sending request failed to %s: %s %s: %s", - txn_id, + "{%s} [%s] Request failed: %s %s: %s", + request.txn_id, destination, method, url, _flatten_response_never_received(e), ) - log_result = _flatten_response_never_received(e) + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + raise if retries_left and not timeout: if long_retries: @@ -283,33 +364,37 @@ class MatrixFederationHttpClient(object): delay *= random.uniform(0.8, 1.4) logger.debug( - "{%s} Waiting %s before sending to %s...", - txn_id, + "{%s} [%s] Waiting %ss before re-sending...", + request.txn_id, + destination, delay, - destination ) yield self.clock.sleep(delay) retries_left -= 1 else: raise - finally: - outbound_logger.info( - "{%s} [%s] Result: %s", - txn_id, - destination, - log_result, - ) + + logger.info( + "{%s} [%s] Got response headers: %d %s", + request.txn_id, + destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + ) if 200 <= response.code < 300: pass else: # :'( # Update transactions table? - with logcontext.PreserveLoggingContext(): - d = treq.content(response) - d.addTimeout(_sec_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + d = treq.content(response) + d = timeout_deferred( + d, + timeout=_sec_timeout, + reactor=self.hs.get_reactor(), + ) + body = yield make_deferred_yieldable(d) raise HttpResponseException( response.code, response.phrase, body ) @@ -403,29 +488,26 @@ class MatrixFederationHttpClient(object): is not on our federation whitelist """ - if not json_data_callback: - json_data_callback = lambda: data - - response = yield self._request( - destination, - "PUT", - path, - json_callback=json_data_callback, + request = MatrixFederationRequest( + method="PUT", + destination=destination, + path=path, query=args, + json_callback=json_data_callback, + json=data, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, backoff_on_404=backoff_on_404, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -459,31 +541,30 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "POST", - path, + + request = MatrixFederationRequest( + method="POST", + destination=destination, + path=path, query=args, json=data, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - if timeout: - _sec_timeout = timeout / 1000 - else: - _sec_timeout = self.default_timeout - - d.addTimeout(_sec_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = yield _handle_json_response( + self.hs.get_reactor(), _sec_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -519,25 +600,23 @@ class MatrixFederationHttpClient(object): logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) - response = yield self._request( - destination, - "GET", - path, + request = MatrixFederationRequest( + method="GET", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, retry_on_dns_fail=retry_on_dns_fail, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) - + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -568,25 +647,23 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "DELETE", - path, + request = MatrixFederationRequest( + method="DELETE", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) - + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -614,11 +691,15 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "GET", - path, + request = MatrixFederationRequest( + method="GET", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff, ) @@ -626,14 +707,25 @@ class MatrixFederationHttpClient(object): headers = dict(response.headers.getAllRawHeaders()) try: - with logcontext.PreserveLoggingContext(): - d = _readBodyToFile(response, output_stream, max_size) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - length = yield make_deferred_yieldable(d) - except Exception: - logger.exception("Failed to download body") + d = _readBodyToFile(response, output_stream, max_size) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + length = yield make_deferred_yieldable(d) + except Exception as e: + logger.warn( + "{%s} [%s] Error reading response: %s", + request.txn_id, + request.destination, + e, + ) raise - + logger.info( + "{%s} [%s] Completed: %d %s [%d bytes]", + request.txn_id, + request.destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + length, + ) defer.returnValue((length, headers)) diff --git a/synapse/http/site.py b/synapse/http/site.py index 9579e8cd0d..50be2de3bb 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -75,9 +75,9 @@ class SynapseRequest(Request): return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( self.__class__.__name__, id(self), - self.method, + self.method.decode('ascii', errors='replace'), self.get_redacted_uri(), - self.clientproto, + self.clientproto.decode('ascii', errors='replace'), self.site.site_tag, ) @@ -308,7 +308,7 @@ class XForwardedForRequest(SynapseRequest): C{b"-"}. """ return self.requestHeaders.getRawHeaders( - b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() + b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii') class SynapseRequestFactory(object): diff --git a/synapse/notifier.py b/synapse/notifier.py index 82f391481c..f1d92c1395 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -25,11 +25,7 @@ from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.metrics import LaterGauge from synapse.types import StreamToken -from synapse.util.async_helpers import ( - DeferredTimeoutError, - ObservableDeferred, - add_timeout_to_deferred, -) +from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -337,7 +333,7 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - add_timeout_to_deferred( + listener.deferred = timeout_deferred( listener.deferred, (end_time - now) / 1000., self.hs.get_reactor(), @@ -354,7 +350,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except DeferredTimeoutError: + except defer.TimeoutError: break except defer.CancelledError: break @@ -559,15 +555,16 @@ class Notifier(object): if end_time <= now: break - add_timeout_to_deferred( - listener.deferred.addTimeout, - (end_time - now) / 1000., - self.hs.get_reactor(), + listener.deferred = timeout_deferred( + listener.deferred, + timeout=(end_time - now) / 1000., + reactor=self.hs.get_reactor(), ) + try: with PreserveLoggingContext(): yield listener.deferred - except DeferredTimeoutError: + except defer.TimeoutError: break except defer.CancelledError: break diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 8fc678fa67..9ad17b7c25 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -119,21 +119,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): for entry in iteritems(to_update): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry - self._simple_upsert_txn( - txn, - table="user_ips", - keyvalues={ - "user_id": user_id, - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - "device_id": device_id, - }, - values={ - "last_seen": last_seen, - }, - lock=False, - ) + try: + self._simple_upsert_txn( + txn, + table="user_ips", + keyvalues={ + "user_id": user_id, + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "device_id": device_id, + }, + values={ + "last_seen": last_seen, + }, + lock=False, + ) + except Exception as e: + # Failed to upsert, log and continue + logger.error("Failed to insert client IP %r: %r", entry, e) @defer.inlineCallbacks def get_last_client_ip_by_device(self, user_id, device_id): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 0c42bd3322..baf0379a68 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore, db_to_json @@ -156,7 +155,6 @@ class TransactionStore(SQLBaseStore): """ pass - @cached(max_entries=10000) def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. @@ -198,8 +196,6 @@ class TransactionStore(SQLBaseStore): retry_interval (int) - how long until next retry in ms """ - # XXX: we could chose to not bother persisting this if our cache thinks - # this is a NOOP return self.runInteraction( "set_destination_retry_timings", self._set_destination_retry_timings, @@ -212,10 +208,6 @@ class TransactionStore(SQLBaseStore): retry_last_ts, retry_interval): self.database_engine.lock_table(txn, "destinations") - self._invalidate_cache_and_stream( - txn, self.get_destination_retry_timings, (destination,) - ) - # We need to be careful here as the data may have changed from under us # due to a worker setting the timings. diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 40c2946129..ec7b2c9672 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -374,29 +374,25 @@ class ReadWriteLock(object): defer.returnValue(_ctx_manager()) -class DeferredTimeoutError(Exception): - """ - This error is raised by default when a L{Deferred} times out. - """ - +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise defer.TimeoutError(timeout, "Deferred") + return value -def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): - """ - Add a timeout to a deferred by scheduling it to be cancelled after - timeout seconds. - This is essentially a backport of deferred.addTimeout, which was introduced - in twisted 16.5. +def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): + """The in built twisted `Deferred.addTimeout` fails to time out deferreds + that have a canceller that throws exceptions. This method creates a new + deferred that wraps and times out the given deferred, correctly handling + the case where the given deferred's canceller throws. - If the deferred gets timed out, it errbacks with a DeferredTimeoutError, - unless a cancelable function was passed to its initialization or unless - a different on_timeout_cancel callable is provided. + NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred Args: - deferred (defer.Deferred): deferred to be timed out - timeout (Number): seconds to time out after - reactor (twisted.internet.reactor): the Twisted reactor to use - + deferred (Deferred) + timeout (float): Timeout in seconds + reactor (twisted.internet.reactor): The twisted reactor to use on_timeout_cancel (callable): A callable which is called immediately after the deferred times out, and not if this deferred is otherwise cancelled before the timeout. @@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): the timeout. The default callable (if none is provided) will translate a - CancelledError Failure into a DeferredTimeoutError. - """ - timed_out = [False] - - def time_it_out(): - timed_out[0] = True - deferred.cancel() - - delayed_call = reactor.callLater(timeout, time_it_out) - - def convert_cancelled(value): - if timed_out[0]: - to_call = on_timeout_cancel or _cancelled_to_timed_out_error - return to_call(value, timeout) - return value - - deferred.addBoth(convert_cancelled) + CancelledError Failure into a defer.TimeoutError. - def cancel_timeout(result): - # stop the pending call to cancel the deferred if it's been fired - if delayed_call.active(): - delayed_call.cancel() - return result - - deferred.addBoth(cancel_timeout) - - -def _cancelled_to_timed_out_error(value, timeout): - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise DeferredTimeoutError(timeout, "Deferred") - return value - - -def timeout_no_seriously(deferred, timeout, reactor): - """The in build twisted deferred addTimeout (and the method above) - completely fail to time things out under some unknown circumstances. - - Lets try a different way of timing things out and maybe that will make - things work?! - - TODO: Kill this with fire. + Returns: + Deferred """ new_d = defer.Deferred() @@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor): def time_it_out(): timed_out[0] = True - if not new_d.called: - new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + try: + deferred.cancel() + except: # noqa: E722, if we throw any exception it'll break time outs + logger.exception("Canceller failed during timeout") - deferred.cancel() + if not new_d.called: + new_d.errback(defer.TimeoutError(timeout, "Deferred")) delayed_call = reactor.callLater(timeout, time_it_out) def convert_cancelled(value): if timed_out[0]: - return _cancelled_to_timed_out_error(value, timeout) + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) return value deferred.addBoth(convert_cancelled) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 8a3a06fd74..26cce7d197 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -188,7 +188,7 @@ class RetryDestinationLimiter(object): else: self.retry_interval = self.min_retry_interval - logger.debug( + logger.info( "Connection to %s was unsuccessful (%s(%s)); backoff now %i", self.destination, exc_type, exc_val, self.retry_interval ) |