summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/4081.bugfix2
-rwxr-xr-xsynapse/app/homeserver.py8
-rw-r--r--synapse/server.py1
-rw-r--r--synapse/storage/monthly_active_users.py71
-rw-r--r--synapse/storage/registration.py35
-rw-r--r--tests/storage/test_monthly_active_users.py10
6 files changed, 94 insertions, 33 deletions
diff --git a/changelog.d/4081.bugfix b/changelog.d/4081.bugfix
new file mode 100644
index 0000000000..cfe4b3e9d9
--- /dev/null
+++ b/changelog.d/4081.bugfix
@@ -0,0 +1,2 @@
+Fix race condition where config defined reserved users were not being added to
+the monthly active user list prior to the homeserver reactor firing up
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 0b85b377e3..593e1e75db 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -553,14 +553,6 @@ def run(hs):
             generate_monthly_active_users,
         )
 
-    # XXX is this really supposed to be a background process? it looks
-    # like it needs to complete before some of the other stuff runs.
-    run_as_background_process(
-        "initialise_reserved_users",
-        hs.get_datastore().initialise_reserved_users,
-        hs.config.mau_limits_reserved_threepids,
-    )
-
     start_generate_monthly_active_users()
     if hs.config.limit_usage_by_mau:
         clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
diff --git a/synapse/server.py b/synapse/server.py
index 3e9d3d8256..cf6b872cbd 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -207,6 +207,7 @@ class HomeServer(object):
         logger.info("Setting up.")
         with self.get_db_conn() as conn:
             self.datastore = self.DATASTORE_CLASS(conn, self)
+            conn.commit()
         logger.info("Finished setting up.")
 
     def get_reactor(self):
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 0fe8c8e24c..cf4104dc2e 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -33,19 +33,29 @@ class MonthlyActiveUsersStore(SQLBaseStore):
         self._clock = hs.get_clock()
         self.hs = hs
         self.reserved_users = ()
+        # Do not add more reserved users than the total allowable number
+        self._initialise_reserved_users(
+            dbconn.cursor(),
+            hs.config.mau_limits_reserved_threepids[:self.hs.config.max_mau_value],
+        )
 
-    @defer.inlineCallbacks
-    def initialise_reserved_users(self, threepids):
-        store = self.hs.get_datastore()
+    def _initialise_reserved_users(self, txn, threepids):
+        """Ensures that reserved threepids are accounted for in the MAU table, should
+        be called on start up.
+
+        Args:
+            txn (cursor):
+            threepids (list[dict]): List of threepid dicts to reserve
+        """
         reserved_user_list = []
 
-        # Do not add more reserved users than the total allowable number
-        for tp in threepids[:self.hs.config.max_mau_value]:
-            user_id = yield store.get_user_id_by_threepid(
+        for tp in threepids:
+            user_id = self.get_user_id_by_threepid_txn(
+                txn,
                 tp["medium"], tp["address"]
             )
             if user_id:
-                yield self.upsert_monthly_active_user(user_id)
+                self.upsert_monthly_active_user_txn(txn, user_id)
                 reserved_user_list.append(user_id)
             else:
                 logger.warning(
@@ -55,8 +65,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def reap_monthly_active_users(self):
-        """
-        Cleans out monthly active user table to ensure that no stale
+        """Cleans out monthly active user table to ensure that no stale
         entries exist.
 
         Returns:
@@ -165,19 +174,44 @@ class MonthlyActiveUsersStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def upsert_monthly_active_user(self, user_id):
+        """Updates or inserts the user into the monthly active user table, which
+        is used to track the current MAU usage of the server
+
+        Args:
+            user_id (str): user to add/update
         """
-            Updates or inserts monthly active user member
-            Arguments:
-                user_id (str): user to add/update
-            Deferred[bool]: True if a new entry was created, False if an
-                existing one was updated.
+        is_insert = yield self.runInteraction(
+            "upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
+            user_id
+        )
+
+        if is_insert:
+            self.user_last_seen_monthly_active.invalidate((user_id,))
+            self.get_monthly_active_count.invalidate(())
+
+    def upsert_monthly_active_user_txn(self, txn, user_id):
+        """Updates or inserts monthly active user member
+
+        Note that, after calling this method, it will generally be necessary
+        to invalidate the caches on user_last_seen_monthly_active and
+        get_monthly_active_count. We can't do that here, because we are running
+        in a database thread rather than the main thread, and we can't call
+        txn.call_after because txn may not be a LoggingTransaction.
+
+        Args:
+            txn (cursor):
+            user_id (str): user to add/update
+
+        Returns:
+            bool: True if a new entry was created, False if an
+            existing one was updated.
         """
         # Am consciously deciding to lock the table on the basis that is ought
         # never be a big table and alternative approaches (batching multiple
         # upserts into a single txn) introduced a lot of extra complexity.
         # See https://github.com/matrix-org/synapse/issues/3854 for more
-        is_insert = yield self._simple_upsert(
-            desc="upsert_monthly_active_user",
+        is_insert = self._simple_upsert_txn(
+            txn,
             table="monthly_active_users",
             keyvalues={
                 "user_id": user_id,
@@ -186,9 +220,8 @@ class MonthlyActiveUsersStore(SQLBaseStore):
                 "timestamp": int(self._clock.time_msec()),
             },
         )
-        if is_insert:
-            self.user_last_seen_monthly_active.invalidate((user_id,))
-            self.get_monthly_active_count.invalidate(())
+
+        return is_insert
 
     @cached(num_args=1)
     def user_last_seen_monthly_active(self, user_id):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 2dd14aba1c..80d76bf9d7 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -474,17 +474,44 @@ class RegistrationStore(RegistrationWorkerStore,
 
     @defer.inlineCallbacks
     def get_user_id_by_threepid(self, medium, address):
-        ret = yield self._simple_select_one(
+        """Returns user id from threepid
+
+        Args:
+            medium (str): threepid medium e.g. email
+            address (str): threepid address e.g. me@example.com
+
+        Returns:
+            Deferred[str|None]: user id or None if no user id/threepid mapping exists
+        """
+        user_id = yield self.runInteraction(
+            "get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
+            medium, address
+        )
+        defer.returnValue(user_id)
+
+    def get_user_id_by_threepid_txn(self, txn, medium, address):
+        """Returns user id from threepid
+
+        Args:
+            txn (cursor):
+            medium (str): threepid medium e.g. email
+            address (str): threepid address e.g. me@example.com
+
+        Returns:
+            str|None: user id or None if no user id/threepid mapping exists
+        """
+        ret = self._simple_select_one_txn(
+            txn,
             "user_threepids",
             {
                 "medium": medium,
                 "address": address
             },
-            ['user_id'], True, 'get_user_id_by_threepid'
+            ['user_id'], True
         )
         if ret:
-            defer.returnValue(ret['user_id'])
-        defer.returnValue(None)
+            return ret['user_id']
+        return None
 
     def user_delete_threepid(self, user_id, medium, address):
         return self._simple_delete(
diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py
index 686f12a0dc..832e379a83 100644
--- a/tests/storage/test_monthly_active_users.py
+++ b/tests/storage/test_monthly_active_users.py
@@ -52,7 +52,10 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase):
         now = int(self.hs.get_clock().time_msec())
         self.store.user_add_threepid(user1, "email", user1_email, now, now)
         self.store.user_add_threepid(user2, "email", user2_email, now, now)
-        self.store.initialise_reserved_users(threepids)
+
+        self.store.runInteraction(
+            "initialise", self.store._initialise_reserved_users, threepids
+        )
         self.pump()
 
         active_count = self.store.get_monthly_active_count()
@@ -199,7 +202,10 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase):
             {'medium': 'email', 'address': user2_email},
         ]
         self.hs.config.mau_limits_reserved_threepids = threepids
-        self.store.initialise_reserved_users(threepids)
+        self.store.runInteraction(
+            "initialise", self.store._initialise_reserved_users, threepids
+        )
+
         self.pump()
         count = self.store.get_registered_reserved_users_count()
         self.assertEquals(self.get_success(count), 0)