summary refs log tree commit diff
path: root/synapse/storage/databases/main/monthly_active_users.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/monthly_active_users.py')
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py60
1 files changed, 36 insertions, 24 deletions
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 216622964a..4f1c22c71b 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -15,7 +15,6 @@ import logging
 from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, cast
 
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
@@ -36,7 +35,7 @@ logger = logging.getLogger(__name__)
 LAST_SEEN_GRANULARITY = 60 * 60 * 1000
 
 
-class MonthlyActiveUsersWorkerStore(SQLBaseStore):
+class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -47,9 +46,30 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
         self._clock = hs.get_clock()
         self.hs = hs
 
+        if hs.config.redis.redis_enabled:
+            # If we're using Redis, we can shift this update process off to
+            # the background worker
+            self._update_on_this_worker = hs.config.worker.run_background_tasks
+        else:
+            # If we're NOT using Redis, this must be handled by the master
+            self._update_on_this_worker = hs.get_instance_name() == "master"
+
         self._limit_usage_by_mau = hs.config.server.limit_usage_by_mau
         self._max_mau_value = hs.config.server.max_mau_value
 
+        self._mau_stats_only = hs.config.server.mau_stats_only
+
+        if self._update_on_this_worker:
+            # Do not add more reserved users than the total allowable number
+            self.db_pool.new_transaction(
+                db_conn,
+                "initialise_mau_threepids",
+                [],
+                [],
+                self._initialise_reserved_users,
+                hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value],
+            )
+
     @cached(num_args=0)
     async def get_monthly_active_count(self) -> int:
         """Generates current count of monthly active users
@@ -222,28 +242,6 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
             "reap_monthly_active_users", _reap_users, reserved_users
         )
 
-
-class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore, RegistrationWorkerStore):
-    def __init__(
-        self,
-        database: DatabasePool,
-        db_conn: LoggingDatabaseConnection,
-        hs: "HomeServer",
-    ):
-        super().__init__(database, db_conn, hs)
-
-        self._mau_stats_only = hs.config.server.mau_stats_only
-
-        # Do not add more reserved users than the total allowable number
-        self.db_pool.new_transaction(
-            db_conn,
-            "initialise_mau_threepids",
-            [],
-            [],
-            self._initialise_reserved_users,
-            hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value],
-        )
-
     def _initialise_reserved_users(
         self, txn: LoggingTransaction, threepids: List[dict]
     ) -> None:
@@ -254,6 +252,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore, RegistrationWorkerS
             txn:
             threepids: List of threepid dicts to reserve
         """
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update MAUs"
 
         # XXX what is this function trying to achieve?  It upserts into
         # monthly_active_users for each *registered* reserved mau user, but why?
@@ -287,6 +288,10 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore, RegistrationWorkerS
         Args:
             user_id: user to add/update
         """
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update MAUs"
+
         # Support user never to be included in MAU stats. Note I can't easily call this
         # from upsert_monthly_active_user_txn because then I need a _txn form of
         # is_support_user which is complicated because I want to cache the result.
@@ -322,6 +327,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore, RegistrationWorkerS
             txn (cursor):
             user_id (str): user to add/update
         """
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update MAUs"
 
         # Am consciously deciding to lock the table on the basis that is ought
         # never be a big table and alternative approaches (batching multiple
@@ -349,6 +357,10 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore, RegistrationWorkerS
         Args:
             user_id(str): the user_id to query
         """
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update MAUs"
+
         if self._limit_usage_by_mau or self._mau_stats_only:
             # Trial users and guests should not be included as part of MAU group
             is_guest = await self.is_guest(user_id)  # type: ignore[attr-defined]