summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-10-21 14:53:14 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-10-21 14:53:14 +0100
commitab26b5894a3720cac4daa0daf47ed08d29c4356e (patch)
tree2044b1fabe9cdeb96d1195beef09498b95b21509
parentMerge commit '31acc5c30' into anoa/dinsic_release_1_21_x (diff)
parentMerge branch 'develop' of github.com:matrix-org/synapse into anoa/info-mainli... (diff)
downloadsynapse-ab26b5894a3720cac4daa0daf47ed08d29c4356e.tar.xz
Merge commit 'f43c66d23' into anoa/dinsic_release_1_21_x
* commit 'f43c66d23':
  Add support for running Complement against the local checkout (#8317)
  Filter out appservices from mau count (#8404)
  Only assert valid next_link params when provided (#8417)
  Add metrics to track success/otherwise of replication requests (#8406)
  Fix handling of connection timeouts in outgoing http requests (#8400)
  Changelog
  Don't check whether a 3pid is allowed to register during password reset
  Add checks for postgres sequence consistency (#8402)
  Create a mechanism for marking tests "logcontext clean" (#8399)
  Add `ui_auth_sessions_ips` table to `synapse_port_db` ignore list (#8410)
  A pair of tiny cleanups in the federation request code. (#8401)
  typo
-rw-r--r--changelog.d/8317.feature1
-rw-r--r--changelog.d/8399.misc1
-rw-r--r--changelog.d/8400.bugfix1
-rw-r--r--changelog.d/8401.misc1
-rw-r--r--changelog.d/8402.misc1
-rw-r--r--changelog.d/8404.misc1
-rw-r--r--changelog.d/8406.feature1
-rw-r--r--changelog.d/8410.bugfix1
-rw-r--r--changelog.d/8414.bugfix1
-rw-r--r--changelog.d/8417.feature1
-rw-r--r--docs/postgres.md11
-rwxr-xr-xscripts-dev/complement.sh22
-rwxr-xr-xscripts/synapse_port_db1
-rw-r--r--synapse/handlers/federation.py2
-rw-r--r--synapse/handlers/identity.py25
-rw-r--r--synapse/http/__init__.py17
-rw-r--r--synapse/http/client.py54
-rw-r--r--synapse/http/matrixfederationclient.py57
-rw-r--r--synapse/http/proxyagent.py16
-rw-r--r--synapse/logging/context.py43
-rw-r--r--synapse/replication/http/_base.py40
-rw-r--r--synapse/rest/client/v2_alpha/account.py7
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py9
-rw-r--r--synapse/storage/databases/main/registration.py3
-rw-r--r--synapse/storage/databases/main/schema/delta/56/event_labels.sql2
-rw-r--r--synapse/storage/databases/state/store.py3
-rw-r--r--synapse/storage/util/id_generators.py5
-rw-r--r--synapse/storage/util/sequence.py90
-rw-r--r--synapse/util/async_helpers.py47
-rw-r--r--tests/crypto/test_keyring.py3
-rw-r--r--tests/http/test_fedclient.py14
-rw-r--r--tests/http/test_simple_client.py180
-rw-r--r--tests/rest/client/v2_alpha/test_account.py6
-rw-r--r--tests/storage/test_id_generators.py22
-rw-r--r--tests/storage/test_monthly_active_users.py17
-rw-r--r--tests/unittest.py46
36 files changed, 602 insertions, 150 deletions
diff --git a/changelog.d/8317.feature b/changelog.d/8317.feature
new file mode 100644

index 0000000000..f9edda099c --- /dev/null +++ b/changelog.d/8317.feature
@@ -0,0 +1 @@ +Support testing the local Synapse checkout against the [Complement homeserver test suite](https://github.com/matrix-org/complement/). \ No newline at end of file diff --git a/changelog.d/8399.misc b/changelog.d/8399.misc new file mode 100644
index 0000000000..ce6e8123cf --- /dev/null +++ b/changelog.d/8399.misc
@@ -0,0 +1 @@ +Create a mechanism for marking tests "logcontext clean". diff --git a/changelog.d/8400.bugfix b/changelog.d/8400.bugfix new file mode 100644
index 0000000000..835658ba5e --- /dev/null +++ b/changelog.d/8400.bugfix
@@ -0,0 +1 @@ +Fix incorrect handling of timeouts on outgoing HTTP requests. diff --git a/changelog.d/8401.misc b/changelog.d/8401.misc new file mode 100644
index 0000000000..27fd7ab129 --- /dev/null +++ b/changelog.d/8401.misc
@@ -0,0 +1 @@ +A pair of tiny cleanups in the federation request code. diff --git a/changelog.d/8402.misc b/changelog.d/8402.misc new file mode 100644
index 0000000000..ad1804d207 --- /dev/null +++ b/changelog.d/8402.misc
@@ -0,0 +1 @@ +Add checks on startup that PostgreSQL sequences are consistent with their associated tables. diff --git a/changelog.d/8404.misc b/changelog.d/8404.misc new file mode 100644
index 0000000000..7aadded6c1 --- /dev/null +++ b/changelog.d/8404.misc
@@ -0,0 +1 @@ +Do not include appservice users when calculating the total MAU for a server. diff --git a/changelog.d/8406.feature b/changelog.d/8406.feature new file mode 100644
index 0000000000..1c6472ae7e --- /dev/null +++ b/changelog.d/8406.feature
@@ -0,0 +1 @@ +Add prometheus metrics for replication requests. diff --git a/changelog.d/8410.bugfix b/changelog.d/8410.bugfix new file mode 100644
index 0000000000..1323ddc525 --- /dev/null +++ b/changelog.d/8410.bugfix
@@ -0,0 +1 @@ +Fix a v1.20.0 regression in the `synapse_port_db` script regarding the `ui_auth_sessions_ips` table. diff --git a/changelog.d/8414.bugfix b/changelog.d/8414.bugfix new file mode 100644
index 0000000000..315876e892 --- /dev/null +++ b/changelog.d/8414.bugfix
@@ -0,0 +1 @@ +Remove unnecessary 3PID registration check when resetting password via an email address. Bug introduced in v0.34.0rc2. \ No newline at end of file diff --git a/changelog.d/8417.feature b/changelog.d/8417.feature new file mode 100644
index 0000000000..17549c3df3 --- /dev/null +++ b/changelog.d/8417.feature
@@ -0,0 +1 @@ +Add a config option to specify a whitelist of domains that a user can be redirected to after validating their email or phone number. \ No newline at end of file diff --git a/docs/postgres.md b/docs/postgres.md
index e71a1975d8..c30cc1fd8c 100644 --- a/docs/postgres.md +++ b/docs/postgres.md
@@ -106,6 +106,17 @@ Note that the above may fail with an error about duplicate rows if corruption has already occurred, and such duplicate rows will need to be manually removed. +## Fixing inconsistent sequences error + +Synapse uses Postgres sequences to generate IDs for various tables. A sequence +and associated table can get out of sync if, for example, Synapse has been +downgraded and then upgraded again. + +To fix the issue shut down Synapse (including any and all workers) and run the +SQL command included in the error message. Once done Synapse should start +successfully. + + ## Tuning Postgres The default settings should be fine for most deployments. For larger diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh new file mode 100755
index 0000000000..3cde53f5c0 --- /dev/null +++ b/scripts-dev/complement.sh
@@ -0,0 +1,22 @@ +#! /bin/bash -eu +# This script is designed for developers who want to test their code +# against Complement. +# +# It makes a Synapse image which represents the current checkout, +# then downloads Complement and runs it with that image. + +cd "$(dirname $0)/.." + +# Build the base Synapse image from the local checkout +docker build -t matrixdotorg/synapse:latest -f docker/Dockerfile . + +# Download Complement +wget -N https://github.com/matrix-org/complement/archive/master.tar.gz +tar -xzf master.tar.gz +cd complement-master + +# Build the Synapse image from Complement, based on the above image we just built +docker build -t complement-synapse -f dockerfiles/Synapse.Dockerfile ./dockerfiles + +# Run the tests on the resulting image! +COMPLEMENT_BASE_IMAGE=complement-synapse go test -v -count=1 ./tests diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 267faa2743..dd1ebde894 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db
@@ -146,6 +146,7 @@ IGNORED_TABLES = { # the sessions are transient anyway, so ignore them. "ui_auth_sessions", "ui_auth_sessions_credentials", + "ui_auth_sessions_ips", } diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 81134fcdf6..33feb1b2ba 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -281,7 +281,7 @@ class FederationHandler(BaseHandler): raise Exception( "Error fetching missing prev_events for %s: %s" % (event_id, e) - ) + ) from e # Update the set of things we've seen after trying to # fetch the missing stuff diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index f71bd653e8..efc68cb9b4 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py
@@ -21,8 +21,6 @@ import logging import urllib.parse from typing import Awaitable, Callable, Dict, List, Optional, Tuple -from twisted.internet.error import TimeoutError - from synapse.api.errors import ( AuthError, CodeMessageException, @@ -32,6 +30,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.config.emailconfig import ThreepidBehaviour +from synapse.http import RequestTimedOutError from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict, Requester from synapse.util import json_decoder @@ -107,7 +106,7 @@ class IdentityHandler(BaseHandler): try: data = await self.http_client.get_json(url, query_params) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.info( @@ -192,7 +191,7 @@ class IdentityHandler(BaseHandler): if e.code != 404 or not use_v2: logger.error("3PID bind failed with Matrix error: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except CodeMessageException as e: data = json_decoder.decode(e.msg) # XXX WAT? @@ -299,7 +298,7 @@ class IdentityHandler(BaseHandler): else: logger.error("Failed to unbind threepid on identity server: %s", e) raise SynapseError(500, "Failed to contact identity server") - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") await self.store.remove_user_bound_threepid( @@ -470,7 +469,7 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") async def requestMsisdnToken( @@ -526,7 +525,7 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") assert self.hs.config.public_baseurl @@ -608,7 +607,7 @@ class IdentityHandler(BaseHandler): id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken", body, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.warning("Error contacting msisdn account_threepid_delegate: %s", e) @@ -766,7 +765,7 @@ class IdentityHandler(BaseHandler): # require or validate it. See the following for context: # https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950 return data["mxid"] - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except IOError as e: logger.warning("Error from v1 identity server lookup: %s" % (e,)) @@ -793,7 +792,7 @@ class IdentityHandler(BaseHandler): "%s/_matrix/identity/v2/hash_details" % (id_server_url,), {"access_token": id_access_token}, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") if not isinstance(hash_details, dict): @@ -864,7 +863,7 @@ class IdentityHandler(BaseHandler): }, headers=headers, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except Exception as e: logger.warning("Error when performing a v2 3pid lookup: %s", e) @@ -962,7 +961,7 @@ class IdentityHandler(BaseHandler): invite_config, {"Authorization": create_id_access_token_header(id_access_token)}, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: if e.code != 404: @@ -979,7 +978,7 @@ class IdentityHandler(BaseHandler): data = await self.blacklisting_http_client.post_json_get_json( url, invite_config ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.warning( diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 8eb3638591..59b01b812c 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py
@@ -16,8 +16,6 @@ import re from twisted.internet import task -from twisted.internet.defer import CancelledError -from twisted.python import failure from twisted.web.client import FileBodyProducer from synapse.api.errors import SynapseError @@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError class RequestTimedOutError(SynapseError): """Exception representing timeout of an outbound request""" - def __init__(self): - super().__init__(504, "Timed out") - - -def cancelled_to_request_timed_out_error(value, timeout): - """Turns CancelledErrors into RequestTimedOutErrors. - - For use with async.add_timeout_to_deferred - """ - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise RequestTimedOutError() - return value + def __init__(self, msg): + super().__init__(504, msg) ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$") diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4694adc400..8324632cb6 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py
@@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import urllib from io import BytesIO @@ -38,7 +37,7 @@ from zope.interface import implementer, provider from OpenSSL import SSL from OpenSSL.SSL import VERIFY_NONE -from twisted.internet import defer, protocol, ssl +from twisted.internet import defer, error as twisted_error, protocol, ssl from twisted.internet.interfaces import ( IReactorPluggableNameResolver, IResolutionReceiver, @@ -46,17 +45,18 @@ from twisted.internet.interfaces import ( from twisted.internet.task import Cooperator from twisted.python.failure import Failure from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, HTTPConnectionPool, readBody +from twisted.web.client import ( + Agent, + HTTPConnectionPool, + ResponseNeverReceived, + readBody, +) from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers from twisted.web.iweb import IResponse from synapse.api.errors import Codes, HttpResponseException, SynapseError -from synapse.http import ( - QuieterFileBodyProducer, - cancelled_to_request_timed_out_error, - redact_uri, -) +from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri from synapse.http.proxyagent import ProxyAgent from synapse.logging.context import make_deferred_yieldable from synapse.logging.opentracing import set_tag, start_active_span, tags @@ -332,8 +332,6 @@ class SimpleHttpClient: RequestTimedOutError if the request times out before the headers are read """ - # A small wrapper around self.agent.request() so we can easily attach - # counters to it outgoing_requests_counter.labels(method).inc() # log request but strip `access_token` (AS requests for example include this) @@ -362,15 +360,17 @@ class SimpleHttpClient: data=body_producer, headers=headers, **self._extra_treq_args - ) + ) # type: defer.Deferred + # we use our own timeout mechanism rather than treq's as a workaround # for https://twistedmatrix.com/trac/ticket/9534. request_deferred = timeout_deferred( - request_deferred, - 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, + request_deferred, 60, self.hs.get_reactor(), ) + + # turn timeouts into RequestTimedOutErrors + request_deferred.addErrback(_timeout_to_request_timed_out_error) + response = await make_deferred_yieldable(request_deferred) incoming_responses_counter.labels(method, response.code).inc() @@ -410,7 +410,7 @@ class SimpleHttpClient: parsed json Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -461,7 +461,7 @@ class SimpleHttpClient: parsed json Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -506,7 +506,7 @@ class SimpleHttpClient: Returns: Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -538,7 +538,7 @@ class SimpleHttpClient: Returns: Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -586,7 +586,7 @@ class SimpleHttpClient: Succeeds when we get a 2xx HTTP response, with the HTTP body as bytes. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -631,7 +631,7 @@ class SimpleHttpClient: headers, absolute URI of the response and HTTP response code. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -684,6 +684,18 @@ class SimpleHttpClient: ) +def _timeout_to_request_timed_out_error(f: Failure): + if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError): + # The TCP connection has its own timeout (set by the 'connectTimeout' param + # on the Agent), which raises twisted_error.TimeoutError exception. + raise RequestTimedOutError("Timeout connecting to remote server") + elif f.check(defer.TimeoutError, ResponseNeverReceived): + # this one means that we hit our overall timeout on the request + raise RequestTimedOutError("Timeout waiting for response from remote server") + + return f + + # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. # The two should be factored out. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 3c86cbc546..c23a4d7c0c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py
@@ -171,7 +171,7 @@ async def _handle_json_response( d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) body = await make_deferred_yieldable(d) - except TimeoutError as e: + except defer.TimeoutError as e: logger.warning( "{%s} [%s] Timed out reading response - %s %s", request.txn_id, @@ -473,8 +473,6 @@ class MatrixFederationHttpClient: ) response = await request_deferred - except TimeoutError as e: - raise RequestSendFailed(e, can_retry=True) from e except DNSLookupError as e: raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e except Exception as e: @@ -657,10 +655,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. backoff_on_404 (bool): True if we should count a 404 response as @@ -706,8 +708,13 @@ class MatrixFederationHttpClient: timeout=timeout, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body @@ -736,10 +743,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -803,10 +814,14 @@ class MatrixFederationHttpClient: args (dict|None): A dictionary used to create query strings, defaults to None. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -842,8 +857,13 @@ class MatrixFederationHttpClient: timeout=timeout, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body @@ -867,10 +887,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -902,8 +926,13 @@ class MatrixFederationHttpClient: ignore_backoff=ignore_backoff, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
index 332da02a8d..e32d3f43e0 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py
@@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase): `BrowserLikePolicyForHTTPS`, so unless you have special requirements you can leave this as-is. - connectTimeout (float): The amount of time that this Agent will wait - for the peer to accept a connection. + connectTimeout (Optional[float]): The amount of time that this Agent will wait + for the peer to accept a connection, in seconds. If 'None', + HostnameEndpoint's default (30s) will be used. + + This is used for connections to both proxies and destination servers. bindAddress (bytes): The local address for client sockets to bind to. @@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase): Returns: Deferred[IResponse]: completes when the header of the response has been received (regardless of the response status code). + + Can fail with: + SchemeNotSupported: if the uri is not http or https + + twisted.internet.error.TimeoutError if the server we are connecting + to (proxy or destination) does not accept a connection before + connectTimeout. + + ... other things too. """ uri = uri.strip() if not _VALID_URI.match(uri): diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index 2e282d9d67..ca0c774cc5 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py
@@ -65,6 +65,11 @@ except Exception: return None +# a hook which can be set during testing to assert that we aren't abusing logcontexts. +def logcontext_error(msg: str): + logger.warning(msg) + + # get an id for the current thread. # # threading.get_ident doesn't actually return an OS-level tid, and annoyingly, @@ -330,10 +335,9 @@ class LoggingContext: """Enters this logging context into thread local storage""" old_context = set_current_context(self) if self.previous_context != old_context: - logger.warning( - "Expected previous context %r, found %r", - self.previous_context, - old_context, + logcontext_error( + "Expected previous context %r, found %r" + % (self.previous_context, old_context,) ) return self @@ -346,10 +350,10 @@ class LoggingContext: current = set_current_context(self.previous_context) if current is not self: if current is SENTINEL_CONTEXT: - logger.warning("Expected logging context %s was lost", self) + logcontext_error("Expected logging context %s was lost" % (self,)) else: - logger.warning( - "Expected logging context %s but found %s", self, current + logcontext_error( + "Expected logging context %s but found %s" % (self, current) ) # the fact that we are here suggests that the caller thinks that everything @@ -387,16 +391,16 @@ class LoggingContext: support getrusuage. """ if get_thread_id() != self.main_thread: - logger.warning("Started logcontext %s on different thread", self) + logcontext_error("Started logcontext %s on different thread" % (self,)) return if self.finished: - logger.warning("Re-starting finished log context %s", self) + logcontext_error("Re-starting finished log context %s" % (self,)) # If we haven't already started record the thread resource usage so # far if self.usage_start: - logger.warning("Re-starting already-active log context %s", self) + logcontext_error("Re-starting already-active log context %s" % (self,)) else: self.usage_start = rusage @@ -414,7 +418,7 @@ class LoggingContext: try: if get_thread_id() != self.main_thread: - logger.warning("Stopped logcontext %s on different thread", self) + logcontext_error("Stopped logcontext %s on different thread" % (self,)) return if not rusage: @@ -422,9 +426,9 @@ class LoggingContext: # Record the cpu used since we started if not self.usage_start: - logger.warning( - "Called stop on logcontext %s without recording a start rusage", - self, + logcontext_error( + "Called stop on logcontext %s without recording a start rusage" + % (self,) ) return @@ -584,14 +588,13 @@ class PreserveLoggingContext: if context != self._new_context: if not context: - logger.warning( - "Expected logging context %s was lost", self._new_context + logcontext_error( + "Expected logging context %s was lost" % (self._new_context,) ) else: - logger.warning( - "Expected logging context %s but found %s", - self._new_context, - context, + logcontext_error( + "Expected logging context %s but found %s" + % (self._new_context, context,) ) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index b448da6710..64edadb624 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -20,18 +20,28 @@ import urllib from inspect import signature from typing import Dict, List, Tuple -from synapse.api.errors import ( - CodeMessageException, - HttpResponseException, - RequestSendFailed, - SynapseError, -) +from prometheus_client import Counter, Gauge + +from synapse.api.errors import HttpResponseException, SynapseError +from synapse.http import RequestTimedOutError from synapse.logging.opentracing import inject_active_span_byte_dict, trace from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import random_string logger = logging.getLogger(__name__) +_pending_outgoing_requests = Gauge( + "synapse_pending_outgoing_replication_requests", + "Number of active outgoing replication requests, by replication method name", + ["name"], +) + +_outgoing_request_counter = Counter( + "synapse_outgoing_replication_requests", + "Number of outgoing replication requests, by replication method name and result", + ["name", "code"], +) + class ReplicationEndpoint(metaclass=abc.ABCMeta): """Helper base class for defining new replication HTTP endpoints. @@ -138,7 +148,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): instance_map = hs.config.worker.instance_map + outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME) + @trace(opname="outgoing_replication_request") + @outgoing_gauge.track_inprogress() async def send_request(instance_name="master", **kwargs): if instance_name == local_instance_name: raise Exception("Trying to send HTTP request to self") @@ -193,23 +206,26 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): try: result = await request_func(uri, data, headers=headers) break - except CodeMessageException as e: - if e.code != 504 or not cls.RETRY_ON_TIMEOUT: + except RequestTimedOutError: + if not cls.RETRY_ON_TIMEOUT: raise - logger.warning("%s request timed out", cls.NAME) + logger.warning("%s request timed out; retrying", cls.NAME) # If we timed out we probably don't need to worry about backing # off too much, but lets just wait a little anyway. await clock.sleep(1) except HttpResponseException as e: # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And + # on the main process that we should send to the client. (And # importantly, not stack traces everywhere) + _outgoing_request_counter.labels(cls.NAME, e.code).inc() raise e.to_synapse_error() - except RequestSendFailed as e: - raise SynapseError(502, "Failed to talk to master") from e + except Exception as e: + _outgoing_request_counter.labels(cls.NAME, "ERR").inc() + raise SynapseError(502, "Failed to talk to main process") from e + _outgoing_request_counter.labels(cls.NAME, 200).inc() return result return send_request diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 9875669789..1320aad8f6 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py
@@ -98,13 +98,6 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): send_attempt = body["send_attempt"] next_link = body.get("next_link") # Optional param - if not check_3pid_allowed(self.hs, "email", email): - raise SynapseError( - 403, - "Your email is not authorized on this server", - Codes.THREEPID_DENIED, - ) - if next_link: # Raise if the provided next_link value isn't valid assert_valid_next_link(self.hs, next_link) diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index e0cedd1aac..e93aad33cd 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -41,7 +41,14 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): """ def _count_users(txn): - sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users" + # Exclude app service users + sql = """ + SELECT COALESCE(count(*), 0) + FROM monthly_active_users + LEFT JOIN users + ON monthly_active_users.user_id=users.name + WHERE (users.appservice_id IS NULL OR users.appservice_id = ''); + """ txn.execute(sql) (count,) = txn.fetchone() return count diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 2ed696cc14..f949308994 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -41,6 +41,9 @@ class RegistrationWorkerStore(SQLBaseStore): self.config = hs.config self.clock = hs.get_clock() + # Note: we don't check this sequence for consistency as we'd have to + # call `find_max_generated_user_id_localpart` each time, which is + # expensive if there are many entries. self._user_id_seq = build_sequence_generator( database.engine, find_max_generated_user_id_localpart, "user_id_seq", ) diff --git a/synapse/storage/databases/main/schema/delta/56/event_labels.sql b/synapse/storage/databases/main/schema/delta/56/event_labels.sql
index 5e29c1da19..ccf287971c 100644 --- a/synapse/storage/databases/main/schema/delta/56/event_labels.sql +++ b/synapse/storage/databases/main/schema/delta/56/event_labels.sql
@@ -13,7 +13,7 @@ * limitations under the License. */ --- room_id and topoligical_ordering are denormalised from the events table in order to +-- room_id and topological_ordering are denormalised from the events table in order to -- make the index work. CREATE TABLE IF NOT EXISTS event_labels ( event_id TEXT, diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index bec3780a32..989f0cbc9d 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -99,6 +99,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): self._state_group_seq_gen = build_sequence_generator( self.database_engine, get_max_state_group_txn, "state_group_id_seq" ) + self._state_group_seq_gen.check_consistency( + db_conn, table="state_groups", id_column="id" + ) @cached(max_entries=10000, iterable=True) async def get_state_group_delta(self, state_group): diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 4269eaf918..4fd7573e26 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py
@@ -258,6 +258,11 @@ class MultiWriterIdGenerator: self._sequence_gen = PostgresSequenceGenerator(sequence_name) + # We check that the table and sequence haven't diverged. + self._sequence_gen.check_consistency( + db_conn, table=table, id_column=id_column, positive=positive + ) + # This goes and fills out the above state from the database. self._load_current_ids(db_conn, table, instance_column, id_column) diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index ffc1894748..2dd95e2709 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py
@@ -13,11 +13,34 @@ # See the License for the specific language governing permissions and # limitations under the License. import abc +import logging import threading from typing import Callable, List, Optional -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -from synapse.storage.types import Cursor +from synapse.storage.engines import ( + BaseDatabaseEngine, + IncorrectDatabaseSetup, + PostgresEngine, +) +from synapse.storage.types import Connection, Cursor + +logger = logging.getLogger(__name__) + + +_INCONSISTENT_SEQUENCE_ERROR = """ +Postgres sequence '%(seq)s' is inconsistent with associated +table '%(table)s'. This can happen if Synapse has been downgraded and +then upgraded again, or due to a bad migration. + +To fix this error, shut down Synapse (including any and all workers) +and run the following SQL: + + SELECT setval('%(seq)s', ( + %(max_id_sql)s + )); + +See docs/postgres.md for more information. +""" class SequenceGenerator(metaclass=abc.ABCMeta): @@ -28,6 +51,19 @@ class SequenceGenerator(metaclass=abc.ABCMeta): """Gets the next ID in the sequence""" ... + @abc.abstractmethod + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + """Should be called during start up to test that the current value of + the sequence is greater than or equal to the maximum ID in the table. + + This is to handle various cases where the sequence value can get out + of sync with the table, e.g. if Synapse gets rolled back to a previous + version and the rolled forwards again. + """ + ... + class PostgresSequenceGenerator(SequenceGenerator): """An implementation of SequenceGenerator which uses a postgres sequence""" @@ -45,6 +81,50 @@ class PostgresSequenceGenerator(SequenceGenerator): ) return [i for (i,) in txn] + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + txn = db_conn.cursor() + + # First we get the current max ID from the table. + table_sql = "SELECT GREATEST(%(agg)s(%(id)s), 0) FROM %(table)s" % { + "id": id_column, + "table": table, + "agg": "MAX" if positive else "-MIN", + } + + txn.execute(table_sql) + row = txn.fetchone() + if not row: + # Table is empty, so nothing to do. + txn.close() + return + + # Now we fetch the current value from the sequence and compare with the + # above. + max_stream_id = row[0] + txn.execute( + "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name} + ) + last_value, is_called = txn.fetchone() + txn.close() + + # If `is_called` is False then `last_value` is actually the value that + # will be generated next, so we decrement to get the true "last value". + if not is_called: + last_value -= 1 + + if max_stream_id > last_value: + logger.warning( + "Postgres sequence %s is behind table %s: %d < %d", + last_value, + max_stream_id, + ) + raise IncorrectDatabaseSetup( + _INCONSISTENT_SEQUENCE_ERROR + % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql} + ) + GetFirstCallbackType = Callable[[Cursor], int] @@ -81,6 +161,12 @@ class LocalSequenceGenerator(SequenceGenerator): self._current_max_id += 1 return self._current_max_id + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + # There is nothing to do for in memory sequences + pass + def build_sequence_generator( database_engine: BaseDatabaseEngine, diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 67ce9a5f39..382f0cf3f0 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py
@@ -449,18 +449,8 @@ class ReadWriteLock: R = TypeVar("R") -def _cancelled_to_timed_out_error(value: R, timeout: float) -> R: - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise defer.TimeoutError(timeout, "Deferred") - return value - - def timeout_deferred( - deferred: defer.Deferred, - timeout: float, - reactor: IReactorTime, - on_timeout_cancel: Optional[Callable[[Any, float], Any]] = None, + deferred: defer.Deferred, timeout: float, reactor: IReactorTime, ) -> defer.Deferred: """The in built twisted `Deferred.addTimeout` fails to time out deferreds that have a canceller that throws exceptions. This method creates a new @@ -469,27 +459,21 @@ def timeout_deferred( (See https://twistedmatrix.com/trac/ticket/9534) - NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred + NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred. + + NOTE: the TimeoutError raised by the resultant deferred is + twisted.internet.defer.TimeoutError, which is *different* to the built-in + TimeoutError, as well as various other TimeoutErrors you might have imported. Args: deferred: The Deferred to potentially timeout. timeout: Timeout in seconds reactor: The twisted reactor to use - on_timeout_cancel: A callable which is called immediately - after the deferred times out, and not if this deferred is - otherwise cancelled before the timeout. - It takes an arbitrary value, which is the value of the deferred at - that exact point in time (probably a CancelledError Failure), and - the timeout. - - The default callable (if none is provided) will translate a - CancelledError Failure into a defer.TimeoutError. Returns: - A new Deferred. + A new Deferred, which will errback with defer.TimeoutError on timeout. """ - new_d = defer.Deferred() timed_out = [False] @@ -502,18 +486,23 @@ def timeout_deferred( except: # noqa: E722, if we throw any exception it'll break time outs logger.exception("Canceller failed during timeout") + # the cancel() call should have set off a chain of errbacks which + # will have errbacked new_d, but in case it hasn't, errback it now. + if not new_d.called: - new_d.errback(defer.TimeoutError(timeout, "Deferred")) + new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,))) delayed_call = reactor.callLater(timeout, time_it_out) - def convert_cancelled(value): - if timed_out[0]: - to_call = on_timeout_cancel or _cancelled_to_timed_out_error - return to_call(value, timeout) + def convert_cancelled(value: failure.Failure): + # if the orgininal deferred was cancelled, and our timeout has fired, then + # the reason it was cancelled was due to our timeout. Turn the CancelledError + # into a TimeoutError. + if timed_out[0] and value.check(CancelledError): + raise defer.TimeoutError("Timed out after %gs" % (timeout,)) return value - deferred.addBoth(convert_cancelled) + deferred.addErrback(convert_cancelled) def cancel_timeout(result): # stop the pending call to cancel the deferred if it's been fired diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index 5cf408f21f..8ff1460c0d 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py
@@ -41,6 +41,7 @@ from synapse.storage.keys import FetchKeyResult from tests import unittest from tests.test_utils import make_awaitable +from tests.unittest import logcontext_clean class MockPerspectiveServer: @@ -67,6 +68,7 @@ class MockPerspectiveServer: signedjson.sign.sign_json(res, self.server_name, self.key) +@logcontext_clean class KeyringTestCase(unittest.HomeserverTestCase): def check_context(self, val, expected): self.assertEquals(getattr(current_context(), "request", None), expected) @@ -309,6 +311,7 @@ class KeyringTestCase(unittest.HomeserverTestCase): mock_fetcher2.get_keys.assert_called_once() +@logcontext_clean class ServerKeyFetcherTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): self.http_client = Mock() diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py
index 5604af3795..212484a7fe 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py
@@ -318,14 +318,14 @@ class FederationClientTests(HomeserverTestCase): r = self.successResultOf(d) self.assertEqual(r.code, 200) - def test_client_headers_no_body(self): + @parameterized.expand(["get_json", "post_json", "delete_json", "put_json"]) + def test_timeout_reading_body(self, method_name: str): """ If the HTTP request is connected, but gets no response before being - timed out, it'll give a ResponseNeverReceived. + timed out, it'll give a RequestSendFailed with can_retry. """ - d = defer.ensureDeferred( - self.cl.post_json("testserv:8008", "foo/bar", timeout=10000) - ) + method = getattr(self.cl, method_name) + d = defer.ensureDeferred(method("testserv:8008", "foo/bar", timeout=10000)) self.pump() @@ -349,7 +349,9 @@ class FederationClientTests(HomeserverTestCase): self.reactor.advance(10.5) f = self.failureResultOf(d) - self.assertIsInstance(f.value, TimeoutError) + self.assertIsInstance(f.value, RequestSendFailed) + self.assertTrue(f.value.can_retry) + self.assertIsInstance(f.value.inner_exception, defer.TimeoutError) def test_client_requires_trailing_slashes(self): """ diff --git a/tests/http/test_simple_client.py b/tests/http/test_simple_client.py new file mode 100644
index 0000000000..a1cf0862d4 --- /dev/null +++ b/tests/http/test_simple_client.py
@@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from mock import Mock + +from netaddr import IPSet + +from twisted.internet import defer +from twisted.internet.error import DNSLookupError + +from synapse.http import RequestTimedOutError +from synapse.http.client import SimpleHttpClient +from synapse.server import HomeServer + +from tests.unittest import HomeserverTestCase + + +class SimpleHttpClientTests(HomeserverTestCase): + def prepare(self, reactor, clock, hs: "HomeServer"): + # Add a DNS entry for a test server + self.reactor.lookups["testserv"] = "1.2.3.4" + + self.cl = hs.get_simple_http_client() + + def test_dns_error(self): + """ + If the DNS lookup returns an error, it will bubble up. + """ + d = defer.ensureDeferred(self.cl.get_json("http://testserv2:8008/foo/bar")) + self.pump() + + f = self.failureResultOf(d) + self.assertIsInstance(f.value, DNSLookupError) + + def test_client_connection_refused(self): + d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar")) + + self.pump() + + # Nothing happened yet + self.assertNoResult(d) + + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8008) + e = Exception("go away") + factory.clientConnectionFailed(None, e) + self.pump(0.5) + + f = self.failureResultOf(d) + + self.assertIs(f.value, e) + + def test_client_never_connect(self): + """ + If the HTTP request is not connected and is timed out, it'll give a + ConnectingCancelledError or TimeoutError. + """ + d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar")) + + self.pump() + + # Nothing happened yet + self.assertNoResult(d) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], "1.2.3.4") + self.assertEqual(clients[0][1], 8008) + + # Deferred is still without a result + self.assertNoResult(d) + + # Push by enough to time it out + self.reactor.advance(120) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, RequestTimedOutError) + + def test_client_connect_no_response(self): + """ + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. + """ + d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar")) + + self.pump() + + # Nothing happened yet + self.assertNoResult(d) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], "1.2.3.4") + self.assertEqual(clients[0][1], 8008) + + conn = Mock() + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred is still without a result + self.assertNoResult(d) + + # Push by enough to time it out + self.reactor.advance(120) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, RequestTimedOutError) + + def test_client_ip_range_blacklist(self): + """Ensure that Synapse does not try to connect to blacklisted IPs""" + + # Add some DNS entries we'll blacklist + self.reactor.lookups["internal"] = "127.0.0.1" + self.reactor.lookups["internalv6"] = "fe80:0:0:0:0:8a2e:370:7337" + ip_blacklist = IPSet(["127.0.0.0/8", "fe80::/64"]) + + cl = SimpleHttpClient(self.hs, ip_blacklist=ip_blacklist) + + # Try making a GET request to a blacklisted IPv4 address + # ------------------------------------------------------ + # Make the request + d = defer.ensureDeferred(cl.get_json("http://internal:8008/foo/bar")) + self.pump(1) + + # Check that it was unable to resolve the address + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 0) + + self.failureResultOf(d, DNSLookupError) + + # Try making a POST request to a blacklisted IPv6 address + # ------------------------------------------------------- + # Make the request + d = defer.ensureDeferred( + cl.post_json_get_json("http://internalv6:8008/foo/bar", {}) + ) + + # Move the reactor forwards + self.pump(1) + + # Check that it was unable to resolve the address + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 0) + + # Check that it was due to a blacklisted DNS lookup + self.failureResultOf(d, DNSLookupError) + + # Try making a GET request to a non-blacklisted IPv4 address + # ---------------------------------------------------------- + # Make the request + d = defer.ensureDeferred(cl.get_json("http://testserv:8008/foo/bar")) + + # Nothing has happened yet + self.assertNoResult(d) + + # Move the reactor forwards + self.pump(1) + + # Check that it was able to resolve the address + clients = self.reactor.tcpClients + self.assertNotEqual(len(clients), 0) + + # Connection will still fail as this IP address does not resolve to anything + self.failureResultOf(d, RequestTimedOutError) diff --git a/tests/rest/client/v2_alpha/test_account.py b/tests/rest/client/v2_alpha/test_account.py
index 93f899d861..ae2cd67f35 100644 --- a/tests/rest/client/v2_alpha/test_account.py +++ b/tests/rest/client/v2_alpha/test_account.py
@@ -732,6 +732,12 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase): @override_config({"next_link_domain_whitelist": ["example.com", "example.org"]}) def test_next_link_domain_whitelist(self): """Tests next_link parameters must fit the whitelist if provided""" + + # Ensure not providing a next_link parameter still works + self._request_token( + "something@example.com", "some_secret", next_link=None, expect_code=200, + ) + self._request_token( "something@example.com", "some_secret", diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py
index d4ff55fbff..4558bee7be 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py
@@ -12,9 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - from synapse.storage.database import DatabasePool +from synapse.storage.engines import IncorrectDatabaseSetup from synapse.storage.util.id_generators import MultiWriterIdGenerator from tests.unittest import HomeserverTestCase @@ -59,7 +58,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): writers=writers, ) - return self.get_success(self.db_pool.runWithConnection(_create)) + return self.get_success_or_raise(self.db_pool.runWithConnection(_create)) def _insert_rows(self, instance_name: str, number: int): """Insert N rows as the given instance, inserting with stream IDs pulled @@ -411,6 +410,23 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): self.get_success(_get_next_async()) self.assertEqual(id_gen_3.get_persisted_upto_position(), 6) + def test_sequence_consistency(self): + """Test that we error out if the table and sequence diverges. + """ + + # Prefill with some rows + self._insert_row_with_id("master", 3) + + # Now we add a row *without* updating the stream ID + def _insert(txn): + txn.execute("INSERT INTO foobar VALUES (26, 'master')") + + self.get_success(self.db_pool.runInteraction("_insert", _insert)) + + # Creating the ID gen should error + with self.assertRaises(IncorrectDatabaseSetup): + self._create_id_generator("first") + class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase): """Tests MultiWriterIdGenerator that produce *negative* stream IDs. diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py
index 643072bbaf..8d97b6d4cd 100644 --- a/tests/storage/test_monthly_active_users.py +++ b/tests/storage/test_monthly_active_users.py
@@ -137,6 +137,21 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase): count = self.get_success(self.store.get_monthly_active_count()) self.assertEqual(count, 1) + def test_appservice_user_not_counted_in_mau(self): + self.get_success( + self.store.register_user( + user_id="@appservice_user:server", appservice_id="wibble" + ) + ) + count = self.get_success(self.store.get_monthly_active_count()) + self.assertEqual(count, 0) + + d = self.store.upsert_monthly_active_user("@appservice_user:server") + self.get_success(d) + + count = self.get_success(self.store.get_monthly_active_count()) + self.assertEqual(count, 0) + def test_user_last_seen_monthly_active(self): user_id1 = "@user1:server" user_id2 = "@user2:server" @@ -383,7 +398,7 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase): self.get_success(self.store.upsert_monthly_active_user(appservice2_user1)) count = self.get_success(self.store.get_monthly_active_count()) - self.assertEqual(count, 4) + self.assertEqual(count, 1) d = self.store.get_monthly_active_count_by_service() result = self.get_success(d) diff --git a/tests/unittest.py b/tests/unittest.py
index dabf69cff4..e654c0442d 100644 --- a/tests/unittest.py +++ b/tests/unittest.py
@@ -14,7 +14,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import gc import hashlib import hmac @@ -23,11 +22,12 @@ import logging import time from typing import Optional, Tuple, Type, TypeVar, Union -from mock import Mock +from mock import Mock, patch from canonicaljson import json from twisted.internet.defer import Deferred, ensureDeferred, succeed +from twisted.python.failure import Failure from twisted.python.threadpool import ThreadPool from twisted.trial import unittest @@ -169,6 +169,19 @@ def INFO(target): return target +def logcontext_clean(target): + """A decorator which marks the TestCase or method as 'logcontext_clean' + + ... ie, any logcontext errors should cause a test failure + """ + + def logcontext_error(msg): + raise AssertionError("logcontext error: %s" % (msg)) + + patcher = patch("synapse.logging.context.logcontext_error", new=logcontext_error) + return patcher(target) + + class HomeserverTestCase(TestCase): """ A base TestCase that reduces boilerplate for HomeServer-using test cases. @@ -463,6 +476,35 @@ class HomeserverTestCase(TestCase): self.pump() return self.failureResultOf(d, exc) + def get_success_or_raise(self, d, by=0.0): + """Drive deferred to completion and return result or raise exception + on failure. + """ + + if inspect.isawaitable(d): + deferred = ensureDeferred(d) + if not isinstance(deferred, Deferred): + return d + + results = [] # type: list + deferred.addBoth(results.append) + + self.pump(by=by) + + if not results: + self.fail( + "Success result expected on {!r}, found no result instead".format( + deferred + ) + ) + + result = results[0] + + if isinstance(result, Failure): + result.raiseException() + + return result + def register_user(self, username, password, admin=False): """ Register a user. Requires the Admin API be registered.