From 4a53f3a3e8fb7fe9df052790858ca3f7dbff78f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Jan 2018 10:52:32 +0000 Subject: Ensure media is in local cache before thumbnailing --- synapse/rest/media/v1/media_repository.py | 20 +++++++++++--------- synapse/rest/media/v1/media_storage.py | 27 +++++++++++++++++++++++++++ synapse/rest/media/v1/thumbnail_resource.py | 3 ++- 3 files changed, 40 insertions(+), 10 deletions(-) (limited to 'synapse/rest') diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 97c82c150e..578d7f07c5 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -445,8 +445,10 @@ class MediaRepository(object): @defer.inlineCallbacks def generate_local_exact_thumbnail(self, media_id, t_width, t_height, - t_method, t_type): - input_path = self.filepaths.local_media_filepath(media_id) + t_method, t_type, url_cache): + input_path = yield self.media_storage.ensure_media_is_in_local_cache(FileInfo( + None, media_id, url_cache=url_cache, + )) thumbnailer = Thumbnailer(input_path) t_byte_source = yield make_deferred_yieldable(threads.deferToThread( @@ -459,6 +461,7 @@ class MediaRepository(object): file_info = FileInfo( server_name=None, file_id=media_id, + url_cache=url_cache, thumbnail=True, thumbnail_width=t_width, thumbnail_height=t_height, @@ -485,7 +488,9 @@ class MediaRepository(object): @defer.inlineCallbacks def generate_remote_exact_thumbnail(self, server_name, file_id, media_id, t_width, t_height, t_method, t_type): - input_path = self.filepaths.remote_media_filepath(server_name, file_id) + input_path = yield self.media_storage.ensure_media_is_in_local_cache(FileInfo( + server_name, file_id, url_cache=False, + )) thumbnailer = Thumbnailer(input_path) t_byte_source = yield make_deferred_yieldable(threads.deferToThread( @@ -543,12 +548,9 @@ class MediaRepository(object): if not requirements: return - if server_name: - input_path = self.filepaths.remote_media_filepath(server_name, file_id) - elif url_cache: - input_path = self.filepaths.url_cache_filepath(media_id) - else: - input_path = self.filepaths.local_media_filepath(media_id) + input_path = yield self.media_storage.ensure_media_is_in_local_cache(FileInfo( + server_name, file_id, url_cache=url_cache, + )) thumbnailer = Thumbnailer(input_path) m_width = thumbnailer.width diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 001e84578e..d3f54594a2 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -15,6 +15,7 @@ from twisted.internet import defer, threads from twisted.protocols.basic import FileSender +from twisted.protocols.ftp import FileConsumer # This isn't FTP specific from ._base import Responder @@ -151,6 +152,32 @@ class MediaStorage(object): defer.returnValue(None) + @defer.inlineCallbacks + def ensure_media_is_in_local_cache(self, file_info): + """Ensures that the given file is in the local cache. Attempts to + download it from storage providers if it isn't. + + Args: + file_info (FileInfo) + + Returns: + Deferred[str]: Full path to local file + """ + path = self._file_info_to_path(file_info) + local_path = os.path.join(self.local_media_directory, path) + if os.path.exists(local_path): + defer.returnValue(local_path) + + for provider in self.storage_providers: + res = yield provider.fetch(path, file_info) + if res: + with res: + with open(local_path, "w") as f: + res.write_to_consumer(FileConsumer(f)) + defer.returnValue(local_path) + + raise Exception("file could not be found") + def _file_info_to_path(self, file_info): """Converts file_info into a relative path. diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 8c9653843b..49e4af514c 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -162,7 +162,8 @@ class ThumbnailResource(Resource): # Okay, so we generate one. file_path = yield self.media_repo.generate_local_exact_thumbnail( - media_id, desired_width, desired_height, desired_method, desired_type + media_id, desired_width, desired_height, desired_method, desired_type, + url_cache=media_info["url_cache"], ) if file_path: -- cgit 1.4.1 From 2cf6a7bc2068350d0701b156deb2bb3a6f0da88a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Jan 2018 16:56:23 +0000 Subject: Use better file consumer --- synapse/rest/media/v1/media_storage.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'synapse/rest') diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index d3f54594a2..5c3e4e5a65 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -15,10 +15,10 @@ from twisted.internet import defer, threads from twisted.protocols.basic import FileSender -from twisted.protocols.ftp import FileConsumer # This isn't FTP specific from ._base import Responder +from synapse.util.file_consumer import BackgroundFileConsumer from synapse.util.logcontext import make_deferred_yieldable import contextlib @@ -27,6 +27,7 @@ import logging import shutil import sys + logger = logging.getLogger(__name__) @@ -168,12 +169,17 @@ class MediaStorage(object): if os.path.exists(local_path): defer.returnValue(local_path) + dirname = os.path.dirname(local_path) + if not os.path.exists(dirname): + os.makedirs(dirname) + for provider in self.storage_providers: res = yield provider.fetch(path, file_info) if res: with res: - with open(local_path, "w") as f: - res.write_to_consumer(FileConsumer(f)) + consumer = BackgroundFileConsumer(open(local_path, "w")) + yield res.write_to_consumer(consumer) + yield consumer.wait() defer.returnValue(local_path) raise Exception("file could not be found") @@ -247,9 +253,8 @@ class FileResponder(Responder): def __init__(self, open_file): self.open_file = open_file - @defer.inlineCallbacks def write_to_consumer(self, consumer): - yield FileSender().beginFileTransfer(self.open_file, consumer) + return FileSender().beginFileTransfer(self.open_file, consumer) def __exit__(self, exc_type, exc_val, exc_tb): self.open_file.close() -- cgit 1.4.1 From 5552ed9a7fb1300142a7aebe7fc85b0bd2535bcf Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Sat, 20 Jan 2018 22:25:23 -0700 Subject: Add an admin route to get all the media in a room This is intended to be used by administrators to monitor the media that is passing through their server, if they wish. Signed-off-by: Travis Ralston --- synapse/rest/client/v1/admin.py | 22 +++++++ synapse/storage/room.py | 131 +++++++++++++++++++++++----------------- 2 files changed, 97 insertions(+), 56 deletions(-) (limited to 'synapse/rest') diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 5022808ea9..0615e5d807 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -289,6 +289,27 @@ class QuarantineMediaInRoom(ClientV1RestServlet): defer.returnValue((200, {"num_quarantined": num_quarantined})) +class ListMediaInRoom(ClientV1RestServlet): + """Lists all of the media in a given room. + """ + PATTERNS = client_path_patterns("/admin/room/(?P[^/]+)/media") + + def __init__(self, hs): + super(ListMediaInRoom, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + if not is_admin: + raise AuthError(403, "You are not a server admin") + + local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id) + + defer.returnValue((200, {"local": local_mxcs, "remote": remote_mxcs})) + + class ResetPasswordRestServlet(ClientV1RestServlet): """Post request to allow an administrator reset password for a user. This needs user to have administrator access in Synapse. @@ -487,3 +508,4 @@ def register_servlets(hs, http_server): SearchUsersRestServlet(hs).register(http_server) ShutdownRoomRestServlet(hs).register(http_server) QuarantineMediaInRoom(hs).register(http_server) + ListMediaInRoom(hs).register(http_server) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 23688430b7..cd6899a4b5 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -533,73 +533,92 @@ class RoomStore(SQLBaseStore): ) self.is_room_blocked.invalidate((room_id,)) + def get_media_mxcs_in_room(self, room_id): + def _get_media_ids_in_room(txn): + local_media_ids, remote_media_ids = self._get_media_ids_in_room(txn, room_id) + local_media_mxcs = [] + remote_media_mxcs = [] + + # Convert the IDs to MXC URIs + for media_id in local_media_ids: + local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id)) + for hostname, media_id in remote_media_ids: + remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id)) + + return local_media_mxcs, remote_media_mxcs + return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room) + def quarantine_media_ids_in_room(self, room_id, quarantined_by): """For a room loops through all events with media and quarantines the associated media """ - def _get_media_ids_in_room(txn): - mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)") + def _quarantine_media_in_room(txn): + local_media_mxcs, remote_media_mxcs = self._get_media_ids_in_room(txn, room_id) + total_media_quarantined = 0 - next_token = self.get_current_events_token() + 1 + # Now update all the tables to set the quarantined_by flag - total_media_quarantined = 0 + txn.executemany(""" + UPDATE local_media_repository + SET quarantined_by = ? + WHERE media_id = ? + """, ((quarantined_by, media_id) for media_id in local_media_mxcs)) - while next_token: - sql = """ - SELECT stream_ordering, content FROM events - WHERE room_id = ? - AND stream_ordering < ? - AND contains_url = ? AND outlier = ? - ORDER BY stream_ordering DESC - LIMIT ? + txn.executemany( """ - txn.execute(sql, (room_id, next_token, True, False, 100)) - - next_token = None - local_media_mxcs = [] - remote_media_mxcs = [] - for stream_ordering, content_json in txn: - next_token = stream_ordering - content = json.loads(content_json) - - content_url = content.get("url") - thumbnail_url = content.get("info", {}).get("thumbnail_url") - - for url in (content_url, thumbnail_url): - if not url: - continue - matches = mxc_re.match(url) - if matches: - hostname = matches.group(1) - media_id = matches.group(2) - if hostname == self.hostname: - local_media_mxcs.append(media_id) - else: - remote_media_mxcs.append((hostname, media_id)) - - # Now update all the tables to set the quarantined_by flag - - txn.executemany(""" - UPDATE local_media_repository + UPDATE remote_media_cache SET quarantined_by = ? - WHERE media_id = ? - """, ((quarantined_by, media_id) for media_id in local_media_mxcs)) - - txn.executemany( - """ - UPDATE remote_media_cache - SET quarantined_by = ? - WHERE media_origin AND media_id = ? - """, - ( - (quarantined_by, origin, media_id) - for origin, media_id in remote_media_mxcs - ) + WHERE media_origin AND media_id = ? + """, + ( + (quarantined_by, origin, media_id) + for origin, media_id in remote_media_mxcs ) + ) - total_media_quarantined += len(local_media_mxcs) - total_media_quarantined += len(remote_media_mxcs) + total_media_quarantined += len(local_media_mxcs) + total_media_quarantined += len(remote_media_mxcs) return total_media_quarantined - return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room) + return self.runInteraction("quarantine_media_in_room", _quarantine_media_in_room) + + def _get_media_ids_in_room(self, txn, room_id): + mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)") + + next_token = self.get_current_events_token() + 1 + local_media_mxcs = [] + remote_media_mxcs = [] + + while next_token: + sql = """ + SELECT stream_ordering, content FROM events + WHERE room_id = ? + AND stream_ordering < ? + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + txn.execute(sql, (room_id, next_token, True, False, 100)) + + next_token = None + for stream_ordering, content_json in txn: + next_token = stream_ordering + content = json.loads(content_json) + + content_url = content.get("url") + thumbnail_url = content.get("info", {}).get("thumbnail_url") + + for url in (content_url, thumbnail_url): + if not url: + continue + matches = mxc_re.match(url) + if matches: + hostname = matches.group(1) + media_id = matches.group(2) + if hostname == self.hostname: + local_media_mxcs.append(media_id) + else: + remote_media_mxcs.append((hostname, media_id)) + + return local_media_mxcs, remote_media_mxcs -- cgit 1.4.1 From d5352cbba80005fb226563211c9e7179edef2d65 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 2 Feb 2018 00:35:18 +0000 Subject: Handle url_previews with no content-type avoid failing with an exception if the remote server doesn't give us a Content-Type header. Also, clean up the exception handling a bit. --- synapse/rest/media/v1/preview_url_resource.py | 55 +++++++++++++++++---------- 1 file changed, 34 insertions(+), 21 deletions(-) (limited to 'synapse/rest') diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 981f01e417..31fe7aa75c 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -12,6 +12,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import cgi +import datetime +import errno +import fnmatch +import itertools +import logging +import os +import re +import shutil +import sys +import traceback +import ujson as json +import urlparse from twisted.web.server import NOT_DONE_YET from twisted.internet import defer @@ -33,18 +46,6 @@ from synapse.http.server import ( from synapse.util.async import ObservableDeferred from synapse.util.stringutils import is_ascii -import os -import re -import fnmatch -import cgi -import ujson as json -import urlparse -import itertools -import datetime -import errno -import shutil - -import logging logger = logging.getLogger(__name__) @@ -286,17 +287,28 @@ class PreviewUrlResource(Resource): url_cache=True, ) - try: - with self.media_storage.store_into_file(file_info) as (f, fname, finish): + with self.media_storage.store_into_file(file_info) as (f, fname, finish): + try: logger.debug("Trying to get url '%s'" % url) length, headers, uri, code = yield self.client.get_file( url, output_stream=f, max_size=self.max_spider_size, ) + except Exception as e: # FIXME: pass through 404s and other error messages nicely + logger.warn("Error downloading %s: %r", url, e) + raise SynapseError( + 500, "Failed to download content: %s" % ( + traceback.format_exception_only(sys.exc_type, e), + ), + Codes.UNKNOWN, + ) + yield finish() - yield finish() - - media_type = headers["Content-Type"][0] + try: + if "Content-Type" in headers: + media_type = headers["Content-Type"][0] + else: + media_type = "application/octet-stream" time_now_ms = self.clock.time_msec() content_disposition = headers.get("Content-Disposition", None) @@ -336,10 +348,11 @@ class PreviewUrlResource(Resource): ) except Exception as e: - raise SynapseError( - 500, ("Failed to download content: %s" % e), - Codes.UNKNOWN - ) + logger.error("Error handling downloaded %s: %r", url, e) + # TODO: we really ought to delete the downloaded file in this + # case, since we won't have recorded it in the db, and will + # therefore not expire it. + raise defer.returnValue({ "media_type": media_type, -- cgit 1.4.1 From 3fa362502cc6c509bac65753954c313d307035e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jan 2018 16:52:07 +0000 Subject: Update places where we create events --- synapse/handlers/directory.py | 7 +++---- synapse/handlers/federation.py | 18 ++++++++---------- synapse/handlers/room.py | 10 ++++------ synapse/handlers/room_member.py | 20 +++++++++++--------- synapse/rest/client/v1/admin.py | 4 ++-- synapse/rest/client/v1/room.py | 16 +++++++++------- synapse/server.py | 5 +++++ 7 files changed, 42 insertions(+), 38 deletions(-) (limited to 'synapse/rest') diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index a0464ae5c0..8580ada60a 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -34,6 +34,7 @@ class DirectoryHandler(BaseHandler): self.state = hs.get_state_handler() self.appservice_handler = hs.get_application_service_handler() + self.event_creation_handler = hs.get_event_creation_handler() self.federation = hs.get_replication_layer() self.federation.register_query_handler( @@ -249,8 +250,7 @@ class DirectoryHandler(BaseHandler): def send_room_alias_update_event(self, requester, user_id, room_id): aliases = yield self.store.get_aliases_for_room(room_id) - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Aliases, @@ -272,8 +272,7 @@ class DirectoryHandler(BaseHandler): if not alias_event or alias_event.content.get("alias", "") != alias_str: return - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.CanonicalAlias, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8ee9434c9b..e6b9f5cf53 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -75,6 +75,7 @@ class FederationHandler(BaseHandler): self.is_mine_id = hs.is_mine_id self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() + self.event_creation_handler = hs.get_event_creation_handler() self.replication_layer.set_handler(self) @@ -1007,8 +1008,7 @@ class FederationHandler(BaseHandler): }) try: - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler._create_new_client_event( builder=builder, ) except AuthError as e: @@ -1248,8 +1248,7 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler._create_new_client_event( builder=builder, ) @@ -2120,8 +2119,7 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler._create_new_client_event( builder=builder ) @@ -2159,8 +2157,7 @@ class FederationHandler(BaseHandler): """ builder = self.event_builder_factory.new(event_dict) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler._create_new_client_event( builder=builder, ) @@ -2210,8 +2207,9 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event(builder=builder) + event, context = yield self.event_creation_handler._create_new_client_event( + builder=builder, + ) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d1cc87a016..4ea5bf1bcf 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -64,6 +64,7 @@ class RoomCreationHandler(BaseHandler): super(RoomCreationHandler, self).__init__(hs) self.spam_checker = hs.get_spam_checker() + self.event_creation_handler = hs.get_event_creation_handler() @defer.inlineCallbacks def create_room(self, requester, config, ratelimit=True): @@ -163,13 +164,11 @@ class RoomCreationHandler(BaseHandler): creation_content = config.get("creation_content", {}) - msg_handler = self.hs.get_handlers().message_handler room_member_handler = self.hs.get_handlers().room_member_handler yield self._send_events_for_new_room( requester, room_id, - msg_handler, room_member_handler, preset_config=preset_config, invite_list=invite_list, @@ -181,7 +180,7 @@ class RoomCreationHandler(BaseHandler): if "name" in config: name = config["name"] - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Name, @@ -194,7 +193,7 @@ class RoomCreationHandler(BaseHandler): if "topic" in config: topic = config["topic"] - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Topic, @@ -249,7 +248,6 @@ class RoomCreationHandler(BaseHandler): self, creator, # A Requester object. room_id, - msg_handler, room_member_handler, preset_config, invite_list, @@ -272,7 +270,7 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def send(etype, content, **kwargs): event = create(etype, content, **kwargs) - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7e6467cd1d..ab58beb0f5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -46,6 +46,7 @@ class RoomMemberHandler(BaseHandler): super(RoomMemberHandler, self).__init__(hs) self.profile_handler = hs.get_profile_handler() + self.event_creation_hander = hs.get_event_creation_handler() self.member_linearizer = Linearizer(name="member") @@ -66,13 +67,12 @@ class RoomMemberHandler(BaseHandler): ): if content is None: content = {} - msg_handler = self.hs.get_handlers().message_handler content["membership"] = membership if requester.is_guest: content["kind"] = "guest" - event, context = yield msg_handler.create_event( + event, context = yield self.event_creation_hander.create_event( requester, { "type": EventTypes.Member, @@ -90,12 +90,14 @@ class RoomMemberHandler(BaseHandler): ) # Check if this event matches the previous membership event for the user. - duplicate = yield msg_handler.deduplicate_state_event(event, context) + duplicate = yield self.event_creation_hander.deduplicate_state_event( + event, context, + ) if duplicate is not None: # Discard the new event since this membership change is a no-op. defer.returnValue(duplicate) - yield msg_handler.handle_new_client_event( + yield self.event_creation_hander.handle_new_client_event( requester, event, context, @@ -394,8 +396,9 @@ class RoomMemberHandler(BaseHandler): else: requester = synapse.types.create_requester(target_user) - message_handler = self.hs.get_handlers().message_handler - prev_event = yield message_handler.deduplicate_state_event(event, context) + prev_event = yield self.event_creation_hander.deduplicate_state_event( + event, context, + ) if prev_event is not None: return @@ -412,7 +415,7 @@ class RoomMemberHandler(BaseHandler): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - yield message_handler.handle_new_client_event( + yield self.event_creation_hander.handle_new_client_event( requester, event, context, @@ -644,8 +647,7 @@ class RoomMemberHandler(BaseHandler): ) ) - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_hander.create_and_send_nonmember_event( requester, { "type": EventTypes.ThirdPartyInvite, diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 0615e5d807..f77f646670 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -171,6 +171,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): self.store = hs.get_datastore() self.handlers = hs.get_handlers() self.state = hs.get_state_handler() + self.event_creation_handler = hs.get_event_creation_handler() @defer.inlineCallbacks def on_POST(self, request, room_id): @@ -203,8 +204,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): ) new_room_id = info["room_id"] - msg_handler = self.handlers.message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( room_creator_requester, { "type": "m.room.message", diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 867ec8602c..ad6534537a 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -82,6 +82,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomStateEventRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.event_creation_hander = hs.get_event_creation_handler() def register(self, http_server): # /room/$roomid/state/$eventtype @@ -162,15 +163,16 @@ class RoomStateEventRestServlet(ClientV1RestServlet): content=content, ) else: - msg_handler = self.handlers.message_handler - event, context = yield msg_handler.create_event( + event, context = yield self.event_creation_hander.create_event( requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id, ) - yield msg_handler.send_nonmember_event(requester, event, context) + yield self.event_creation_hander.send_nonmember_event( + requester, event, context, + ) ret = {} if event: @@ -184,6 +186,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomSendEventRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.event_creation_hander = hs.get_event_creation_handler() def register(self, http_server): # /rooms/$roomid/send/$event_type[/$txn_id] @@ -205,8 +208,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): if 'ts' in request.args and requester.app_service: event_dict['origin_server_ts'] = parse_integer(request, "ts", 0) - msg_handler = self.handlers.message_handler - event = yield msg_handler.create_and_send_nonmember_event( + event = yield self.event_creation_hander.create_and_send_nonmember_event( requester, event_dict, txn_id=txn_id, @@ -670,6 +672,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomRedactEventRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.event_creation_handler = hs.get_event_creation_handler() def register(self, http_server): PATTERNS = ("/rooms/(?P[^/]*)/redact/(?P[^/]*)") @@ -680,8 +683,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request) content = parse_json_object_from_request(request) - msg_handler = self.handlers.message_handler - event = yield msg_handler.create_and_send_nonmember_event( + event = yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Redaction, diff --git a/synapse/server.py b/synapse/server.py index 3173aed1d0..fbd602d40e 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -55,6 +55,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.user_directory import UserDirectoryHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.profile import ProfileHandler +from synapse.handlers.message import EventCreationHandler from synapse.groups.groups_server import GroupsServerHandler from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory @@ -118,6 +119,7 @@ class HomeServer(object): 'application_service_handler', 'device_message_handler', 'profile_handler', + 'event_creation_handler', 'deactivate_account_handler', 'set_password_handler', 'notifier', @@ -276,6 +278,9 @@ class HomeServer(object): def build_profile_handler(self): return ProfileHandler(self) + def build_event_creation_handler(self): + return EventCreationHandler(self) + def build_deactivate_account_handler(self): return DeactivateAccountHandler(self) -- cgit 1.4.1 From 3e1e69ccafbfdf8aa7c0cd06bc4eaf948a6bafdf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Feb 2018 16:40:38 +0000 Subject: Update copyright --- synapse/handlers/federation.py | 1 + synapse/handlers/message.py | 2 +- synapse/handlers/room.py | 1 + synapse/handlers/room_member.py | 1 + synapse/rest/client/v1/admin.py | 1 + synapse/rest/client/v1/room.py | 1 + 6 files changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/rest') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 06d6c8425b..cba96111d1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e8e6a89a3c..1540721815 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd -# Copyright 2017 New Vector Ltd +# Copyright 2017 - 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4ea5bf1bcf..6ab020bf41 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ab58beb0f5..37dc5e99ab 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index f77f646670..20c5c66632 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index ad6534537a..fbb2fc36e4 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. -- cgit 1.4.1 From 5fa571a91b851d372e07217c091b7e3a9ef3d116 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Feb 2018 13:35:08 +0000 Subject: Tell storage providers about new file so they can upload --- synapse/rest/media/v1/media_storage.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/rest') diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index e8e8b3986d..3f8d4b9c22 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -70,6 +70,12 @@ class MediaStorage(object): _write_file_synchronously, source, fname, )) + # Tell the storage providers about the new file. They'll decide + # if they should upload it and whether to do so synchronously + # or not. + for provider in self.storage_providers: + yield provider.store_file(path, file_info) + defer.returnValue(fname) @contextlib.contextmanager -- cgit 1.4.1 From 74fcbf741b3a7b95b5cc44478050e8a40fb7dc46 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 8 Feb 2018 18:44:52 +0000 Subject: delete_local_events for purge_history Add a flag which makes the purger delete local events --- docs/admin_api/purge_history_api.rst | 14 ++++++++++++-- synapse/handlers/message.py | 4 ++-- synapse/http/servlet.py | 18 +++++++++++++++--- synapse/rest/client/v1/admin.py | 11 ++++++++++- synapse/storage/events.py | 35 ++++++++++++++++++++++++++++------- 5 files changed, 67 insertions(+), 15 deletions(-) (limited to 'synapse/rest') diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index 08b3306366..b4e5bd9d75 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -4,8 +4,6 @@ Purge History API The purge history API allows server admins to purge historic events from their database, reclaiming disk space. -**NB!** This will not delete local events (locally sent messages content etc) from the database, but will remove lots of the metadata about them and does dramatically reduce the on disk space usage - Depending on the amount of history being purged a call to the API may take several minutes or longer. During this period users will not be able to paginate further back in the room from the point being purged from. @@ -15,3 +13,15 @@ The API is simply: ``POST /_matrix/client/r0/admin/purge_history//`` including an ``access_token`` of a server admin. + +By default, events sent by local users are not deleted, as they may represent +the only copies of this content in existence. (Events sent by remote users are +deleted, and room state data before the cutoff is always removed). + +To delete local events as well, set ``delete_local_events`` in the body: + +.. code:: json + + { + "delete_local_events": True, + } diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c7860bb05..276d1a7722 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -63,7 +63,7 @@ class MessageHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() @defer.inlineCallbacks - def purge_history(self, room_id, event_id): + def purge_history(self, room_id, event_id, delete_local_events=False): event = yield self.store.get_event(event_id) if event.room_id != room_id: @@ -72,7 +72,7 @@ class MessageHandler(BaseHandler): depth = event.depth with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history(room_id, depth) + yield self.store.purge_history(room_id, depth, delete_local_events) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 71420e54db..ef8e62901b 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -148,11 +148,13 @@ def parse_string_from_args(args, name, default=None, required=False, return default -def parse_json_value_from_request(request): +def parse_json_value_from_request(request, allow_empty_body=False): """Parse a JSON value from the body of a twisted HTTP request. Args: request: the twisted HTTP request. + allow_empty_body (bool): if True, an empty body will be accepted and + turned into None Returns: The JSON value. @@ -165,6 +167,9 @@ def parse_json_value_from_request(request): except Exception: raise SynapseError(400, "Error reading JSON content.") + if not content_bytes and allow_empty_body: + return None + try: content = simplejson.loads(content_bytes) except Exception as e: @@ -174,17 +179,24 @@ def parse_json_value_from_request(request): return content -def parse_json_object_from_request(request): +def parse_json_object_from_request(request, allow_empty_body=False): """Parse a JSON object from the body of a twisted HTTP request. Args: request: the twisted HTTP request. + allow_empty_body (bool): if True, an empty body will be accepted and + turned into an empty dict. Raises: SynapseError if the request body couldn't be decoded as JSON or if it wasn't a JSON object. """ - content = parse_json_value_from_request(request) + content = parse_json_value_from_request( + request, allow_empty_body=allow_empty_body, + ) + + if allow_empty_body and content is None: + return {} if type(content) != dict: message = "Content must be a JSON object." diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 5022808ea9..f954d2ea65 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -128,7 +128,16 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - yield self.handlers.message_handler.purge_history(room_id, event_id) + body = parse_json_object_from_request(request, allow_empty_body=True) + + delete_local_events = bool( + body.get("delete_local_history", False) + ) + + yield self.handlers.message_handler.purge_history( + room_id, event_id, + delete_local_events=delete_local_events, + ) defer.returnValue((200, {})) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 24d9978304..11a2ff2d8a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2031,16 +2031,32 @@ class EventsStore(SQLBaseStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) - def purge_history(self, room_id, topological_ordering): + def purge_history( + self, room_id, topological_ordering, delete_local_events, + ): """Deletes room history before a certain point + + Args: + room_id (str): + + topological_ordering (int): + minimum topo ordering to preserve + + delete_local_events (bool): + if True, we will delete local events as well as remote ones + (instead of just marking them as outliers and deleting their + state groups). """ return self.runInteraction( "purge_history", - self._purge_history_txn, room_id, topological_ordering + self._purge_history_txn, room_id, topological_ordering, + delete_local_events, ) - def _purge_history_txn(self, txn, room_id, topological_ordering): + def _purge_history_txn( + self, txn, room_id, topological_ordering, delete_local_events, + ): # Tables that should be pruned: # event_auth # event_backward_extremities @@ -2093,11 +2109,14 @@ class EventsStore(SQLBaseStore): to_delete = [ (event_id,) for event_id, state_key in event_rows - if state_key is None and not self.hs.is_mine_id(event_id) + if state_key is None and ( + delete_local_events or not self.hs.is_mine_id(event_id) + ) ] logger.info( - "[purge] found %i events before cutoff, of which %i are remote" - " non-state events to delete", len(event_rows), len(to_delete)) + "[purge] found %i events before cutoff, of which %i can be deleted", + len(event_rows), len(to_delete), + ) logger.info("[purge] Finding new backward extremities") @@ -2273,7 +2292,9 @@ class EventsStore(SQLBaseStore): " WHERE event_id = ?", [ (True, event_id,) for event_id, state_key in event_rows - if state_key is not None or self.hs.is_mine_id(event_id) + if state_key is not None or ( + not delete_local_events and self.hs.is_mine_id(event_id) + ) ] ) -- cgit 1.4.1