summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-28 09:53:33 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-28 09:53:33 +0100
commitdfb22fec489f043c2d62d3d5cbe671cff5476881 (patch)
treec2e794aa256f39103bf65a96c96652e1ebd0d72f /synapse/storage
parentUse `DeferredLock` instead of `threading.Lock` (diff)
parentMerge pull request #5889 from matrix-org/rei/rss_inc2 (diff)
downloadsynapse-dfb22fec489f043c2d62d3d5cbe671cff5476881.tar.xz
Merge branch 'rei/rss_target' into rei/rss_inc3
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py5
-rw-r--r--synapse/storage/schema/delta/56/stats_separated1.sql3
-rw-r--r--synapse/storage/stats.py181
3 files changed, 100 insertions, 89 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5a95c36a8b..5527dd208e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2270,8 +2270,9 @@ class EventsStore(
             "room_aliases",
             "room_depth",
             "room_memberships",
-            "room_state",
-            "room_stats",
+            "room_stats_state",
+            "room_stats_current",
+            "room_stats_historical",
             "room_stats_earliest_token",
             "rooms",
             "stream_ordering_to_exterm",
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 045b5ca013..52fb09c0e6 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -137,3 +137,6 @@ CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical
 
 -- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
 -- out for us. (We would want it to review stats for a particular user.)
+
+-- Also rename room_state to room_stats_state to make its ownership clear.
+ALTER TABLE room_state RENAME TO room_stats_state;
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index c9687c29d2..3df57b52ea 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -25,6 +25,9 @@ from synapse.util.caches.descriptors import cached
 logger = logging.getLogger(__name__)
 
 # these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
 ABSOLUTE_STATS_FIELDS = {
     "room": (
         "current_state_events",
@@ -38,17 +41,13 @@ ABSOLUTE_STATS_FIELDS = {
 }
 
 # these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
 PER_SLICE_FIELDS = {"room": (), "user": ()}
 
 TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
 
-class OldCollectionRequired(Exception):
-    """ Signal that we need to collect old stats rows and retry. """
-
-    pass
-
-
 class StatsStore(StateDeltasStore):
     def __init__(self, db_conn, hs):
         super(StatsStore, self).__init__(db_conn, hs)
@@ -188,7 +187,7 @@ class StatsStore(StateDeltasStore):
                 fields[col] = None
 
         return self._simple_upsert(
-            table="room_state",
+            table="room_stats_state",
             keyvalues={"room_id": room_id},
             values=fields,
             desc="update_room_state",
@@ -242,6 +241,94 @@ class StatsStore(StateDeltasStore):
             complete_with_stream_id=complete_with_stream_id,
         )
 
+    def _update_stats_delta_txn(
+        self,
+        txn,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id=None,
+        absolute_field_overrides=None,
+    ):
+        """
+        See L{update_stats_delta}
+        Additional Args:
+            absolute_field_overrides (dict[str, int]): Current stats values
+                (i.e. not deltas) of absolute fields.
+                Does not work with per-slice fields.
+        """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        quantised_ts = self.quantise_stats_time(int(ts))
+        end_ts = quantised_ts + self.stats_bucket_size
+
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+        for field in chain(fields.keys(), absolute_field_overrides.keys()):
+            if field not in abs_field_names and field not in slice_field_names:
+                # guard against potential SQL injection dodginess
+                raise ValueError(
+                    "%s is not a recognised field"
+                    " for stats type %s" % (field, stats_type)
+                )
+
+        # only absolute stats fields are tracked in the `_current` stats tables,
+        # so those are the only ones that we process deltas for when
+        # we upsert against the `_current` table.
+
+        # This calculates the deltas (`field = field + ?` values)
+        # for absolute fields,
+        # * defaulting to 0 if not specified
+        #     (required for the INSERT part of upserting to work)
+        # * omitting overrides specified in `absolute_field_overrides`
+        deltas_of_absolute_fields = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_field_overrides
+        }
+
+        if absolute_field_overrides is None:
+            absolute_field_overrides = {}
+
+        if complete_with_stream_id is not None:
+            absolute_field_overrides = absolute_field_overrides.copy()
+            absolute_field_overrides[
+                "completed_delta_stream_id"
+            ] = complete_with_stream_id
+
+        # first upsert the `_current` table
+        self._upsert_with_additive_relatives_txn(
+            txn=txn,
+            table=table + "_current",
+            keyvalues={id_col: stats_id},
+            absolutes=absolute_field_overrides,
+            additive_relatives=deltas_of_absolute_fields,
+        )
+
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the `_historical` table.
+            # we don't support absolute_fields for per-slice fields as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0) for key in slice_field_names
+            }
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn=txn,
+                into_table=table + "_historical",
+                keyvalues={id_col: stats_id},
+                extra_dst_keyvalues={
+                    "end_ts": end_ts,
+                    "bucket_size": self.stats_bucket_size,
+                },
+                additive_relatives=per_slice_additive_relatives,
+                src_table=table + "_current",
+                copy_columns=abs_field_names,
+                additional_where=" AND completed_delta_stream_id IS NOT NULL",
+            )
+
     def _upsert_with_additive_relatives_txn(
         self, txn, table, keyvalues, absolutes, additive_relatives
     ):
@@ -388,83 +475,3 @@ class StatsStore(StateDeltasStore):
                 for (key, val) in additive_relatives.items():
                     src_row[key] = dest_current_row[key] + val
                 self._simple_update_txn(txn, into_table, keyvalues, src_row)
-
-    def _update_stats_delta_txn(
-        self,
-        txn,
-        ts,
-        stats_type,
-        stats_id,
-        fields,
-        complete_with_stream_id=None,
-        absolute_fields=None,
-    ):
-        """
-        See L{update_stats_delta}
-        Additional Args:
-            absolute_fields (dict[str, int]): Absolute current stats values
-                (i.e. not deltas). Does not work with per-slice fields.
-        """
-        table, id_col = TYPE_TO_TABLE[stats_type]
-
-        quantised_ts = self.quantise_stats_time(int(ts))
-        end_ts = quantised_ts + self.stats_bucket_size
-
-        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
-        slice_field_names = PER_SLICE_FIELDS[stats_type]
-        for field in chain(fields.keys(), absolute_fields.keys()):
-            if field not in abs_field_names and field not in slice_field_names:
-                # guard against potential SQL injection dodginess
-                raise ValueError(
-                    "%s is not a recognised field"
-                    " for stats type %s" % (field, stats_type)
-                )
-
-        # only absolute stats fields are tracked in the `_current` stats tables,
-        # so those are the only ones that we process deltas for when
-        # we upsert against the `_current` table.
-        additive_relatives = {
-            key: fields.get(key, 0)
-            for key in abs_field_names
-            if key not in absolute_fields
-        }
-
-        if absolute_fields is None:
-            absolute_fields = {}
-
-        if complete_with_stream_id is not None:
-            absolute_fields = absolute_fields.copy()
-            absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
-            self.get_earliest_token_for_stats.invalidate(stats_type, stats_id)
-
-        # first upsert the `_current` table
-        self._upsert_with_additive_relatives_txn(
-            txn=txn,
-            table=table + "_current",
-            keyvalues={id_col: stats_id},
-            absolutes=absolute_fields,
-            additive_relatives=additive_relatives,
-        )
-
-        if self.has_completed_background_updates():
-            # TODO want to check specifically for stats regenerator, not all
-            #   background updates…
-            # then upsert the `_historical` table.
-            # we don't support absolute_fields for per-slice fields as it makes
-            # no sense.
-            per_slice_additive_relatives = {
-                key: fields.get(key, 0) for key in slice_field_names
-            }
-            self._upsert_copy_from_table_with_additive_relatives_txn(
-                txn=txn,
-                into_table=table + "_historical",
-                keyvalues={id_col: stats_id},
-                extra_dst_keyvalues={
-                    "end_ts": end_ts,
-                    "bucket_size": self.stats_bucket_size,
-                },
-                additive_relatives=per_slice_additive_relatives,
-                src_table=table + "_current",
-                copy_columns=abs_field_names,
-                additional_where=" AND completed_delta_stream_id IS NOT NULL",
-            )