diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index f595de1537..d86a412b7f 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -88,20 +88,42 @@ class StatsHandler(StateDeltasHandler):
if self.pos is None:
defer.returnValue(None)
- # Loop round handling deltas until we're up to date
- while True:
- with Measure(self.clock, "stats_delta"):
- deltas = yield self.store.get_current_state_deltas(self.pos)
+ max_pos = yield self.store.get_room_max_stream_ordering()
+
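+ # If no events have arrived since we last ran, there is nothing to do.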
+ if max_pos == self.pos:
+ return
+
+ pos_of_delta = self.pos
+ with Measure(self.clock, "stats_delta"):
+ # Loop round handling deltas until we're up to date with deltas
+ while True:
+ # note that get_current_state_deltas isn't greedy: it only
+ # returns a limited batch of rows, so we loop until it is exhausted
+ deltas = yield self.store.get_current_state_deltas(pos_of_delta)
+
if not deltas:
- return
+ break
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
- self.pos = deltas[-1]["stream_id"]
- yield self.store.update_stats_stream_pos(self.pos)
+ pos_of_delta = deltas[-1]["stream_id"]
+
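+ # Advance to whichever is further ahead: the last state delta we
+ # handled or the maximum event stream ordering we saw at the start.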
+ if pos_of_delta > self.pos:
+ new_pos = max(pos_of_delta, max_pos)
+ else:
+ new_pos = max_pos
+
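+ # Count the events added between the old and new positions and add
+ # them to each affected room's total_events.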
+ with Measure(self.clock, "stats_total_events"):
+ yield self.store.update_total_event_count_between(
+ old_pos=self.pos, new_pos=new_pos
+ )
+
+ self.pos = new_pos
+
+ yield self.store.update_stats_stream_pos(self.pos)
- event_processing_positions.labels("stats").set(self.pos)
+ event_processing_positions.labels("stats").set(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
@@ -268,12 +290,15 @@ class StatsHandler(StateDeltasHandler):
now,
{
"bucket_size": self.stats_bucket_size,
+ # The m.room.create event is itself a state event, so we count it
+ # as the room's first current state event.
"current_state_events": 1,
"joined_members": 0,
"invited_members": 0,
"left_members": 0,
"banned_members": 0,
- # this column is disused but not yet removed from the
+ "total_events": 0,
+ # This column is disused but not yet removed from the
# schema, so fill with -1.
"state_events": -1,
},
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 440bac41de..2b373e9b1a 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -97,7 +97,7 @@ class StatsStore(StateDeltasStore):
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
- + "_position(position TEXT NOT NULL)"
+ + "_position(position BIGINT NOT NULL)"
)
txn.execute(sql)
@@ -258,6 +258,8 @@ class StatsStore(StateDeltasStore):
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
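+ # Count all events in the room up to the current token so that later
+ # total_events deltas build on an accurate baseline.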
+ room_total_event_count = self._count_events_in_room_txn(
+ txn, room_id, current_token
+ )
+
self._update_stats_txn(
txn,
"room",
@@ -270,6 +272,7 @@ class StatsStore(StateDeltasStore):
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
+ "total_events": room_total_event_count,
# this column is disused but not (yet) removed from the
# schema, so we fill it with -1.
"state_events": -1,
@@ -425,56 +428,99 @@ class StatsStore(StateDeltasStore):
)
def update_stats_delta(self, ts, stats_type, stats_id, field, value):
- def _update_stats_delta(txn):
- table, id_col = TYPE_TO_ROOM[stats_type]
+ return self.runInteraction("update_stats_delta", self._update_stats_delta_txn, ts, stats_type, stats_id, field, value)
- sql = (
- "SELECT * FROM %s"
- " WHERE %s=? and ts=("
- " SELECT MAX(ts) FROM %s"
- " WHERE %s=?"
- ")"
- ) % (table, id_col, table, id_col)
- txn.execute(sql, (stats_id, stats_id))
- rows = self.cursor_to_dict(txn)
- if len(rows) == 0:
- # silently skip as we don't have anything to apply a delta to yet.
- # this tries to minimise any race between the initial sync and
- # subsequent deltas arriving.
- return
-
- current_ts = ts
- latest_ts = rows[0]["ts"]
- if current_ts < latest_ts:
- # This one is in the past, but we're just encountering it now.
- # Mark it as part of the current bucket.
- current_ts = latest_ts
- elif ts != latest_ts:
- # we have to copy our absolute counters over to the new entry.
- values = {
- key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
- }
- values[id_col] = stats_id
- values["ts"] = ts
- values["bucket_size"] = self.stats_bucket_size
-
- self._simple_insert_txn(txn, table=table, values=values)
-
- # actually update the new value
- if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
- self._simple_update_txn(
- txn,
- table=table,
- keyvalues={id_col: stats_id, "ts": current_ts},
- updatevalues={field: value},
- )
- else:
- sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
- table,
- field,
- field,
- id_col,
- )
- txn.execute(sql, (value, stats_id, current_ts))
+ def _update_stats_delta_txn(self, txn, ts, stats_type, stats_id, field, value):
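+ """Applies a single stats delta within an existing transaction.
+
+ Args:
+ txn: The database transaction to use.
+ ts: The timestamp of the delta, used to pick the stats bucket.
+ stats_type: The kind of entity being updated (a key of
+ TYPE_TO_ROOM, e.g. "room").
+ stats_id: The ID of the entity being updated (e.g. the room ID).
+ field: The stats column to update.
+ value: The delta (or, for absolute fields, the value) to apply.
+ """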
+ table, id_col = TYPE_TO_ROOM[stats_type]
+
+ sql = (
+ "SELECT * FROM %s"
+ " WHERE %s=? and ts=("
+ " SELECT MAX(ts) FROM %s"
+ " WHERE %s=?"
+ ")"
+ ) % (table, id_col, table, id_col)
+ txn.execute(sql, (stats_id, stats_id))
+ rows = self.cursor_to_dict(txn)
+ if len(rows) == 0:
+ # silently skip as we don't have anything to apply a delta to yet.
+ # this tries to minimise any race between the initial sync and
+ # subsequent deltas arriving.
+ return
+
+ current_ts = ts
+ latest_ts = rows[0]["ts"]
+ if current_ts < latest_ts:
+ # This one is in the past, but we're just encountering it now.
+ # Mark it as part of the current bucket.
+ current_ts = latest_ts
+ elif ts != latest_ts:
+ # we have to copy our absolute counters over to the new entry.
+ values = {
+ key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
+ }
+ values[id_col] = stats_id
+ values["ts"] = ts
+ values["bucket_size"] = self.stats_bucket_size
+
+ self._simple_insert_txn(txn, table=table, values=values)
+
+ # actually update the new value
+ if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
+ self._simple_update_txn(
+ txn,
+ table=table,
+ keyvalues={id_col: stats_id, "ts": current_ts},
+ updatevalues={field: value},
+ )
+ else:
+ sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
+ table,
+ field,
+ field,
+ id_col,
+ )
+ txn.execute(sql, (value, stats_id, current_ts))
+
+ def update_total_event_count_between(self, old_pos, new_pos):
+ """
+ Updates the total_events counts for rooms
+
+ Args:
+ old_pos: The old stream position (stream ordering of the last event
+ that has already been counted).
+ new_pos: The new stream position (stream ordering of the last new
+ event to count).
+
+ """
+
+ # TODO pass in now or calc here?
+ now = self.hs.get_reactor().seconds()
+
+ # quantise time to the nearest bucket
+ now = (now // self.stats_bucket_size) * self.stats_bucket_size
+
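+ # Count how many events each room gained in the interval
+ # (old_pos, new_pos] and add that to its total_events.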
+ def _update_total_event_count_between(txn):
+ sql = """
+ SELECT room_id, COUNT(*) AS new_events
+ FROM events
+ WHERE ? < stream_ordering AND stream_ordering <= ?
+ GROUP BY room_id
+ """
+
+ txn.execute(sql, (old_pos, new_pos))
+
+ # fetch everything up-front: _update_stats_delta_txn reuses this cursor
+ for room_id, new_events in txn.fetchall():
+ self._update_stats_delta_txn(
+ txn, now, "room", room_id, "total_events", new_events
+ )
+
+ return self.runInteraction("update_total_event_count_between", _update_total_event_count_between)
+
+ def _count_events_in_room_txn(self, txn, room_id, up_to_token):
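+ """Counts the events sent in a room up to and including a stream token.
+
+ Args:
+ txn: The database transaction to use.
+ room_id: The room to count events for.
+ up_to_token: Only count events with a stream_ordering no greater
+ than this token.
+ """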
+ sql = """
+ SELECT COUNT(*) AS num_events
+ FROM events
+ WHERE room_id = ? AND stream_ordering <= ?
+ """
- return self.runInteraction("update_stats_delta", _update_stats_delta)
+ txn.execute(sql, (room_id, up_to_token))
+ return txn.fetchone()[0]
\ No newline at end of file