summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-09-02 13:04:54 +0100
committerGitHub <noreply@github.com>2019-09-02 13:04:54 +0100
commit745f2da4b8d90efbfe1cd813936816052d6c9a47 (patch)
tree20d8067f2e1b5d19b4232fdb2d97322e27c92ad1 /synapse
parentMerge pull request #5941 from matrix-org/rei/rss_inc7 (diff)
parentRenamve get_room_state (diff)
downloadsynapse-github/rei/rss_target.tar.xz
Merge pull request #5946 from matrix-org/rei/rss_inc8 github/rei/rss_target rei/rss_target
Separated Statistics [8/7ish EOF]
Diffstat (limited to '')
-rw-r--r--synapse/handlers/stats.py29
-rw-r--r--synapse/storage/stats.py84
2 files changed, 92 insertions, 21 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index f44adfc07b..e849c38b85 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -98,15 +98,16 @@ class StatsHandler(StateDeltasHandler):
                     self.pos["state_delta_stream_id"]
                 )
 
-                logger.debug("Handling %d state deltas", len(deltas))
-                yield self._handle_deltas(deltas)
+                if deltas:
+                    logger.debug("Handling %d state deltas", len(deltas))
+                    yield self._handle_deltas(deltas)
 
-                self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
-                yield self.store.update_stats_positions(self.pos)
+                    self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+                    yield self.store.update_stats_positions(self.pos)
 
-                event_processing_positions.labels("stats").set(
-                    self.pos["state_delta_stream_id"]
-                )
+                    event_processing_positions.labels("stats").set(
+                        self.pos["state_delta_stream_id"]
+                    )
 
             # Then count deltas for total_events and total_event_bytes.
             with Measure(self.clock, "stats_total_events_and_bytes"):
@@ -129,7 +130,6 @@ class StatsHandler(StateDeltasHandler):
             event_id = delta["event_id"]
             stream_id = delta["stream_id"]
             prev_event_id = delta["prev_event_id"]
-            stream_pos = delta["stream_id"]
 
             logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
 
@@ -158,12 +158,9 @@ class StatsHandler(StateDeltasHandler):
                 if event:
                     event_content = event.content or {}
 
-            # We use stream_pos here rather than fetch by event_id as event_id
-            # may be None
-            stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
-                stream_pos
-            )
-            stream_timestamp = int(stream_timestamp)
+            # We can't afford for this time to stray into the past, so we count
+            # it as now.
+            stream_timestamp = int(self.clock.time_msec())
 
             # All the values in this dict are deltas (RELATIVE changes)
             room_stats_delta = {}
@@ -261,7 +258,7 @@ class StatsHandler(StateDeltasHandler):
                 is_newly_created = True
 
             elif typ == EventTypes.JoinRules:
-                old_room_state = yield self.store.get_room_state(room_id)
+                old_room_state = yield self.store.get_room_stats_state(room_id)
                 yield self.store.update_room_state(
                     room_id, {"join_rules": event_content.get("join_rule")}
                 )
@@ -282,7 +279,7 @@ class StatsHandler(StateDeltasHandler):
                         )
 
             elif typ == EventTypes.RoomHistoryVisibility:
-                old_room_state = yield self.store.get_room_state(room_id)
+                old_room_state = yield self.store.get_room_stats_state(room_id)
                 yield self.store.update_room_state(
                     room_id,
                     {"history_visibility": event_content.get("history_visibility")},
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 24f739a017..f20d8ba8a4 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -661,6 +661,79 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
+    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
+        """
+        Get statistics for a given subject.
+
+        Args:
+            stats_type (str): The type of subject
+            stats_id (str): The ID of the subject (e.g. room_id or user_id)
+            start (int): Pagination start. Number of entries, not timestamp.
+            size (int): How many entries to return.
+
+        Returns:
+            Deferred[list[dict]], where the dict has the keys of
+            ABSOLUTE_STATS_FIELDS[stats_type],  and "bucket_size" and "end_ts".
+        """
+        return self.runInteraction(
+            "get_statistics_for_subject",
+            self._get_statistics_for_subject_txn,
+            stats_type,
+            stats_id,
+            start,
+            size,
+        )
+
+    def _get_statistics_for_subject_txn(
+        self, txn, stats_type, stats_id, start, size=100
+    ):
+        """
+        Transaction-bound version of L{get_statistics_for_subject}.
+        """
+
+        table, id_col = TYPE_TO_TABLE[stats_type]
+        selected_columns = list(
+            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+        )
+
+        slice_list = self._simple_select_list_paginate_txn(
+            txn,
+            table + "_historical",
+            {id_col: stats_id},
+            "end_ts",
+            start,
+            size,
+            retcols=selected_columns + ["bucket_size", "end_ts"],
+            order_direction="DESC",
+        )
+
+        return slice_list
+
+    def get_room_stats_state(self, room_id):
+        """
+        Returns the current room_stats_state for a room.
+
+        Args:
+            room_id (str): The ID of the room to return state for.
+
+        Returns (dict):
+            Dictionary containing these keys:
+                "name", "topic", "canonical_alias", "avatar", "join_rules",
+                "history_visibility"
+        """
+        return self._simple_select_one(
+            "room_stats_state",
+            {"room_id": room_id},
+            retcols=(
+                "name",
+                "topic",
+                "canonical_alias",
+                "avatar",
+                "join_rules",
+                "history_visibility",
+            ),
+        )
+
     @cached()
     def get_earliest_token_for_stats(self, stats_type, id):
         """
@@ -948,10 +1021,11 @@ class StatsStore(StateDeltasStore):
             src_row = self._simple_select_one_txn(
                 txn, src_table, keyvalues, copy_columns
             )
+            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
             dest_current_row = self._simple_select_one_txn(
                 txn,
                 into_table,
-                keyvalues,
+                keyvalues=all_dest_keyvalues,
                 retcols=list(chain(additive_relatives.keys(), copy_columns)),
                 allow_none=True,
             )
@@ -968,7 +1042,7 @@ class StatsStore(StateDeltasStore):
             else:
                 for (key, val) in additive_relatives.items():
                     src_row[key] = dest_current_row[key] + val
-                self._simple_update_txn(txn, into_table, keyvalues, src_row)
+                self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
 
     def incremental_update_room_total_events_and_bytes(self, in_positions):
         """
@@ -1059,14 +1133,14 @@ class StatsStore(StateDeltasStore):
             new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
 
         sql = """
-            SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
             FROM events INNER JOIN event_json USING (event_id)
             WHERE ? %s stream_ordering AND stream_ordering %s ?
-            GROUP BY room_id
+            GROUP BY events.room_id
         """ % (
+            new_bytes_expression,
             low_comparator,
             high_comparator,
-            new_bytes_expression,
         )
 
         txn.execute(sql, (low_pos, high_pos))