diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 4f3198896e..adb9dc7c42 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,11 +15,12 @@
# limitations under the License.
import logging
+import random
from typing import List
from signedjson.sign import sign_json
-from twisted.internet import defer, reactor
+from twisted.internet import reactor
from synapse.api.errors import (
AuthError,
@@ -73,36 +74,45 @@ class BaseProfileHandler(BaseHandler):
)
if len(self.hs.config.replicate_user_profiles_to) > 0:
- reactor.callWhenRunning(self._assign_profile_replication_batches)
- reactor.callWhenRunning(self._replicate_profiles)
+ reactor.callWhenRunning(self._do_assign_profile_replication_batches)
+ reactor.callWhenRunning(self._start_replicate_profiles)
# Add a looping call to replicate_profiles: this handles retries
# if the replication is unsuccessful when the user updated their
# profile.
self.clock.looping_call(
- self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
+ self._start_replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
)
- @defer.inlineCallbacks
- def _assign_profile_replication_batches(self):
+ def _do_assign_profile_replication_batches(self):
+ return run_as_background_process(
+ "_assign_profile_replication_batches",
+ self._assign_profile_replication_batches,
+ )
+
+ def _start_replicate_profiles(self):
+ return run_as_background_process(
+ "_replicate_profiles", self._replicate_profiles
+ )
+
+ async def _assign_profile_replication_batches(self):
"""If no profile replication has been done yet, allocate replication batch
numbers to each profile to start the replication process.
"""
logger.info("Assigning profile batch numbers...")
total = 0
while True:
- assigned = yield self.store.assign_profile_batch()
+ assigned = await self.store.assign_profile_batch()
total += assigned
if assigned == 0:
break
logger.info("Assigned %d profile batch numbers", total)
- @defer.inlineCallbacks
- def _replicate_profiles(self):
+ async def _replicate_profiles(self):
"""If any profile data has been updated and not pushed to the replication targets,
replicate it.
"""
- host_batches = yield self.store.get_replication_hosts()
- latest_batch = yield self.store.get_latest_profile_replication_batch_number()
+ host_batches = await self.store.get_replication_hosts()
+ latest_batch = await self.store.get_latest_profile_replication_batch_number()
if latest_batch is None:
latest_batch = -1
for repl_host in self.hs.config.replicate_user_profiles_to:
@@ -110,16 +120,15 @@ class BaseProfileHandler(BaseHandler):
host_batches[repl_host] = -1
try:
for i in range(host_batches[repl_host] + 1, latest_batch + 1):
- yield self._replicate_host_profile_batch(repl_host, i)
+ await self._replicate_host_profile_batch(repl_host, i)
except Exception:
logger.exception(
"Exception while replicating to %s: aborting for now", repl_host
)
- @defer.inlineCallbacks
- def _replicate_host_profile_batch(self, host, batchnum):
+ async def _replicate_host_profile_batch(self, host, batchnum):
logger.info("Replicating profile batch %d to %s", batchnum, host)
- batch_rows = yield self.store.get_profile_batch(batchnum)
+ batch_rows = await self.store.get_profile_batch(batchnum)
batch = {
UserID(r["user_id"], self.hs.hostname).to_string(): (
{"display_name": r["displayname"], "avatar_url": r["avatar_url"]}
@@ -133,13 +142,11 @@ class BaseProfileHandler(BaseHandler):
body = {"batchnum": batchnum, "batch": batch, "origin_server": self.hs.hostname}
signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0])
try:
- yield defer.ensureDeferred(
- self.http_client.post_json_get_json(url, signed_body)
- )
- yield defer.ensureDeferred(
- self.store.update_replication_batch_for_host(host, batchnum)
+ await self.http_client.post_json_get_json(url, signed_body)
+ await self.store.update_replication_batch_for_host(host, batchnum)
+ logger.info(
+ "Successfully replicated profile batch %d to %s", batchnum, host
)
- logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
except Exception:
# This will get retried when the looping call next comes around
logger.exception(
@@ -292,8 +299,7 @@ class BaseProfileHandler(BaseHandler):
# start a profile replication push
run_in_background(self._replicate_profiles)
- @defer.inlineCallbacks
- def set_active(
+ async def set_active(
self, users: List[UserID], active: bool, hide: bool,
):
"""
@@ -316,19 +322,16 @@ class BaseProfileHandler(BaseHandler):
hide: Whether to hide the user (withold from replication). If
False and active is False, user will have their profile
erased
-
- Returns:
- Deferred
"""
if len(self.replicate_user_profiles_to) > 0:
cur_batchnum = (
- yield self.store.get_latest_profile_replication_batch_number()
+ await self.store.get_latest_profile_replication_batch_number()
)
new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
else:
new_batchnum = None
- yield self.store.set_profiles_active(users, active, hide, new_batchnum)
+ await self.store.set_profiles_active(users, active, hide, new_batchnum)
# start a profile replication push
run_in_background(self._replicate_profiles)
@@ -362,8 +365,14 @@ class BaseProfileHandler(BaseHandler):
async def set_avatar_url(
self, target_user, requester, new_avatar_url, by_admin=False
):
- """target_user is the user whose avatar_url is to be changed;
- auth_user is the user attempting to make this change."""
+ """Set a new avatar URL for a user.
+
+ Args:
+ target_user (UserID): the user whose avatar URL is to be changed.
+ requester (Requester): The user attempting to make this change.
+ new_avatar_url (str): The avatar URL to give this user.
+ by_admin (bool): Whether this change was made by an administrator.
+ """
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this homeserver")
@@ -484,6 +493,12 @@ class BaseProfileHandler(BaseHandler):
await self.ratelimit(requester)
+ # Do not actually update the room state for shadow-banned users.
+ if requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ return
+
room_ids = await self.store.get_rooms_for_user(target_user.to_string())
for room_id in room_ids:
|