diff --git a/changelog.d/6742.bugfix b/changelog.d/6742.bugfix
new file mode 100644
index 0000000000..ca2687c8bb
--- /dev/null
+++ b/changelog.d/6742.bugfix
@@ -0,0 +1 @@
+Fix monthly active user limiting support for worker mode, fixes [#4639](https://github.com/matrix-org/synapse/issues/4639).
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 3edfe19567..ca96da6a4a 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -62,6 +62,9 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+ MonthlyActiveUsersWorkerStore,
+)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -85,6 +88,7 @@ class ClientReaderSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
+ MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index d0ddbe38fc..58e5b354f6 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -56,6 +56,9 @@ from synapse.rest.client.v1.room import (
RoomStateEventRestServlet,
)
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+ MonthlyActiveUsersWorkerStore,
+)
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -81,6 +84,7 @@ class EventCreatorSlavedStore(
SlavedEventStore,
SlavedRegistrationStore,
RoomStore,
+ MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 311523e0ed..1f1cea1416 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -46,6 +46,9 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+ MonthlyActiveUsersWorkerStore,
+)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -66,6 +69,7 @@ class FederationReaderSlavedStore(
RoomStore,
DirectoryStore,
SlavedTransactionStore,
+ MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 3218da07bd..8982c0676e 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -54,6 +54,9 @@ from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+ MonthlyActiveUsersWorkerStore,
+)
from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -77,6 +80,7 @@ class SynchrotronSlavedStore(
SlavedEventStore,
SlavedClientIpStore,
RoomStore,
+ MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index 27158534cb..89a41542a3 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -27,12 +27,76 @@ logger = logging.getLogger(__name__)
LAST_SEEN_GRANULARITY = 60 * 60 * 1000
-class MonthlyActiveUsersStore(SQLBaseStore):
+class MonthlyActiveUsersWorkerStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
- super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
+ super(MonthlyActiveUsersWorkerStore, self).__init__(database, db_conn, hs)
self._clock = hs.get_clock()
self.hs = hs
+
+ @cached(num_args=0)
+ def get_monthly_active_count(self):
+ """Generates current count of monthly active users
+
+ Returns:
+ Defered[int]: Number of current monthly active users
+ """
+
+ def _count_users(txn):
+ sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
+
+ txn.execute(sql)
+ (count,) = txn.fetchone()
+ return count
+
+ return self.db.runInteraction("count_users", _count_users)
+
+ @defer.inlineCallbacks
+ def get_registered_reserved_users(self):
+ """Of the reserved threepids defined in config, which are associated
+ with registered users?
+
+ Returns:
+ Defered[list]: Real reserved users
+ """
+ users = []
+
+ for tp in self.hs.config.mau_limits_reserved_threepids[
+ : self.hs.config.max_mau_value
+ ]:
+ user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
+ tp["medium"], tp["address"]
+ )
+ if user_id:
+ users.append(user_id)
+
+ return users
+
+ @cached(num_args=1)
+ def user_last_seen_monthly_active(self, user_id):
+ """
+ Checks if a given user is part of the monthly active user group
+ Arguments:
+ user_id (str): user to add/update
+ Return:
+ Deferred[int] : timestamp since last seen, None if never seen
+
+ """
+
+ return self.db.simple_select_one_onecol(
+ table="monthly_active_users",
+ keyvalues={"user_id": user_id},
+ retcol="timestamp",
+ allow_none=True,
+ desc="user_last_seen_monthly_active",
+ )
+
+
+class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
+ def __init__(self, database: Database, db_conn, hs):
+ super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
+
# Do not add more reserved users than the total allowable number
+ # cur = LoggingTransaction(
self.db.new_transaction(
db_conn,
"initialise_mau_threepids",
@@ -146,57 +210,22 @@ class MonthlyActiveUsersStore(SQLBaseStore):
txn.execute(sql, query_args)
+ # It seems poor to invalidate the whole cache, Postgres supports
+ # 'Returning' which would allow me to invalidate only the
+ # specific users, but sqlite has no way to do this and instead
+ # I would need to SELECT and the DELETE which without locking
+ # is racy.
+ # Have resolved to invalidate the whole cache for now and do
+ # something about it if and when the perf becomes significant
+ self._invalidate_all_cache_and_stream(
+ txn, self.user_last_seen_monthly_active
+ )
+ self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
+
reserved_users = yield self.get_registered_reserved_users()
yield self.db.runInteraction(
"reap_monthly_active_users", _reap_users, reserved_users
)
- # It seems poor to invalidate the whole cache, Postgres supports
- # 'Returning' which would allow me to invalidate only the
- # specific users, but sqlite has no way to do this and instead
- # I would need to SELECT and the DELETE which without locking
- # is racy.
- # Have resolved to invalidate the whole cache for now and do
- # something about it if and when the perf becomes significant
- self.user_last_seen_monthly_active.invalidate_all()
- self.get_monthly_active_count.invalidate_all()
-
- @cached(num_args=0)
- def get_monthly_active_count(self):
- """Generates current count of monthly active users
-
- Returns:
- Defered[int]: Number of current monthly active users
- """
-
- def _count_users(txn):
- sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
-
- txn.execute(sql)
- (count,) = txn.fetchone()
- return count
-
- return self.db.runInteraction("count_users", _count_users)
-
- @defer.inlineCallbacks
- def get_registered_reserved_users(self):
- """Of the reserved threepids defined in config, which are associated
- with registered users?
-
- Returns:
- Defered[list]: Real reserved users
- """
- users = []
-
- for tp in self.hs.config.mau_limits_reserved_threepids[
- : self.hs.config.max_mau_value
- ]:
- user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
- tp["medium"], tp["address"]
- )
- if user_id:
- users.append(user_id)
-
- return users
@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
@@ -222,23 +251,9 @@ class MonthlyActiveUsersStore(SQLBaseStore):
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
)
- user_in_mau = self.user_last_seen_monthly_active.cache.get(
- (user_id,), None, update_metrics=False
- )
- if user_in_mau is None:
- self.get_monthly_active_count.invalidate(())
-
- self.user_last_seen_monthly_active.invalidate((user_id,))
-
def upsert_monthly_active_user_txn(self, txn, user_id):
"""Updates or inserts monthly active user member
- Note that, after calling this method, it will generally be necessary
- to invalidate the caches on user_last_seen_monthly_active and
- get_monthly_active_count. We can't do that here, because we are running
- in a database thread rather than the main thread, and we can't call
- txn.call_after because txn may not be a LoggingTransaction.
-
We consciously do not call is_support_txn from this method because it
is not possible to cache the response. is_support_txn will be false in
almost all cases, so it seems reasonable to call it only for
@@ -269,27 +284,13 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={"timestamp": int(self._clock.time_msec())},
)
- return is_insert
-
- @cached(num_args=1)
- def user_last_seen_monthly_active(self, user_id):
- """
- Checks if a given user is part of the monthly active user group
- Arguments:
- user_id (str): user to add/update
- Return:
- Deferred[int] : timestamp since last seen, None if never seen
-
- """
-
- return self.db.simple_select_one_onecol(
- table="monthly_active_users",
- keyvalues={"user_id": user_id},
- retcol="timestamp",
- allow_none=True,
- desc="user_last_seen_monthly_active",
+ self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
+ self._invalidate_cache_and_stream(
+ txn, self.user_last_seen_monthly_active, (user_id,)
)
+ return is_insert
+
@defer.inlineCallbacks
def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally
|