From 5e3ca12b158b4abefe2e3a54259ab5255dca93d8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 28 Sep 2020 17:58:33 +0100 Subject: Create a mechanism for marking tests "logcontext clean" (#8399) --- tests/crypto/test_keyring.py | 3 +++ tests/unittest.py | 15 ++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) (limited to 'tests') diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 5cf408f21f..8ff1460c0d 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -41,6 +41,7 @@ from synapse.storage.keys import FetchKeyResult from tests import unittest from tests.test_utils import make_awaitable +from tests.unittest import logcontext_clean class MockPerspectiveServer: @@ -67,6 +68,7 @@ class MockPerspectiveServer: signedjson.sign.sign_json(res, self.server_name, self.key) +@logcontext_clean class KeyringTestCase(unittest.HomeserverTestCase): def check_context(self, val, expected): self.assertEquals(getattr(current_context(), "request", None), expected) @@ -309,6 +311,7 @@ class KeyringTestCase(unittest.HomeserverTestCase): mock_fetcher2.get_keys.assert_called_once() +@logcontext_clean class ServerKeyFetcherTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): self.http_client = Mock() diff --git a/tests/unittest.py b/tests/unittest.py index dabf69cff4..bbe50c3851 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -23,7 +23,7 @@ import logging import time from typing import Optional, Tuple, Type, TypeVar, Union -from mock import Mock +from mock import Mock, patch from canonicaljson import json @@ -169,6 +169,19 @@ def INFO(target): return target +def logcontext_clean(target): + """A decorator which marks the TestCase or method as 'logcontext_clean' + + ... ie, any logcontext errors should cause a test failure + """ + + def logcontext_error(msg): + raise AssertionError("logcontext error: %s" % (msg)) + + patcher = patch("synapse.logging.context.logcontext_error", new=logcontext_error) + return patcher(target) + + class HomeserverTestCase(TestCase): """ A base TestCase that reduces boilerplate for HomeServer-using test cases. -- cgit 1.5.1 From bd380d942fdf91cf1214d6859f2bc97d12a92ab4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 28 Sep 2020 18:00:30 +0100 Subject: Add checks for postgres sequence consistency (#8402) --- changelog.d/8402.misc | 1 + docs/postgres.md | 11 ++++ synapse/storage/databases/main/registration.py | 3 + synapse/storage/databases/state/store.py | 3 + synapse/storage/util/id_generators.py | 5 ++ synapse/storage/util/sequence.py | 90 +++++++++++++++++++++++++- tests/storage/test_id_generators.py | 22 ++++++- tests/unittest.py | 31 ++++++++- 8 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 changelog.d/8402.misc (limited to 'tests') diff --git a/changelog.d/8402.misc b/changelog.d/8402.misc new file mode 100644 index 0000000000..ad1804d207 --- /dev/null +++ b/changelog.d/8402.misc @@ -0,0 +1 @@ +Add checks on startup that PostgreSQL sequences are consistent with their associated tables. diff --git a/docs/postgres.md b/docs/postgres.md index e71a1975d8..c30cc1fd8c 100644 --- a/docs/postgres.md +++ b/docs/postgres.md @@ -106,6 +106,17 @@ Note that the above may fail with an error about duplicate rows if corruption has already occurred, and such duplicate rows will need to be manually removed. +## Fixing inconsistent sequences error + +Synapse uses Postgres sequences to generate IDs for various tables. A sequence +and associated table can get out of sync if, for example, Synapse has been +downgraded and then upgraded again. + +To fix the issue shut down Synapse (including any and all workers) and run the +SQL command included in the error message. Once done Synapse should start +successfully. + + ## Tuning Postgres The default settings should be fine for most deployments. For larger diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 48ce7ecd16..a83df7759d 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -41,6 +41,9 @@ class RegistrationWorkerStore(SQLBaseStore): self.config = hs.config self.clock = hs.get_clock() + # Note: we don't check this sequence for consistency as we'd have to + # call `find_max_generated_user_id_localpart` each time, which is + # expensive if there are many entries. self._user_id_seq = build_sequence_generator( database.engine, find_max_generated_user_id_localpart, "user_id_seq", ) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index bec3780a32..989f0cbc9d 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -99,6 +99,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): self._state_group_seq_gen = build_sequence_generator( self.database_engine, get_max_state_group_txn, "state_group_id_seq" ) + self._state_group_seq_gen.check_consistency( + db_conn, table="state_groups", id_column="id" + ) @cached(max_entries=10000, iterable=True) async def get_state_group_delta(self, state_group): diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 4269eaf918..4fd7573e26 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -258,6 +258,11 @@ class MultiWriterIdGenerator: self._sequence_gen = PostgresSequenceGenerator(sequence_name) + # We check that the table and sequence haven't diverged. + self._sequence_gen.check_consistency( + db_conn, table=table, id_column=id_column, positive=positive + ) + # This goes and fills out the above state from the database. self._load_current_ids(db_conn, table, instance_column, id_column) diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index ffc1894748..2dd95e2709 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -13,11 +13,34 @@ # See the License for the specific language governing permissions and # limitations under the License. import abc +import logging import threading from typing import Callable, List, Optional -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -from synapse.storage.types import Cursor +from synapse.storage.engines import ( + BaseDatabaseEngine, + IncorrectDatabaseSetup, + PostgresEngine, +) +from synapse.storage.types import Connection, Cursor + +logger = logging.getLogger(__name__) + + +_INCONSISTENT_SEQUENCE_ERROR = """ +Postgres sequence '%(seq)s' is inconsistent with associated +table '%(table)s'. This can happen if Synapse has been downgraded and +then upgraded again, or due to a bad migration. + +To fix this error, shut down Synapse (including any and all workers) +and run the following SQL: + + SELECT setval('%(seq)s', ( + %(max_id_sql)s + )); + +See docs/postgres.md for more information. +""" class SequenceGenerator(metaclass=abc.ABCMeta): @@ -28,6 +51,19 @@ class SequenceGenerator(metaclass=abc.ABCMeta): """Gets the next ID in the sequence""" ... + @abc.abstractmethod + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + """Should be called during start up to test that the current value of + the sequence is greater than or equal to the maximum ID in the table. + + This is to handle various cases where the sequence value can get out + of sync with the table, e.g. if Synapse gets rolled back to a previous + version and the rolled forwards again. + """ + ... + class PostgresSequenceGenerator(SequenceGenerator): """An implementation of SequenceGenerator which uses a postgres sequence""" @@ -45,6 +81,50 @@ class PostgresSequenceGenerator(SequenceGenerator): ) return [i for (i,) in txn] + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + txn = db_conn.cursor() + + # First we get the current max ID from the table. + table_sql = "SELECT GREATEST(%(agg)s(%(id)s), 0) FROM %(table)s" % { + "id": id_column, + "table": table, + "agg": "MAX" if positive else "-MIN", + } + + txn.execute(table_sql) + row = txn.fetchone() + if not row: + # Table is empty, so nothing to do. + txn.close() + return + + # Now we fetch the current value from the sequence and compare with the + # above. + max_stream_id = row[0] + txn.execute( + "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name} + ) + last_value, is_called = txn.fetchone() + txn.close() + + # If `is_called` is False then `last_value` is actually the value that + # will be generated next, so we decrement to get the true "last value". + if not is_called: + last_value -= 1 + + if max_stream_id > last_value: + logger.warning( + "Postgres sequence %s is behind table %s: %d < %d", + last_value, + max_stream_id, + ) + raise IncorrectDatabaseSetup( + _INCONSISTENT_SEQUENCE_ERROR + % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql} + ) + GetFirstCallbackType = Callable[[Cursor], int] @@ -81,6 +161,12 @@ class LocalSequenceGenerator(SequenceGenerator): self._current_max_id += 1 return self._current_max_id + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + # There is nothing to do for in memory sequences + pass + def build_sequence_generator( database_engine: BaseDatabaseEngine, diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index d4ff55fbff..4558bee7be 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -12,9 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - from synapse.storage.database import DatabasePool +from synapse.storage.engines import IncorrectDatabaseSetup from synapse.storage.util.id_generators import MultiWriterIdGenerator from tests.unittest import HomeserverTestCase @@ -59,7 +58,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): writers=writers, ) - return self.get_success(self.db_pool.runWithConnection(_create)) + return self.get_success_or_raise(self.db_pool.runWithConnection(_create)) def _insert_rows(self, instance_name: str, number: int): """Insert N rows as the given instance, inserting with stream IDs pulled @@ -411,6 +410,23 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): self.get_success(_get_next_async()) self.assertEqual(id_gen_3.get_persisted_upto_position(), 6) + def test_sequence_consistency(self): + """Test that we error out if the table and sequence diverges. + """ + + # Prefill with some rows + self._insert_row_with_id("master", 3) + + # Now we add a row *without* updating the stream ID + def _insert(txn): + txn.execute("INSERT INTO foobar VALUES (26, 'master')") + + self.get_success(self.db_pool.runInteraction("_insert", _insert)) + + # Creating the ID gen should error + with self.assertRaises(IncorrectDatabaseSetup): + self._create_id_generator("first") + class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase): """Tests MultiWriterIdGenerator that produce *negative* stream IDs. diff --git a/tests/unittest.py b/tests/unittest.py index bbe50c3851..e654c0442d 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -14,7 +14,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import gc import hashlib import hmac @@ -28,6 +27,7 @@ from mock import Mock, patch from canonicaljson import json from twisted.internet.defer import Deferred, ensureDeferred, succeed +from twisted.python.failure import Failure from twisted.python.threadpool import ThreadPool from twisted.trial import unittest @@ -476,6 +476,35 @@ class HomeserverTestCase(TestCase): self.pump() return self.failureResultOf(d, exc) + def get_success_or_raise(self, d, by=0.0): + """Drive deferred to completion and return result or raise exception + on failure. + """ + + if inspect.isawaitable(d): + deferred = ensureDeferred(d) + if not isinstance(deferred, Deferred): + return d + + results = [] # type: list + deferred.addBoth(results.append) + + self.pump(by=by) + + if not results: + self.fail( + "Success result expected on {!r}, found no result instead".format( + deferred + ) + ) + + result = results[0] + + if isinstance(result, Failure): + result.raiseException() + + return result + def register_user(self, username, password, admin=False): """ Register a user. Requires the Admin API be registered. -- cgit 1.5.1 From 1c262431f9bf768d106bf79a568479fa5a0784a1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 29 Sep 2020 10:29:21 +0100 Subject: Fix handling of connection timeouts in outgoing http requests (#8400) * Remove `on_timeout_cancel` from `timeout_deferred` The `on_timeout_cancel` param to `timeout_deferred` wasn't always called on a timeout (in particular if the canceller raised an exception), so it was unreliable. It was also only used in one place, and to be honest it's easier to do what it does a different way. * Fix handling of connection timeouts in outgoing http requests Turns out that if we get a timeout during connection, then a different exception is raised, which wasn't always handled correctly. To fix it, catch the exception in SimpleHttpClient and turn it into a RequestTimedOutError (which is already a documented exception). Also add a description to RequestTimedOutError so that we can see which stage it failed at. * Fix incorrect handling of timeouts reading federation responses This was trapping the wrong sort of TimeoutError, so was never being hit. The effect was relatively minor, but we should fix this so that it does the expected thing. * Fix inconsistent handling of `timeout` param between methods `get_json`, `put_json` and `delete_json` were applying a different timeout to the response body to `post_json`; bring them in line and test. Co-authored-by: Patrick Cloke Co-authored-by: Erik Johnston --- changelog.d/8400.bugfix | 1 + synapse/handlers/identity.py | 25 +++-- synapse/http/__init__.py | 17 +--- synapse/http/client.py | 54 ++++++---- synapse/http/matrixfederationclient.py | 55 +++++++--- synapse/http/proxyagent.py | 16 ++- synapse/util/async_helpers.py | 47 ++++----- tests/http/test_fedclient.py | 14 +-- tests/http/test_simple_client.py | 180 +++++++++++++++++++++++++++++++++ 9 files changed, 311 insertions(+), 98 deletions(-) create mode 100644 changelog.d/8400.bugfix create mode 100644 tests/http/test_simple_client.py (limited to 'tests') diff --git a/changelog.d/8400.bugfix b/changelog.d/8400.bugfix new file mode 100644 index 0000000000..835658ba5e --- /dev/null +++ b/changelog.d/8400.bugfix @@ -0,0 +1 @@ +Fix incorrect handling of timeouts on outgoing HTTP requests. diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index ab15570f7a..bc3e9607ca 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -21,8 +21,6 @@ import logging import urllib.parse from typing import Awaitable, Callable, Dict, List, Optional, Tuple -from twisted.internet.error import TimeoutError - from synapse.api.errors import ( CodeMessageException, Codes, @@ -30,6 +28,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.config.emailconfig import ThreepidBehaviour +from synapse.http import RequestTimedOutError from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict, Requester from synapse.util import json_decoder @@ -93,7 +92,7 @@ class IdentityHandler(BaseHandler): try: data = await self.http_client.get_json(url, query_params) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.info( @@ -173,7 +172,7 @@ class IdentityHandler(BaseHandler): if e.code != 404 or not use_v2: logger.error("3PID bind failed with Matrix error: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except CodeMessageException as e: data = json_decoder.decode(e.msg) # XXX WAT? @@ -273,7 +272,7 @@ class IdentityHandler(BaseHandler): else: logger.error("Failed to unbind threepid on identity server: %s", e) raise SynapseError(500, "Failed to contact identity server") - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") await self.store.remove_user_bound_threepid( @@ -419,7 +418,7 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") async def requestMsisdnToken( @@ -471,7 +470,7 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") assert self.hs.config.public_baseurl @@ -553,7 +552,7 @@ class IdentityHandler(BaseHandler): id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken", body, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.warning("Error contacting msisdn account_threepid_delegate: %s", e) @@ -627,7 +626,7 @@ class IdentityHandler(BaseHandler): # require or validate it. See the following for context: # https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950 return data["mxid"] - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except IOError as e: logger.warning("Error from v1 identity server lookup: %s" % (e,)) @@ -655,7 +654,7 @@ class IdentityHandler(BaseHandler): "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server), {"access_token": id_access_token}, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") if not isinstance(hash_details, dict): @@ -727,7 +726,7 @@ class IdentityHandler(BaseHandler): }, headers=headers, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except Exception as e: logger.warning("Error when performing a v2 3pid lookup: %s", e) @@ -823,7 +822,7 @@ class IdentityHandler(BaseHandler): invite_config, {"Authorization": create_id_access_token_header(id_access_token)}, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: if e.code != 404: @@ -841,7 +840,7 @@ class IdentityHandler(BaseHandler): data = await self.blacklisting_http_client.post_json_get_json( url, invite_config ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.warning( diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 8eb3638591..59b01b812c 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -16,8 +16,6 @@ import re from twisted.internet import task -from twisted.internet.defer import CancelledError -from twisted.python import failure from twisted.web.client import FileBodyProducer from synapse.api.errors import SynapseError @@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError class RequestTimedOutError(SynapseError): """Exception representing timeout of an outbound request""" - def __init__(self): - super().__init__(504, "Timed out") - - -def cancelled_to_request_timed_out_error(value, timeout): - """Turns CancelledErrors into RequestTimedOutErrors. - - For use with async.add_timeout_to_deferred - """ - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise RequestTimedOutError() - return value + def __init__(self, msg): + super().__init__(504, msg) ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$") diff --git a/synapse/http/client.py b/synapse/http/client.py index 4694adc400..8324632cb6 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import urllib from io import BytesIO @@ -38,7 +37,7 @@ from zope.interface import implementer, provider from OpenSSL import SSL from OpenSSL.SSL import VERIFY_NONE -from twisted.internet import defer, protocol, ssl +from twisted.internet import defer, error as twisted_error, protocol, ssl from twisted.internet.interfaces import ( IReactorPluggableNameResolver, IResolutionReceiver, @@ -46,17 +45,18 @@ from twisted.internet.interfaces import ( from twisted.internet.task import Cooperator from twisted.python.failure import Failure from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, HTTPConnectionPool, readBody +from twisted.web.client import ( + Agent, + HTTPConnectionPool, + ResponseNeverReceived, + readBody, +) from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers from twisted.web.iweb import IResponse from synapse.api.errors import Codes, HttpResponseException, SynapseError -from synapse.http import ( - QuieterFileBodyProducer, - cancelled_to_request_timed_out_error, - redact_uri, -) +from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri from synapse.http.proxyagent import ProxyAgent from synapse.logging.context import make_deferred_yieldable from synapse.logging.opentracing import set_tag, start_active_span, tags @@ -332,8 +332,6 @@ class SimpleHttpClient: RequestTimedOutError if the request times out before the headers are read """ - # A small wrapper around self.agent.request() so we can easily attach - # counters to it outgoing_requests_counter.labels(method).inc() # log request but strip `access_token` (AS requests for example include this) @@ -362,15 +360,17 @@ class SimpleHttpClient: data=body_producer, headers=headers, **self._extra_treq_args - ) + ) # type: defer.Deferred + # we use our own timeout mechanism rather than treq's as a workaround # for https://twistedmatrix.com/trac/ticket/9534. request_deferred = timeout_deferred( - request_deferred, - 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, + request_deferred, 60, self.hs.get_reactor(), ) + + # turn timeouts into RequestTimedOutErrors + request_deferred.addErrback(_timeout_to_request_timed_out_error) + response = await make_deferred_yieldable(request_deferred) incoming_responses_counter.labels(method, response.code).inc() @@ -410,7 +410,7 @@ class SimpleHttpClient: parsed json Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -461,7 +461,7 @@ class SimpleHttpClient: parsed json Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -506,7 +506,7 @@ class SimpleHttpClient: Returns: Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -538,7 +538,7 @@ class SimpleHttpClient: Returns: Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -586,7 +586,7 @@ class SimpleHttpClient: Succeeds when we get a 2xx HTTP response, with the HTTP body as bytes. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -631,7 +631,7 @@ class SimpleHttpClient: headers, absolute URI of the response and HTTP response code. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -684,6 +684,18 @@ class SimpleHttpClient: ) +def _timeout_to_request_timed_out_error(f: Failure): + if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError): + # The TCP connection has its own timeout (set by the 'connectTimeout' param + # on the Agent), which raises twisted_error.TimeoutError exception. + raise RequestTimedOutError("Timeout connecting to remote server") + elif f.check(defer.TimeoutError, ResponseNeverReceived): + # this one means that we hit our overall timeout on the request + raise RequestTimedOutError("Timeout waiting for response from remote server") + + return f + + # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. # The two should be factored out. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b02c74ab2d..c23a4d7c0c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -171,7 +171,7 @@ async def _handle_json_response( d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) body = await make_deferred_yieldable(d) - except TimeoutError as e: + except defer.TimeoutError as e: logger.warning( "{%s} [%s] Timed out reading response - %s %s", request.txn_id, @@ -655,10 +655,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. backoff_on_404 (bool): True if we should count a 404 response as @@ -704,8 +708,13 @@ class MatrixFederationHttpClient: timeout=timeout, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body @@ -734,10 +743,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -801,10 +814,14 @@ class MatrixFederationHttpClient: args (dict|None): A dictionary used to create query strings, defaults to None. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -840,8 +857,13 @@ class MatrixFederationHttpClient: timeout=timeout, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body @@ -865,10 +887,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -900,8 +926,13 @@ class MatrixFederationHttpClient: ignore_backoff=ignore_backoff, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 332da02a8d..e32d3f43e0 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase): `BrowserLikePolicyForHTTPS`, so unless you have special requirements you can leave this as-is. - connectTimeout (float): The amount of time that this Agent will wait - for the peer to accept a connection. + connectTimeout (Optional[float]): The amount of time that this Agent will wait + for the peer to accept a connection, in seconds. If 'None', + HostnameEndpoint's default (30s) will be used. + + This is used for connections to both proxies and destination servers. bindAddress (bytes): The local address for client sockets to bind to. @@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase): Returns: Deferred[IResponse]: completes when the header of the response has been received (regardless of the response status code). + + Can fail with: + SchemeNotSupported: if the uri is not http or https + + twisted.internet.error.TimeoutError if the server we are connecting + to (proxy or destination) does not accept a connection before + connectTimeout. + + ... other things too. """ uri = uri.strip() if not _VALID_URI.match(uri): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 67ce9a5f39..382f0cf3f0 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -449,18 +449,8 @@ class ReadWriteLock: R = TypeVar("R") -def _cancelled_to_timed_out_error(value: R, timeout: float) -> R: - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise defer.TimeoutError(timeout, "Deferred") - return value - - def timeout_deferred( - deferred: defer.Deferred, - timeout: float, - reactor: IReactorTime, - on_timeout_cancel: Optional[Callable[[Any, float], Any]] = None, + deferred: defer.Deferred, timeout: float, reactor: IReactorTime, ) -> defer.Deferred: """The in built twisted `Deferred.addTimeout` fails to time out deferreds that have a canceller that throws exceptions. This method creates a new @@ -469,27 +459,21 @@ def timeout_deferred( (See https://twistedmatrix.com/trac/ticket/9534) - NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred + NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred. + + NOTE: the TimeoutError raised by the resultant deferred is + twisted.internet.defer.TimeoutError, which is *different* to the built-in + TimeoutError, as well as various other TimeoutErrors you might have imported. Args: deferred: The Deferred to potentially timeout. timeout: Timeout in seconds reactor: The twisted reactor to use - on_timeout_cancel: A callable which is called immediately - after the deferred times out, and not if this deferred is - otherwise cancelled before the timeout. - It takes an arbitrary value, which is the value of the deferred at - that exact point in time (probably a CancelledError Failure), and - the timeout. - - The default callable (if none is provided) will translate a - CancelledError Failure into a defer.TimeoutError. Returns: - A new Deferred. + A new Deferred, which will errback with defer.TimeoutError on timeout. """ - new_d = defer.Deferred() timed_out = [False] @@ -502,18 +486,23 @@ def timeout_deferred( except: # noqa: E722, if we throw any exception it'll break time outs logger.exception("Canceller failed during timeout") + # the cancel() call should have set off a chain of errbacks which + # will have errbacked new_d, but in case it hasn't, errback it now. + if not new_d.called: - new_d.errback(defer.TimeoutError(timeout, "Deferred")) + new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,))) delayed_call = reactor.callLater(timeout, time_it_out) - def convert_cancelled(value): - if timed_out[0]: - to_call = on_timeout_cancel or _cancelled_to_timed_out_error - return to_call(value, timeout) + def convert_cancelled(value: failure.Failure): + # if the orgininal deferred was cancelled, and our timeout has fired, then + # the reason it was cancelled was due to our timeout. Turn the CancelledError + # into a TimeoutError. + if timed_out[0] and value.check(CancelledError): + raise defer.TimeoutError("Timed out after %gs" % (timeout,)) return value - deferred.addBoth(convert_cancelled) + deferred.addErrback(convert_cancelled) def cancel_timeout(result): # stop the pending call to cancel the deferred if it's been fired diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index 5604af3795..212484a7fe 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -318,14 +318,14 @@ class FederationClientTests(HomeserverTestCase): r = self.successResultOf(d) self.assertEqual(r.code, 200) - def test_client_headers_no_body(self): + @parameterized.expand(["get_json", "post_json", "delete_json", "put_json"]) + def test_timeout_reading_body(self, method_name: str): """ If the HTTP request is connected, but gets no response before being - timed out, it'll give a ResponseNeverReceived. + timed out, it'll give a RequestSendFailed with can_retry. """ - d = defer.ensureDeferred( - self.cl.post_json("testserv:8008", "foo/bar", timeout=10000) - ) + method = getattr(self.cl, method_name) + d = defer.ensureDeferred(method("testserv:8008", "foo/bar", timeout=10000)) self.pump() @@ -349,7 +349,9 @@ class FederationClientTests(HomeserverTestCase): self.reactor.advance(10.5) f = self.failureResultOf(d) - self.assertIsInstance(f.value, TimeoutError) + self.assertIsInstance(f.value, RequestSendFailed) + self.assertTrue(f.value.can_retry) + self.assertIsInstance(f.value.inner_exception, defer.TimeoutError) def test_client_requires_trailing_slashes(self): """ diff --git a/tests/http/test_simple_client.py b/tests/http/test_simple_client.py new file mode 100644 index 0000000000..a1cf0862d4 --- /dev/null +++ b/tests/http/test_simple_client.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from mock import Mock + +from netaddr import IPSet + +from twisted.internet import defer +from twisted.internet.error import DNSLookupError + +from synapse.http import RequestTimedOutError +from synapse.http.client import SimpleHttpClient +from synapse.server import HomeServer + +from tests.unittest import HomeserverTestCase + + +class SimpleHttpClientTests(HomeserverTestCase): + def prepare(self, reactor, clock, hs: "HomeServer"): + # Add a DNS entry for a test server + self.reactor.lookups["testserv"] = "1.2.3.4" + + self.cl = hs.get_simple_http_client() + + def test_dns_error(self): + """ + If the DNS lookup returns an error, it will bubble up. + """ + d = defer.ensureDeferred(self.cl.get_json("http://testserv2:8008/foo/bar")) + self.pump() + + f = self.failureResultOf(d) + self.assertIsInstance(f.value, DNSLookupError) + + def test_client_connection_refused(self): + d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar")) + + self.pump() + + # Nothing happened yet + self.assertNoResult(d) + + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8008) + e = Exception("go away") + factory.clientConnectionFailed(None, e) + self.pump(0.5) + + f = self.failureResultOf(d) + + self.assertIs(f.value, e) + + def test_client_never_connect(self): + """ + If the HTTP request is not connected and is timed out, it'll give a + ConnectingCancelledError or TimeoutError. + """ + d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar")) + + self.pump() + + # Nothing happened yet + self.assertNoResult(d) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], "1.2.3.4") + self.assertEqual(clients[0][1], 8008) + + # Deferred is still without a result + self.assertNoResult(d) + + # Push by enough to time it out + self.reactor.advance(120) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, RequestTimedOutError) + + def test_client_connect_no_response(self): + """ + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. + """ + d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar")) + + self.pump() + + # Nothing happened yet + self.assertNoResult(d) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], "1.2.3.4") + self.assertEqual(clients[0][1], 8008) + + conn = Mock() + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred is still without a result + self.assertNoResult(d) + + # Push by enough to time it out + self.reactor.advance(120) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, RequestTimedOutError) + + def test_client_ip_range_blacklist(self): + """Ensure that Synapse does not try to connect to blacklisted IPs""" + + # Add some DNS entries we'll blacklist + self.reactor.lookups["internal"] = "127.0.0.1" + self.reactor.lookups["internalv6"] = "fe80:0:0:0:0:8a2e:370:7337" + ip_blacklist = IPSet(["127.0.0.0/8", "fe80::/64"]) + + cl = SimpleHttpClient(self.hs, ip_blacklist=ip_blacklist) + + # Try making a GET request to a blacklisted IPv4 address + # ------------------------------------------------------ + # Make the request + d = defer.ensureDeferred(cl.get_json("http://internal:8008/foo/bar")) + self.pump(1) + + # Check that it was unable to resolve the address + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 0) + + self.failureResultOf(d, DNSLookupError) + + # Try making a POST request to a blacklisted IPv6 address + # ------------------------------------------------------- + # Make the request + d = defer.ensureDeferred( + cl.post_json_get_json("http://internalv6:8008/foo/bar", {}) + ) + + # Move the reactor forwards + self.pump(1) + + # Check that it was unable to resolve the address + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 0) + + # Check that it was due to a blacklisted DNS lookup + self.failureResultOf(d, DNSLookupError) + + # Try making a GET request to a non-blacklisted IPv4 address + # ---------------------------------------------------------- + # Make the request + d = defer.ensureDeferred(cl.get_json("http://testserv:8008/foo/bar")) + + # Nothing has happened yet + self.assertNoResult(d) + + # Move the reactor forwards + self.pump(1) + + # Check that it was able to resolve the address + clients = self.reactor.tcpClients + self.assertNotEqual(len(clients), 0) + + # Connection will still fail as this IP address does not resolve to anything + self.failureResultOf(d, RequestTimedOutError) -- cgit 1.5.1 From 1c6b8752b891c1a25524d8dfaa8efb7176c0dbec Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 29 Sep 2020 12:36:44 +0100 Subject: Only assert valid next_link params when provided (#8417) Broken in https://github.com/matrix-org/synapse/pull/8275 and has yet to be put in a release. Fixes https://github.com/matrix-org/synapse/issues/8418. `next_link` is an optional parameter. However, we were checking whether the `next_link` param was valid, even if it wasn't provided. In that case, `next_link` was `None`, which would clearly not be a valid URL. This would prevent password reset and other operations if `next_link` was not provided, and the `next_link_domain_whitelist` config option was set. --- changelog.d/8417.feature | 1 + synapse/rest/client/v2_alpha/account.py | 15 +++++++++------ tests/rest/client/v2_alpha/test_account.py | 6 ++++++ 3 files changed, 16 insertions(+), 6 deletions(-) create mode 100644 changelog.d/8417.feature (limited to 'tests') diff --git a/changelog.d/8417.feature b/changelog.d/8417.feature new file mode 100644 index 0000000000..17549c3df3 --- /dev/null +++ b/changelog.d/8417.feature @@ -0,0 +1 @@ +Add a config option to specify a whitelist of domains that a user can be redirected to after validating their email or phone number. \ No newline at end of file diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index c3ce0f6259..9245214f36 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -103,8 +103,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): Codes.THREEPID_DENIED, ) - # Raise if the provided next_link value isn't valid - assert_valid_next_link(self.hs, next_link) + if next_link: + # Raise if the provided next_link value isn't valid + assert_valid_next_link(self.hs, next_link) # The email will be sent to the stored address. # This avoids a potential account hijack by requesting a password reset to @@ -379,8 +380,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): Codes.THREEPID_DENIED, ) - # Raise if the provided next_link value isn't valid - assert_valid_next_link(self.hs, next_link) + if next_link: + # Raise if the provided next_link value isn't valid + assert_valid_next_link(self.hs, next_link) existing_user_id = await self.store.get_user_id_by_threepid("email", email) @@ -453,8 +455,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): Codes.THREEPID_DENIED, ) - # Raise if the provided next_link value isn't valid - assert_valid_next_link(self.hs, next_link) + if next_link: + # Raise if the provided next_link value isn't valid + assert_valid_next_link(self.hs, next_link) existing_user_id = await self.store.get_user_id_by_threepid("msisdn", msisdn) diff --git a/tests/rest/client/v2_alpha/test_account.py b/tests/rest/client/v2_alpha/test_account.py index 93f899d861..ae2cd67f35 100644 --- a/tests/rest/client/v2_alpha/test_account.py +++ b/tests/rest/client/v2_alpha/test_account.py @@ -732,6 +732,12 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase): @override_config({"next_link_domain_whitelist": ["example.com", "example.org"]}) def test_next_link_domain_whitelist(self): """Tests next_link parameters must fit the whitelist if provided""" + + # Ensure not providing a next_link parameter still works + self._request_token( + "something@example.com", "some_secret", next_link=None, expect_code=200, + ) + self._request_token( "something@example.com", "some_secret", -- cgit 1.5.1 From 8676d8ab2e5667d7c12774effc64b3ab99344a8d Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Tue, 29 Sep 2020 13:11:02 +0100 Subject: Filter out appservices from mau count (#8404) This is an attempt to fix #8403. --- changelog.d/8404.misc | 1 + synapse/storage/databases/main/monthly_active_users.py | 9 ++++++++- tests/storage/test_monthly_active_users.py | 17 ++++++++++++++++- 3 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 changelog.d/8404.misc (limited to 'tests') diff --git a/changelog.d/8404.misc b/changelog.d/8404.misc new file mode 100644 index 0000000000..7aadded6c1 --- /dev/null +++ b/changelog.d/8404.misc @@ -0,0 +1 @@ +Do not include appservice users when calculating the total MAU for a server. diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index e0cedd1aac..e93aad33cd 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -41,7 +41,14 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): """ def _count_users(txn): - sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users" + # Exclude app service users + sql = """ + SELECT COALESCE(count(*), 0) + FROM monthly_active_users + LEFT JOIN users + ON monthly_active_users.user_id=users.name + WHERE (users.appservice_id IS NULL OR users.appservice_id = ''); + """ txn.execute(sql) (count,) = txn.fetchone() return count diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py index 643072bbaf..8d97b6d4cd 100644 --- a/tests/storage/test_monthly_active_users.py +++ b/tests/storage/test_monthly_active_users.py @@ -137,6 +137,21 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase): count = self.get_success(self.store.get_monthly_active_count()) self.assertEqual(count, 1) + def test_appservice_user_not_counted_in_mau(self): + self.get_success( + self.store.register_user( + user_id="@appservice_user:server", appservice_id="wibble" + ) + ) + count = self.get_success(self.store.get_monthly_active_count()) + self.assertEqual(count, 0) + + d = self.store.upsert_monthly_active_user("@appservice_user:server") + self.get_success(d) + + count = self.get_success(self.store.get_monthly_active_count()) + self.assertEqual(count, 0) + def test_user_last_seen_monthly_active(self): user_id1 = "@user1:server" user_id2 = "@user2:server" @@ -383,7 +398,7 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase): self.get_success(self.store.upsert_monthly_active_user(appservice2_user1)) count = self.get_success(self.store.get_monthly_active_count()) - self.assertEqual(count, 4) + self.assertEqual(count, 1) d = self.store.get_monthly_active_count_by_service() result = self.get_success(d) -- cgit 1.5.1