diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index a99637d4b4..1c0b183a56 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -18,6 +18,7 @@ import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
@@ -69,12 +70,25 @@ class StatsStore(StateDeltasStore):
# Get all the rooms that we want to process.
def _make_staging_area(txn):
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
- )
- txn.execute(sql)
+ # Create the temporary tables
+ stmts = get_statements("""
+ -- We just recreate the table, we'll be reinserting the
+ -- correct entries again later anyway.
+ DROP TABLE IF EXISTS {temp}_rooms;
+
+ CREATE TABLE IF NOT EXISTS {temp}_rooms(
+ room_id TEXT NOT NULL,
+ events BIGINT NOT NULL
+ );
+
+ CREATE INDEX {temp}_rooms_events
+ ON {temp}_rooms(events);
+ CREATE INDEX {temp}_rooms_id
+ ON {temp}_rooms(room_id);
+ """.format(temp=TEMP_TABLE).splitlines())
+
+ for statement in stmts:
+ txn.execute(statement)
sql = (
"CREATE TABLE IF NOT EXISTS "
@@ -83,15 +97,16 @@ class StatsStore(StateDeltasStore):
)
txn.execute(sql)
- # Get rooms we want to process from the database
+ # Get rooms we want to process from the database, only adding
+ # those that we haven't (i.e. those not in room_stats_earliest_token)
sql = """
- SELECT room_id, count(*) FROM current_state_events
- GROUP BY room_id
- """
+ INSERT INTO %s_rooms (room_id, events)
+ SELECT c.room_id, count(*) FROM current_state_events AS c
+ LEFT JOIN room_stats_earliest_token AS t USING (room_id)
+ WHERE t.room_id IS NULL
+ GROUP BY c.room_id
+ """ % (TEMP_TABLE,)
txn.execute(sql)
- rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
- self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
- del rooms
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
|