diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a5fc6c5dbf..584f804986 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,7 +16,11 @@
import logging
-from twisted.internet import defer
+from six.moves import range
+
+from signedjson.sign import sign_json
+
+from twisted.internet import defer, reactor
from synapse.api.errors import (
AuthError,
@@ -26,6 +31,7 @@ from synapse.api.errors import (
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, get_domain_from_id
+from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@@ -43,6 +49,8 @@ class BaseProfileHandler(BaseHandler):
subclass MasterProfileHandler
"""
+ PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000
+
def __init__(self, hs):
super(BaseProfileHandler, self).__init__(hs)
@@ -53,6 +61,87 @@ class BaseProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
+ self.http_client = hs.get_simple_http_client()
+
+ self.max_avatar_size = hs.config.max_avatar_size
+ self.allowed_avatar_mimetypes = hs.config.allowed_avatar_mimetypes
+
+ if hs.config.worker_app is None:
+ self.clock.looping_call(
+ self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+ )
+
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ reactor.callWhenRunning(self._assign_profile_replication_batches)
+ reactor.callWhenRunning(self._replicate_profiles)
+ # Add a looping call to replicate_profiles: this handles retries
+ # if the replication is unsuccessful when the user updated their
+ # profile.
+ self.clock.looping_call(
+ self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
+ )
+
+ @defer.inlineCallbacks
+ def _assign_profile_replication_batches(self):
+ """If no profile replication has been done yet, allocate replication batch
+ numbers to each profile to start the replication process.
+ """
+ logger.info("Assigning profile batch numbers...")
+ total = 0
+ while True:
+ assigned = yield self.store.assign_profile_batch()
+ total += assigned
+ if assigned == 0:
+ break
+ logger.info("Assigned %d profile batch numbers", total)
+
+ @defer.inlineCallbacks
+ def _replicate_profiles(self):
+ """If any profile data has been updated and not pushed to the replication targets,
+ replicate it.
+ """
+ host_batches = yield self.store.get_replication_hosts()
+ latest_batch = yield self.store.get_latest_profile_replication_batch_number()
+ if latest_batch is None:
+ latest_batch = -1
+ for repl_host in self.hs.config.replicate_user_profiles_to:
+ if repl_host not in host_batches:
+ host_batches[repl_host] = -1
+ try:
+ for i in range(host_batches[repl_host] + 1, latest_batch + 1):
+ yield self._replicate_host_profile_batch(repl_host, i)
+ except Exception:
+ logger.exception(
+ "Exception while replicating to %s: aborting for now", repl_host,
+ )
+
+ @defer.inlineCallbacks
+ def _replicate_host_profile_batch(self, host, batchnum):
+ logger.info("Replicating profile batch %d to %s", batchnum, host)
+ batch_rows = yield self.store.get_profile_batch(batchnum)
+ batch = {
+ UserID(r["user_id"], self.hs.hostname).to_string(): ({
+ "display_name": r["displayname"],
+ "avatar_url": r["avatar_url"],
+ } if r["active"] else None) for r in batch_rows
+ }
+
+ url = "https://%s/_matrix/identity/api/v1/replicate_profiles" % (host,)
+ body = {
+ "batchnum": batchnum,
+ "batch": batch,
+ "origin_server": self.hs.hostname,
+ }
+ signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0])
+ try:
+ yield self.http_client.post_json_get_json(url, signed_body)
+ yield self.store.update_replication_batch_for_host(host, batchnum)
+ logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
+ except Exception:
+ # This will get retried when the looping call next comes around
+ logger.exception("Failed to replicate profile batch %d to %s", batchnum, host)
+ raise
+
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
@@ -162,9 +251,14 @@ class BaseProfileHandler(BaseHandler):
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server")
- if not by_admin and target_user != requester.user:
+ if not by_admin and requester and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname")
+ if not by_admin and self.hs.config.disable_set_displayname:
+ profile = yield self.store.get_profileinfo(target_user.localpart)
+ if profile.display_name:
+ raise SynapseError(400, "Changing displayname is disabled on this server")
+
if len(new_displayname) > MAX_DISPLAYNAME_LEN:
raise SynapseError(
400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN, ),
@@ -173,8 +267,14 @@ class BaseProfileHandler(BaseHandler):
if new_displayname == '':
new_displayname = None
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = yield self.store.get_latest_profile_replication_batch_number()
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+
yield self.store.set_profile_displayname(
- target_user.localpart, new_displayname
+ target_user.localpart, new_displayname, new_batchnum
)
if self.hs.config.user_directory_search_all_users:
@@ -183,7 +283,37 @@ class BaseProfileHandler(BaseHandler):
target_user.to_string(), profile
)
- yield self._update_join_states(requester, target_user)
+ if requester:
+ yield self._update_join_states(requester, target_user)
+
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
+
+ @defer.inlineCallbacks
+ def set_active(self, target_user, active, hide):
+ """
+ Sets the 'active' flag on a user profile. If set to false, the user
+ account is considered deactivated or hidden.
+
+ If 'hide' is true, then we interpret active=False as a request to try to
+ hide the user rather than deactivating it. This means withholding the
+ profile from replication (and mark it as inactive) rather than clearing
+ the profile from the HS DB. Note that unlike set_displayname and
+ set_avatar_url, this does *not* perform authorization checks! This is
+ because the only place it's used currently is in account deactivation
+ where we've already done these checks anyway.
+ """
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = yield self.store.get_latest_profile_replication_batch_number()
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+ yield self.store.set_profile_active(
+ target_user.localpart, active, hide, new_batchnum
+ )
+
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
@defer.inlineCallbacks
def get_avatar_url(self, target_user):
@@ -225,13 +355,53 @@ class BaseProfileHandler(BaseHandler):
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's avatar_url")
+ if not by_admin and self.hs.config.disable_set_avatar_url:
+ profile = yield self.store.get_profileinfo(target_user.localpart)
+ if profile.avatar_url:
+ raise SynapseError(400, "Changing avatar url is disabled on this server")
+
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = yield self.store.get_latest_profile_replication_batch_number()
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+
if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
raise SynapseError(
400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN, ),
)
+ # Enforce a max avatar size if one is defined
+ if self.max_avatar_size or self.allowed_avatar_mimetypes:
+ media_id = self._validate_and_parse_media_id_from_avatar_url(new_avatar_url)
+
+ # Check that this media exists locally
+ media_info = yield self.store.get_local_media(media_id)
+ if not media_info:
+ raise SynapseError(
+ 400, "Unknown media id supplied", errcode=Codes.NOT_FOUND
+ )
+
+ # Ensure avatar does not exceed max allowed avatar size
+ media_size = media_info["media_length"]
+ if self.max_avatar_size and media_size > self.max_avatar_size:
+ raise SynapseError(
+ 400, "Avatars must be less than %s bytes in size" %
+ (self.max_avatar_size,), errcode=Codes.TOO_LARGE,
+ )
+
+ # Ensure the avatar's file type is allowed
+ if (
+ self.allowed_avatar_mimetypes
+ and media_info["media_type"] not in self.allowed_avatar_mimetypes
+ ):
+ raise SynapseError(
+ 400, "Avatar file type '%s' not allowed" %
+ media_info["media_type"],
+ )
+
yield self.store.set_profile_avatar_url(
- target_user.localpart, new_avatar_url
+ target_user.localpart, new_avatar_url, new_batchnum,
)
if self.hs.config.user_directory_search_all_users:
@@ -242,6 +412,23 @@ class BaseProfileHandler(BaseHandler):
yield self._update_join_states(requester, target_user)
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
+
+ def _validate_and_parse_media_id_from_avatar_url(self, mxc):
+ """Validate and parse a provided avatar url and return the local media id
+
+ Args:
+ mxc (str): A mxc URL
+
+ Returns:
+ str: The ID of the media
+ """
+ avatar_pieces = mxc.split("/")
+ if len(avatar_pieces) != 4 or avatar_pieces[0] != "mxc:":
+ raise SynapseError(400, "Invalid avatar URL '%s' supplied" % mxc)
+ return avatar_pieces[-1]
+
@defer.inlineCallbacks
def on_profile_query(self, args):
user = UserID.from_string(args["user_id"])
@@ -300,7 +487,7 @@ class BaseProfileHandler(BaseHandler):
@defer.inlineCallbacks
def check_profile_query_allowed(self, target_user, requester=None):
"""Checks whether a profile query is allowed. If the
- 'require_auth_for_profile_requests' config flag is set to True and a
+ 'limit_profile_requests_to_known_users' config flag is set to True and a
'requester' is provided, the query is only allowed if the two users
share a room.
@@ -318,7 +505,11 @@ class BaseProfileHandler(BaseHandler):
# be None when this function is called outside of a profile query, e.g.
# when building a membership event. In this case, we must allow the
# lookup.
- if not self.hs.config.require_auth_for_profile_requests or not requester:
+ if not self.hs.config.limit_profile_requests_to_known_users or not requester:
+ return
+
+ # Always allow the user to query their own profile.
+ if target_user.to_string() == requester.to_string():
return
try:
|