diff options
Diffstat (limited to 'synapse/handlers/stats.py')
-rw-r--r-- | synapse/handlers/stats.py | 47 |
1 files changed, 20 insertions, 27 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index d93a276693..149f861239 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -16,17 +16,14 @@ import logging from collections import Counter -from twisted.internet import defer - from synapse.api.constants import EventTypes, Membership -from synapse.handlers.state_deltas import StateDeltasHandler from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process logger = logging.getLogger(__name__) -class StatsHandler(StateDeltasHandler): +class StatsHandler: """Handles keeping the *_stats tables updated with a simple time-series of information about the users, rooms and media on the server, such that admins have some idea of who is consuming their resources. @@ -35,7 +32,6 @@ class StatsHandler(StateDeltasHandler): """ def __init__(self, hs): - super(StatsHandler, self).__init__(hs) self.hs = hs self.store = hs.get_datastore() self.state = hs.get_state_handler() @@ -68,20 +64,18 @@ class StatsHandler(StateDeltasHandler): self._is_processing = True - @defer.inlineCallbacks - def process(): + async def process(): try: - yield self._unsafe_process() + await self._unsafe_process() finally: self._is_processing = False run_as_background_process("stats.notify_new_event", process) - @defer.inlineCallbacks - def _unsafe_process(self): + async def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB if self.pos is None: - self.pos = yield self.store.get_stats_positions() + self.pos = await self.store.get_stats_positions() # Loop round handling deltas until we're up to date @@ -96,13 +90,13 @@ class StatsHandler(StateDeltasHandler): logger.debug( "Processing room stats %s->%s", self.pos, room_max_stream_ordering ) - max_pos, deltas = yield self.store.get_current_state_deltas( + max_pos, deltas = await self.store.get_current_state_deltas( self.pos, room_max_stream_ordering ) if deltas: logger.debug("Handling %d state deltas", len(deltas)) - room_deltas, user_deltas = yield self._handle_deltas(deltas) + room_deltas, user_deltas = await self._handle_deltas(deltas) else: room_deltas = {} user_deltas = {} @@ -111,7 +105,7 @@ class StatsHandler(StateDeltasHandler): ( room_count, user_count, - ) = yield self.store.get_changes_room_total_events_and_bytes( + ) = await self.store.get_changes_room_total_events_and_bytes( self.pos, max_pos ) @@ -125,7 +119,7 @@ class StatsHandler(StateDeltasHandler): logger.debug("user_deltas: %s", user_deltas) # Always call this so that we update the stats position. - yield self.store.bulk_update_stats_delta( + await self.store.bulk_update_stats_delta( self.clock.time_msec(), updates={"room": room_deltas, "user": user_deltas}, stream_id=max_pos, @@ -137,13 +131,12 @@ class StatsHandler(StateDeltasHandler): self.pos = max_pos - @defer.inlineCallbacks - def _handle_deltas(self, deltas): + async def _handle_deltas(self, deltas): """Called with the state deltas to process Returns: - Deferred[tuple[dict[str, Counter], dict[str, counter]]] - Resovles to two dicts, the room deltas and the user deltas, + tuple[dict[str, Counter], dict[str, counter]] + Two dicts: the room deltas and the user deltas, mapping from room/user ID to changes in the various fields. """ @@ -162,7 +155,7 @@ class StatsHandler(StateDeltasHandler): logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id) - token = yield self.store.get_earliest_token_for_stats("room", room_id) + token = await self.store.get_earliest_token_for_stats("room", room_id) # If the earliest token to begin from is larger than our current # stream ID, skip processing this delta. @@ -184,7 +177,7 @@ class StatsHandler(StateDeltasHandler): sender = None if event_id is not None: - event = yield self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(event_id, allow_none=True) if event: event_content = event.content or {} sender = event.sender @@ -200,16 +193,16 @@ class StatsHandler(StateDeltasHandler): room_stats_delta["current_state_events"] += 1 if typ == EventTypes.Member: - # we could use _get_key_change here but it's a bit inefficient - # given we're not testing for a specific result; might as well - # just grab the prev_membership and membership strings and - # compare them. + # we could use StateDeltasHandler._get_key_change here but it's + # a bit inefficient given we're not testing for a specific + # result; might as well just grab the prev_membership and + # membership strings and compare them. # We take None rather than leave as a previous membership # in the absence of a previous event because we do not want to # reduce the leave count when a new-to-the-room user joins. prev_membership = None if prev_event_id is not None: - prev_event = yield self.store.get_event( + prev_event = await self.store.get_event( prev_event_id, allow_none=True ) if prev_event: @@ -301,6 +294,6 @@ class StatsHandler(StateDeltasHandler): for room_id, state in room_to_state_updates.items(): logger.debug("Updating room_stats_state for %s: %s", room_id, state) - yield self.store.update_room_state(room_id, state) + await self.store.update_room_state(room_id, state) return room_to_stats_deltas, user_to_stats_deltas |