diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/auth.py | 1 | ||||
-rw-r--r-- | synapse/rest/client/v1_only/register.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/register.py | 2 | ||||
-rw-r--r-- | synapse/storage/monthly_active_users.py | 123 |
4 files changed, 100 insertions, 28 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 34382e4e3c..7d67ea8999 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -822,6 +822,7 @@ class Auth(object): return # Else if there is no room in the MAU bucket, bail current_mau = yield self.store.get_monthly_active_count() + print ("auth check, current_mau %d" % current_mau) if current_mau >= self.hs.config.max_mau_value: raise ResourceLimitError( 403, "Monthly Active User Limit Exceeded", diff --git a/synapse/rest/client/v1_only/register.py b/synapse/rest/client/v1_only/register.py index dadb376b02..a5830c16c1 100644 --- a/synapse/rest/client/v1_only/register.py +++ b/synapse/rest/client/v1_only/register.py @@ -295,7 +295,7 @@ class RegisterRestServlet(ClientV1RestServlet): # Necessary due to auth checks prior to the threepid being # written to the db if is_threepid_reserved(self.hs.config, threepid): - yield self.store.upsert_monthly_active_user(user_id) + self.store.upsert_monthly_active_user(user_id) if session[LoginType.EMAIL_IDENTITY]: logger.debug("Binding emails %s to %s" % ( diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 192f52e462..6dc0cca5b3 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -416,7 +416,7 @@ class RegisterRestServlet(RestServlet): # Necessary due to auth checks prior to the threepid being # written to the db if is_threepid_reserved(self.hs.config, threepid): - yield self.store.upsert_monthly_active_user(registered_user_id) + self.store.upsert_monthly_active_user(registered_user_id) # remember that we've now registered that user account, and with # what user ID (since the user may not have specified) diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index 59580949f1..74d6deb0e8 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -14,26 +14,41 @@ # limitations under the License. import logging +from six import iteritems + from twisted.internet import defer from synapse.util.caches.descriptors import cached +from synapse.metrics.background_process_metrics import run_as_background_process +from . import background_updates -from ._base import SQLBaseStore logger = logging.getLogger(__name__) + + # Number of msec of granularity to store the monthly_active_user timestamp # This means it is not necessary to update the table on every request LAST_SEEN_GRANULARITY = 60 * 60 * 1000 -class MonthlyActiveUsersStore(SQLBaseStore): +class MonthlyActiveUsersStore(background_updates.BackgroundUpdateStore): def __init__(self, dbconn, hs): super(MonthlyActiveUsersStore, self).__init__(None, hs) + self._clock = hs.get_clock() self.hs = hs self.reserved_users = () + # user_id:timestamp + self._batch_row_update_mau = {} + self._mau_looper = self._clock.looping_call( + self._update_monthly_active_users_batch, 5 * 1000 + ) + self.hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", self._update_monthly_active_users_batch + ) + @defer.inlineCallbacks def initialise_reserved_users(self, threepids): store = self.hs.get_datastore() @@ -127,23 +142,37 @@ class MonthlyActiveUsersStore(SQLBaseStore): # is racy. # Have resolved to invalidate the whole cache for now and do # something about it if and when the perf becomes significant - self.user_last_seen_monthly_active.invalidate_all() - self.get_monthly_active_count.invalidate_all() + # self.user_last_seen_monthly_active.invalidate_all() + # self.get_monthly_active_count.invalidate_all() - @cached(num_args=0) + #@cached(num_args=0) def get_monthly_active_count(self): """Generates current count of monthly active users Returns: Defered[int]: Number of current monthly active users """ + # in_mem_new_users = 0 + # for user_id, timestamp in iteritems(self._batch_row_update_mau): + # mau_member_ts = self.user_last_seen_monthly_active(user_id) + # if mau_member_ts is None: + # in_mem_new_users = in_mem_new_users + 1 + + # Ideally I'd check in self._batch_row_update_mau adnd any outstanding + # new users to the total, but I can't because the only way to determine + # if the user is new is to call user_last_seen_monthly_active which itself + # checks in self._batch_row_update_mau and therefore will always answer + # that the user is pre-existing. def _count_users(txn): sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users" txn.execute(sql) count, = txn.fetchone() + print "count is %d" % count return count + + #return defer.returnValue(self.runInteraction("count_users", _count_users, in_mem_new_users)) return self.runInteraction("count_users", _count_users) @defer.inlineCallbacks @@ -163,31 +192,21 @@ class MonthlyActiveUsersStore(SQLBaseStore): count = count + 1 defer.returnValue(count) - @defer.inlineCallbacks def upsert_monthly_active_user(self, user_id): """ - Updates or inserts monthly active user member + Adds request to updates or insert monthly active user member Arguments: user_id (str): user to add/update - Deferred[bool]: True if a new entry was created, False if an - existing one was updated. """ - is_insert = yield self._simple_upsert( - desc="upsert_monthly_active_user", - table="monthly_active_users", - keyvalues={ - "user_id": user_id, - }, - values={ - "timestamp": int(self._clock.time_msec()), - }, - lock=False, - ) - if is_insert: - self.user_last_seen_monthly_active.invalidate((user_id,)) - self.get_monthly_active_count.invalidate(()) + logger.error('upsert_monthly_active_user type of user_id is %s' % type(user_id)) + timestamp = int(self._clock.time_msec()) + self._batch_row_update_mau[user_id] = timestamp + self.user_last_seen_monthly_active.prefill(user_id, timestamp) + + # self.user_last_seen_monthly_active.invalidate((user_id,)) + # self.get_monthly_active_count.invalidate(()) - @cached(num_args=1) + #@cached(num_args=1) def user_last_seen_monthly_active(self, user_id): """ Checks if a given user is part of the monthly active user group @@ -197,6 +216,10 @@ class MonthlyActiveUsersStore(SQLBaseStore): Deferred[int] : timestamp since last seen, None if never seen """ + # Need to check in memory batch queue + # last_seen = self._batch_row_update_mau.get(user_id) + # if last_seen: + # return defer.returnValue(last_seen) return(self._simple_select_one_onecol( table="monthly_active_users", @@ -237,6 +260,54 @@ class MonthlyActiveUsersStore(SQLBaseStore): if last_seen_timestamp is None: count = yield self.get_monthly_active_count() if count < self.hs.config.max_mau_value: - yield self.upsert_monthly_active_user(user_id) + self.upsert_monthly_active_user(user_id) elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY: - yield self.upsert_monthly_active_user(user_id) + self.upsert_monthly_active_user(user_id) + + def _update_monthly_active_users_batch(self): + # If the DB pool has already terminated, don't try updating + if not self.hs.get_db_pool().running: + return + + def update(): + to_update = self._batch_row_update_mau + self._batch_row_update_mau = {} + return self.runInteraction( + "_update_monthly_active_users_batch", + self._update_monthly_active_users_batch_txn, + to_update, + ) + + #self.get_monthly_active_count.invalidate(()) + return run_as_background_process( + "update_monthly_active_users", update, + ) + + def _update_monthly_active_users_batch_txn(self, txn, to_update): + + self.database_engine.lock_table(txn, "monthly_active_users") + logger.error('to_update %r' % to_update) + for user_id, timestamp in iteritems(to_update): + logger.error("upserting %s" % user_id) + print "upserting %s" % user_id + try: + self._simple_upsert_txn( + txn, + table="monthly_active_users", + keyvalues={ + "user_id": user_id, + }, + values={ + "timestamp": timestamp, + }, + lock=False, + ) + # Not sure if I need to do this here since the result is already + # prefilled in upsert_monthly_active_user though seems safer to + # do so + #self.user_last_seen_monthly_active.invalidate((user_id,)) + except Exception as e: + # Failed to upsert, log and continue + logger.error("Failed to insert mau user %s: %r", user_id, e) + # if len(to_update) > 0: + # self.get_monthly_active_count.invalidate(()) |