From 54d50fbfdf8c39d92c36291a572419cee6b9b916 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 May 2019 15:15:13 +0100 Subject: Get events all at once --- synapse/storage/stats.py | 59 +++++++++++++++++++++--------------------------- 1 file changed, 26 insertions(+), 33 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index eb0ced5b5e..99b4af5555 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -179,46 +179,39 @@ class StatsStore(StateDeltasStore): current_state_ids = yield self.get_current_state_ids(room_id) - join_rules = yield self.get_event( - current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True + join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) + history_visibility_id = current_state_ids.get( + (EventTypes.RoomHistoryVisibility, "") ) - history_visibility = yield self.get_event( - current_state_ids.get((EventTypes.RoomHistoryVisibility, "")), - allow_none=True, - ) - encryption = yield self.get_event( - current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True - ) - name = yield self.get_event( - current_state_ids.get((EventTypes.Name, "")), allow_none=True - ) - topic = yield self.get_event( - current_state_ids.get((EventTypes.Topic, "")), allow_none=True - ) - avatar = yield self.get_event( - current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True - ) - canonical_alias = yield self.get_event( - current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True - ) - - def _or_none(x, arg): - if x: - return x.content.get(arg) + encryption_id = current_state_ids.get((EventTypes.RoomEncryption, "")) + name_id = current_state_ids.get((EventTypes.Name, "")) + topic_id = current_state_ids.get((EventTypes.Topic, "")) + avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) + canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) + + state_events = yield self.get_events([ + join_rules_id, history_visibility_id, encryption_id, name_id, + topic_id, avatar_id, canonical_alias_id, + ]) + + def _get_or_none(event_id, arg): + event = state_events.get(event_id) + if event: + return event.content.get(arg) return None yield self.update_room_state( room_id, { - "join_rules": _or_none(join_rules, "join_rule"), - "history_visibility": _or_none( - history_visibility, "history_visibility" + "join_rules": _get_or_none(join_rules_id, "join_rule"), + "history_visibility": _get_or_none( + history_visibility_id, "history_visibility" ), - "encryption": _or_none(encryption, "algorithm"), - "name": _or_none(name, "name"), - "topic": _or_none(topic, "topic"), - "avatar": _or_none(avatar, "url"), - "canonical_alias": _or_none(canonical_alias, "alias"), + "encryption": _get_or_none(encryption_id, "algorithm"), + "name": _get_or_none(name_id, "name"), + "topic": _get_or_none(topic_id, "topic"), + "avatar": _get_or_none(avatar_id, "url"), + "canonical_alias": _get_or_none(canonical_alias_id, "alias"), }, ) -- cgit 1.4.1 From 04710cc2d71127b1f416e87f7a4aea3ce6d93410 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 May 2019 15:22:32 +0100 Subject: Fetch membership counts all at once --- synapse/storage/roommember.py | 33 +++++++++++---------------------- synapse/storage/stats.py | 23 +++++++---------------- 2 files changed, 18 insertions(+), 38 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4bd1669458..7617913326 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -142,26 +142,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): return self.runInteraction("get_room_summary", _get_room_summary_txn) - def _get_user_count_in_room_txn(self, txn, room_id, membership): + def _get_user_counts_in_room_txn(self, txn, room_id): """ - See get_user_count_in_room. - """ - sql = ( - "SELECT count(*) FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id " - " AND m.room_id = c.room_id " - " AND m.user_id = c.state_key" - " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" - ) - - txn.execute(sql, (room_id, membership)) - row = txn.fetchone() - return row[0] - - def get_user_count_in_room(self, room_id, membership): - """ - Get the user count in a room with a particular membership. + Get the user count in a room by membership. Args: room_id (str) @@ -170,9 +153,15 @@ class RoomMemberWorkerStore(EventsWorkerStore): Returns: Deferred[int] """ - return self.runInteraction( - "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership - ) + sql = """ + SELECT m.membership, count(*) FROM room_memberships as m + INNER JOIN current_state_events as c USING(event_id) + WHERE c.type = 'm.room.member' AND c.room_id = ? + GROUP BY m.membership + """ + + txn.execute(sql, (room_id,)) + return {row[0]: row[1] for row in txn} @cached() def get_invited_rooms_for_user(self, user_id): diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 99b4af5555..727f60b3bd 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -226,18 +226,9 @@ class StatsStore(StateDeltasStore): current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) current_state_events = len(current_state_ids) - joined_members = self._get_user_count_in_room_txn( - txn, room_id, Membership.JOIN - ) - invited_members = self._get_user_count_in_room_txn( - txn, room_id, Membership.INVITE - ) - left_members = self._get_user_count_in_room_txn( - txn, room_id, Membership.LEAVE - ) - banned_members = self._get_user_count_in_room_txn( - txn, room_id, Membership.BAN - ) + + membership_counts = self._get_user_counts_in_room_txn(txn, room_id) + total_state_events = self._get_total_state_event_counts_txn( txn, room_id ) @@ -250,10 +241,10 @@ class StatsStore(StateDeltasStore): { "bucket_size": self.stats_bucket_size, "current_state_events": current_state_events, - "joined_members": joined_members, - "invited_members": invited_members, - "left_members": left_members, - "banned_members": banned_members, + "joined_members": membership_counts.get(Membership.JOIN, 0), + "invited_members": membership_counts.get(Membership.INVITE, 0), + "left_members": membership_counts.get(Membership.LEAVE, 0), + "banned_members": membership_counts.get(Membership.BAN, 0), "state_events": total_state_events, }, ) -- cgit 1.4.1 From e2c46ed851599dc08cc8a822e07c0d4f9a050ee2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 May 2019 15:26:38 +0100 Subject: Move deletion from table inside txn --- synapse/storage/stats.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 727f60b3bd..a99637d4b4 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -254,10 +254,13 @@ class StatsStore(StateDeltasStore): {"room_id": room_id, "token": current_token}, ) + # We've finished a room. Delete it from the table. + self._simple_delete_one_txn( + txn, TEMP_TABLE + "_rooms", {"room_id": room_id}, + ) + yield self.runInteraction("update_room_stats", _fetch_data) - # We've finished a room. Delete it from the table. - yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id}) # Update the remaining counter. progress["remaining"] -= 1 yield self.runInteraction( -- cgit 1.4.1 From 5037326d6624d1d1780a0536d19ff79e275f8735 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 May 2019 15:37:57 +0100 Subject: Add indices. Remove room_ids accidentally added We have to do this by re-inserting a background update and recreating tables, as the tables only get created during a background update and will later be deleted. We also make sure that we remove any entries that should have been removed but weren't due to a race that has been fixed in a previous commit. --- synapse/storage/schema/delta/54/stats2.sql | 28 ++++++++++++++++++++ synapse/storage/stats.py | 41 ++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 13 deletions(-) create mode 100644 synapse/storage/schema/delta/54/stats2.sql (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/schema/delta/54/stats2.sql b/synapse/storage/schema/delta/54/stats2.sql new file mode 100644 index 0000000000..3b2d48447f --- /dev/null +++ b/synapse/storage/schema/delta/54/stats2.sql @@ -0,0 +1,28 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This delta file gets run after `54/stats.sql` delta. + +-- We want to add some indices to the temporary stats table, so we re-insert +-- 'populate_stats_createtables' if we are still processing the rooms update. +INSERT INTO background_updates (update_name, progress_json) + SELECT 'populate_stats_createtables', '{}' + WHERE + 'populate_stats_process_rooms' IN ( + SELECT update_name FROM background_updates + ) + AND 'populate_stats_createtables' NOT IN ( -- don't insert if already exists + SELECT update_name FROM background_updates + ); diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index a99637d4b4..1c0b183a56 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.storage.prepare_database import get_statements from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -69,12 +70,25 @@ class StatsStore(StateDeltasStore): # Get all the rooms that we want to process. def _make_staging_area(txn): - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" - ) - txn.execute(sql) + # Create the temporary tables + stmts = get_statements(""" + -- We just recreate the table, we'll be reinserting the + -- correct entries again later anyway. + DROP TABLE IF EXISTS {temp}_rooms; + + CREATE TABLE IF NOT EXISTS {temp}_rooms( + room_id TEXT NOT NULL, + events BIGINT NOT NULL + ); + + CREATE INDEX {temp}_rooms_events + ON {temp}_rooms(events); + CREATE INDEX {temp}_rooms_id + ON {temp}_rooms(room_id); + """.format(temp=TEMP_TABLE).splitlines()) + + for statement in stmts: + txn.execute(statement) sql = ( "CREATE TABLE IF NOT EXISTS " @@ -83,15 +97,16 @@ class StatsStore(StateDeltasStore): ) txn.execute(sql) - # Get rooms we want to process from the database + # Get rooms we want to process from the database, only adding + # those that we haven't (i.e. those not in room_stats_earliest_token) sql = """ - SELECT room_id, count(*) FROM current_state_events - GROUP BY room_id - """ + INSERT INTO %s_rooms (room_id, events) + SELECT c.room_id, count(*) FROM current_state_events AS c + LEFT JOIN room_stats_earliest_token AS t USING (room_id) + WHERE t.room_id IS NULL + GROUP BY c.room_id + """ % (TEMP_TABLE,) txn.execute(sql) - rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()] - self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) - del rooms new_pos = yield self.get_max_stream_id_in_current_state_deltas() yield self.runInteraction("populate_stats_temp_build", _make_staging_area) -- cgit 1.4.1