diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 0b85b377e3..593e1e75db 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -553,14 +553,6 @@ def run(hs):
generate_monthly_active_users,
)
- # XXX is this really supposed to be a background process? it looks
- # like it needs to complete before some of the other stuff runs.
- run_as_background_process(
- "initialise_reserved_users",
- hs.get_datastore().initialise_reserved_users,
- hs.config.mau_limits_reserved_threepids,
- )
-
start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 0fe8c8e24c..26e577814a 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -33,19 +33,28 @@ class MonthlyActiveUsersStore(SQLBaseStore):
self._clock = hs.get_clock()
self.hs = hs
self.reserved_users = ()
+ self.initialise_reserved_users(
+ dbconn.cursor(), hs.config.mau_limits_reserved_threepids
+ )
- @defer.inlineCallbacks
- def initialise_reserved_users(self, threepids):
- store = self.hs.get_datastore()
+ def initialise_reserved_users(self, txn, threepids):
+ """
+ Ensures that reserved threepids are accounted for in the MAU table, should
+ be called on start up.
+
+ Arguments:
+ threepids []: List of threepid dicts to reserve
+ """
reserved_user_list = []
# Do not add more reserved users than the total allowable number
for tp in threepids[:self.hs.config.max_mau_value]:
- user_id = yield store.get_user_id_by_threepid(
+ user_id = self.get_user_id_by_threepid_txn(
+ txn,
tp["medium"], tp["address"]
)
if user_id:
- yield self.upsert_monthly_active_user(user_id)
+ self.upsert_monthly_active_user_txn(txn, user_id)
reserved_user_list.append(user_id)
else:
logger.warning(
@@ -55,8 +64,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks
def reap_monthly_active_users(self):
- """
- Cleans out monthly active user table to ensure that no stale
+ """Cleans out monthly active user table to ensure that no stale
entries exist.
Returns:
@@ -165,19 +173,33 @@ class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
+ """Updates or inserts monthly active user member
+ Arguments:
+ user_id (str): user to add/update
+ """
+ is_insert = yield self.runInteraction(
+ "upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
+ user_id
+ )
+ if is_insert:
+ self.user_last_seen_monthly_active.invalidate((user_id,))
+ self.get_monthly_active_count.invalidate(())
+
+ def upsert_monthly_active_user_txn(self, txn, user_id):
"""
Updates or inserts monthly active user member
Arguments:
+ txn (cursor):
user_id (str): user to add/update
- Deferred[bool]: True if a new entry was created, False if an
+ bool: True if a new entry was created, False if an
existing one was updated.
"""
# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
- is_insert = yield self._simple_upsert(
- desc="upsert_monthly_active_user",
+ is_insert = self._simple_upsert_txn(
+ txn,
table="monthly_active_users",
keyvalues={
"user_id": user_id,
@@ -186,9 +208,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
"timestamp": int(self._clock.time_msec()),
},
)
- if is_insert:
- self.user_last_seen_monthly_active.invalidate((user_id,))
- self.get_monthly_active_count.invalidate(())
+ return is_insert
@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 26b429e307..01931f29cc 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -474,17 +474,25 @@ class RegistrationStore(RegistrationWorkerStore,
@defer.inlineCallbacks
def get_user_id_by_threepid(self, medium, address):
- ret = yield self._simple_select_one(
+ user_id = yield self.runInteraction(
+ "get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
+ medium, address
+ )
+ defer.returnValue(user_id)
+
+ def get_user_id_by_threepid_txn(self, txn, medium, address):
+ ret = self._simple_select_one_txn(
+ txn,
"user_threepids",
{
"medium": medium,
"address": address
},
- ['user_id'], True, 'get_user_id_by_threepid'
+ ['user_id'], True
)
if ret:
- defer.returnValue(ret['user_id'])
- defer.returnValue(None)
+ return ret['user_id']
+ return None
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
|