summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-08 11:40:51 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-08 11:40:51 +0100
commit4a45fb5ab892bb6b62a80ecf782f590eda87815f (patch)
treeeb2317299dad59cee39ceb6493f56aa43e129205
parentIntroduce `total_events` tracking and rework statistics tracking. (diff)
downloadsynapse-4a45fb5ab892bb6b62a80ecf782f590eda87815f.tar.xz
Adapt state delta handling to match the new interface
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
-rw-r--r--synapse/handlers/stats.py140
1 files changed, 91 insertions, 49 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 5de9a039a9..bbd120a493 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -122,7 +122,7 @@ class StatsHandler(StateDeltasHandler):
 
             logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
 
-            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+            token = yield self.store.get_earliest_token_for_stats("room", room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -147,44 +147,56 @@ class StatsHandler(StateDeltasHandler):
             # We use stream_pos here rather than fetch by event_id as event_id
             # may be None
             now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+            now = int(now) // 1000
 
-            # quantise time to the nearest bucket
-            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+            room_stats_delta = {}
+            room_stats_complete = False
+
+            if prev_event_id is None:
+                # this state event doesn't overwrite another,
+                # so it is a new effective/current state event
+                room_stats_delta["current_state_events"] = (
+                    room_stats_delta.get("current_state_events", 0) + 1
+                )
 
             if typ == EventTypes.Member:
                 # we could use _get_key_change here but it's a bit inefficient
                 # given we're not testing for a specific result; might as well
                 # just grab the prev_membership and membership strings and
                 # compare them.
-                prev_event_content = {}
+                # We take None rather than leave as a previous membership
+                # in the absence of a previous event because we do not want to
+                # reduce the leave count when a new-to-the-room user joins.
+                prev_membership = None
                 if prev_event_id is not None:
                     prev_event = yield self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
                         prev_event_content = prev_event.content
+                        prev_membership = prev_event_content.get(
+                            "membership", Membership.LEAVE
+                        )
 
                 membership = event_content.get("membership", Membership.LEAVE)
-                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
 
-                if prev_membership == membership:
-                    continue
-
-                if prev_membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", -1
+                if prev_membership is None:
+                    logger.debug("No previous membership for this user.")
+                elif prev_membership == Membership.JOIN:
+                    room_stats_delta["joined_members"] = (
+                        room_stats_delta.get("joined_members", 0) - 1
                     )
                 elif prev_membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", -1
+                    room_stats_delta["invited_members"] = (
+                        room_stats_delta.get("invited_members", 0) - 1
                     )
                 elif prev_membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", -1
+                    room_stats_delta["left_members"] = (
+                        room_stats_delta.get("left_members", 0) - 1
                     )
                 elif prev_membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", -1
+                    room_stats_delta["banned_members"] = (
+                        room_stats_delta.get("banned_members", 0) - 1
                     )
                 else:
                     err = "%s is not a valid prev_membership" % (repr(prev_membership),)
@@ -192,20 +204,20 @@ class StatsHandler(StateDeltasHandler):
                     raise ValueError(err)
 
                 if membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", +1
+                    room_stats_delta["joined_members"] = (
+                        room_stats_delta.get("joined_members", 0) + 1
                     )
                 elif membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", +1
+                    room_stats_delta["invited_members"] = (
+                        room_stats_delta.get("invited_members", 0) + 1
                     )
                 elif membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", +1
+                    room_stats_delta["left_members"] = (
+                        room_stats_delta.get("left_members", 0) + 1
                     )
                 elif membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", +1
+                    room_stats_delta["banned_members"] = (
+                        room_stats_delta.get("banned_members", 0) + 1
                     )
                 else:
                     err = "%s is not a valid membership" % (repr(membership),)
@@ -213,26 +225,21 @@ class StatsHandler(StateDeltasHandler):
                     raise ValueError(err)
 
                 user_id = state_key
-                if self.is_mine_id(user_id):
+                if self.is_mine_id(user_id) and membership in (
+                    Membership.JOIN,
+                    Membership.LEAVE,
+                ):
                     # update user_stats as it's one of our users
                     public = yield self._is_public_room(room_id)
 
-                    if membership == Membership.LEAVE:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            -1,
-                        )
-                    elif membership == Membership.JOIN:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            +1,
+                    field = "public_rooms" if public else "private_rooms"
+                    delta = +1 if membership == Membership.JOIN else -1
+
+                    yield ensureDeferred(
+                        self.store.update_stats_delta(
+                            now, "user", user_id, {field: delta}
                         )
+                    )
 
             elif typ == EventTypes.Create:
                 # Newly created room. Add it with all blank portions.
@@ -249,28 +256,46 @@ class StatsHandler(StateDeltasHandler):
                     },
                 )
 
+                room_stats_complete = True
+
             elif typ == EventTypes.JoinRules:
+                old_room_state = yield self.store.get_room_state(room_id)
                 yield self.store.update_room_state(
                     room_id, {"join_rules": event_content.get("join_rule")}
                 )
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                # whether the room would be public anyway,
+                # because of history_visibility
+                other_field_gives_publicity = (
+                    old_room_state["history_visibility"] == "world_readable"
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+
+                if not other_field_gives_publicity:
+                    is_public = yield self._get_key_change(
+                        prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                    )
+                    if is_public is not None:
+                        yield self.update_public_room_stats(now, room_id, is_public)
 
             elif typ == EventTypes.RoomHistoryVisibility:
+                old_room_state = yield self.store.get_room_state(room_id)
                 yield self.store.update_room_state(
                     room_id,
                     {"history_visibility": event_content.get("history_visibility")},
                 )
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "history_visibility", "world_readable"
+                # whether the room would be public anyway,
+                # because of join_rule
+                other_field_gives_publicity = (
+                    old_room_state["join_rules"] == JoinRules.PUBLIC
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+
+                if not other_field_gives_publicity:
+                    is_public = yield self._get_key_change(
+                        prev_event_id, event_id, "history_visibility", "world_readable"
+                    )
+                    if is_public is not None:
+                        yield self.update_public_room_stats(now, room_id, is_public)
 
             elif typ == EventTypes.Encryption:
                 yield self.store.update_room_state(
@@ -293,6 +318,23 @@ class StatsHandler(StateDeltasHandler):
                     room_id, {"canonical_alias": event_content.get("alias")}
                 )
 
+            if room_stats_complete:
+                yield ensureDeferred(
+                    self.store.update_stats_delta(
+                        now,
+                        "room",
+                        room_id,
+                        room_stats_delta,
+                        complete_with_stream_id=stream_id,
+                    )
+                )
+            elif len(room_stats_delta) > 0:
+                yield ensureDeferred(
+                    self.store.update_stats_delta(
+                        now, "room", room_id, room_stats_delta
+                    )
+                )
+
     @defer.inlineCallbacks
     def update_public_room_stats(self, ts, room_id, is_public):
         """