diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a5fc6c5dbf..5c493b8d63 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,7 +16,11 @@
import logging
-from twisted.internet import defer
+from six.moves import range
+
+from signedjson.sign import sign_json
+
+from twisted.internet import defer, reactor
from synapse.api.errors import (
AuthError,
@@ -26,6 +31,7 @@ from synapse.api.errors import (
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, get_domain_from_id
+from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@@ -43,6 +49,8 @@ class BaseProfileHandler(BaseHandler):
subclass MasterProfileHandler
"""
+ PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000
+
def __init__(self, hs):
super(BaseProfileHandler, self).__init__(hs)
@@ -53,6 +61,84 @@ class BaseProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
+ self.http_client = hs.get_simple_http_client()
+
+ if hs.config.worker_app is None:
+ self.clock.looping_call(
+ self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+ )
+
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ reactor.callWhenRunning(self._assign_profile_replication_batches)
+ reactor.callWhenRunning(self._replicate_profiles)
+ # Add a looping call to replicate_profiles: this handles retries
+ # if the replication is unsuccessful when the user updated their
+ # profile.
+ self.clock.looping_call(
+ self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
+ )
+
+ @defer.inlineCallbacks
+ def _assign_profile_replication_batches(self):
+ """If no profile replication has been done yet, allocate replication batch
+ numbers to each profile to start the replication process.
+ """
+ logger.info("Assigning profile batch numbers...")
+ total = 0
+ while True:
+ assigned = yield self.store.assign_profile_batch()
+ total += assigned
+ if assigned == 0:
+ break
+ logger.info("Assigned %d profile batch numbers", total)
+
+ @defer.inlineCallbacks
+ def _replicate_profiles(self):
+ """If any profile data has been updated and not pushed to the replication targets,
+ replicate it.
+ """
+ host_batches = yield self.store.get_replication_hosts()
+ latest_batch = yield self.store.get_latest_profile_replication_batch_number()
+ if latest_batch is None:
+ latest_batch = -1
+ for repl_host in self.hs.config.replicate_user_profiles_to:
+ if repl_host not in host_batches:
+ host_batches[repl_host] = -1
+ try:
+ for i in range(host_batches[repl_host] + 1, latest_batch + 1):
+ yield self._replicate_host_profile_batch(repl_host, i)
+ except Exception:
+ logger.exception(
+ "Exception while replicating to %s: aborting for now", repl_host,
+ )
+
+ @defer.inlineCallbacks
+ def _replicate_host_profile_batch(self, host, batchnum):
+ logger.info("Replicating profile batch %d to %s", batchnum, host)
+ batch_rows = yield self.store.get_profile_batch(batchnum)
+ batch = {
+ UserID(r["user_id"], self.hs.hostname).to_string(): ({
+ "display_name": r["displayname"],
+ "avatar_url": r["avatar_url"],
+ } if r["active"] else None) for r in batch_rows
+ }
+
+ url = "https://%s/_matrix/identity/api/v1/replicate_profiles" % (host,)
+ body = {
+ "batchnum": batchnum,
+ "batch": batch,
+ "origin_server": self.hs.hostname,
+ }
+ signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0])
+ try:
+ yield self.http_client.post_json_get_json(url, signed_body)
+ yield self.store.update_replication_batch_for_host(host, batchnum)
+ logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
+ except Exception:
+ # This will get retried when the looping call next comes around
+ logger.exception("Failed to replicate profile batch %d to %s", batchnum, host)
+ raise
+
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
@@ -162,9 +248,14 @@ class BaseProfileHandler(BaseHandler):
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server")
- if not by_admin and target_user != requester.user:
+ if not by_admin and requester and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname")
+ if not by_admin and self.hs.config.disable_set_displayname:
+ profile = yield self.store.get_profileinfo(target_user.localpart)
+ if profile.display_name:
+ raise SynapseError(400, "Changing displayname is disabled on this server")
+
if len(new_displayname) > MAX_DISPLAYNAME_LEN:
raise SynapseError(
400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN, ),
@@ -173,8 +264,14 @@ class BaseProfileHandler(BaseHandler):
if new_displayname == '':
new_displayname = None
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = yield self.store.get_latest_profile_replication_batch_number()
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+
yield self.store.set_profile_displayname(
- target_user.localpart, new_displayname
+ target_user.localpart, new_displayname, new_batchnum
)
if self.hs.config.user_directory_search_all_users:
@@ -183,7 +280,37 @@ class BaseProfileHandler(BaseHandler):
target_user.to_string(), profile
)
- yield self._update_join_states(requester, target_user)
+ if requester:
+ yield self._update_join_states(requester, target_user)
+
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
+
+ @defer.inlineCallbacks
+ def set_active(self, target_user, active, hide):
+ """
+ Sets the 'active' flag on a user profile. If set to false, the user
+ account is considered deactivated or hidden.
+
+ If 'hide' is true, then we interpret active=False as a request to try to
+ hide the user rather than deactivating it. This means withholding the
+ profile from replication (and mark it as inactive) rather than clearing
+ the profile from the HS DB. Note that unlike set_displayname and
+ set_avatar_url, this does *not* perform authorization checks! This is
+ because the only place it's used currently is in account deactivation
+ where we've already done these checks anyway.
+ """
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = yield self.store.get_latest_profile_replication_batch_number()
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+ yield self.store.set_profile_active(
+ target_user.localpart, active, hide, new_batchnum
+ )
+
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
@defer.inlineCallbacks
def get_avatar_url(self, target_user):
@@ -225,13 +352,24 @@ class BaseProfileHandler(BaseHandler):
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's avatar_url")
+ if not by_admin and self.hs.config.disable_set_avatar_url:
+ profile = yield self.store.get_profileinfo(target_user.localpart)
+ if profile.avatar_url:
+ raise SynapseError(400, "Changing avatar url is disabled on this server")
+
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = yield self.store.get_latest_profile_replication_batch_number()
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+
if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
raise SynapseError(
400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN, ),
)
yield self.store.set_profile_avatar_url(
- target_user.localpart, new_avatar_url
+ target_user.localpart, new_avatar_url, new_batchnum,
)
if self.hs.config.user_directory_search_all_users:
@@ -242,6 +380,9 @@ class BaseProfileHandler(BaseHandler):
yield self._update_join_states(requester, target_user)
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
+
@defer.inlineCallbacks
def on_profile_query(self, args):
user = UserID.from_string(args["user_id"])
@@ -321,6 +462,10 @@ class BaseProfileHandler(BaseHandler):
if not self.hs.config.require_auth_for_profile_requests or not requester:
return
+ # Always allow the user to query their own profile.
+ if target_user.to_string() == requester.to_string():
+ return
+
try:
requester_rooms = yield self.store.get_rooms_for_user(
requester.to_string()
|