diff options
author | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-07-22 17:32:19 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-07-22 17:32:19 +0100 |
commit | 0887bb305247d2a132c93f5b26cb241da26c477f (patch) | |
tree | 83ba63e1118c1d745fa9c66f86ae42ec3cad562f | |
parent | Introduce `total_events` column in `room_stats`. (diff) | |
download | synapse-0887bb305247d2a132c93f5b26cb241da26c477f.tar.xz |
Add counting for `total_events` in room statistics. github/rei/room_stats_total_events rei/room_stats_total_events
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
-rw-r--r-- | synapse/handlers/stats.py | 43 | ||||
-rw-r--r-- | synapse/storage/stats.py | 150 |
2 files changed, 132 insertions, 61 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index f595de1537..d86a412b7f 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -88,20 +88,42 @@ class StatsHandler(StateDeltasHandler): if self.pos is None: defer.returnValue(None) - # Loop round handling deltas until we're up to date - while True: - with Measure(self.clock, "stats_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) + max_pos = yield self.store.get_room_max_stream_ordering() + + if max_pos == self.pos: + return + + pos_of_delta = self.pos + with Measure(self.clock, "stats_delta"): + # Loop round handling deltas until we're up to date with deltas + while True: + # note that get_current_state_deltas isn't greedy – + # it is limited + deltas = yield self.store.get_current_state_deltas(pos_of_delta) + if not deltas: - return + break logger.info("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) - self.pos = deltas[-1]["stream_id"] - yield self.store.update_stats_stream_pos(self.pos) + pos_of_delta = deltas[-1]["stream_id"] + + if pos_of_delta > self.pos: + new_pos = max(pos_of_delta, max_pos) + else: + new_pos = max_pos + + with Measure(self.clock, "stats_total_events"): + yield self.store.update_total_event_count_between( + old_pos=self.pos, new_pos=new_pos + ) + + self.pos = new_pos + + yield self.store.update_stats_stream_pos(self.pos) - event_processing_positions.labels("stats").set(self.pos) + event_processing_positions.labels("stats").set(self.pos) @defer.inlineCallbacks def _handle_deltas(self, deltas): @@ -268,12 +290,15 @@ class StatsHandler(StateDeltasHandler): now, { "bucket_size": self.stats_bucket_size, + # This m.room.create state event is indeed a state event + # so we count it as one. "current_state_events": 1, "joined_members": 0, "invited_members": 0, "left_members": 0, "banned_members": 0, - # this column is disused but not yet removed from the + "total_events": 0, + # This column is disused but not yet removed from the # schema, so fill with -1. "state_events": -1, }, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 440bac41de..2b373e9b1a 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -97,7 +97,7 @@ class StatsStore(StateDeltasStore): sql = ( "CREATE TABLE IF NOT EXISTS " + TEMP_TABLE - + "_position(position TEXT NOT NULL)" + + "_position(position BIGINT NOT NULL)" ) txn.execute(sql) @@ -258,6 +258,8 @@ class StatsStore(StateDeltasStore): membership_counts = self._get_user_counts_in_room_txn(txn, room_id) + room_total_event_count = self._count_events_in_room_txn(txn, room_id, current_token) + self._update_stats_txn( txn, "room", @@ -270,6 +272,7 @@ class StatsStore(StateDeltasStore): "invited_members": membership_counts.get(Membership.INVITE, 0), "left_members": membership_counts.get(Membership.LEAVE, 0), "banned_members": membership_counts.get(Membership.BAN, 0), + "total_events": room_total_event_count, # this column is disused but not (yet) removed from the # schema, so we fill it with -1. "state_events": -1, @@ -425,56 +428,99 @@ class StatsStore(StateDeltasStore): ) def update_stats_delta(self, ts, stats_type, stats_id, field, value): - def _update_stats_delta(txn): - table, id_col = TYPE_TO_ROOM[stats_type] + return self.runInteraction("update_stats_delta", self._update_stats_delta_txn, ts, stats_type, stats_id, field, value) - sql = ( - "SELECT * FROM %s" - " WHERE %s=? and ts=(" - " SELECT MAX(ts) FROM %s" - " WHERE %s=?" - ")" - ) % (table, id_col, table, id_col) - txn.execute(sql, (stats_id, stats_id)) - rows = self.cursor_to_dict(txn) - if len(rows) == 0: - # silently skip as we don't have anything to apply a delta to yet. - # this tries to minimise any race between the initial sync and - # subsequent deltas arriving. - return - - current_ts = ts - latest_ts = rows[0]["ts"] - if current_ts < latest_ts: - # This one is in the past, but we're just encountering it now. - # Mark it as part of the current bucket. - current_ts = latest_ts - elif ts != latest_ts: - # we have to copy our absolute counters over to the new entry. - values = { - key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] - } - values[id_col] = stats_id - values["ts"] = ts - values["bucket_size"] = self.stats_bucket_size - - self._simple_insert_txn(txn, table=table, values=values) - - # actually update the new value - if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: - self._simple_update_txn( - txn, - table=table, - keyvalues={id_col: stats_id, "ts": current_ts}, - updatevalues={field: value}, - ) - else: - sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( - table, - field, - field, - id_col, - ) - txn.execute(sql, (value, stats_id, current_ts)) + def _update_stats_delta_txn(self, txn, ts, stats_type, stats_id, field, value): + table, id_col = TYPE_TO_ROOM[stats_type] + + sql = ( + "SELECT * FROM %s" + " WHERE %s=? and ts=(" + " SELECT MAX(ts) FROM %s" + " WHERE %s=?" + ")" + ) % (table, id_col, table, id_col) + txn.execute(sql, (stats_id, stats_id)) + rows = self.cursor_to_dict(txn) + if len(rows) == 0: + # silently skip as we don't have anything to apply a delta to yet. + # this tries to minimise any race between the initial sync and + # subsequent deltas arriving. + return + + current_ts = ts + latest_ts = rows[0]["ts"] + if current_ts < latest_ts: + # This one is in the past, but we're just encountering it now. + # Mark it as part of the current bucket. + current_ts = latest_ts + elif ts != latest_ts: + # we have to copy our absolute counters over to the new entry. + values = { + key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] + } + values[id_col] = stats_id + values["ts"] = ts + values["bucket_size"] = self.stats_bucket_size + + self._simple_insert_txn(txn, table=table, values=values) + + # actually update the new value + if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: + self._simple_update_txn( + txn, + table=table, + keyvalues={id_col: stats_id, "ts": current_ts}, + updatevalues={field: value}, + ) + else: + sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( + table, + field, + field, + id_col, + ) + txn.execute(sql, (value, stats_id, current_ts)) + + def update_total_event_count_between(self, old_pos, new_pos): + """ + Updates the total_events counts for rooms + + Args: + old_pos: The old stream position (stream position of the last event + that was already handled.) + new_pos: The new stream position (stream position of the new last + event to handle.) + + """ + + # TODO pass in now or calc here? + now = self.hs.get_reactor().seconds() + + # quantise time to the nearest bucket + now = (now // self.stats_bucket_size) * self.stats_bucket_size + + def _update_total_event_count_between(txn): + sql = """ + SELECT room_id, COUNT(*) AS new_events + FROM events + WHERE ? < stream_ordering AND stream_ordering <= ? + GROUP BY room_id + """ + + txn.execute(sql, (old_pos, new_pos)) + + for room_id, new_events in txn: + self._update_stats_delta_txn(txn, now, "room", room_id, "total_events", new_events) + + return self.runInteraction("update_total_event_count_between", _update_total_event_count_between) + + def _count_events_in_room_txn(self, txn, room_id, up_to_token): + sql = """ + SELECT COUNT(*) AS num_events + FROM events + WHERE room_id = ? AND stream_ordering <= ? + """ - return self.runInteraction("update_stats_delta", _update_stats_delta) + txn.execute(sql, (room_id, up_to_token)) + return txn.fetchone()[0] \ No newline at end of file |