diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index f44adfc07b..e849c38b85 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -98,15 +98,16 @@ class StatsHandler(StateDeltasHandler):
self.pos["state_delta_stream_id"]
)
- logger.debug("Handling %d state deltas", len(deltas))
- yield self._handle_deltas(deltas)
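+            # There may be no deltas to handle; only advance the stream
+            # position when some were actually processed.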
+ if deltas:
+ logger.debug("Handling %d state deltas", len(deltas))
+ yield self._handle_deltas(deltas)
- self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
- yield self.store.update_stats_positions(self.pos)
+ self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+ yield self.store.update_stats_positions(self.pos)
- event_processing_positions.labels("stats").set(
- self.pos["state_delta_stream_id"]
- )
+ event_processing_positions.labels("stats").set(
+ self.pos["state_delta_stream_id"]
+ )
# Then count deltas for total_events and total_event_bytes.
with Measure(self.clock, "stats_total_events_and_bytes"):
@@ -129,7 +130,6 @@ class StatsHandler(StateDeltasHandler):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
- stream_pos = delta["stream_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
@@ -158,12 +158,9 @@ class StatsHandler(StateDeltasHandler):
if event:
event_content = event.content or {}
- # We use stream_pos here rather than fetch by event_id as event_id
- # may be None
- stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
- stream_pos
- )
- stream_timestamp = int(stream_timestamp)
+            # We can't afford for this timestamp to stray into the past, so
+            # we count the event as having arrived now.
+            stream_timestamp = int(self.clock.time_msec())
# All the values in this dict are deltas (RELATIVE changes)
room_stats_delta = {}
@@ -261,7 +258,7 @@ class StatsHandler(StateDeltasHandler):
is_newly_created = True
elif typ == EventTypes.JoinRules:
- old_room_state = yield self.store.get_room_state(room_id)
+ old_room_state = yield self.store.get_room_stats_state(room_id)
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
@@ -282,7 +279,7 @@ class StatsHandler(StateDeltasHandler):
)
elif typ == EventTypes.RoomHistoryVisibility:
- old_room_state = yield self.store.get_room_state(room_id)
+ old_room_state = yield self.store.get_room_stats_state(room_id)
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 24f739a017..f20d8ba8a4 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -661,6 +661,79 @@ class StatsStore(StateDeltasStore):
desc="update_room_state",
)
+ def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
+ """
+ Get statistics for a given subject.
+
+ Args:
+            stats_type (str): The type of subject, e.g. "room" or "user".
+ stats_id (str): The ID of the subject (e.g. room_id or user_id)
+ start (int): Pagination start. Number of entries, not timestamp.
+ size (int): How many entries to return.
+
+        Returns:
+            Deferred[list[dict]], where the dict has the keys of
+            ABSOLUTE_STATS_FIELDS[stats_type] and PER_SLICE_FIELDS[stats_type],
+            plus "bucket_size" and "end_ts".
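+
+        Example (illustrative; "store" is a StatsStore instance and the
+        room ID is made up):
+            # Fetch the most recent page of stats slices for a room,
+            # newest first.
+            rows = yield store.get_statistics_for_subject(
+                "room", "!abc:example.com", start=0, size=100
+            )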
+ """
+ return self.runInteraction(
+ "get_statistics_for_subject",
+ self._get_statistics_for_subject_txn,
+ stats_type,
+ stats_id,
+ start,
+ size,
+ )
+
+ def _get_statistics_for_subject_txn(
+ self, txn, stats_type, stats_id, start, size=100
+ ):
+ """
+        Transaction-bound version of `get_statistics_for_subject`.
+ """
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+ selected_columns = list(
+ ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+ )
+
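+        # Page through the historical table newest-first, ordered by the
+        # end timestamp of each time slice.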
+ slice_list = self._simple_select_list_paginate_txn(
+ txn,
+ table + "_historical",
+ {id_col: stats_id},
+ "end_ts",
+ start,
+ size,
+ retcols=selected_columns + ["bucket_size", "end_ts"],
+ order_direction="DESC",
+ )
+
+ return slice_list
+
+ def get_room_stats_state(self, room_id):
+ """
+ Returns the current room_stats_state for a room.
+
+ Args:
+ room_id (str): The ID of the room to return state for.
+
+        Returns:
+            Deferred[dict]: Dictionary containing these keys:
+                "name", "topic", "canonical_alias", "avatar", "join_rules",
+                "history_visibility"
+ """
+ return self._simple_select_one(
+ "room_stats_state",
+ {"room_id": room_id},
+ retcols=(
+ "name",
+ "topic",
+ "canonical_alias",
+ "avatar",
+ "join_rules",
+ "history_visibility",
+ ),
+ )
+
@cached()
def get_earliest_token_for_stats(self, stats_type, id):
"""
@@ -948,10 +1021,11 @@ class StatsStore(StateDeltasStore):
src_row = self._simple_select_one_txn(
txn, src_table, keyvalues, copy_columns
)
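+        # The destination row is keyed on the shared keyvalues plus any
+        # extra destination-only keys (e.g. the end_ts of a historical
+        # bucket), so merge them for the select and update below.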
+ all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
dest_current_row = self._simple_select_one_txn(
txn,
into_table,
- keyvalues,
+ keyvalues=all_dest_keyvalues,
retcols=list(chain(additive_relatives.keys(), copy_columns)),
allow_none=True,
)
@@ -968,7 +1042,7 @@ class StatsStore(StateDeltasStore):
else:
for (key, val) in additive_relatives.items():
src_row[key] = dest_current_row[key] + val
- self._simple_update_txn(txn, into_table, keyvalues, src_row)
+ self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
def incremental_update_room_total_events_and_bytes(self, in_positions):
"""
@@ -1059,14 +1133,14 @@ class StatsStore(StateDeltasStore):
new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
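+        # NB: the interpolated values must appear in the same order as their
+        # %s placeholders in the SQL: the SUM() expression first, then the
+        # low and high stream_ordering comparators.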
sql = """
- SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+ SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
FROM events INNER JOIN event_json USING (event_id)
WHERE ? %s stream_ordering AND stream_ordering %s ?
- GROUP BY room_id
+ GROUP BY events.room_id
""" % (
+ new_bytes_expression,
low_comparator,
high_comparator,
- new_bytes_expression,
)
txn.execute(sql, (low_pos, high_pos))
|