summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
author Erik Johnston <erik@matrix.org> 2020-10-27 14:13:14 +0000
committer Erik Johnston <erik@matrix.org> 2020-10-27 14:13:14 +0000
commit bcb6b243e9e6d0d7c8744e152ed447cbbb58b32e (patch)
tree 7c173749c83a7738042bd17786ca8d9f5bf7a9ec /synapse/storage
parent Merge branch 'release-v1.22.0' into matrix-org-hotfixes (diff)
parent Add admin API to list users' local media (#8647) (diff)
download synapse-bcb6b243e9e6d0d7c8744e152ed447cbbb58b32e.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage')
-rw-r--r-- synapse/storage/databases/main/__init__.py 1
-rw-r--r-- synapse/storage/databases/main/events_bg_updates.py 7
-rw-r--r-- synapse/storage/databases/main/media_repository.py 104
-rw-r--r-- synapse/storage/databases/main/profile.py 4
-rw-r--r-- synapse/storage/databases/main/registration.py 160
-rw-r--r-- synapse/storage/databases/main/room.py 104
-rw-r--r-- synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql 2
7 files changed, 287 insertions, 95 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 9b16f45f3e..43660ec4fb 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -146,7 +146,6 @@ class DataStore(
             db_conn, "e2e_cross_signing_keys", "stream_id"
         )
 
-        self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
         self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 5e4af2eb51..97b6754846 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -92,6 +92,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             where_clause="NOT have_censored",
         )
 
+        self.db_pool.updates.register_background_index_update(
+            "users_have_local_media",
+            index_name="users_have_local_media",
+            table="local_media_repository",
+            columns=["user_id", "created_ts"],
+        )
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index cc538c5c10..daf57675d8 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -93,6 +93,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
 
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
+        self.server_name = hs.hostname
 
     async def get_local_media(self, media_id: str) -> Optional[Dict[str, Any]]:
         """Get the metadata for a local piece of media
@@ -115,6 +116,109 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="get_local_media",
         )
 
+    async def get_local_media_by_user_paginate(
+        self, start: int, limit: int, user_id: str
+    ) -> Tuple[List[Dict[str, Any]], int]:
+        """Get a paginated list of metadata for a local piece of media
+        which a user has uploaded
+
+        Args:
+            start: offset in the list
+            limit: maximum number of media entries to retrieve
+            user_id: fully-qualified user id
+        Returns:
+            A paginated list of all metadata of user's media,
+            plus the total count of all the user's media
+        """
+
+        def get_local_media_by_user_paginate_txn(txn):
+
+            args = [user_id]
+            sql = """
+                SELECT COUNT(*) as total_media
+                FROM local_media_repository
+                WHERE user_id = ?
+            """
+            txn.execute(sql, args)
+            count = txn.fetchone()[0]
+
+            sql = """
+                SELECT
+                    "media_id",
+                    "media_type",
+                    "media_length",
+                    "upload_name",
+                    "created_ts",
+                    "last_access_ts",
+                    "quarantined_by",
+                    "safe_from_quarantine"
+                FROM local_media_repository
+                WHERE user_id = ?
+                ORDER BY created_ts DESC, media_id DESC
+                LIMIT ? OFFSET ?
+            """
+
+            args += [limit, start]
+            txn.execute(sql, args)
+            media = self.db_pool.cursor_to_dict(txn)
+            return media, count
+
+        return await self.db_pool.runInteraction(
+            "get_local_media_by_user_paginate_txn", get_local_media_by_user_paginate_txn
+        )
+
+    async def get_local_media_before(
+        self, before_ts: int, size_gt: int, keep_profiles: bool,
+    ) -> Optional[List[str]]:
+
+        # Media that has never been accessed has `last_access_ts IS NULL`;
+        # for those rows, compare `created_ts` against the cutoff instead.
+        sql = """
+            SELECT media_id
+            FROM local_media_repository AS lmr
+            WHERE
+                ( last_access_ts < ?
+                OR ( created_ts < ? AND last_access_ts IS NULL ) )
+                AND media_length > ?
+        """
+
+        if keep_profiles:
+            sql_keep = """
+                AND (
+                    NOT EXISTS
+                        (SELECT 1
+                         FROM profiles
+                         WHERE profiles.avatar_url = '{media_prefix}' || lmr.media_id)
+                    AND NOT EXISTS
+                        (SELECT 1
+                         FROM groups
+                         WHERE groups.avatar_url = '{media_prefix}' || lmr.media_id)
+                    AND NOT EXISTS
+                        (SELECT 1
+                         FROM room_memberships
+                         WHERE room_memberships.avatar_url = '{media_prefix}' || lmr.media_id)
+                    AND NOT EXISTS
+                        (SELECT 1
+                         FROM user_directory
+                         WHERE user_directory.avatar_url = '{media_prefix}' || lmr.media_id)
+                    AND NOT EXISTS
+                        (SELECT 1
+                         FROM room_stats_state
+                         WHERE room_stats_state.avatar = '{media_prefix}' || lmr.media_id)
+                )
+            """.format(
+                media_prefix="mxc://%s/" % (self.server_name,),
+            )
+            sql += sql_keep
+
+        def _get_local_media_before_txn(txn):
+            txn.execute(sql, (before_ts, before_ts, size_gt))
+            return [row[0] for row in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_local_media_before", _get_local_media_before_txn
+        )
+
     async def store_local_media(
         self,
         media_id,
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index a6d1eb908a..0e25ca3d7a 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -39,7 +39,7 @@ class ProfileWorkerStore(SQLBaseStore):
             avatar_url=profile["avatar_url"], display_name=profile["displayname"]
         )
 
-    async def get_profile_displayname(self, user_localpart: str) -> str:
+    async def get_profile_displayname(self, user_localpart: str) -> Optional[str]:
         return await self.db_pool.simple_select_one_onecol(
             table="profiles",
             keyvalues={"user_id": user_localpart},
@@ -47,7 +47,7 @@ class ProfileWorkerStore(SQLBaseStore):
             desc="get_profile_displayname",
         )
 
-    async def get_profile_avatar_url(self, user_localpart: str) -> str:
+    async def get_profile_avatar_url(self, user_localpart: str) -> Optional[str]:
         return await self.db_pool.simple_select_one_onecol(
             table="profiles",
             keyvalues={"user_id": user_localpart},
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 4c843b7679..e7b17a7385 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -16,29 +16,33 @@
 # limitations under the License.
 import logging
 import re
-from typing import Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool
-from synapse.storage.types import Cursor
+from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.databases.main.stats import StatsStore
+from synapse.storage.types import Connection, Cursor
+from synapse.storage.util.id_generators import IdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import UserID
 from synapse.util.caches.descriptors import cached
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
 
 logger = logging.getLogger(__name__)
 
 
-class RegistrationWorkerStore(SQLBaseStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+class RegistrationWorkerStore(CacheInvalidationWorkerStore):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
         self.config = hs.config
-        self.clock = hs.get_clock()
 
         # Note: we don't check this sequence for consistency as we'd have to
         # call `find_max_generated_user_id_localpart` each time, which is
@@ -55,7 +59,7 @@ class RegistrationWorkerStore(SQLBaseStore):
 
         # Create a background job for culling expired 3PID validity tokens
         if hs.config.run_background_tasks:
-            self.clock.looping_call(
+            self._clock.looping_call(
                 self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
             )
 
@@ -92,7 +96,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         if not info:
             return False
 
-        now = self.clock.time_msec()
+        now = self._clock.time_msec()
         trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000
         is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
         return is_trial
@@ -236,13 +240,13 @@ class RegistrationWorkerStore(SQLBaseStore):
             desc="get_renewal_token_for_user",
         )
 
-    async def get_users_expiring_soon(self) -> List[Dict[str, int]]:
+    async def get_users_expiring_soon(self) -> List[Dict[str, Any]]:
         """Selects users whose account will expire in the [now, now + renew_at] time
         window (see configuration for account_validity for information on what renew_at
         refers to).
 
         Returns:
-            A list of dictionaries mapping user ID to expiration time (in milliseconds).
+            A list of dictionaries, each with a user ID and expiration time (in milliseconds).
         """
 
         def select_users_txn(txn, now_ms, renew_at):
@@ -257,7 +261,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         return await self.db_pool.runInteraction(
             "get_users_expiring_soon",
             select_users_txn,
-            self.clock.time_msec(),
+            self._clock.time_msec(),
             self.config.account_validity.renew_at,
         )
 
@@ -328,13 +332,17 @@ class RegistrationWorkerStore(SQLBaseStore):
         await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
 
     def _query_for_auth(self, txn, token):
-        sql = (
-            "SELECT users.name, users.is_guest, users.shadow_banned, access_tokens.id as token_id,"
-            " access_tokens.device_id, access_tokens.valid_until_ms"
-            " FROM users"
-            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
-            " WHERE token = ?"
-        )
+        sql = """
+            SELECT users.name,
+                users.is_guest,
+                users.shadow_banned,
+                access_tokens.id as token_id,
+                access_tokens.device_id,
+                access_tokens.valid_until_ms
+            FROM users
+            INNER JOIN access_tokens on users.name = access_tokens.user_id
+            WHERE token = ?
+        """
 
         txn.execute(sql, (token,))
         rows = self.db_pool.cursor_to_dict(txn)
@@ -803,7 +811,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         await self.db_pool.runInteraction(
             "cull_expired_threepid_validation_tokens",
             cull_expired_threepid_validation_tokens_txn,
-            self.clock.time_msec(),
+            self._clock.time_msec(),
         )
 
     @wrap_as_background_process("account_validity_set_expiration_dates")
@@ -890,10 +898,10 @@ class RegistrationWorkerStore(SQLBaseStore):
 
 
 class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
-        self.clock = hs.get_clock()
+        self._clock = hs.get_clock()
         self.config = hs.config
 
         self.db_pool.updates.register_background_index_update(
@@ -1016,13 +1024,56 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
 
         return 1
 
+    async def set_user_deactivated_status(
+        self, user_id: str, deactivated: bool
+    ) -> None:
+        """Set the `deactivated` property for the provided user to the provided value.
+
+        Args:
+            user_id: The ID of the user to set the status for.
+            deactivated: The value to set for `deactivated`.
+        """
+
+        await self.db_pool.runInteraction(
+            "set_user_deactivated_status",
+            self.set_user_deactivated_status_txn,
+            user_id,
+            deactivated,
+        )
+
+    def set_user_deactivated_status_txn(self, txn, user_id: str, deactivated: bool):
+        self.db_pool.simple_update_one_txn(
+            txn=txn,
+            table="users",
+            keyvalues={"name": user_id},
+            updatevalues={"deactivated": 1 if deactivated else 0},
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_deactivated_status, (user_id,)
+        )
+        txn.call_after(self.is_guest.invalidate, (user_id,))
+
+    @cached()
+    async def is_guest(self, user_id: str) -> bool:
+        res = await self.db_pool.simple_select_one_onecol(
+            table="users",
+            keyvalues={"name": user_id},
+            retcol="is_guest",
+            allow_none=True,
+            desc="is_guest",
+        )
+
+        return res if res else False
+
 
-class RegistrationStore(RegistrationBackgroundUpdateStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
         self._ignore_unknown_session_error = hs.config.request_token_inhibit_3pid_errors
 
+        self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
+
     async def add_access_token_to_user(
         self,
         user_id: str,
@@ -1138,19 +1189,19 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
     def _register_user(
         self,
         txn,
-        user_id,
-        password_hash,
-        was_guest,
-        make_guest,
-        appservice_id,
-        create_profile_with_displayname,
-        admin,
-        user_type,
-        shadow_banned,
+        user_id: str,
+        password_hash: Optional[str],
+        was_guest: bool,
+        make_guest: bool,
+        appservice_id: Optional[str],
+        create_profile_with_displayname: Optional[str],
+        admin: bool,
+        user_type: Optional[str],
+        shadow_banned: bool,
     ):
         user_id_obj = UserID.from_string(user_id)
 
-        now = int(self.clock.time())
+        now = int(self._clock.time())
 
         try:
             if was_guest:
@@ -1374,18 +1425,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
         await self.db_pool.runInteraction("delete_access_token", f)
 
-    @cached()
-    async def is_guest(self, user_id: str) -> bool:
-        res = await self.db_pool.simple_select_one_onecol(
-            table="users",
-            keyvalues={"name": user_id},
-            retcol="is_guest",
-            allow_none=True,
-            desc="is_guest",
-        )
-
-        return res if res else False
-
     async def add_user_pending_deactivation(self, user_id: str) -> None:
         """
         Adds a user to the table of users who need to be parted from all the rooms they're
@@ -1479,7 +1518,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
                 txn,
                 table="threepid_validation_session",
                 keyvalues={"session_id": session_id},
-                updatevalues={"validated_at": self.clock.time_msec()},
+                updatevalues={"validated_at": self._clock.time_msec()},
             )
 
             return next_link
@@ -1547,35 +1586,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
             start_or_continue_validation_session_txn,
         )
 
-    async def set_user_deactivated_status(
-        self, user_id: str, deactivated: bool
-    ) -> None:
-        """Set the `deactivated` property for the provided user to the provided value.
-
-        Args:
-            user_id: The ID of the user to set the status for.
-            deactivated: The value to set for `deactivated`.
-        """
-
-        await self.db_pool.runInteraction(
-            "set_user_deactivated_status",
-            self.set_user_deactivated_status_txn,
-            user_id,
-            deactivated,
-        )
-
-    def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
-        self.db_pool.simple_update_one_txn(
-            txn=txn,
-            table="users",
-            keyvalues={"name": user_id},
-            updatevalues={"deactivated": 1 if deactivated else 0},
-        )
-        self._invalidate_cache_and_stream(
-            txn, self.get_user_deactivated_status, (user_id,)
-        )
-        txn.call_after(self.is_guest.invalidate, (user_id,))
-
 
 def find_max_generated_user_id_localpart(cur: Cursor) -> int:
     """
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index e83d961c20..dc0c4b5499 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1411,6 +1411,65 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
             desc="add_event_report",
         )
 
+    async def get_event_report(self, report_id: int) -> Optional[Dict[str, Any]]:
+        """Retrieve an event report
+
+        Args:
+            report_id: ID of the event report in the database
+        Returns:
+            A dict of information from the event report, or None if the query returns no row for this ID
+        """
+
+        def _get_event_report_txn(txn, report_id):
+
+            sql = """
+                SELECT
+                    er.id,
+                    er.received_ts,
+                    er.room_id,
+                    er.event_id,
+                    er.user_id,
+                    er.content,
+                    events.sender,
+                    room_stats_state.canonical_alias,
+                    room_stats_state.name,
+                    event_json.json AS event_json
+                FROM event_reports AS er
+                LEFT JOIN events
+                    ON events.event_id = er.event_id
+                JOIN event_json
+                    ON event_json.event_id = er.event_id
+                JOIN room_stats_state
+                    ON room_stats_state.room_id = er.room_id
+                WHERE er.id = ?
+            """
+
+            txn.execute(sql, [report_id])
+            row = txn.fetchone()
+
+            if not row:
+                return None
+
+            event_report = {
+                "id": row[0],
+                "received_ts": row[1],
+                "room_id": row[2],
+                "event_id": row[3],
+                "user_id": row[4],
+                "score": db_to_json(row[5]).get("score"),
+                "reason": db_to_json(row[5]).get("reason"),
+                "sender": row[6],
+                "canonical_alias": row[7],
+                "name": row[8],
+                "event_json": db_to_json(row[9]),
+            }
+
+            return event_report
+
+        return await self.db_pool.runInteraction(
+            "get_event_report", _get_event_report_txn, report_id
+        )
+
     async def get_event_reports_paginate(
         self,
         start: int,
@@ -1468,18 +1527,15 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
                     er.room_id,
                     er.event_id,
                     er.user_id,
-                    er.reason,
                     er.content,
                     events.sender,
-                    room_aliases.room_alias,
-                    event_json.json AS event_json
+                    room_stats_state.canonical_alias,
+                    room_stats_state.name
                 FROM event_reports AS er
-                LEFT JOIN room_aliases
-                    ON room_aliases.room_id = er.room_id
-                JOIN events
+                LEFT JOIN events
                     ON events.event_id = er.event_id
-                JOIN event_json
-                    ON event_json.event_id = er.event_id
+                JOIN room_stats_state
+                    ON room_stats_state.room_id = er.room_id
                 {where_clause}
                 ORDER BY er.received_ts {order}
                 LIMIT ?
@@ -1490,15 +1546,29 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
 
             args += [limit, start]
             txn.execute(sql, args)
-            event_reports = self.db_pool.cursor_to_dict(txn)
-
-            if count > 0:
-                for row in event_reports:
-                    try:
-                        row["content"] = db_to_json(row["content"])
-                        row["event_json"] = db_to_json(row["event_json"])
-                    except Exception:
-                        continue
+
+            event_reports = []
+            for row in txn:
+                try:
+                    s = db_to_json(row[5]).get("score")
+                    r = db_to_json(row[5]).get("reason")
+                except Exception:
+                    logger.error("Unable to parse json from event_reports: %s", row[0])
+                    continue
+                event_reports.append(
+                    {
+                        "id": row[0],
+                        "received_ts": row[1],
+                        "room_id": row[2],
+                        "event_id": row[3],
+                        "user_id": row[4],
+                        "score": s,
+                        "reason": r,
+                        "sender": row[6],
+                        "canonical_alias": row[7],
+                        "name": row[8],
+                    }
+                )
 
             return event_reports, count
 
diff --git a/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql
new file mode 100644
index 0000000000..a2842687f1
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql
@@ -0,0 +1,2 @@
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('users_have_local_media', '{}');
\ No newline at end of file