summary refs log tree commit diff
path: root/synapse/handlers/stats.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/stats.py')
-rw-r--r--synapse/handlers/stats.py323
1 files changed, 148 insertions, 175 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..7f7d56390e 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -14,15 +14,14 @@
 # limitations under the License.
 
 import logging
+from collections import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventTypes, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import UserID
-from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -46,6 +45,8 @@ class StatsHandler(StateDeltasHandler):
         self.is_mine_id = hs.is_mine_id
         self.stats_bucket_size = hs.config.stats_bucket_size
 
+        self.stats_enabled = hs.config.stats_enabled
+
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -62,11 +63,10 @@ class StatsHandler(StateDeltasHandler):
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
-        if not self.hs.config.stats_enabled:
+        if not self.stats_enabled or self._is_processing:
             return
 
-        if self._is_processing:
-            return
+        self._is_processing = True
 
         @defer.inlineCallbacks
         def process():
@@ -75,39 +75,83 @@ class StatsHandler(StateDeltasHandler):
             finally:
                 self._is_processing = False
 
-        self._is_processing = True
         run_as_background_process("stats.notify_new_event", process)
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
-            self.pos = yield self.store.get_stats_stream_pos()
-
-        # If still None then the initial background update hasn't happened yet
-        if self.pos is None:
-            return None
+            self.pos = yield self.store.get_stats_positions()
 
         # Loop round handling deltas until we're up to date
+
         while True:
-            with Measure(self.clock, "stats_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
-                if not deltas:
-                    return
+            # Be sure to read the max stream_ordering *before* checking if there are any outstanding
+            # deltas, since there is otherwise a chance that we could miss updates which arrive
+            # after we check the deltas.
+            room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+            if self.pos == room_max_stream_ordering:
+                break
+
+            logger.debug(
+                "Processing room stats %s->%s", self.pos, room_max_stream_ordering
+            )
+            max_pos, deltas = yield self.store.get_current_state_deltas(
+                self.pos, room_max_stream_ordering
+            )
+
+            if deltas:
+                logger.debug("Handling %d state deltas", len(deltas))
+                room_deltas, user_deltas = yield self._handle_deltas(deltas)
+            else:
+                room_deltas = {}
+                user_deltas = {}
+
+            # Then count deltas for total_events and total_event_bytes.
+            (
+                room_count,
+                user_count,
+            ) = yield self.store.get_changes_room_total_events_and_bytes(
+                self.pos, max_pos
+            )
+
+            for room_id, fields in room_count.items():
+                room_deltas.setdefault(room_id, {}).update(fields)
+
+            for user_id, fields in user_count.items():
+                user_deltas.setdefault(user_id, {}).update(fields)
+
+            logger.debug("room_deltas: %s", room_deltas)
+            logger.debug("user_deltas: %s", user_deltas)
+
+            # Always call this so that we update the stats position.
+            yield self.store.bulk_update_stats_delta(
+                self.clock.time_msec(),
+                updates={"room": room_deltas, "user": user_deltas},
+                stream_id=max_pos,
+            )
 
-                logger.info("Handling %d state deltas", len(deltas))
-                yield self._handle_deltas(deltas)
+            logger.debug("Handled room stats to %s -> %s", self.pos, max_pos)
 
-                self.pos = deltas[-1]["stream_id"]
-                yield self.store.update_stats_stream_pos(self.pos)
+            event_processing_positions.labels("stats").set(max_pos)
 
-                event_processing_positions.labels("stats").set(self.pos)
+            self.pos = max_pos
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
+        """Called with the state deltas to process
+
+        Returns:
+            Deferred[tuple[dict[str, Counter], dict[str, counter]]]
+            Resovles to two dicts, the room deltas and the user deltas,
+            mapping from room/user ID to changes in the various fields.
         """
-        Called with the state deltas to process
-        """
+
+        room_to_stats_deltas = {}
+        user_to_stats_deltas = {}
+
+        room_to_state_updates = {}
+
         for delta in deltas:
             typ = delta["type"]
             state_key = delta["state_key"]
@@ -115,11 +159,10 @@ class StatsHandler(StateDeltasHandler):
             event_id = delta["event_id"]
             stream_id = delta["stream_id"]
             prev_event_id = delta["prev_event_id"]
-            stream_pos = delta["stream_id"]
 
-            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+            logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
 
-            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+            token = yield self.store.get_earliest_token_for_stats("room", room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -131,203 +174,133 @@ class StatsHandler(StateDeltasHandler):
                 continue
 
             if event_id is None and prev_event_id is None:
-                # Errr...
+                logger.error(
+                    "event ID is None and so is the previous event ID. stream_id: %s",
+                    stream_id,
+                )
                 continue
 
             event_content = {}
 
+            sender = None
             if event_id is not None:
                 event = yield self.store.get_event(event_id, allow_none=True)
                 if event:
                     event_content = event.content or {}
+                    sender = event.sender
 
-            # We use stream_pos here rather than fetch by event_id as event_id
-            # may be None
-            now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+            # All the values in this dict are deltas (RELATIVE changes)
+            room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
 
-            # quantise time to the nearest bucket
-            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+            room_state = room_to_state_updates.setdefault(room_id, {})
+
+            if prev_event_id is None:
+                # this state event doesn't overwrite another,
+                # so it is a new effective/current state event
+                room_stats_delta["current_state_events"] += 1
 
             if typ == EventTypes.Member:
                 # we could use _get_key_change here but it's a bit inefficient
                 # given we're not testing for a specific result; might as well
                 # just grab the prev_membership and membership strings and
                 # compare them.
-                prev_event_content = {}
+                # We take None rather than leave as a previous membership
+                # in the absence of a previous event because we do not want to
+                # reduce the leave count when a new-to-the-room user joins.
+                prev_membership = None
                 if prev_event_id is not None:
                     prev_event = yield self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
                         prev_event_content = prev_event.content
+                        prev_membership = prev_event_content.get(
+                            "membership", Membership.LEAVE
+                        )
 
                 membership = event_content.get("membership", Membership.LEAVE)
-                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
 
-                if prev_membership == membership:
-                    continue
-
-                if prev_membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", -1
-                    )
+                if prev_membership is None:
+                    logger.debug("No previous membership for this user.")
+                elif membership == prev_membership:
+                    pass  # noop
+                elif prev_membership == Membership.JOIN:
+                    room_stats_delta["joined_members"] -= 1
                 elif prev_membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", -1
-                    )
+                    room_stats_delta["invited_members"] -= 1
                 elif prev_membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", -1
-                    )
+                    room_stats_delta["left_members"] -= 1
                 elif prev_membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", -1
-                    )
+                    room_stats_delta["banned_members"] -= 1
                 else:
-                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError(
+                        "%r is not a valid prev_membership" % (prev_membership,)
+                    )
 
+                if membership == prev_membership:
+                    pass  # noop
                 if membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", +1
-                    )
+                    room_stats_delta["joined_members"] += 1
                 elif membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", +1
-                    )
+                    room_stats_delta["invited_members"] += 1
+
+                    if sender and self.is_mine_id(sender):
+                        user_to_stats_deltas.setdefault(sender, Counter())[
+                            "invites_sent"
+                        ] += 1
+
                 elif membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", +1
-                    )
+                    room_stats_delta["left_members"] += 1
                 elif membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", +1
-                    )
+                    room_stats_delta["banned_members"] += 1
                 else:
-                    err = "%s is not a valid membership" % (repr(membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError("%r is not a valid membership" % (membership,))
 
                 user_id = state_key
                 if self.is_mine_id(user_id):
-                    # update user_stats as it's one of our users
-                    public = yield self._is_public_room(room_id)
-
-                    if membership == Membership.LEAVE:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            -1,
-                        )
-                    elif membership == Membership.JOIN:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            +1,
-                        )
+                    # this accounts for transitions like leave → ban and so on.
+                    has_changed_joinedness = (prev_membership == Membership.JOIN) != (
+                        membership == Membership.JOIN
+                    )
 
-            elif typ == EventTypes.Create:
-                # Newly created room. Add it with all blank portions.
-                yield self.store.update_room_state(
-                    room_id,
-                    {
-                        "join_rules": None,
-                        "history_visibility": None,
-                        "encryption": None,
-                        "name": None,
-                        "topic": None,
-                        "avatar": None,
-                        "canonical_alias": None,
-                    },
-                )
+                    if has_changed_joinedness:
+                        delta = +1 if membership == Membership.JOIN else -1
 
-            elif typ == EventTypes.JoinRules:
-                yield self.store.update_room_state(
-                    room_id, {"join_rules": event_content.get("join_rule")}
-                )
+                        user_to_stats_deltas.setdefault(user_id, Counter())[
+                            "joined_rooms"
+                        ] += delta
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
-                )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+                        room_stats_delta["local_users_in_room"] += delta
 
-            elif typ == EventTypes.RoomHistoryVisibility:
-                yield self.store.update_room_state(
-                    room_id,
-                    {"history_visibility": event_content.get("history_visibility")},
+            elif typ == EventTypes.Create:
+                room_state["is_federatable"] = (
+                    event_content.get("m.federate", True) is True
                 )
-
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "history_visibility", "world_readable"
+                if sender and self.is_mine_id(sender):
+                    user_to_stats_deltas.setdefault(sender, Counter())[
+                        "rooms_created"
+                    ] += 1
+            elif typ == EventTypes.JoinRules:
+                room_state["join_rules"] = event_content.get("join_rule")
+            elif typ == EventTypes.RoomHistoryVisibility:
+                room_state["history_visibility"] = event_content.get(
+                    "history_visibility"
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
-
             elif typ == EventTypes.Encryption:
-                yield self.store.update_room_state(
-                    room_id, {"encryption": event_content.get("algorithm")}
-                )
+                room_state["encryption"] = event_content.get("algorithm")
             elif typ == EventTypes.Name:
-                yield self.store.update_room_state(
-                    room_id, {"name": event_content.get("name")}
-                )
+                room_state["name"] = event_content.get("name")
             elif typ == EventTypes.Topic:
-                yield self.store.update_room_state(
-                    room_id, {"topic": event_content.get("topic")}
-                )
+                room_state["topic"] = event_content.get("topic")
             elif typ == EventTypes.RoomAvatar:
-                yield self.store.update_room_state(
-                    room_id, {"avatar": event_content.get("url")}
-                )
+                room_state["avatar"] = event_content.get("url")
             elif typ == EventTypes.CanonicalAlias:
-                yield self.store.update_room_state(
-                    room_id, {"canonical_alias": event_content.get("alias")}
-                )
-
-    @defer.inlineCallbacks
-    def update_public_room_stats(self, ts, room_id, is_public):
-        """
-        Increment/decrement a user's number of public rooms when a room they are
-        in changes to/from public visibility.
-
-        Args:
-            ts (int): Timestamp in seconds
-            room_id (str)
-            is_public (bool)
-        """
-        # For now, blindly iterate over all local users in the room so that
-        # we can handle the whole problem of copying buckets over as needed
-        user_ids = yield self.store.get_users_in_room(room_id)
-
-        for user_id in user_ids:
-            if self.hs.is_mine(UserID.from_string(user_id)):
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
-                )
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
-                )
+                room_state["canonical_alias"] = event_content.get("alias")
+            elif typ == EventTypes.GuestAccess:
+                room_state["guest_access"] = event_content.get("guest_access")
 
-    @defer.inlineCallbacks
-    def _is_public_room(self, room_id):
-        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
-        history_visibility = yield self.state.get_current_state(
-            room_id, EventTypes.RoomHistoryVisibility
-        )
+        for room_id, state in room_to_state_updates.items():
+            logger.info("Updating room_stats_state for %s: %s", room_id, state)
+            yield self.store.update_room_state(room_id, state)
 
-        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
-            (
-                history_visibility
-                and history_visibility.content.get("history_visibility")
-                == "world_readable"
-            )
-        ):
-            return True
-        else:
-            return False
+        return room_to_stats_deltas, user_to_stats_deltas