diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 755a52a50d..14b03490fa 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -96,12 +96,7 @@ class MessageHandler(object):
)
async def get_room_data(
- self,
- user_id: str,
- room_id: str,
- event_type: str,
- state_key: str,
- is_guest: bool,
+ self, user_id: str, room_id: str, event_type: str, state_key: str,
) -> dict:
""" Get data from a room.
@@ -110,11 +105,10 @@ class MessageHandler(object):
room_id
event_type
state_key
- is_guest
Returns:
The path data content.
Raises:
- SynapseError if something went wrong.
+ SynapseError or AuthError if the user is not in the room
"""
(
membership,
@@ -131,6 +125,16 @@ class MessageHandler(object):
[membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
+ else:
+ # check_user_in_room_or_world_readable, if it doesn't raise an AuthError, should
+ # only ever return a Membership.JOIN/LEAVE object
+ #
+ # Safeguard in case it returned something else
+ logger.error(
+ "Attempted to retrieve data from a room for a user that has never been in it. "
+ "This should not have happened."
+ )
+ raise SynapseError(403, "User not in room", errcode=Codes.FORBIDDEN)
return data
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 4f3198896e..adb9dc7c42 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,11 +15,12 @@
# limitations under the License.
import logging
+import random
from typing import List
from signedjson.sign import sign_json
-from twisted.internet import defer, reactor
+from twisted.internet import reactor
from synapse.api.errors import (
AuthError,
@@ -73,36 +74,45 @@ class BaseProfileHandler(BaseHandler):
)
if len(self.hs.config.replicate_user_profiles_to) > 0:
- reactor.callWhenRunning(self._assign_profile_replication_batches)
- reactor.callWhenRunning(self._replicate_profiles)
+ reactor.callWhenRunning(self._do_assign_profile_replication_batches)
+ reactor.callWhenRunning(self._start_replicate_profiles)
# Add a looping call to replicate_profiles: this handles retries
# if the replication is unsuccessful when the user updated their
# profile.
self.clock.looping_call(
- self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
+ self._start_replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
)
- @defer.inlineCallbacks
- def _assign_profile_replication_batches(self):
+ def _do_assign_profile_replication_batches(self):
+ return run_as_background_process(
+ "_assign_profile_replication_batches",
+ self._assign_profile_replication_batches,
+ )
+
+ def _start_replicate_profiles(self):
+ return run_as_background_process(
+ "_replicate_profiles", self._replicate_profiles
+ )
+
+ async def _assign_profile_replication_batches(self):
"""If no profile replication has been done yet, allocate replication batch
numbers to each profile to start the replication process.
"""
logger.info("Assigning profile batch numbers...")
total = 0
while True:
- assigned = yield self.store.assign_profile_batch()
+ assigned = await self.store.assign_profile_batch()
total += assigned
if assigned == 0:
break
logger.info("Assigned %d profile batch numbers", total)
- @defer.inlineCallbacks
- def _replicate_profiles(self):
+ async def _replicate_profiles(self):
"""If any profile data has been updated and not pushed to the replication targets,
replicate it.
"""
- host_batches = yield self.store.get_replication_hosts()
- latest_batch = yield self.store.get_latest_profile_replication_batch_number()
+ host_batches = await self.store.get_replication_hosts()
+ latest_batch = await self.store.get_latest_profile_replication_batch_number()
if latest_batch is None:
latest_batch = -1
for repl_host in self.hs.config.replicate_user_profiles_to:
@@ -110,16 +120,15 @@ class BaseProfileHandler(BaseHandler):
host_batches[repl_host] = -1
try:
for i in range(host_batches[repl_host] + 1, latest_batch + 1):
- yield self._replicate_host_profile_batch(repl_host, i)
+ await self._replicate_host_profile_batch(repl_host, i)
except Exception:
logger.exception(
"Exception while replicating to %s: aborting for now", repl_host
)
- @defer.inlineCallbacks
- def _replicate_host_profile_batch(self, host, batchnum):
+ async def _replicate_host_profile_batch(self, host, batchnum):
logger.info("Replicating profile batch %d to %s", batchnum, host)
- batch_rows = yield self.store.get_profile_batch(batchnum)
+ batch_rows = await self.store.get_profile_batch(batchnum)
batch = {
UserID(r["user_id"], self.hs.hostname).to_string(): (
{"display_name": r["displayname"], "avatar_url": r["avatar_url"]}
@@ -133,13 +142,11 @@ class BaseProfileHandler(BaseHandler):
body = {"batchnum": batchnum, "batch": batch, "origin_server": self.hs.hostname}
signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0])
try:
- yield defer.ensureDeferred(
- self.http_client.post_json_get_json(url, signed_body)
- )
- yield defer.ensureDeferred(
- self.store.update_replication_batch_for_host(host, batchnum)
+ await self.http_client.post_json_get_json(url, signed_body)
+ await self.store.update_replication_batch_for_host(host, batchnum)
+ logger.info(
+ "Successfully replicated profile batch %d to %s", batchnum, host
)
- logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
except Exception:
# This will get retried when the looping call next comes around
logger.exception(
@@ -292,8 +299,7 @@ class BaseProfileHandler(BaseHandler):
# start a profile replication push
run_in_background(self._replicate_profiles)
- @defer.inlineCallbacks
- def set_active(
+ async def set_active(
self, users: List[UserID], active: bool, hide: bool,
):
"""
@@ -316,19 +322,16 @@ class BaseProfileHandler(BaseHandler):
hide: Whether to hide the user (withold from replication). If
False and active is False, user will have their profile
erased
-
- Returns:
- Deferred
"""
if len(self.replicate_user_profiles_to) > 0:
cur_batchnum = (
- yield self.store.get_latest_profile_replication_batch_number()
+ await self.store.get_latest_profile_replication_batch_number()
)
new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
else:
new_batchnum = None
- yield self.store.set_profiles_active(users, active, hide, new_batchnum)
+ await self.store.set_profiles_active(users, active, hide, new_batchnum)
# start a profile replication push
run_in_background(self._replicate_profiles)
@@ -362,8 +365,14 @@ class BaseProfileHandler(BaseHandler):
async def set_avatar_url(
self, target_user, requester, new_avatar_url, by_admin=False
):
- """target_user is the user whose avatar_url is to be changed;
- auth_user is the user attempting to make this change."""
+ """Set a new avatar URL for a user.
+
+ Args:
+ target_user (UserID): the user whose avatar URL is to be changed.
+ requester (Requester): The user attempting to make this change.
+ new_avatar_url (str): The avatar URL to give this user.
+ by_admin (bool): Whether this change was made by an administrator.
+ """
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this homeserver")
@@ -484,6 +493,12 @@ class BaseProfileHandler(BaseHandler):
await self.ratelimit(requester)
+ # Do not actually update the room state for shadow-banned users.
+ if requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ return
+
room_ids = await self.store.get_rooms_for_user(target_user.to_string())
for room_id in room_ids:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 8ee9b2063d..50f3756446 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -384,7 +384,7 @@ class RoomMemberHandler(object):
# later on.
content = dict(content)
- if not self.allow_per_room_profiles:
+ if not self.allow_per_room_profiles or requester.shadow_banned:
# Strip profile data, knowing that new profile data will be added to the
# event's content in event_creation_handler.create_event() using the target's
# global profile.
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a86ac0150e..1d828bd7be 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -14,10 +14,11 @@
# limitations under the License.
import logging
+import random
from collections import namedtuple
from typing import TYPE_CHECKING, List, Set, Tuple
-from synapse.api.errors import AuthError, SynapseError
+from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
from synapse.types import UserID, get_domain_from_id
@@ -227,9 +228,9 @@ class TypingWriterHandler(FollowerTypingHandler):
self._stopped_typing(member)
return
- async def started_typing(self, target_user, auth_user, room_id, timeout):
+ async def started_typing(self, target_user, requester, room_id, timeout):
target_user_id = target_user.to_string()
- auth_user_id = auth_user.to_string()
+ auth_user_id = requester.user.to_string()
if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this homeserver")
@@ -237,6 +238,11 @@ class TypingWriterHandler(FollowerTypingHandler):
if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
+ if requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
await self.auth.check_user_in_room(room_id, target_user_id)
logger.debug("%s has started typing in %s", target_user_id, room_id)
@@ -256,9 +262,9 @@ class TypingWriterHandler(FollowerTypingHandler):
self._push_update(member=member, typing=True)
- async def stopped_typing(self, target_user, auth_user, room_id):
+ async def stopped_typing(self, target_user, requester, room_id):
target_user_id = target_user.to_string()
- auth_user_id = auth_user.to_string()
+ auth_user_id = requester.user.to_string()
if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this homeserver")
@@ -266,6 +272,11 @@ class TypingWriterHandler(FollowerTypingHandler):
if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
+ if requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
await self.auth.check_user_in_room(room_id, target_user_id)
logger.debug("%s has stopped typing in %s", target_user_id, room_id)
|