diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index eb0ced5b5e..1c0b183a56 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -18,6 +18,7 @@ import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
@@ -69,12 +70,25 @@ class StatsStore(StateDeltasStore):
# Get all the rooms that we want to process.
def _make_staging_area(txn):
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
- )
- txn.execute(sql)
+ # Create the temporary tables
+ stmts = get_statements("""
+ -- We just recreate the table, we'll be reinserting the
+ -- correct entries again later anyway.
+ DROP TABLE IF EXISTS {temp}_rooms;
+
+ CREATE TABLE IF NOT EXISTS {temp}_rooms(
+ room_id TEXT NOT NULL,
+ events BIGINT NOT NULL
+ );
+
+ CREATE INDEX {temp}_rooms_events
+ ON {temp}_rooms(events);
+ CREATE INDEX {temp}_rooms_id
+ ON {temp}_rooms(room_id);
+ """.format(temp=TEMP_TABLE).splitlines())
+
+ for statement in stmts:
+ txn.execute(statement)
sql = (
"CREATE TABLE IF NOT EXISTS "
@@ -83,15 +97,16 @@ class StatsStore(StateDeltasStore):
)
txn.execute(sql)
- # Get rooms we want to process from the database
+ # Get rooms we want to process from the database, only adding
+ # those that we haven't (i.e. those not in room_stats_earliest_token)
sql = """
- SELECT room_id, count(*) FROM current_state_events
- GROUP BY room_id
- """
+ INSERT INTO %s_rooms (room_id, events)
+ SELECT c.room_id, count(*) FROM current_state_events AS c
+ LEFT JOIN room_stats_earliest_token AS t USING (room_id)
+ WHERE t.room_id IS NULL
+ GROUP BY c.room_id
+ """ % (TEMP_TABLE,)
txn.execute(sql)
- rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
- self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
- del rooms
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
@@ -179,46 +194,39 @@ class StatsStore(StateDeltasStore):
current_state_ids = yield self.get_current_state_ids(room_id)
- join_rules = yield self.get_event(
- current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
- )
- history_visibility = yield self.get_event(
- current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
- allow_none=True,
- )
- encryption = yield self.get_event(
- current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
- )
- name = yield self.get_event(
- current_state_ids.get((EventTypes.Name, "")), allow_none=True
- )
- topic = yield self.get_event(
- current_state_ids.get((EventTypes.Topic, "")), allow_none=True
- )
- avatar = yield self.get_event(
- current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
+ join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
+ history_visibility_id = current_state_ids.get(
+ (EventTypes.RoomHistoryVisibility, "")
)
- canonical_alias = yield self.get_event(
- current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
- )
-
- def _or_none(x, arg):
- if x:
- return x.content.get(arg)
+ encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
+ name_id = current_state_ids.get((EventTypes.Name, ""))
+ topic_id = current_state_ids.get((EventTypes.Topic, ""))
+ avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
+ canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
+
+ state_events = yield self.get_events([
+ join_rules_id, history_visibility_id, encryption_id, name_id,
+ topic_id, avatar_id, canonical_alias_id,
+ ])
+
+ def _get_or_none(event_id, arg):
+ event = state_events.get(event_id)
+ if event:
+ return event.content.get(arg)
return None
yield self.update_room_state(
room_id,
{
- "join_rules": _or_none(join_rules, "join_rule"),
- "history_visibility": _or_none(
- history_visibility, "history_visibility"
+ "join_rules": _get_or_none(join_rules_id, "join_rule"),
+ "history_visibility": _get_or_none(
+ history_visibility_id, "history_visibility"
),
- "encryption": _or_none(encryption, "algorithm"),
- "name": _or_none(name, "name"),
- "topic": _or_none(topic, "topic"),
- "avatar": _or_none(avatar, "url"),
- "canonical_alias": _or_none(canonical_alias, "alias"),
+ "encryption": _get_or_none(encryption_id, "algorithm"),
+ "name": _get_or_none(name_id, "name"),
+ "topic": _get_or_none(topic_id, "topic"),
+ "avatar": _get_or_none(avatar_id, "url"),
+ "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
},
)
@@ -233,18 +241,9 @@ class StatsStore(StateDeltasStore):
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
current_state_events = len(current_state_ids)
- joined_members = self._get_user_count_in_room_txn(
- txn, room_id, Membership.JOIN
- )
- invited_members = self._get_user_count_in_room_txn(
- txn, room_id, Membership.INVITE
- )
- left_members = self._get_user_count_in_room_txn(
- txn, room_id, Membership.LEAVE
- )
- banned_members = self._get_user_count_in_room_txn(
- txn, room_id, Membership.BAN
- )
+
+ membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
+
total_state_events = self._get_total_state_event_counts_txn(
txn, room_id
)
@@ -257,10 +256,10 @@ class StatsStore(StateDeltasStore):
{
"bucket_size": self.stats_bucket_size,
"current_state_events": current_state_events,
- "joined_members": joined_members,
- "invited_members": invited_members,
- "left_members": left_members,
- "banned_members": banned_members,
+ "joined_members": membership_counts.get(Membership.JOIN, 0),
+ "invited_members": membership_counts.get(Membership.INVITE, 0),
+ "left_members": membership_counts.get(Membership.LEAVE, 0),
+ "banned_members": membership_counts.get(Membership.BAN, 0),
"state_events": total_state_events,
},
)
@@ -270,10 +269,13 @@ class StatsStore(StateDeltasStore):
{"room_id": room_id, "token": current_token},
)
+ # We've finished a room. Delete it from the table.
+ self._simple_delete_one_txn(
+ txn, TEMP_TABLE + "_rooms", {"room_id": room_id},
+ )
+
yield self.runInteraction("update_room_stats", _fetch_data)
- # We've finished a room. Delete it from the table.
- yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
|