summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py44
-rw-r--r--synapse/storage/_base.py4
-rw-r--r--synapse/storage/deviceinbox.py4
-rw-r--r--synapse/storage/events.py45
-rw-r--r--synapse/storage/keys.py8
-rw-r--r--synapse/storage/registration.py132
-rw-r--r--synapse/storage/schema/delta/55/users_alter_deactivated.sql19
7 files changed, 225 insertions, 31 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 71316f7d09..0ca6f6121f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,23 +279,37 @@ class DataStore(
         """
         Counts the number of users who used this homeserver in the last 24 hours.
         """
+        yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+        return self.runInteraction("count_daily_users", self._count_users, yesterday,)
 
-        def _count_users(txn):
-            yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-
-            sql = """
-                SELECT COALESCE(count(*), 0) FROM (
-                    SELECT user_id FROM user_ips
-                    WHERE last_seen > ?
-                    GROUP BY user_id
-                ) u
-            """
-
-            txn.execute(sql, (yesterday,))
-            count, = txn.fetchone()
-            return count
+    def count_monthly_users(self):
+        """
+        Counts the number of users who used this homeserver in the last 30 days.
+        Note this method is intended for phonehome metrics only and is different
+        from the mau figure in synapse.storage.monthly_active_users which,
+        amongst other things, includes a 3 day grace period before a user counts.
+        """
+        thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+        return self.runInteraction(
+            "count_monthly_users",
+            self._count_users,
+            thirty_days_ago,
+        )
 
-        return self.runInteraction("count_users", _count_users)
+    def _count_users(self, txn, time_from):
+        """
+        Returns number of users seen in the past time_from period
+        """
+        sql = """
+            SELECT COALESCE(count(*), 0) FROM (
+                SELECT user_id FROM user_ips
+                WHERE last_seen > ?
+                GROUP BY user_id
+            ) u
+        """
+        txn.execute(sql, (time_from,))
+        count, = txn.fetchone()
+        return count
 
     def count_r30_users(self):
         """
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ae891aa332..941c07fce5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -299,12 +299,12 @@ class SQLBaseStore(object):
 
         def select_users_with_no_expiration_date_txn(txn):
             """Retrieves the list of registered users with no expiration date from the
-            database.
+            database, filtering out deactivated users.
             """
             sql = (
                 "SELECT users.name FROM users"
                 " LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
-                " WHERE account_validity.user_id is NULL;"
+                " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
             )
             txn.execute(sql, [])
 
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 9b0a99cb49..4ea0deea4f 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -138,6 +138,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         if not has_changed or last_stream_id == current_stream_id:
             return defer.succeed(([], current_stream_id))
 
+        if limit <= 0:
+            # This can happen if we run out of room for EDUs in the transaction.
+            return defer.succeed(([], last_stream_id))
+
         def get_new_messages_for_remote_destination_txn(txn):
             sql = (
                 "SELECT stream_id, messages_json FROM device_federation_outbox"
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..f631fb1733 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,7 +17,7 @@
 
 import itertools
 import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
 from functools import wraps
 
 from six import iteritems, text_type
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,39 @@ class EventsStore(
     EventsWorkerStore,
     BackgroundUpdateStore,
 ):
-
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
 
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+        # Collect metrics on the number of forward extremities that exist.
+        # Counter of number of extremities to count
+        self._current_forward_extremities_amount = c_counter()
+
+        BucketCollector(
+            "synapse_forward_extremities",
+            lambda: self._current_forward_extremities_amount,
+            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
+        )
+
+        # Read the extrems every 60 minutes
+        hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+    @defer.inlineCallbacks
+    def _read_forward_extremities(self):
+        def fetch(txn):
+            txn.execute(
+                """
+                select count(*) c from event_forward_extremities
+                group by room_id
+                """
+            )
+            return txn.fetchall()
+
+        res = yield self.runInteraction("read_forward_extremities", fetch)
+        self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
@@ -568,17 +595,11 @@ class EventsStore(
             )
 
             txn.execute(sql, batch)
-            results.extend(
-                r[0]
-                for r in txn
-                if not json.loads(r[1]).get("soft_failed")
-            )
+            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_events_which_are_prevs",
-                _get_events_which_are_prevs_txn,
-                chunk,
+                "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
             )
 
         defer.returnValue(results)
@@ -640,9 +661,7 @@ class EventsStore(
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_prevs_before_rejected",
-                _get_prevs_before_rejected_txn,
-                chunk,
+                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
             )
 
         defer.returnValue(existing_prevs)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 5300720dbb..e3655ad8d7 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -80,6 +80,14 @@ class KeyStore(SQLBaseStore):
 
             for row in txn:
                 server_name, key_id, key_bytes, ts_valid_until_ms = row
+
+                if ts_valid_until_ms is None:
+                    # Old keys may be stored with a ts_valid_until_ms of null,
+                    # in which case we treat this as if it was set to `0`, i.e.
+                    # it won't match key requests that define a minimum
+                    # `ts_valid_until_ms`.
+                    ts_valid_until_ms = 0
+
                 res = FetchKeyResult(
                     verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
                     valid_until_ts=ts_valid_until_ms,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9b41cbd757..d36917e4d6 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import re
 
 from six import iterkeys
@@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
 
+logger = logging.getLogger(__name__)
+
 
 class RegistrationWorkerStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
@@ -249,6 +252,20 @@ class RegistrationWorkerStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
+    def delete_account_validity_for_user(self, user_id):
+        """Deletes the entry for the given user in the account validity table, removing
+        their expiration date and renewal token.
+
+        Args:
+            user_id (str): ID of the user to remove from the account validity table.
+        """
+        yield self._simple_delete_one(
+            table="account_validity",
+            keyvalues={"user_id": user_id},
+            desc="delete_account_validity_for_user",
+        )
+
+    @defer.inlineCallbacks
     def is_server_admin(self, user):
         res = yield self._simple_select_one_onecol(
             table="users",
@@ -598,12 +615,78 @@ class RegistrationStore(
             "user_threepids_grandfather", self._bg_user_threepids_grandfather,
         )
 
+        self.register_background_update_handler(
+            "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag,
+        )
+
         # Create a background job for culling expired 3PID validity tokens
         hs.get_clock().looping_call(
             self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
         )
 
     @defer.inlineCallbacks
+    def _backgroud_update_set_deactivated_flag(self, progress, batch_size):
+        """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
+        for each of them.
+        """
+
+        last_user = progress.get("user_id", "")
+
+        def _backgroud_update_set_deactivated_flag_txn(txn):
+            txn.execute(
+                """
+                SELECT
+                    users.name,
+                    COUNT(access_tokens.token) AS count_tokens,
+                    COUNT(user_threepids.address) AS count_threepids
+                FROM users
+                    LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
+                    LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
+                WHERE (users.password_hash IS NULL OR users.password_hash = '')
+                AND (users.appservice_id IS NULL OR users.appservice_id = '')
+                AND users.is_guest = 0
+                AND users.name > ?
+                GROUP BY users.name
+                ORDER BY users.name ASC
+                LIMIT ?;
+                """,
+                (last_user, batch_size),
+            )
+
+            rows = self.cursor_to_dict(txn)
+
+            if not rows:
+                return True
+
+            rows_processed_nb = 0
+
+            for user in rows:
+                if not user["count_tokens"] and not user["count_threepids"]:
+                    self.set_user_deactivated_status_txn(txn, user["user_id"], True)
+                    rows_processed_nb += 1
+
+            logger.info("Marked %d rows as deactivated", rows_processed_nb)
+
+            self._background_update_progress_txn(
+                txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+            )
+
+            if batch_size > len(rows):
+                return True
+            else:
+                return False
+
+        end = yield self.runInteraction(
+            "users_set_deactivated_flag",
+            _backgroud_update_set_deactivated_flag_txn,
+        )
+
+        if end:
+            yield self._end_background_update("users_set_deactivated_flag")
+
+        defer.returnValue(batch_size)
+
+    @defer.inlineCallbacks
     def add_access_token_to_user(self, user_id, token, device_id=None):
         """Adds an access token for the given user.
 
@@ -998,7 +1081,7 @@ class RegistrationStore(
         client_secret,
         address=None,
         sid=None,
-        validated=None,
+        validated=True,
     ):
         """Gets a session_id and last_send_attempt (if available) for a
         client_secret/medium/(address|session_id) combo
@@ -1268,3 +1351,50 @@ class RegistrationStore(
             "delete_threepid_session",
             delete_threepid_session_txn,
         )
+
+    def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+        self._simple_update_one_txn(
+            txn=txn,
+            table="users",
+            keyvalues={"name": user_id},
+            updatevalues={"deactivated": 1 if deactivated else 0},
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_deactivated_status, (user_id,),
+        )
+
+    @defer.inlineCallbacks
+    def set_user_deactivated_status(self, user_id, deactivated):
+        """Set the `deactivated` property for the provided user to the provided value.
+
+        Args:
+            user_id (str): The ID of the user to set the status for.
+            deactivated (bool): The value to set for `deactivated`.
+        """
+
+        yield self.runInteraction(
+            "set_user_deactivated_status",
+            self.set_user_deactivated_status_txn,
+            user_id, deactivated,
+        )
+
+    @cachedInlineCallbacks()
+    def get_user_deactivated_status(self, user_id):
+        """Retrieve the value for the `deactivated` property for the provided user.
+
+        Args:
+            user_id (str): The ID of the user to retrieve the status for.
+
+        Returns:
+            defer.Deferred(bool): The requested value.
+        """
+
+        res = yield self._simple_select_one_onecol(
+            table="users",
+            keyvalues={"name": user_id},
+            retcol="deactivated",
+            desc="get_user_deactivated_status",
+        )
+
+        # Convert the integer into a boolean.
+        defer.returnValue(res == 1)
diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
new file mode 100644
index 0000000000..dabdde489b
--- /dev/null
+++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('users_set_deactivated_flag', '{}');