summary refs log tree commit diff
path: root/synapse/storage/stats.py
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-08 11:32:17 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-08 11:32:17 +0100
commit5ca4cd5ad406c8a3167bb61398e0f106f15173f4 (patch)
tree8dfe414ed154115221373a7527ee04e3cd5eb64c /synapse/storage/stats.py
parentFix type signature in `get_current_state_deltas` (diff)
downloadsynapse-5ca4cd5ad406c8a3167bb61398e0f106f15173f4.tar.xz
Track more stats positions
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r--synapse/storage/stats.py59
1 files changed, 48 insertions, 11 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 9196fa664a..91ab02ddd1 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -324,20 +325,56 @@ class StatsStore(StateDeltasStore):
 
         return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
 
-    def get_stats_stream_pos(self):
-        return self._simple_select_one_onecol(
-            table="stats_stream_pos",
-            keyvalues={},
-            retcol="stream_id",
-            desc="stats_stream_pos",
+    def get_stats_positions(self, for_initial_processor=False):
+        return self._simple_select_one(
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            retcols=(
+                "state_delta_stream_id",
+                "total_events_min_stream_ordering",
+                "total_events_max_stream_ordering",
+            ),
+            desc="stats_incremental_position",
+        )
+
+    def _get_stats_positions_txn(self, txn, for_initial_processor=False):
+        return self._simple_select_one_txn(
+            txn=txn,
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            retcols=(
+                "state_delta_stream_id",
+                "total_events_min_stream_ordering",
+                "total_events_max_stream_ordering",
+            ),
         )
 
-    def update_stats_stream_pos(self, stream_id):
+    def update_stats_positions(self, positions, for_initial_processor=False):
+        if positions is None:
+            positions = {
+                "state_delta_stream_id": None,
+                "total_events_min_stream_ordering": None,
+                "total_events_max_stream_ordering": None,
+            }
         return self._simple_update_one(
-            table="stats_stream_pos",
-            keyvalues={},
-            updatevalues={"stream_id": stream_id},
-            desc="update_stats_stream_pos",
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            updatevalues=positions,
+            desc="update_stats_incremental_position",
+        )
+
+    def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
+        if positions is None:
+            positions = {
+                "state_delta_stream_id": None,
+                "total_events_min_stream_ordering": None,
+                "total_events_max_stream_ordering": None,
+            }
+        return self._simple_update_one_txn(
+            txn,
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            updatevalues=positions,
         )
 
     def update_room_state(self, room_id, fields):