summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/client_reader.py4
-rw-r--r--synapse/app/event_creator.py4
-rw-r--r--synapse/app/federation_reader.py4
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/storage/data_stores/main/monthly_active_users.py165
5 files changed, 99 insertions, 82 deletions
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 3edfe19567..ca96da6a4a 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -62,6 +62,9 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
 from synapse.rest.client.v2_alpha.register import RegisterRestServlet
 from synapse.rest.client.versions import VersionsRestServlet
 from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+    MonthlyActiveUsersWorkerStore,
+)
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -85,6 +88,7 @@ class ClientReaderSlavedStore(
     SlavedTransactionStore,
     SlavedProfileStore,
     SlavedClientIpStore,
+    MonthlyActiveUsersWorkerStore,
     BaseSlavedStore,
 ):
     pass
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index d0ddbe38fc..58e5b354f6 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -56,6 +56,9 @@ from synapse.rest.client.v1.room import (
     RoomStateEventRestServlet,
 )
 from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+    MonthlyActiveUsersWorkerStore,
+)
 from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
@@ -81,6 +84,7 @@ class EventCreatorSlavedStore(
     SlavedEventStore,
     SlavedRegistrationStore,
     RoomStore,
+    MonthlyActiveUsersWorkerStore,
     BaseSlavedStore,
 ):
     pass
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 311523e0ed..1f1cea1416 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -46,6 +46,9 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+    MonthlyActiveUsersWorkerStore,
+)
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -66,6 +69,7 @@ class FederationReaderSlavedStore(
     RoomStore,
     DirectoryStore,
     SlavedTransactionStore,
+    MonthlyActiveUsersWorkerStore,
     BaseSlavedStore,
 ):
     pass
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 3218da07bd..8982c0676e 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -54,6 +54,9 @@ from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
 from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+    MonthlyActiveUsersWorkerStore,
+)
 from synapse.storage.data_stores.main.presence import UserPresenceState
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
@@ -77,6 +80,7 @@ class SynchrotronSlavedStore(
     SlavedEventStore,
     SlavedClientIpStore,
     RoomStore,
+    MonthlyActiveUsersWorkerStore,
     BaseSlavedStore,
 ):
     pass
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index 27158534cb..89a41542a3 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -27,12 +27,76 @@ logger = logging.getLogger(__name__)
 LAST_SEEN_GRANULARITY = 60 * 60 * 1000
 
 
-class MonthlyActiveUsersStore(SQLBaseStore):
+class MonthlyActiveUsersWorkerStore(SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
-        super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
+        super(MonthlyActiveUsersWorkerStore, self).__init__(database, db_conn, hs)
         self._clock = hs.get_clock()
         self.hs = hs
+
+    @cached(num_args=0)
+    def get_monthly_active_count(self):
+        """Generates current count of monthly active users
+
+        Returns:
+            Defered[int]: Number of current monthly active users
+        """
+
+        def _count_users(txn):
+            sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
+
+            txn.execute(sql)
+            (count,) = txn.fetchone()
+            return count
+
+        return self.db.runInteraction("count_users", _count_users)
+
+    @defer.inlineCallbacks
+    def get_registered_reserved_users(self):
+        """Of the reserved threepids defined in config, which are associated
+        with registered users?
+
+        Returns:
+            Defered[list]: Real reserved users
+        """
+        users = []
+
+        for tp in self.hs.config.mau_limits_reserved_threepids[
+            : self.hs.config.max_mau_value
+        ]:
+            user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
+                tp["medium"], tp["address"]
+            )
+            if user_id:
+                users.append(user_id)
+
+        return users
+
+    @cached(num_args=1)
+    def user_last_seen_monthly_active(self, user_id):
+        """
+            Checks if a given user is part of the monthly active user group
+            Arguments:
+                user_id (str): user to add/update
+            Return:
+                Deferred[int] : timestamp since last seen, None if never seen
+
+        """
+
+        return self.db.simple_select_one_onecol(
+            table="monthly_active_users",
+            keyvalues={"user_id": user_id},
+            retcol="timestamp",
+            allow_none=True,
+            desc="user_last_seen_monthly_active",
+        )
+
+
+class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
+    def __init__(self, database: Database, db_conn, hs):
+        super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
+
         # Do not add more reserved users than the total allowable number
+        # cur = LoggingTransaction(
         self.db.new_transaction(
             db_conn,
             "initialise_mau_threepids",
@@ -146,57 +210,22 @@ class MonthlyActiveUsersStore(SQLBaseStore):
 
                     txn.execute(sql, query_args)
 
+            # It seems poor to invalidate the whole cache, Postgres supports
+            # 'Returning' which would allow me to invalidate only the
+            # specific users, but sqlite has no way to do this and instead
+            # I would need to SELECT and the DELETE which without locking
+            # is racy.
+            # Have resolved to invalidate the whole cache for now and do
+            # something about it if and when the perf becomes significant
+            self._invalidate_all_cache_and_stream(
+                txn, self.user_last_seen_monthly_active
+            )
+            self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
+
         reserved_users = yield self.get_registered_reserved_users()
         yield self.db.runInteraction(
             "reap_monthly_active_users", _reap_users, reserved_users
         )
-        # It seems poor to invalidate the whole cache, Postgres supports
-        # 'Returning' which would allow me to invalidate only the
-        # specific users, but sqlite has no way to do this and instead
-        # I would need to SELECT and the DELETE which without locking
-        # is racy.
-        # Have resolved to invalidate the whole cache for now and do
-        # something about it if and when the perf becomes significant
-        self.user_last_seen_monthly_active.invalidate_all()
-        self.get_monthly_active_count.invalidate_all()
-
-    @cached(num_args=0)
-    def get_monthly_active_count(self):
-        """Generates current count of monthly active users
-
-        Returns:
-            Defered[int]: Number of current monthly active users
-        """
-
-        def _count_users(txn):
-            sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
-
-            txn.execute(sql)
-            (count,) = txn.fetchone()
-            return count
-
-        return self.db.runInteraction("count_users", _count_users)
-
-    @defer.inlineCallbacks
-    def get_registered_reserved_users(self):
-        """Of the reserved threepids defined in config, which are associated
-        with registered users?
-
-        Returns:
-            Defered[list]: Real reserved users
-        """
-        users = []
-
-        for tp in self.hs.config.mau_limits_reserved_threepids[
-            : self.hs.config.max_mau_value
-        ]:
-            user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
-                tp["medium"], tp["address"]
-            )
-            if user_id:
-                users.append(user_id)
-
-        return users
 
     @defer.inlineCallbacks
     def upsert_monthly_active_user(self, user_id):
@@ -222,23 +251,9 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             "upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
         )
 
-        user_in_mau = self.user_last_seen_monthly_active.cache.get(
-            (user_id,), None, update_metrics=False
-        )
-        if user_in_mau is None:
-            self.get_monthly_active_count.invalidate(())
-
-        self.user_last_seen_monthly_active.invalidate((user_id,))
-
     def upsert_monthly_active_user_txn(self, txn, user_id):
         """Updates or inserts monthly active user member
 
-        Note that, after calling this method, it will generally be necessary
-        to invalidate the caches on user_last_seen_monthly_active and
-        get_monthly_active_count. We can't do that here, because we are running
-        in a database thread rather than the main thread, and we can't call
-        txn.call_after because txn may not be a LoggingTransaction.
-
         We consciously do not call is_support_txn from this method because it
         is not possible to cache the response. is_support_txn will be false in
         almost all cases, so it seems reasonable to call it only for
@@ -269,27 +284,13 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             values={"timestamp": int(self._clock.time_msec())},
         )
 
-        return is_insert
-
-    @cached(num_args=1)
-    def user_last_seen_monthly_active(self, user_id):
-        """
-            Checks if a given user is part of the monthly active user group
-            Arguments:
-                user_id (str): user to add/update
-            Return:
-                Deferred[int] : timestamp since last seen, None if never seen
-
-        """
-
-        return self.db.simple_select_one_onecol(
-            table="monthly_active_users",
-            keyvalues={"user_id": user_id},
-            retcol="timestamp",
-            allow_none=True,
-            desc="user_last_seen_monthly_active",
+        self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
+        self._invalidate_cache_and_stream(
+            txn, self.user_last_seen_monthly_active, (user_id,)
         )
 
+        return is_insert
+
     @defer.inlineCallbacks
     def populate_monthly_active_users(self, user_id):
         """Checks on the state of monthly active user limits and optionally