diff options
author | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-08 11:32:17 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-08 11:32:17 +0100 |
commit | 5ca4cd5ad406c8a3167bb61398e0f106f15173f4 (patch) | |
tree | 8dfe414ed154115221373a7527ee04e3cd5eb64c /synapse/storage/stats.py | |
parent | Fix type signature in `get_current_state_deltas` (diff) | |
download | synapse-5ca4cd5ad406c8a3167bb61398e0f106f15173f4.tar.xz |
Track more stats positions
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r-- | synapse/storage/stats.py | 59 |
1 files changed, 48 insertions, 11 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 9196fa664a..91ab02ddd1 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -324,20 +325,56 @@ class StatsStore(StateDeltasStore): return self.runInteraction("delete_all_stats", _delete_all_stats_txn) - def get_stats_stream_pos(self): - return self._simple_select_one_onecol( - table="stats_stream_pos", - keyvalues={}, - retcol="stream_id", - desc="stats_stream_pos", + def get_stats_positions(self, for_initial_processor=False): + return self._simple_select_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + desc="stats_incremental_position", + ) + + def _get_stats_positions_txn(self, txn, for_initial_processor=False): + return self._simple_select_one_txn( + txn=txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), ) - def update_stats_stream_pos(self, stream_id): + def update_stats_positions(self, positions, for_initial_processor=False): + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } return self._simple_update_one( - table="stats_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_stats_stream_pos", + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + desc="update_stats_incremental_position", + ) + + def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, ) def update_room_state(self, room_id, fields): |