summary refs log tree commit diff
path: root/synapse/handlers/profile.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/profile.py')
-rw-r--r--synapse/handlers/profile.py219
1 files changed, 212 insertions, 7 deletions
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 8690f69d45..fb31711b29 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,8 +17,11 @@
 import logging
 
 from six import raise_from
+from six.moves import range
 
-from twisted.internet import defer
+from signedjson.sign import sign_json
+
+from twisted.internet import defer, reactor
 
 from synapse.api.errors import (
     AuthError,
@@ -27,6 +31,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, get_domain_from_id
 
@@ -46,6 +51,8 @@ class BaseProfileHandler(BaseHandler):
     subclass MasterProfileHandler
     """
 
+    PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000
+
     def __init__(self, hs):
         super(BaseProfileHandler, self).__init__(hs)
 
@@ -56,6 +63,87 @@ class BaseProfileHandler(BaseHandler):
 
         self.user_directory_handler = hs.get_user_directory_handler()
 
+        self.http_client = hs.get_simple_http_client()
+
+        self.max_avatar_size = hs.config.max_avatar_size
+        self.allowed_avatar_mimetypes = hs.config.allowed_avatar_mimetypes
+
+        if hs.config.worker_app is None:
+            self.clock.looping_call(
+                self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
+            )
+
+            if len(self.hs.config.replicate_user_profiles_to) > 0:
+                reactor.callWhenRunning(self._assign_profile_replication_batches)
+                reactor.callWhenRunning(self._replicate_profiles)
+                # Add a looping call to replicate_profiles: this handles retries
+                # if the replication is unsuccessful when the user updated their
+                # profile.
+                self.clock.looping_call(
+                    self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
+                )
+
+    @defer.inlineCallbacks
+    def _assign_profile_replication_batches(self):
+        """If no profile replication has been done yet, allocate replication batch
+        numbers to each profile to start the replication process.
+        """
+        logger.info("Assigning profile batch numbers...")
+        total = 0
+        while True:
+            assigned = yield self.store.assign_profile_batch()
+            total += assigned
+            if assigned == 0:
+                break
+        logger.info("Assigned %d profile batch numbers", total)
+
+    @defer.inlineCallbacks
+    def _replicate_profiles(self):
+        """If any profile data has been updated and not pushed to the replication targets,
+        replicate it.
+        """
+        host_batches = yield self.store.get_replication_hosts()
+        latest_batch = yield self.store.get_latest_profile_replication_batch_number()
+        if latest_batch is None:
+            latest_batch = -1
+        for repl_host in self.hs.config.replicate_user_profiles_to:
+            if repl_host not in host_batches:
+                host_batches[repl_host] = -1
+            try:
+                for i in range(host_batches[repl_host] + 1, latest_batch + 1):
+                    yield self._replicate_host_profile_batch(repl_host, i)
+            except Exception:
+                logger.exception(
+                    "Exception while replicating to %s: aborting for now", repl_host
+                )
+
+    @defer.inlineCallbacks
+    def _replicate_host_profile_batch(self, host, batchnum):
+        logger.info("Replicating profile batch %d to %s", batchnum, host)
+        batch_rows = yield self.store.get_profile_batch(batchnum)
+        batch = {
+            UserID(r["user_id"], self.hs.hostname).to_string(): (
+                {"display_name": r["displayname"], "avatar_url": r["avatar_url"]}
+                if r["active"]
+                else None
+            )
+            for r in batch_rows
+        }
+
+        url = "https://%s/_matrix/identity/api/v1/replicate_profiles" % (host,)
+        body = {"batchnum": batchnum, "batch": batch, "origin_server": self.hs.hostname}
+        signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0])
+        try:
+            yield self.http_client.post_json_get_json(url, signed_body)
+            yield self.store.update_replication_batch_for_host(host, batchnum)
+            logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
+        except Exception:
+            # This will get retried when the looping call next comes around
+            logger.exception(
+                "Failed to replicate profile batch %d to %s", batchnum, host
+            )
+            raise
+
     @defer.inlineCallbacks
     def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)
@@ -154,9 +242,16 @@ class BaseProfileHandler(BaseHandler):
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if not by_admin and target_user != requester.user:
+        if not by_admin and requester and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
 
+        if not by_admin and self.hs.config.disable_set_displayname:
+            profile = yield self.store.get_profileinfo(target_user.localpart)
+            if profile.display_name:
+                raise SynapseError(
+                    400, "Changing displayname is disabled on this server"
+                )
+
         if len(new_displayname) > MAX_DISPLAYNAME_LEN:
             raise SynapseError(
                 400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN,)
@@ -165,7 +260,17 @@ class BaseProfileHandler(BaseHandler):
         if new_displayname == "":
             new_displayname = None
 
-        yield self.store.set_profile_displayname(target_user.localpart, new_displayname)
+        if len(self.hs.config.replicate_user_profiles_to) > 0:
+            cur_batchnum = (
+                yield self.store.get_latest_profile_replication_batch_number()
+            )
+            new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+        else:
+            new_batchnum = None
+
+        yield self.store.set_profile_displayname(
+            target_user.localpart, new_displayname, new_batchnum
+        )
 
         if self.hs.config.user_directory_search_all_users:
             profile = yield self.store.get_profileinfo(target_user.localpart)
@@ -173,7 +278,39 @@ class BaseProfileHandler(BaseHandler):
                 target_user.to_string(), profile
             )
 
-        yield self._update_join_states(requester, target_user)
+        if requester:
+            yield self._update_join_states(requester, target_user)
+
+        # start a profile replication push
+        run_in_background(self._replicate_profiles)
+
+    @defer.inlineCallbacks
+    def set_active(self, target_user, active, hide):
+        """
+        Sets the 'active' flag on a user profile. If set to false, the user
+        account is considered deactivated or hidden.
+
+        If 'hide' is true, then we interpret active=False as a request to try to
+        hide the user rather than deactivating it.  This means withholding the
+        profile from replication (and mark it as inactive) rather than clearing
+        the profile from the HS DB. Note that unlike set_displayname and
+        set_avatar_url, this does *not* perform authorization checks! This is
+        because the only place it's used currently is in account deactivation
+        where we've already done these checks anyway.
+        """
+        if len(self.hs.config.replicate_user_profiles_to) > 0:
+            cur_batchnum = (
+                yield self.store.get_latest_profile_replication_batch_number()
+            )
+            new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+        else:
+            new_batchnum = None
+        yield self.store.set_profile_active(
+            target_user.localpart, active, hide, new_batchnum
+        )
+
+        # start a profile replication push
+        run_in_background(self._replicate_profiles)
 
     @defer.inlineCallbacks
     def get_avatar_url(self, target_user):
@@ -212,12 +349,59 @@ class BaseProfileHandler(BaseHandler):
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
 
+        if not by_admin and self.hs.config.disable_set_avatar_url:
+            profile = yield self.store.get_profileinfo(target_user.localpart)
+            if profile.avatar_url:
+                raise SynapseError(
+                    400, "Changing avatar url is disabled on this server"
+                )
+
+        if len(self.hs.config.replicate_user_profiles_to) > 0:
+            cur_batchnum = (
+                yield self.store.get_latest_profile_replication_batch_number()
+            )
+            new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+        else:
+            new_batchnum = None
+
         if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
             raise SynapseError(
                 400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN,)
             )
 
-        yield self.store.set_profile_avatar_url(target_user.localpart, new_avatar_url)
+        # Enforce a max avatar size if one is defined
+        if self.max_avatar_size or self.allowed_avatar_mimetypes:
+            media_id = self._validate_and_parse_media_id_from_avatar_url(new_avatar_url)
+
+            # Check that this media exists locally
+            media_info = yield self.store.get_local_media(media_id)
+            if not media_info:
+                raise SynapseError(
+                    400, "Unknown media id supplied", errcode=Codes.NOT_FOUND
+                )
+
+            # Ensure avatar does not exceed max allowed avatar size
+            media_size = media_info["media_length"]
+            if self.max_avatar_size and media_size > self.max_avatar_size:
+                raise SynapseError(
+                    400,
+                    "Avatars must be less than %s bytes in size"
+                    % (self.max_avatar_size,),
+                    errcode=Codes.TOO_LARGE,
+                )
+
+            # Ensure the avatar's file type is allowed
+            if (
+                self.allowed_avatar_mimetypes
+                and media_info["media_type"] not in self.allowed_avatar_mimetypes
+            ):
+                raise SynapseError(
+                    400, "Avatar file type '%s' not allowed" % media_info["media_type"]
+                )
+
+        yield self.store.set_profile_avatar_url(
+            target_user.localpart, new_avatar_url, new_batchnum
+        )
 
         if self.hs.config.user_directory_search_all_users:
             profile = yield self.store.get_profileinfo(target_user.localpart)
@@ -227,6 +411,23 @@ class BaseProfileHandler(BaseHandler):
 
         yield self._update_join_states(requester, target_user)
 
+        # start a profile replication push
+        run_in_background(self._replicate_profiles)
+
+    def _validate_and_parse_media_id_from_avatar_url(self, mxc):
+        """Validate and parse a provided avatar url and return the local media id
+
+        Args:
+            mxc (str): A mxc URL
+
+        Returns:
+            str: The ID of the media
+        """
+        avatar_pieces = mxc.split("/")
+        if len(avatar_pieces) != 4 or avatar_pieces[0] != "mxc:":
+            raise SynapseError(400, "Invalid avatar URL '%s' supplied" % mxc)
+        return avatar_pieces[-1]
+
     @defer.inlineCallbacks
     def on_profile_query(self, args):
         user = UserID.from_string(args["user_id"])
@@ -282,7 +483,7 @@ class BaseProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def check_profile_query_allowed(self, target_user, requester=None):
         """Checks whether a profile query is allowed. If the
-        'require_auth_for_profile_requests' config flag is set to True and a
+        'limit_profile_requests_to_known_users' config flag is set to True and a
         'requester' is provided, the query is only allowed if the two users
         share a room.
 
@@ -300,7 +501,11 @@ class BaseProfileHandler(BaseHandler):
         # be None when this function is called outside of a profile query, e.g.
         # when building a membership event. In this case, we must allow the
         # lookup.
-        if not self.hs.config.require_auth_for_profile_requests or not requester:
+        if not self.hs.config.limit_profile_requests_to_known_users or not requester:
+            return
+
+        # Always allow the user to query their own profile.
+        if target_user.to_string() == requester.to_string():
             return
 
         # Always allow the user to query their own profile.