diff options
author | Erik Johnston <erik@matrix.org> | 2019-10-09 16:52:21 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-10-09 16:52:21 +0100 |
commit | 5c1f886c75979b606c1e8bb45fcd8b7d26a71f39 (patch) | |
tree | ee651459cda811d4ac5d11f1e2fc4d9c06df037a /synapse/storage/room.py | |
parent | Update (diff) | |
parent | Merge pull request #6185 from matrix-org/erikj/fix_censored_evnets (diff) | |
download | synapse-5c1f886c75979b606c1e8bb45fcd8b7d26a71f39.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/patch_inner
Diffstat (limited to 'synapse/storage/room.py')
-rw-r--r-- | synapse/storage/room.py | 246 |
1 files changed, 171 insertions, 75 deletions
diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 08e13f3a3b..43cc56fa6f 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ import collections import logging import re +from typing import Optional, Tuple from canonicaljson import json @@ -24,6 +26,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore from synapse.storage.search import SearchStore +from synapse.types import ThirdPartyInstanceID from synapse.util.caches.descriptors import cached, cachedInlineCallbacks logger = logging.getLogger(__name__) @@ -63,103 +66,196 @@ class RoomWorkerStore(SQLBaseStore): desc="get_public_room_ids", ) - @cached(num_args=2, max_entries=100) - def get_public_room_ids_at_stream_id(self, stream_id, network_tuple): - """Get pulbic rooms for a particular list, or across all lists. + def count_public_rooms(self, network_tuple, ignore_non_federatable): + """Counts the number of public rooms as tracked in the room_stats_current + and room_stats_state table. Args: - stream_id (int) - network_tuple (ThirdPartyInstanceID): The list to use (None, None) - means the main list, None means all lsits. + network_tuple (ThirdPartyInstanceID|None) + ignore_non_federatable (bool): If true filters out non-federatable rooms """ - return self.runInteraction( - "get_public_room_ids_at_stream_id", - self.get_public_room_ids_at_stream_id_txn, - stream_id, - network_tuple=network_tuple, - ) - - def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, network_tuple): - return { - rm - for rm, vis in self.get_published_at_stream_id_txn( - txn, stream_id, network_tuple=network_tuple - ).items() - if vis - } - def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple): - if network_tuple: - # We want to get from a particular list. No aggregation required. + def _count_public_rooms_txn(txn): + query_args = [] + + if network_tuple: + if network_tuple.appservice_id: + published_sql = """ + SELECT room_id from appservice_room_list + WHERE appservice_id = ? AND network_id = ? + """ + query_args.append(network_tuple.appservice_id) + query_args.append(network_tuple.network_id) + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + """ + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + UNION SELECT room_id from appservice_room_list + """ sql = """ - SELECT room_id, visibility FROM public_room_list_stream - INNER JOIN ( - SELECT room_id, max(stream_id) AS stream_id - FROM public_room_list_stream - WHERE stream_id <= ? %s - GROUP BY room_id - ) grouped USING (room_id, stream_id) + SELECT + COALESCE(COUNT(*), 0) + FROM ( + %(published_sql)s + ) published + INNER JOIN room_stats_state USING (room_id) + INNER JOIN room_stats_current USING (room_id) + WHERE + ( + join_rules = 'public' OR history_visibility = 'world_readable' + ) + AND joined_members > 0 + """ % { + "published_sql": published_sql + } + + txn.execute(sql, query_args) + return txn.fetchone()[0] + + return self.runInteraction("count_public_rooms", _count_public_rooms_txn) + + @defer.inlineCallbacks + def get_largest_public_rooms( + self, + network_tuple: Optional[ThirdPartyInstanceID], + search_filter: Optional[dict], + limit: Optional[int], + bounds: Optional[Tuple[int, str]], + forwards: bool, + ignore_non_federatable: bool = False, + ): + """Gets the largest public rooms (where largest is in terms of joined + members, as tracked in the statistics table). + + Args: + network_tuple + search_filter + limit: Maxmimum number of rows to return, unlimited otherwise. + bounds: An uppoer or lower bound to apply to result set if given, + consists of a joined member count and room_id (these are + excluded from result set). + forwards: true iff going forwards, going backwards otherwise + ignore_non_federatable: If true filters out non-federatable rooms. + + Returns: + Rooms in order: biggest number of joined users first. + We then arbitrarily use the room_id as a tie breaker. + + """ + + where_clauses = [] + query_args = [] + + if network_tuple: + if network_tuple.appservice_id: + published_sql = """ + SELECT room_id from appservice_room_list + WHERE appservice_id = ? AND network_id = ? + """ + query_args.append(network_tuple.appservice_id) + query_args.append(network_tuple.network_id) + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + """ + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + UNION SELECT room_id from appservice_room_list """ - if network_tuple.appservice_id is not None: - txn.execute( - sql % ("AND appservice_id = ? AND network_id = ?",), - (stream_id, network_tuple.appservice_id, network_tuple.network_id), + # Work out the bounds if we're given them, these bounds look slightly + # odd, but are designed to help query planner use indices by pulling + # out a common bound. + if bounds: + last_joined_members, last_room_id = bounds + if forwards: + where_clauses.append( + """ + joined_members <= ? AND ( + joined_members < ? OR room_id < ? + ) + """ ) else: - txn.execute(sql % ("AND appservice_id IS NULL",), (stream_id,)) - return dict(txn) - else: - # We want to get from all lists, so we need to aggregate the results + where_clauses.append( + """ + joined_members >= ? AND ( + joined_members > ? OR room_id > ? + ) + """ + ) - logger.info("Executing full list") + query_args += [last_joined_members, last_joined_members, last_room_id] - sql = """ - SELECT room_id, visibility - FROM public_room_list_stream - INNER JOIN ( - SELECT - room_id, max(stream_id) AS stream_id, appservice_id, - network_id - FROM public_room_list_stream - WHERE stream_id <= ? - GROUP BY room_id, appservice_id, network_id - ) grouped USING (room_id, stream_id) - """ + if ignore_non_federatable: + where_clauses.append("is_federatable") - txn.execute(sql, (stream_id,)) + if search_filter and search_filter.get("generic_search_term", None): + search_term = "%" + search_filter["generic_search_term"] + "%" - results = {} - # A room is visible if its visible on any list. - for room_id, visibility in txn: - results[room_id] = bool(visibility) or results.get(room_id, False) + where_clauses.append( + """ + ( + name LIKE ? + OR topic LIKE ? + OR canonical_alias LIKE ? + ) + """ + ) + query_args += [search_term, search_term, search_term] + + where_clause = "" + if where_clauses: + where_clause = " AND " + " AND ".join(where_clauses) + + sql = """ + SELECT + room_id, name, topic, canonical_alias, joined_members, + avatar, history_visibility, joined_members, guest_access + FROM ( + %(published_sql)s + ) published + INNER JOIN room_stats_state USING (room_id) + INNER JOIN room_stats_current USING (room_id) + WHERE + ( + join_rules = 'public' OR history_visibility = 'world_readable' + ) + AND joined_members > 0 + %(where_clause)s + ORDER BY joined_members %(dir)s, room_id %(dir)s + """ % { + "published_sql": published_sql, + "where_clause": where_clause, + "dir": "DESC" if forwards else "ASC", + } - return results + if limit is not None: + query_args.append(limit) - def get_public_room_changes(self, prev_stream_id, new_stream_id, network_tuple): - def get_public_room_changes_txn(txn): - then_rooms = self.get_public_room_ids_at_stream_id_txn( - txn, prev_stream_id, network_tuple - ) + sql += """ + LIMIT ? + """ - now_rooms_dict = self.get_published_at_stream_id_txn( - txn, new_stream_id, network_tuple - ) + def _get_largest_public_rooms_txn(txn): + txn.execute(sql, query_args) - now_rooms_visible = set(rm for rm, vis in now_rooms_dict.items() if vis) - now_rooms_not_visible = set( - rm for rm, vis in now_rooms_dict.items() if not vis - ) + results = self.cursor_to_dict(txn) - newly_visible = now_rooms_visible - then_rooms - newly_unpublished = now_rooms_not_visible & then_rooms + if not forwards: + results.reverse() - return newly_visible, newly_unpublished + return results - return self.runInteraction( - "get_public_room_changes", get_public_room_changes_txn + ret_val = yield self.runInteraction( + "get_largest_public_rooms", _get_largest_public_rooms_txn ) + defer.returnValue(ret_val) @cached(max_entries=10000) def is_room_blocked(self, room_id): |