summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-07-22 17:32:19 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-07-22 17:32:19 +0100
commit0887bb305247d2a132c93f5b26cb241da26c477f (patch)
tree83ba63e1118c1d745fa9c66f86ae42ec3cad562f
parentIntroduce `total_events` column in `room_stats`. (diff)
downloadsynapse-0887bb305247d2a132c93f5b26cb241da26c477f.tar.xz
Add counting for `total_events` in room statistics. github/rei/room_stats_total_events rei/room_stats_total_events
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
-rw-r--r--synapse/handlers/stats.py43
-rw-r--r--synapse/storage/stats.py150
2 files changed, 132 insertions, 61 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index f595de1537..d86a412b7f 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -88,20 +88,42 @@ class StatsHandler(StateDeltasHandler):
         if self.pos is None:
             defer.returnValue(None)
 
-        # Loop round handling deltas until we're up to date
-        while True:
-            with Measure(self.clock, "stats_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
+        max_pos = yield self.store.get_room_max_stream_ordering()
+
+        if max_pos == self.pos:
+            return
+
+        pos_of_delta = self.pos
+        with Measure(self.clock, "stats_delta"):
+            # Loop round handling deltas until we're up to date with deltas
+            while True:
+                # note that get_current_state_deltas isn't greedy –
+                # it is limited
+                deltas = yield self.store.get_current_state_deltas(pos_of_delta)
+
                 if not deltas:
-                    return
+                    break
 
                 logger.info("Handling %d state deltas", len(deltas))
                 yield self._handle_deltas(deltas)
 
-                self.pos = deltas[-1]["stream_id"]
-                yield self.store.update_stats_stream_pos(self.pos)
+                pos_of_delta = deltas[-1]["stream_id"]
+
+        if pos_of_delta > self.pos:
+            new_pos = max(pos_of_delta, max_pos)
+        else:
+            new_pos = max_pos
+
+        with Measure(self.clock, "stats_total_events"):
+            yield self.store.update_total_event_count_between(
+                old_pos=self.pos, new_pos=new_pos
+            )
+
+        self.pos = new_pos
+
+        yield self.store.update_stats_stream_pos(self.pos)
 
-                event_processing_positions.labels("stats").set(self.pos)
+        event_processing_positions.labels("stats").set(self.pos)
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
@@ -268,12 +290,15 @@ class StatsHandler(StateDeltasHandler):
                     now,
                     {
                         "bucket_size": self.stats_bucket_size,
+                        # This m.room.create state event is indeed a state event
+                        # so we count it as one.
                         "current_state_events": 1,
                         "joined_members": 0,
                         "invited_members": 0,
                         "left_members": 0,
                         "banned_members": 0,
-                        # this column is disused but not yet removed from the
+                        "total_events": 0,
+                        # This column is disused but not yet removed from the
                         # schema, so fill with -1.
                         "state_events": -1,
                     },
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 440bac41de..2b373e9b1a 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -97,7 +97,7 @@ class StatsStore(StateDeltasStore):
             sql = (
                 "CREATE TABLE IF NOT EXISTS "
                 + TEMP_TABLE
-                + "_position(position TEXT NOT NULL)"
+                + "_position(position BIGINT NOT NULL)"
             )
             txn.execute(sql)
 
@@ -258,6 +258,8 @@ class StatsStore(StateDeltasStore):
 
                 membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
 
+                room_total_event_count = self._count_events_in_room_txn(txn, room_id, current_token)
+
                 self._update_stats_txn(
                     txn,
                     "room",
@@ -270,6 +272,7 @@ class StatsStore(StateDeltasStore):
                         "invited_members": membership_counts.get(Membership.INVITE, 0),
                         "left_members": membership_counts.get(Membership.LEAVE, 0),
                         "banned_members": membership_counts.get(Membership.BAN, 0),
+                        "total_events": room_total_event_count,
                         # this column is disused but not (yet) removed from the
                         # schema, so we fill it with -1.
                         "state_events": -1,
@@ -425,56 +428,99 @@ class StatsStore(StateDeltasStore):
         )
 
     def update_stats_delta(self, ts, stats_type, stats_id, field, value):
-        def _update_stats_delta(txn):
-            table, id_col = TYPE_TO_ROOM[stats_type]
+        return self.runInteraction("update_stats_delta", self._update_stats_delta_txn, ts, stats_type, stats_id, field, value)
 
-            sql = (
-                "SELECT * FROM %s"
-                " WHERE %s=? and ts=("
-                "  SELECT MAX(ts) FROM %s"
-                "  WHERE %s=?"
-                ")"
-            ) % (table, id_col, table, id_col)
-            txn.execute(sql, (stats_id, stats_id))
-            rows = self.cursor_to_dict(txn)
-            if len(rows) == 0:
-                # silently skip as we don't have anything to apply a delta to yet.
-                # this tries to minimise any race between the initial sync and
-                # subsequent deltas arriving.
-                return
-
-            current_ts = ts
-            latest_ts = rows[0]["ts"]
-            if current_ts < latest_ts:
-                # This one is in the past, but we're just encountering it now.
-                # Mark it as part of the current bucket.
-                current_ts = latest_ts
-            elif ts != latest_ts:
-                # we have to copy our absolute counters over to the new entry.
-                values = {
-                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
-                }
-                values[id_col] = stats_id
-                values["ts"] = ts
-                values["bucket_size"] = self.stats_bucket_size
-
-                self._simple_insert_txn(txn, table=table, values=values)
-
-            # actually update the new value
-            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
-                self._simple_update_txn(
-                    txn,
-                    table=table,
-                    keyvalues={id_col: stats_id, "ts": current_ts},
-                    updatevalues={field: value},
-                )
-            else:
-                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
-                    table,
-                    field,
-                    field,
-                    id_col,
-                )
-                txn.execute(sql, (value, stats_id, current_ts))
+    def _update_stats_delta_txn(self, txn, ts, stats_type, stats_id, field, value):
+        table, id_col = TYPE_TO_ROOM[stats_type]
+
+        sql = (
+                  "SELECT * FROM %s"
+                  " WHERE %s=? and ts=("
+                  "  SELECT MAX(ts) FROM %s"
+                  "  WHERE %s=?"
+                  ")"
+              ) % (table, id_col, table, id_col)
+        txn.execute(sql, (stats_id, stats_id))
+        rows = self.cursor_to_dict(txn)
+        if len(rows) == 0:
+            # silently skip as we don't have anything to apply a delta to yet.
+            # this tries to minimise any race between the initial sync and
+            # subsequent deltas arriving.
+            return
+
+        current_ts = ts
+        latest_ts = rows[0]["ts"]
+        if current_ts < latest_ts:
+            # This one is in the past, but we're just encountering it now.
+            # Mark it as part of the current bucket.
+            current_ts = latest_ts
+        elif ts != latest_ts:
+            # we have to copy our absolute counters over to the new entry.
+            values = {
+                key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
+            }
+            values[id_col] = stats_id
+            values["ts"] = ts
+            values["bucket_size"] = self.stats_bucket_size
+
+            self._simple_insert_txn(txn, table=table, values=values)
+
+        # actually update the new value
+        if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
+            self._simple_update_txn(
+                txn,
+                table=table,
+                keyvalues={id_col: stats_id, "ts": current_ts},
+                updatevalues={field: value},
+            )
+        else:
+            sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
+                table,
+                field,
+                field,
+                id_col,
+            )
+            txn.execute(sql, (value, stats_id, current_ts))
+
+    def update_total_event_count_between(self, old_pos, new_pos):
+        """
+        Updates the total_events counts for rooms
+
+        Args:
+            old_pos: The old stream position (stream position of the last event
+                that was already handled.)
+            new_pos: The new stream position (stream position of the new last
+                event to handle.)
+
+        """
+
+        # TODO pass in now or calc here?
+        now = self.hs.get_reactor().seconds()
+
+        # quantise time to the nearest bucket
+        now = (now // self.stats_bucket_size) * self.stats_bucket_size
+
+        def _update_total_event_count_between(txn):
+            sql = """
+                SELECT room_id, COUNT(*) AS new_events
+                FROM events
+                WHERE ? < stream_ordering AND stream_ordering <= ?
+                GROUP BY room_id
+            """
+
+            txn.execute(sql, (old_pos, new_pos))
+
+            for room_id, new_events in txn:
+                self._update_stats_delta_txn(txn, now, "room", room_id, "total_events", new_events)
+
+        return self.runInteraction("update_total_event_count_between", _update_total_event_count_between)
+
+    def _count_events_in_room_txn(self, txn, room_id, up_to_token):
+        sql = """
+            SELECT COUNT(*) AS num_events
+            FROM events
+            WHERE room_id = ? AND stream_ordering <= ?
+        """
 
-        return self.runInteraction("update_stats_delta", _update_stats_delta)
+        txn.execute(sql, (room_id, up_to_token))
+        return txn.fetchone()[0]
\ No newline at end of file