diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 71316f7d09..0ca6f6121f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,23 +279,37 @@ class DataStore(
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
+ yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+ return self.runInteraction("count_daily_users", self._count_users, yesterday,)
- def _count_users(txn):
- yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-
- sql = """
- SELECT COALESCE(count(*), 0) FROM (
- SELECT user_id FROM user_ips
- WHERE last_seen > ?
- GROUP BY user_id
- ) u
- """
-
- txn.execute(sql, (yesterday,))
- count, = txn.fetchone()
- return count
+ def count_monthly_users(self):
+ """
+ Counts the number of users who used this homeserver in the last 30 days.
+ Note this method is intended for phonehome metrics only and is different
+ from the mau figure in synapse.storage.monthly_active_users which,
+ amongst other things, includes a 3 day grace period before a user counts.
+ """
+ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+ return self.runInteraction(
+ "count_monthly_users",
+ self._count_users,
+ thirty_days_ago,
+ )
- return self.runInteraction("count_users", _count_users)
+ def _count_users(self, txn, time_from):
+ """
+ Returns number of users seen in the past time_from period
+ """
+ sql = """
+ SELECT COALESCE(count(*), 0) FROM (
+ SELECT user_id FROM user_ips
+ WHERE last_seen > ?
+ GROUP BY user_id
+ ) u
+ """
+ txn.execute(sql, (time_from,))
+ count, = txn.fetchone()
+ return count
def count_r30_users(self):
"""
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ae891aa332..941c07fce5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -299,12 +299,12 @@ class SQLBaseStore(object):
def select_users_with_no_expiration_date_txn(txn):
"""Retrieves the list of registered users with no expiration date from the
- database.
+ database, filtering out deactivated users.
"""
sql = (
"SELECT users.name FROM users"
" LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
- " WHERE account_validity.user_id is NULL;"
+ " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
)
txn.execute(sql, [])
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 9b0a99cb49..4ea0deea4f 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -138,6 +138,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
if not has_changed or last_stream_id == current_stream_id:
return defer.succeed(([], current_stream_id))
+ if limit <= 0:
+ # This can happen if we run out of room for EDUs in the transaction.
+ return defer.succeed(([], last_stream_id))
+
def get_new_messages_for_remote_destination_txn(txn):
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..f631fb1733 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,7 +17,7 @@
import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, text_type
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,39 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
-
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
+ # Collect metrics on the number of forward extremities that exist.
+ # Counter of number of extremities to count
+ self._current_forward_extremities_amount = c_counter()
+
+ BucketCollector(
+ "synapse_forward_extremities",
+ lambda: self._current_forward_extremities_amount,
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
+ )
+
+ # Read the extrems every 60 minutes
+ hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
+ def _read_forward_extremities(self):
+ def fetch(txn):
+ txn.execute(
+ """
+ select count(*) c from event_forward_extremities
+ group by room_id
+ """
+ )
+ return txn.fetchall()
+
+ res = yield self.runInteraction("read_forward_extremities", fetch)
+ self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
@@ -568,17 +595,11 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(
- r[0]
- for r in txn
- if not json.loads(r[1]).get("soft_failed")
- )
+ results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_events_which_are_prevs",
- _get_events_which_are_prevs_txn,
- chunk,
+ "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
defer.returnValue(results)
@@ -640,9 +661,7 @@ class EventsStore(
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_prevs_before_rejected",
- _get_prevs_before_rejected_txn,
- chunk,
+ "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
defer.returnValue(existing_prevs)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 5300720dbb..e3655ad8d7 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -80,6 +80,14 @@ class KeyStore(SQLBaseStore):
for row in txn:
server_name, key_id, key_bytes, ts_valid_until_ms = row
+
+ if ts_valid_until_ms is None:
+ # Old keys may be stored with a ts_valid_until_ms of null,
+ # in which case we treat this as if it was set to `0`, i.e.
+ # it won't match key requests that define a minimum
+ # `ts_valid_until_ms`.
+ ts_valid_until_ms = 0
+
res = FetchKeyResult(
verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
valid_until_ts=ts_valid_until_ms,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9b41cbd757..d36917e4d6 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import re
from six import iterkeys
@@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
+logger = logging.getLogger(__name__)
+
class RegistrationWorkerStore(SQLBaseStore):
def __init__(self, db_conn, hs):
@@ -249,6 +252,20 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def delete_account_validity_for_user(self, user_id):
+ """Deletes the entry for the given user in the account validity table, removing
+ their expiration date and renewal token.
+
+ Args:
+ user_id (str): ID of the user to remove from the account validity table.
+ """
+ yield self._simple_delete_one(
+ table="account_validity",
+ keyvalues={"user_id": user_id},
+ desc="delete_account_validity_for_user",
+ )
+
+ @defer.inlineCallbacks
def is_server_admin(self, user):
res = yield self._simple_select_one_onecol(
table="users",
@@ -598,12 +615,78 @@ class RegistrationStore(
"user_threepids_grandfather", self._bg_user_threepids_grandfather,
)
+ self.register_background_update_handler(
+ "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag,
+ )
+
# Create a background job for culling expired 3PID validity tokens
hs.get_clock().looping_call(
self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
)
@defer.inlineCallbacks
+ def _backgroud_update_set_deactivated_flag(self, progress, batch_size):
+ """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
+ for each of them.
+ """
+
+ last_user = progress.get("user_id", "")
+
+ def _backgroud_update_set_deactivated_flag_txn(txn):
+ txn.execute(
+ """
+ SELECT
+ users.name,
+ COUNT(access_tokens.token) AS count_tokens,
+ COUNT(user_threepids.address) AS count_threepids
+ FROM users
+ LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
+ LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
+ WHERE (users.password_hash IS NULL OR users.password_hash = '')
+ AND (users.appservice_id IS NULL OR users.appservice_id = '')
+ AND users.is_guest = 0
+ AND users.name > ?
+ GROUP BY users.name
+ ORDER BY users.name ASC
+ LIMIT ?;
+ """,
+ (last_user, batch_size),
+ )
+
+ rows = self.cursor_to_dict(txn)
+
+ if not rows:
+ return True
+
+ rows_processed_nb = 0
+
+ for user in rows:
+ if not user["count_tokens"] and not user["count_threepids"]:
+ self.set_user_deactivated_status_txn(txn, user["user_id"], True)
+ rows_processed_nb += 1
+
+ logger.info("Marked %d rows as deactivated", rows_processed_nb)
+
+ self._background_update_progress_txn(
+ txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+ )
+
+ if batch_size > len(rows):
+ return True
+ else:
+ return False
+
+ end = yield self.runInteraction(
+ "users_set_deactivated_flag",
+ _backgroud_update_set_deactivated_flag_txn,
+ )
+
+ if end:
+ yield self._end_background_update("users_set_deactivated_flag")
+
+ defer.returnValue(batch_size)
+
+ @defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id=None):
"""Adds an access token for the given user.
@@ -998,7 +1081,7 @@ class RegistrationStore(
client_secret,
address=None,
sid=None,
- validated=None,
+ validated=True,
):
"""Gets a session_id and last_send_attempt (if available) for a
client_secret/medium/(address|session_id) combo
@@ -1268,3 +1351,50 @@ class RegistrationStore(
"delete_threepid_session",
delete_threepid_session_txn,
)
+
+ def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+ self._simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,),
+ )
+
+ @defer.inlineCallbacks
+ def set_user_deactivated_status(self, user_id, deactivated):
+ """Set the `deactivated` property for the provided user to the provided value.
+
+ Args:
+ user_id (str): The ID of the user to set the status for.
+ deactivated (bool): The value to set for `deactivated`.
+ """
+
+ yield self.runInteraction(
+ "set_user_deactivated_status",
+ self.set_user_deactivated_status_txn,
+ user_id, deactivated,
+ )
+
+ @cachedInlineCallbacks()
+ def get_user_deactivated_status(self, user_id):
+ """Retrieve the value for the `deactivated` property for the provided user.
+
+ Args:
+ user_id (str): The ID of the user to retrieve the status for.
+
+ Returns:
+ defer.Deferred(bool): The requested value.
+ """
+
+ res = yield self._simple_select_one_onecol(
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="deactivated",
+ desc="get_user_deactivated_status",
+ )
+
+ # Convert the integer into a boolean.
+ defer.returnValue(res == 1)
diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
new file mode 100644
index 0000000000..dabdde489b
--- /dev/null
+++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('users_set_deactivated_flag', '{}');
|