summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/6309.misc1
-rw-r--r--changelog.d/7030.feature1
-rw-r--r--synapse/app/homeserver.py13
-rw-r--r--synapse/logging/context.py121
-rw-r--r--synapse/storage/data_stores/main/monthly_active_users.py32
-rw-r--r--tests/storage/test_monthly_active_users.py42
6 files changed, 162 insertions, 48 deletions
diff --git a/changelog.d/6309.misc b/changelog.d/6309.misc
new file mode 100644
index 0000000000..1aa7294617
--- /dev/null
+++ b/changelog.d/6309.misc
@@ -0,0 +1 @@
+Add type hints to `logging/context.py`.
diff --git a/changelog.d/7030.feature b/changelog.d/7030.feature
new file mode 100644
index 0000000000..fcfdb8d8a1
--- /dev/null
+++ b/changelog.d/7030.feature
@@ -0,0 +1 @@
+Break down monthly active users by `appservice_id` and emit via Prometheus.
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index c2a334a2b0..e0fdddfdc9 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -298,6 +298,11 @@ class SynapseHomeServer(HomeServer):
 
 # Gauges to expose monthly active user control metrics
 current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+current_mau_by_service_gauge = Gauge(
+    "synapse_admin_mau_current_mau_by_service",
+    "Current MAU by service",
+    ["app_service"],
+)
 max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
 registered_reserved_users_mau_gauge = Gauge(
     "synapse_admin_mau:registered_reserved_users",
@@ -585,12 +590,20 @@ def run(hs):
     @defer.inlineCallbacks
     def generate_monthly_active_users():
         current_mau_count = 0
+        current_mau_count_by_service = {}
         reserved_users = ()
         store = hs.get_datastore()
         if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
             current_mau_count = yield store.get_monthly_active_count()
+            current_mau_count_by_service = (
+                yield store.get_monthly_active_count_by_service()
+            )
             reserved_users = yield store.get_registered_reserved_users()
         current_mau_gauge.set(float(current_mau_count))
+
+        for app_service, count in current_mau_count_by_service.items():
+            current_mau_by_service_gauge.labels(app_service).set(float(count))
+
         registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
         max_mau_gauge.set(float(hs.config.max_mau_value))
 
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index 1b940842f6..1eccc0e83f 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -27,10 +27,15 @@ import inspect
 import logging
 import threading
 import types
-from typing import Any, List
+from typing import TYPE_CHECKING, Optional, Tuple, TypeVar, Union
+
+from typing_extensions import Literal
 
 from twisted.internet import defer, threads
 
+if TYPE_CHECKING:
+    from synapse.logging.scopecontextmanager import _LogContextScope
+
 logger = logging.getLogger(__name__)
 
 try:
@@ -91,7 +96,7 @@ class ContextResourceUsage(object):
         "evt_db_fetch_count",
     ]
 
-    def __init__(self, copy_from=None):
+    def __init__(self, copy_from: "Optional[ContextResourceUsage]" = None) -> None:
         """Create a new ContextResourceUsage
 
         Args:
@@ -101,27 +106,28 @@ class ContextResourceUsage(object):
         if copy_from is None:
             self.reset()
         else:
-            self.ru_utime = copy_from.ru_utime
-            self.ru_stime = copy_from.ru_stime
-            self.db_txn_count = copy_from.db_txn_count
+            # FIXME: mypy can't infer the types set via reset() above, so specify explicitly for now
+            self.ru_utime = copy_from.ru_utime  # type: float
+            self.ru_stime = copy_from.ru_stime  # type: float
+            self.db_txn_count = copy_from.db_txn_count  # type: int
 
-            self.db_txn_duration_sec = copy_from.db_txn_duration_sec
-            self.db_sched_duration_sec = copy_from.db_sched_duration_sec
-            self.evt_db_fetch_count = copy_from.evt_db_fetch_count
+            self.db_txn_duration_sec = copy_from.db_txn_duration_sec  # type: float
+            self.db_sched_duration_sec = copy_from.db_sched_duration_sec  # type: float
+            self.evt_db_fetch_count = copy_from.evt_db_fetch_count  # type: int
 
-    def copy(self):
+    def copy(self) -> "ContextResourceUsage":
         return ContextResourceUsage(copy_from=self)
 
-    def reset(self):
+    def reset(self) -> None:
         self.ru_stime = 0.0
         self.ru_utime = 0.0
         self.db_txn_count = 0
 
-        self.db_txn_duration_sec = 0
-        self.db_sched_duration_sec = 0
+        self.db_txn_duration_sec = 0.0
+        self.db_sched_duration_sec = 0.0
         self.evt_db_fetch_count = 0
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return (
             "<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
             "db_txn_count='%r', db_txn_duration_sec='%r', "
@@ -135,7 +141,7 @@ class ContextResourceUsage(object):
             self.evt_db_fetch_count,
         )
 
-    def __iadd__(self, other):
+    def __iadd__(self, other: "ContextResourceUsage") -> "ContextResourceUsage":
         """Add another ContextResourceUsage's stats to this one's.
 
         Args:
@@ -149,7 +155,7 @@ class ContextResourceUsage(object):
         self.evt_db_fetch_count += other.evt_db_fetch_count
         return self
 
-    def __isub__(self, other):
+    def __isub__(self, other: "ContextResourceUsage") -> "ContextResourceUsage":
         self.ru_utime -= other.ru_utime
         self.ru_stime -= other.ru_stime
         self.db_txn_count -= other.db_txn_count
@@ -158,17 +164,20 @@ class ContextResourceUsage(object):
         self.evt_db_fetch_count -= other.evt_db_fetch_count
         return self
 
-    def __add__(self, other):
+    def __add__(self, other: "ContextResourceUsage") -> "ContextResourceUsage":
         res = ContextResourceUsage(copy_from=self)
         res += other
         return res
 
-    def __sub__(self, other):
+    def __sub__(self, other: "ContextResourceUsage") -> "ContextResourceUsage":
         res = ContextResourceUsage(copy_from=self)
         res -= other
         return res
 
 
+LoggingContextOrSentinel = Union["LoggingContext", "LoggingContext.Sentinel"]
+
+
 class LoggingContext(object):
     """Additional context for log formatting. Contexts are scoped within a
     "with" block.
@@ -201,7 +210,14 @@ class LoggingContext(object):
     class Sentinel(object):
         """Sentinel to represent the root context"""
 
-        __slots__ = []  # type: List[Any]
+        __slots__ = ["previous_context", "alive", "request", "scope"]
+
+        def __init__(self) -> None:
+            # Minimal set for compatibility with LoggingContext
+            self.previous_context = None
+            self.alive = None
+            self.request = None
+            self.scope = None
 
         def __str__(self):
             return "sentinel"
@@ -235,7 +251,7 @@ class LoggingContext(object):
 
     sentinel = Sentinel()
 
-    def __init__(self, name=None, parent_context=None, request=None):
+    def __init__(self, name=None, parent_context=None, request=None) -> None:
         self.previous_context = LoggingContext.current_context()
         self.name = name
 
@@ -250,7 +266,7 @@ class LoggingContext(object):
         self.request = None
         self.tag = ""
         self.alive = True
-        self.scope = None
+        self.scope = None  # type: Optional[_LogContextScope]
 
         self.parent_context = parent_context
 
@@ -261,13 +277,13 @@ class LoggingContext(object):
             # the request param overrides the request from the parent context
             self.request = request
 
-    def __str__(self):
+    def __str__(self) -> str:
         if self.request:
             return str(self.request)
         return "%s@%x" % (self.name, id(self))
 
     @classmethod
-    def current_context(cls):
+    def current_context(cls) -> LoggingContextOrSentinel:
         """Get the current logging context from thread local storage
 
         Returns:
@@ -276,7 +292,9 @@ class LoggingContext(object):
         return getattr(cls.thread_local, "current_context", cls.sentinel)
 
     @classmethod
-    def set_current_context(cls, context):
+    def set_current_context(
+        cls, context: LoggingContextOrSentinel
+    ) -> LoggingContextOrSentinel:
         """Set the current logging context in thread local storage
         Args:
             context(LoggingContext): The context to activate.
@@ -291,7 +309,7 @@ class LoggingContext(object):
             context.start()
         return current
 
-    def __enter__(self):
+    def __enter__(self) -> "LoggingContext":
         """Enters this logging context into thread local storage"""
         old_context = self.set_current_context(self)
         if self.previous_context != old_context:
@@ -304,7 +322,7 @@ class LoggingContext(object):
 
         return self
 
-    def __exit__(self, type, value, traceback):
+    def __exit__(self, type, value, traceback) -> None:
         """Restore the logging context in thread local storage to the state it
         was before this context was entered.
         Returns:
@@ -318,7 +336,6 @@ class LoggingContext(object):
                 logger.warning(
                     "Expected logging context %s but found %s", self, current
                 )
-        self.previous_context = None
         self.alive = False
 
         # if we have a parent, pass our CPU usage stats on
@@ -330,7 +347,7 @@ class LoggingContext(object):
             # reset them in case we get entered again
             self._resource_usage.reset()
 
-    def copy_to(self, record):
+    def copy_to(self, record) -> None:
         """Copy logging fields from this context to a log record or
         another LoggingContext
         """
@@ -341,14 +358,14 @@ class LoggingContext(object):
         # we also track the current scope:
         record.scope = self.scope
 
-    def copy_to_twisted_log_entry(self, record):
+    def copy_to_twisted_log_entry(self, record) -> None:
         """
         Copy logging fields from this context to a Twisted log record.
         """
         record["request"] = self.request
         record["scope"] = self.scope
 
-    def start(self):
+    def start(self) -> None:
         if get_thread_id() != self.main_thread:
             logger.warning("Started logcontext %s on different thread", self)
             return
@@ -358,7 +375,7 @@ class LoggingContext(object):
         if not self.usage_start:
             self.usage_start = get_thread_resource_usage()
 
-    def stop(self):
+    def stop(self) -> None:
         if get_thread_id() != self.main_thread:
             logger.warning("Stopped logcontext %s on different thread", self)
             return
@@ -378,7 +395,7 @@ class LoggingContext(object):
 
         self.usage_start = None
 
-    def get_resource_usage(self):
+    def get_resource_usage(self) -> ContextResourceUsage:
         """Get resources used by this logcontext so far.
 
         Returns:
@@ -398,11 +415,13 @@ class LoggingContext(object):
 
         return res
 
-    def _get_cputime(self):
+    def _get_cputime(self) -> Tuple[float, float]:
         """Get the cpu usage time so far
 
         Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
         """
+        assert self.usage_start is not None
+
         current = get_thread_resource_usage()
 
         # Indicate to mypy that we know that self.usage_start is None.
@@ -430,13 +449,13 @@ class LoggingContext(object):
 
         return utime_delta, stime_delta
 
-    def add_database_transaction(self, duration_sec):
+    def add_database_transaction(self, duration_sec: float) -> None:
         if duration_sec < 0:
             raise ValueError("DB txn time can only be non-negative")
         self._resource_usage.db_txn_count += 1
         self._resource_usage.db_txn_duration_sec += duration_sec
 
-    def add_database_scheduled(self, sched_sec):
+    def add_database_scheduled(self, sched_sec: float) -> None:
         """Record a use of the database pool
 
         Args:
@@ -447,7 +466,7 @@ class LoggingContext(object):
             raise ValueError("DB scheduling time can only be non-negative")
         self._resource_usage.db_sched_duration_sec += sched_sec
 
-    def record_event_fetch(self, event_count):
+    def record_event_fetch(self, event_count: int) -> None:
         """Record a number of events being fetched from the db
 
         Args:
@@ -464,10 +483,10 @@ class LoggingContextFilter(logging.Filter):
             missing fields
     """
 
-    def __init__(self, **defaults):
+    def __init__(self, **defaults) -> None:
         self.defaults = defaults
 
-    def filter(self, record):
+    def filter(self, record) -> Literal[True]:
         """Add each fields from the logging contexts to the record.
         Returns:
             True to include the record in the log output.
@@ -492,12 +511,13 @@ class PreserveLoggingContext(object):
 
     __slots__ = ["current_context", "new_context", "has_parent"]
 
-    def __init__(self, new_context=None):
+    def __init__(self, new_context: Optional[LoggingContext] = None) -> None:
         if new_context is None:
-            new_context = LoggingContext.sentinel
-        self.new_context = new_context
+            self.new_context = LoggingContext.sentinel  # type: LoggingContextOrSentinel
+        else:
+            self.new_context = new_context
 
-    def __enter__(self):
+    def __enter__(self) -> None:
         """Captures the current logging context"""
         self.current_context = LoggingContext.set_current_context(self.new_context)
 
@@ -506,7 +526,7 @@ class PreserveLoggingContext(object):
             if not self.current_context.alive:
                 logger.debug("Entering dead context: %s", self.current_context)
 
-    def __exit__(self, type, value, traceback):
+    def __exit__(self, type, value, traceback) -> None:
         """Restores the current logging context"""
         context = LoggingContext.set_current_context(self.current_context)
 
@@ -525,7 +545,9 @@ class PreserveLoggingContext(object):
                 logger.debug("Restoring dead context: %s", self.current_context)
 
 
-def nested_logging_context(suffix, parent_context=None):
+def nested_logging_context(
+    suffix: str, parent_context: Optional[LoggingContext] = None
+) -> LoggingContext:
     """Creates a new logging context as a child of another.
 
     The nested logging context will have a 'request' made up of the parent context's
@@ -546,10 +568,12 @@ def nested_logging_context(suffix, parent_context=None):
     Returns:
         LoggingContext: new logging context.
     """
-    if parent_context is None:
-        parent_context = LoggingContext.current_context()
+    if parent_context is not None:
+        context = parent_context  # type: LoggingContextOrSentinel
+    else:
+        context = LoggingContext.current_context()
     return LoggingContext(
-        parent_context=parent_context, request=parent_context.request + "-" + suffix
+        parent_context=context, request=str(context.request) + "-" + suffix
     )
 
 
@@ -654,7 +678,10 @@ def make_deferred_yieldable(deferred):
     return deferred
 
 
-def _set_context_cb(result, context):
+ResultT = TypeVar("ResultT")
+
+
+def _set_context_cb(result: ResultT, context: LoggingContext) -> ResultT:
     """A callback function which just sets the logging context"""
     LoggingContext.set_current_context(context)
     return result
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index 1507a14e09..925bc5691b 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -43,13 +43,40 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
 
         def _count_users(txn):
             sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
-
             txn.execute(sql)
             (count,) = txn.fetchone()
             return count
 
         return self.db.runInteraction("count_users", _count_users)
 
+    @cached(num_args=0)
+    def get_monthly_active_count_by_service(self):
+        """Generates current count of monthly active users broken down by service.
+        A service is typically an appservice but also includes native matrix users.
+        Since the `monthly_active_users` table is populated from the `user_ips` table
+        `config.track_appservice_user_ips` must be set to `true` for this
+        method to return anything other than native matrix users.
+
+        Returns:
+            Deferred[dict]: dict that includes a mapping between app_service_id
+                and the number of occurrences.
+
+        """
+
+        def _count_users_by_service(txn):
+            sql = """
+                SELECT COALESCE(appservice_id, 'native'), COALESCE(count(*), 0)
+                FROM monthly_active_users
+                LEFT JOIN users ON monthly_active_users.user_id=users.name
+                GROUP BY appservice_id;
+            """
+
+            txn.execute(sql)
+            result = txn.fetchall()
+            return dict(result)
+
+        return self.db.runInteraction("count_users_by_service", _count_users_by_service)
+
     @defer.inlineCallbacks
     def get_registered_reserved_users(self):
         """Of the reserved threepids defined in config, which are associated
@@ -292,6 +319,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
 
         self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
         self._invalidate_cache_and_stream(
+            txn, self.get_monthly_active_count_by_service, ()
+        )
+        self._invalidate_cache_and_stream(
             txn, self.user_last_seen_monthly_active, (user_id,)
         )
 
diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py
index 3c78faab45..bc53bf0951 100644
--- a/tests/storage/test_monthly_active_users.py
+++ b/tests/storage/test_monthly_active_users.py
@@ -303,3 +303,45 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
         self.pump()
 
         self.store.upsert_monthly_active_user.assert_not_called()
+
+    def test_get_monthly_active_count_by_service(self):
+        appservice1_user1 = "@appservice1_user1:example.com"
+        appservice1_user2 = "@appservice1_user2:example.com"
+
+        appservice2_user1 = "@appservice2_user1:example.com"
+        native_user1 = "@native_user1:example.com"
+
+        service1 = "service1"
+        service2 = "service2"
+        native = "native"
+
+        self.store.register_user(
+            user_id=appservice1_user1, password_hash=None, appservice_id=service1
+        )
+        self.store.register_user(
+            user_id=appservice1_user2, password_hash=None, appservice_id=service1
+        )
+        self.store.register_user(
+            user_id=appservice2_user1, password_hash=None, appservice_id=service2
+        )
+        self.store.register_user(user_id=native_user1, password_hash=None)
+        self.pump()
+
+        count = self.store.get_monthly_active_count_by_service()
+        self.assertEqual({}, self.get_success(count))
+
+        self.store.upsert_monthly_active_user(native_user1)
+        self.store.upsert_monthly_active_user(appservice1_user1)
+        self.store.upsert_monthly_active_user(appservice1_user2)
+        self.store.upsert_monthly_active_user(appservice2_user1)
+        self.pump()
+
+        count = self.store.get_monthly_active_count()
+        self.assertEqual(4, self.get_success(count))
+
+        count = self.store.get_monthly_active_count_by_service()
+        result = self.get_success(count)
+
+        self.assertEqual(2, result[service1])
+        self.assertEqual(1, result[service2])
+        self.assertEqual(1, result[native])