diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/auth.py | 14 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 11 | ||||
-rw-r--r-- | synapse/config/server.py | 7 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 17 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 89 | ||||
-rw-r--r-- | synapse/handlers/register.py | 10 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 28 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 4 | ||||
-rw-r--r-- | synapse/storage/events.py | 11 | ||||
-rw-r--r-- | synapse/storage/monthly_active_users.py | 201 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/schema/delta/51/monthly_active_users.sql | 27 |
12 files changed, 334 insertions, 87 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 5bbbe8e2e7..91b23ff1d7 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -213,7 +213,7 @@ class Auth(object): default=[b""] )[0] if user and access_token and ip_addr: - self.store.insert_client_ip( + yield self.store.insert_client_ip( user_id=user.to_string(), access_token=access_token, ip=ip_addr, @@ -773,3 +773,15 @@ class Auth(object): raise AuthError( 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN ) + + @defer.inlineCallbacks + def check_auth_blocking(self): + """Checks if the user should be rejected for some external reason, + such as monthly active user limiting or global disable flag + """ + if self.hs.config.limit_usage_by_mau is True: + current_mau = yield self.store.get_monthly_active_count() + if current_mau >= self.hs.config.max_mau_value: + raise AuthError( + 403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED + ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fba51c26e8..a4a65e7286 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -519,17 +519,26 @@ def run(hs): # table will decrease clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + # monthly active user limiting functionality + clock.looping_call( + hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60 + ) + @defer.inlineCallbacks def generate_monthly_active_users(): count = 0 if hs.config.limit_usage_by_mau: - count = yield hs.get_datastore().count_monthly_users() + count = yield hs.get_datastore().get_monthly_active_count() current_mau_gauge.set(float(count)) max_mau_value_gauge.set(float(hs.config.max_mau_value)) + hs.get_datastore().initialise_reserved_users( + hs.config.mau_limits_reserved_threepids + ) generate_monthly_active_users() if hs.config.limit_usage_by_mau: clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) + # End of monthly active user settings if hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") diff --git a/synapse/config/server.py b/synapse/config/server.py index 6a471a0a5e..114d7a9815 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -69,12 +69,15 @@ class ServerConfig(Config): # Options to control access by tracking MAU self.limit_usage_by_mau = config.get("limit_usage_by_mau", False) + self.max_mau_value = 0 if self.limit_usage_by_mau: self.max_mau_value = config.get( "max_mau_value", 0, ) - else: - self.max_mau_value = 0 + self.mau_limits_reserved_threepids = config.get( + "mau_limit_reserved_threepids", [] + ) + # FIXME: federation_domain_whitelist needs sytests self.federation_domain_whitelist = None federation_domain_whitelist = config.get( diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 184eef09d0..7ea8ce9f94 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -520,7 +520,7 @@ class AuthHandler(BaseHandler): """ logger.info("Logging in user %s on device %s", user_id, device_id) access_token = yield self.issue_access_token(user_id, device_id) - yield self._check_mau_limits() + yield self.auth.check_auth_blocking() # the device *should* have been registered before we got here; however, # it's possible we raced against a DELETE operation. The thing we @@ -734,7 +734,7 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def validate_short_term_login_token_and_get_user_id(self, login_token): - yield self._check_mau_limits() + yield self.auth.check_auth_blocking() auth_api = self.hs.get_auth() user_id = None try: @@ -907,19 +907,6 @@ class AuthHandler(BaseHandler): else: return defer.succeed(False) - @defer.inlineCallbacks - def _check_mau_limits(self): - """ - Ensure that if mau blocking is enabled that invalid users cannot - log in. - """ - if self.hs.config.limit_usage_by_mau is True: - current_mau = yield self.store.count_monthly_users() - if current_mau >= self.hs.config.max_mau_value: - raise AuthError( - 403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED - ) - @attr.s class MacaroonGenerator(object): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index cb5c6d587e..9af2e8f869 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,7 +17,13 @@ import logging from twisted.internet import defer -from synapse.api.errors import AuthError, CodeMessageException, SynapseError +from synapse.api.errors import ( + AuthError, + CodeMessageException, + Codes, + StoreError, + SynapseError, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID, get_domain_from_id @@ -49,12 +55,17 @@ class ProfileHandler(BaseHandler): def get_profile(self, user_id): target_user = UserID.from_string(user_id) if self.hs.is_mine(target_user): - displayname = yield self.store.get_profile_displayname( - target_user.localpart - ) - avatar_url = yield self.store.get_profile_avatar_url( - target_user.localpart - ) + try: + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) + raise defer.returnValue({ "displayname": displayname, @@ -74,7 +85,6 @@ class ProfileHandler(BaseHandler): except CodeMessageException as e: if e.code != 404: logger.exception("Failed to get displayname") - raise @defer.inlineCallbacks @@ -85,12 +95,17 @@ class ProfileHandler(BaseHandler): """ target_user = UserID.from_string(user_id) if self.hs.is_mine(target_user): - displayname = yield self.store.get_profile_displayname( - target_user.localpart - ) - avatar_url = yield self.store.get_profile_avatar_url( - target_user.localpart - ) + try: + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) + raise defer.returnValue({ "displayname": displayname, @@ -103,9 +118,14 @@ class ProfileHandler(BaseHandler): @defer.inlineCallbacks def get_displayname(self, target_user): if self.hs.is_mine(target_user): - displayname = yield self.store.get_profile_displayname( - target_user.localpart - ) + try: + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) + raise defer.returnValue(displayname) else: @@ -122,7 +142,6 @@ class ProfileHandler(BaseHandler): except CodeMessageException as e: if e.code != 404: logger.exception("Failed to get displayname") - raise except Exception: logger.exception("Failed to get displayname") @@ -157,10 +176,14 @@ class ProfileHandler(BaseHandler): @defer.inlineCallbacks def get_avatar_url(self, target_user): if self.hs.is_mine(target_user): - avatar_url = yield self.store.get_profile_avatar_url( - target_user.localpart - ) - + try: + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) + raise defer.returnValue(avatar_url) else: try: @@ -213,16 +236,20 @@ class ProfileHandler(BaseHandler): just_field = args.get("field", None) response = {} + try: + if just_field is None or just_field == "displayname": + response["displayname"] = yield self.store.get_profile_displayname( + user.localpart + ) - if just_field is None or just_field == "displayname": - response["displayname"] = yield self.store.get_profile_displayname( - user.localpart - ) - - if just_field is None or just_field == "avatar_url": - response["avatar_url"] = yield self.store.get_profile_avatar_url( - user.localpart - ) + if just_field is None or just_field == "avatar_url": + response["avatar_url"] = yield self.store.get_profile_avatar_url( + user.localpart + ) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) + raise defer.returnValue(response) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 289704b241..0e16bbe0ee 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -540,9 +540,7 @@ class RegistrationHandler(BaseHandler): Do not accept registrations if monthly active user limits exceeded and limiting is enabled """ - if self.hs.config.limit_usage_by_mau is True: - current_mau = yield self.store.count_monthly_users() - if current_mau >= self.hs.config.max_mau_value: - raise RegistrationError( - 403, "MAU Limit Exceeded", Codes.MAU_LIMIT_EXCEEDED - ) + try: + yield self.auth.check_auth_blocking() + except AuthError as e: + raise RegistrationError(e.code, str(e), e.errcode) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 134e4a80f1..23b4a8d76d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -39,6 +39,7 @@ from .filtering import FilteringStore from .group_server import GroupServerStore from .keys import KeyStore from .media_repository import MediaRepositoryStore +from .monthly_active_users import MonthlyActiveUsersStore from .openid import OpenIdStore from .presence import PresenceStore, UserPresenceState from .profile import ProfileStore @@ -87,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore, UserDirectoryStore, GroupServerStore, UserErasureStore, + MonthlyActiveUsersStore, ): def __init__(self, db_conn, hs): @@ -94,7 +96,6 @@ class DataStore(RoomMemberStore, RoomStore, self._clock = hs.get_clock() self.database_engine = hs.database_engine - self.db_conn = db_conn self._stream_id_gen = StreamIdGenerator( db_conn, "events", "stream_ordering", extra_tables=[("local_invites", "stream_id")] @@ -267,31 +268,6 @@ class DataStore(RoomMemberStore, RoomStore, return self.runInteraction("count_users", _count_users) - def count_monthly_users(self): - """Counts the number of users who used this homeserver in the last 30 days - - This method should be refactored with count_daily_users - the only - reason not to is waiting on definition of mau - - Returns: - Defered[int] - """ - def _count_monthly_users(txn): - thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) - sql = """ - SELECT COALESCE(count(*), 0) FROM ( - SELECT user_id FROM user_ips - WHERE last_seen > ? - GROUP BY user_id - ) u - """ - - txn.execute(sql, (thirty_days_ago,)) - count, = txn.fetchone() - return count - - return self.runInteraction("count_monthly_users", _count_monthly_users) - def count_r30_users(self): """ Counts the number of 30 day retained users, defined as:- diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index b8cefd43d6..2489527f2c 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -35,6 +35,7 @@ LAST_SEEN_GRANULARITY = 120 * 1000 class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, db_conn, hs): + self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, @@ -74,6 +75,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "before", "shutdown", self._update_client_ips_batch ) + @defer.inlineCallbacks def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, now=None): if not now: @@ -84,7 +86,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): last_seen = self.client_ip_last_seen.get(key) except KeyError: last_seen = None - + yield self.populate_monthly_active_users(user_id) # Rate-limited inserts if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: return diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e8e5a0fe44..ce32e8fefd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -485,9 +485,14 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc(len(chunk)) - synapse.metrics.event_persisted_position.set( - chunk[-1][0].internal_metadata.stream_ordering, - ) + + if not backfilled: + # backfilled events have negative stream orderings, so we don't + # want to set the event_persisted_position to that. + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) + for event, context in chunk: if context.app_service: origin_type = "local" diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py new file mode 100644 index 0000000000..d47dcef3a0 --- /dev/null +++ b/synapse/storage/monthly_active_users.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from twisted.internet import defer + +from synapse.util.caches.descriptors import cached + +from ._base import SQLBaseStore + +logger = logging.getLogger(__name__) + +# Number of msec of granularity to store the monthly_active_user timestamp +# This means it is not necessary to update the table on every request +LAST_SEEN_GRANULARITY = 60 * 60 * 1000 + + +class MonthlyActiveUsersStore(SQLBaseStore): + def __init__(self, dbconn, hs): + super(MonthlyActiveUsersStore, self).__init__(None, hs) + self._clock = hs.get_clock() + self.hs = hs + self.reserved_users = () + + @defer.inlineCallbacks + def initialise_reserved_users(self, threepids): + # TODO Why can't I do this in init? + store = self.hs.get_datastore() + reserved_user_list = [] + + # Do not add more reserved users than the total allowable number + for tp in threepids[:self.hs.config.max_mau_value]: + user_id = yield store.get_user_id_by_threepid( + tp["medium"], tp["address"] + ) + if user_id: + self.upsert_monthly_active_user(user_id) + reserved_user_list.append(user_id) + else: + logger.warning( + "mau limit reserved threepid %s not found in db" % tp + ) + self.reserved_users = tuple(reserved_user_list) + + @defer.inlineCallbacks + def reap_monthly_active_users(self): + """ + Cleans out monthly active user table to ensure that no stale + entries exist. + + Returns: + Deferred[] + """ + def _reap_users(txn): + + thirty_days_ago = ( + int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) + ) + # Purge stale users + + # questionmarks is a hack to overcome sqlite not supporting + # tuples in 'WHERE IN %s' + questionmarks = '?' * len(self.reserved_users) + query_args = [thirty_days_ago] + query_args.extend(self.reserved_users) + + sql = """ + DELETE FROM monthly_active_users + WHERE timestamp < ? + AND user_id NOT IN ({}) + """.format(','.join(questionmarks)) + + txn.execute(sql, query_args) + + # If MAU user count still exceeds the MAU threshold, then delete on + # a least recently active basis. + # Note it is not possible to write this query using OFFSET due to + # incompatibilities in how sqlite and postgres support the feature. + # sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present + # While Postgres does not require 'LIMIT', but also does not support + # negative LIMIT values. So there is no way to write it that both can + # support + query_args = [self.hs.config.max_mau_value] + query_args.extend(self.reserved_users) + sql = """ + DELETE FROM monthly_active_users + WHERE user_id NOT IN ( + SELECT user_id FROM monthly_active_users + ORDER BY timestamp DESC + LIMIT ? + ) + AND user_id NOT IN ({}) + """.format(','.join(questionmarks)) + txn.execute(sql, query_args) + + yield self.runInteraction("reap_monthly_active_users", _reap_users) + # It seems poor to invalidate the whole cache, Postgres supports + # 'Returning' which would allow me to invalidate only the + # specific users, but sqlite has no way to do this and instead + # I would need to SELECT and the DELETE which without locking + # is racy. + # Have resolved to invalidate the whole cache for now and do + # something about it if and when the perf becomes significant + self._user_last_seen_monthly_active.invalidate_all() + self.get_monthly_active_count.invalidate_all() + + @cached(num_args=0) + def get_monthly_active_count(self): + """Generates current count of monthly active users + + Returns: + Defered[int]: Number of current monthly active users + """ + + def _count_users(txn): + sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users" + + txn.execute(sql) + count, = txn.fetchone() + return count + return self.runInteraction("count_users", _count_users) + + def upsert_monthly_active_user(self, user_id): + """ + Updates or inserts monthly active user member + Arguments: + user_id (str): user to add/update + Deferred[bool]: True if a new entry was created, False if an + existing one was updated. + """ + is_insert = self._simple_upsert( + desc="upsert_monthly_active_user", + table="monthly_active_users", + keyvalues={ + "user_id": user_id, + }, + values={ + "timestamp": int(self._clock.time_msec()), + }, + lock=False, + ) + if is_insert: + self._user_last_seen_monthly_active.invalidate((user_id,)) + self.get_monthly_active_count.invalidate(()) + + @cached(num_args=1) + def _user_last_seen_monthly_active(self, user_id): + """ + Checks if a given user is part of the monthly active user group + Arguments: + user_id (str): user to add/update + Return: + Deferred[int] : timestamp since last seen, None if never seen + + """ + + return(self._simple_select_one_onecol( + table="monthly_active_users", + keyvalues={ + "user_id": user_id, + }, + retcol="timestamp", + allow_none=True, + desc="_user_last_seen_monthly_active", + )) + + @defer.inlineCallbacks + def populate_monthly_active_users(self, user_id): + """Checks on the state of monthly active user limits and optionally + add the user to the monthly active tables + + Args: + user_id(str): the user_id to query + """ + if self.hs.config.limit_usage_by_mau: + last_seen_timestamp = yield self._user_last_seen_monthly_active(user_id) + now = self.hs.get_clock().time_msec() + + # We want to reduce to the total number of db writes, and are happy + # to trade accuracy of timestamp in order to lighten load. This means + # We always insert new users (where MAU threshold has not been reached), + # but only update if we have not previously seen the user for + # LAST_SEEN_GRANULARITY ms + if last_seen_timestamp is None: + count = yield self.get_monthly_active_count() + if count < self.hs.config.max_mau_value: + yield self.upsert_monthly_active_user(user_id) + elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY: + yield self.upsert_monthly_active_user(user_id) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index b290f834b3..b364719312 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 50 +SCHEMA_VERSION = 51 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/51/monthly_active_users.sql b/synapse/storage/schema/delta/51/monthly_active_users.sql new file mode 100644 index 0000000000..c9d537d5a3 --- /dev/null +++ b/synapse/storage/schema/delta/51/monthly_active_users.sql @@ -0,0 +1,27 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- a table of monthly active users, for use where blocking based on mau limits +CREATE TABLE monthly_active_users ( + user_id TEXT NOT NULL, + -- Last time we saw the user. Not guaranteed to be accurate due to rate limiting + -- on updates, Granularity of updates governed by + -- synapse.storage.monthly_active_users.LAST_SEEN_GRANULARITY + -- Measured in ms since epoch. + timestamp BIGINT NOT NULL +); + +CREATE UNIQUE INDEX monthly_active_users_users ON monthly_active_users(user_id); +CREATE INDEX monthly_active_users_time_stamp ON monthly_active_users(timestamp); |