diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/room.py | 187 | ||||
-rw-r--r-- | synapse/storage/schema/delta/39/appservice_room_list.sql | 29 |
2 files changed, 196 insertions, 20 deletions
diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 11813b44f6..8a2fe2fdf5 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore from .engines import PostgresEngine, Sqlite3Engine @@ -106,7 +107,11 @@ class RoomStore(SQLBaseStore): entries = self._simple_select_list_txn( txn, table="public_room_list_stream", - keyvalues={"room_id": room_id}, + keyvalues={ + "room_id": room_id, + "appservice_id": None, + "network_id": None, + }, retcols=("stream_id", "visibility"), ) @@ -124,6 +129,8 @@ class RoomStore(SQLBaseStore): "stream_id": next_id, "room_id": room_id, "visibility": is_public, + "appservice_id": None, + "network_id": None, } ) @@ -132,6 +139,87 @@ class RoomStore(SQLBaseStore): "set_room_is_public", set_room_is_public_txn, next_id, ) + self.hs.get_notifier().on_new_replication_data() + + @defer.inlineCallbacks + def set_room_is_public_appservice(self, room_id, appservice_id, network_id, + is_public): + """Edit the appservice/network specific public room list. + + Each appservice can have a number of published room lists associated + with them, keyed off of an appservice defined `network_id`, which + basically represents a single instance of a bridge to a third party + network. + + Args: + room_id (str) + appservice_id (str) + network_id (str) + is_public (bool): Whether to publish or unpublish the room from the + list. + """ + def set_room_is_public_appservice_txn(txn, next_id): + if is_public: + try: + self._simple_insert_txn( + txn, + table="appservice_room_list", + values={ + "appservice_id": appservice_id, + "network_id": network_id, + "room_id": room_id + }, + ) + except self.database_engine.module.IntegrityError: + # We've already inserted, nothing to do. + return + else: + self._simple_delete_txn( + txn, + table="appservice_room_list", + keyvalues={ + "appservice_id": appservice_id, + "network_id": network_id, + "room_id": room_id + }, + ) + + entries = self._simple_select_list_txn( + txn, + table="public_room_list_stream", + keyvalues={ + "room_id": room_id, + "appservice_id": appservice_id, + "network_id": network_id, + }, + retcols=("stream_id", "visibility"), + ) + + entries.sort(key=lambda r: r["stream_id"]) + + add_to_stream = True + if entries: + add_to_stream = bool(entries[-1]["visibility"]) != is_public + + if add_to_stream: + self._simple_insert_txn( + txn, + table="public_room_list_stream", + values={ + "stream_id": next_id, + "room_id": room_id, + "visibility": is_public, + "appservice_id": appservice_id, + "network_id": network_id, + } + ) + + with self._public_room_id_gen.get_next() as next_id: + yield self.runInteraction( + "set_room_is_public_appservice", + set_room_is_public_appservice_txn, next_id, + ) + self.hs.get_notifier().on_new_replication_data() def get_public_room_ids(self): return self._simple_select_onecol( @@ -259,38 +347,96 @@ class RoomStore(SQLBaseStore): def get_current_public_room_stream_id(self): return self._public_room_id_gen.get_current_token() - def get_public_room_ids_at_stream_id(self, stream_id): + @cached(num_args=2, max_entries=100) + def get_public_room_ids_at_stream_id(self, stream_id, network_tuple): + """Get pulbic rooms for a particular list, or across all lists. + + Args: + stream_id (int) + network_tuple (ThirdPartyInstanceID): The list to use (None, None) + means the main list, None means all lsits. + """ return self.runInteraction( "get_public_room_ids_at_stream_id", - self.get_public_room_ids_at_stream_id_txn, stream_id + self.get_public_room_ids_at_stream_id_txn, + stream_id, network_tuple=network_tuple ) - def get_public_room_ids_at_stream_id_txn(self, txn, stream_id): + def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, + network_tuple): return { rm - for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items() + for rm, vis in self.get_published_at_stream_id_txn( + txn, stream_id, network_tuple=network_tuple + ).items() if vis } - def get_published_at_stream_id_txn(self, txn, stream_id): - sql = (""" - SELECT room_id, visibility FROM public_room_list_stream - INNER JOIN ( - SELECT room_id, max(stream_id) AS stream_id + def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple): + if network_tuple: + # We want to get from a particular list. No aggregation required. + + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream + INNER JOIN ( + SELECT room_id, max(stream_id) AS stream_id + FROM public_room_list_stream + WHERE stream_id <= ? %s + GROUP BY room_id + ) grouped USING (room_id, stream_id) + """) + + if network_tuple.appservice_id is not None: + txn.execute( + sql % ("AND appservice_id = ? AND network_id = ?",), + (stream_id, network_tuple.appservice_id, network_tuple.network_id,) + ) + else: + txn.execute( + sql % ("AND appservice_id IS NULL",), + (stream_id,) + ) + return dict(txn.fetchall()) + else: + # We want to get from all lists, so we need to aggregate the results + + logger.info("Executing full list") + + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream - WHERE stream_id <= ? - GROUP BY room_id - ) grouped USING (room_id, stream_id) - """) + INNER JOIN ( + SELECT + room_id, max(stream_id) AS stream_id, appservice_id, + network_id + FROM public_room_list_stream + WHERE stream_id <= ? + GROUP BY room_id, appservice_id, network_id + ) grouped USING (room_id, stream_id) + """) - txn.execute(sql, (stream_id,)) - return dict(txn.fetchall()) + txn.execute( + sql, + (stream_id,) + ) + + results = {} + # A room is visible if its visible on any list. + for room_id, visibility in txn.fetchall(): + results[room_id] = bool(visibility) or results.get(room_id, False) - def get_public_room_changes(self, prev_stream_id, new_stream_id): + return results + + def get_public_room_changes(self, prev_stream_id, new_stream_id, + network_tuple): def get_public_room_changes_txn(txn): - then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id) + then_rooms = self.get_public_room_ids_at_stream_id_txn( + txn, prev_stream_id, network_tuple + ) - now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id) + now_rooms_dict = self.get_published_at_stream_id_txn( + txn, new_stream_id, network_tuple + ) now_rooms_visible = set( rm for rm, vis in now_rooms_dict.items() if vis @@ -311,7 +457,8 @@ class RoomStore(SQLBaseStore): def get_all_new_public_rooms(self, prev_id, current_id, limit): def get_all_new_public_rooms(txn): sql = (""" - SELECT stream_id, room_id, visibility FROM public_room_list_stream + SELECT stream_id, room_id, visibility, appservice_id, network_id + FROM public_room_list_stream WHERE stream_id > ? AND stream_id <= ? ORDER BY stream_id ASC LIMIT ? diff --git a/synapse/storage/schema/delta/39/appservice_room_list.sql b/synapse/storage/schema/delta/39/appservice_room_list.sql new file mode 100644 index 0000000000..74bdc49073 --- /dev/null +++ b/synapse/storage/schema/delta/39/appservice_room_list.sql @@ -0,0 +1,29 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE appservice_room_list( + appservice_id TEXT NOT NULL, + network_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +-- Each appservice can have multiple published room lists associated with them, +-- keyed of a particular network_id +CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list( + appservice_id, network_id, room_id +); + +ALTER TABLE public_room_list_stream ADD COLUMN appservice_id TEXT; +ALTER TABLE public_room_list_stream ADD COLUMN network_id TEXT; |