diff options
Diffstat (limited to 'synapse/handlers/stats.py')
-rw-r--r-- | synapse/handlers/stats.py | 64 |
1 files changed, 32 insertions, 32 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 3dde19fc81..817b41aa37 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -27,6 +27,7 @@ from typing import ( from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.databases.main.state_deltas import StateDelta from synapse.types import JsonDict if TYPE_CHECKING: @@ -142,7 +143,7 @@ class StatsHandler: self.pos = max_pos async def _handle_deltas( - self, deltas: Iterable[JsonDict] + self, deltas: Iterable[StateDelta] ) -> Tuple[Dict[str, CounterType[str]], Dict[str, CounterType[str]]]: """Called with the state deltas to process @@ -157,51 +158,50 @@ class StatsHandler: room_to_state_updates: Dict[str, Dict[str, Any]] = {} for delta in deltas: - typ = delta["type"] - state_key = delta["state_key"] - room_id = delta["room_id"] - event_id = delta["event_id"] - stream_id = delta["stream_id"] - prev_event_id = delta["prev_event_id"] - - logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id) + logger.debug( + "Handling: %r, %r %r, %s", + delta.room_id, + delta.event_type, + delta.state_key, + delta.event_id, + ) - token = await self.store.get_earliest_token_for_stats("room", room_id) + token = await self.store.get_earliest_token_for_stats("room", delta.room_id) # If the earliest token to begin from is larger than our current # stream ID, skip processing this delta. - if token is not None and token >= stream_id: + if token is not None and token >= delta.stream_id: logger.debug( "Ignoring: %s as earlier than this room's initial ingestion event", - event_id, + delta.event_id, ) continue - if event_id is None and prev_event_id is None: + if delta.event_id is None and delta.prev_event_id is None: logger.error( "event ID is None and so is the previous event ID. stream_id: %s", - stream_id, + delta.stream_id, ) continue event_content: JsonDict = {} - if event_id is not None: - event = await self.store.get_event(event_id, allow_none=True) + if delta.event_id is not None: + event = await self.store.get_event(delta.event_id, allow_none=True) if event: event_content = event.content or {} # All the values in this dict are deltas (RELATIVE changes) - room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter()) + room_stats_delta = room_to_stats_deltas.setdefault(delta.room_id, Counter()) - room_state = room_to_state_updates.setdefault(room_id, {}) + room_state = room_to_state_updates.setdefault(delta.room_id, {}) - if prev_event_id is None: + if delta.prev_event_id is None: # this state event doesn't overwrite another, # so it is a new effective/current state event room_stats_delta["current_state_events"] += 1 - if typ == EventTypes.Member: + if delta.event_type == EventTypes.Member: # we could use StateDeltasHandler._get_key_change here but it's # a bit inefficient given we're not testing for a specific # result; might as well just grab the prev_membership and @@ -210,9 +210,9 @@ class StatsHandler: # in the absence of a previous event because we do not want to # reduce the leave count when a new-to-the-room user joins. prev_membership = None - if prev_event_id is not None: + if delta.prev_event_id is not None: prev_event = await self.store.get_event( - prev_event_id, allow_none=True + delta.prev_event_id, allow_none=True ) if prev_event: prev_event_content = prev_event.content @@ -256,7 +256,7 @@ class StatsHandler: else: raise ValueError("%r is not a valid membership" % (membership,)) - user_id = state_key + user_id = delta.state_key if self.is_mine_id(user_id): # this accounts for transitions like leave → ban and so on. has_changed_joinedness = (prev_membership == Membership.JOIN) != ( @@ -272,30 +272,30 @@ class StatsHandler: room_stats_delta["local_users_in_room"] += membership_delta - elif typ == EventTypes.Create: + elif delta.event_type == EventTypes.Create: room_state["is_federatable"] = ( event_content.get(EventContentFields.FEDERATE, True) is True ) room_type = event_content.get(EventContentFields.ROOM_TYPE) if isinstance(room_type, str): room_state["room_type"] = room_type - elif typ == EventTypes.JoinRules: + elif delta.event_type == EventTypes.JoinRules: room_state["join_rules"] = event_content.get("join_rule") - elif typ == EventTypes.RoomHistoryVisibility: + elif delta.event_type == EventTypes.RoomHistoryVisibility: room_state["history_visibility"] = event_content.get( "history_visibility" ) - elif typ == EventTypes.RoomEncryption: + elif delta.event_type == EventTypes.RoomEncryption: room_state["encryption"] = event_content.get("algorithm") - elif typ == EventTypes.Name: + elif delta.event_type == EventTypes.Name: room_state["name"] = event_content.get("name") - elif typ == EventTypes.Topic: + elif delta.event_type == EventTypes.Topic: room_state["topic"] = event_content.get("topic") - elif typ == EventTypes.RoomAvatar: + elif delta.event_type == EventTypes.RoomAvatar: room_state["avatar"] = event_content.get("url") - elif typ == EventTypes.CanonicalAlias: + elif delta.event_type == EventTypes.CanonicalAlias: room_state["canonical_alias"] = event_content.get("alias") - elif typ == EventTypes.GuestAccess: + elif delta.event_type == EventTypes.GuestAccess: room_state["guest_access"] = event_content.get( EventContentFields.GUEST_ACCESS ) |