diff options
-rw-r--r-- | changelog.d/6309.misc | 1 | ||||
-rw-r--r-- | changelog.d/7030.feature | 1 | ||||
-rw-r--r-- | synapse/app/homeserver.py | 13 | ||||
-rw-r--r-- | synapse/logging/context.py | 121 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/monthly_active_users.py | 32 | ||||
-rw-r--r-- | tests/storage/test_monthly_active_users.py | 42 |
6 files changed, 162 insertions, 48 deletions
diff --git a/changelog.d/6309.misc b/changelog.d/6309.misc new file mode 100644 index 0000000000..1aa7294617 --- /dev/null +++ b/changelog.d/6309.misc @@ -0,0 +1 @@ +Add type hints to `logging/context.py`. diff --git a/changelog.d/7030.feature b/changelog.d/7030.feature new file mode 100644 index 0000000000..fcfdb8d8a1 --- /dev/null +++ b/changelog.d/7030.feature @@ -0,0 +1 @@ +Break down monthly active users by `appservice_id` and emit via Prometheus. diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index c2a334a2b0..e0fdddfdc9 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -298,6 +298,11 @@ class SynapseHomeServer(HomeServer): # Gauges to expose monthly active user control metrics current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") +current_mau_by_service_gauge = Gauge( + "synapse_admin_mau_current_mau_by_service", + "Current MAU by service", + ["app_service"], +) max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") registered_reserved_users_mau_gauge = Gauge( "synapse_admin_mau:registered_reserved_users", @@ -585,12 +590,20 @@ def run(hs): @defer.inlineCallbacks def generate_monthly_active_users(): current_mau_count = 0 + current_mau_count_by_service = {} reserved_users = () store = hs.get_datastore() if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: current_mau_count = yield store.get_monthly_active_count() + current_mau_count_by_service = ( + yield store.get_monthly_active_count_by_service() + ) reserved_users = yield store.get_registered_reserved_users() current_mau_gauge.set(float(current_mau_count)) + + for app_service, count in current_mau_count_by_service.items(): + current_mau_by_service_gauge.labels(app_service).set(float(count)) + registered_reserved_users_mau_gauge.set(float(len(reserved_users))) max_mau_gauge.set(float(hs.config.max_mau_value)) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 1b940842f6..1eccc0e83f 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -27,10 +27,15 @@ import inspect import logging import threading import types -from typing import Any, List +from typing import TYPE_CHECKING, Optional, Tuple, TypeVar, Union + +from typing_extensions import Literal from twisted.internet import defer, threads +if TYPE_CHECKING: + from synapse.logging.scopecontextmanager import _LogContextScope + logger = logging.getLogger(__name__) try: @@ -91,7 +96,7 @@ class ContextResourceUsage(object): "evt_db_fetch_count", ] - def __init__(self, copy_from=None): + def __init__(self, copy_from: "Optional[ContextResourceUsage]" = None) -> None: """Create a new ContextResourceUsage Args: @@ -101,27 +106,28 @@ class ContextResourceUsage(object): if copy_from is None: self.reset() else: - self.ru_utime = copy_from.ru_utime - self.ru_stime = copy_from.ru_stime - self.db_txn_count = copy_from.db_txn_count + # FIXME: mypy can't infer the types set via reset() above, so specify explicitly for now + self.ru_utime = copy_from.ru_utime # type: float + self.ru_stime = copy_from.ru_stime # type: float + self.db_txn_count = copy_from.db_txn_count # type: int - self.db_txn_duration_sec = copy_from.db_txn_duration_sec - self.db_sched_duration_sec = copy_from.db_sched_duration_sec - self.evt_db_fetch_count = copy_from.evt_db_fetch_count + self.db_txn_duration_sec = copy_from.db_txn_duration_sec # type: float + self.db_sched_duration_sec = copy_from.db_sched_duration_sec # type: float + self.evt_db_fetch_count = copy_from.evt_db_fetch_count # type: int - def copy(self): + def copy(self) -> "ContextResourceUsage": return ContextResourceUsage(copy_from=self) - def reset(self): + def reset(self) -> None: self.ru_stime = 0.0 self.ru_utime = 0.0 self.db_txn_count = 0 - self.db_txn_duration_sec = 0 - self.db_sched_duration_sec = 0 + self.db_txn_duration_sec = 0.0 + self.db_sched_duration_sec = 0.0 self.evt_db_fetch_count = 0 - def __repr__(self): + def __repr__(self) -> str: return ( "<ContextResourceUsage ru_stime='%r', ru_utime='%r', " "db_txn_count='%r', db_txn_duration_sec='%r', " @@ -135,7 +141,7 @@ class ContextResourceUsage(object): self.evt_db_fetch_count, ) - def __iadd__(self, other): + def __iadd__(self, other: "ContextResourceUsage") -> "ContextResourceUsage": """Add another ContextResourceUsage's stats to this one's. Args: @@ -149,7 +155,7 @@ class ContextResourceUsage(object): self.evt_db_fetch_count += other.evt_db_fetch_count return self - def __isub__(self, other): + def __isub__(self, other: "ContextResourceUsage") -> "ContextResourceUsage": self.ru_utime -= other.ru_utime self.ru_stime -= other.ru_stime self.db_txn_count -= other.db_txn_count @@ -158,17 +164,20 @@ class ContextResourceUsage(object): self.evt_db_fetch_count -= other.evt_db_fetch_count return self - def __add__(self, other): + def __add__(self, other: "ContextResourceUsage") -> "ContextResourceUsage": res = ContextResourceUsage(copy_from=self) res += other return res - def __sub__(self, other): + def __sub__(self, other: "ContextResourceUsage") -> "ContextResourceUsage": res = ContextResourceUsage(copy_from=self) res -= other return res +LoggingContextOrSentinel = Union["LoggingContext", "LoggingContext.Sentinel"] + + class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a "with" block. @@ -201,7 +210,14 @@ class LoggingContext(object): class Sentinel(object): """Sentinel to represent the root context""" - __slots__ = [] # type: List[Any] + __slots__ = ["previous_context", "alive", "request", "scope"] + + def __init__(self) -> None: + # Minimal set for compatibility with LoggingContext + self.previous_context = None + self.alive = None + self.request = None + self.scope = None def __str__(self): return "sentinel" @@ -235,7 +251,7 @@ class LoggingContext(object): sentinel = Sentinel() - def __init__(self, name=None, parent_context=None, request=None): + def __init__(self, name=None, parent_context=None, request=None) -> None: self.previous_context = LoggingContext.current_context() self.name = name @@ -250,7 +266,7 @@ class LoggingContext(object): self.request = None self.tag = "" self.alive = True - self.scope = None + self.scope = None # type: Optional[_LogContextScope] self.parent_context = parent_context @@ -261,13 +277,13 @@ class LoggingContext(object): # the request param overrides the request from the parent context self.request = request - def __str__(self): + def __str__(self) -> str: if self.request: return str(self.request) return "%s@%x" % (self.name, id(self)) @classmethod - def current_context(cls): + def current_context(cls) -> LoggingContextOrSentinel: """Get the current logging context from thread local storage Returns: @@ -276,7 +292,9 @@ class LoggingContext(object): return getattr(cls.thread_local, "current_context", cls.sentinel) @classmethod - def set_current_context(cls, context): + def set_current_context( + cls, context: LoggingContextOrSentinel + ) -> LoggingContextOrSentinel: """Set the current logging context in thread local storage Args: context(LoggingContext): The context to activate. @@ -291,7 +309,7 @@ class LoggingContext(object): context.start() return current - def __enter__(self): + def __enter__(self) -> "LoggingContext": """Enters this logging context into thread local storage""" old_context = self.set_current_context(self) if self.previous_context != old_context: @@ -304,7 +322,7 @@ class LoggingContext(object): return self - def __exit__(self, type, value, traceback): + def __exit__(self, type, value, traceback) -> None: """Restore the logging context in thread local storage to the state it was before this context was entered. Returns: @@ -318,7 +336,6 @@ class LoggingContext(object): logger.warning( "Expected logging context %s but found %s", self, current ) - self.previous_context = None self.alive = False # if we have a parent, pass our CPU usage stats on @@ -330,7 +347,7 @@ class LoggingContext(object): # reset them in case we get entered again self._resource_usage.reset() - def copy_to(self, record): + def copy_to(self, record) -> None: """Copy logging fields from this context to a log record or another LoggingContext """ @@ -341,14 +358,14 @@ class LoggingContext(object): # we also track the current scope: record.scope = self.scope - def copy_to_twisted_log_entry(self, record): + def copy_to_twisted_log_entry(self, record) -> None: """ Copy logging fields from this context to a Twisted log record. """ record["request"] = self.request record["scope"] = self.scope - def start(self): + def start(self) -> None: if get_thread_id() != self.main_thread: logger.warning("Started logcontext %s on different thread", self) return @@ -358,7 +375,7 @@ class LoggingContext(object): if not self.usage_start: self.usage_start = get_thread_resource_usage() - def stop(self): + def stop(self) -> None: if get_thread_id() != self.main_thread: logger.warning("Stopped logcontext %s on different thread", self) return @@ -378,7 +395,7 @@ class LoggingContext(object): self.usage_start = None - def get_resource_usage(self): + def get_resource_usage(self) -> ContextResourceUsage: """Get resources used by this logcontext so far. Returns: @@ -398,11 +415,13 @@ class LoggingContext(object): return res - def _get_cputime(self): + def _get_cputime(self) -> Tuple[float, float]: """Get the cpu usage time so far Returns: Tuple[float, float]: seconds in user mode, seconds in system mode """ + assert self.usage_start is not None + current = get_thread_resource_usage() # Indicate to mypy that we know that self.usage_start is None. @@ -430,13 +449,13 @@ class LoggingContext(object): return utime_delta, stime_delta - def add_database_transaction(self, duration_sec): + def add_database_transaction(self, duration_sec: float) -> None: if duration_sec < 0: raise ValueError("DB txn time can only be non-negative") self._resource_usage.db_txn_count += 1 self._resource_usage.db_txn_duration_sec += duration_sec - def add_database_scheduled(self, sched_sec): + def add_database_scheduled(self, sched_sec: float) -> None: """Record a use of the database pool Args: @@ -447,7 +466,7 @@ class LoggingContext(object): raise ValueError("DB scheduling time can only be non-negative") self._resource_usage.db_sched_duration_sec += sched_sec - def record_event_fetch(self, event_count): + def record_event_fetch(self, event_count: int) -> None: """Record a number of events being fetched from the db Args: @@ -464,10 +483,10 @@ class LoggingContextFilter(logging.Filter): missing fields """ - def __init__(self, **defaults): + def __init__(self, **defaults) -> None: self.defaults = defaults - def filter(self, record): + def filter(self, record) -> Literal[True]: """Add each fields from the logging contexts to the record. Returns: True to include the record in the log output. @@ -492,12 +511,13 @@ class PreserveLoggingContext(object): __slots__ = ["current_context", "new_context", "has_parent"] - def __init__(self, new_context=None): + def __init__(self, new_context: Optional[LoggingContext] = None) -> None: if new_context is None: - new_context = LoggingContext.sentinel - self.new_context = new_context + self.new_context = LoggingContext.sentinel # type: LoggingContextOrSentinel + else: + self.new_context = new_context - def __enter__(self): + def __enter__(self) -> None: """Captures the current logging context""" self.current_context = LoggingContext.set_current_context(self.new_context) @@ -506,7 +526,7 @@ class PreserveLoggingContext(object): if not self.current_context.alive: logger.debug("Entering dead context: %s", self.current_context) - def __exit__(self, type, value, traceback): + def __exit__(self, type, value, traceback) -> None: """Restores the current logging context""" context = LoggingContext.set_current_context(self.current_context) @@ -525,7 +545,9 @@ class PreserveLoggingContext(object): logger.debug("Restoring dead context: %s", self.current_context) -def nested_logging_context(suffix, parent_context=None): +def nested_logging_context( + suffix: str, parent_context: Optional[LoggingContext] = None +) -> LoggingContext: """Creates a new logging context as a child of another. The nested logging context will have a 'request' made up of the parent context's @@ -546,10 +568,12 @@ def nested_logging_context(suffix, parent_context=None): Returns: LoggingContext: new logging context. """ - if parent_context is None: - parent_context = LoggingContext.current_context() + if parent_context is not None: + context = parent_context # type: LoggingContextOrSentinel + else: + context = LoggingContext.current_context() return LoggingContext( - parent_context=parent_context, request=parent_context.request + "-" + suffix + parent_context=context, request=str(context.request) + "-" + suffix ) @@ -654,7 +678,10 @@ def make_deferred_yieldable(deferred): return deferred -def _set_context_cb(result, context): +ResultT = TypeVar("ResultT") + + +def _set_context_cb(result: ResultT, context: LoggingContext) -> ResultT: """A callback function which just sets the logging context""" LoggingContext.set_current_context(context) return result diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py index 1507a14e09..925bc5691b 100644 --- a/synapse/storage/data_stores/main/monthly_active_users.py +++ b/synapse/storage/data_stores/main/monthly_active_users.py @@ -43,13 +43,40 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): def _count_users(txn): sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users" - txn.execute(sql) (count,) = txn.fetchone() return count return self.db.runInteraction("count_users", _count_users) + @cached(num_args=0) + def get_monthly_active_count_by_service(self): + """Generates current count of monthly active users broken down by service. + A service is typically an appservice but also includes native matrix users. + Since the `monthly_active_users` table is populated from the `user_ips` table + `config.track_appservice_user_ips` must be set to `true` for this + method to return anything other than native matrix users. + + Returns: + Deferred[dict]: dict that includes a mapping between app_service_id + and the number of occurrences. + + """ + + def _count_users_by_service(txn): + sql = """ + SELECT COALESCE(appservice_id, 'native'), COALESCE(count(*), 0) + FROM monthly_active_users + LEFT JOIN users ON monthly_active_users.user_id=users.name + GROUP BY appservice_id; + """ + + txn.execute(sql) + result = txn.fetchall() + return dict(result) + + return self.db.runInteraction("count_users_by_service", _count_users_by_service) + @defer.inlineCallbacks def get_registered_reserved_users(self): """Of the reserved threepids defined in config, which are associated @@ -292,6 +319,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ()) self._invalidate_cache_and_stream( + txn, self.get_monthly_active_count_by_service, () + ) + self._invalidate_cache_and_stream( txn, self.user_last_seen_monthly_active, (user_id,) ) diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py index 3c78faab45..bc53bf0951 100644 --- a/tests/storage/test_monthly_active_users.py +++ b/tests/storage/test_monthly_active_users.py @@ -303,3 +303,45 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase): self.pump() self.store.upsert_monthly_active_user.assert_not_called() + + def test_get_monthly_active_count_by_service(self): + appservice1_user1 = "@appservice1_user1:example.com" + appservice1_user2 = "@appservice1_user2:example.com" + + appservice2_user1 = "@appservice2_user1:example.com" + native_user1 = "@native_user1:example.com" + + service1 = "service1" + service2 = "service2" + native = "native" + + self.store.register_user( + user_id=appservice1_user1, password_hash=None, appservice_id=service1 + ) + self.store.register_user( + user_id=appservice1_user2, password_hash=None, appservice_id=service1 + ) + self.store.register_user( + user_id=appservice2_user1, password_hash=None, appservice_id=service2 + ) + self.store.register_user(user_id=native_user1, password_hash=None) + self.pump() + + count = self.store.get_monthly_active_count_by_service() + self.assertEqual({}, self.get_success(count)) + + self.store.upsert_monthly_active_user(native_user1) + self.store.upsert_monthly_active_user(appservice1_user1) + self.store.upsert_monthly_active_user(appservice1_user2) + self.store.upsert_monthly_active_user(appservice2_user1) + self.pump() + + count = self.store.get_monthly_active_count() + self.assertEqual(4, self.get_success(count)) + + count = self.store.get_monthly_active_count_by_service() + result = self.get_success(count) + + self.assertEqual(2, result[service1]) + self.assertEqual(1, result[service2]) + self.assertEqual(1, result[native]) |