From 6e614e9e10d35006707cc6eceafe80c13eb13948 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2017 14:58:13 +0100 Subject: Add background task to clear out old event_auth --- .../storage/schema/delta/42/event_auth_state_only.sql | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 synapse/storage/schema/delta/42/event_auth_state_only.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/42/event_auth_state_only.sql b/synapse/storage/schema/delta/42/event_auth_state_only.sql new file mode 100644 index 0000000000..b8821ac759 --- /dev/null +++ b/synapse/storage/schema/delta/42/event_auth_state_only.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('event_auth_state_only', '{}'); -- cgit 1.5.1 From a584a81b3e59e9a4763d81d1a8b893fbd1a45ce0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 May 2017 14:41:42 +0100 Subject: Add current_state_delta_stream table --- synapse/storage/events.py | 31 ++++++++++++++++------ .../schema/delta/42/current_state_delta.sql | 25 +++++++++++++++++ 2 files changed, 48 insertions(+), 8 deletions(-) create mode 100644 synapse/storage/schema/delta/42/current_state_delta.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3d4f53ea55..c37a2a6f16 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -648,9 +648,10 @@ class EventsStore(SQLBaseStore): list of the event ids which are the forward extremities. """ - self._update_current_state_txn(txn, current_state_for_room) - max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering + + self._update_current_state_txn(txn, current_state_for_room, max_stream_order) + self._update_forward_extremities_txn( txn, new_forward_extremities=new_forward_extremeties, @@ -713,7 +714,7 @@ class EventsStore(SQLBaseStore): backfilled=backfilled, ) - def _update_current_state_txn(self, txn, state_delta_by_room): + def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in state_delta_by_room.iteritems(): to_delete, to_insert, _ = current_state_tuple txn.executemany( @@ -735,6 +736,24 @@ class EventsStore(SQLBaseStore): ], ) + state_deltas = {key: None for key in to_delete} + state_deltas.update(to_insert) + + self._simple_insert_many_txn( + txn, + table="current_state_delta_stream", + values=[ + { + "stream_id": max_stream_order, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": ev_id, + } + for key, ev_id in state_deltas.iteritems() + ] + ) + # Invalidate the various caches # Figure out the changes of membership to invalidate the @@ -743,11 +762,7 @@ class EventsStore(SQLBaseStore): # and which we have added, then we invlidate the caches for all # those users. 
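            # (state_deltas above was built from both to_delete and to_insert,
            # so iterating its keys below covers memberships that were removed
            # as well as those that were added or replaced.)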
members_changed = set( - state_key for ev_type, state_key in to_delete.iterkeys() - if ev_type == EventTypes.Member - ) - members_changed.update( - state_key for ev_type, state_key in to_insert.iterkeys() + state_key for ev_type, state_key in state_deltas if ev_type == EventTypes.Member ) diff --git a/synapse/storage/schema/delta/42/current_state_delta.sql b/synapse/storage/schema/delta/42/current_state_delta.sql new file mode 100644 index 0000000000..1a55aa912b --- /dev/null +++ b/synapse/storage/schema/delta/42/current_state_delta.sql @@ -0,0 +1,25 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE current_state_delta_stream ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + event_id TEXT -- Is null if the key was removed +); + +CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id); -- cgit 1.5.1 From 04095f75810176d7ba2b5ef70b40dd1a3281850d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 May 2017 14:53:01 +0100 Subject: Add clobbered event_id --- synapse/storage/events.py | 1 + synapse/storage/schema/delta/42/current_state_delta.sql | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c37a2a6f16..dfb57f9d12 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -749,6 +749,7 @@ class EventsStore(SQLBaseStore): "type": key[0], "state_key": key[1], "event_id": ev_id, + "prev_event_id": to_delete.get(key, None), } for key, ev_id in state_deltas.iteritems() ] diff --git a/synapse/storage/schema/delta/42/current_state_delta.sql b/synapse/storage/schema/delta/42/current_state_delta.sql index 1a55aa912b..bf124f3def 100644 --- a/synapse/storage/schema/delta/42/current_state_delta.sql +++ b/synapse/storage/schema/delta/42/current_state_delta.sql @@ -19,7 +19,8 @@ CREATE TABLE current_state_delta_stream ( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - event_id TEXT -- Is null if the key was removed + event_id TEXT, -- Is null if the key was removed + prev_event_id TEXT ); CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id); -- cgit 1.5.1 From dd48f7204c511d5ba4438dfe01679bcb7367216d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 May 2017 15:01:22 +0100 Subject: Add comment --- synapse/storage/schema/delta/42/current_state_delta.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/42/current_state_delta.sql b/synapse/storage/schema/delta/42/current_state_delta.sql index bf124f3def..d28851aff8 100644 --- a/synapse/storage/schema/delta/42/current_state_delta.sql +++ b/synapse/storage/schema/delta/42/current_state_delta.sql @@ -20,7 +20,7 @@ CREATE TABLE current_state_delta_stream ( type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT, -- Is null if 
the key was removed - prev_event_id TEXT + prev_event_id TEXT -- Is null if the key was added ); CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id); -- cgit 1.5.1 From eeb2f9e546060ca9f2ef7260220b51d85d9b0d92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 11:51:01 +0100 Subject: Add user_directory to database --- synapse/handlers/user_directory.py | 218 ++++++++++++++++++++++++++++ synapse/notifier.py | 6 +- synapse/server.py | 5 + synapse/storage/__init__.py | 2 + synapse/storage/schema/delta/42/user_dir.py | 69 +++++++++ synapse/storage/user_directory.py | 145 ++++++++++++++++++ 6 files changed, 444 insertions(+), 1 deletion(-) create mode 100644 synapse/handlers/user_directory.py create mode 100644 synapse/storage/schema/delta/42/user_dir.py create mode 100644 synapse/storage/user_directory.py (limited to 'synapse/storage/schema') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py new file mode 100644 index 0000000000..43e917c1a0 --- /dev/null +++ b/synapse/handlers/user_directory.py @@ -0,0 +1,218 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.storage.roommember import ProfileInfo +from synapse.util.metrics import Measure + + +logger = logging.getLogger(__name__) + + +class UserDirectoyHandler(object): + def __init__(self, hs): + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.server_name = hs.hostname + self.clock = hs.get_clock() + + self.initially_handled_users = set() + + self.pos = None + + self._is_processing = False + + @defer.inlineCallbacks + def notify_new_event(self): + if self._is_processing: + return + + self._is_processing = True + try: + yield self._unsafe_process() + finally: + self._is_processing = False + + @defer.inlineCallbacks + def _unsafe_process(self): + if self.pos is None: + self.pos = yield self.store.get_user_directory_stream_pos() + + if self.pos is None: + yield self._do_initial_spam() + self.pos = yield self.store.get_user_directory_stream_pos() + + while True: + with Measure(self.clock, "user_dir_delta"): + deltas = yield self.store.get_current_state_deltas(self.pos) + if not deltas: + return + + yield self._handle_deltas(deltas) + + max_stream_id = deltas[-1]["stream_id"] + yield self.store.update_user_directory_stream_pos(max_stream_id) + + @defer.inlineCallbacks + def _handle_room(self, room_id): + # TODO: Check we're still joined to room + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + if not is_public: + return + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users + + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) 
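        # Record which users have now been written so that later rooms in the
        # initial fill do not insert duplicate rows for the same user.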
+ + self.initially_handled_users |= unhandled_users + + @defer.inlineCallbacks + def _do_initial_spam(self): + yield self.store.delete_all_from_user_dir() + + room_ids = yield self.store.get_all_rooms() + + for room_id in room_ids: + yield self._handle_room(room_id) + + self.initially_handled_users = None + + yield self.store.update_user_directory_stream_pos(-1) + + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + row = yield self.store.get_user_in_directory(user_id) + if row: + return + + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + def _handle_remove_user(self, room_id, user_id): + row = yield self.store.get_user_in_directory(user_id) + if not row or row["room_id"] != room_id: + return + + # TODO: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if is_public: + return + + yield self.store.remove_from_user_dir(user_id) + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + if typ == EventTypes.RoomHistoryVisibility: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="history_visibility", + public_value="world_readable", + ) + if change is None: + continue + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): + if change: + yield self._handle_new_user(room_id, user_id, profile) + else: + yield self._handle_remove_user(room_id, user_id) + elif typ == EventTypes.JoinRules: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="join_rules", + public_value=JoinRules.PUBLIC, + ) + if change is None: + continue + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): + if change: + yield self._handle_new_user(room_id, user_id, profile) + else: + yield self._handle_remove_user(room_id, user_id) + elif typ == EventTypes.Member: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + + if change is None: + continue + + if change: + event = yield self.store.get_event(event_id) + profile = ProfileInfo( + avatar_url=event.content.get("avatar_url"), + display_name=event.content.get("displayname"), + ) + + yield self._handle_new_user(room_id, state_key, profile) + else: + yield self._handle_remove_user(room_id, state_key) + + @defer.inlineCallbacks + def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + prev_event = None + event = None + if prev_event_id: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + + if event_id: + event = yield self.store.get_event(event_id, allow_none=True) + + if not event and not prev_event: + defer.returnValue(None) + + prev_hist_vis = None + hist_vis = None + + if prev_event: + prev_hist_vis = prev_event.content.get(key_name, None) + + if event: + hist_vis = event.content.get(key_name, None) + + logger.info("prev: %r, new: %r", prev_hist_vis, hist_vis) + + if hist_vis == public_value and prev_hist_vis != public_value: + defer.returnValue(True) + elif hist_vis != public_value and prev_hist_vis == public_value: + defer.returnValue(False) + else: + 
defer.returnValue(None) diff --git a/synapse/notifier.py b/synapse/notifier.py index 48566187ab..6b1709d700 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -167,6 +167,7 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.user_directory_handler = hs.get_user_directory_handler() if hs.should_send_federation(): self.federation_sender = hs.get_federation_sender() @@ -251,7 +252,10 @@ class Notifier(object): """Notify any user streams that are interested in this room event""" # poke any interested application service. preserve_fn(self.appservice_handler.notify_interested_services)( - room_stream_id) + room_stream_id + ) + + preserve_fn(self.user_directory_handler.notify_new_event)() if self.federation_sender: preserve_fn(self.federation_sender.notify_new_events)( diff --git a/synapse/server.py b/synapse/server.py index e400e278c6..a38e5179e0 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -49,6 +49,7 @@ from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler +from synapse.handlers.user_directory import UserDirectoyHandler from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier @@ -137,6 +138,7 @@ class HomeServer(object): 'tcp_replication', 'read_marker_handler', 'action_generator', + 'user_directory_handler', ] def __init__(self, hostname, **kwargs): @@ -304,6 +306,9 @@ class HomeServer(object): def build_action_generator(self): return ActionGenerator(self) + def build_user_directory_handler(self): + return UserDirectoyHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d604e7668f..11655bf60f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -49,6 +49,7 @@ from .tags import TagsStore from .account_data import AccountDataStore from .openid import OpenIdStore from .client_ips import ClientIpStore +from .user_directory import UserDirectoryStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator from .engines import PostgresEngine @@ -86,6 +87,7 @@ class DataStore(RoomMemberStore, RoomStore, ClientIpStore, DeviceStore, DeviceInboxStore, + UserDirectoryStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py new file mode 100644 index 0000000000..38538960a4 --- /dev/null +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -0,0 +1,69 @@ +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
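# This delta creates the single-row user_directory_stream_pos bookkeeping
# table on both engines, plus an engine-specific full-text-searchable
# user_directory table: a tsvector column with a GIN index on PostgreSQL,
# an fts4 virtual table on SQLite.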
+ +import logging + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + +logger = logging.getLogger(__name__) + + +BOTH_TABLES = """ +CREATE TABLE user_directory_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT, + CHECK (Lock='X') +); + +INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); +""" + + +POSTGRES_TABLE = """ +CREATE TABLE user_directory ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + display_name TEXT, + avatar_url TEXT, + vector tsvector +); + +CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory + USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + for statement in get_statements(BOTH_TABLES.splitlines()): + cur.execute(statement) + + if isinstance(database_engine, PostgresEngine): + for statement in get_statements(POSTGRES_TABLE.splitlines()): + cur.execute(statement) + elif isinstance(database_engine, Sqlite3Engine): + for statement in get_statements(SQLITE_TABLE.splitlines()): + cur.execute(statement) + else: + raise Exception("Unrecognized database engine") + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py new file mode 100644 index 0000000000..6c7c8c4bee --- /dev/null +++ b/synapse/storage/user_directory.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
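# Storage layer for the user directory: decides whether a room is public
# (world readable or publicly joinable), writes and removes profile rows
# (indexed via to_tsvector on PostgreSQL or an fts4 value column on SQLite),
# tracks the directory's stream position, and exposes the
# current_state_delta_stream rows that the handler consumes.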
+ +from twisted.internet import defer + +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.api.constants import EventTypes, JoinRules +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + + +class UserDirectoryStore(SQLBaseStore): + + @cachedInlineCallbacks(cache_context=True) + def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + current_state_ids = yield self.get_current_state_ids( + room_id, on_invalidate=cache_context.invalidate + ) + + join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) + if join_rules_id: + join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) + if join_rule_ev: + if join_rule_ev.content.get("join_rules") == JoinRules.PUBLIC: + defer.returnValue(True) + + hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) + if hist_vis_id: + hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True) + if hist_vis_ev: + if hist_vis_ev.content.get("history_visibility") == "world_readable": + defer.returnValue(True) + + defer.returnValue(False) + + def add_profiles_to_user_dir(self, room_id, users_with_profile): + if isinstance(self.database_engine, PostgresEngine): + sql = """ + INSERT INTO user_directory + (user_id, room_id, display_name, avatar_url, vector) + VALUES (?,?,?,?,to_tsvector('english', ?)) + """ + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + INSERT INTO user_directory + (user_id, room_id, display_name, avatar_url, value) + VALUES (?,?,?,?,?) + """ + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + def _add_profiles_to_user_dir_txn(txn): + txn.executemany(sql, ( + ( + user_id, room_id, p.display_name, p.avatar_url, + "%s %s" % (user_id, p.display_name,) if p.display_name else user_id + ) + for user_id, p in users_with_profile.iteritems() + )) + for user_id in users_with_profile: + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + + return self.runInteraction( + "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn + ) + + @defer.inlineCallbacks + def remove_from_user_dir(self, user_id): + yield self._simple_delete( + table="user_directory", + keyvalues={"user_id": user_id}, + desc="remove_from_user_dir", + ) + self.get_user_in_directory.invalidate((user_id,)) + + def get_all_rooms(self): + return self._simple_select_onecol( + table="current_state_events", + keyvalues={}, + retcol="DISTINCT room_id", + desc="get_all_rooms", + ) + + def delete_all_from_user_dir(self): + def _delete_all_from_user_dir_txn(txn): + txn.execute("DELETE FROM user_directory") + txn.call_after(self.get_user_in_directory.invalidate_all) + return self.runInteraction( + "delete_all_from_user_dir", _delete_all_from_user_dir_txn + ) + + @cached() + def get_user_in_directory(self, user_id): + return self._simple_select_one( + table="user_directory", + keyvalues={"user_id": user_id}, + retcols=("room_id", "display_name", "avatar_url",), + allow_none=True, + desc="get_user_in_directory", + ) + + def get_user_directory_stream_pos(self): + return self._simple_select_one_onecol( + table="user_directory_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="get_user_directory_stream_pos", + ) + + def update_user_directory_stream_pos(self, stream_id): + return self._simple_update_one( + table="user_directory_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_user_directory_stream_pos", + ) + + def 
get_current_state_deltas(self, prev_stream_id): + # TODO: Add stream change cache + # TODO: Add limit + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE stream_id > ? + ORDER BY stream_id ASC + """ + + return self._execute( + "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id + ) -- cgit 1.5.1 From 63fda37e20015f0fe56aed86f907035d42fdc2ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:00:29 +0100 Subject: Add comments --- synapse/handlers/user_directory.py | 161 ++++++++++++++++++------- synapse/rest/client/v2_alpha/user_directory.py | 16 +++ synapse/storage/schema/delta/42/user_dir.py | 2 +- synapse/storage/user_directory.py | 39 +++++- 4 files changed, 173 insertions(+), 45 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4a9565df93..88b79e3325 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -26,25 +26,54 @@ logger = logging.getLogger(__name__) class UserDirectoyHandler(object): + """Handles querying of and keeping updated the user_directory. + + N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY + """ + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() + # When start up for the first time we need to populate the user_directory. + # This is a set of user_id's we've inserted already self.initially_handled_users = set() + # The current position in the current_state_delta stream self.pos = None + # Guard to ensure we only process deltas one at a time self._is_processing = False + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory self.clock.call_later(0, self.notify_new_event) def search_users(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ return self.store.search_user_dir(search_term, limit) @defer.inlineCallbacks def notify_new_event(self): + """Called when there may be more deltas to process + """ if self._is_processing: return @@ -56,13 +85,16 @@ class UserDirectoyHandler(object): @defer.inlineCallbacks def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB if self.pos is None: self.pos = yield self.store.get_user_directory_stream_pos() + # If still None then we need to do the initial fill of directory if self.pos is None: yield self._do_initial_spam() self.pos = yield self.store.get_user_directory_stream_pos() + # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "user_dir_delta"): deltas = yield self.store.get_current_state_deltas(self.pos) @@ -74,69 +106,53 @@ class UserDirectoyHandler(object): self.pos = deltas[-1]["stream_id"] yield self.store.update_user_directory_stream_pos(self.pos) - @defer.inlineCallbacks - def _handle_room(self, room_id): - # TODO: Check we're still joined to room - - is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users - - yield 
self.store.add_profiles_to_user_dir( - room_id, { - user_id: users_with_profile[user_id] for user_id in unhandled_users - } - ) - - self.initially_handled_users |= unhandled_users - @defer.inlineCallbacks def _do_initial_spam(self): + """Populates the user_directory from the current state of the DB, used + when synapse first starts with user_directory support + """ + # TODO: pull from current delta stream_id new_pos = self.store.get_room_max_stream_ordering() + # Delete any existing entries just in case there are any yield self.store.delete_all_from_user_dir() + # We process by going through each existing room at a time. room_ids = yield self.store.get_all_rooms() for room_id in room_ids: - yield self._handle_room(room_id) + yield self._handle_intial_room(room_id) self.initially_handled_users = None yield self.store.update_user_directory_stream_pos(new_pos) @defer.inlineCallbacks - def _handle_new_user(self, room_id, user_id, profile): - row = yield self.store.get_user_in_directory(user_id) - if row: - return - - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + def _handle_intial_room(self, room_id): + """Called when we initially fill out user_directory one room at a time + """ + # TODO: Check we're still joined to room - def _handle_remove_user(self, room_id, user_id): - row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + if not is_public: return - # TODO: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users - if is_public: - yield self.store.update_user_in_user_dir(user_id, j_room_id) - return + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) - yield self.store.remove_from_user_dir(user_id) + self.initially_handled_users |= unhandled_users @defer.inlineCallbacks def _handle_deltas(self, deltas): + """Called with the state deltas to process + """ for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -144,22 +160,33 @@ class UserDirectoyHandler(object): event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] + # For join rule and visibility changes we need to check if the room + # may have become public or not and add/remove the users in said room if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( prev_event_id, event_id, key_name="history_visibility", public_value="world_readable", ) + + # If change is None, no change. True => become world readable, + # False => was world readable if change is None: continue + # There's been a change to or from being world readable. 
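            # A room counts as public if it is world readable OR publicly
            # joinable, so the visibility delta is cross-checked against the
            # room's current combined state before any member rows are
            # added or removed.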
+ is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id ) - if change and is_public: + if change and not is_public: + # If we became world readable but room isn't currently public then + # we ignore the change continue - elif not change and not is_public: + elif not change and is_public: + # If we stopped being world readable but are still public, + # ignore the change continue users_with_profile = yield self.state.get_current_user_in_room(room_id) @@ -213,8 +240,60 @@ class UserDirectoyHandler(object): else: yield self._handle_remove_user(room_id, state_key) + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + """Called when we might need to add user to directory + + Args: + room_id (str): room_id that user joined or started being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if row: + return + + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + def _handle_remove_user(self, room_id, user_id): + """Called when we might need to remove user to directory + + Args: + room_id (str): room_id that user left or stopped being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if not row or row["room_id"] != room_id: + # Either the user wasn't in directory or we're still in a room that + # is public (i.e. the room_id in the database) + return + + # TODO: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_user_dir(user_id, j_room_id) + return + + yield self.store.remove_from_user_dir(user_id) + @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + """Given two events check if the `key_name` field in content changed + from not matching `public_value` to doing so. + + For example, check if `history_visibility` (`key_name`) changed from + `shared` to `world_readable` (`public_value`). + + Returns: + None if the field in the events either both match `public_value` o + neither do, i.e. there has been no change. 
+ True if it didnt match `public_value` but now does + Falsse if it did match `public_value` but now doesn't + """ prev_event = None event = None if prev_event_id: diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index fe91207195..17d3dffc8f 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -39,6 +39,22 @@ class UserDirectorySearchRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ yield self.auth.get_user_by_req(request, allow_guest=False) body = parse_json_object_from_request(request) diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 38538960a4..57b89ba552 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -34,7 +34,7 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, + room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, avatar_url TEXT, vector tsvector diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ebcc8b9633..83812bf092 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -26,6 +26,8 @@ class UserDirectoryStore(SQLBaseStore): @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + """Check if the room is either world_readable or publically joinable + """ current_state_ids = yield self.get_current_state_ids( room_id, on_invalidate=cache_context.invalidate ) @@ -47,14 +49,24 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are in that is world_readable + or publically joinable + users_with_profile (dict): Users to add to directory in the form of + mapping of user_id -> ProfileInfo + """ if isinstance(self.database_engine, PostgresEngine): + # We weight the loclpart most highly, then display name and finally + # server name sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, vector) VALUES (?,?,?,?, setweight(to_tsvector('english', ?), 'A') - || to_tsvector('english', ?) 
- || to_tsvector('english', COALESCE(?, '')) + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') ) """ args = ( @@ -113,6 +125,8 @@ class UserDirectoryStore(SQLBaseStore): self.get_user_in_directory.invalidate((user_id,)) def get_all_rooms(self): + """Get all room_ids we've ever known about + """ return self._simple_select_onecol( table="current_state_events", keyvalues={}, @@ -121,6 +135,8 @@ class UserDirectoryStore(SQLBaseStore): ) def delete_all_from_user_dir(self): + """Delete the entire user directory + """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.call_after(self.get_user_in_directory.invalidate_all) @@ -170,12 +186,29 @@ class UserDirectoryStore(SQLBaseStore): @defer.inlineCallbacks def search_user_dir(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ + if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory WHERE vector @@ plainto_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC + ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? """ args = (search_term, search_term, limit + 1,) -- cgit 1.5.1 From 350622a107c356da630eba09b63ed4b6de94b198 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:11:36 +0100 Subject: Handle the server leaving a public room --- synapse/handlers/user_directory.py | 23 ++++++++++++++++++++--- synapse/state.py | 11 +++++++++++ synapse/storage/schema/delta/42/user_dir.py | 4 ++++ synapse/storage/user_directory.py | 11 +++++++++++ 4 files changed, 46 insertions(+), 3 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 88b79e3325..4e491a43e6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -132,7 +132,9 @@ class UserDirectoyHandler(object): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - # TODO: Check we're still joined to room + is_in_room = yield self.store.get_is_host_in_room(room_id, self.server_name) + if not is_in_room: + return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) if not is_public: @@ -229,7 +231,22 @@ class UserDirectoyHandler(object): if change is None: continue - if change: + if not change: + # Need to check if the server left the room entirely, if so + # we might need to remove all the users in that room + is_in_room = yield self.store.get_is_host_in_room( + room_id, self.server_name, + ) + if not is_in_room: + # Fetch all the users that we marked as being in user + # directory due to being in the room and then check if + # need to remove those users or not + user_ids = yield self.store.get_users_in_dir_due_to_room(room_id) + for user_id in user_ids: + yield self._handle_remove_user(room_id, user_id) + return + + if change: # The user joined event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -237,7 +254,7 @@ class UserDirectoyHandler(object): ) yield self._handle_new_user(room_id, state_key, profile) - else: + else: # The user left yield self._handle_remove_user(room_id, 
state_key) @defer.inlineCallbacks diff --git a/synapse/state.py b/synapse/state.py index 02fee47f39..dffa79e4c9 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -186,6 +186,17 @@ class StateHandler(object): ) defer.returnValue(joined_hosts) + @defer.inlineCallbacks + def get_is_host_in_room(self, room_id, host, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.debug("calling resolve_state_groups from get_is_host_in_room") + entry = yield self.resolve_state_groups(room_id, latest_event_ids) + is_host_joined = yield self.store.is_host_joined( + room_id, host, entry.state_id, entry.state + ) + defer.returnValue(is_host_joined) + @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 57b89ba552..95a7a79fd3 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -41,6 +41,7 @@ CREATE TABLE user_directory ( ); CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,6 +49,9 @@ CREATE INDEX user_directory_user_idx ON user_directory(user_id); SQLITE_TABLE = """ CREATE VIRTUAL TABLE user_directory USING fts4 ( user_id, room_id, display_name, avatar_url, value ); + +CREATE INDEX user_directory_room_idx ON user_directory(room_id); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 83812bf092..0df979cb01 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -124,6 +124,17 @@ class UserDirectoryStore(SQLBaseStore): ) self.get_user_in_directory.invalidate((user_id,)) + def get_users_in_dir_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="user_directory", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + def get_all_rooms(self): """Get all room_ids we've ever known about """ -- cgit 1.5.1 From 5d79d728f5f38463171f0d063713905f8cb9faec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:23:49 +0100 Subject: Split out directory and search tables --- synapse/storage/schema/delta/42/user_dir.py | 25 ++++++------ synapse/storage/user_directory.py | 60 ++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 29 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 95a7a79fd3..7e32662928 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -28,30 +28,33 @@ CREATE TABLE user_directory_stream_pos ( ); INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); -""" - -POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, - avatar_url TEXT, - vector tsvector + avatar_url TEXT ); -CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX 
user_directory_user_idx ON user_directory(user_id); """ -SQLITE_TABLE = """ -CREATE VIRTUAL TABLE user_directory - USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +POSTGRES_TABLE = """ +CREATE TABLE user_directory_search ( + user_id TEXT NOT NULL, + vector tsvector +); -CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); +CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory_search + USING fts4 ( user_id, value ); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 011c711ec1..b1957cb873 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -61,9 +61,8 @@ class UserDirectoryStore(SQLBaseStore): # We weight the loclpart most highly, then display name and finally # server name sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, vector) - VALUES (?,?,?,?, + INSERT INTO user_directory_search(user_id, vector) + VALUES (?, setweight(to_tsvector('english', ?), 'A') || setweight(to_tsvector('english', ?), 'D') || setweight(to_tsvector('english', COALESCE(?, '')), 'B') @@ -71,21 +70,19 @@ class UserDirectoryStore(SQLBaseStore): """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, - get_localpart_from_id(user_id), get_domain_from_id(user_id), - p.display_name, + user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), + profile.display_name, ) - for user_id, p in users_with_profile.iteritems() + for user_id, profile in users_with_profile.iteritems() ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, value) - VALUES (?,?,?,?,?) + INSERT INTO user_directory_search(user_id, value) + VALUES (?,?) 
""" args = ( ( - user_id, room_id, p.display_name, p.avatar_url, + user_id, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) for user_id, p in users_with_profile.iteritems() @@ -96,6 +93,19 @@ class UserDirectoryStore(SQLBaseStore): def _add_profiles_to_user_dir_txn(txn): txn.executemany(sql, args) + self._simple_insert_many_txn( + txn, + table="user_directory", + values=[ + { + "user_id": user_id, + "room_id": room_id, + "display_name": profile.display_name, + "avatar_url": profile.avatar_url, + } + for user_id, profile in users_with_profile.iteritems() + ] + ) for user_id in users_with_profile: txn.call_after( self.get_user_in_directory.invalidate, (user_id,) @@ -117,12 +127,23 @@ class UserDirectoryStore(SQLBaseStore): @defer.inlineCallbacks def remove_from_user_dir(self, user_id): - yield self._simple_delete( - table="user_directory", - keyvalues={"user_id": user_id}, - desc="remove_from_user_dir", + def _remove_from_user_dir_txn(txn): + self._simple_delete_txn( + txn, + table="user_directory", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="user_directory_search", + keyvalues={"user_id": user_id}, + ) + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + return self.runInteraction( + "remove_from_user_dir", _remove_from_user_dir_txn, ) - self.get_user_in_directory.invalidate((user_id,)) def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're @@ -150,6 +171,7 @@ class UserDirectoryStore(SQLBaseStore): """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") + txn.execute("DELETE FROM user_directory_search") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -225,7 +247,8 @@ class UserDirectoryStore(SQLBaseStore): if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE vector @@ plainto_tsquery('english', ?) ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? @@ -234,7 +257,8 @@ class UserDirectoryStore(SQLBaseStore): elif isinstance(self.database_engine, Sqlite3Engine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC LIMIT ? 
-- cgit 1.5.1 From 5dd1b2c525cb786614d1503757bf52a7086f3bf9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:29:12 +0100 Subject: Use unique indices --- synapse/storage/schema/delta/42/user_dir.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 7e32662928..c34aa5e7d2 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -37,7 +37,7 @@ CREATE TABLE user_directory ( ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,7 +48,7 @@ CREATE TABLE user_directory_search ( ); CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); -CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search(user_id); """ -- cgit 1.5.1 From 21e255a8f1948c2fd298ce2e037d20bdd25f2f69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 14:50:46 +0100 Subject: Split the table in two --- synapse/handlers/user_directory.py | 77 +++++++++++++++++++---------- synapse/storage/_base.py | 5 ++ synapse/storage/schema/delta/42/user_dir.py | 10 +++- synapse/storage/user_directory.py | 77 +++++++++++++++++++++++++++-- 4 files changed, 138 insertions(+), 31 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a8525fc86a..d795a9f8d5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -50,6 +50,7 @@ class UserDirectoyHandler(object): # When start up for the first time we need to populate the user_directory. 
# This is a set of user_id's we've inserted already self.initially_handled_users = set() + self.initially_handled_users_in_public = set() # The current position in the current_state_delta stream self.pos = None @@ -145,8 +146,6 @@ class UserDirectoyHandler(object): return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return users_with_profile = yield self.state.get_current_user_in_room(room_id) unhandled_users = set(users_with_profile) - self.initially_handled_users @@ -159,6 +158,13 @@ class UserDirectoyHandler(object): self.initially_handled_users |= unhandled_users + if is_public: + yield self.store.add_users_to_public_room( + room_id, + user_ids=unhandled_users - self.initially_handled_users_in_public + ) + self.initially_handled_users_in_public != unhandled_users + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process @@ -206,14 +212,7 @@ class UserDirectoyHandler(object): else: logger.debug("Server is still in room: %r", room_id) - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id - ) - if change: # The user joined - if not is_public: - return - event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -276,11 +275,13 @@ class UserDirectoyHandler(object): # ignore the change return - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): - if change: + if change: + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): yield self._handle_new_user(room_id, user_id, profile) - else: + else: + users = yield self.store.get_users_in_public_due_to_room(room_id) + for user_id in users: yield self._handle_remove_user(room_id, user_id) @defer.inlineCallbacks @@ -292,11 +293,21 @@ class UserDirectoyHandler(object): user_id (str) """ logger.debug("Adding user to dir, %r", user_id) + row = yield self.store.get_user_in_directory(user_id) - if row: + if not row: + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if not is_public: return - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -309,15 +320,20 @@ class UserDirectoyHandler(object): logger.debug("Maybe removing user %r", user_id) row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: - # Either the user wasn't in directory or we're still in a room that - # is public (i.e. the room_id in the database) - logger.debug("Not removing as row: %r", row) + update_user_dir = row and row["room_id"] == room_id + + row = yield self.store.get_user_in_public_room(user_id) + update_user_in_public = row and row["room_id"] == room_id + + if not update_user_in_public and not update_user_dir: return # XXX: Make this faster? 
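        # Walk the user's other rooms looking for replacements: the first room
        # this server is still in re-homes the user_directory entry, and the
        # first such room that is also public re-homes the public-rooms entry;
        # whichever is still unmatched after the loop is removed below.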
rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: + if not update_user_in_public and not update_user_dir: + break + is_in_room = yield self.state.get_is_host_in_room( j_room_id, self.server_name, ) @@ -325,16 +341,23 @@ class UserDirectoyHandler(object): if not is_in_room: continue - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) - - if is_public: + if update_user_dir: + update_user_dir = False yield self.store.update_user_in_user_dir(user_id, j_room_id) - logger.debug("Not removing as found other public room: %r", j_room_id) - return - yield self.store.remove_from_user_dir(user_id) + if update_user_in_public: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_public_user_list(user_id, j_room_id) + update_user_in_public = False + + if update_user_dir: + yield self.store.remove_from_user_dir(user_id) + elif update_user_in_public: + yield self.store.remove_from_user_in_public_room(user_id) @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 58b73af7d2..db816346f5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -425,6 +425,11 @@ class SQLBaseStore(object): txn.execute(sql, vals) + def _simple_insert_many(self, table, values, desc): + return self.runInteraction( + desc, self._simple_insert_many_txn, table, values + ) + @staticmethod def _simple_insert_many_txn(txn, table, values): if not values: diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index c34aa5e7d2..ea6a18196d 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -31,13 +31,21 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, -- A room_id that we know is public + room_id TEXT NOT NULL, -- A room_id that we know the user is joined to display_name TEXT, avatar_url TEXT ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); + +CREATE TABLE users_in_pubic_room ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL -- A room_id that we know is public +); + +CREATE INDEX users_in_pubic_room_room_idx ON users_in_pubic_room(room_id); +CREATE UNIQUE INDEX users_in_pubic_room_user_idx ON users_in_pubic_room(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 4fe30ce72e..cab0afc5c3 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -50,12 +50,34 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) - def add_profiles_to_user_dir(self, room_id, users_with_profile): - """Add profiles to the user directory + @defer.inlineCallbacks + def add_users_to_public_room(self, room_id, user_ids): + """Add user to the list of users in public rooms Args: room_id (str): A room_id that all users are in that is world_readable or publically joinable + user_ids (list(str)): Users to add + """ + yield self._simple_insert_many( + table="users_in_pubic_room", + values=[ + { + "user_id": user_id, + "room_id": room_id, + } + for user_id in user_ids + ], + desc="add_users_to_public_room" + ) + for user_id in user_ids: + self.get_user_in_public_room.invalidate((user_id,)) + + def 
add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are joined to users_with_profile (dict): Users to add to directory in the form of mapping of user_id -> ProfileInfo """ @@ -125,7 +147,15 @@ class UserDirectoryStore(SQLBaseStore): updatevalues={"room_id": room_id}, desc="update_user_in_user_dir", ) - self.get_user_in_directory.invalidate((user_id,)) + + @defer.inlineCallbacks + def update_user_in_public_user_list(self, user_id, room_id): + yield self._simple_update_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + updatevalues={"room_id": room_id}, + desc="update_user_in_public_user_list", + ) def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): @@ -139,13 +169,41 @@ class UserDirectoryStore(SQLBaseStore): table="user_directory_search", keyvalues={"user_id": user_id}, ) + self._simple_delete_txn( + txn, + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + ) txn.call_after( self.get_user_in_directory.invalidate, (user_id,) ) + txn.call_after( + self.get_user_in_public_room.invalidate, (user_id,) + ) return self.runInteraction( "remove_from_user_dir", _remove_from_user_dir_txn, ) + @defer.inlineCallbacks + def remove_from_user_in_public_room(self, user_id): + yield self._simple_delete( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + desc="remove_from_user_in_public_room", + ) + self.get_user_in_public_room.invalidate((user_id,)) + + def get_users_in_public_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="users_in_pubic_room", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_public_due_to_room", + ) + def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're in the given room_id @@ -173,6 +231,7 @@ class UserDirectoryStore(SQLBaseStore): def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") + txn.execute("DELETE FROM users_in_pubic_room") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -188,6 +247,16 @@ class UserDirectoryStore(SQLBaseStore): desc="get_user_in_directory", ) + @cached() + def get_user_in_public_room(self, user_id): + return self._simple_select_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + retcols=("room_id",), + allow_none=True, + desc="get_user_in_public_room", + ) + def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -282,6 +351,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, @@ -295,6 +365,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE value MATCH ? 
ORDER BY rank(matchinfo(user_directory)) DESC, -- cgit 1.5.1 From 65f0513a3306d21aa5e6959b21642ba15dbdcad5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:02:38 +0100 Subject: Split up device_lists_outbound_pokes table for faster updates. --- synapse/storage/devices.py | 82 +++++++--------------- .../schema/delta/42/device_list_last_id.sql | 33 +++++++++ 2 files changed, 57 insertions(+), 58 deletions(-) create mode 100644 synapse/storage/schema/delta/42/device_list_last_id.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index d9936c88bb..77b02c8a28 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -37,10 +37,6 @@ class DeviceStore(SQLBaseStore): max_entries=10000, ) - self._clock.looping_call( - self._prune_old_outbound_device_pokes, 60 * 60 * 1000 - ) - self.register_background_index_update( "device_lists_stream_idx", index_name="device_lists_stream_user_id", @@ -368,7 +364,7 @@ class DeviceStore(SQLBaseStore): prev_sent_id_sql = """ SELECT coalesce(max(stream_id), 0) as stream_id - FROM device_lists_outbound_pokes + FROM device_lists_outbound_last_success WHERE destination = ? AND user_id = ? AND stream_id <= ? """ @@ -510,32 +506,43 @@ class DeviceStore(SQLBaseStore): ) def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): - # First we DELETE all rows such that only the latest row for each - # (destination, user_id is left. We do this by selecting first and - # deleting. + # We update the device_lists_outbound_last_success with the successfully + # poked users. We do the join to see which users need to be inserted and + # which updated. sql = """ - SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id <= ? + SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) + FROM device_lists_outbound_pokes as o + LEFT JOIN device_lists_outbound_last_success as s + USING (destination, user_id) + WHERE destination = ? AND o.stream_id <= ? GROUP BY user_id - HAVING count(*) > 1 """ txn.execute(sql, (destination, stream_id,)) rows = txn.fetchall() sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND user_id = ? AND stream_id < ? + UPDATE device_lists_outbound_last_success + SET stream_id = ? + WHERE destination = ? AND user_id = ? """ txn.executemany( - sql, ((destination, row[0], row[1],) for row in rows) + sql, ((row[1], destination, row[0],) for row in rows if row[2]) ) - # Mark everything that is left as sent sql = """ - UPDATE device_lists_outbound_pokes SET sent = ? + INSERT INTO device_lists_outbound_last_success + (destination, user_id, stream_id) VALUES (?, ?, ?) + """ + txn.executemany( + sql, ((destination, row[0], row[1],) for row in rows if not row[2]) + ) + + # Delete all sent outbound pokes + sql = """ + DELETE FROM device_lists_outbound_pokes WHERE destination = ? AND stream_id <= ? """ - txn.execute(sql, (True, destination, stream_id,)) + txn.execute(sql, (destination, stream_id,)) @defer.inlineCallbacks def get_user_whose_devices_changed(self, from_key): @@ -634,44 +641,3 @@ class DeviceStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() - - def _prune_old_outbound_device_pokes(self): - """Delete old entries out of the device_lists_outbound_pokes to ensure - that we don't fill up due to dead servers. 
We keep one entry per - (destination, user_id) tuple to ensure that the prev_ids remain correct - if the server does come back. - """ - yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 - - def _prune_txn(txn): - select_sql = """ - SELECT destination, user_id, max(stream_id) as stream_id - FROM device_lists_outbound_pokes - GROUP BY destination, user_id - HAVING min(ts) < ? AND count(*) > 1 - """ - - txn.execute(select_sql, (yesterday,)) - rows = txn.fetchall() - - if not rows: - return - - delete_sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? - """ - - txn.executemany( - delete_sql, - ( - (yesterday, row[0], row[1], row[2]) - for row in rows - ) - ) - - logger.info("Pruned %d device list outbound pokes", txn.rowcount) - - return self.runInteraction( - "_prune_old_outbound_device_pokes", _prune_txn - ) diff --git a/synapse/storage/schema/delta/42/device_list_last_id.sql b/synapse/storage/schema/delta/42/device_list_last_id.sql new file mode 100644 index 0000000000..9ab8c14fa3 --- /dev/null +++ b/synapse/storage/schema/delta/42/device_list_last_id.sql @@ -0,0 +1,33 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Table of last stream_id that we sent to destination for user_id. This is +-- used to fill out the `prev_id` fields of outbound device list updates. +CREATE TABLE device_lists_outbound_last_success ( + destination TEXT NOT NULL, + user_id TEXT NOT NULL, + stream_id BIGINT NOT NULL +); + +INSERT INTO device_lists_outbound_last_success + SELECT destination, user_id, coalesce(max(stream_id), 0) as stream_id + FROM device_lists_outbound_pokes + WHERE sent = (1 = 1) -- sqlite doesn't have inbuilt boolean values + GROUP BY destination, user_id; + +CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success( + destination, user_id, stream_id +); -- cgit 1.5.1 From ebcd55d641345467d583836ce1f06ffe5d36f98d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 09:45:48 +0100 Subject: Add DB schema for tracking users who share rooms --- synapse/storage/prepare_database.py | 2 +- synapse/storage/schema/delta/43/user_share.sql | 32 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/43/user_share.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index eaba699e29..72b670b83b 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. 
-SCHEMA_VERSION = 42 +SCHEMA_VERSION = 43 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/43/user_share.sql b/synapse/storage/schema/delta/43/user_share.sql new file mode 100644 index 0000000000..f552b6eb7b --- /dev/null +++ b/synapse/storage/schema/delta/43/user_share.sql @@ -0,0 +1,32 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Table keeping track of who shares a room with who. We only keep track +-- of this for local users, so `user_id` is local users only (but we do keep track +-- of which remote users share a room) +CREATE TABLE users_who_share_rooms ( + user_id TEXT NOT NULL, + other_user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + share_private BOOLEAN NOT NULL -- is the shared room private? i.e. they share a private room +); + + +CREATE UNIQUE INDEX users_who_share_rooms_u_idx ON users_who_share_rooms(user_id, other_user_id); +CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id, user_id); + + +-- Make sure that we popualte the table initially +UPDATE user_directory_stream_pos SET stream_id = NULL; -- cgit 1.5.1 From d7fe6b356c5b74ffc5681f85a0d6100f4b4f2295 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Jun 2017 12:36:28 +0100 Subject: Add shutdown room API --- synapse/handlers/federation.py | 4 ++ synapse/handlers/room_member.py | 5 ++ synapse/rest/client/v1/admin.py | 67 ++++++++++++++++++++++- synapse/storage/directory.py | 14 +++++ synapse/storage/room.py | 24 ++++++++ synapse/storage/schema/delta/43/blocked_rooms.sql | 21 +++++++ 6 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/43/blocked_rooms.sql (limited to 'synapse/storage/schema') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 39d2bee8da..f7ae369a1d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1068,6 +1068,10 @@ class FederationHandler(BaseHandler): """ event = pdu + is_blocked = yield self.store.is_room_blocked(event.room_id) + if is_blocked: + raise SynapseError(403, "This room has been blocked on this server") + event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 1ca88517a2..c0d9c08367 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -203,6 +203,11 @@ class RoomMemberHandler(BaseHandler): if not remote_room_hosts: remote_room_hosts = [] + if effective_membership_state not in ("leave", "ban",): + is_blocked = yield self.store.is_room_blocked(room_id) + if is_blocked: + raise SynapseError(403, "This room has been blocked on this server") + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, diff --git a/synapse/rest/client/v1/admin.py 
b/synapse/rest/client/v1/admin.py
index 29fcd72375..086f7a0984 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -15,8 +15,9 @@
 
 from twisted.internet import defer
 
+from synapse.api.constants import Membership
 from synapse.api.errors import AuthError, SynapseError
-from synapse.types import UserID
+from synapse.types import UserID, create_requester
 from synapse.http.servlet import parse_json_object_from_request
 
 from .base import ClientV1RestServlet, client_path_patterns
@@ -157,6 +158,69 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
         defer.returnValue((200, {}))
 
 
+class ShutdownRoomRestServlet(ClientV1RestServlet):
+    """Shuts down a room by removing all local users from the room and blocking
+    all future invites and joins to the room. Any local aliases will be repointed
+    to a given room id.
+    """
+    PATTERNS = client_path_patterns("/admin/shutdown_room/(?P<room_id>[^/]+)")
+
+    def __init__(self, hs):
+        super(ShutdownRoomRestServlet, self).__init__(hs)
+        self.store = hs.get_datastore()
+        self.handlers = hs.get_handlers()
+        self.state = hs.get_state_handler()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        content = parse_json_object_from_request(request)
+
+        repoint_aliases_to_room_id = content.get("repoint_aliases_to_room_id")
+        if not repoint_aliases_to_room_id:
+            raise SynapseError(400, "Please provide field `repoint_aliases_to_room_id`")
+
+        requester_user_id = requester.user.to_string()
+
+        logger.info("Shutting down room %r", room_id)
+
+        yield self.store.block_room(room_id, requester_user_id)
+
+        users = yield self.state.get_current_user_in_room(room_id)
+        kicked_users = []
+        for user_id in users:
+            if not self.hs.is_mine_id(user_id):
+                continue
+
+            logger.info("Kicking %r from %r...", user_id, room_id)
+
+            target_requester = create_requester(user_id)
+            yield self.handlers.room_member_handler.update_membership(
+                requester=target_requester,
+                target=target_requester.user,
+                room_id=room_id,
+                action=Membership.LEAVE,
+                content={},
+            )
+
+            kicked_users.append(user_id)
+
+        aliases_for_room = yield self.store.get_aliases_for_room(room_id)
+
+        yield self.store.update_aliases_for_room(
+            room_id, repoint_aliases_to_room_id, requester_user_id
+        )
+
+        defer.returnValue((200, {
+            "kicked_users": kicked_users,
+            "local_aliases": aliases_for_room,
+        }))
+
+
 class ResetPasswordRestServlet(ClientV1RestServlet):
     """Post request to allow an administrator reset password for a user.
     This need a user have a administrator access in Synapse.
@@ -353,3 +417,4 @@ def register_servlets(hs, http_server):
     ResetPasswordRestServlet(hs).register(http_server)
     GetUsersPaginatedRestServlet(hs).register(http_server)
     SearchUsersRestServlet(hs).register(http_server)
+    ShutdownRoomRestServlet(hs).register(http_server)
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 9caaf81f2c..79e7c540ad 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -170,3 +170,17 @@ class DirectoryStore(SQLBaseStore):
             "room_alias",
             desc="get_aliases_for_room",
         )
+
+    def update_aliases_for_room(self, old_room_id, new_room_id, creator):
+        def _update_aliases_for_room_txn(txn):
+            sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
+ txn.execute(sql, (new_room_id, creator, old_room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_aliases_for_room, (old_room_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_aliases_for_room, (new_room_id,) + ) + return self.runInteraction( + "_update_aliases_for_room_txn", _update_aliases_for_room_txn + ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 5d543652bb..07366f66b6 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -507,3 +507,27 @@ class RoomStore(SQLBaseStore): )) else: defer.returnValue(None) + + @cached(max_entries=10000) + def is_room_blocked(self, room_id): + return self._simple_select_one_onecol( + table="blocked_rooms", + keyvalues={ + "room_id": room_id, + }, + retcol="1", + allow_none=True, + desc="is_room_blocked", + ) + + @defer.inlineCallbacks + def block_room(self, room_id, user_id): + yield self._simple_insert( + table="blocked_rooms", + values={ + "room_id": room_id, + "user_id": user_id, + }, + desc="block_room", + ) + self.is_room_blocked.invalidate((room_id,)) diff --git a/synapse/storage/schema/delta/43/blocked_rooms.sql b/synapse/storage/schema/delta/43/blocked_rooms.sql new file mode 100644 index 0000000000..0e3cd143ff --- /dev/null +++ b/synapse/storage/schema/delta/43/blocked_rooms.sql @@ -0,0 +1,21 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE blocked_rooms ( + room_id TEXT NOT NULL, + user_id TEXT NOT NULL -- Admin who blocked the room +); + +CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms(room_id); -- cgit 1.5.1 From b8b936a6eab46cec2460fb723124bb3a750d3c83 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Jun 2017 17:39:21 +0100 Subject: Add API to quarantine media --- synapse/rest/client/v1/admin.py | 25 ++++++++ synapse/rest/media/v1/download_resource.py | 2 +- synapse/rest/media/v1/media_repository.py | 2 + synapse/rest/media/v1/thumbnail_resource.py | 4 +- synapse/storage/media_repository.py | 4 +- synapse/storage/room.py | 70 ++++++++++++++++++++++ .../storage/schema/delta/43/quarantine_media.sql | 17 ++++++ 7 files changed, 119 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/43/quarantine_media.sql (limited to 'synapse/storage/schema') diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index aaa3dffb1b..7d786e8de3 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -270,6 +270,30 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): })) +class QuarantineMediaInRoom(ClientV1RestServlet): + """Quarantines all media in a room so that no one can download it via + this server. 
+ """ + PATTERNS = client_path_patterns("/admin/quarantine_media/(?P[^/]+)") + + def __init__(self, hs): + super(QuarantineMediaInRoom, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_POST(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + if not is_admin: + raise AuthError(403, "You are not a server admin") + + num_quarantined = yield self.store.quarantine_media_ids_in_room( + room_id, requester.user.to_string(), + ) + + defer.returnValue((200, {"num_quarantined": num_quarantined})) + + class ResetPasswordRestServlet(ClientV1RestServlet): """Post request to allow an administrator reset password for a user. This need a user have a administrator access in Synapse. @@ -467,3 +491,4 @@ def register_servlets(hs, http_server): GetUsersPaginatedRestServlet(hs).register(http_server) SearchUsersRestServlet(hs).register(http_server) ShutdownRoomRestServlet(hs).register(http_server) + QuarantineMediaInRoom(hs).register(http_server) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 6788375e85..39a286b83c 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -66,7 +66,7 @@ class DownloadResource(Resource): @defer.inlineCallbacks def _respond_local_file(self, request, media_id, name): media_info = yield self.store.get_local_media(media_id) - if not media_info: + if not media_info or media_info["quarantined_by"]: respond_404(request) return diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index bae2b4c757..0718f75241 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -135,6 +135,8 @@ class MediaRepository(object): media_info = yield self._download_remote_file( server_name, media_id ) + elif media_info["quarantined_by"]: + raise NotFoundError() else: self.recently_accessed_remotes.add((server_name, media_id)) yield self.store.update_cached_last_access_time( diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index d8f54adc99..59b2c39b2f 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -81,7 +81,7 @@ class ThumbnailResource(Resource): method, m_type): media_info = yield self.store.get_local_media(media_id) - if not media_info: + if not media_info or media_info["quarantined_by"]: respond_404(request) return @@ -117,7 +117,7 @@ class ThumbnailResource(Resource): desired_type): media_info = yield self.store.get_local_media(media_id) - if not media_info: + if not media_info or media_info["quarantined_by"]: respond_404(request) return diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 4c0f82353d..5f0f18ee66 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -30,7 +30,7 @@ class MediaRepositoryStore(SQLBaseStore): return self._simple_select_one( "local_media_repository", {"media_id": media_id}, - ("media_type", "media_length", "upload_name", "created_ts"), + ("media_type", "media_length", "upload_name", "created_ts", "quarantined_by"), allow_none=True, desc="get_local_media", ) @@ -138,7 +138,7 @@ class MediaRepositoryStore(SQLBaseStore): {"media_origin": origin, "media_id": media_id}, ( "media_type", "media_length", "upload_name", "created_ts", - "filesystem_id", + 
"filesystem_id", "quarantined_by", ), allow_none=True, desc="get_cached_remote_media", diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 07366f66b6..e9c1549c00 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -24,6 +24,7 @@ from .engines import PostgresEngine, Sqlite3Engine import collections import logging import ujson as json +import re logger = logging.getLogger(__name__) @@ -531,3 +532,72 @@ class RoomStore(SQLBaseStore): desc="block_room", ) self.is_room_blocked.invalidate((room_id,)) + + def quarantine_media_ids_in_room(self, room_id, quarantined_by): + """For a room loops through all events with media and quarantines + the associated media + """ + def _get_media_ids_in_room(txn): + mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)") + + next_token = self.get_current_events_token() + 1 + + total_media_quarantined = 0 + + while next_token: + sql = """ + SELECT stream_ordering, content FROM events + WHERE room_id = ? + AND stream_ordering < ? + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + txn.execute(sql, (room_id, next_token, True, False, 100)) + + next_token = None + local_media_mxcs = [] + remote_media_mxcs = [] + for stream_ordering, content_json in txn: + next_token = stream_ordering + content = json.loads(content_json) + + url = content.get("url") + if not url: + continue + + matches = mxc_re.match(url) + if matches: + hostname = matches.group(1) + media_id = matches.group(2) + if hostname == self.hostname: + local_media_mxcs.append(media_id) + else: + remote_media_mxcs.append((hostname, media_id)) + + # Now update all the tables to set the quarantined_by flag + + txn.executemany(""" + UPDATE local_media_repository + SET quarantined_by = ? + WHERE media_id = ? + """, ((quarantined_by, media_id) for media_id in local_media_mxcs)) + + txn.executemany( + """ + UPDATE remote_media_cache + SET quarantined_by = ? + WHERE media_origin AND media_id = ? + """, + ( + (quarantined_by, origin, media_id) + for origin, media_id in remote_media_mxcs + ) + ) + + total_media_quarantined += len(local_media_mxcs) + total_media_quarantined += len(remote_media_mxcs) + + return total_media_quarantined + + return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room) diff --git a/synapse/storage/schema/delta/43/quarantine_media.sql b/synapse/storage/schema/delta/43/quarantine_media.sql new file mode 100644 index 0000000000..630907ec4f --- /dev/null +++ b/synapse/storage/schema/delta/43/quarantine_media.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE local_media_repository ADD COLUMN quarantined_by TEXT; +ALTER TABLE remote_media_cache ADD COLUMN quarantined_by TEXT; -- cgit 1.5.1