diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 123c1ae220..e8b1ce240b 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
import logging
from synapse.storage.state_deltas import StateDeltasStore
+from twisted.internet import defer
logger = logging.getLogger(__name__)
@@ -27,12 +29,21 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
- "state_events",
+ "total_events",
),
"user": ("public_rooms", "private_rooms"),
}
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
+
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+
+class OldCollectionRequired(Exception):
+    """Signal that we need to collect old stats rows and retry."""
class StatsStore(StateDeltasStore):
@@ -48,6 +59,21 @@ class StatsStore(StateDeltasStore):
self.register_noop_background_update("populate_stats_process_rooms")
self.register_noop_background_update("populate_stats_cleanup")
+ def quantise_stats_time(self, ts):
+ """
+ Quantises a timestamp to be a multiple of the bucket size.
+
+ Args:
+            ts (int): the timestamp to quantise, in milliseconds since the
+                Unix Epoch
+
+        Returns:
+            int: a timestamp which
+ - is divisible by the bucket size;
+ - is no later than `ts`; and
+ - is the largest such timestamp.
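+
+            For example, with a bucket size of 100, a `ts` of 350 quantises
+            to 300.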
+ """
+ return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
def update_room_state(self, room_id, fields):
"""
Args:
@@ -76,3 +102,144 @@ class StatsStore(StateDeltasStore):
values=fields,
desc="update_room_state",
)
+
+ @defer.inlineCallbacks
+ def update_stats_delta(
+ self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+ ):
+ """
+ Updates the statistics for a subject, with a delta (difference/relative
+ change).
+
+ Args:
+ ts (int): timestamp of the change
+ stats_type (str): "room" or "user" – the kind of subject
+ stats_id (str): the subject's ID (room ID or user ID)
+ fields (dict[str, int]): Deltas of stats values.
+ complete_with_stream_id (int, optional):
+ If supplied, converts an incomplete row into a complete row,
+ with the supplied stream_id marked as the stream_id where the
+ row was completed.
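+
+        Example (counting one new event in a room; `now` and `room_id` are
+        placeholders):
+            yield self.update_stats_delta(
+                now, "room", room_id, {"total_events": 1}
+            )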
+ """
+
+ while True:
+ try:
+ res = yield self.runInteraction(
+ "update_stats_delta",
+ self._update_stats_delta_txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=complete_with_stream_id,
+ )
+ return res
+            except OldCollectionRequired:
+                # we need to collect old rows before the update can
+                # succeed. TODO: the collection step is not implemented
+                # in this PR yet, so fail loudly rather than looping.
+                raise NotImplementedError("old collection not in this PR")
+
+ def _update_stats_delta_txn(
+ self,
+ txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=None,
+ absolute_fields=None,
+ ):
+ """
+ See L{update_stats_delta}
+ Additional Args:
+ absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+ """
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
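+        # quantised_ts is the start of the time slice containing ts;
+        # end_ts is the end of that slice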
+ quantised_ts = self.quantise_stats_time(int(ts))
+ end_ts = quantised_ts + self.stats_bucket_size
+
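+        # build "<field> = <field> + ?" fragments so that each delta is
+        # applied on top of the current value, e.g. a delta of
+        # {"total_events": 1} yields "total_events = total_events + ?"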
+ field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
+ field_values = list(fields.values())
+
+ if absolute_fields is not None:
+ field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
+ field_values += list(absolute_fields.values())
+
+ if complete_with_stream_id is not None:
+ field_sqls.append("completed_delta_stream_id = ?")
+ field_values.append(complete_with_stream_id)
+
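+        # only update a row that is still current for this slice: either
+        # its end_ts has not passed yet, or it has not been completed
+        # (completed_delta_stream_id IS NULL)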
+ sql = (
+ "UPDATE %s_current SET end_ts = ?, %s"
+ " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
+ " AND %s = ?"
+ ) % (table, ", ".join(field_sqls), id_col)
+
+ qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
+
+ txn.execute(sql, qargs)
+
+ if txn.rowcount > 0:
+ # success.
+ return
+
+ # if we're here, it's because we didn't succeed in updating a stats
+ # row. Why? Let's find out…
+
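+        # there are three possibilities: no row exists at all, a row
+        # exists but has no end_ts, or the row is for an old slice and
+        # needs collecting first. Fetch the row to work out which.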
+ current_row = self._simple_select_one_txn(
+ txn,
+ table + "_current",
+ {id_col: stats_id},
+ ("end_ts", "completed_delta_stream_id"),
+ allow_none=True,
+ )
+
+ if current_row is None:
+ # we need to insert a row! (insert a dirty, incomplete row)
+ insertee = {
+ id_col: stats_id,
+ "end_ts": end_ts,
+ "start_ts": ts,
+ "completed_delta_stream_id": complete_with_stream_id,
+ }
+
+ # we assume that, by default, blank fields should be zero.
+ for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
+ insertee[field_name] = 0
+
+ for field_name in PER_SLICE_FIELDS[stats_type]:
+ insertee[field_name] = 0
+
+ for (field, value) in fields.items():
+ insertee[field] = value
+
+ if absolute_fields is not None:
+ for (field, value) in absolute_fields.items():
+ insertee[field] = value
+
+ self._simple_insert_txn(txn, table + "_current", insertee)
+
+ elif current_row["end_ts"] is None:
+ # update the row, including start_ts
+ sql = (
+ "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
+ " WHERE end_ts IS NULL AND %s = ?"
+ ) % (table, ", ".join(field_sqls), id_col)
+
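+            # backdate start_ts by one bucket so that the row spans
+            # exactly one time slice ending at end_ts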
+ qargs = (
+ [end_ts - self.stats_bucket_size, end_ts]
+ + list(field_values)
+ + [stats_id]
+ )
+
+ txn.execute(sql, qargs)
+ if txn.rowcount == 0:
+ raise RuntimeError(
+ "Should be impossible: No rows updated"
+ " but all conditions are known to be met."
+ )
+
+ elif current_row["end_ts"] < end_ts:
+ # we need to perform old collection first
+ raise OldCollectionRequired()