summary refs log tree commit diff
path: root/synapse/handlers/profile.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/profile.py')
-rw-r--r--synapse/handlers/profile.py75
1 files changed, 45 insertions, 30 deletions
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py

index 4f3198896e..adb9dc7c42 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py
@@ -15,11 +15,12 @@ # limitations under the License. import logging +import random from typing import List from signedjson.sign import sign_json -from twisted.internet import defer, reactor +from twisted.internet import reactor from synapse.api.errors import ( AuthError, @@ -73,36 +74,45 @@ class BaseProfileHandler(BaseHandler): ) if len(self.hs.config.replicate_user_profiles_to) > 0: - reactor.callWhenRunning(self._assign_profile_replication_batches) - reactor.callWhenRunning(self._replicate_profiles) + reactor.callWhenRunning(self._do_assign_profile_replication_batches) + reactor.callWhenRunning(self._start_replicate_profiles) # Add a looping call to replicate_profiles: this handles retries # if the replication is unsuccessful when the user updated their # profile. self.clock.looping_call( - self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL + self._start_replicate_profiles, self.PROFILE_REPLICATE_INTERVAL ) - @defer.inlineCallbacks - def _assign_profile_replication_batches(self): + def _do_assign_profile_replication_batches(self): + return run_as_background_process( + "_assign_profile_replication_batches", + self._assign_profile_replication_batches, + ) + + def _start_replicate_profiles(self): + return run_as_background_process( + "_replicate_profiles", self._replicate_profiles + ) + + async def _assign_profile_replication_batches(self): """If no profile replication has been done yet, allocate replication batch numbers to each profile to start the replication process. """ logger.info("Assigning profile batch numbers...") total = 0 while True: - assigned = yield self.store.assign_profile_batch() + assigned = await self.store.assign_profile_batch() total += assigned if assigned == 0: break logger.info("Assigned %d profile batch numbers", total) - @defer.inlineCallbacks - def _replicate_profiles(self): + async def _replicate_profiles(self): """If any profile data has been updated and not pushed to the replication targets, replicate it. """ - host_batches = yield self.store.get_replication_hosts() - latest_batch = yield self.store.get_latest_profile_replication_batch_number() + host_batches = await self.store.get_replication_hosts() + latest_batch = await self.store.get_latest_profile_replication_batch_number() if latest_batch is None: latest_batch = -1 for repl_host in self.hs.config.replicate_user_profiles_to: @@ -110,16 +120,15 @@ class BaseProfileHandler(BaseHandler): host_batches[repl_host] = -1 try: for i in range(host_batches[repl_host] + 1, latest_batch + 1): - yield self._replicate_host_profile_batch(repl_host, i) + await self._replicate_host_profile_batch(repl_host, i) except Exception: logger.exception( "Exception while replicating to %s: aborting for now", repl_host ) - @defer.inlineCallbacks - def _replicate_host_profile_batch(self, host, batchnum): + async def _replicate_host_profile_batch(self, host, batchnum): logger.info("Replicating profile batch %d to %s", batchnum, host) - batch_rows = yield self.store.get_profile_batch(batchnum) + batch_rows = await self.store.get_profile_batch(batchnum) batch = { UserID(r["user_id"], self.hs.hostname).to_string(): ( {"display_name": r["displayname"], "avatar_url": r["avatar_url"]} @@ -133,13 +142,11 @@ class BaseProfileHandler(BaseHandler): body = {"batchnum": batchnum, "batch": batch, "origin_server": self.hs.hostname} signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0]) try: - yield defer.ensureDeferred( - self.http_client.post_json_get_json(url, signed_body) - ) - yield defer.ensureDeferred( - self.store.update_replication_batch_for_host(host, batchnum) + await self.http_client.post_json_get_json(url, signed_body) + await self.store.update_replication_batch_for_host(host, batchnum) + logger.info( + "Successfully replicated profile batch %d to %s", batchnum, host ) - logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) except Exception: # This will get retried when the looping call next comes around logger.exception( @@ -292,8 +299,7 @@ class BaseProfileHandler(BaseHandler): # start a profile replication push run_in_background(self._replicate_profiles) - @defer.inlineCallbacks - def set_active( + async def set_active( self, users: List[UserID], active: bool, hide: bool, ): """ @@ -316,19 +322,16 @@ class BaseProfileHandler(BaseHandler): hide: Whether to hide the user (withold from replication). If False and active is False, user will have their profile erased - - Returns: - Deferred """ if len(self.replicate_user_profiles_to) > 0: cur_batchnum = ( - yield self.store.get_latest_profile_replication_batch_number() + await self.store.get_latest_profile_replication_batch_number() ) new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 else: new_batchnum = None - yield self.store.set_profiles_active(users, active, hide, new_batchnum) + await self.store.set_profiles_active(users, active, hide, new_batchnum) # start a profile replication push run_in_background(self._replicate_profiles) @@ -362,8 +365,14 @@ class BaseProfileHandler(BaseHandler): async def set_avatar_url( self, target_user, requester, new_avatar_url, by_admin=False ): - """target_user is the user whose avatar_url is to be changed; - auth_user is the user attempting to make this change.""" + """Set a new avatar URL for a user. + + Args: + target_user (UserID): the user whose avatar URL is to be changed. + requester (Requester): The user attempting to make this change. + new_avatar_url (str): The avatar URL to give this user. + by_admin (bool): Whether this change was made by an administrator. + """ if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this homeserver") @@ -484,6 +493,12 @@ class BaseProfileHandler(BaseHandler): await self.ratelimit(requester) + # Do not actually update the room state for shadow-banned users. + if requester.shadow_banned: + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + return + room_ids = await self.store.get_rooms_for_user(target_user.to_string()) for room_id in room_ids: