diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ae891aa332..941c07fce5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -299,12 +299,12 @@ class SQLBaseStore(object):
def select_users_with_no_expiration_date_txn(txn):
"""Retrieves the list of registered users with no expiration date from the
- database.
+ database, filtering out deactivated users.
"""
sql = (
"SELECT users.name FROM users"
" LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
- " WHERE account_validity.user_id is NULL;"
+ " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
)
txn.execute(sql, [])
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index eb329ebd8b..9d9b28de13 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -35,7 +35,7 @@ def _make_exclusive_regex(services_cache):
exclusive_user_regexes = [
regex.pattern
for service in services_cache
- for regex in service.get_exlusive_user_regexes()
+ for regex in service.get_exclusive_user_regexes()
]
if exclusive_user_regexes:
exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index aeec2f57c4..38524f2545 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,8 +19,11 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.storage.roommember import ProfileInfo
+from . import background_updates
from ._base import SQLBaseStore
+BATCH_SIZE = 100
+
class ProfileWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
@@ -61,6 +65,55 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_profile_avatar_url",
)
+ def get_latest_profile_replication_batch_number(self):
+ def f(txn):
+ txn.execute("SELECT MAX(batch) as maxbatch FROM profiles")
+ rows = self.cursor_to_dict(txn)
+ return rows[0]['maxbatch']
+ return self.runInteraction(
+ "get_latest_profile_replication_batch_number", f,
+ )
+
+ def get_profile_batch(self, batchnum):
+ return self._simple_select_list(
+ table="profiles",
+ keyvalues={
+ "batch": batchnum,
+ },
+ retcols=("user_id", "displayname", "avatar_url", "active"),
+ desc="get_profile_batch",
+ )
+
+ def assign_profile_batch(self):
+ def f(txn):
+ sql = (
+ "UPDATE profiles SET batch = "
+ "(SELECT COALESCE(MAX(batch), -1) + 1 FROM profiles) "
+ "WHERE user_id in ("
+ " SELECT user_id FROM profiles WHERE batch is NULL limit ?"
+ ")"
+ )
+ txn.execute(sql, (BATCH_SIZE,))
+ return txn.rowcount
+ return self.runInteraction("assign_profile_batch", f)
+
+ def get_replication_hosts(self):
+ def f(txn):
+ txn.execute("SELECT host, last_synced_batch FROM profile_replication_status")
+ rows = self.cursor_to_dict(txn)
+ return {r['host']: r['last_synced_batch'] for r in rows}
+ return self.runInteraction("get_replication_hosts", f)
+
+ def update_replication_batch_for_host(self, host, last_synced_batch):
+ return self._simple_upsert(
+ table="profile_replication_status",
+ keyvalues={"host": host},
+ values={
+ "last_synced_batch": last_synced_batch,
+ },
+ desc="update_replication_batch_for_host",
+ )
+
def get_from_remote_profile_cache(self, user_id):
return self._simple_select_one(
table="remote_profile_cache",
@@ -70,29 +123,62 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_from_remote_profile_cache",
)
- def create_profile(self, user_localpart):
- return self._simple_insert(
- table="profiles", values={"user_id": user_localpart}, desc="create_profile"
- )
-
- def set_profile_displayname(self, user_localpart, new_displayname):
- return self._simple_update_one(
+ def set_profile_displayname(self, user_localpart, new_displayname, batchnum):
+ return self._simple_upsert(
table="profiles",
keyvalues={"user_id": user_localpart},
- updatevalues={"displayname": new_displayname},
+ values={
+ "displayname": new_displayname,
+ "batch": batchnum,
+ },
desc="set_profile_displayname",
+ lock=False # we can do this because user_id has a unique index
)
- def set_profile_avatar_url(self, user_localpart, new_avatar_url):
- return self._simple_update_one(
+ def set_profile_avatar_url(self, user_localpart, new_avatar_url, batchnum):
+ return self._simple_upsert(
table="profiles",
keyvalues={"user_id": user_localpart},
- updatevalues={"avatar_url": new_avatar_url},
+ values={
+ "avatar_url": new_avatar_url,
+ "batch": batchnum,
+ },
desc="set_profile_avatar_url",
+ lock=False # we can do this because user_id has a unique index
+ )
+
+ def set_profile_active(self, user_localpart, active, hide, batchnum):
+ values = {
+ "active": int(active),
+ "batch": batchnum,
+ }
+ if not active and not hide:
+ # we are deactivating for real (not in hide mode)
+ # so clear the profile.
+ values["avatar_url"] = None
+ values["displayname"] = None
+ return self._simple_upsert(
+ table="profiles",
+ keyvalues={"user_id": user_localpart},
+ values=values,
+ desc="set_profile_active",
+ lock=False # we can do this because user_id has a unique index
)
-class ProfileStore(ProfileWorkerStore):
+class ProfileStore(ProfileWorkerStore, background_updates.BackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+
+ super(ProfileStore, self).__init__(db_conn, hs)
+
+ self.register_background_index_update(
+ "profile_replication_status_host_index",
+ index_name="profile_replication_status_idx",
+ table="profile_replication_status",
+ columns=["host"],
+ unique=True,
+ )
+
def add_remote_profile_cache(self, user_id, displayname, avatar_url):
"""Ensure we are caching the remote user's profiles.
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 1dd1182e82..d36917e4d6 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import re
from six import iterkeys
@@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
+logger = logging.getLogger(__name__)
+
class RegistrationWorkerStore(SQLBaseStore):
def __init__(self, db_conn, hs):
@@ -249,6 +252,20 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def delete_account_validity_for_user(self, user_id):
+ """Deletes the entry for the given user in the account validity table, removing
+ their expiration date and renewal token.
+
+ Args:
+ user_id (str): ID of the user to remove from the account validity table.
+ """
+ yield self._simple_delete_one(
+ table="account_validity",
+ keyvalues={"user_id": user_id},
+ desc="delete_account_validity_for_user",
+ )
+
+ @defer.inlineCallbacks
def is_server_admin(self, user):
res = yield self._simple_select_one_onecol(
table="users",
@@ -598,12 +615,78 @@ class RegistrationStore(
"user_threepids_grandfather", self._bg_user_threepids_grandfather,
)
+ self.register_background_update_handler(
+ "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag,
+ )
+
# Create a background job for culling expired 3PID validity tokens
hs.get_clock().looping_call(
self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
)
@defer.inlineCallbacks
+ def _backgroud_update_set_deactivated_flag(self, progress, batch_size):
+ """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
+ for each of them.
+ """
+
+ last_user = progress.get("user_id", "")
+
+ def _backgroud_update_set_deactivated_flag_txn(txn):
+ txn.execute(
+ """
+ SELECT
+ users.name,
+ COUNT(access_tokens.token) AS count_tokens,
+ COUNT(user_threepids.address) AS count_threepids
+ FROM users
+ LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
+ LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
+ WHERE (users.password_hash IS NULL OR users.password_hash = '')
+ AND (users.appservice_id IS NULL OR users.appservice_id = '')
+ AND users.is_guest = 0
+ AND users.name > ?
+ GROUP BY users.name
+ ORDER BY users.name ASC
+ LIMIT ?;
+ """,
+ (last_user, batch_size),
+ )
+
+ rows = self.cursor_to_dict(txn)
+
+ if not rows:
+ return True
+
+ rows_processed_nb = 0
+
+ for user in rows:
+ if not user["count_tokens"] and not user["count_threepids"]:
+ self.set_user_deactivated_status_txn(txn, user["user_id"], True)
+ rows_processed_nb += 1
+
+ logger.info("Marked %d rows as deactivated", rows_processed_nb)
+
+ self._background_update_progress_txn(
+ txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+ )
+
+ if batch_size > len(rows):
+ return True
+ else:
+ return False
+
+ end = yield self.runInteraction(
+ "users_set_deactivated_flag",
+ _backgroud_update_set_deactivated_flag_txn,
+ )
+
+ if end:
+ yield self._end_background_update("users_set_deactivated_flag")
+
+ defer.returnValue(batch_size)
+
+ @defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id=None):
"""Adds an access token for the given user.
@@ -1268,3 +1351,50 @@ class RegistrationStore(
"delete_threepid_session",
delete_threepid_session_txn,
)
+
+ def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+ self._simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,),
+ )
+
+ @defer.inlineCallbacks
+ def set_user_deactivated_status(self, user_id, deactivated):
+ """Set the `deactivated` property for the provided user to the provided value.
+
+ Args:
+ user_id (str): The ID of the user to set the status for.
+ deactivated (bool): The value to set for `deactivated`.
+ """
+
+ yield self.runInteraction(
+ "set_user_deactivated_status",
+ self.set_user_deactivated_status_txn,
+ user_id, deactivated,
+ )
+
+ @cachedInlineCallbacks()
+ def get_user_deactivated_status(self, user_id):
+ """Retrieve the value for the `deactivated` property for the provided user.
+
+ Args:
+ user_id (str): The ID of the user to retrieve the status for.
+
+ Returns:
+ defer.Deferred(bool): The requested value.
+ """
+
+ res = yield self._simple_select_one_onecol(
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="deactivated",
+ desc="get_user_deactivated_status",
+ )
+
+ # Convert the integer into a boolean.
+ defer.returnValue(res == 1)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index fe9d79d792..87854ae08c 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -171,6 +171,24 @@ class RoomWorkerStore(SQLBaseStore):
desc="is_room_blocked",
)
+ @defer.inlineCallbacks
+ def is_room_published(self, room_id):
+ """Check whether a room has been published in the local public room
+ directory.
+
+ Args:
+ room_id (str)
+ Returns:
+ bool: Whether the room is currently published in the room directory
+ """
+ # Get room information
+ room_info = yield self.get_room(room_id)
+ if not room_info:
+ defer.returnValue(False)
+
+ # Check the is_public value
+ defer.returnValue(room_info.get("is_public", False))
+
@cachedInlineCallbacks(max_entries=10000)
def get_ratelimit_for_user(self, user_id):
"""Check if there are any overrides for ratelimiting for the given
diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql
new file mode 100644
index 0000000000..e744c02fe8
--- /dev/null
+++ b/synapse/storage/schema/delta/48/profiles_batch.sql
@@ -0,0 +1,36 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Add a batch number to track changes to profiles and the
+ * order they're made in so we can replicate user profiles
+ * to other hosts as they change
+ */
+ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL;
+
+/*
+ * Index on the batch number so we can get profiles
+ * by their batch
+ */
+CREATE INDEX profiles_batch_idx ON profiles(batch);
+
+/*
+ * A table to track what batch of user profiles has been
+ * synced to what profile replication target.
+ */
+CREATE TABLE profile_replication_status (
+ host TEXT NOT NULL,
+ last_synced_batch BIGINT NOT NULL
+);
diff --git a/synapse/storage/schema/delta/50/profiles_deactivated_users.sql b/synapse/storage/schema/delta/50/profiles_deactivated_users.sql
new file mode 100644
index 0000000000..c8893ecbe8
--- /dev/null
+++ b/synapse/storage/schema/delta/50/profiles_deactivated_users.sql
@@ -0,0 +1,23 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * A flag saying whether the user owning the profile has been deactivated
+ * This really belongs on the users table, not here, but the users table
+ * stores users by their full user_id and profiles stores them by localpart,
+ * so we can't easily join between the two tables. Plus, the batch number
+ * realy ought to represent data in this table that has changed.
+ */
+ALTER TABLE profiles ADD COLUMN active SMALLINT DEFAULT 1 NOT NULL;
diff --git a/synapse/storage/schema/delta/55/profile_replication_status_index.sql b/synapse/storage/schema/delta/55/profile_replication_status_index.sql
new file mode 100644
index 0000000000..18a0f7e10c
--- /dev/null
+++ b/synapse/storage/schema/delta/55/profile_replication_status_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('profile_replication_status_host_index', '{}');
diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
new file mode 100644
index 0000000000..dabdde489b
--- /dev/null
+++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('users_set_deactivated_flag', '{}');
diff --git a/synapse/storage/schema/full_schemas/54/full.sql.postgres b/synapse/storage/schema/full_schemas/54/full.sql.postgres
index 098434356f..01a2b0e024 100644
--- a/synapse/storage/schema/full_schemas/54/full.sql.postgres
+++ b/synapse/storage/schema/full_schemas/54/full.sql.postgres
@@ -667,10 +667,19 @@ CREATE TABLE presence_stream (
+CREATE TABLE profile_replication_status (
+ host text NOT NULL,
+ last_synced_batch bigint NOT NULL
+);
+
+
+
CREATE TABLE profiles (
user_id text NOT NULL,
displayname text,
- avatar_url text
+ avatar_url text,
+ batch bigint,
+ active smallint DEFAULT 1 NOT NULL
);
@@ -1842,6 +1851,10 @@ CREATE INDEX presence_stream_user_id ON presence_stream USING btree (user_id);
+CREATE INDEX profiles_batch_idx ON profiles USING btree (batch);
+
+
+
CREATE INDEX public_room_index ON rooms USING btree (is_public);
diff --git a/synapse/storage/schema/full_schemas/54/full.sql.sqlite b/synapse/storage/schema/full_schemas/54/full.sql.sqlite
index be9295e4c9..f1a71627f0 100644
--- a/synapse/storage/schema/full_schemas/54/full.sql.sqlite
+++ b/synapse/storage/schema/full_schemas/54/full.sql.sqlite
@@ -6,7 +6,7 @@ CREATE TABLE presence_allow_inbound( observed_user_id TEXT NOT NULL, observer_us
CREATE TABLE users( name TEXT, password_hash TEXT, creation_ts BIGINT, admin SMALLINT DEFAULT 0 NOT NULL, upgrade_ts BIGINT, is_guest SMALLINT DEFAULT 0 NOT NULL, appservice_id TEXT, consent_version TEXT, consent_server_notice_sent TEXT, user_type TEXT DEFAULT NULL, UNIQUE(name) );
CREATE TABLE access_tokens( id BIGINT PRIMARY KEY, user_id TEXT NOT NULL, device_id TEXT, token TEXT NOT NULL, last_used BIGINT, UNIQUE(token) );
CREATE TABLE user_ips ( user_id TEXT NOT NULL, access_token TEXT NOT NULL, device_id TEXT, ip TEXT NOT NULL, user_agent TEXT NOT NULL, last_seen BIGINT NOT NULL );
-CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, UNIQUE(user_id) );
+CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, batch BIGINT DEFAULT NULL, active SMALLINT DEFAULT 1 NOT NULL, UNIQUE(user_id) );
CREATE TABLE received_transactions( transaction_id TEXT, origin TEXT, ts BIGINT, response_code INTEGER, response_json bytea, has_been_referenced smallint default 0, UNIQUE (transaction_id, origin) );
CREATE TABLE destinations( destination TEXT PRIMARY KEY, retry_last_ts BIGINT, retry_interval INTEGER );
CREATE TABLE events( stream_ordering INTEGER PRIMARY KEY, topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, content TEXT, unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, origin_server_ts BIGINT, received_ts BIGINT, sender TEXT, contains_url BOOLEAN, UNIQUE (event_id) );
@@ -208,6 +208,8 @@ CREATE INDEX group_users_u_idx ON group_users(user_id);
CREATE INDEX group_invites_u_idx ON group_invites(user_id);
CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
+CREATE INDEX profiles_batch_idx ON profiles(batch);
+CREATE TABLE profile_replication_status ( host TEXT NOT NULL, last_synced_batch BIGINT NOT NULL );
CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL, device_id TEXT, timestamp BIGINT NOT NULL );
CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp);
CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp);
|