From 0ddf48672421aa64920342b959dc77432c869889 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Wed, 12 Sep 2018 11:58:52 +0100 Subject: expose number of real reserved users --- synapse/app/homeserver.py | 10 +++++++--- synapse/storage/monthly_active_users.py | 17 ++++++++++++++++ tests/storage/test_monthly_active_users.py | 31 ++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 3eb5b663de..e6372bcec2 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -307,6 +307,7 @@ class SynapseHomeServer(HomeServer): # Gauges to expose monthly active user control metrics current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") +reserved_mau_gauge = Gauge("synapse_admin_mau:reserved", "Reserved real MAU users") def setup(config_options): @@ -531,10 +532,13 @@ def run(hs): @defer.inlineCallbacks def generate_monthly_active_users(): - count = 0 + current_mau_count = 0 + reserved_mau_count = 0 if hs.config.limit_usage_by_mau: - count = yield hs.get_datastore().get_monthly_active_count() - current_mau_gauge.set(float(count)) + current_mau_count = yield hs.get_datastore().get_monthly_active_count() + reserved_mau_count = yield hs.get_datastore().get_reserved_real_user_account() + current_mau_gauge.set(float(current_mau_count)) + reserved_mau_gauge.set(float(reserved_mau_count)) max_mau_gauge.set(float(hs.config.max_mau_value)) hs.get_datastore().initialise_reserved_users( diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index b890c152db..53d125d305 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -146,6 +146,23 @@ class MonthlyActiveUsersStore(SQLBaseStore): return count return self.runInteraction("count_users", _count_users) + @defer.inlineCallbacks + def get_reserved_real_user_account(self): + """Of the reserved threepids defined in config, how many are associated + with registered users? + + Returns: + Defered[int]: Number of real reserved users + """ + count = 0 + for tp in self.hs.config.mau_limits_reserved_threepids: + user_id = yield self.hs.get_datastore().get_user_id_by_threepid( + tp["medium"], tp["address"] + ) + if user_id: + count = count + 1 + defer.returnValue(count) + @defer.inlineCallbacks def upsert_monthly_active_user(self, user_id): """ diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py index ccfc21b7fc..662f2ed845 100644 --- a/tests/storage/test_monthly_active_users.py +++ b/tests/storage/test_monthly_active_users.py @@ -183,3 +183,34 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase): self.store.populate_monthly_active_users('user_id') self.pump() self.store.upsert_monthly_active_user.assert_not_called() + + def test_get_reserved_real_user_account(self): + # Test no reserved users, or reserved threepids + count = self.store.get_reserved_real_user_account() + self.assertEquals(self.get_success(count), 0) + # Test reserved users but no registered users + + user1 = '@user1:example.com' + user2 = '@user2:example.com' + user1_email = 'user1@example.com' + user2_email = 'user2@example.com' + threepids = [ + {'medium': 'email', 'address': user1_email}, + {'medium': 'email', 'address': user2_email}, + ] + self.hs.config.mau_limits_reserved_threepids = threepids + self.store.initialise_reserved_users(threepids) + self.pump() + count = self.store.get_reserved_real_user_account() + self.assertEquals(self.get_success(count), 0) + + # Test reserved registed users + self.store.register(user_id=user1, token="123", password_hash=None) + self.store.register(user_id=user2, token="456", password_hash=None) + self.pump() + + now = int(self.hs.get_clock().time_msec()) + self.store.user_add_threepid(user1, "email", user1_email, now, now) + self.store.user_add_threepid(user2, "email", user2_email, now, now) + count = self.store.get_reserved_real_user_account() + self.assertEquals(self.get_success(count), len(threepids)) -- cgit 1.5.1 From 5cea4e16c7c6e7b38ff294cb77b4ef8f51d8b76e Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Wed, 12 Sep 2018 12:03:31 +0100 Subject: towncrier --- changelog.d/3846.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3846.feature diff --git a/changelog.d/3846.feature b/changelog.d/3846.feature new file mode 100644 index 0000000000..dc9e9d86cc --- /dev/null +++ b/changelog.d/3846.feature @@ -0,0 +1 @@ +create synapse_admin_mau:reserved metric to expose number of real reaserved users -- cgit 1.5.1 From 8decd6233dabd87160794949ffd95282e60ab01e Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Wed, 12 Sep 2018 16:22:15 +0100 Subject: improve naming --- changelog.d/3846.feature | 2 +- synapse/app/homeserver.py | 14 +++++++++----- synapse/storage/monthly_active_users.py | 2 +- tests/storage/test_monthly_active_users.py | 6 +++--- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/changelog.d/3846.feature b/changelog.d/3846.feature index dc9e9d86cc..453c11d3f8 100644 --- a/changelog.d/3846.feature +++ b/changelog.d/3846.feature @@ -1 +1 @@ -create synapse_admin_mau:reserved metric to expose number of real reaserved users +Add synapse_admin_mau:registered_reserved_users metric to expose number of real reaserved users diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e6372bcec2..ac97e19649 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -307,7 +307,10 @@ class SynapseHomeServer(HomeServer): # Gauges to expose monthly active user control metrics current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") -reserved_mau_gauge = Gauge("synapse_admin_mau:reserved", "Reserved real MAU users") +registered_reserved_users_mau_gauge = Gauge( + "synapse_admin_mau:registered_reserved_users", + "Registered users with reserved threepids" +) def setup(config_options): @@ -533,12 +536,13 @@ def run(hs): @defer.inlineCallbacks def generate_monthly_active_users(): current_mau_count = 0 - reserved_mau_count = 0 + reserved_count = 0 + store = hs.get_datastore() if hs.config.limit_usage_by_mau: - current_mau_count = yield hs.get_datastore().get_monthly_active_count() - reserved_mau_count = yield hs.get_datastore().get_reserved_real_user_account() + current_mau_count = yield store.get_monthly_active_count() + reserved_count = yield store.get_registered_reserved_users_count() current_mau_gauge.set(float(current_mau_count)) - reserved_mau_gauge.set(float(reserved_mau_count)) + registered_reserved_users_mau_gauge.set(float(reserved_count)) max_mau_gauge.set(float(hs.config.max_mau_value)) hs.get_datastore().initialise_reserved_users( diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index 53d125d305..59580949f1 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -147,7 +147,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): return self.runInteraction("count_users", _count_users) @defer.inlineCallbacks - def get_reserved_real_user_account(self): + def get_registered_reserved_users_count(self): """Of the reserved threepids defined in config, how many are associated with registered users? diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py index 662f2ed845..686f12a0dc 100644 --- a/tests/storage/test_monthly_active_users.py +++ b/tests/storage/test_monthly_active_users.py @@ -186,7 +186,7 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase): def test_get_reserved_real_user_account(self): # Test no reserved users, or reserved threepids - count = self.store.get_reserved_real_user_account() + count = self.store.get_registered_reserved_users_count() self.assertEquals(self.get_success(count), 0) # Test reserved users but no registered users @@ -201,7 +201,7 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase): self.hs.config.mau_limits_reserved_threepids = threepids self.store.initialise_reserved_users(threepids) self.pump() - count = self.store.get_reserved_real_user_account() + count = self.store.get_registered_reserved_users_count() self.assertEquals(self.get_success(count), 0) # Test reserved registed users @@ -212,5 +212,5 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase): now = int(self.hs.get_clock().time_msec()) self.store.user_add_threepid(user1, "email", user1_email, now, now) self.store.user_add_threepid(user2, "email", user2_email, now, now) - count = self.store.get_reserved_real_user_account() + count = self.store.get_registered_reserved_users_count() self.assertEquals(self.get_success(count), len(threepids)) -- cgit 1.5.1 From b7d2fb5eb9c55f95048444bb6703abbf2fe14c5c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 13 Sep 2018 19:59:32 +1000 Subject: Remove some superfluous logging (#3855) --- .circleci/merge_base_branch.sh | 2 +- changelog.d/3855.misc | 1 + synapse/__init__.py | 10 ++++++++++ synapse/config/logger.py | 17 ++++++++++++++++- 4 files changed, 28 insertions(+), 2 deletions(-) create mode 100644 changelog.d/3855.misc diff --git a/.circleci/merge_base_branch.sh b/.circleci/merge_base_branch.sh index 2d700dbf11..4e297d77da 100755 --- a/.circleci/merge_base_branch.sh +++ b/.circleci/merge_base_branch.sh @@ -4,7 +4,7 @@ set -e # CircleCI doesn't give CIRCLE_PR_NUMBER in the environment for non-forked PRs. Wonderful. # In this case, we just need to do some ~shell magic~ to strip it out of the PULL_REQUEST URL. -echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> "$BASH_ENV" +echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> $BASH_ENV source $BASH_ENV if [[ -z "${CIRCLE_PR_NUMBER}" ]] diff --git a/changelog.d/3855.misc b/changelog.d/3855.misc new file mode 100644 index 0000000000..a25bb020ba --- /dev/null +++ b/changelog.d/3855.misc @@ -0,0 +1 @@ +Removed some excess logging messages. \ No newline at end of file diff --git a/synapse/__init__.py b/synapse/__init__.py index 65a2b894cc..9dbe0b9f10 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -17,4 +17,14 @@ """ This is a reference implementation of a Matrix home server. """ +try: + from twisted.internet import protocol + from twisted.internet.protocol import Factory + from twisted.names.dns import DNSDatagramProtocol + protocol.Factory.noisy = False + Factory.noisy = False + DNSDatagramProtocol.noisy = False +except ImportError: + pass + __version__ = "0.33.4" diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 3f187adfc8..e9a936118d 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -227,7 +227,22 @@ def setup_logging(config, use_worker_options=False): # # However this may not be too much of a problem if we are just writing to a file. observer = STDLibLogObserver() + + def _log(event): + + if "log_text" in event: + if event["log_text"].startswith("DNSDatagramProtocol starting on "): + return + + if event["log_text"].startswith("(UDP Port "): + return + + if event["log_text"].startswith("Timing out client"): + return + + return observer(event) + globalLogBeginner.beginLoggingTo( - [observer], + [_log], redirectStandardIO=not config.no_redirect_stdio, ) -- cgit 1.5.1 From c857f5ef9b77b07179555a63973f11b8aa8c1752 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Sep 2018 12:48:10 +0100 Subject: Make purge history slightly faster Don't pull out events that are outliers and won't be deleted, as nothing should happen to them. --- synapse/storage/events.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 30ff87a4c4..1c8e01a47e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1930,15 +1930,22 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore should_delete_params = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" - should_delete_params += ("%:" + self.hs.hostname, ) + should_delete_params += ( + "%:" + self.hs.hostname, + "%:" + self.hs.hostname, + ) should_delete_params += (room_id, token.topological) + # Note that we insert events that are outliers and aren't going to be + # deleted, as nothing will happen to them. txn.execute( "INSERT INTO events_to_purge" " SELECT event_id, %s" " FROM events AS e LEFT JOIN state_events USING (event_id)" - " WHERE e.room_id = ? AND topological_ordering < ?" % ( + " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?" + % ( + should_delete_expr, should_delete_expr, ), should_delete_params, -- cgit 1.5.1 From e7cd7cb0f02f1f918387e4b3061b4a251c5d68fc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Sep 2018 12:55:40 +0100 Subject: Newsfile --- changelog.d/3856.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3856.misc diff --git a/changelog.d/3856.misc b/changelog.d/3856.misc new file mode 100644 index 0000000000..36c311eb3d --- /dev/null +++ b/changelog.d/3856.misc @@ -0,0 +1 @@ +Speed up purge history for rooms that have been previously purged -- cgit 1.5.1 From 9dbe38ea7de66f6e5dfab1570dd3a5217a4883eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Sep 2018 15:05:52 +0100 Subject: Create indices after insertion --- synapse/storage/events.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1c8e01a47e..622f2ababf 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1890,20 +1890,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore ")" ) - # create an index on should_delete because later we'll be looking for - # the should_delete / shouldn't_delete subsets - txn.execute( - "CREATE INDEX events_to_purge_should_delete" - " ON events_to_purge(should_delete)", - ) - - # We do joins against events_to_purge for e.g. calculating state - # groups to purge, etc., so lets make an index. - txn.execute( - "CREATE INDEX events_to_purge_id" - " ON events_to_purge(event_id)", - ) - # First ensure that we're not about to delete all the forward extremeties txn.execute( "SELECT e.event_id, e.depth FROM events as e " @@ -1950,6 +1936,24 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore ), should_delete_params, ) + + # We create the indices *after* insertion as that's a lot faster. + + # create an index on should_delete because later we'll be looking for + # the should_delete / shouldn't_delete subsets + txn.execute( + "CREATE INDEX events_to_purge_should_delete" + " ON events_to_purge(should_delete)", + ) + + # We do joins against events_to_purge for e.g. calculating state + # groups to purge, etc., so lets make an index. + txn.execute( + "CREATE INDEX events_to_purge_id" + " ON events_to_purge(event_id)", + ) + + txn.execute( "SELECT event_id, should_delete FROM events_to_purge" ) -- cgit 1.5.1 From 9cbd0094f0dcf03676b3234896e73af80cb75021 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Sep 2018 15:15:35 +0100 Subject: pep8 --- synapse/storage/events.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 622f2ababf..bc1c18023b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1953,7 +1953,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore " ON events_to_purge(event_id)", ) - txn.execute( "SELECT event_id, should_delete FROM events_to_purge" ) -- cgit 1.5.1 From bfa0b759e02c74931606ea77f34bec99dfb10589 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 00:15:51 +1000 Subject: Attempt to figure out what's going on with timeouts (#3857) --- changelog.d/3857.misc | 1 + synapse/http/matrixfederationclient.py | 98 +++++++++----------- tests/http/test_fedclient.py | 157 +++++++++++++++++++++++++++++++++ tests/server.py | 42 ++++++++- 4 files changed, 241 insertions(+), 57 deletions(-) create mode 100644 changelog.d/3857.misc create mode 100644 tests/http/test_fedclient.py diff --git a/changelog.d/3857.misc b/changelog.d/3857.misc new file mode 100644 index 0000000000..e128d193d9 --- /dev/null +++ b/changelog.d/3857.misc @@ -0,0 +1 @@ +Refactor some HTTP timeout code. \ No newline at end of file diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cf920bc041..c49dbacd93 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -26,7 +26,7 @@ from canonicaljson import encode_canonical_json from prometheus_client import Counter from signedjson.sign import sign_json -from twisted.internet import defer, protocol, reactor +from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from twisted.web._newclient import ResponseDone from twisted.web.client import Agent, HTTPConnectionPool @@ -40,10 +40,8 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) -from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext -from synapse.util.async_helpers import add_timeout_to_deferred from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) @@ -66,13 +64,14 @@ else: class MatrixFederationEndpointFactory(object): def __init__(self, hs): + self.reactor = hs.get_reactor() self.tls_client_options_factory = hs.tls_client_options_factory def endpointForURI(self, uri): destination = uri.netloc.decode('ascii') return matrix_federation_endpoint( - reactor, destination, timeout=10, + self.reactor, destination, timeout=10, tls_client_options_factory=self.tls_client_options_factory ) @@ -90,6 +89,7 @@ class MatrixFederationHttpClient(object): self.hs = hs self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname + reactor = hs.get_reactor() pool = HTTPConnectionPool(reactor) pool.maxPersistentPerHost = 5 pool.cachedConnectionTimeout = 2 * 60 @@ -100,6 +100,7 @@ class MatrixFederationHttpClient(object): self._store = hs.get_datastore() self.version_string = hs.version_string.encode('ascii') self._next_id = 1 + self.default_timeout = 60 def _create_url(self, destination, path_bytes, param_bytes, query_bytes): return urllib.parse.urlunparse( @@ -143,6 +144,11 @@ class MatrixFederationHttpClient(object): (May also fail with plenty of other Exceptions for things like DNS failures, connection failures, SSL failures.) """ + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + if ( self.hs.config.federation_domain_whitelist is not None and destination not in self.hs.config.federation_domain_whitelist @@ -215,13 +221,9 @@ class MatrixFederationHttpClient(object): headers=Headers(headers_dict), data=data, agent=self.agent, + reactor=self.hs.get_reactor() ) - add_timeout_to_deferred( - request_deferred, - timeout / 1000. if timeout else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) + request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) response = yield make_deferred_yieldable( request_deferred, ) @@ -261,6 +263,13 @@ class MatrixFederationHttpClient(object): delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) + logger.debug( + "{%s} Waiting %s before sending to %s...", + txn_id, + delay, + destination + ) + yield self.clock.sleep(delay) retries_left -= 1 else: @@ -279,10 +288,9 @@ class MatrixFederationHttpClient(object): # :'( # Update transactions table? with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.content(response), - timeout, - ) + d = treq.content(response) + d.addTimeout(_sec_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) raise HttpResponseException( response.code, response.phrase, body ) @@ -396,10 +404,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @defer.inlineCallbacks @@ -449,10 +456,14 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + + d.addTimeout(_sec_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -504,10 +515,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -554,10 +564,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -599,38 +608,15 @@ class MatrixFederationHttpClient(object): try: with logcontext.PreserveLoggingContext(): - length = yield self._timeout_deferred( - _readBodyToFile( - response, output_stream, max_size - ), - ) + d = _readBodyToFile(response, output_stream, max_size) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + length = yield make_deferred_yieldable(d) except Exception: logger.exception("Failed to download body") raise defer.returnValue((length, headers)) - def _timeout_deferred(self, deferred, timeout_ms=None): - """Times the deferred out after `timeout_ms` ms - - Args: - deferred (Deferred) - timeout_ms (int|None): Timeout in milliseconds. If None defaults - to 60 seconds. - - Returns: - Deferred - """ - - add_timeout_to_deferred( - deferred, - timeout_ms / 1000. if timeout_ms else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) - - return deferred - class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py new file mode 100644 index 0000000000..1c46c9cfeb --- /dev/null +++ b/tests/http/test_fedclient.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock + +from twisted.internet.defer import TimeoutError +from twisted.internet.error import ConnectingCancelledError, DNSLookupError +from twisted.web.client import ResponseNeverReceived + +from synapse.http.matrixfederationclient import MatrixFederationHttpClient + +from tests.unittest import HomeserverTestCase + + +class FederationClientTests(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver(reactor=reactor, clock=clock) + hs.tls_client_options_factory = None + return hs + + def prepare(self, reactor, clock, homeserver): + + self.cl = MatrixFederationHttpClient(self.hs) + self.reactor.lookups["testserv"] = "1.2.3.4" + + def test_dns_error(self): + """ + If the DNS raising returns an error, it will bubble up. + """ + d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000) + self.pump() + + f = self.failureResultOf(d) + self.assertIsInstance(f.value, DNSLookupError) + + def test_client_never_connect(self): + """ + If the HTTP request is not connected and is timed out, it'll give a + ConnectingCancelledError. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertFalse(d.called) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], '1.2.3.4') + self.assertEqual(clients[0][1], 8008) + + # Deferred is still without a result + self.assertFalse(d.called) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, ConnectingCancelledError) + + def test_client_connect_no_response(self): + """ + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertFalse(d.called) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], '1.2.3.4') + self.assertEqual(clients[0][1], 8008) + + conn = Mock() + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred is still without a result + self.assertFalse(d.called) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, ResponseNeverReceived) + + def test_client_gets_headers(self): + """ + Once the client gets the headers, _request returns successfully. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + conn = Mock() + clients = self.reactor.tcpClients + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred does not have a result + self.assertFalse(d.called) + + # Send it the HTTP response + client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n") + + # We should get a successful response + r = self.successResultOf(d) + self.assertEqual(r.code, 200) + + def test_client_headers_no_body(self): + """ + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. + """ + d = self.cl.post_json("testserv:8008", "foo/bar", timeout=10000) + + self.pump() + + conn = Mock() + clients = self.reactor.tcpClients + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred does not have a result + self.assertFalse(d.called) + + # Send it the HTTP response + client.dataReceived( + (b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n" + b"Server: Fake\r\n\r\n") + ) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, TimeoutError) diff --git a/tests/server.py b/tests/server.py index a2c3ca61f6..420ec4e088 100644 --- a/tests/server.py +++ b/tests/server.py @@ -4,9 +4,14 @@ from io import BytesIO from six import text_type import attr +from zope.interface import implementer -from twisted.internet import address, threads +from twisted.internet import address, threads, udp +from twisted.internet._resolver import HostResolution +from twisted.internet.address import IPv4Address from twisted.internet.defer import Deferred +from twisted.internet.error import DNSLookupError +from twisted.internet.interfaces import IReactorPluggableNameResolver from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactorClock @@ -154,11 +159,46 @@ def render(request, resource, clock): wait_until_result(clock, request) +@implementer(IReactorPluggableNameResolver) class ThreadedMemoryReactorClock(MemoryReactorClock): """ A MemoryReactorClock that supports callFromThread. """ + def __init__(self): + self._udp = [] + self.lookups = {} + + class Resolver(object): + def resolveHostName( + _self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics='TCP', + ): + + resolution = HostResolution(hostName) + resolutionReceiver.resolutionBegan(resolution) + if hostName not in self.lookups: + raise DNSLookupError("OH NO") + + resolutionReceiver.addressResolved( + IPv4Address('TCP', self.lookups[hostName], portNumber) + ) + resolutionReceiver.resolutionComplete() + return resolution + + self.nameResolver = Resolver() + super(ThreadedMemoryReactorClock, self).__init__() + + def listenUDP(self, port, protocol, interface='', maxPacketSize=8196): + p = udp.Port(port, protocol, interface, maxPacketSize, self) + p.startListening() + self._udp.append(p) + return p + def callFromThread(self, callback, *args, **kwargs): """ Make the callback fire in the next reactor iteration. -- cgit 1.5.1 From 89a76d18898a0172010c25e5a7c66d57c16b9196 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Sep 2018 15:33:16 +0100 Subject: Fix handling of redacted events from federation If we receive an event that doesn't pass their content hash check (e.g. due to already being redacted) then we hit a bug which causes an exception to be raised, which then promplty stops the event (and request) from being processed. This effects all sorts of federation APIs, including joining rooms with a redacted state event. --- synapse/events/__init__.py | 5 +++++ synapse/federation/federation_base.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 51f9084b90..b782af6308 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import six + from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -147,6 +149,9 @@ class EventBase(object): def items(self): return list(self._event_dict.items()) + def keys(self): + return six.iterkeys(self._event_dict) + class FrozenEvent(EventBase): def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 61782ae1c0..b7ad729c63 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -153,7 +153,7 @@ class FederationBase(object): # *actual* redacted copy to be on the safe side.) redacted_event = prune_event(pdu) if ( - set(six.iterkeys(redacted_event)) == set(six.iterkeys(pdu)) and + set(redacted_event.keys()) == set(pdu.keys()) and set(six.iterkeys(redacted_event.content)) == set(six.iterkeys(pdu.content)) ): -- cgit 1.5.1 From 3126b88d35994b09096239462abbbbcd17d9cbf0 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 00:44:31 +1000 Subject: fix circleci merged builds (#3858) * fix * changelog --- .circleci/merge_base_branch.sh | 4 ++++ changelog.d/3858.misc | 1 + 2 files changed, 5 insertions(+) create mode 100644 changelog.d/3858.misc diff --git a/.circleci/merge_base_branch.sh b/.circleci/merge_base_branch.sh index 4e297d77da..9614eb91b6 100755 --- a/.circleci/merge_base_branch.sh +++ b/.circleci/merge_base_branch.sh @@ -19,6 +19,10 @@ GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_ # Show what we are before git show -s +# Set up username so it can do a merge +git config --global user.email bot@matrix.org +git config --global user.name "A robot" + # Fetch and merge. If it doesn't work, it will raise due to set -e. git fetch -u origin $GITBASE git merge --no-edit origin/$GITBASE diff --git a/changelog.d/3858.misc b/changelog.d/3858.misc new file mode 100644 index 0000000000..4644db5330 --- /dev/null +++ b/changelog.d/3858.misc @@ -0,0 +1 @@ +Fix running merged builds on CircleCI \ No newline at end of file -- cgit 1.5.1 From 13193a6e2bd3557cae89e34438e8598f75276c34 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Sep 2018 15:46:45 +0100 Subject: Newsfile --- changelog.d/3859.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3859.bugfix diff --git a/changelog.d/3859.bugfix b/changelog.d/3859.bugfix new file mode 100644 index 0000000000..ec5b172464 --- /dev/null +++ b/changelog.d/3859.bugfix @@ -0,0 +1 @@ +Fix handling of redacted events from federation -- cgit 1.5.1 From ed5331a6272280efd4fc98ae425711acec8c6be5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Sep 2018 16:10:56 +0100 Subject: comment --- synapse/storage/events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bc1c18023b..e7487311ce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1916,6 +1916,8 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore should_delete_params = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" + + # We include the parameter twice since we use the expression twice should_delete_params += ( "%:" + self.hs.hostname, "%:" + self.hs.hostname, -- cgit 1.5.1