summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-20 14:08:02 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-20 14:08:02 +0100
commit80a1c6e9e5e4fbaa58355559e42a9a1bbc91c81f (patch)
tree87e3c28d9a63fd0065a8b9f614a7c0a0f6283059 /synapse/storage
parentAdd schema for Separated Statistics (diff)
downloadsynapse-80a1c6e9e5e4fbaa58355559e42a9a1bbc91c81f.tar.xz
Add storage function for storing stats deltas
Old collection is not included in this commit

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/stats.py171
1 files changed, 169 insertions, 2 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 123c1ae220..e8b1ce240b 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
 import logging
 
 from synapse.storage.state_deltas import StateDeltasStore
+from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
@@ -27,12 +29,21 @@ ABSOLUTE_STATS_FIELDS = {
         "invited_members",
         "left_members",
         "banned_members",
-        "state_events",
+        "total_events",
     ),
     "user": ("public_rooms", "private_rooms"),
 }
 
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
+
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+
+class OldCollectionRequired(Exception):
+    """ Signal that we need to collect old stats rows and retry. """
+
+    pass
 
 
 class StatsStore(StateDeltasStore):
@@ -48,6 +59,21 @@ class StatsStore(StateDeltasStore):
         self.register_noop_background_update("populate_stats_process_rooms")
         self.register_noop_background_update("populate_stats_cleanup")
 
+    def quantise_stats_time(self, ts):
+        """
+        Quantises a timestamp to be a multiple of the bucket size.
+
+        Args:
+            ts: the timestamp to quantise, in seconds since the Unix Epoch
+
+        Returns:
+            a timestamp which
+              - is divisible by the bucket size;
+              - is no later than `ts`; and
+              - is the largest such timestamp.
+        """
+        return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
     def update_room_state(self, room_id, fields):
         """
         Args:
@@ -76,3 +102,144 @@ class StatsStore(StateDeltasStore):
             values=fields,
             desc="update_room_state",
         )
+
+    @defer.inlineCallbacks
+    def update_stats_delta(
+        self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+    ):
+        """
+        Updates the statistics for a subject, with a delta (difference/relative
+        change).
+
+        Args:
+            ts (int): timestamp of the change
+            stats_type (str): "room" or "user" – the kind of subject
+            stats_id (str): the subject's ID (room ID or user ID)
+            fields (dict[str, int]): Deltas of stats values.
+            complete_with_stream_id (int, optional):
+                If supplied, converts an incomplete row into a complete row,
+                with the supplied stream_id marked as the stream_id where the
+                row was completed.
+        """
+
+        while True:
+            try:
+                res = yield self.runInteraction(
+                    "update_stats_delta",
+                    self._update_stats_delta_txn,
+                    ts,
+                    stats_type,
+                    stats_id,
+                    fields,
+                    complete_with_stream_id=complete_with_stream_id,
+                )
+                return res
+            except OldCollectionRequired:
+                # retry after collecting old rows
+                # TODO (implement later)
+                raise NotImplementedError("old collection not in this PR")
+
+    def _update_stats_delta_txn(
+        self,
+        txn,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id=None,
+        absolute_fields=None,
+    ):
+        """
+        See L{update_stats_delta}
+        Additional Args:
+            absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+        """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        quantised_ts = self.quantise_stats_time(int(ts))
+        end_ts = quantised_ts + self.stats_bucket_size
+
+        field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
+        field_values = list(fields.values())
+
+        if absolute_fields is not None:
+            field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
+            field_values += list(absolute_fields.values())
+
+        if complete_with_stream_id is not None:
+            field_sqls.append("completed_delta_stream_id = ?")
+            field_values.append(complete_with_stream_id)
+
+        sql = (
+            "UPDATE %s_current SET end_ts = ?, %s"
+            " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
+            " AND %s = ?"
+        ) % (table, ", ".join(field_sqls), id_col)
+
+        qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
+
+        txn.execute(sql, qargs)
+
+        if txn.rowcount > 0:
+            # success.
+            return
+
+        # if we're here, it's because we didn't succeed in updating a stats
+        # row. Why? Let's find out…
+
+        current_row = self._simple_select_one_txn(
+            txn,
+            table + "_current",
+            {id_col: stats_id},
+            ("end_ts", "completed_delta_stream_id"),
+            allow_none=True,
+        )
+
+        if current_row is None:
+            # we need to insert a row! (insert a dirty, incomplete row)
+            insertee = {
+                id_col: stats_id,
+                "end_ts": end_ts,
+                "start_ts": ts,
+                "completed_delta_stream_id": complete_with_stream_id,
+            }
+
+            # we assume that, by default, blank fields should be zero.
+            for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
+                insertee[field_name] = 0
+
+            for field_name in PER_SLICE_FIELDS[stats_type]:
+                insertee[field_name] = 0
+
+            for (field, value) in fields.items():
+                insertee[field] = value
+
+            if absolute_fields is not None:
+                for (field, value) in absolute_fields.items():
+                    insertee[field] = value
+
+            self._simple_insert_txn(txn, table + "_current", insertee)
+
+        elif current_row["end_ts"] is None:
+            # update the row, including start_ts
+            sql = (
+                "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
+                " WHERE end_ts IS NULL AND %s = ?"
+            ) % (table, ", ".join(field_sqls), id_col)
+
+            qargs = (
+                [end_ts - self.stats_bucket_size, end_ts]
+                + list(field_values)
+                + [stats_id]
+            )
+
+            txn.execute(sql, qargs)
+            if txn.rowcount == 0:
+                raise RuntimeError(
+                    "Should be impossible: No rows updated"
+                    " but all conditions are known to be met."
+                )
+
+        elif current_row["end_ts"] < end_ts:
+            # we need to perform old collection first
+            raise OldCollectionRequired()