diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 9b16f45f3e..43660ec4fb 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -146,7 +146,6 @@ class DataStore(
db_conn, "e2e_cross_signing_keys", "stream_id"
)
- self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 5e4af2eb51..97b6754846 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -92,6 +92,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
where_clause="NOT have_censored",
)
+ self.db_pool.updates.register_background_index_update(
+ "users_have_local_media",
+ index_name="users_have_local_media",
+ table="local_media_repository",
+ columns=["user_id", "created_ts"],
+ )
+
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index cc538c5c10..daf57675d8 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -93,6 +93,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
+ self.server_name = hs.hostname
async def get_local_media(self, media_id: str) -> Optional[Dict[str, Any]]:
"""Get the metadata for a local piece of media
@@ -115,6 +116,109 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="get_local_media",
)
+ async def get_local_media_by_user_paginate(
+ self, start: int, limit: int, user_id: str
+ ) -> Tuple[List[Dict[str, Any]], int]:
+ """Get a paginated list of metadata for a local piece of media
+ which an user_id has uploaded
+
+ Args:
+ start: offset in the list
+ limit: maximum amount of media_ids to retrieve
+ user_id: fully-qualified user id
+ Returns:
+ A paginated list of all metadata of user's media,
+ plus the total count of all the user's media
+ """
+
+ def get_local_media_by_user_paginate_txn(txn):
+
+ args = [user_id]
+ sql = """
+ SELECT COUNT(*) as total_media
+ FROM local_media_repository
+ WHERE user_id = ?
+ """
+ txn.execute(sql, args)
+ count = txn.fetchone()[0]
+
+ sql = """
+ SELECT
+ "media_id",
+ "media_type",
+ "media_length",
+ "upload_name",
+ "created_ts",
+ "last_access_ts",
+ "quarantined_by",
+ "safe_from_quarantine"
+ FROM local_media_repository
+ WHERE user_id = ?
+ ORDER BY created_ts DESC, media_id DESC
+ LIMIT ? OFFSET ?
+ """
+
+ args += [limit, start]
+ txn.execute(sql, args)
+ media = self.db_pool.cursor_to_dict(txn)
+ return media, count
+
+ return await self.db_pool.runInteraction(
+ "get_local_media_by_user_paginate_txn", get_local_media_by_user_paginate_txn
+ )
+
+ async def get_local_media_before(
+ self, before_ts: int, size_gt: int, keep_profiles: bool,
+ ) -> Optional[List[str]]:
+
+ # to find files that have never been accessed (last_access_ts IS NULL)
+ # compare with `created_ts`
+ sql = """
+ SELECT media_id
+ FROM local_media_repository AS lmr
+ WHERE
+ ( last_access_ts < ?
+ OR ( created_ts < ? AND last_access_ts IS NULL ) )
+ AND media_length > ?
+ """
+
+ if keep_profiles:
+ sql_keep = """
+ AND (
+ NOT EXISTS
+ (SELECT 1
+ FROM profiles
+ WHERE profiles.avatar_url = '{media_prefix}' || lmr.media_id)
+ AND NOT EXISTS
+ (SELECT 1
+ FROM groups
+ WHERE groups.avatar_url = '{media_prefix}' || lmr.media_id)
+ AND NOT EXISTS
+ (SELECT 1
+ FROM room_memberships
+ WHERE room_memberships.avatar_url = '{media_prefix}' || lmr.media_id)
+ AND NOT EXISTS
+ (SELECT 1
+ FROM user_directory
+ WHERE user_directory.avatar_url = '{media_prefix}' || lmr.media_id)
+ AND NOT EXISTS
+ (SELECT 1
+ FROM room_stats_state
+ WHERE room_stats_state.avatar = '{media_prefix}' || lmr.media_id)
+ )
+ """.format(
+ media_prefix="mxc://%s/" % (self.server_name,),
+ )
+ sql += sql_keep
+
+ def _get_local_media_before_txn(txn):
+ txn.execute(sql, (before_ts, before_ts, size_gt))
+ return [row[0] for row in txn]
+
+ return await self.db_pool.runInteraction(
+ "get_local_media_before", _get_local_media_before_txn
+ )
+
async def store_local_media(
self,
media_id,
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index a6d1eb908a..0e25ca3d7a 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -39,7 +39,7 @@ class ProfileWorkerStore(SQLBaseStore):
avatar_url=profile["avatar_url"], display_name=profile["displayname"]
)
- async def get_profile_displayname(self, user_localpart: str) -> str:
+ async def get_profile_displayname(self, user_localpart: str) -> Optional[str]:
return await self.db_pool.simple_select_one_onecol(
table="profiles",
keyvalues={"user_id": user_localpart},
@@ -47,7 +47,7 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_profile_displayname",
)
- async def get_profile_avatar_url(self, user_localpart: str) -> str:
+ async def get_profile_avatar_url(self, user_localpart: str) -> Optional[str]:
return await self.db_pool.simple_select_one_onecol(
table="profiles",
keyvalues={"user_id": user_localpart},
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 4c843b7679..e7b17a7385 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -16,29 +16,33 @@
# limitations under the License.
import logging
import re
-from typing import Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
-from synapse.storage.types import Cursor
+from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.databases.main.stats import StatsStore
+from synapse.storage.types import Connection, Cursor
+from synapse.storage.util.id_generators import IdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import UserID
from synapse.util.caches.descriptors import cached
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
logger = logging.getLogger(__name__)
-class RegistrationWorkerStore(SQLBaseStore):
- def __init__(self, database: DatabasePool, db_conn, hs):
+class RegistrationWorkerStore(CacheInvalidationWorkerStore):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
self.config = hs.config
- self.clock = hs.get_clock()
# Note: we don't check this sequence for consistency as we'd have to
# call `find_max_generated_user_id_localpart` each time, which is
@@ -55,7 +59,7 @@ class RegistrationWorkerStore(SQLBaseStore):
# Create a background job for culling expired 3PID validity tokens
if hs.config.run_background_tasks:
- self.clock.looping_call(
+ self._clock.looping_call(
self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
)
@@ -92,7 +96,7 @@ class RegistrationWorkerStore(SQLBaseStore):
if not info:
return False
- now = self.clock.time_msec()
+ now = self._clock.time_msec()
trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000
is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
return is_trial
@@ -236,13 +240,13 @@ class RegistrationWorkerStore(SQLBaseStore):
desc="get_renewal_token_for_user",
)
- async def get_users_expiring_soon(self) -> List[Dict[str, int]]:
+ async def get_users_expiring_soon(self) -> List[Dict[str, Any]]:
"""Selects users whose account will expire in the [now, now + renew_at] time
window (see configuration for account_validity for information on what renew_at
refers to).
Returns:
- A list of dictionaries mapping user ID to expiration time (in milliseconds).
+ A list of dictionaries, each with a user ID and expiration time (in milliseconds).
"""
def select_users_txn(txn, now_ms, renew_at):
@@ -257,7 +261,7 @@ class RegistrationWorkerStore(SQLBaseStore):
return await self.db_pool.runInteraction(
"get_users_expiring_soon",
select_users_txn,
- self.clock.time_msec(),
+ self._clock.time_msec(),
self.config.account_validity.renew_at,
)
@@ -328,13 +332,17 @@ class RegistrationWorkerStore(SQLBaseStore):
await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
def _query_for_auth(self, txn, token):
- sql = (
- "SELECT users.name, users.is_guest, users.shadow_banned, access_tokens.id as token_id,"
- " access_tokens.device_id, access_tokens.valid_until_ms"
- " FROM users"
- " INNER JOIN access_tokens on users.name = access_tokens.user_id"
- " WHERE token = ?"
- )
+ sql = """
+ SELECT users.name,
+ users.is_guest,
+ users.shadow_banned,
+ access_tokens.id as token_id,
+ access_tokens.device_id,
+ access_tokens.valid_until_ms
+ FROM users
+ INNER JOIN access_tokens on users.name = access_tokens.user_id
+ WHERE token = ?
+ """
txn.execute(sql, (token,))
rows = self.db_pool.cursor_to_dict(txn)
@@ -803,7 +811,7 @@ class RegistrationWorkerStore(SQLBaseStore):
await self.db_pool.runInteraction(
"cull_expired_threepid_validation_tokens",
cull_expired_threepid_validation_tokens_txn,
- self.clock.time_msec(),
+ self._clock.time_msec(),
)
@wrap_as_background_process("account_validity_set_expiration_dates")
@@ -890,10 +898,10 @@ class RegistrationWorkerStore(SQLBaseStore):
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
- def __init__(self, database: DatabasePool, db_conn, hs):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
- self.clock = hs.get_clock()
+ self._clock = hs.get_clock()
self.config = hs.config
self.db_pool.updates.register_background_index_update(
@@ -1016,13 +1024,56 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
return 1
+ async def set_user_deactivated_status(
+ self, user_id: str, deactivated: bool
+ ) -> None:
+ """Set the `deactivated` property for the provided user to the provided value.
+
+ Args:
+ user_id: The ID of the user to set the status for.
+ deactivated: The value to set for `deactivated`.
+ """
+
+ await self.db_pool.runInteraction(
+ "set_user_deactivated_status",
+ self.set_user_deactivated_status_txn,
+ user_id,
+ deactivated,
+ )
+
+ def set_user_deactivated_status_txn(self, txn, user_id: str, deactivated: bool):
+ self.db_pool.simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,)
+ )
+ txn.call_after(self.is_guest.invalidate, (user_id,))
+
+ @cached()
+ async def is_guest(self, user_id: str) -> bool:
+ res = await self.db_pool.simple_select_one_onecol(
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="is_guest",
+ allow_none=True,
+ desc="is_guest",
+ )
+
+ return res if res else False
+
-class RegistrationStore(RegistrationBackgroundUpdateStore):
- def __init__(self, database: DatabasePool, db_conn, hs):
+class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
self._ignore_unknown_session_error = hs.config.request_token_inhibit_3pid_errors
+ self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
+
async def add_access_token_to_user(
self,
user_id: str,
@@ -1138,19 +1189,19 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
def _register_user(
self,
txn,
- user_id,
- password_hash,
- was_guest,
- make_guest,
- appservice_id,
- create_profile_with_displayname,
- admin,
- user_type,
- shadow_banned,
+ user_id: str,
+ password_hash: Optional[str],
+ was_guest: bool,
+ make_guest: bool,
+ appservice_id: Optional[str],
+ create_profile_with_displayname: Optional[str],
+ admin: bool,
+ user_type: Optional[str],
+ shadow_banned: bool,
):
user_id_obj = UserID.from_string(user_id)
- now = int(self.clock.time())
+ now = int(self._clock.time())
try:
if was_guest:
@@ -1374,18 +1425,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
await self.db_pool.runInteraction("delete_access_token", f)
- @cached()
- async def is_guest(self, user_id: str) -> bool:
- res = await self.db_pool.simple_select_one_onecol(
- table="users",
- keyvalues={"name": user_id},
- retcol="is_guest",
- allow_none=True,
- desc="is_guest",
- )
-
- return res if res else False
-
async def add_user_pending_deactivation(self, user_id: str) -> None:
"""
Adds a user to the table of users who need to be parted from all the rooms they're
@@ -1479,7 +1518,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
txn,
table="threepid_validation_session",
keyvalues={"session_id": session_id},
- updatevalues={"validated_at": self.clock.time_msec()},
+ updatevalues={"validated_at": self._clock.time_msec()},
)
return next_link
@@ -1547,35 +1586,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
start_or_continue_validation_session_txn,
)
- async def set_user_deactivated_status(
- self, user_id: str, deactivated: bool
- ) -> None:
- """Set the `deactivated` property for the provided user to the provided value.
-
- Args:
- user_id: The ID of the user to set the status for.
- deactivated: The value to set for `deactivated`.
- """
-
- await self.db_pool.runInteraction(
- "set_user_deactivated_status",
- self.set_user_deactivated_status_txn,
- user_id,
- deactivated,
- )
-
- def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
- self.db_pool.simple_update_one_txn(
- txn=txn,
- table="users",
- keyvalues={"name": user_id},
- updatevalues={"deactivated": 1 if deactivated else 0},
- )
- self._invalidate_cache_and_stream(
- txn, self.get_user_deactivated_status, (user_id,)
- )
- txn.call_after(self.is_guest.invalidate, (user_id,))
-
def find_max_generated_user_id_localpart(cur: Cursor) -> int:
"""
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index e83d961c20..dc0c4b5499 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1411,6 +1411,65 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
desc="add_event_report",
)
+ async def get_event_report(self, report_id: int) -> Optional[Dict[str, Any]]:
+ """Retrieve an event report
+
+ Args:
+ report_id: ID of reported event in database
+ Returns:
+ event_report: json list of information from event report
+ """
+
+ def _get_event_report_txn(txn, report_id):
+
+ sql = """
+ SELECT
+ er.id,
+ er.received_ts,
+ er.room_id,
+ er.event_id,
+ er.user_id,
+ er.content,
+ events.sender,
+ room_stats_state.canonical_alias,
+ room_stats_state.name,
+ event_json.json AS event_json
+ FROM event_reports AS er
+ LEFT JOIN events
+ ON events.event_id = er.event_id
+ JOIN event_json
+ ON event_json.event_id = er.event_id
+ JOIN room_stats_state
+ ON room_stats_state.room_id = er.room_id
+ WHERE er.id = ?
+ """
+
+ txn.execute(sql, [report_id])
+ row = txn.fetchone()
+
+ if not row:
+ return None
+
+ event_report = {
+ "id": row[0],
+ "received_ts": row[1],
+ "room_id": row[2],
+ "event_id": row[3],
+ "user_id": row[4],
+ "score": db_to_json(row[5]).get("score"),
+ "reason": db_to_json(row[5]).get("reason"),
+ "sender": row[6],
+ "canonical_alias": row[7],
+ "name": row[8],
+ "event_json": db_to_json(row[9]),
+ }
+
+ return event_report
+
+ return await self.db_pool.runInteraction(
+ "get_event_report", _get_event_report_txn, report_id
+ )
+
async def get_event_reports_paginate(
self,
start: int,
@@ -1468,18 +1527,15 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
er.room_id,
er.event_id,
er.user_id,
- er.reason,
er.content,
events.sender,
- room_aliases.room_alias,
- event_json.json AS event_json
+ room_stats_state.canonical_alias,
+ room_stats_state.name
FROM event_reports AS er
- LEFT JOIN room_aliases
- ON room_aliases.room_id = er.room_id
- JOIN events
+ LEFT JOIN events
ON events.event_id = er.event_id
- JOIN event_json
- ON event_json.event_id = er.event_id
+ JOIN room_stats_state
+ ON room_stats_state.room_id = er.room_id
{where_clause}
ORDER BY er.received_ts {order}
LIMIT ?
@@ -1490,15 +1546,29 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
args += [limit, start]
txn.execute(sql, args)
- event_reports = self.db_pool.cursor_to_dict(txn)
-
- if count > 0:
- for row in event_reports:
- try:
- row["content"] = db_to_json(row["content"])
- row["event_json"] = db_to_json(row["event_json"])
- except Exception:
- continue
+
+ event_reports = []
+ for row in txn:
+ try:
+ s = db_to_json(row[5]).get("score")
+ r = db_to_json(row[5]).get("reason")
+ except Exception:
+ logger.error("Unable to parse json from event_reports: %s", row[0])
+ continue
+ event_reports.append(
+ {
+ "id": row[0],
+ "received_ts": row[1],
+ "room_id": row[2],
+ "event_id": row[3],
+ "user_id": row[4],
+ "score": s,
+ "reason": r,
+ "sender": row[6],
+ "canonical_alias": row[7],
+ "name": row[8],
+ }
+ )
return event_reports, count
diff --git a/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql
new file mode 100644
index 0000000000..a2842687f1
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql
@@ -0,0 +1,2 @@
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('users_have_local_media', '{}');
\ No newline at end of file
|