diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index da3a869272..755faaaaa2 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -682,29 +682,67 @@ class StatsStore(StateDeltasStore):
desc="update_room_state",
)
- def get_deltas_for_room(self, room_id, start, size=100):
+ def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
"""
- Get statistics deltas for a given room.
+ Get statistics for a given subject.
Args:
- room_id (str)
+ stats_type (str): The type of subject
+ stats_id (str): The ID of the subject (e.g. room_id or user_id)
start (int): Pagination start. Number of entries, not timestamp.
size (int): How many entries to return.
Returns:
Deferred[list[dict]], where the dict has the keys of
- ABSOLUTE_STATS_FIELDS["room"] and "ts".
+ ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
"""
- return self._simple_select_list_paginate(
- "room_stats",
- {"room_id": room_id},
- "ts",
+ return self.runInteraction(
+ "get_statistics_for_subject",
+ self._get_statistics_for_subject_txn,
+ stats_type,
+ stats_id,
start,
size,
- retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+ )
+
+ # TODO fix and account for _current.
+ def _get_statistics_for_subject_txn(
+ self, txn, stats_type, stats_id, start, size=100
+ ):
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+ selected_columns = list(
+ ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+ )
+
+ slice_list = self._simple_select_list_paginate_txn(
+ txn,
+ table + "_historical",
+ {id_col: stats_id},
+ "end_ts",
+ start,
+ size,
+ retcols=selected_columns + ["bucket_size", "end_ts"],
order_direction="DESC",
)
+ if len(slice_list) < size:
+ # also fetch the current row
+ current = self._simple_select_one_txn(
+ txn,
+ table + "_current",
+ {id_col: stats_id},
+ retcols=selected_columns + ["start_ts", "end_ts"],
+ allow_none=True,
+ )
+
+ if current is not None and current["end_ts"] is not None:
+ # it is dirty, so contains new information, so should be included
+ current["bucket_size"] = current["end_ts"] - current["start_ts"]
+ del current["start_ts"]
+ return [current] + slice_list
+ return slice_list
+
def get_all_room_state(self):
return self._simple_select_list(
"room_state", None, retcols=("name", "topic", "canonical_alias")
diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py
index a8b858eb4f..394b289220 100644
--- a/tests/handlers/test_stats.py
+++ b/tests/handlers/test_stats.py
@@ -14,13 +14,11 @@
# limitations under the License.
from mock import Mock
-
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
-
from tests import unittest
@@ -47,7 +45,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.get_success(
self.store._simple_insert(
"background_updates",
- {"update_name": "populate_stats_createtables", "progress_json": "{}"},
+ {"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
self.get_success(
@@ -56,7 +54,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
- "depends_on": "populate_stats_createtables",
+ "depends_on": "populate_stats_prepare",
},
)
)
@@ -64,12 +62,22 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.store._simple_insert(
"background_updates",
{
- "update_name": "populate_stats_cleanup",
+ "update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_cleanup",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_process_users",
+ },
+ )
+ )
def test_initial_room(self):
"""
@@ -114,6 +122,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
+
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
@@ -138,12 +147,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
- self.get_success(self.store.update_stats_stream_pos(None))
+ self.get_success(self.store.update_stats_positions(None))
self.get_success(
self.store._simple_insert(
"background_updates",
- {"update_name": "populate_stats_createtables", "progress_json": "{}"},
+ {"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
@@ -154,6 +163,8 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
+ # orig_delta_processor = self.store.
+
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
@@ -185,8 +196,13 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
- # Get the deltas! There should be two -- day 1, and day 2.
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
+ # self.handler.notify_new_event()
+
+ # We need to let the delta processor advanceā¦
+ self.pump(10 * 60)
+
+ # Get the slices! There should be two -- day 1, and day 2.
+ r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
@@ -299,6 +315,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
# One delta, with two joined members -- the room creator, and our fake
# user.
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
+ r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)
|