summary refs log tree commit diff
path: root/synapse/handlers/stats.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/stats.py')
-rw-r--r--synapse/handlers/stats.py67
1 files changed, 34 insertions, 33 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index cbac7c347a..149f861239 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -16,17 +16,14 @@
 import logging
 from collections import Counter
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes, Membership
-from synapse.handlers.state_deltas import StateDeltasHandler
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
 
 logger = logging.getLogger(__name__)
 
 
-class StatsHandler(StateDeltasHandler):
+class StatsHandler:
     """Handles keeping the *_stats tables updated with a simple time-series of
     information about the users, rooms and media on the server, such that admins
     have some idea of who is consuming their resources.
@@ -35,7 +32,6 @@ class StatsHandler(StateDeltasHandler):
     """
 
     def __init__(self, hs):
-        super(StatsHandler, self).__init__(hs)
         self.hs = hs
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -45,6 +41,8 @@ class StatsHandler(StateDeltasHandler):
         self.is_mine_id = hs.is_mine_id
         self.stats_bucket_size = hs.config.stats_bucket_size
 
+        self.stats_enabled = hs.config.stats_enabled
+
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -61,25 +59,23 @@ class StatsHandler(StateDeltasHandler):
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
-        if not self.hs.config.stats_enabled or self._is_processing:
+        if not self.stats_enabled or self._is_processing:
             return
 
         self._is_processing = True
 
-        @defer.inlineCallbacks
-        def process():
+        async def process():
             try:
-                yield self._unsafe_process()
+                await self._unsafe_process()
             finally:
                 self._is_processing = False
 
         run_as_background_process("stats.notify_new_event", process)
 
-    @defer.inlineCallbacks
-    def _unsafe_process(self):
+    async def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
-            self.pos = yield self.store.get_stats_positions()
+            self.pos = await self.store.get_stats_positions()
 
         # Loop round handling deltas until we're up to date
 
@@ -87,24 +83,29 @@ class StatsHandler(StateDeltasHandler):
             # Be sure to read the max stream_ordering *before* checking if there are any outstanding
             # deltas, since there is otherwise a chance that we could miss updates which arrive
             # after we check the deltas.
-            room_max_stream_ordering = yield self.store.get_room_max_stream_ordering()
+            room_max_stream_ordering = self.store.get_room_max_stream_ordering()
             if self.pos == room_max_stream_ordering:
                 break
 
-            deltas = yield self.store.get_current_state_deltas(self.pos)
+            logger.debug(
+                "Processing room stats %s->%s", self.pos, room_max_stream_ordering
+            )
+            max_pos, deltas = await self.store.get_current_state_deltas(
+                self.pos, room_max_stream_ordering
+            )
 
             if deltas:
                 logger.debug("Handling %d state deltas", len(deltas))
-                room_deltas, user_deltas = yield self._handle_deltas(deltas)
-
-                max_pos = deltas[-1]["stream_id"]
+                room_deltas, user_deltas = await self._handle_deltas(deltas)
             else:
                 room_deltas = {}
                 user_deltas = {}
-                max_pos = room_max_stream_ordering
 
             # Then count deltas for total_events and total_event_bytes.
-            room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
+            (
+                room_count,
+                user_count,
+            ) = await self.store.get_changes_room_total_events_and_bytes(
                 self.pos, max_pos
             )
 
@@ -118,7 +119,7 @@ class StatsHandler(StateDeltasHandler):
             logger.debug("user_deltas: %s", user_deltas)
 
             # Always call this so that we update the stats position.
-            yield self.store.bulk_update_stats_delta(
+            await self.store.bulk_update_stats_delta(
                 self.clock.time_msec(),
                 updates={"room": room_deltas, "user": user_deltas},
                 stream_id=max_pos,
@@ -130,13 +131,12 @@ class StatsHandler(StateDeltasHandler):
 
             self.pos = max_pos
 
-    @defer.inlineCallbacks
-    def _handle_deltas(self, deltas):
+    async def _handle_deltas(self, deltas):
         """Called with the state deltas to process
 
         Returns:
-            Deferred[tuple[dict[str, Counter], dict[str, counter]]]
-            Resovles to two dicts, the room deltas and the user deltas,
+            tuple[dict[str, Counter], dict[str, counter]]
+            Two dicts: the room deltas and the user deltas,
             mapping from room/user ID to changes in the various fields.
         """
 
@@ -155,7 +155,7 @@ class StatsHandler(StateDeltasHandler):
 
             logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
 
-            token = yield self.store.get_earliest_token_for_stats("room", room_id)
+            token = await self.store.get_earliest_token_for_stats("room", room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -177,7 +177,7 @@ class StatsHandler(StateDeltasHandler):
 
             sender = None
             if event_id is not None:
-                event = yield self.store.get_event(event_id, allow_none=True)
+                event = await self.store.get_event(event_id, allow_none=True)
                 if event:
                     event_content = event.content or {}
                     sender = event.sender
@@ -193,16 +193,16 @@ class StatsHandler(StateDeltasHandler):
                 room_stats_delta["current_state_events"] += 1
 
             if typ == EventTypes.Member:
-                # we could use _get_key_change here but it's a bit inefficient
-                # given we're not testing for a specific result; might as well
-                # just grab the prev_membership and membership strings and
-                # compare them.
+                # we could use StateDeltasHandler._get_key_change here but it's
+                # a bit inefficient given we're not testing for a specific
+                # result; might as well just grab the prev_membership and
+                # membership strings and compare them.
                 # We take None rather than leave as a previous membership
                 # in the absence of a previous event because we do not want to
                 # reduce the leave count when a new-to-the-room user joins.
                 prev_membership = None
                 if prev_event_id is not None:
-                    prev_event = yield self.store.get_event(
+                    prev_event = await self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
@@ -279,7 +279,7 @@ class StatsHandler(StateDeltasHandler):
                 room_state["history_visibility"] = event_content.get(
                     "history_visibility"
                 )
-            elif typ == EventTypes.Encryption:
+            elif typ == EventTypes.RoomEncryption:
                 room_state["encryption"] = event_content.get("algorithm")
             elif typ == EventTypes.Name:
                 room_state["name"] = event_content.get("name")
@@ -293,6 +293,7 @@ class StatsHandler(StateDeltasHandler):
                 room_state["guest_access"] = event_content.get("guest_access")
 
         for room_id, state in room_to_state_updates.items():
-            yield self.store.update_room_state(room_id, state)
+            logger.debug("Updating room_stats_state for %s: %s", room_id, state)
+            await self.store.update_room_state(room_id, state)
 
         return room_to_stats_deltas, user_to_stats_deltas