diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 34382e4e3c..7d67ea8999 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -822,6 +822,7 @@ class Auth(object):
return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
+        logger.debug("auth check, current_mau %d", current_mau)
if current_mau >= self.hs.config.max_mau_value:
raise ResourceLimitError(
403, "Monthly Active User Limit Exceeded",
diff --git a/synapse/rest/client/v1_only/register.py b/synapse/rest/client/v1_only/register.py
index dadb376b02..a5830c16c1 100644
--- a/synapse/rest/client/v1_only/register.py
+++ b/synapse/rest/client/v1_only/register.py
@@ -295,7 +295,7 @@ class RegisterRestServlet(ClientV1RestServlet):
# Necessary due to auth checks prior to the threepid being
# written to the db
if is_threepid_reserved(self.hs.config, threepid):
- yield self.store.upsert_monthly_active_user(user_id)
+ self.store.upsert_monthly_active_user(user_id)
if session[LoginType.EMAIL_IDENTITY]:
logger.debug("Binding emails %s to %s" % (
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 192f52e462..6dc0cca5b3 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -416,7 +416,7 @@ class RegisterRestServlet(RestServlet):
# Necessary due to auth checks prior to the threepid being
# written to the db
if is_threepid_reserved(self.hs.config, threepid):
- yield self.store.upsert_monthly_active_user(registered_user_id)
+ self.store.upsert_monthly_active_user(registered_user_id)
# remember that we've now registered that user account, and with
# what user ID (since the user may not have specified)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 59580949f1..74d6deb0e8 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -14,26 +14,41 @@
# limitations under the License.
import logging
+from six import iteritems
+
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
+from synapse.metrics.background_process_metrics import run_as_background_process
+from . import background_updates
-from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
+
+
# Number of msec of granularity to store the monthly_active_user timestamp
# This means it is not necessary to update the table on every request
LAST_SEEN_GRANULARITY = 60 * 60 * 1000
-class MonthlyActiveUsersStore(SQLBaseStore):
+class MonthlyActiveUsersStore(background_updates.BackgroundUpdateStore):
def __init__(self, dbconn, hs):
super(MonthlyActiveUsersStore, self).__init__(None, hs)
+
self._clock = hs.get_clock()
self.hs = hs
self.reserved_users = ()
+ # user_id:timestamp
+ self._batch_row_update_mau = {}
+ self._mau_looper = self._clock.looping_call(
+ self._update_monthly_active_users_batch, 5 * 1000
+ )
+ self.hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", self._update_monthly_active_users_batch
+ )
+
@defer.inlineCallbacks
def initialise_reserved_users(self, threepids):
store = self.hs.get_datastore()
@@ -127,23 +142,37 @@ class MonthlyActiveUsersStore(SQLBaseStore):
# is racy.
# Have resolved to invalidate the whole cache for now and do
# something about it if and when the perf becomes significant
- self.user_last_seen_monthly_active.invalidate_all()
- self.get_monthly_active_count.invalidate_all()
+ # self.user_last_seen_monthly_active.invalidate_all()
+ # self.get_monthly_active_count.invalidate_all()
- @cached(num_args=0)
+ #@cached(num_args=0)
def get_monthly_active_count(self):
"""Generates current count of monthly active users
Returns:
Defered[int]: Number of current monthly active users
"""
+ # in_mem_new_users = 0
+ # for user_id, timestamp in iteritems(self._batch_row_update_mau):
+ # mau_member_ts = self.user_last_seen_monthly_active(user_id)
+ # if mau_member_ts is None:
+ # in_mem_new_users = in_mem_new_users + 1
+
+ # Ideally I'd check in self._batch_row_update_mau adnd any outstanding
+ # new users to the total, but I can't because the only way to determine
+ # if the user is new is to call user_last_seen_monthly_active which itself
+ # checks in self._batch_row_update_mau and therefore will always answer
+ # that the user is pre-existing.
def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
txn.execute(sql)
count, = txn.fetchone()
+            logger.debug("count is %d", count)
return count
+
+ #return defer.returnValue(self.runInteraction("count_users", _count_users, in_mem_new_users))
return self.runInteraction("count_users", _count_users)
@defer.inlineCallbacks
@@ -163,31 +192,21 @@ class MonthlyActiveUsersStore(SQLBaseStore):
count = count + 1
defer.returnValue(count)
- @defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
"""
- Updates or inserts monthly active user member
+ Adds request to updates or insert monthly active user member
Arguments:
user_id (str): user to add/update
- Deferred[bool]: True if a new entry was created, False if an
- existing one was updated.
"""
- is_insert = yield self._simple_upsert(
- desc="upsert_monthly_active_user",
- table="monthly_active_users",
- keyvalues={
- "user_id": user_id,
- },
- values={
- "timestamp": int(self._clock.time_msec()),
- },
- lock=False,
- )
- if is_insert:
- self.user_last_seen_monthly_active.invalidate((user_id,))
- self.get_monthly_active_count.invalidate(())
+        logger.debug('upsert_monthly_active_user type of user_id is %s', type(user_id))
+ timestamp = int(self._clock.time_msec())
+ self._batch_row_update_mau[user_id] = timestamp
+ self.user_last_seen_monthly_active.prefill(user_id, timestamp)
+
+ # self.user_last_seen_monthly_active.invalidate((user_id,))
+ # self.get_monthly_active_count.invalidate(())
- @cached(num_args=1)
+ #@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
"""
Checks if a given user is part of the monthly active user group
@@ -197,6 +216,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[int] : timestamp since last seen, None if never seen
"""
+ # Need to check in memory batch queue
+ # last_seen = self._batch_row_update_mau.get(user_id)
+ # if last_seen:
+ # return defer.returnValue(last_seen)
return(self._simple_select_one_onecol(
table="monthly_active_users",
@@ -237,6 +260,54 @@ class MonthlyActiveUsersStore(SQLBaseStore):
if last_seen_timestamp is None:
count = yield self.get_monthly_active_count()
if count < self.hs.config.max_mau_value:
- yield self.upsert_monthly_active_user(user_id)
+ self.upsert_monthly_active_user(user_id)
elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
- yield self.upsert_monthly_active_user(user_id)
+ self.upsert_monthly_active_user(user_id)
+
+ def _update_monthly_active_users_batch(self):
+ # If the DB pool has already terminated, don't try updating
+ if not self.hs.get_db_pool().running:
+ return
+
+ def update():
+ to_update = self._batch_row_update_mau
+ self._batch_row_update_mau = {}
+ return self.runInteraction(
+ "_update_monthly_active_users_batch",
+ self._update_monthly_active_users_batch_txn,
+ to_update,
+ )
+
+ #self.get_monthly_active_count.invalidate(())
+ return run_as_background_process(
+ "update_monthly_active_users", update,
+ )
+
+ def _update_monthly_active_users_batch_txn(self, txn, to_update):
+
+ self.database_engine.lock_table(txn, "monthly_active_users")
+        logger.debug('to_update %r', to_update)
+ for user_id, timestamp in iteritems(to_update):
+            logger.debug("upserting %s", user_id)
+            # (debug print removed; the logger call above already records this)
+ try:
+ self._simple_upsert_txn(
+ txn,
+ table="monthly_active_users",
+ keyvalues={
+ "user_id": user_id,
+ },
+ values={
+ "timestamp": timestamp,
+ },
+ lock=False,
+ )
+ # Not sure if I need to do this here since the result is already
+ # prefilled in upsert_monthly_active_user though seems safer to
+ # do so
+ #self.user_last_seen_monthly_active.invalidate((user_id,))
+ except Exception as e:
+ # Failed to upsert, log and continue
+ logger.error("Failed to insert mau user %s: %r", user_id, e)
+ # if len(to_update) > 0:
+ # self.get_monthly_active_count.invalidate(())
|