From 9b7c28283abf62378cecbde3523b5709448e4140 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 27 Oct 2020 15:12:31 +0100 Subject: Add admin API to list users' local media (#8647) Add admin API `GET /_synapse/admin/v1/users//media` to get information of users' uploaded files. --- synapse/rest/admin/__init__.py | 2 + synapse/rest/admin/users.py | 67 +++++++++++++++++++++- .../storage/databases/main/events_bg_updates.py | 7 +++ synapse/storage/databases/main/media_repository.py | 51 ++++++++++++++++ .../schema/delta/58/22users_have_local_media.sql | 2 + 5 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql (limited to 'synapse') diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index df14bdf26e..a79996cae1 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -53,6 +53,7 @@ from synapse.rest.admin.users import ( ResetPasswordRestServlet, SearchUsersRestServlet, UserAdminServlet, + UserMediaRestServlet, UserMembershipRestServlet, UserRegisterServlet, UserRestServletV2, @@ -218,6 +219,7 @@ def register_servlets(hs, http_server): SendServerNoticeServlet(hs).register(http_server) VersionServlet(hs).register(http_server) UserAdminServlet(hs).register(http_server) + UserMediaRestServlet(hs).register(http_server) UserMembershipRestServlet(hs).register(http_server) UserRestServletV2(hs).register(http_server) UsersRestServletV2(hs).register(http_server) diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index e71d9b0e1c..933bb45346 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -16,6 +16,7 @@ import hashlib import hmac import logging from http import HTTPStatus +from typing import Tuple from synapse.api.constants import UserTypes from synapse.api.errors import Codes, NotFoundError, SynapseError @@ -27,13 +28,14 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, ) +from synapse.http.site import SynapseRequest from synapse.rest.admin._base import ( admin_patterns, assert_requester_is_admin, assert_user_is_admin, historical_admin_path_patterns, ) -from synapse.types import UserID +from synapse.types import JsonDict, UserID logger = logging.getLogger(__name__) @@ -709,3 +711,66 @@ class UserMembershipRestServlet(RestServlet): room_ids = await self.store.get_rooms_for_user(user_id) ret = {"joined_rooms": list(room_ids), "total": len(room_ids)} return 200, ret + + +class UserMediaRestServlet(RestServlet): + """ + Gets information about all uploaded local media for a specific `user_id`. + + Example: + http://localhost:8008/_synapse/admin/v1/users/ + @user:server/media + + Args: + The parameters `from` and `limit` are required for pagination. + By default, a `limit` of 100 is used. + Returns: + A list of media and an integer representing the total number of + media that exist given for this user + """ + + PATTERNS = admin_patterns("/users/(?P[^/]+)/media$") + + def __init__(self, hs): + self.is_mine = hs.is_mine + self.auth = hs.get_auth() + self.store = hs.get_datastore() + + async def on_GET( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: + await assert_requester_is_admin(self.auth, request) + + if not self.is_mine(UserID.from_string(user_id)): + raise SynapseError(400, "Can only lookup local users") + + user = await self.store.get_user_by_id(user_id) + if user is None: + raise NotFoundError("Unknown user") + + start = parse_integer(request, "from", default=0) + limit = parse_integer(request, "limit", default=100) + + if start < 0: + raise SynapseError( + 400, + "Query parameter from must be a string representing a positive integer.", + errcode=Codes.INVALID_PARAM, + ) + + if limit < 0: + raise SynapseError( + 400, + "Query parameter limit must be a string representing a positive integer.", + errcode=Codes.INVALID_PARAM, + ) + + media, total = await self.store.get_local_media_by_user_paginate( + start, limit, user_id + ) + + ret = {"media": media, "total": total} + if (start + limit) < total: + ret["next_token"] = start + len(media) + + return 200, ret diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 5e4af2eb51..97b6754846 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -92,6 +92,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): where_clause="NOT have_censored", ) + self.db_pool.updates.register_background_index_update( + "users_have_local_media", + index_name="users_have_local_media", + table="local_media_repository", + columns=["user_id", "created_ts"], + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 7ef5f1bf2b..daf57675d8 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -116,6 +116,57 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="get_local_media", ) + async def get_local_media_by_user_paginate( + self, start: int, limit: int, user_id: str + ) -> Tuple[List[Dict[str, Any]], int]: + """Get a paginated list of metadata for a local piece of media + which an user_id has uploaded + + Args: + start: offset in the list + limit: maximum amount of media_ids to retrieve + user_id: fully-qualified user id + Returns: + A paginated list of all metadata of user's media, + plus the total count of all the user's media + """ + + def get_local_media_by_user_paginate_txn(txn): + + args = [user_id] + sql = """ + SELECT COUNT(*) as total_media + FROM local_media_repository + WHERE user_id = ? + """ + txn.execute(sql, args) + count = txn.fetchone()[0] + + sql = """ + SELECT + "media_id", + "media_type", + "media_length", + "upload_name", + "created_ts", + "last_access_ts", + "quarantined_by", + "safe_from_quarantine" + FROM local_media_repository + WHERE user_id = ? + ORDER BY created_ts DESC, media_id DESC + LIMIT ? OFFSET ? + """ + + args += [limit, start] + txn.execute(sql, args) + media = self.db_pool.cursor_to_dict(txn) + return media, count + + return await self.db_pool.runInteraction( + "get_local_media_by_user_paginate_txn", get_local_media_by_user_paginate_txn + ) + async def get_local_media_before( self, before_ts: int, size_gt: int, keep_profiles: bool, ) -> Optional[List[str]]: diff --git a/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql new file mode 100644 index 0000000000..a2842687f1 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql @@ -0,0 +1,2 @@ +INSERT INTO background_updates (update_name, progress_json) VALUES + ('users_have_local_media', '{}'); \ No newline at end of file -- cgit 1.5.1 From 0c7f9cb81fbfa4922f0c7b935374322a7fda4bee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 15:32:19 +0000 Subject: Don't unnecessarily start bg process while handling typing. (#8668) There's no point starting a background process when all its going to do is bail if federation isn't enabled. --- changelog.d/8668.misc | 1 + synapse/handlers/typing.py | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) create mode 100644 changelog.d/8668.misc (limited to 'synapse') diff --git a/changelog.d/8668.misc b/changelog.d/8668.misc new file mode 100644 index 0000000000..cf6023f783 --- /dev/null +++ b/changelog.d/8668.misc @@ -0,0 +1 @@ +Reduce number of OpenTracing spans started. diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 8758066c74..e919a8f9ed 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -167,20 +167,25 @@ class FollowerTypingHandler: now_typing = set(row.user_ids) self._room_typing[row.room_id] = row.user_ids - run_as_background_process( - "_handle_change_in_typing", - self._handle_change_in_typing, - row.room_id, - prev_typing, - now_typing, - ) + if self.federation: + run_as_background_process( + "_send_changes_in_typing_to_remotes", + self._send_changes_in_typing_to_remotes, + row.room_id, + prev_typing, + now_typing, + ) - async def _handle_change_in_typing( + async def _send_changes_in_typing_to_remotes( self, room_id: str, prev_typing: Set[str], now_typing: Set[str] ): """Process a change in typing of a room from replication, sending EDUs for any local users. """ + + if not self.federation: + return + for user_id in now_typing - prev_typing: if self.is_mine_id(user_id): await self._push_remote(RoomMember(room_id, user_id), True) -- cgit 1.5.1 From 4215a3acd4a77cb3331144782d43f99635f3d0ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Oct 2020 17:37:08 +0000 Subject: Don't unnecessarily start bg process in replication sending loop. (#8670) --- changelog.d/8670.misc | 1 + synapse/replication/tcp/resource.py | 10 ++++++++++ 2 files changed, 11 insertions(+) create mode 100644 changelog.d/8670.misc (limited to 'synapse') diff --git a/changelog.d/8670.misc b/changelog.d/8670.misc new file mode 100644 index 0000000000..cf6023f783 --- /dev/null +++ b/changelog.d/8670.misc @@ -0,0 +1 @@ +Reduce number of OpenTracing spans started. diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 666c13fdb7..1d4ceac0f1 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -117,6 +117,16 @@ class ReplicationStreamer: stream.discard_updates_and_advance() return + # We check up front to see if anything has actually changed, as we get + # poked because of changes that happened on other instances. + if all( + stream.last_token == stream.current_token(self._instance_name) + for stream in self.streams + ): + return + + # If there are updates then we need to set this even if we're already + # looping, as the loop needs to know that he might need to loop again. self.pending_updates = True if self.is_looping: -- cgit 1.5.1 From a699c044b6598954ec74322923dd6ec698cd272d Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 27 Oct 2020 18:42:46 +0000 Subject: Abstract code for stripping room state into a separate method (#8671) This is a requirement for [knocking](https://github.com/matrix-org/synapse/pull/6739), and is abstracting some code that was originally used by the invite flow. I'm separating it out into this PR as it's a fairly contained change. For a bit of context: when you invite a user to a room, you send them [stripped state events](https://matrix.org/docs/spec/server_server/unstable#put-matrix-federation-v2-invite-roomid-eventid) as part of `invite_room_state`. This is so that their client can display useful information such as the room name and avatar. The same requirement applies to knocking, as it would be nice for clients to be able to display a list of rooms you've knocked on - room name and avatar included. The reason we're sending membership events down as well is in the case that you are invited to a room that does not have an avatar or name set. In that case, the client should use the displayname/avatar of the inviter. That information is located in the inviter's membership event. This is optional as knocks don't really have any user in the room to link up to. When you knock on a room, your knock is sent by you and inserted into the room. It wouldn't *really* make sense to show the avatar of a random user - plus it'd be a data leak. So I've opted not to send membership events to the client here. The UX on the client for when you knock on a room without a name/avatar is a separate problem. In essence this is just moving some inline code to a reusable store method. --- changelog.d/8671.misc | 1 + synapse/handlers/message.py | 35 ++++------------ synapse/storage/databases/main/events_worker.py | 54 ++++++++++++++++++++++++- 3 files changed, 61 insertions(+), 29 deletions(-) create mode 100644 changelog.d/8671.misc (limited to 'synapse') diff --git a/changelog.d/8671.misc b/changelog.d/8671.misc new file mode 100644 index 0000000000..bef8dc425a --- /dev/null +++ b/changelog.d/8671.misc @@ -0,0 +1 @@ +Abstract some invite-related code in preparation for landing knocking. \ No newline at end of file diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f1b4d35182..4ead75ec3a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1100,34 +1100,13 @@ class EventCreationHandler: if event.type == EventTypes.Member: if event.content["membership"] == Membership.INVITE: - - def is_inviter_member_event(e): - return e.type == EventTypes.Member and e.sender == event.sender - - current_state_ids = await context.get_current_state_ids() - - # We know this event is not an outlier, so this must be - # non-None. - assert current_state_ids is not None - - state_to_include_ids = [ - e_id - for k, e_id in current_state_ids.items() - if k[0] in self.room_invite_state_types - or k == (EventTypes.Member, event.sender) - ] - - state_to_include = await self.store.get_events(state_to_include_ids) - - event.unsigned["invite_room_state"] = [ - { - "type": e.type, - "state_key": e.state_key, - "content": e.content, - "sender": e.sender, - } - for e in state_to_include.values() - ] + event.unsigned[ + "invite_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_invite_state_types, + membership_user_id=event.sender, + ) invitee = UserID.from_string(event.state_key) if not self.hs.is_mine(invitee): diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 6e7f16f39c..cd1f31aa62 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -31,6 +31,7 @@ from synapse.api.room_versions import ( RoomVersions, ) from synapse.events import EventBase, make_event_from_dict +from synapse.events.snapshot import EventContext from synapse.events.utils import prune_event from synapse.logging.context import PreserveLoggingContext, current_context from synapse.metrics.background_process_metrics import ( @@ -44,7 +45,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla from synapse.storage.database import DatabasePool from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator -from synapse.types import Collection, get_domain_from_id +from synapse.types import Collection, JsonDict, get_domain_from_id from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter @@ -525,6 +526,57 @@ class EventsWorkerStore(SQLBaseStore): return event_map + async def get_stripped_room_state_from_event_context( + self, + context: EventContext, + state_types_to_include: List[EventTypes], + membership_user_id: Optional[str], + ) -> List[JsonDict]: + """ + Retrieve the stripped state from a room, given an event context to retrieve state + from as well as the state types to include. Optionally, include the membership + events from a specific user. + + "Stripped" state means that only the `type`, `state_key`, `content` and `sender` keys + are included from each state event. + + Args: + context: The event context to retrieve state of the room from. + state_types_to_include: The type of state events to include. + membership_user_id: An optional user ID to include the stripped membership state + events of. This is useful when generating the stripped state of a room for + invites. We want to send membership events of the inviter, so that the + invitee can display the inviter's profile information if the room lacks any. + + Returns: + A list of dictionaries, each representing a stripped state event from the room. + """ + current_state_ids = await context.get_current_state_ids() + + # We know this event is not an outlier, so this must be + # non-None. + assert current_state_ids is not None + + # The state to include + state_to_include_ids = [ + e_id + for k, e_id in current_state_ids.items() + if k[0] in state_types_to_include + or (membership_user_id and k == (EventTypes.Member, membership_user_id)) + ] + + state_to_include = await self.get_events(state_to_include_ids) + + return [ + { + "type": e.type, + "state_key": e.state_key, + "content": e.content, + "sender": e.sender, + } + for e in state_to_include.values() + ] + def _do_fetch(self, conn): """Takes a database connection and waits for requests for events from the _event_fetch_list queue. -- cgit 1.5.1