summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-x.circleci/merge_base_branch.sh6
-rw-r--r--changelog.d/3846.feature1
-rw-r--r--changelog.d/3855.misc1
-rw-r--r--changelog.d/3856.misc1
-rw-r--r--changelog.d/3857.misc1
-rw-r--r--changelog.d/3858.misc1
-rw-r--r--changelog.d/3859.bugfix1
-rw-r--r--synapse/__init__.py10
-rwxr-xr-xsynapse/app/homeserver.py14
-rw-r--r--synapse/config/logger.py17
-rw-r--r--synapse/events/__init__.py5
-rw-r--r--synapse/federation/federation_base.py2
-rw-r--r--synapse/http/matrixfederationclient.py98
-rw-r--r--synapse/storage/events.py44
-rw-r--r--synapse/storage/monthly_active_users.py17
-rw-r--r--tests/http/test_fedclient.py157
-rw-r--r--tests/server.py42
-rw-r--r--tests/storage/test_monthly_active_users.py31
18 files changed, 370 insertions, 79 deletions
diff --git a/.circleci/merge_base_branch.sh b/.circleci/merge_base_branch.sh

index 2d700dbf11..9614eb91b6 100755 --- a/.circleci/merge_base_branch.sh +++ b/.circleci/merge_base_branch.sh
@@ -4,7 +4,7 @@ set -e # CircleCI doesn't give CIRCLE_PR_NUMBER in the environment for non-forked PRs. Wonderful. # In this case, we just need to do some ~shell magic~ to strip it out of the PULL_REQUEST URL. -echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> "$BASH_ENV" +echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> $BASH_ENV source $BASH_ENV if [[ -z "${CIRCLE_PR_NUMBER}" ]] @@ -19,6 +19,10 @@ GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_ # Show what we are before git show -s +# Set up username so it can do a merge +git config --global user.email bot@matrix.org +git config --global user.name "A robot" + # Fetch and merge. If it doesn't work, it will raise due to set -e. git fetch -u origin $GITBASE git merge --no-edit origin/$GITBASE diff --git a/changelog.d/3846.feature b/changelog.d/3846.feature new file mode 100644
index 0000000000..453c11d3f8 --- /dev/null +++ b/changelog.d/3846.feature
@@ -0,0 +1 @@ +Add synapse_admin_mau:registered_reserved_users metric to expose number of real reaserved users diff --git a/changelog.d/3855.misc b/changelog.d/3855.misc new file mode 100644
index 0000000000..a25bb020ba --- /dev/null +++ b/changelog.d/3855.misc
@@ -0,0 +1 @@ +Removed some excess logging messages. \ No newline at end of file diff --git a/changelog.d/3856.misc b/changelog.d/3856.misc new file mode 100644
index 0000000000..36c311eb3d --- /dev/null +++ b/changelog.d/3856.misc
@@ -0,0 +1 @@ +Speed up purge history for rooms that have been previously purged diff --git a/changelog.d/3857.misc b/changelog.d/3857.misc new file mode 100644
index 0000000000..e128d193d9 --- /dev/null +++ b/changelog.d/3857.misc
@@ -0,0 +1 @@ +Refactor some HTTP timeout code. \ No newline at end of file diff --git a/changelog.d/3858.misc b/changelog.d/3858.misc new file mode 100644
index 0000000000..4644db5330 --- /dev/null +++ b/changelog.d/3858.misc
@@ -0,0 +1 @@ +Fix running merged builds on CircleCI \ No newline at end of file diff --git a/changelog.d/3859.bugfix b/changelog.d/3859.bugfix new file mode 100644
index 0000000000..ec5b172464 --- /dev/null +++ b/changelog.d/3859.bugfix
@@ -0,0 +1 @@ +Fix handling of redacted events from federation diff --git a/synapse/__init__.py b/synapse/__init__.py
index 65a2b894cc..9dbe0b9f10 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py
@@ -17,4 +17,14 @@ """ This is a reference implementation of a Matrix home server. """ +try: + from twisted.internet import protocol + from twisted.internet.protocol import Factory + from twisted.names.dns import DNSDatagramProtocol + protocol.Factory.noisy = False + Factory.noisy = False + DNSDatagramProtocol.noisy = False +except ImportError: + pass + __version__ = "0.33.4" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 3eb5b663de..ac97e19649 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py
@@ -307,6 +307,10 @@ class SynapseHomeServer(HomeServer): # Gauges to expose monthly active user control metrics current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") +registered_reserved_users_mau_gauge = Gauge( + "synapse_admin_mau:registered_reserved_users", + "Registered users with reserved threepids" +) def setup(config_options): @@ -531,10 +535,14 @@ def run(hs): @defer.inlineCallbacks def generate_monthly_active_users(): - count = 0 + current_mau_count = 0 + reserved_count = 0 + store = hs.get_datastore() if hs.config.limit_usage_by_mau: - count = yield hs.get_datastore().get_monthly_active_count() - current_mau_gauge.set(float(count)) + current_mau_count = yield store.get_monthly_active_count() + reserved_count = yield store.get_registered_reserved_users_count() + current_mau_gauge.set(float(current_mau_count)) + registered_reserved_users_mau_gauge.set(float(reserved_count)) max_mau_gauge.set(float(hs.config.max_mau_value)) hs.get_datastore().initialise_reserved_users( diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 3f187adfc8..e9a936118d 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py
@@ -227,7 +227,22 @@ def setup_logging(config, use_worker_options=False): # # However this may not be too much of a problem if we are just writing to a file. observer = STDLibLogObserver() + + def _log(event): + + if "log_text" in event: + if event["log_text"].startswith("DNSDatagramProtocol starting on "): + return + + if event["log_text"].startswith("(UDP Port "): + return + + if event["log_text"].startswith("Timing out client"): + return + + return observer(event) + globalLogBeginner.beginLoggingTo( - [observer], + [_log], redirectStandardIO=not config.no_redirect_stdio, ) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 51f9084b90..b782af6308 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py
@@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import six + from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -147,6 +149,9 @@ class EventBase(object): def items(self): return list(self._event_dict.items()) + def keys(self): + return six.iterkeys(self._event_dict) + class FrozenEvent(EventBase): def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 61782ae1c0..b7ad729c63 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py
@@ -153,7 +153,7 @@ class FederationBase(object): # *actual* redacted copy to be on the safe side.) redacted_event = prune_event(pdu) if ( - set(six.iterkeys(redacted_event)) == set(six.iterkeys(pdu)) and + set(redacted_event.keys()) == set(pdu.keys()) and set(six.iterkeys(redacted_event.content)) == set(six.iterkeys(pdu.content)) ): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index cf920bc041..c49dbacd93 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py
@@ -26,7 +26,7 @@ from canonicaljson import encode_canonical_json from prometheus_client import Counter from signedjson.sign import sign_json -from twisted.internet import defer, protocol, reactor +from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from twisted.web._newclient import ResponseDone from twisted.web.client import Agent, HTTPConnectionPool @@ -40,10 +40,8 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) -from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext -from synapse.util.async_helpers import add_timeout_to_deferred from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) @@ -66,13 +64,14 @@ else: class MatrixFederationEndpointFactory(object): def __init__(self, hs): + self.reactor = hs.get_reactor() self.tls_client_options_factory = hs.tls_client_options_factory def endpointForURI(self, uri): destination = uri.netloc.decode('ascii') return matrix_federation_endpoint( - reactor, destination, timeout=10, + self.reactor, destination, timeout=10, tls_client_options_factory=self.tls_client_options_factory ) @@ -90,6 +89,7 @@ class MatrixFederationHttpClient(object): self.hs = hs self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname + reactor = hs.get_reactor() pool = HTTPConnectionPool(reactor) pool.maxPersistentPerHost = 5 pool.cachedConnectionTimeout = 2 * 60 @@ -100,6 +100,7 @@ class MatrixFederationHttpClient(object): self._store = hs.get_datastore() self.version_string = hs.version_string.encode('ascii') self._next_id = 1 + self.default_timeout = 60 def _create_url(self, destination, path_bytes, param_bytes, query_bytes): return urllib.parse.urlunparse( @@ -143,6 +144,11 @@ class MatrixFederationHttpClient(object): (May also fail with plenty of other Exceptions for things like DNS failures, connection failures, SSL failures.) """ + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + if ( self.hs.config.federation_domain_whitelist is not None and destination not in self.hs.config.federation_domain_whitelist @@ -215,13 +221,9 @@ class MatrixFederationHttpClient(object): headers=Headers(headers_dict), data=data, agent=self.agent, + reactor=self.hs.get_reactor() ) - add_timeout_to_deferred( - request_deferred, - timeout / 1000. if timeout else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) + request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) response = yield make_deferred_yieldable( request_deferred, ) @@ -261,6 +263,13 @@ class MatrixFederationHttpClient(object): delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) + logger.debug( + "{%s} Waiting %s before sending to %s...", + txn_id, + delay, + destination + ) + yield self.clock.sleep(delay) retries_left -= 1 else: @@ -279,10 +288,9 @@ class MatrixFederationHttpClient(object): # :'( # Update transactions table? with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.content(response), - timeout, - ) + d = treq.content(response) + d.addTimeout(_sec_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) raise HttpResponseException( response.code, response.phrase, body ) @@ -396,10 +404,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @defer.inlineCallbacks @@ -449,10 +456,14 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + + d.addTimeout(_sec_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -504,10 +515,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -554,10 +564,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -599,38 +608,15 @@ class MatrixFederationHttpClient(object): try: with logcontext.PreserveLoggingContext(): - length = yield self._timeout_deferred( - _readBodyToFile( - response, output_stream, max_size - ), - ) + d = _readBodyToFile(response, output_stream, max_size) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + length = yield make_deferred_yieldable(d) except Exception: logger.exception("Failed to download body") raise defer.returnValue((length, headers)) - def _timeout_deferred(self, deferred, timeout_ms=None): - """Times the deferred out after `timeout_ms` ms - - Args: - deferred (Deferred) - timeout_ms (int|None): Timeout in milliseconds. If None defaults - to 60 seconds. - - Returns: - Deferred - """ - - add_timeout_to_deferred( - deferred, - timeout_ms / 1000. if timeout_ms else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) - - return deferred - class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 30ff87a4c4..e7487311ce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py
@@ -1890,20 +1890,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore ")" ) - # create an index on should_delete because later we'll be looking for - # the should_delete / shouldn't_delete subsets - txn.execute( - "CREATE INDEX events_to_purge_should_delete" - " ON events_to_purge(should_delete)", - ) - - # We do joins against events_to_purge for e.g. calculating state - # groups to purge, etc., so lets make an index. - txn.execute( - "CREATE INDEX events_to_purge_id" - " ON events_to_purge(event_id)", - ) - # First ensure that we're not about to delete all the forward extremeties txn.execute( "SELECT e.event_id, e.depth FROM events as e " @@ -1930,19 +1916,45 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore should_delete_params = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" - should_delete_params += ("%:" + self.hs.hostname, ) + + # We include the parameter twice since we use the expression twice + should_delete_params += ( + "%:" + self.hs.hostname, + "%:" + self.hs.hostname, + ) should_delete_params += (room_id, token.topological) + # Note that we insert events that are outliers and aren't going to be + # deleted, as nothing will happen to them. txn.execute( "INSERT INTO events_to_purge" " SELECT event_id, %s" " FROM events AS e LEFT JOIN state_events USING (event_id)" - " WHERE e.room_id = ? AND topological_ordering < ?" % ( + " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?" + % ( + should_delete_expr, should_delete_expr, ), should_delete_params, ) + + # We create the indices *after* insertion as that's a lot faster. + + # create an index on should_delete because later we'll be looking for + # the should_delete / shouldn't_delete subsets + txn.execute( + "CREATE INDEX events_to_purge_should_delete" + " ON events_to_purge(should_delete)", + ) + + # We do joins against events_to_purge for e.g. calculating state + # groups to purge, etc., so lets make an index. + txn.execute( + "CREATE INDEX events_to_purge_id" + " ON events_to_purge(event_id)", + ) + txn.execute( "SELECT event_id, should_delete FROM events_to_purge" ) diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index b890c152db..59580949f1 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py
@@ -147,6 +147,23 @@ class MonthlyActiveUsersStore(SQLBaseStore): return self.runInteraction("count_users", _count_users) @defer.inlineCallbacks + def get_registered_reserved_users_count(self): + """Of the reserved threepids defined in config, how many are associated + with registered users? + + Returns: + Defered[int]: Number of real reserved users + """ + count = 0 + for tp in self.hs.config.mau_limits_reserved_threepids: + user_id = yield self.hs.get_datastore().get_user_id_by_threepid( + tp["medium"], tp["address"] + ) + if user_id: + count = count + 1 + defer.returnValue(count) + + @defer.inlineCallbacks def upsert_monthly_active_user(self, user_id): """ Updates or inserts monthly active user member diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py new file mode 100644
index 0000000000..1c46c9cfeb --- /dev/null +++ b/tests/http/test_fedclient.py
@@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock + +from twisted.internet.defer import TimeoutError +from twisted.internet.error import ConnectingCancelledError, DNSLookupError +from twisted.web.client import ResponseNeverReceived + +from synapse.http.matrixfederationclient import MatrixFederationHttpClient + +from tests.unittest import HomeserverTestCase + + +class FederationClientTests(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver(reactor=reactor, clock=clock) + hs.tls_client_options_factory = None + return hs + + def prepare(self, reactor, clock, homeserver): + + self.cl = MatrixFederationHttpClient(self.hs) + self.reactor.lookups["testserv"] = "1.2.3.4" + + def test_dns_error(self): + """ + If the DNS raising returns an error, it will bubble up. + """ + d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000) + self.pump() + + f = self.failureResultOf(d) + self.assertIsInstance(f.value, DNSLookupError) + + def test_client_never_connect(self): + """ + If the HTTP request is not connected and is timed out, it'll give a + ConnectingCancelledError. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertFalse(d.called) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], '1.2.3.4') + self.assertEqual(clients[0][1], 8008) + + # Deferred is still without a result + self.assertFalse(d.called) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, ConnectingCancelledError) + + def test_client_connect_no_response(self): + """ + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertFalse(d.called) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], '1.2.3.4') + self.assertEqual(clients[0][1], 8008) + + conn = Mock() + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred is still without a result + self.assertFalse(d.called) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, ResponseNeverReceived) + + def test_client_gets_headers(self): + """ + Once the client gets the headers, _request returns successfully. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + conn = Mock() + clients = self.reactor.tcpClients + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred does not have a result + self.assertFalse(d.called) + + # Send it the HTTP response + client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n") + + # We should get a successful response + r = self.successResultOf(d) + self.assertEqual(r.code, 200) + + def test_client_headers_no_body(self): + """ + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. + """ + d = self.cl.post_json("testserv:8008", "foo/bar", timeout=10000) + + self.pump() + + conn = Mock() + clients = self.reactor.tcpClients + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred does not have a result + self.assertFalse(d.called) + + # Send it the HTTP response + client.dataReceived( + (b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n" + b"Server: Fake\r\n\r\n") + ) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, TimeoutError) diff --git a/tests/server.py b/tests/server.py
index a2c3ca61f6..420ec4e088 100644 --- a/tests/server.py +++ b/tests/server.py
@@ -4,9 +4,14 @@ from io import BytesIO from six import text_type import attr +from zope.interface import implementer -from twisted.internet import address, threads +from twisted.internet import address, threads, udp +from twisted.internet._resolver import HostResolution +from twisted.internet.address import IPv4Address from twisted.internet.defer import Deferred +from twisted.internet.error import DNSLookupError +from twisted.internet.interfaces import IReactorPluggableNameResolver from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactorClock @@ -154,11 +159,46 @@ def render(request, resource, clock): wait_until_result(clock, request) +@implementer(IReactorPluggableNameResolver) class ThreadedMemoryReactorClock(MemoryReactorClock): """ A MemoryReactorClock that supports callFromThread. """ + def __init__(self): + self._udp = [] + self.lookups = {} + + class Resolver(object): + def resolveHostName( + _self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics='TCP', + ): + + resolution = HostResolution(hostName) + resolutionReceiver.resolutionBegan(resolution) + if hostName not in self.lookups: + raise DNSLookupError("OH NO") + + resolutionReceiver.addressResolved( + IPv4Address('TCP', self.lookups[hostName], portNumber) + ) + resolutionReceiver.resolutionComplete() + return resolution + + self.nameResolver = Resolver() + super(ThreadedMemoryReactorClock, self).__init__() + + def listenUDP(self, port, protocol, interface='', maxPacketSize=8196): + p = udp.Port(port, protocol, interface, maxPacketSize, self) + p.startListening() + self._udp.append(p) + return p + def callFromThread(self, callback, *args, **kwargs): """ Make the callback fire in the next reactor iteration. diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py
index ccfc21b7fc..686f12a0dc 100644 --- a/tests/storage/test_monthly_active_users.py +++ b/tests/storage/test_monthly_active_users.py
@@ -183,3 +183,34 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase): self.store.populate_monthly_active_users('user_id') self.pump() self.store.upsert_monthly_active_user.assert_not_called() + + def test_get_reserved_real_user_account(self): + # Test no reserved users, or reserved threepids + count = self.store.get_registered_reserved_users_count() + self.assertEquals(self.get_success(count), 0) + # Test reserved users but no registered users + + user1 = '@user1:example.com' + user2 = '@user2:example.com' + user1_email = 'user1@example.com' + user2_email = 'user2@example.com' + threepids = [ + {'medium': 'email', 'address': user1_email}, + {'medium': 'email', 'address': user2_email}, + ] + self.hs.config.mau_limits_reserved_threepids = threepids + self.store.initialise_reserved_users(threepids) + self.pump() + count = self.store.get_registered_reserved_users_count() + self.assertEquals(self.get_success(count), 0) + + # Test reserved registed users + self.store.register(user_id=user1, token="123", password_hash=None) + self.store.register(user_id=user2, token="456", password_hash=None) + self.pump() + + now = int(self.hs.get_clock().time_msec()) + self.store.user_add_threepid(user1, "email", user1_email, now, now) + self.store.user_add_threepid(user2, "email", user2_email, now, now) + count = self.store.get_registered_reserved_users_count() + self.assertEquals(self.get_success(count), len(threepids))