| author | Erik Johnston <erik@matrix.org> | 2019-09-02 13:04:54 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-09-02 13:04:54 +0100 |
| commit | 745f2da4b8d90efbfe1cd813936816052d6c9a47 (patch) | |
| tree | 20d8067f2e1b5d19b4232fdb2d97322e27c92ad1 /synapse | |
| parent | Merge pull request #5941 from matrix-org/rei/rss_inc7 (diff) | |
| parent | Renamve get_room_state (diff) | |
| download | synapse-github/rei/rss_target.tar.xz | |
Merge pull request #5946 from matrix-org/rei/rss_inc8 (github/rei/rss_target, rei/rss_target)
Separated Statistics [8/7ish EOF]
Diffstat (limited to '')
| -rw-r--r-- | synapse/handlers/stats.py | 29 |
| -rw-r--r-- | synapse/storage/stats.py | 84 |

2 files changed, 92 insertions, 21 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index f44adfc07b..e849c38b85 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -98,15 +98,16 @@ class StatsHandler(StateDeltasHandler):
                 self.pos["state_delta_stream_id"]
             )
 
-            logger.debug("Handling %d state deltas", len(deltas))
-            yield self._handle_deltas(deltas)
+            if deltas:
+                logger.debug("Handling %d state deltas", len(deltas))
+                yield self._handle_deltas(deltas)
 
-            self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
-            yield self.store.update_stats_positions(self.pos)
+                self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+                yield self.store.update_stats_positions(self.pos)
 
-            event_processing_positions.labels("stats").set(
-                self.pos["state_delta_stream_id"]
-            )
+                event_processing_positions.labels("stats").set(
+                    self.pos["state_delta_stream_id"]
+                )
 
             # Then count deltas for total_events and total_event_bytes.
             with Measure(self.clock, "stats_total_events_and_bytes"):
@@ -129,7 +130,6 @@ class StatsHandler(StateDeltasHandler):
             event_id = delta["event_id"]
             stream_id = delta["stream_id"]
             prev_event_id = delta["prev_event_id"]
-            stream_pos = delta["stream_id"]
 
             logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
 
@@ -158,12 +158,9 @@ class StatsHandler(StateDeltasHandler):
             if event:
                 event_content = event.content or {}
 
-            # We use stream_pos here rather than fetch by event_id as event_id
-            # may be None
-            stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
-                stream_pos
-            )
-            stream_timestamp = int(stream_timestamp)
+            # We can't afford for this time to stray into the past, so we count
+            # it as now.
+            stream_timestamp = int(self.clock.time_msec())
 
             # All the values in this dict are deltas (RELATIVE changes)
             room_stats_delta = {}
@@ -261,7 +258,7 @@ class StatsHandler(StateDeltasHandler):
                 is_newly_created = True
 
             elif typ == EventTypes.JoinRules:
-                old_room_state = yield self.store.get_room_state(room_id)
+                old_room_state = yield self.store.get_room_stats_state(room_id)
                 yield self.store.update_room_state(
                     room_id, {"join_rules": event_content.get("join_rule")}
                 )
@@ -282,7 +279,7 @@ class StatsHandler(StateDeltasHandler):
                 )
 
             elif typ == EventTypes.RoomHistoryVisibility:
-                old_room_state = yield self.store.get_room_state(room_id)
+                old_room_state = yield self.store.get_room_stats_state(room_id)
                 yield self.store.update_room_state(
                     room_id,
                     {"history_visibility": event_content.get("history_visibility")},
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 24f739a017..f20d8ba8a4 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -661,6 +661,79 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
+    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
+        """
+        Get statistics for a given subject.
+
+        Args:
+            stats_type (str): The type of subject
+            stats_id (str): The ID of the subject (e.g. room_id or user_id)
+            start (int): Pagination start. Number of entries, not timestamp.
+            size (int): How many entries to return.
+
+        Returns:
+            Deferred[list[dict]], where the dict has the keys of
+            ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
+        """
+        return self.runInteraction(
+            "get_statistics_for_subject",
+            self._get_statistics_for_subject_txn,
+            stats_type,
+            stats_id,
+            start,
+            size,
+        )
+
+    def _get_statistics_for_subject_txn(
+        self, txn, stats_type, stats_id, start, size=100
+    ):
+        """
+        Transaction-bound version of L{get_statistics_for_subject}.
+        """
+
+        table, id_col = TYPE_TO_TABLE[stats_type]
+        selected_columns = list(
+            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+        )
+
+        slice_list = self._simple_select_list_paginate_txn(
+            txn,
+            table + "_historical",
+            {id_col: stats_id},
+            "end_ts",
+            start,
+            size,
+            retcols=selected_columns + ["bucket_size", "end_ts"],
+            order_direction="DESC",
+        )
+
+        return slice_list
+
+    def get_room_stats_state(self, room_id):
+        """
+        Returns the current room_stats_state for a room.
+
+        Args:
+            room_id (str): The ID of the room to return state for.
+
+        Returns (dict):
+            Dictionary containing these keys:
+                "name", "topic", "canonical_alias", "avatar", "join_rules",
+                "history_visibility"
+        """
+        return self._simple_select_one(
+            "room_stats_state",
+            {"room_id": room_id},
+            retcols=(
+                "name",
+                "topic",
+                "canonical_alias",
+                "avatar",
+                "join_rules",
+                "history_visibility",
+            ),
+        )
+
     @cached()
     def get_earliest_token_for_stats(self, stats_type, id):
         """
@@ -948,10 +1021,11 @@ class StatsStore(StateDeltasStore):
             src_row = self._simple_select_one_txn(
                 txn, src_table, keyvalues, copy_columns
             )
+            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
             dest_current_row = self._simple_select_one_txn(
                 txn,
                 into_table,
-                keyvalues,
+                keyvalues=all_dest_keyvalues,
                 retcols=list(chain(additive_relatives.keys(), copy_columns)),
                 allow_none=True,
             )
@@ -968,7 +1042,7 @@ class StatsStore(StateDeltasStore):
             else:
                 for (key, val) in additive_relatives.items():
                     src_row[key] = dest_current_row[key] + val
-                self._simple_update_txn(txn, into_table, keyvalues, src_row)
+                self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
 
     def incremental_update_room_total_events_and_bytes(self, in_positions):
         """
@@ -1059,14 +1133,14 @@ class StatsStore(StateDeltasStore):
             new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
 
         sql = """
-            SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
             FROM events INNER JOIN event_json USING (event_id)
             WHERE ? %s stream_ordering AND stream_ordering %s ?
-            GROUP BY room_id
+            GROUP BY events.room_id
         """ % (
+            new_bytes_expression,
             low_comparator,
             high_comparator,
-            new_bytes_expression,
         )
 
         txn.execute(sql, (low_pos, high_pos))