diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 829f52eca1..6c46c995d2 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -20,6 +20,8 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List
+from twisted.internet import defer
+
from synapse.api.errors import StoreError
from synapse.logging.context import make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -43,6 +45,8 @@ class AccountValidityHandler(object):
self.clock = self.hs.get_clock()
self._account_validity = self.hs.config.account_validity
+ self._show_users_in_user_directory = self.hs.config.show_users_in_user_directory
+ self.profile_handler = self.hs.get_profile_handler()
if self._account_validity.renew_by_email_enabled and load_jinja2_templates:
# Don't do email-specific configuration if renewal by email is disabled.
@@ -82,6 +86,9 @@ class AccountValidityHandler(object):
self.clock.looping_call(send_emails, 30 * 60 * 1000)
+ # Check every hour to remove expired users from the user directory
+ self.clock.looping_call(self._mark_expired_users_as_inactive, 60 * 60 * 1000)
+
async def _send_renewal_emails(self):
"""Gets the list of users whose account is expiring in the amount of time
configured in the ``renew_at`` parameter from the ``account_validity``
@@ -262,4 +269,27 @@ class AccountValidityHandler(object):
user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent
)
+ # Check if renewed users should be reintroduced to the user directory
+ if self._show_users_in_user_directory:
+ # Show the user in the directory again by setting them to active
+ await self.profile_handler.set_active(
+ UserID.from_string(user_id), True, True
+ )
+
return expiration_ts
+
+ @defer.inlineCallbacks
+ def _mark_expired_users_as_inactive(self):
+ """Iterate over expired users. Mark them as inactive in order to hide them from the
+ user directory.
+
+ Returns:
+ Deferred
+ """
+ # Get expired users
+ expired_user_ids = yield self.store.get_expired_users()
+ expired_users = [UserID.from_string(user_id) for user_id in expired_user_ids]
+
+ # Mark each one as non-active
+ for user in expired_users:
+ yield self.profile_handler.set_active(user, False, True)
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 9205865231..f3c0aeceb6 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -58,8 +58,10 @@ class AdminHandler(BaseHandler):
ret = await self.store.get_user_by_id(user.to_string())
if ret:
profile = await self.store.get_profileinfo(user.localpart)
+ threepids = await self.store.user_get_threepids(user.to_string())
ret["displayname"] = profile.display_name
ret["avatar_url"] = profile.avatar_url
+ ret["threepids"] = threepids
return ret
async def export_user_data(self, user_id, writer):
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 54a71c49d2..48a88d3c2a 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -816,6 +816,14 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def add_threepid(self, user_id, medium, address, validated_at):
+ # check if medium has a valid value
+ if medium not in ["email", "msisdn"]:
+ raise SynapseError(
+ code=400,
+ msg=("'%s' is not a valid value for 'medium'" % (medium,)),
+ errcode=Codes.INVALID_PARAM,
+ )
+
# 'Canonicalise' email addresses down to lower case.
# We've now moving towards the homeserver being the entity that
# is responsible for validating threepids used for resetting passwords
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 2afb390a92..f624c2a3f9 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -33,6 +33,7 @@ class DeactivateAccountHandler(BaseHandler):
self._device_handler = hs.get_device_handler()
self._room_member_handler = hs.get_room_member_handler()
self._identity_handler = hs.get_handlers().identity_handler
+ self._profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
# Flag that indicates whether the process to part users from rooms is running
@@ -104,6 +105,9 @@ class DeactivateAccountHandler(BaseHandler):
await self.store.user_set_password_hash(user_id, None)
+ user = UserID.from_string(user_id)
+ await self._profile_handler.set_active(user, False, False)
+
# Add the user to a table of users pending deactivation (ie.
# removal from all the rooms they're a member of)
await self.store.add_user_pending_deactivation(user_id)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a9bd431486..50cea3f378 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -26,6 +26,7 @@ from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
RequestSendFailed,
+ SynapseError,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import RoomStreamToken, get_domain_from_id
@@ -39,6 +40,8 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+MAX_DEVICE_DISPLAY_NAME_LEN = 100
+
class DeviceWorkerHandler(BaseHandler):
def __init__(self, hs):
@@ -225,6 +228,22 @@ class DeviceWorkerHandler(BaseHandler):
return result
+ @defer.inlineCallbacks
+ def on_federation_query_user_devices(self, user_id):
+ stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
+ master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
+ self_signing_key = yield self.store.get_e2e_cross_signing_key(
+ user_id, "self_signing"
+ )
+
+ return {
+ "user_id": user_id,
+ "stream_id": stream_id,
+ "devices": devices,
+ "master_key": master_key,
+ "self_signing_key": self_signing_key,
+ }
+
class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs):
@@ -239,9 +258,6 @@ class DeviceHandler(DeviceWorkerHandler):
federation_registry.register_edu_handler(
"m.device_list_update", self.device_list_updater.incoming_device_list_update
)
- federation_registry.register_query_handler(
- "user_devices", self.on_federation_query_user_devices
- )
hs.get_distributor().observe("user_left_room", self.user_left_room)
@@ -391,9 +407,18 @@ class DeviceHandler(DeviceWorkerHandler):
defer.Deferred:
"""
+ # Reject a new displayname which is too long.
+ new_display_name = content.get("display_name")
+ if new_display_name and len(new_display_name) > MAX_DEVICE_DISPLAY_NAME_LEN:
+ raise SynapseError(
+ 400,
+ "Device display name is too long (max %i)"
+ % (MAX_DEVICE_DISPLAY_NAME_LEN,),
+ )
+
try:
yield self.store.update_device(
- user_id, device_id, new_display_name=content.get("display_name")
+ user_id, device_id, new_display_name=new_display_name
)
yield self.notify_device_update(user_id, [device_id])
except errors.StoreError as e:
@@ -457,22 +482,6 @@ class DeviceHandler(DeviceWorkerHandler):
self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
@defer.inlineCallbacks
- def on_federation_query_user_devices(self, user_id):
- stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
- master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
- self_signing_key = yield self.store.get_e2e_cross_signing_key(
- user_id, "self_signing"
- )
-
- return {
- "user_id": user_id,
- "stream_id": stream_id,
- "devices": devices,
- "master_key": master_key,
- "self_signing_key": self_signing_key,
- }
-
- @defer.inlineCallbacks
def user_left_room(self, user, room_id):
user_id = user.to_string()
room_ids = yield self.store.get_rooms_for_user(user_id)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e9441bbeff..1ec61340ad 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -65,7 +65,7 @@ from synapse.replication.http.federation import (
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
-from synapse.types import StateMap, UserID, get_domain_from_id
+from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
@@ -187,7 +187,7 @@ class FederationHandler(BaseHandler):
room_id = pdu.room_id
event_id = pdu.event_id
- logger.info("handling received PDU: %s", pdu)
+ logger.info("[%s %s] handling received PDU: %s", room_id, event_id, pdu)
# We reprocess pdus when we have seen them only as outliers
existing = await self.store.get_event(
@@ -302,6 +302,14 @@ class FederationHandler(BaseHandler):
room_id,
event_id,
)
+ elif missing_prevs:
+ logger.info(
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
+ room_id,
+ event_id,
+ len(missing_prevs),
+ shortstr(missing_prevs),
+ )
if prevs - seen:
# We've still not been able to get all of the prev_events for this event.
@@ -346,12 +354,6 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
- logger.info(
- "Event %s is missing prev_events: calculating state for a "
- "backwards extremity",
- event_id,
- )
-
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
event_map = {event_id: pdu}
@@ -369,7 +371,10 @@ class FederationHandler(BaseHandler):
# know about
for p in prevs - seen:
logger.info(
- "Requesting state at missing prev_event %s", event_id,
+ "[%s %s] Requesting state at missing prev_event %s",
+ room_id,
+ event_id,
+ p,
)
with nested_logging_context(p):
@@ -405,7 +410,6 @@ class FederationHandler(BaseHandler):
evs = await self.store.get_events(
list(state_map.values()),
get_prev_content=False,
- redact_behaviour=EventRedactBehaviour.AS_IS,
)
event_map.update(evs)
@@ -1156,7 +1160,7 @@ class FederationHandler(BaseHandler):
Logs a warning if we can't find the given event.
"""
- room_version = await self.store.get_room_version_id(room_id)
+ room_version = await self.store.get_room_version(room_id)
event_infos = []
@@ -1230,13 +1234,12 @@ class FederationHandler(BaseHandler):
)
raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
- @defer.inlineCallbacks
- def send_invite(self, target_host, event):
+ async def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
Invites must be signed by the invitee's server before distribution.
"""
- pdu = yield self.federation_client.send_invite(
+ pdu = await self.federation_client.send_invite(
destination=target_host,
room_id=event.room_id,
event_id=event.event_id,
@@ -1245,17 +1248,16 @@ class FederationHandler(BaseHandler):
return pdu
- @defer.inlineCallbacks
- def on_event_auth(self, event_id):
- event = yield self.store.get_event(event_id)
- auth = yield self.store.get_auth_chain(
+ async def on_event_auth(self, event_id: str) -> List[EventBase]:
+ event = await self.store.get_event(event_id)
+ auth = await self.store.get_auth_chain(
[auth_id for auth_id in event.auth_event_ids()], include_given=True
)
- return [e for e in auth]
+ return list(auth)
- @log_function
- @defer.inlineCallbacks
- def do_invite_join(self, target_hosts, room_id, joinee, content):
+ async def do_invite_join(
+ self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict
+ ) -> None:
""" Attempts to join the `joinee` to the room `room_id` via the
servers contained in `target_hosts`.
@@ -1268,17 +1270,17 @@ class FederationHandler(BaseHandler):
have finished processing the join.
Args:
- target_hosts (Iterable[str]): List of servers to attempt to join the room with.
+ target_hosts: List of servers to attempt to join the room with.
- room_id (str): The ID of the room to join.
+ room_id: The ID of the room to join.
- joinee (str): The User ID of the joining user.
+ joinee: The User ID of the joining user.
- content (dict): The event content to use for the join event.
+ content: The event content to use for the join event.
"""
logger.debug("Joining %s to %s", joinee, room_id)
- origin, event, room_version_obj = yield self._make_and_verify_event(
+ origin, event, room_version_obj = await self._make_and_verify_event(
target_hosts,
room_id,
joinee,
@@ -1294,7 +1296,7 @@ class FederationHandler(BaseHandler):
self.room_queues[room_id] = []
- yield self._clean_room_for_join(room_id)
+ await self._clean_room_for_join(room_id)
handled_events = set()
@@ -1307,9 +1309,8 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
- event_format_version = room_version_obj.event_format
- ret = yield self.federation_client.send_join(
- target_hosts, event, event_format_version
+ ret = await self.federation_client.send_join(
+ target_hosts, event, room_version_obj
)
origin = ret["origin"]
@@ -1327,7 +1328,7 @@ class FederationHandler(BaseHandler):
logger.debug("do_invite_join event: %s", event)
try:
- yield self.store.store_room(
+ await self.store.store_room(
room_id=room_id,
room_creator_user_id="",
is_public=False,
@@ -1337,13 +1338,13 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- yield self._persist_auth_tree(
+ await self._persist_auth_tree(
origin, auth_chain, state, event, room_version_obj
)
# Check whether this room is the result of an upgrade of a room we already know
# about. If so, migrate over user information
- predecessor = yield self.store.get_room_predecessor(room_id)
+ predecessor = await self.store.get_room_predecessor(room_id)
if not predecessor or not isinstance(predecessor.get("room_id"), str):
return
old_room_id = predecessor["room_id"]
@@ -1353,7 +1354,7 @@ class FederationHandler(BaseHandler):
# We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
- yield member_handler.transfer_room_state_on_room_upgrade(
+ await member_handler.transfer_room_state_on_room_upgrade(
old_room_id, room_id
)
@@ -1370,8 +1371,6 @@ class FederationHandler(BaseHandler):
run_in_background(self._handle_queued_pdus, room_queue)
- return True
-
async def _handle_queued_pdus(self, room_queue):
"""Process PDUs which got queued up while we were busy send_joining.
@@ -1394,20 +1393,17 @@ class FederationHandler(BaseHandler):
"Error handling queued PDU %s from %s: %s", p.event_id, origin, e
)
- @defer.inlineCallbacks
- @log_function
- def on_make_join_request(self, origin, room_id, user_id):
+ async def on_make_join_request(
+ self, origin: str, room_id: str, user_id: str
+ ) -> EventBase:
""" We've received a /make_join/ request, so we create a partial
join event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
Args:
- origin (str): The (verified) server name of the requesting server.
- room_id (str): Room to create join event in
- user_id (str): The user to create the join for
-
- Returns:
- Deferred[FrozenEvent]
+ origin: The (verified) server name of the requesting server.
+ room_id: Room to create join event in
+ user_id: The user to create the join for
"""
if get_domain_from_id(user_id) != origin:
logger.info(
@@ -1419,7 +1415,7 @@ class FederationHandler(BaseHandler):
event_content = {"membership": Membership.JOIN}
- room_version = yield self.store.get_room_version_id(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
builder = self.event_builder_factory.new(
room_version,
@@ -1433,14 +1429,14 @@ class FederationHandler(BaseHandler):
)
try:
- event, context = yield self.event_creation_handler.create_new_client_event(
+ event, context = await self.event_creation_handler.create_new_client_event(
builder=builder
)
except AuthError as e:
logger.warning("Failed to create join to %s because %s", room_id, e)
raise e
- event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event_allowed = await self.third_party_event_rules.check_event_allowed(
event, context
)
if not event_allowed:
@@ -1451,15 +1447,13 @@ class FederationHandler(BaseHandler):
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_join_request`
- yield self.auth.check_from_context(
+ await self.auth.check_from_context(
room_version, event, context, do_sig_check=False
)
return event
- @defer.inlineCallbacks
- @log_function
- def on_send_join_request(self, origin, pdu):
+ async def on_send_join_request(self, origin, pdu):
""" We have received a join event for a room. Fully process it and
respond with the current state and auth chains.
"""
@@ -1496,9 +1490,9 @@ class FederationHandler(BaseHandler):
# would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin
- context = yield self._handle_new_event(origin, event)
+ context = await self._handle_new_event(origin, event)
- event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event_allowed = await self.third_party_event_rules.check_event_allowed(
event, context
)
if not event_allowed:
@@ -1516,19 +1510,18 @@ class FederationHandler(BaseHandler):
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
- yield self.user_joined_room(user, event.room_id)
+ await self.user_joined_room(user, event.room_id)
- prev_state_ids = yield context.get_prev_state_ids()
+ prev_state_ids = await context.get_prev_state_ids()
state_ids = list(prev_state_ids.values())
- auth_chain = yield self.store.get_auth_chain(state_ids)
+ auth_chain = await self.store.get_auth_chain(state_ids)
- state = yield self.store.get_events(list(prev_state_ids.values()))
+ state = await self.store.get_events(list(prev_state_ids.values()))
return {"state": list(state.values()), "auth_chain": auth_chain}
- @defer.inlineCallbacks
- def on_invite_request(
+ async def on_invite_request(
self, origin: str, event: EventBase, room_version: RoomVersion
):
""" We've got an invite event. Process and persist it. Sign it.
@@ -1538,15 +1531,22 @@ class FederationHandler(BaseHandler):
if event.state_key is None:
raise SynapseError(400, "The invite event did not have a state key")
- is_blocked = yield self.store.is_room_blocked(event.room_id)
+ is_blocked = await self.store.is_room_blocked(event.room_id)
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
if self.hs.config.block_non_admin_invites:
raise SynapseError(403, "This server does not accept room invites")
+ is_published = yield self.store.is_room_published(event.room_id)
+
if not self.spam_checker.user_may_invite(
- event.sender, event.state_key, event.room_id
+ event.sender,
+ event.state_key,
+ None,
+ room_id=event.room_id,
+ new_room=False,
+ published_room=is_published,
):
raise SynapseError(
403, "This user is not permitted to send invites to this server/user"
@@ -1581,14 +1581,15 @@ class FederationHandler(BaseHandler):
)
)
- context = yield self.state_handler.compute_event_context(event)
- yield self.persist_events_and_notify([(event, context)])
+ context = await self.state_handler.compute_event_context(event)
+ await self.persist_events_and_notify([(event, context)])
return event
- @defer.inlineCallbacks
- def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content):
- origin, event, room_version = yield self._make_and_verify_event(
+ async def do_remotely_reject_invite(
+ self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict
+ ) -> EventBase:
+ origin, event, room_version = await self._make_and_verify_event(
target_hosts, room_id, user_id, "leave", content=content
)
# Mark as outlier as we don't have any state for this event; we're not
@@ -1604,22 +1605,27 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
- yield self.federation_client.send_leave(target_hosts, event)
+ await self.federation_client.send_leave(target_hosts, event)
- context = yield self.state_handler.compute_event_context(event)
- yield self.persist_events_and_notify([(event, context)])
+ context = await self.state_handler.compute_event_context(event)
+ await self.persist_events_and_notify([(event, context)])
return event
- @defer.inlineCallbacks
- def _make_and_verify_event(
- self, target_hosts, room_id, user_id, membership, content={}, params=None
- ):
+ async def _make_and_verify_event(
+ self,
+ target_hosts: Iterable[str],
+ room_id: str,
+ user_id: str,
+ membership: str,
+ content: JsonDict = {},
+ params: Optional[Dict[str, str]] = None,
+ ) -> Tuple[str, EventBase, RoomVersion]:
(
origin,
event,
room_version,
- ) = yield self.federation_client.make_membership_event(
+ ) = await self.federation_client.make_membership_event(
target_hosts, room_id, user_id, membership, content, params=params
)
@@ -1633,20 +1639,17 @@ class FederationHandler(BaseHandler):
assert event.room_id == room_id
return origin, event, room_version
- @defer.inlineCallbacks
- @log_function
- def on_make_leave_request(self, origin, room_id, user_id):
+ async def on_make_leave_request(
+ self, origin: str, room_id: str, user_id: str
+ ) -> EventBase:
""" We've received a /make_leave/ request, so we create a partial
leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
Args:
- origin (str): The (verified) server name of the requesting server.
- room_id (str): Room to create leave event in
- user_id (str): The user to create the leave for
-
- Returns:
- Deferred[FrozenEvent]
+ origin: The (verified) server name of the requesting server.
+ room_id: Room to create leave event in
+ user_id: The user to create the leave for
"""
if get_domain_from_id(user_id) != origin:
logger.info(
@@ -1656,7 +1659,7 @@ class FederationHandler(BaseHandler):
)
raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
- room_version = yield self.store.get_room_version_id(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
builder = self.event_builder_factory.new(
room_version,
{
@@ -1668,11 +1671,11 @@ class FederationHandler(BaseHandler):
},
)
- event, context = yield self.event_creation_handler.create_new_client_event(
+ event, context = await self.event_creation_handler.create_new_client_event(
builder=builder
)
- event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event_allowed = await self.third_party_event_rules.check_event_allowed(
event, context
)
if not event_allowed:
@@ -1684,7 +1687,7 @@ class FederationHandler(BaseHandler):
try:
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_leave_request`
- yield self.auth.check_from_context(
+ await self.auth.check_from_context(
room_version, event, context, do_sig_check=False
)
except AuthError as e:
@@ -1693,9 +1696,7 @@ class FederationHandler(BaseHandler):
return event
- @defer.inlineCallbacks
- @log_function
- def on_send_leave_request(self, origin, pdu):
+ async def on_send_leave_request(self, origin, pdu):
""" We have received a leave event for a room. Fully process it."""
event = pdu
@@ -1715,9 +1716,9 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- context = yield self._handle_new_event(origin, event)
+ context = await self._handle_new_event(origin, event)
- event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event_allowed = await self.third_party_event_rules.check_event_allowed(
event, context
)
if not event_allowed:
@@ -1798,6 +1799,9 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
+ # Synapse asks for 100 events per backfill request. Do not allow more.
+ limit = min(limit, 100)
+
events = yield self.store.get_backfill_events(room_id, pdu_list, limit)
events = yield filter_events_for_server(self.storage, origin, events)
@@ -1839,11 +1843,10 @@ class FederationHandler(BaseHandler):
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
- @defer.inlineCallbacks
- def _handle_new_event(
+ async def _handle_new_event(
self, origin, event, state=None, auth_events=None, backfilled=False
):
- context = yield self._prep_event(
+ context = await self._prep_event(
origin, event, state=state, auth_events=auth_events, backfilled=backfilled
)
@@ -1856,11 +1859,11 @@ class FederationHandler(BaseHandler):
and not backfilled
and not context.rejected
):
- yield self.action_generator.handle_push_actions_for_event(
+ await self.action_generator.handle_push_actions_for_event(
event, context
)
- yield self.persist_events_and_notify(
+ await self.persist_events_and_notify(
[(event, context)], backfilled=backfilled
)
success = True
@@ -1872,13 +1875,12 @@ class FederationHandler(BaseHandler):
return context
- @defer.inlineCallbacks
- def _handle_new_events(
+ async def _handle_new_events(
self,
origin: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
- ):
+ ) -> None:
"""Creates the appropriate contexts and persists events. The events
should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
@@ -1887,11 +1889,10 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
- @defer.inlineCallbacks
- def prep(ev_info: _NewEventInfo):
+ async def prep(ev_info: _NewEventInfo):
event = ev_info.event
with nested_logging_context(suffix=event.event_id):
- res = yield self._prep_event(
+ res = await self._prep_event(
origin,
event,
state=ev_info.state,
@@ -1900,14 +1901,14 @@ class FederationHandler(BaseHandler):
)
return res
- contexts = yield make_deferred_yieldable(
+ contexts = await make_deferred_yieldable(
defer.gatherResults(
[run_in_background(prep, ev_info) for ev_info in event_infos],
consumeErrors=True,
)
)
- yield self.persist_events_and_notify(
+ await self.persist_events_and_notify(
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
@@ -1915,15 +1916,14 @@ class FederationHandler(BaseHandler):
backfilled=backfilled,
)
- @defer.inlineCallbacks
- def _persist_auth_tree(
+ async def _persist_auth_tree(
self,
origin: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
room_version: RoomVersion,
- ):
+ ) -> None:
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
Persists the event separately. Notifies about the persisted events
@@ -1938,14 +1938,11 @@ class FederationHandler(BaseHandler):
event
room_version: The room version we expect this room to have, and
will raise if it doesn't match the version in the create event.
-
- Returns:
- Deferred
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
e.internal_metadata.outlier = True
- ctx = yield self.state_handler.compute_event_context(e)
+ ctx = await self.state_handler.compute_event_context(e)
events_to_context[e.event_id] = ctx
event_map = {
@@ -1977,12 +1974,8 @@ class FederationHandler(BaseHandler):
missing_auth_events.add(e_id)
for e_id in missing_auth_events:
- m_ev = yield self.federation_client.get_pdu(
- [origin],
- e_id,
- room_version=room_version.identifier,
- outlier=True,
- timeout=10000,
+ m_ev = await self.federation_client.get_pdu(
+ [origin], e_id, room_version=room_version, outlier=True, timeout=10000,
)
if m_ev and m_ev.event_id == e_id:
event_map[e_id] = m_ev
@@ -2013,91 +2006,74 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self.persist_events_and_notify(
+ await self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
]
)
- new_event_context = yield self.state_handler.compute_event_context(
+ new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)
- yield self.persist_events_and_notify([(event, new_event_context)])
+ await self.persist_events_and_notify([(event, new_event_context)])
- @defer.inlineCallbacks
- def _prep_event(
+ async def _prep_event(
self,
origin: str,
event: EventBase,
state: Optional[Iterable[EventBase]],
auth_events: Optional[StateMap[EventBase]],
backfilled: bool,
- ):
- """
-
- Args:
- origin:
- event:
- state:
- auth_events:
- backfilled:
-
- Returns:
- Deferred, which resolves to synapse.events.snapshot.EventContext
- """
- context = yield self.state_handler.compute_event_context(event, old_state=state)
+ ) -> EventContext:
+ context = await self.state_handler.compute_event_context(event, old_state=state)
if not auth_events:
- prev_state_ids = yield context.get_prev_state_ids()
- auth_events_ids = yield self.auth.compute_auth_events(
+ prev_state_ids = await context.get_prev_state_ids()
+ auth_events_ids = await self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
- auth_events = yield self.store.get_events(auth_events_ids)
+ auth_events = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_event_ids():
if len(event.prev_event_ids()) == 1 and event.depth < 5:
- c = yield self.store.get_event(
+ c = await self.store.get_event(
event.prev_event_ids()[0], allow_none=True
)
if c and c.type == EventTypes.Create:
auth_events[(c.type, c.state_key)] = c
- context = yield self.do_auth(origin, event, context, auth_events=auth_events)
+ context = await self.do_auth(origin, event, context, auth_events=auth_events)
if not context.rejected:
- yield self._check_for_soft_fail(event, state, backfilled)
+ await self._check_for_soft_fail(event, state, backfilled)
if event.type == EventTypes.GuestAccess and not context.rejected:
- yield self.maybe_kick_guest_users(event)
+ await self.maybe_kick_guest_users(event)
return context
- @defer.inlineCallbacks
- def _check_for_soft_fail(
+ async def _check_for_soft_fail(
self, event: EventBase, state: Optional[Iterable[EventBase]], backfilled: bool
- ):
- """Checks if we should soft fail the event, if so marks the event as
+ ) -> None:
+ """Checks if we should soft fail the event; if so, marks the event as
such.
Args:
event
state: The state at the event if we don't have all the event's prev events
backfilled: Whether the event is from backfill
-
- Returns:
- Deferred
"""
# For new (non-backfilled and non-outlier) events we check if the event
# passes auth based on the current state. If it doesn't then we
# "soft-fail" the event.
do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier()
if do_soft_fail_check:
- extrem_ids = yield self.store.get_latest_event_ids_in_room(event.room_id)
+ extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id)
extrem_ids = set(extrem_ids)
prev_event_ids = set(event.prev_event_ids())
@@ -2108,7 +2084,7 @@ class FederationHandler(BaseHandler):
do_soft_fail_check = False
if do_soft_fail_check:
- room_version = yield self.store.get_room_version_id(event.room_id)
+ room_version = await self.store.get_room_version_id(event.room_id)
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
# Calculate the "current state".
@@ -2125,19 +2101,19 @@ class FederationHandler(BaseHandler):
# given state at the event. This should correctly handle cases
# like bans, especially with state res v2.
- state_sets = yield self.state_store.get_state_groups(
+ state_sets = await self.state_store.get_state_groups(
event.room_id, extrem_ids
)
state_sets = list(state_sets.values())
state_sets.append(state)
- current_state_ids = yield self.state_handler.resolve_events(
+ current_state_ids = await self.state_handler.resolve_events(
room_version, state_sets, event
)
current_state_ids = {
k: e.event_id for k, e in iteritems(current_state_ids)
}
else:
- current_state_ids = yield self.state_handler.get_current_state_ids(
+ current_state_ids = await self.state_handler.get_current_state_ids(
event.room_id, latest_event_ids=extrem_ids
)
@@ -2153,7 +2129,7 @@ class FederationHandler(BaseHandler):
e for k, e in iteritems(current_state_ids) if k in auth_types
]
- current_auth_events = yield self.store.get_events(current_state_ids)
+ current_auth_events = await self.store.get_events(current_state_ids)
current_auth_events = {
(e.type, e.state_key): e for e in current_auth_events.values()
}
@@ -2166,15 +2142,14 @@ class FederationHandler(BaseHandler):
logger.warning("Soft-failing %r because %s", event, e)
event.internal_metadata.soft_failed = True
- @defer.inlineCallbacks
- def on_query_auth(
+ async def on_query_auth(
self, origin, event_id, room_id, remote_auth_chain, rejects, missing
):
- in_room = yield self.auth.check_host_in_room(room_id, origin)
+ in_room = await self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
- event = yield self.store.get_event(
+ event = await self.store.get_event(
event_id, allow_none=False, check_room_id=room_id
)
@@ -2182,57 +2157,61 @@ class FederationHandler(BaseHandler):
# don't want to fall into the trap of `missing` being wrong.
for e in remote_auth_chain:
try:
- yield self._handle_new_event(origin, e)
+ await self._handle_new_event(origin, e)
except AuthError:
pass
# Now get the current auth_chain for the event.
- local_auth_chain = yield self.store.get_auth_chain(
+ local_auth_chain = await self.store.get_auth_chain(
[auth_id for auth_id in event.auth_event_ids()], include_given=True
)
# TODO: Check if we would now reject event_id. If so we need to tell
# everyone.
- ret = yield self.construct_auth_difference(local_auth_chain, remote_auth_chain)
+ ret = await self.construct_auth_difference(local_auth_chain, remote_auth_chain)
logger.debug("on_query_auth returning: %s", ret)
return ret
- @defer.inlineCallbacks
- def on_get_missing_events(
+ async def on_get_missing_events(
self, origin, room_id, earliest_events, latest_events, limit
):
- in_room = yield self.auth.check_host_in_room(room_id, origin)
+ in_room = await self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
+ # Only allow up to 20 events to be retrieved per request.
limit = min(limit, 20)
- missing_events = yield self.store.get_missing_events(
+ missing_events = await self.store.get_missing_events(
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
limit=limit,
)
- missing_events = yield filter_events_for_server(
+ missing_events = await filter_events_for_server(
self.storage, origin, missing_events
)
return missing_events
- @defer.inlineCallbacks
- @log_function
- def do_auth(self, origin, event, context, auth_events):
+ async def do_auth(
+ self,
+ origin: str,
+ event: EventBase,
+ context: EventContext,
+ auth_events: StateMap[EventBase],
+ ) -> EventContext:
"""
Args:
- origin (str):
- event (synapse.events.EventBase):
- context (synapse.events.snapshot.EventContext):
- auth_events (dict[(str, str)->synapse.events.EventBase]):
+ origin:
+ event:
+ context:
+ auth_events:
Map from (event_type, state_key) to event
Normally, our calculated auth_events based on the state of the room
@@ -2242,13 +2221,13 @@ class FederationHandler(BaseHandler):
Also NB that this function adds entries to it.
Returns:
- defer.Deferred[EventContext]: updated context object
+ updated context object
"""
- room_version = yield self.store.get_room_version_id(event.room_id)
+ room_version = await self.store.get_room_version_id(event.room_id)
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
try:
- context = yield self._update_auth_events_and_context_for_auth(
+ context = await self._update_auth_events_and_context_for_auth(
origin, event, context, auth_events
)
except Exception:
@@ -2270,10 +2249,13 @@ class FederationHandler(BaseHandler):
return context
- @defer.inlineCallbacks
- def _update_auth_events_and_context_for_auth(
- self, origin, event, context, auth_events
- ):
+ async def _update_auth_events_and_context_for_auth(
+ self,
+ origin: str,
+ event: EventBase,
+ context: EventContext,
+ auth_events: StateMap[EventBase],
+ ) -> EventContext:
"""Helper for do_auth. See there for docs.
Checks whether a given event has the expected auth events. If it
@@ -2281,16 +2263,16 @@ class FederationHandler(BaseHandler):
we can come to a consensus (e.g. if one server missed some valid
state).
- This attempts to resovle any potential divergence of state between
+ This attempts to resolve any potential divergence of state between
servers, but is not essential and so failures should not block further
processing of the event.
Args:
- origin (str):
- event (synapse.events.EventBase):
- context (synapse.events.snapshot.EventContext):
+ origin:
+ event:
+ context:
- auth_events (dict[(str, str)->synapse.events.EventBase]):
+ auth_events:
Map from (event_type, state_key) to event
Normally, our calculated auth_events based on the state of the room
@@ -2301,7 +2283,7 @@ class FederationHandler(BaseHandler):
Also NB that this function adds entries to it.
Returns:
- defer.Deferred[EventContext]: updated context
+ updated context
"""
event_auth_events = set(event.auth_event_ids())
@@ -2315,7 +2297,7 @@ class FederationHandler(BaseHandler):
#
# we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth:
- have_events = yield self.store.have_seen_events(missing_auth)
+ have_events = await self.store.have_seen_events(missing_auth)
logger.debug("Events %s are in the store", have_events)
missing_auth.difference_update(have_events)
@@ -2324,7 +2306,7 @@ class FederationHandler(BaseHandler):
logger.info("auth_events contains unknown events: %s", missing_auth)
try:
try:
- remote_auth_chain = yield self.federation_client.get_event_auth(
+ remote_auth_chain = await self.federation_client.get_event_auth(
origin, event.room_id, event.event_id
)
except RequestSendFailed as e:
@@ -2333,7 +2315,7 @@ class FederationHandler(BaseHandler):
logger.info("Failed to get event auth from remote: %s", e)
return context
- seen_remotes = yield self.store.have_seen_events(
+ seen_remotes = await self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)
@@ -2356,7 +2338,7 @@ class FederationHandler(BaseHandler):
logger.debug(
"do_auth %s missing_auth: %s", event.event_id, e.event_id
)
- yield self._handle_new_event(origin, e, auth_events=auth)
+ await self._handle_new_event(origin, e, auth_events=auth)
if e.event_id in event_auth_events:
auth_events[(e.type, e.state_key)] = e
@@ -2390,7 +2372,7 @@ class FederationHandler(BaseHandler):
# XXX: currently this checks for redactions but I'm not convinced that is
# necessary?
- different_events = yield self.store.get_events_as_list(different_auth)
+ different_events = await self.store.get_events_as_list(different_auth)
for d in different_events:
if d.room_id != event.room_id:
@@ -2416,8 +2398,8 @@ class FederationHandler(BaseHandler):
remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
remote_state = remote_auth_events.values()
- room_version = yield self.store.get_room_version_id(event.room_id)
- new_state = yield self.state_handler.resolve_events(
+ room_version = await self.store.get_room_version_id(event.room_id)
+ new_state = await self.state_handler.resolve_events(
room_version, (local_state, remote_state), event
)
@@ -2432,27 +2414,27 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state)
- context = yield self._update_context_for_auth_events(
+ context = await self._update_context_for_auth_events(
event, context, auth_events
)
return context
- @defer.inlineCallbacks
- def _update_context_for_auth_events(self, event, context, auth_events):
+ async def _update_context_for_auth_events(
+ self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
+ ) -> EventContext:
"""Update the state_ids in an event context after auth event resolution,
storing the changes as a new state group.
Args:
- event (Event): The event we're handling the context for
+ event: The event we're handling the context for
- context (synapse.events.snapshot.EventContext): initial event context
+ context: initial event context
- auth_events (dict[(str, str)->EventBase]): Events to update in the event
- context.
+ auth_events: Events to update in the event context.
Returns:
- Deferred[EventContext]: new event context
+ new event context
"""
# exclude the state key of the new event from the current_state in the context.
if event.is_state():
@@ -2463,19 +2445,19 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
}
- current_state_ids = yield context.get_current_state_ids()
+ current_state_ids = await context.get_current_state_ids()
current_state_ids = dict(current_state_ids)
current_state_ids.update(state_updates)
- prev_state_ids = yield context.get_prev_state_ids()
+ prev_state_ids = await context.get_prev_state_ids()
prev_state_ids = dict(prev_state_ids)
prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
# create a new state group as a delta from the existing one.
prev_group = context.state_group
- state_group = yield self.state_store.store_state_group(
+ state_group = await self.state_store.store_state_group(
event.event_id,
event.room_id,
prev_group=prev_group,
@@ -2492,8 +2474,9 @@ class FederationHandler(BaseHandler):
delta_ids=state_updates,
)
- @defer.inlineCallbacks
- def construct_auth_difference(self, local_auth, remote_auth):
+ async def construct_auth_difference(
+ self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase]
+ ) -> Dict:
""" Given a local and remote auth chain, find the differences. This
assumes that we have already processed all events in remote_auth
@@ -2602,7 +2585,7 @@ class FederationHandler(BaseHandler):
reason_map = {}
for e in base_remote_rejected:
- reason = yield self.store.get_rejection_reason(e.event_id)
+ reason = await self.store.get_rejection_reason(e.event_id)
if reason is None:
# TODO: e is not in the current state, so we should
# construct some proof of that.
@@ -2687,33 +2670,31 @@ class FederationHandler(BaseHandler):
destinations, room_id, event_dict
)
- @defer.inlineCallbacks
- @log_function
- def on_exchange_third_party_invite_request(self, room_id, event_dict):
+ async def on_exchange_third_party_invite_request(
+ self, room_id: str, event_dict: JsonDict
+ ) -> None:
"""Handle an exchange_third_party_invite request from a remote server
The remote server will call this when it wants to turn a 3pid invite
into a normal m.room.member invite.
Args:
- room_id (str): The ID of the room.
+ room_id: The ID of the room.
event_dict (dict[str, Any]): Dictionary containing the event body.
- Returns:
- Deferred: resolves (to None)
"""
- room_version = yield self.store.get_room_version_id(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
# NB: event_dict has a particular specced format we might need to fudge
# if we change event formats too much.
builder = self.event_builder_factory.new(room_version, event_dict)
- event, context = yield self.event_creation_handler.create_new_client_event(
+ event, context = await self.event_creation_handler.create_new_client_event(
builder=builder
)
- event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event_allowed = await self.third_party_event_rules.check_event_allowed(
event, context
)
if not event_allowed:
@@ -2724,16 +2705,16 @@ class FederationHandler(BaseHandler):
403, "This event is not allowed in this context", Codes.FORBIDDEN
)
- event, context = yield self.add_display_name_to_third_party_invite(
+ event, context = await self.add_display_name_to_third_party_invite(
room_version, event_dict, event, context
)
try:
- yield self.auth.check_from_context(room_version, event, context)
+ await self.auth.check_from_context(room_version, event, context)
except AuthError as e:
logger.warning("Denying third party invite %r because %s", event, e)
raise e
- yield self._check_signature(event, context)
+ await self._check_signature(event, context)
# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
@@ -2741,7 +2722,7 @@ class FederationHandler(BaseHandler):
# We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
- yield member_handler.send_membership_event(None, event, context)
+ await member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
def add_display_name_to_third_party_invite(
@@ -2889,27 +2870,27 @@ class FederationHandler(BaseHandler):
if "valid" not in response or not response["valid"]:
raise AuthError(403, "Third party certificate was invalid")
- @defer.inlineCallbacks
- def persist_events_and_notify(self, event_and_contexts, backfilled=False):
+ async def persist_events_and_notify(
+ self,
+ event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
+ backfilled: bool = False,
+ ) -> None:
"""Persists events and tells the notifier/pushers about them, if
necessary.
Args:
- event_and_contexts(list[tuple[FrozenEvent, EventContext]])
- backfilled (bool): Whether these events are a result of
+ event_and_contexts:
+ backfilled: Whether these events are a result of
backfilling or not
-
- Returns:
- Deferred
"""
if self.config.worker_app:
- yield self._send_events_to_master(
+ await self._send_events_to_master(
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
else:
- max_stream_id = yield self.storage.persistence.persist_events(
+ max_stream_id = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)
@@ -2920,15 +2901,17 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- yield self._notify_persisted_event(event, max_stream_id)
+ await self._notify_persisted_event(event, max_stream_id)
- def _notify_persisted_event(self, event, max_stream_id):
+ async def _notify_persisted_event(
+ self, event: EventBase, max_stream_id: int
+ ) -> None:
"""Checks to see if notifier/pushers should be notified about the
event or not.
Args:
- event (FrozenEvent)
- max_stream_id (int): The max_stream_id returned by persist_events
+ event:
+ max_stream_id: The max_stream_id returned by persist_events
"""
extra_users = []
@@ -2952,29 +2935,29 @@ class FederationHandler(BaseHandler):
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- return self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
+ await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
- def _clean_room_for_join(self, room_id):
+ async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Args:
- room_id (str)
+ room_id
"""
if self.config.worker_app:
- return self._clean_room_for_join_client(room_id)
+ await self._clean_room_for_join_client(room_id)
else:
- return self.store.clean_room_for_join(room_id)
+ await self.store.clean_room_for_join(room_id)
- def user_joined_room(self, user, room_id):
+ async def user_joined_room(self, user: UserID, room_id: str) -> None:
"""Called when a new user has joined the room
"""
if self.config.worker_app:
- return self._notify_user_membership_change(
+ await self._notify_user_membership_change(
room_id=room_id, user_id=user.to_string(), change="joined"
)
else:
- return defer.succeed(user_joined_room(self.distributor, user, room_id))
+ user_joined_room(self.distributor, user, room_id)
@defer.inlineCallbacks
def get_room_complexity(self, remote_room_hosts, room_id):
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 319565510f..ad22415782 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -63,7 +63,7 @@ def _create_rerouter(func_name):
return f
-class GroupsLocalHandler(object):
+class GroupsLocalWorkerHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
@@ -81,40 +81,17 @@ class GroupsLocalHandler(object):
self.profile_handler = hs.get_profile_handler()
- # Ensure attestations get renewed
- hs.get_groups_attestation_renewer()
-
# The following functions merely route the query to the local groups server
# or federation depending on if the group is local or remote
get_group_profile = _create_rerouter("get_group_profile")
- update_group_profile = _create_rerouter("update_group_profile")
get_rooms_in_group = _create_rerouter("get_rooms_in_group")
-
get_invited_users_in_group = _create_rerouter("get_invited_users_in_group")
-
- add_room_to_group = _create_rerouter("add_room_to_group")
- update_room_in_group = _create_rerouter("update_room_in_group")
- remove_room_from_group = _create_rerouter("remove_room_from_group")
-
- update_group_summary_room = _create_rerouter("update_group_summary_room")
- delete_group_summary_room = _create_rerouter("delete_group_summary_room")
-
- update_group_category = _create_rerouter("update_group_category")
- delete_group_category = _create_rerouter("delete_group_category")
get_group_category = _create_rerouter("get_group_category")
get_group_categories = _create_rerouter("get_group_categories")
-
- update_group_summary_user = _create_rerouter("update_group_summary_user")
- delete_group_summary_user = _create_rerouter("delete_group_summary_user")
-
- update_group_role = _create_rerouter("update_group_role")
- delete_group_role = _create_rerouter("delete_group_role")
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
- set_group_join_policy = _create_rerouter("set_group_join_policy")
-
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group.
@@ -170,6 +147,144 @@ class GroupsLocalHandler(object):
return res
@defer.inlineCallbacks
+ def get_users_in_group(self, group_id, requester_user_id):
+ """Get users in a group
+ """
+ if self.is_mine_id(group_id):
+ res = yield self.groups_server_handler.get_users_in_group(
+ group_id, requester_user_id
+ )
+ return res
+
+ group_server_name = get_domain_from_id(group_id)
+
+ try:
+ res = yield self.transport_client.get_users_in_group(
+ get_domain_from_id(group_id), group_id, requester_user_id
+ )
+ except HttpResponseException as e:
+ raise e.to_synapse_error()
+ except RequestSendFailed:
+ raise SynapseError(502, "Failed to contact group server")
+
+ chunk = res["chunk"]
+ valid_entries = []
+ for entry in chunk:
+ g_user_id = entry["user_id"]
+ attestation = entry.pop("attestation", {})
+ try:
+ if get_domain_from_id(g_user_id) != group_server_name:
+ yield self.attestations.verify_attestation(
+ attestation,
+ group_id=group_id,
+ user_id=g_user_id,
+ server_name=get_domain_from_id(g_user_id),
+ )
+ valid_entries.append(entry)
+ except Exception as e:
+ logger.info("Failed to verify user is in group: %s", e)
+
+ res["chunk"] = valid_entries
+
+ return res
+
+ @defer.inlineCallbacks
+ def get_joined_groups(self, user_id):
+ group_ids = yield self.store.get_joined_groups(user_id)
+ return {"groups": group_ids}
+
+ @defer.inlineCallbacks
+ def get_publicised_groups_for_user(self, user_id):
+ if self.hs.is_mine_id(user_id):
+ result = yield self.store.get_publicised_groups_for_user(user_id)
+
+ # Check AS associated groups for this user - this depends on the
+ # RegExps in the AS registration file (under `users`)
+ for app_service in self.store.get_app_services():
+ result.extend(app_service.get_groups_for_user(user_id))
+
+ return {"groups": result}
+ else:
+ try:
+ bulk_result = yield self.transport_client.bulk_get_publicised_groups(
+ get_domain_from_id(user_id), [user_id]
+ )
+ except HttpResponseException as e:
+ raise e.to_synapse_error()
+ except RequestSendFailed:
+ raise SynapseError(502, "Failed to contact group server")
+
+ result = bulk_result.get("users", {}).get(user_id)
+ # TODO: Verify attestations
+ return {"groups": result}
+
+ @defer.inlineCallbacks
+ def bulk_get_publicised_groups(self, user_ids, proxy=True):
+ destinations = {}
+ local_users = set()
+
+ for user_id in user_ids:
+ if self.hs.is_mine_id(user_id):
+ local_users.add(user_id)
+ else:
+ destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
+
+ if not proxy and destinations:
+ raise SynapseError(400, "Some user_ids are not local")
+
+ results = {}
+ failed_results = []
+ for destination, dest_user_ids in iteritems(destinations):
+ try:
+ r = yield self.transport_client.bulk_get_publicised_groups(
+ destination, list(dest_user_ids)
+ )
+ results.update(r["users"])
+ except Exception:
+ failed_results.extend(dest_user_ids)
+
+ for uid in local_users:
+ results[uid] = yield self.store.get_publicised_groups_for_user(uid)
+
+ # Check AS associated groups for this user - this depends on the
+ # RegExps in the AS registration file (under `users`)
+ for app_service in self.store.get_app_services():
+ results[uid].extend(app_service.get_groups_for_user(uid))
+
+ return {"users": results}
+
+
+class GroupsLocalHandler(GroupsLocalWorkerHandler):
+ def __init__(self, hs):
+ super(GroupsLocalHandler, self).__init__(hs)
+
+ # Ensure attestations get renewed
+ hs.get_groups_attestation_renewer()
+
+ # The following functions merely route the query to the local groups server
+ # or federation depending on if the group is local or remote
+
+ update_group_profile = _create_rerouter("update_group_profile")
+
+ add_room_to_group = _create_rerouter("add_room_to_group")
+ update_room_in_group = _create_rerouter("update_room_in_group")
+ remove_room_from_group = _create_rerouter("remove_room_from_group")
+
+ update_group_summary_room = _create_rerouter("update_group_summary_room")
+ delete_group_summary_room = _create_rerouter("delete_group_summary_room")
+
+ update_group_category = _create_rerouter("update_group_category")
+ delete_group_category = _create_rerouter("delete_group_category")
+
+ update_group_summary_user = _create_rerouter("update_group_summary_user")
+ delete_group_summary_user = _create_rerouter("delete_group_summary_user")
+
+ update_group_role = _create_rerouter("update_group_role")
+ delete_group_role = _create_rerouter("delete_group_role")
+
+ set_group_join_policy = _create_rerouter("set_group_join_policy")
+
+ @defer.inlineCallbacks
def create_group(self, group_id, user_id, content):
"""Create a group
"""
@@ -220,48 +335,6 @@ class GroupsLocalHandler(object):
return res
@defer.inlineCallbacks
- def get_users_in_group(self, group_id, requester_user_id):
- """Get users in a group
- """
- if self.is_mine_id(group_id):
- res = yield self.groups_server_handler.get_users_in_group(
- group_id, requester_user_id
- )
- return res
-
- group_server_name = get_domain_from_id(group_id)
-
- try:
- res = yield self.transport_client.get_users_in_group(
- get_domain_from_id(group_id), group_id, requester_user_id
- )
- except HttpResponseException as e:
- raise e.to_synapse_error()
- except RequestSendFailed:
- raise SynapseError(502, "Failed to contact group server")
-
- chunk = res["chunk"]
- valid_entries = []
- for entry in chunk:
- g_user_id = entry["user_id"]
- attestation = entry.pop("attestation", {})
- try:
- if get_domain_from_id(g_user_id) != group_server_name:
- yield self.attestations.verify_attestation(
- attestation,
- group_id=group_id,
- user_id=g_user_id,
- server_name=get_domain_from_id(g_user_id),
- )
- valid_entries.append(entry)
- except Exception as e:
- logger.info("Failed to verify user is in group: %s", e)
-
- res["chunk"] = valid_entries
-
- return res
-
- @defer.inlineCallbacks
def join_group(self, group_id, user_id, content):
"""Request to join a group
"""
@@ -452,68 +525,3 @@ class GroupsLocalHandler(object):
group_id, user_id, membership="leave"
)
self.notifier.on_new_event("groups_key", token, users=[user_id])
-
- @defer.inlineCallbacks
- def get_joined_groups(self, user_id):
- group_ids = yield self.store.get_joined_groups(user_id)
- return {"groups": group_ids}
-
- @defer.inlineCallbacks
- def get_publicised_groups_for_user(self, user_id):
- if self.hs.is_mine_id(user_id):
- result = yield self.store.get_publicised_groups_for_user(user_id)
-
- # Check AS associated groups for this user - this depends on the
- # RegExps in the AS registration file (under `users`)
- for app_service in self.store.get_app_services():
- result.extend(app_service.get_groups_for_user(user_id))
-
- return {"groups": result}
- else:
- try:
- bulk_result = yield self.transport_client.bulk_get_publicised_groups(
- get_domain_from_id(user_id), [user_id]
- )
- except HttpResponseException as e:
- raise e.to_synapse_error()
- except RequestSendFailed:
- raise SynapseError(502, "Failed to contact group server")
-
- result = bulk_result.get("users", {}).get(user_id)
- # TODO: Verify attestations
- return {"groups": result}
-
- @defer.inlineCallbacks
- def bulk_get_publicised_groups(self, user_ids, proxy=True):
- destinations = {}
- local_users = set()
-
- for user_id in user_ids:
- if self.hs.is_mine_id(user_id):
- local_users.add(user_id)
- else:
- destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
-
- if not proxy and destinations:
- raise SynapseError(400, "Some user_ids are not local")
-
- results = {}
- failed_results = []
- for destination, dest_user_ids in iteritems(destinations):
- try:
- r = yield self.transport_client.bulk_get_publicised_groups(
- destination, list(dest_user_ids)
- )
- results.update(r["users"])
- except Exception:
- failed_results.extend(dest_user_ids)
-
- for uid in local_users:
- results[uid] = yield self.store.get_publicised_groups_for_user(uid)
-
- # Check AS associated groups for this user - this depends on the
- # RegExps in the AS registration file (under `users`)
- for app_service in self.store.get_app_services():
- results[uid].extend(app_service.get_groups_for_user(uid))
-
- return {"users": results}
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 23f07832e7..94b5279aa6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018, 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@ from synapse.api.errors import (
CodeMessageException,
Codes,
HttpResponseException,
+ ProxiedRequestError,
SynapseError,
)
from synapse.config.emailconfig import ThreepidBehaviour
@@ -51,14 +52,21 @@ class IdentityHandler(BaseHandler):
def __init__(self, hs):
super(IdentityHandler, self).__init__(hs)
- self.http_client = SimpleHttpClient(hs)
+ self.hs = hs
+ self.http_client = hs.get_simple_http_client()
# We create a blacklisting instance of SimpleHttpClient for contacting identity
# servers specified by clients
self.blacklisting_http_client = SimpleHttpClient(
hs, ip_blacklist=hs.config.federation_ip_range_blacklist
)
self.federation_http_client = hs.get_http_client()
- self.hs = hs
+
+ self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
+ self.trust_any_id_server_just_for_testing_do_not_use = (
+ hs.config.use_insecure_ssl_client_just_for_testing_do_not_use
+ )
+ self.rewrite_identity_server_urls = hs.config.rewrite_identity_server_urls
+ self._enable_lookup = hs.config.enable_3pid_lookup
@defer.inlineCallbacks
def threepid_from_creds(self, id_server, creds):
@@ -94,7 +102,15 @@ class IdentityHandler(BaseHandler):
query_params = {"sid": session_id, "client_secret": client_secret}
- url = id_server + "/_matrix/identity/api/v1/3pid/getValidated3pid"
+ # if we have a rewrite rule set for the identity server,
+ # apply it now.
+ if id_server in self.rewrite_identity_server_urls:
+ id_server = self.rewrite_identity_server_urls[id_server]
+
+ url = "https://%s%s" % (
+ id_server,
+ "/_matrix/identity/api/v1/3pid/getValidated3pid",
+ )
try:
data = yield self.http_client.get_json(url, query_params)
@@ -149,14 +165,24 @@ class IdentityHandler(BaseHandler):
if id_access_token is None:
use_v2 = False
+ # if we have a rewrite rule set for the identity server,
+ # apply it now, but only for sending the request (not
+ # storing in the database).
+ if id_server in self.rewrite_identity_server_urls:
+ id_server_host = self.rewrite_identity_server_urls[id_server]
+ else:
+ id_server_host = id_server
+
# Decide which API endpoint URLs to use
headers = {}
bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
if use_v2:
- bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
- headers["Authorization"] = create_id_access_token_header(id_access_token)
+ bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server_host,)
+ headers["Authorization"] = create_id_access_token_header(
+ id_access_token
+ )
else:
- bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
+ bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server_host,)
try:
# Use the blacklisting http client as this call is only to identity servers
@@ -263,6 +289,16 @@ class IdentityHandler(BaseHandler):
)
headers = {b"Authorization": auth_headers}
+ # if we have a rewrite rule set for the identity server,
+ # apply it now.
+ #
+ # Note that destination_is has to be the real id_server, not
+ # the server we connect to.
+ if id_server in self.rewrite_identity_server_urls:
+ id_server = self.rewrite_identity_server_urls[id_server]
+
+ url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+
try:
# Use the blacklisting http client as this call is only to identity servers
# provided by a client
@@ -400,6 +436,12 @@ class IdentityHandler(BaseHandler):
"client_secret": client_secret,
"send_attempt": send_attempt,
}
+
+ # if we have a rewrite rule set for the identity server,
+ # apply it now.
+ if id_server in self.rewrite_identity_server_urls:
+ id_server = self.rewrite_identity_server_urls[id_server]
+
if next_link:
params["next_link"] = next_link
@@ -466,6 +508,10 @@ class IdentityHandler(BaseHandler):
"details and update your config file."
)
+ # if we have a rewrite rule set for the identity server,
+ # apply it now.
+ if id_server in self.rewrite_identity_server_urls:
+ id_server = self.rewrite_identity_server_urls[id_server]
try:
data = yield self.http_client.post_json_get_json(
id_server + "/_matrix/identity/api/v1/validate/msisdn/requestToken",
@@ -566,6 +612,89 @@ class IdentityHandler(BaseHandler):
logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
raise SynapseError(400, "Error contacting the identity server")
+ # TODO: The following two methods are used for proxying IS requests using
+ # the CS API. They should be consolidated with those in RoomMemberHandler
+ # https://github.com/matrix-org/synapse-dinsic/issues/25
+
+ @defer.inlineCallbacks
+ def proxy_lookup_3pid(self, id_server, medium, address):
+ """Looks up a 3pid in the passed identity server.
+
+ Args:
+ id_server (str): The server name (including port, if required)
+ of the identity server to use.
+ medium (str): The type of the third party identifier (e.g. "email").
+ address (str): The third party identifier (e.g. "foo@example.com").
+
+ Returns:
+ Deferred[dict]: The result of the lookup. See
+ https://matrix.org/docs/spec/identity_service/r0.1.0.html#association-lookup
+ for details
+ """
+ if not self._enable_lookup:
+ raise AuthError(
+ 403, "Looking up third-party identifiers is denied from this server"
+ )
+
+ target = self.rewrite_identity_server_urls.get(id_server, id_server)
+
+ try:
+ data = yield self.http_client.get_json(
+ "https://%s/_matrix/identity/api/v1/lookup" % (target,),
+ {"medium": medium, "address": address},
+ )
+
+ if "mxid" in data:
+ if "signatures" not in data:
+ raise AuthError(401, "No signatures on 3pid binding")
+ yield self._verify_any_signature(data, id_server)
+
+ except HttpResponseException as e:
+ logger.info("Proxied lookup failed: %r", e)
+ raise e.to_synapse_error()
+ except IOError as e:
+ logger.info("Failed to contact %r: %s", id_server, e)
+ raise ProxiedRequestError(503, "Failed to contact identity server")
+
+ defer.returnValue(data)
+
+ @defer.inlineCallbacks
+ def proxy_bulk_lookup_3pid(self, id_server, threepids):
+ """Looks up given 3pids in the passed identity server.
+
+ Args:
+ id_server (str): The server name (including port, if required)
+ of the identity server to use.
+ threepids ([[str, str]]): The third party identifiers to lookup, as
+ a list of 2-string sized lists ([medium, address]).
+
+ Returns:
+ Deferred[dict]: The result of the lookup. See
+ https://matrix.org/docs/spec/identity_service/r0.1.0.html#association-lookup
+ for details
+ """
+ if not self._enable_lookup:
+ raise AuthError(
+ 403, "Looking up third-party identifiers is denied from this server"
+ )
+
+ target = self.rewrite_identity_server_urls.get(id_server, id_server)
+
+ try:
+ data = yield self.http_client.post_json_get_json(
+ "https://%s/_matrix/identity/api/v1/bulk_lookup" % (target,),
+ {"threepids": threepids},
+ )
+
+ except HttpResponseException as e:
+ logger.info("Proxied lookup failed: %r", e)
+ raise e.to_synapse_error()
+ except IOError as e:
+ logger.info("Failed to contact %r: %s", id_server, e)
+ raise ProxiedRequestError(503, "Failed to contact identity server")
+
+ defer.returnValue(data)
+
@defer.inlineCallbacks
def lookup_3pid(self, id_server, medium, address, id_access_token=None):
"""Looks up a 3pid in the passed identity server.
@@ -581,6 +710,9 @@ class IdentityHandler(BaseHandler):
Returns:
str|None: the matrix ID of the 3pid, or None if it is not recognized.
"""
+ # Rewrite id_server URL if necessary
+ id_server = self._get_id_server_target(id_server)
+
if id_access_token is not None:
try:
results = yield self._lookup_3pid_v2(
@@ -618,7 +750,7 @@ class IdentityHandler(BaseHandler):
str: the matrix ID of the 3pid, or None if it is not recognized.
"""
try:
- data = yield self.blacklisting_http_client.get_json(
+ data = yield self.http_client.get_json(
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
{"medium": medium, "address": address},
)
@@ -651,7 +783,7 @@ class IdentityHandler(BaseHandler):
"""
# Check what hashing details are supported by this identity server
try:
- hash_details = yield self.blacklisting_http_client.get_json(
+ hash_details = yield self.http_client.get_json(
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
{"access_token": id_access_token},
)
@@ -669,7 +801,7 @@ class IdentityHandler(BaseHandler):
400,
"Non-dict object from %s%s during v2 hash_details request: %s"
% (id_server_scheme, id_server, hash_details),
- )
+ )
# Extract information from hash_details
supported_lookup_algorithms = hash_details.get("algorithms")
@@ -684,7 +816,7 @@ class IdentityHandler(BaseHandler):
400,
"Invalid hash details received from identity server %s%s: %s"
% (id_server_scheme, id_server, hash_details),
- )
+ )
# Check if any of the supported lookup algorithms are present
if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
@@ -718,7 +850,7 @@ class IdentityHandler(BaseHandler):
headers = {"Authorization": create_id_access_token_header(id_access_token)}
try:
- lookup_results = yield self.blacklisting_http_client.post_json_get_json(
+ lookup_results = yield self.http_client.post_json_get_json(
"%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
{
"addresses": [lookup_value],
@@ -726,7 +858,7 @@ class IdentityHandler(BaseHandler):
"pepper": lookup_pepper,
},
headers=headers,
- )
+ )
except TimeoutError:
raise SynapseError(500, "Timed out contacting identity server")
except Exception as e:
@@ -750,14 +882,15 @@ class IdentityHandler(BaseHandler):
def _verify_any_signature(self, data, server_hostname):
if server_hostname not in data["signatures"]:
raise AuthError(401, "No signature from server %s" % (server_hostname,))
+
for key_name, signature in data["signatures"][server_hostname].items():
- try:
- key_data = yield self.blacklisting_http_client.get_json(
- "%s%s/_matrix/identity/api/v1/pubkey/%s"
- % (id_server_scheme, server_hostname, key_name)
- )
- except TimeoutError:
- raise SynapseError(500, "Timed out contacting identity server")
+ target = self.rewrite_identity_server_urls.get(
+ server_hostname, server_hostname
+ )
+
+ key_data = yield self.http_client.get_json(
+ "https://%s/_matrix/identity/api/v1/pubkey/%s" % (target, key_name)
+ )
if "public_key" not in key_data:
raise AuthError(
401, "No public key named %s from %s" % (key_name, server_hostname)
@@ -771,6 +904,23 @@ class IdentityHandler(BaseHandler):
)
return
+ raise AuthError(401, "No signature from server %s" % (server_hostname,))
+
+ def _get_id_server_target(self, id_server):
+ """Looks up an id_server's actual http endpoint
+
+ Args:
+ id_server (str): the server name to lookup.
+
+ Returns:
+ the http endpoint to connect to.
+ """
+ if id_server in self.rewrite_identity_server_urls:
+ return self.rewrite_identity_server_urls[id_server]
+
+ return id_server
+
+
@defer.inlineCallbacks
def ask_id_server_for_third_party_invite(
self,
@@ -831,6 +981,9 @@ class IdentityHandler(BaseHandler):
"sender_avatar_url": inviter_avatar_url,
}
+ # Rewrite the identity server URL if necessary
+ id_server = self._get_id_server_target(id_server)
+
# Add the identity service access token to the JSON body and use the v2
# Identity Service endpoints if id_access_token is present
data = None
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index bdf16c84d3..be6ae18a92 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -932,10 +932,9 @@ class EventCreationHandler(object):
# way? If we have been invited by a remote server, we need
# to get them to sign the event.
- returned_invite = yield federation_handler.send_invite(
- invitee.domain, event
+ returned_invite = yield defer.ensureDeferred(
+ federation_handler.send_invite(invitee.domain, event)
)
-
event.unsigned.pop("room_state", None)
# TODO: Make sure the signatures actually are correct.
diff --git a/synapse/handlers/password_policy.py b/synapse/handlers/password_policy.py
new file mode 100644
index 0000000000..d06b110269
--- /dev/null
+++ b/synapse/handlers/password_policy.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+from synapse.api.errors import Codes, PasswordRefusedError
+
+logger = logging.getLogger(__name__)
+
+
+class PasswordPolicyHandler(object):
+ def __init__(self, hs):
+ self.policy = hs.config.password_policy
+ self.enabled = hs.config.password_policy_enabled
+
+ # Regexps for the spec'd policy parameters.
+ self.regexp_digit = re.compile("[0-9]")
+ self.regexp_symbol = re.compile("[^a-zA-Z0-9]")
+ self.regexp_uppercase = re.compile("[A-Z]")
+ self.regexp_lowercase = re.compile("[a-z]")
+
+ def validate_password(self, password):
+ """Checks whether a given password complies with the server's policy.
+
+ Args:
+ password (str): The password to check against the server's policy.
+
+ Raises:
+ PasswordRefusedError: The password doesn't comply with the server's policy.
+ """
+
+ if not self.enabled:
+ return
+
+ minimum_accepted_length = self.policy.get("minimum_length", 0)
+ if len(password) < minimum_accepted_length:
+ raise PasswordRefusedError(
+ msg=(
+ "The password must be at least %d characters long"
+ % minimum_accepted_length
+ ),
+ errcode=Codes.PASSWORD_TOO_SHORT,
+ )
+
+ if (
+ self.policy.get("require_digit", False)
+ and self.regexp_digit.search(password) is None
+ ):
+ raise PasswordRefusedError(
+ msg="The password must include at least one digit",
+ errcode=Codes.PASSWORD_NO_DIGIT,
+ )
+
+ if (
+ self.policy.get("require_symbol", False)
+ and self.regexp_symbol.search(password) is None
+ ):
+ raise PasswordRefusedError(
+ msg="The password must include at least one symbol",
+ errcode=Codes.PASSWORD_NO_SYMBOL,
+ )
+
+ if (
+ self.policy.get("require_uppercase", False)
+ and self.regexp_uppercase.search(password) is None
+ ):
+ raise PasswordRefusedError(
+ msg="The password must include at least one uppercase letter",
+ errcode=Codes.PASSWORD_NO_UPPERCASE,
+ )
+
+ if (
+ self.policy.get("require_lowercase", False)
+ and self.regexp_lowercase.search(password) is None
+ ):
+ raise PasswordRefusedError(
+ msg="The password must include at least one lowercase letter",
+ errcode=Codes.PASSWORD_NO_LOWERCASE,
+ )
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index f9579d69ee..75227ae34b 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,8 +17,11 @@
import logging
from six import raise_from
+from six.moves import range
-from twisted.internet import defer
+from signedjson.sign import sign_json
+
+from twisted.internet import defer, reactor
from synapse.api.errors import (
AuthError,
@@ -27,6 +31,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
+from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, get_domain_from_id
@@ -46,6 +51,8 @@ class BaseProfileHandler(BaseHandler):
subclass MasterProfileHandler
"""
+ PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000
+
def __init__(self, hs):
super(BaseProfileHandler, self).__init__(hs)
@@ -56,6 +63,87 @@ class BaseProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
+ self.http_client = hs.get_simple_http_client()
+
+ self.max_avatar_size = hs.config.max_avatar_size
+ self.allowed_avatar_mimetypes = hs.config.allowed_avatar_mimetypes
+
+ if hs.config.worker_app is None:
+ self.clock.looping_call(
+ self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
+ )
+
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ reactor.callWhenRunning(self._assign_profile_replication_batches)
+ reactor.callWhenRunning(self._replicate_profiles)
+ # Add a looping call to replicate_profiles: this handles retries
+ # if the replication is unsuccessful when the user updated their
+ # profile.
+ self.clock.looping_call(
+ self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
+ )
+
+ @defer.inlineCallbacks
+ def _assign_profile_replication_batches(self):
+ """If no profile replication has been done yet, allocate replication batch
+ numbers to each profile to start the replication process.
+ """
+ logger.info("Assigning profile batch numbers...")
+ total = 0
+ while True:
+ assigned = yield self.store.assign_profile_batch()
+ total += assigned
+ if assigned == 0:
+ break
+ logger.info("Assigned %d profile batch numbers", total)
+
+ @defer.inlineCallbacks
+ def _replicate_profiles(self):
+ """If any profile data has been updated and not pushed to the replication targets,
+ replicate it.
+ """
+ host_batches = yield self.store.get_replication_hosts()
+ latest_batch = yield self.store.get_latest_profile_replication_batch_number()
+ if latest_batch is None:
+ latest_batch = -1
+ for repl_host in self.hs.config.replicate_user_profiles_to:
+ if repl_host not in host_batches:
+ host_batches[repl_host] = -1
+ try:
+ for i in range(host_batches[repl_host] + 1, latest_batch + 1):
+ yield self._replicate_host_profile_batch(repl_host, i)
+ except Exception:
+ logger.exception(
+ "Exception while replicating to %s: aborting for now", repl_host
+ )
+
+ @defer.inlineCallbacks
+ def _replicate_host_profile_batch(self, host, batchnum):
+ logger.info("Replicating profile batch %d to %s", batchnum, host)
+ batch_rows = yield self.store.get_profile_batch(batchnum)
+ batch = {
+ UserID(r["user_id"], self.hs.hostname).to_string(): (
+ {"display_name": r["displayname"], "avatar_url": r["avatar_url"]}
+ if r["active"]
+ else None
+ )
+ for r in batch_rows
+ }
+
+ url = "https://%s/_matrix/identity/api/v1/replicate_profiles" % (host,)
+ body = {"batchnum": batchnum, "batch": batch, "origin_server": self.hs.hostname}
+ signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0])
+ try:
+ yield self.http_client.post_json_get_json(url, signed_body)
+ yield self.store.update_replication_batch_for_host(host, batchnum)
+ logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
+ except Exception:
+ # This will get retried when the looping call next comes around
+ logger.exception(
+ "Failed to replicate profile batch %d to %s", batchnum, host
+ )
+ raise
+
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
@@ -154,9 +242,16 @@ class BaseProfileHandler(BaseHandler):
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this homeserver")
- if not by_admin and target_user != requester.user:
+ if not by_admin and requester and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname")
+ if not by_admin and self.hs.config.disable_set_displayname:
+ profile = yield self.store.get_profileinfo(target_user.localpart)
+ if profile.display_name:
+ raise SynapseError(
+ 400, "Changing displayname is disabled on this server"
+ )
+
if len(new_displayname) > MAX_DISPLAYNAME_LEN:
raise SynapseError(
400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN,)
@@ -165,7 +260,17 @@ class BaseProfileHandler(BaseHandler):
if new_displayname == "":
new_displayname = None
- yield self.store.set_profile_displayname(target_user.localpart, new_displayname)
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = (
+ yield self.store.get_latest_profile_replication_batch_number()
+ )
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+
+ yield self.store.set_profile_displayname(
+ target_user.localpart, new_displayname, new_batchnum
+ )
if self.hs.config.user_directory_search_all_users:
profile = yield self.store.get_profileinfo(target_user.localpart)
@@ -173,7 +278,39 @@ class BaseProfileHandler(BaseHandler):
target_user.to_string(), profile
)
- yield self._update_join_states(requester, target_user)
+ if requester:
+ yield self._update_join_states(requester, target_user)
+
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
+
+ @defer.inlineCallbacks
+ def set_active(self, target_user, active, hide):
+ """
+ Sets the 'active' flag on a user profile. If set to false, the user
+ account is considered deactivated or hidden.
+
+ If 'hide' is true, then we interpret active=False as a request to try to
+ hide the user rather than deactivating it. This means withholding the
+ profile from replication (and mark it as inactive) rather than clearing
+ the profile from the HS DB. Note that unlike set_displayname and
+ set_avatar_url, this does *not* perform authorization checks! This is
+ because the only place it's used currently is in account deactivation
+ where we've already done these checks anyway.
+ """
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = (
+ yield self.store.get_latest_profile_replication_batch_number()
+ )
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+ yield self.store.set_profile_active(
+ target_user.localpart, active, hide, new_batchnum
+ )
+
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
@defer.inlineCallbacks
def get_avatar_url(self, target_user):
@@ -212,12 +349,59 @@ class BaseProfileHandler(BaseHandler):
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's avatar_url")
+ if not by_admin and self.hs.config.disable_set_avatar_url:
+ profile = yield self.store.get_profileinfo(target_user.localpart)
+ if profile.avatar_url:
+ raise SynapseError(
+ 400, "Changing avatar url is disabled on this server"
+ )
+
+ if len(self.hs.config.replicate_user_profiles_to) > 0:
+ cur_batchnum = (
+ yield self.store.get_latest_profile_replication_batch_number()
+ )
+ new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+ else:
+ new_batchnum = None
+
if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
raise SynapseError(
400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN,)
)
- yield self.store.set_profile_avatar_url(target_user.localpart, new_avatar_url)
+ # Enforce a max avatar size if one is defined
+ if self.max_avatar_size or self.allowed_avatar_mimetypes:
+ media_id = self._validate_and_parse_media_id_from_avatar_url(new_avatar_url)
+
+ # Check that this media exists locally
+ media_info = yield self.store.get_local_media(media_id)
+ if not media_info:
+ raise SynapseError(
+ 400, "Unknown media id supplied", errcode=Codes.NOT_FOUND
+ )
+
+ # Ensure avatar does not exceed max allowed avatar size
+ media_size = media_info["media_length"]
+ if self.max_avatar_size and media_size > self.max_avatar_size:
+ raise SynapseError(
+ 400,
+ "Avatars must be less than %s bytes in size"
+ % (self.max_avatar_size,),
+ errcode=Codes.TOO_LARGE,
+ )
+
+ # Ensure the avatar's file type is allowed
+ if (
+ self.allowed_avatar_mimetypes
+ and media_info["media_type"] not in self.allowed_avatar_mimetypes
+ ):
+ raise SynapseError(
+ 400, "Avatar file type '%s' not allowed" % media_info["media_type"]
+ )
+
+ yield self.store.set_profile_avatar_url(
+ target_user.localpart, new_avatar_url, new_batchnum
+ )
if self.hs.config.user_directory_search_all_users:
profile = yield self.store.get_profileinfo(target_user.localpart)
@@ -227,6 +411,23 @@ class BaseProfileHandler(BaseHandler):
yield self._update_join_states(requester, target_user)
+ # start a profile replication push
+ run_in_background(self._replicate_profiles)
+
+ def _validate_and_parse_media_id_from_avatar_url(self, mxc):
+ """Validate and parse a provided avatar url and return the local media id
+
+ Args:
+ mxc (str): A mxc URL
+
+ Returns:
+ str: The ID of the media
+ """
+ avatar_pieces = mxc.split("/")
+ if len(avatar_pieces) != 4 or avatar_pieces[0] != "mxc:":
+ raise SynapseError(400, "Invalid avatar URL '%s' supplied" % mxc)
+ return avatar_pieces[-1]
+
@defer.inlineCallbacks
def on_profile_query(self, args):
user = UserID.from_string(args["user_id"])
@@ -282,7 +483,7 @@ class BaseProfileHandler(BaseHandler):
@defer.inlineCallbacks
def check_profile_query_allowed(self, target_user, requester=None):
"""Checks whether a profile query is allowed. If the
- 'require_auth_for_profile_requests' config flag is set to True and a
+ 'limit_profile_requests_to_known_users' config flag is set to True and a
'requester' is provided, the query is only allowed if the two users
share a room.
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7ffc194f0c..696d90996a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -49,6 +49,7 @@ class RegistrationHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
+ self.http_client = hs.get_simple_http_client()
self.identity_handler = self.hs.get_handlers().identity_handler
self.ratelimiter = hs.get_registration_ratelimiter()
@@ -61,6 +62,8 @@ class RegistrationHandler(BaseHandler):
)
self._server_notices_mxid = hs.config.server_notices_mxid
+ self._show_in_user_directory = self.hs.config.show_users_in_user_directory
+
if hs.config.worker_app:
self._register_client = ReplicationRegisterServlet.make_client(hs)
self._register_device_client = RegisterDeviceReplicationServlet.make_client(
@@ -203,6 +206,11 @@ class RegistrationHandler(BaseHandler):
address=address,
)
+ if default_display_name:
+ yield self.profile_handler.set_displayname(
+ user, None, default_display_name, by_admin=True
+ )
+
if self.hs.config.user_directory_search_all_users:
profile = yield self.store.get_profileinfo(localpart)
yield self.user_directory_handler.handle_local_profile_change(
@@ -233,6 +241,10 @@ class RegistrationHandler(BaseHandler):
address=address,
)
+ yield self.profile_handler.set_displayname(
+ user, None, default_display_name, by_admin=True
+ )
+
# Successfully registered
break
except SynapseError:
@@ -262,6 +274,14 @@ class RegistrationHandler(BaseHandler):
# Bind email to new account
yield self._register_email_threepid(user_id, threepid_dict, None)
+ # Prevent the new user from showing up in the user directory if the server
+ # mandates it.
+ if not self._show_in_user_directory:
+ yield self.store.add_account_data_for_user(
+ user_id, "im.vector.hide_profile", {"hide_profile": True}
+ )
+ yield self.profile_handler.set_active(user, False, True)
+
return user_id
@defer.inlineCallbacks
@@ -328,7 +348,9 @@ class RegistrationHandler(BaseHandler):
yield self._auto_join_rooms(user_id)
@defer.inlineCallbacks
- def appservice_register(self, user_localpart, as_token):
+ def appservice_register(self, user_localpart, as_token, password, display_name):
+ # FIXME: this should be factored out and merged with normal register()
+
user = UserID(user_localpart, self.hs.hostname)
user_id = user.to_string()
service = self.store.get_app_service_by_token(as_token)
@@ -347,12 +369,29 @@ class RegistrationHandler(BaseHandler):
user_id, allowed_appservice=service
)
+ password_hash = ""
+ if password:
+ password_hash = yield self.auth_handler().hash(password)
+
+ display_name = display_name or user.localpart
+
yield self.register_with_store(
user_id=user_id,
- password_hash="",
+ password_hash=password_hash,
appservice_id=service_id,
- create_profile_with_displayname=user.localpart,
+ create_profile_with_displayname=display_name,
)
+
+ yield self.profile_handler.set_displayname(
+ user, None, display_name, by_admin=True
+ )
+
+ if self.hs.config.user_directory_search_all_users:
+ profile = yield self.store.get_profileinfo(user_localpart)
+ yield self.user_directory_handler.handle_local_profile_change(
+ user_id, profile
+ )
+
return user_id
def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
@@ -380,6 +419,39 @@ class RegistrationHandler(BaseHandler):
)
@defer.inlineCallbacks
+ def shadow_register(self, localpart, display_name, auth_result, params):
+ """Invokes the current registration on another server, using
+ shared secret registration, passing in any auth_results from
+ other registration UI auth flows (e.g. validated 3pids)
+ Useful for setting up shadow/backup accounts on a parallel deployment.
+ """
+
+ # TODO: retries
+ shadow_hs_url = self.hs.config.shadow_server.get("hs_url")
+ as_token = self.hs.config.shadow_server.get("as_token")
+
+ yield self.http_client.post_json_get_json(
+ "%s/_matrix/client/r0/register?access_token=%s" % (shadow_hs_url, as_token),
+ {
+ # XXX: auth_result is an unspecified extension for shadow registration
+ "auth_result": auth_result,
+ # XXX: another unspecified extension for shadow registration to ensure
+ # that the displayname is correctly set by the masters erver
+ "display_name": display_name,
+ "username": localpart,
+ "password": params.get("password"),
+ "bind_email": params.get("bind_email"),
+ "bind_msisdn": params.get("bind_msisdn"),
+ "device_id": params.get("device_id"),
+ "initial_device_display_name": params.get(
+ "initial_device_display_name"
+ ),
+ "inhibit_login": False,
+ "access_token": as_token,
+ },
+ )
+
+ @defer.inlineCallbacks
def _generate_user_id(self):
if self._next_generated_user_id is None:
with (yield self._generate_user_id_linearizer.queue(())):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b609a65f47..ccf0e962f6 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -64,12 +64,14 @@ class RoomCreationHandler(BaseHandler):
"history_visibility": "shared",
"original_invitees_have_ops": False,
"guest_can_join": True,
+ "encryption_alg": "m.megolm.v1.aes-sha2",
},
RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
"join_rules": JoinRules.INVITE,
"history_visibility": "shared",
"original_invitees_have_ops": True,
"guest_can_join": True,
+ "encryption_alg": "m.megolm.v1.aes-sha2",
},
RoomCreationPreset.PUBLIC_CHAT: {
"join_rules": JoinRules.PUBLIC,
@@ -259,7 +261,7 @@ class RoomCreationHandler(BaseHandler):
for v in ("invite", "events_default"):
current = int(pl_content.get(v, 0))
if current < restricted_level:
- logger.info(
+ logger.debug(
"Setting level for %s in %s to %i (was %i)",
v,
old_room_id,
@@ -269,7 +271,7 @@ class RoomCreationHandler(BaseHandler):
pl_content[v] = restricted_level
updated = True
else:
- logger.info("Not setting level for %s (already %i)", v, current)
+ logger.debug("Not setting level for %s (already %i)", v, current)
if updated:
try:
@@ -296,7 +298,7 @@ class RoomCreationHandler(BaseHandler):
EventTypes.Aliases, events_default
)
- logger.info("Setting correct PLs in new room to %s", new_pl_content)
+ logger.debug("Setting correct PLs in new room to %s", new_pl_content)
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
@@ -332,7 +334,19 @@ class RoomCreationHandler(BaseHandler):
"""
user_id = requester.user.to_string()
- if not self.spam_checker.user_may_create_room(user_id):
+ if (
+ self._server_notices_mxid is not None
+ and requester.user.to_string() == self._server_notices_mxid
+ ):
+ # allow the server notices mxid to create rooms
+ is_requester_admin = True
+
+ else:
+ is_requester_admin = yield self.auth.is_server_admin(requester.user)
+
+ if not is_requester_admin and not self.spam_checker.user_may_create_room(
+ user_id, invite_list=[], third_party_invite_list=[], cloning=True
+ ):
raise SynapseError(403, "You are not permitted to create rooms")
creation_content = {
@@ -579,12 +593,22 @@ class RoomCreationHandler(BaseHandler):
# Check whether the third party rules allows/changes the room create
# request.
- yield self.third_party_event_rules.on_create_room(
+ event_allowed = yield self.third_party_event_rules.on_create_room(
requester, config, is_requester_admin=is_requester_admin
)
+ if not event_allowed:
+ raise SynapseError(
+ 403, "You are not permitted to create rooms", Codes.FORBIDDEN
+ )
+
+ invite_list = config.get("invite", [])
+ invite_3pid_list = config.get("invite_3pid", [])
if not is_requester_admin and not self.spam_checker.user_may_create_room(
- user_id
+ user_id,
+ invite_list=invite_list,
+ third_party_invite_list=invite_3pid_list,
+ cloning=False,
):
raise SynapseError(403, "You are not permitted to create rooms")
@@ -619,7 +643,6 @@ class RoomCreationHandler(BaseHandler):
else:
room_alias = None
- invite_list = config.get("invite", [])
for i in invite_list:
try:
uid = UserID.from_string(i)
@@ -641,8 +664,6 @@ class RoomCreationHandler(BaseHandler):
% (user_id,),
)
- invite_3pid_list = config.get("invite_3pid", [])
-
visibility = config.get("visibility", None)
is_public = visibility == "public"
@@ -732,6 +753,7 @@ class RoomCreationHandler(BaseHandler):
"invite",
ratelimit=False,
content=content,
+ new_room=True,
)
for invite_3pid in invite_3pid_list:
@@ -747,6 +769,7 @@ class RoomCreationHandler(BaseHandler):
id_server,
requester,
txn_id=None,
+ new_room=True,
id_access_token=id_access_token,
)
@@ -782,7 +805,7 @@ class RoomCreationHandler(BaseHandler):
@defer.inlineCallbacks
def send(etype, content, **kwargs):
event = create(etype, content, **kwargs)
- logger.info("Sending %s in new room", etype)
+ logger.debug("Sending %s in new room", etype)
yield self.event_creation_handler.create_and_send_nonmember_event(
creator, event, ratelimit=False
)
@@ -796,7 +819,7 @@ class RoomCreationHandler(BaseHandler):
creation_content.update({"creator": creator_id})
yield send(etype=EventTypes.Create, content=creation_content)
- logger.info("Sending %s in new room", EventTypes.Member)
+ logger.debug("Sending %s in new room", EventTypes.Member)
yield self.room_member_handler.update_membership(
creator,
creator.user,
@@ -804,6 +827,7 @@ class RoomCreationHandler(BaseHandler):
"join",
ratelimit=False,
content=creator_join_profile,
+ new_room=True,
)
# We treat the power levels override specially as this needs to be one
@@ -869,6 +893,13 @@ class RoomCreationHandler(BaseHandler):
for (etype, state_key), content in initial_state.items():
yield send(etype=etype, state_key=state_key, content=content)
+ if "encryption_alg" in config:
+ yield send(
+ etype=EventTypes.Encryption,
+ state_key="",
+ content={"algorithm": config["encryption_alg"]},
+ )
+
@defer.inlineCallbacks
def _generate_room_id(
self, creator_id: str, is_public: str, room_version: RoomVersion,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 15e8aa5249..decef944ff 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -24,13 +24,20 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import EventTypes, Membership
+from synapse.api.ratelimiting import Ratelimiter
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ HttpResponseException,
+ SynapseError,
+)
+from synapse.handlers.identity import LookupAlgorithm, create_id_access_token_header
+from synapse.http.client import SimpleHttpClient
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import Collection, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
-from ._base import BaseHandler
-
logger = logging.getLogger(__name__)
@@ -60,6 +67,7 @@ class RoomMemberHandler(object):
self.registration_handler = hs.get_registration_handler()
self.profile_handler = hs.get_profile_handler()
self.event_creation_handler = hs.get_event_creation_handler()
+ self.identity_handler = hs.get_handlers().identity_handler
self.member_linearizer = Linearizer(name="member")
@@ -67,13 +75,10 @@ class RoomMemberHandler(object):
self.spam_checker = hs.get_spam_checker()
self.third_party_event_rules = hs.get_third_party_event_rules()
self._server_notices_mxid = self.config.server_notices_mxid
+ self.rewrite_identity_server_urls = self.config.rewrite_identity_server_urls
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles
-
- # This is only used to get at ratelimit function, and
- # maybe_kick_guest_users. It's fine there are multiple of these as
- # it doesn't store state.
- self.base_handler = BaseHandler(hs)
+ self.ratelimiter = Ratelimiter()
@abc.abstractmethod
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -265,8 +270,31 @@ class RoomMemberHandler(object):
third_party_signed=None,
ratelimit=True,
content=None,
+ new_room=False,
require_consent=True,
):
+ """Update a users membership in a room
+
+ Args:
+ requester (Requester)
+ target (UserID)
+ room_id (str)
+ action (str): The "action" the requester is performing against the
+ target. One of join/leave/kick/ban/invite/unban.
+ txn_id (str|None): The transaction ID associated with the request,
+ or None not provided.
+ remote_room_hosts (list[str]|None): List of remote servers to try
+ and join via if server isn't already in the room.
+ third_party_signed (dict|None): The signed object for third party
+ invites.
+ ratelimit (bool): Whether to apply ratelimiting to this request.
+ content (dict|None): Fields to include in the new events content.
+ new_room (bool): Whether these membership changes are happening
+ as part of a room creation (e.g. initial joins and invites)
+
+ Returns:
+ Deferred[FrozenEvent]
+ """
key = (room_id,)
with (yield self.member_linearizer.queue(key)):
@@ -280,6 +308,7 @@ class RoomMemberHandler(object):
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
+ new_room=new_room,
require_consent=require_consent,
)
@@ -297,6 +326,7 @@ class RoomMemberHandler(object):
third_party_signed=None,
ratelimit=True,
content=None,
+ new_room=False,
require_consent=True,
):
content_specified = bool(content)
@@ -361,8 +391,15 @@ class RoomMemberHandler(object):
)
block_invite = True
+ is_published = yield self.store.is_room_published(room_id)
+
if not self.spam_checker.user_may_invite(
- requester.user.to_string(), target.to_string(), room_id
+ requester.user.to_string(),
+ target.to_string(),
+ third_party_invite=None,
+ room_id=room_id,
+ new_room=new_room,
+ published_room=is_published,
):
logger.info("Blocking invite due to spam checker")
block_invite = True
@@ -434,8 +471,26 @@ class RoomMemberHandler(object):
# so don't really fit into the general auth process.
raise AuthError(403, "Guest access not allowed")
+ if (
+ self._server_notices_mxid is not None
+ and requester.user.to_string() == self._server_notices_mxid
+ ):
+ # allow the server notices mxid to join rooms
+ is_requester_admin = True
+
+ else:
+ is_requester_admin = yield self.auth.is_server_admin(requester.user)
+
+ inviter = yield self._get_inviter(target.to_string(), room_id)
+ if not is_requester_admin:
+ # We assume that if the spam checker allowed the user to create
+ # a room then they're allowed to join it.
+ if not new_room and not self.spam_checker.user_may_join_room(
+ target.to_string(), room_id, is_invited=inviter is not None
+ ):
+ raise SynapseError(403, "Not allowed to join this room")
+
if not is_host_in_room:
- inviter = yield self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
@@ -706,6 +761,7 @@ class RoomMemberHandler(object):
id_server,
requester,
txn_id,
+ new_room=False,
id_access_token=None,
):
if self.config.block_non_admin_invites:
@@ -717,7 +773,23 @@ class RoomMemberHandler(object):
# We need to rate limit *before* we send out any 3PID invites, so we
# can't just rely on the standard ratelimiting of events.
- yield self.base_handler.ratelimit(requester)
+ self.ratelimiter.ratelimit(
+ requester.user.to_string(),
+ time_now_s=self.hs.clock.time(),
+ rate_hz=self.hs.config.rc_third_party_invite.per_second,
+ burst_count=self.hs.config.rc_third_party_invite.burst_count,
+ update=True,
+ )
+
+ can_invite = yield self.third_party_event_rules.check_threepid_can_be_invited(
+ medium, address, room_id
+ )
+ if not can_invite:
+ raise SynapseError(
+ 403,
+ "This third-party identifier can not be invited in this room",
+ Codes.FORBIDDEN,
+ )
can_invite = yield self.third_party_event_rules.check_threepid_can_be_invited(
medium, address, room_id
@@ -738,6 +810,19 @@ class RoomMemberHandler(object):
id_server, medium, address, id_access_token
)
+ is_published = yield self.store.is_room_published(room_id)
+
+ if not self.spam_checker.user_may_invite(
+ requester.user.to_string(),
+ invitee,
+ third_party_invite={"medium": medium, "address": address},
+ room_id=room_id,
+ new_room=new_room,
+ published_room=is_published,
+ ):
+ logger.info("Blocking invite due to spam checker")
+ raise SynapseError(403, "Invites have been disabled on this server")
+
if invitee:
yield self.update_membership(
requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
@@ -944,8 +1029,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- yield self.federation_handler.do_invite_join(
- remote_room_hosts, room_id, user.to_string(), content
+ yield defer.ensureDeferred(
+ self.federation_handler.do_invite_join(
+ remote_room_hosts, room_id, user.to_string(), content
+ )
)
yield self._user_joined_room(user, room_id)
@@ -982,8 +1069,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
"""
fed_handler = self.federation_handler
try:
- ret = yield fed_handler.do_remotely_reject_invite(
- remote_room_hosts, room_id, target.to_string(), content=content,
+ ret = yield defer.ensureDeferred(
+ fed_handler.do_remotely_reject_invite(
+ remote_room_hosts, room_id, target.to_string(), content=content,
+ )
)
return ret
except Exception as e:
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index d90c9e0108..3f50d6de47 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2017 New Vector Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,12 +31,15 @@ class SetPasswordHandler(BaseHandler):
super(SetPasswordHandler, self).__init__(hs)
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
+ self._password_policy_handler = hs.get_password_policy_handler()
@defer.inlineCallbacks
def set_password(self, user_id, newpassword, requester=None):
if not self.hs.config.password_localdb_enabled:
raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)
+ self._password_policy_handler.validate_password(newpassword)
+
password_hash = yield self._auth_handler.hash(newpassword)
except_device_id = requester.device_id if requester else None
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 68e6edace5..d93a276693 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -300,7 +300,7 @@ class StatsHandler(StateDeltasHandler):
room_state["guest_access"] = event_content.get("guest_access")
for room_id, state in room_to_state_updates.items():
- logger.info("Updating room_stats_state for %s: %s", room_id, state)
+ logger.debug("Updating room_stats_state for %s: %s", room_id, state)
yield self.store.update_room_state(room_id, state)
return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2b62fd83fd..4324bc702e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -14,20 +14,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import collections
import itertools
import logging
+from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
from six import iteritems, itervalues
+import attr
from prometheus_client import Counter
from synapse.api.constants import EventTypes, Membership
+from synapse.api.filtering import FilterCollection
+from synapse.events import EventBase
from synapse.logging.context import LoggingContext
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
-from synapse.types import RoomStreamToken
+from synapse.types import (
+ Collection,
+ JsonDict,
+ RoomStreamToken,
+ StateMap,
+ StreamToken,
+ UserID,
+)
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
@@ -62,17 +72,22 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
-SyncConfig = collections.namedtuple(
- "SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"]
-)
+@attr.s(slots=True, frozen=True)
+class SyncConfig:
+ user = attr.ib(type=UserID)
+ filter_collection = attr.ib(type=FilterCollection)
+ is_guest = attr.ib(type=bool)
+ request_key = attr.ib(type=Tuple[Any, ...])
+ device_id = attr.ib(type=str)
-class TimelineBatch(
- collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"])
-):
- __slots__ = []
+@attr.s(slots=True, frozen=True)
+class TimelineBatch:
+ prev_batch = attr.ib(type=StreamToken)
+ events = attr.ib(type=List[EventBase])
+ limited = attr.ib(bool)
- def __nonzero__(self):
+ def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
@@ -81,23 +96,17 @@ class TimelineBatch(
__bool__ = __nonzero__ # python3
-class JoinedSyncResult(
- collections.namedtuple(
- "JoinedSyncResult",
- [
- "room_id", # str
- "timeline", # TimelineBatch
- "state", # dict[(str, str), FrozenEvent]
- "ephemeral",
- "account_data",
- "unread_notifications",
- "summary",
- ],
- )
-):
- __slots__ = []
-
- def __nonzero__(self):
+@attr.s(slots=True, frozen=True)
+class JoinedSyncResult:
+ room_id = attr.ib(type=str)
+ timeline = attr.ib(type=TimelineBatch)
+ state = attr.ib(type=StateMap[EventBase])
+ ephemeral = attr.ib(type=List[JsonDict])
+ account_data = attr.ib(type=List[JsonDict])
+ unread_notifications = attr.ib(type=JsonDict)
+ summary = attr.ib(type=Optional[JsonDict])
+
+ def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
@@ -113,20 +122,14 @@ class JoinedSyncResult(
__bool__ = __nonzero__ # python3
-class ArchivedSyncResult(
- collections.namedtuple(
- "ArchivedSyncResult",
- [
- "room_id", # str
- "timeline", # TimelineBatch
- "state", # dict[(str, str), FrozenEvent]
- "account_data",
- ],
- )
-):
- __slots__ = []
-
- def __nonzero__(self):
+@attr.s(slots=True, frozen=True)
+class ArchivedSyncResult:
+ room_id = attr.ib(type=str)
+ timeline = attr.ib(type=TimelineBatch)
+ state = attr.ib(type=StateMap[EventBase])
+ account_data = attr.ib(type=List[JsonDict])
+
+ def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
@@ -135,70 +138,88 @@ class ArchivedSyncResult(
__bool__ = __nonzero__ # python3
-class InvitedSyncResult(
- collections.namedtuple(
- "InvitedSyncResult",
- ["room_id", "invite"], # str # FrozenEvent: the invite event
- )
-):
- __slots__ = []
+@attr.s(slots=True, frozen=True)
+class InvitedSyncResult:
+ room_id = attr.ib(type=str)
+ invite = attr.ib(type=EventBase)
- def __nonzero__(self):
+ def __nonzero__(self) -> bool:
"""Invited rooms should always be reported to the client"""
return True
__bool__ = __nonzero__ # python3
-class GroupsSyncResult(
- collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"])
-):
- __slots__ = []
+@attr.s(slots=True, frozen=True)
+class GroupsSyncResult:
+ join = attr.ib(type=JsonDict)
+ invite = attr.ib(type=JsonDict)
+ leave = attr.ib(type=JsonDict)
- def __nonzero__(self):
+ def __nonzero__(self) -> bool:
return bool(self.join or self.invite or self.leave)
__bool__ = __nonzero__ # python3
-class DeviceLists(
- collections.namedtuple(
- "DeviceLists",
- [
- "changed", # list of user_ids whose devices may have changed
- "left", # list of user_ids whose devices we no longer track
- ],
- )
-):
- __slots__ = []
+@attr.s(slots=True, frozen=True)
+class DeviceLists:
+ """
+ Attributes:
+ changed: List of user_ids whose devices may have changed
+ left: List of user_ids whose devices we no longer track
+ """
+
+ changed = attr.ib(type=Collection[str])
+ left = attr.ib(type=Collection[str])
- def __nonzero__(self):
+ def __nonzero__(self) -> bool:
return bool(self.changed or self.left)
__bool__ = __nonzero__ # python3
-class SyncResult(
- collections.namedtuple(
- "SyncResult",
- [
- "next_batch", # Token for the next sync
- "presence", # List of presence events for the user.
- "account_data", # List of account_data events for the user.
- "joined", # JoinedSyncResult for each joined room.
- "invited", # InvitedSyncResult for each invited room.
- "archived", # ArchivedSyncResult for each archived room.
- "to_device", # List of direct messages for the device.
- "device_lists", # List of user_ids whose devices have changed
- "device_one_time_keys_count", # Dict of algorithm to count for one time keys
- # for this device
- "groups",
- ],
- )
-):
- __slots__ = []
-
- def __nonzero__(self):
+@attr.s
+class _RoomChanges:
+ """The set of room entries to include in the sync, plus the set of joined
+ and left room IDs since last sync.
+ """
+
+ room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
+ invited = attr.ib(type=List[InvitedSyncResult])
+ newly_joined_rooms = attr.ib(type=List[str])
+ newly_left_rooms = attr.ib(type=List[str])
+
+
+@attr.s(slots=True, frozen=True)
+class SyncResult:
+ """
+ Attributes:
+ next_batch: Token for the next sync
+ presence: List of presence events for the user.
+ account_data: List of account_data events for the user.
+ joined: JoinedSyncResult for each joined room.
+ invited: InvitedSyncResult for each invited room.
+ archived: ArchivedSyncResult for each archived room.
+ to_device: List of direct messages for the device.
+ device_lists: List of user_ids whose devices have changed
+ device_one_time_keys_count: Dict of algorithm to count for one time keys
+ for this device
+ groups: Group updates, if any
+ """
+
+ next_batch = attr.ib(type=StreamToken)
+ presence = attr.ib(type=List[JsonDict])
+ account_data = attr.ib(type=List[JsonDict])
+ joined = attr.ib(type=List[JoinedSyncResult])
+ invited = attr.ib(type=List[InvitedSyncResult])
+ archived = attr.ib(type=List[ArchivedSyncResult])
+ to_device = attr.ib(type=List[JsonDict])
+ device_lists = attr.ib(type=DeviceLists)
+ device_one_time_keys_count = attr.ib(type=JsonDict)
+ groups = attr.ib(type=Optional[GroupsSyncResult])
+
+ def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if the notifier needs to wait for more events when polling for
events.
@@ -240,13 +261,15 @@ class SyncHandler(object):
)
async def wait_for_sync_for_user(
- self, sync_config, since_token=None, timeout=0, full_state=False
- ):
+ self,
+ sync_config: SyncConfig,
+ since_token: Optional[StreamToken] = None,
+ timeout: int = 0,
+ full_state: bool = False,
+ ) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
- Returns:
- Deferred[SyncResult]
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
@@ -265,8 +288,12 @@ class SyncHandler(object):
return res
async def _wait_for_sync_for_user(
- self, sync_config, since_token, timeout, full_state
- ):
+ self,
+ sync_config: SyncConfig,
+ since_token: Optional[StreamToken] = None,
+ timeout: int = 0,
+ full_state: bool = False,
+ ) -> SyncResult:
if since_token is None:
sync_type = "initial_sync"
elif full_state:
@@ -305,25 +332,33 @@ class SyncHandler(object):
return result
- def current_sync_for_user(self, sync_config, since_token=None, full_state=False):
+ async def current_sync_for_user(
+ self,
+ sync_config: SyncConfig,
+ since_token: Optional[StreamToken] = None,
+ full_state: bool = False,
+ ) -> SyncResult:
"""Get the sync for client needed to match what the server has now.
- Returns:
- A Deferred SyncResult.
"""
- return self.generate_sync_result(sync_config, since_token, full_state)
+ return await self.generate_sync_result(sync_config, since_token, full_state)
- async def push_rules_for_user(self, user):
+ async def push_rules_for_user(self, user: UserID) -> JsonDict:
user_id = user.to_string()
rules = await self.store.get_push_rules_for_user(user_id)
rules = format_push_rules_for_user(user, rules)
return rules
- async def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
+ async def ephemeral_by_room(
+ self,
+ sync_result_builder: "SyncResultBuilder",
+ now_token: StreamToken,
+ since_token: Optional[StreamToken] = None,
+ ) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]:
"""Get the ephemeral events for each room the user is in
Args:
- sync_result_builder(SyncResultBuilder)
- now_token (StreamToken): Where the server is currently up to.
- since_token (StreamToken): Where the server was when the client
+ sync_result_builder
+ now_token: Where the server is currently up to.
+ since_token: Where the server was when the client
last synced.
Returns:
A tuple of the now StreamToken, updated to reflect the which typing
@@ -348,7 +383,7 @@ class SyncHandler(object):
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
- ephemeral_by_room = {}
+ ephemeral_by_room = {} # type: JsonDict
for event in typing:
# we want to exclude the room_id from the event, but modifying the
@@ -380,13 +415,13 @@ class SyncHandler(object):
async def _load_filtered_recents(
self,
- room_id,
- sync_config,
- now_token,
- since_token=None,
- recents=None,
- newly_joined_room=False,
- ):
+ room_id: str,
+ sync_config: SyncConfig,
+ now_token: StreamToken,
+ since_token: Optional[StreamToken] = None,
+ potential_recents: Optional[List[EventBase]] = None,
+ newly_joined_room: bool = False,
+ ) -> TimelineBatch:
"""
Returns:
a Deferred TimelineBatch
@@ -397,21 +432,29 @@ class SyncHandler(object):
sync_config.filter_collection.blocks_all_room_timeline()
)
- if recents is None or newly_joined_room or timeline_limit < len(recents):
+ if (
+ potential_recents is None
+ or newly_joined_room
+ or timeline_limit < len(potential_recents)
+ ):
limited = True
else:
limited = False
- if recents:
- recents = sync_config.filter_collection.filter_room_timeline(recents)
+ if potential_recents:
+ recents = sync_config.filter_collection.filter_room_timeline(
+ potential_recents
+ )
# We check if there are any state events, if there are then we pass
# all current state events to the filter_events function. This is to
# ensure that we always include current state in the timeline
- current_state_ids = frozenset()
+ current_state_ids = frozenset() # type: FrozenSet[str]
if any(e.is_state() for e in recents):
- current_state_ids = await self.state.get_current_state_ids(room_id)
- current_state_ids = frozenset(itervalues(current_state_ids))
+ current_state_ids_map = await self.state.get_current_state_ids(
+ room_id
+ )
+ current_state_ids = frozenset(itervalues(current_state_ids_map))
recents = await filter_events_for_client(
self.storage,
@@ -463,8 +506,10 @@ class SyncHandler(object):
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
if any(e.is_state() for e in loaded_recents):
- current_state_ids = await self.state.get_current_state_ids(room_id)
- current_state_ids = frozenset(itervalues(current_state_ids))
+ current_state_ids_map = await self.state.get_current_state_ids(
+ room_id
+ )
+ current_state_ids = frozenset(itervalues(current_state_ids_map))
loaded_recents = await filter_events_for_client(
self.storage,
@@ -493,17 +538,15 @@ class SyncHandler(object):
limited=limited or newly_joined_room,
)
- async def get_state_after_event(self, event, state_filter=StateFilter.all()):
+ async def get_state_after_event(
+ self, event: EventBase, state_filter: StateFilter = StateFilter.all()
+ ) -> StateMap[str]:
"""
Get the room state after the given event
Args:
- event(synapse.events.EventBase): event of interest
- state_filter (StateFilter): The state filter used to fetch state
- from the database.
-
- Returns:
- A Deferred map from ((type, state_key)->Event)
+ event: event of interest
+ state_filter: The state filter used to fetch state from the database.
"""
state_ids = await self.state_store.get_state_ids_for_event(
event.event_id, state_filter=state_filter
@@ -514,18 +557,17 @@ class SyncHandler(object):
return state_ids
async def get_state_at(
- self, room_id, stream_position, state_filter=StateFilter.all()
- ):
+ self,
+ room_id: str,
+ stream_position: StreamToken,
+ state_filter: StateFilter = StateFilter.all(),
+ ) -> StateMap[str]:
""" Get the room state at a particular stream position
Args:
- room_id(str): room for which to get state
- stream_position(StreamToken): point at which to get state
- state_filter (StateFilter): The state filter used to fetch state
- from the database.
-
- Returns:
- A Deferred map from ((type, state_key)->Event)
+ room_id: room for which to get state
+ stream_position: point at which to get state
+ state_filter: The state filter used to fetch state from the database.
"""
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
@@ -546,23 +588,25 @@ class SyncHandler(object):
state = {}
return state
- async def compute_summary(self, room_id, sync_config, batch, state, now_token):
+ async def compute_summary(
+ self,
+ room_id: str,
+ sync_config: SyncConfig,
+ batch: TimelineBatch,
+ state: StateMap[EventBase],
+ now_token: StreamToken,
+ ) -> Optional[JsonDict]:
""" Works out a room summary block for this room, summarising the number
of joined members in the room, and providing the 'hero' members if the
room has no name so clients can consistently name rooms. Also adds
state events to 'state' if needed to describe the heroes.
- Args:
- room_id(str):
- sync_config(synapse.handlers.sync.SyncConfig):
- batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
- the room that will be sent to the user.
- state(dict): dict of (type, state_key) -> Event as returned by
- compute_state_delta
- now_token(str): Token of the end of the current batch.
-
- Returns:
- A deferred dict describing the room summary
+ Args
+ room_id
+ sync_config
+ batch: The timeline batch for the room that will be sent to the user.
+ state: State as returned by compute_state_delta
+ now_token: Token of the end of the current batch.
"""
# FIXME: we could/should get this from room_stats when matthew/stats lands
@@ -681,7 +725,7 @@ class SyncHandler(object):
return summary
- def get_lazy_loaded_members_cache(self, cache_key):
+ def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache:
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
@@ -692,23 +736,24 @@ class SyncHandler(object):
return cache
async def compute_state_delta(
- self, room_id, batch, sync_config, since_token, now_token, full_state
- ):
+ self,
+ room_id: str,
+ batch: TimelineBatch,
+ sync_config: SyncConfig,
+ since_token: Optional[StreamToken],
+ now_token: StreamToken,
+ full_state: bool,
+ ) -> StateMap[EventBase]:
""" Works out the difference in state between the start of the timeline
and the previous sync.
Args:
- room_id(str):
- batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
- the room that will be sent to the user.
- sync_config(synapse.handlers.sync.SyncConfig):
- since_token(str|None): Token of the end of the previous batch. May
- be None.
- now_token(str): Token of the end of the current batch.
- full_state(bool): Whether to force returning the full state.
-
- Returns:
- A deferred dict of (type, state_key) -> Event
+ room_id:
+ batch: The timeline batch for the room that will be sent to the user.
+ sync_config:
+ since_token: Token of the end of the previous batch. May be None.
+ now_token: Token of the end of the current batch.
+ full_state: Whether to force returning the full state.
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@@ -800,6 +845,10 @@ class SyncHandler(object):
# about them).
state_filter = StateFilter.all()
+ # If this is an initial sync then full_state should be set, and
+ # that case is handled above. We assert here to ensure that this
+ # is indeed the case.
+ assert since_token is not None
state_at_previous_sync = await self.get_state_at(
room_id, stream_position=since_token, state_filter=state_filter
)
@@ -874,7 +923,7 @@ class SyncHandler(object):
if t[0] == EventTypes.Member:
cache.set(t[1], event_id)
- state = {}
+ state = {} # type: Dict[str, EventBase]
if state_ids:
state = await self.store.get_events(list(state_ids.values()))
@@ -886,7 +935,9 @@ class SyncHandler(object):
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
}
- async def unread_notifs_for_room_id(self, room_id, sync_config):
+ async def unread_notifs_for_room_id(
+ self, room_id: str, sync_config: SyncConfig
+ ) -> Optional[Dict[str, str]]:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
@@ -894,7 +945,6 @@ class SyncHandler(object):
receipt_type="m.read",
)
- notifs = []
if last_unread_event_id:
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
@@ -906,17 +956,12 @@ class SyncHandler(object):
return None
async def generate_sync_result(
- self, sync_config, since_token=None, full_state=False
- ):
+ self,
+ sync_config: SyncConfig,
+ since_token: Optional[StreamToken] = None,
+ full_state: bool = False,
+ ) -> SyncResult:
"""Generates a sync result.
-
- Args:
- sync_config (SyncConfig)
- since_token (StreamToken)
- full_state (bool)
-
- Returns:
- Deferred(SyncResult)
"""
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
@@ -924,7 +969,7 @@ class SyncHandler(object):
# Always use the `now_token` in `SyncResultBuilder`
now_token = await self.event_sources.get_current_token()
- logger.info(
+ logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
since_token,
@@ -978,7 +1023,7 @@ class SyncHandler(object):
)
device_id = sync_config.device_id
- one_time_key_counts = {}
+ one_time_key_counts = {} # type: JsonDict
if device_id:
one_time_key_counts = await self.store.count_e2e_one_time_keys(
user_id, device_id
@@ -1008,7 +1053,9 @@ class SyncHandler(object):
)
@measure_func("_generate_sync_entry_for_groups")
- async def _generate_sync_entry_for_groups(self, sync_result_builder):
+ async def _generate_sync_entry_for_groups(
+ self, sync_result_builder: "SyncResultBuilder"
+ ) -> None:
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
@@ -1053,27 +1100,22 @@ class SyncHandler(object):
@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
self,
- sync_result_builder,
- newly_joined_rooms,
- newly_joined_or_invited_users,
- newly_left_rooms,
- newly_left_users,
- ):
+ sync_result_builder: "SyncResultBuilder",
+ newly_joined_rooms: Set[str],
+ newly_joined_or_invited_users: Set[str],
+ newly_left_rooms: Set[str],
+ newly_left_users: Set[str],
+ ) -> DeviceLists:
"""Generate the DeviceLists section of sync
Args:
- sync_result_builder (SyncResultBuilder)
- newly_joined_rooms (set[str]): Set of rooms user has joined since
- previous sync
- newly_joined_or_invited_users (set[str]): Set of users that have
- joined or been invited to a room since previous sync.
- newly_left_rooms (set[str]): Set of rooms user has left since
+ sync_result_builder
+ newly_joined_rooms: Set of rooms user has joined since previous sync
+ newly_joined_or_invited_users: Set of users that have joined or
+ been invited to a room since previous sync.
+ newly_left_rooms: Set of rooms user has left since previous sync
+ newly_left_users: Set of users that have left a room we're in since
previous sync
- newly_left_users (set[str]): Set of users that have left a room
- we're in since previous sync
-
- Returns:
- Deferred[DeviceLists]
"""
user_id = sync_result_builder.sync_config.user.to_string()
@@ -1134,15 +1176,11 @@ class SyncHandler(object):
else:
return DeviceLists(changed=[], left=[])
- async def _generate_sync_entry_for_to_device(self, sync_result_builder):
+ async def _generate_sync_entry_for_to_device(
+ self, sync_result_builder: "SyncResultBuilder"
+ ) -> None:
"""Generates the portion of the sync response. Populates
`sync_result_builder` with the result.
-
- Args:
- sync_result_builder(SyncResultBuilder)
-
- Returns:
- Deferred(dict): A dictionary containing the per room account data.
"""
user_id = sync_result_builder.sync_config.user.to_string()
device_id = sync_result_builder.sync_config.device_id
@@ -1180,15 +1218,17 @@ class SyncHandler(object):
else:
sync_result_builder.to_device = []
- async def _generate_sync_entry_for_account_data(self, sync_result_builder):
+ async def _generate_sync_entry_for_account_data(
+ self, sync_result_builder: "SyncResultBuilder"
+ ) -> Dict[str, Dict[str, JsonDict]]:
"""Generates the account data portion of the sync response. Populates
`sync_result_builder` with the result.
Args:
- sync_result_builder(SyncResultBuilder)
+ sync_result_builder
Returns:
- Deferred(dict): A dictionary containing the per room account data.
+ A dictionary containing the per room account data.
"""
sync_config = sync_result_builder.sync_config
user_id = sync_result_builder.sync_config.user.to_string()
@@ -1232,18 +1272,21 @@ class SyncHandler(object):
return account_data_by_room
async def _generate_sync_entry_for_presence(
- self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
- ):
+ self,
+ sync_result_builder: "SyncResultBuilder",
+ newly_joined_rooms: Set[str],
+ newly_joined_or_invited_users: Set[str],
+ ) -> None:
"""Generates the presence portion of the sync response. Populates the
`sync_result_builder` with the result.
Args:
- sync_result_builder(SyncResultBuilder)
- newly_joined_rooms(list): List of rooms that the user has joined
- since the last sync (or empty if an initial sync)
- newly_joined_or_invited_users(list): List of users that have joined
- or been invited to rooms since the last sync (or empty if an initial
- sync)
+ sync_result_builder
+ newly_joined_rooms: Set of rooms that the user has joined since
+ the last sync (or empty if an initial sync)
+ newly_joined_or_invited_users: Set of users that have joined or
+ been invited to rooms since the last sync (or empty if an
+ initial sync)
"""
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@@ -1287,17 +1330,19 @@ class SyncHandler(object):
sync_result_builder.presence = presence
async def _generate_sync_entry_for_rooms(
- self, sync_result_builder, account_data_by_room
- ):
+ self,
+ sync_result_builder: "SyncResultBuilder",
+ account_data_by_room: Dict[str, Dict[str, JsonDict]],
+ ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
Args:
- sync_result_builder(SyncResultBuilder)
- account_data_by_room(dict): Dictionary of per room account data
+ sync_result_builder
+ account_data_by_room: Dictionary of per room account data
Returns:
- Deferred(tuple): Returns a 4-tuple of
+ Returns a 4-tuple of
`(newly_joined_rooms, newly_joined_or_invited_users,
newly_left_rooms, newly_left_users)`
"""
@@ -1308,7 +1353,7 @@ class SyncHandler(object):
)
if block_all_room_ephemeral:
- ephemeral_by_room = {}
+ ephemeral_by_room = {} # type: Dict[str, List[JsonDict]]
else:
now_token, ephemeral_by_room = await self.ephemeral_by_room(
sync_result_builder,
@@ -1329,7 +1374,7 @@ class SyncHandler(object):
)
if not tags_by_room:
logger.debug("no-oping sync")
- return [], [], [], []
+ return set(), set(), set(), set()
ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id
@@ -1341,19 +1386,22 @@ class SyncHandler(object):
ignored_users = frozenset()
if since_token:
- res = await self._get_rooms_changed(sync_result_builder, ignored_users)
- room_entries, invited, newly_joined_rooms, newly_left_rooms = res
-
+ room_changes = await self._get_rooms_changed(
+ sync_result_builder, ignored_users
+ )
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
- res = await self._get_all_rooms(sync_result_builder, ignored_users)
- room_entries, invited, newly_joined_rooms = res
- newly_left_rooms = []
+ room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
tags_by_room = await self.store.get_tags_for_user(user_id)
+ room_entries = room_changes.room_entries
+ invited = room_changes.invited
+ newly_joined_rooms = room_changes.newly_joined_rooms
+ newly_left_rooms = room_changes.newly_left_rooms
+
def handle_room_entries(room_entry):
return self._generate_room_entry(
sync_result_builder,
@@ -1393,13 +1441,15 @@ class SyncHandler(object):
newly_left_users -= newly_joined_or_invited_users
return (
- newly_joined_rooms,
+ set(newly_joined_rooms),
newly_joined_or_invited_users,
- newly_left_rooms,
+ set(newly_left_rooms),
newly_left_users,
)
- async def _have_rooms_changed(self, sync_result_builder):
+ async def _have_rooms_changed(
+ self, sync_result_builder: "SyncResultBuilder"
+ ) -> bool:
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
"""
@@ -1423,22 +1473,10 @@ class SyncHandler(object):
return True
return False
- async def _get_rooms_changed(self, sync_result_builder, ignored_users):
+ async def _get_rooms_changed(
+ self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+ ) -> _RoomChanges:
"""Gets the the changes that have happened since the last sync.
-
- Args:
- sync_result_builder(SyncResultBuilder)
- ignored_users(set(str)): Set of users ignored by user.
-
- Returns:
- Deferred(tuple): Returns a tuple of the form:
- `(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)`
-
- where:
- room_entries is a list [RoomSyncResultBuilder]
- invited_rooms is a list [InvitedSyncResult]
- newly_joined_rooms is a list[str] of room ids
- newly_left_rooms is a list[str] of room ids
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
@@ -1452,7 +1490,7 @@ class SyncHandler(object):
user_id, since_token.room_key, now_token.room_key
)
- mem_change_events_by_room_id = {}
+ mem_change_events_by_room_id = {} # type: Dict[str, List[EventBase]]
for event in rooms_changed:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
@@ -1461,7 +1499,7 @@ class SyncHandler(object):
room_entries = []
invited = []
for room_id, events in iteritems(mem_change_events_by_room_id):
- logger.info(
+ logger.debug(
"Membership changes in %s: [%s]",
room_id,
", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)),
@@ -1571,7 +1609,7 @@ class SyncHandler(object):
# This is all screaming out for a refactor, as the logic here is
# subtle and the moving parts numerous.
if leave_event.internal_metadata.is_out_of_band_membership():
- batch_events = [leave_event]
+ batch_events = [leave_event] # type: Optional[List[EventBase]]
else:
batch_events = None
@@ -1637,18 +1675,17 @@ class SyncHandler(object):
)
room_entries.append(entry)
- return room_entries, invited, newly_joined_rooms, newly_left_rooms
+ return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms)
- async def _get_all_rooms(self, sync_result_builder, ignored_users):
+ async def _get_all_rooms(
+ self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+ ) -> _RoomChanges:
"""Returns entries for all rooms for the user.
Args:
- sync_result_builder(SyncResultBuilder)
- ignored_users(set(str)): Set of users ignored by user.
+ sync_result_builder
+ ignored_users: Set of users ignored by user.
- Returns:
- Deferred(tuple): Returns a tuple of the form:
- `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
"""
user_id = sync_result_builder.sync_config.user.to_string()
@@ -1710,30 +1747,30 @@ class SyncHandler(object):
)
)
- return room_entries, invited, []
+ return _RoomChanges(room_entries, invited, [], [])
async def _generate_room_entry(
self,
- sync_result_builder,
- ignored_users,
- room_builder,
- ephemeral,
- tags,
- account_data,
- always_include=False,
+ sync_result_builder: "SyncResultBuilder",
+ ignored_users: Set[str],
+ room_builder: "RoomSyncResultBuilder",
+ ephemeral: List[JsonDict],
+ tags: Optional[List[JsonDict]],
+ account_data: Dict[str, JsonDict],
+ always_include: bool = False,
):
"""Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`.
Args:
- sync_result_builder(SyncResultBuilder)
- ignored_users(set(str)): Set of users ignored by user.
- room_builder(RoomSyncResultBuilder)
- ephemeral(list): List of new ephemeral events for room
- tags(list): List of *all* tags for room, or None if there has been
+ sync_result_builder
+ ignored_users: Set of users ignored by user.
+ room_builder
+ ephemeral: List of new ephemeral events for room
+ tags: List of *all* tags for room, or None if there has been
no change.
- account_data(list): List of new account data for room
- always_include(bool): Always include this room in the sync response,
+ account_data: List of new account data for room
+ always_include: Always include this room in the sync response,
even if empty.
"""
newly_joined = room_builder.newly_joined
@@ -1759,7 +1796,7 @@ class SyncHandler(object):
sync_config,
now_token=upto_token,
since_token=since_token,
- recents=events,
+ potential_recents=events,
newly_joined_room=newly_joined,
)
@@ -1810,7 +1847,7 @@ class SyncHandler(object):
room_id, batch, sync_config, since_token, now_token, full_state=full_state
)
- summary = {}
+ summary = {} # type: Optional[JsonDict]
# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
@@ -1834,7 +1871,7 @@ class SyncHandler(object):
)
if room_builder.rtype == "joined":
- unread_notifications = {}
+ unread_notifications = {} # type: Dict[str, str]
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
@@ -1856,23 +1893,25 @@ class SyncHandler(object):
if batch.limited and since_token:
user_id = sync_result_builder.sync_config.user.to_string()
- logger.info(
+ logger.debug(
"Incremental gappy sync of %s for user %s with %d state events"
% (room_id, user_id, len(state))
)
elif room_builder.rtype == "archived":
- room_sync = ArchivedSyncResult(
+ archived_room_sync = ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
account_data=account_data_events,
)
- if room_sync or always_include:
- sync_result_builder.archived.append(room_sync)
+ if archived_room_sync or always_include:
+ sync_result_builder.archived.append(archived_room_sync)
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
- async def get_rooms_for_user_at(self, user_id, stream_ordering):
+ async def get_rooms_for_user_at(
+ self, user_id: str, stream_ordering: int
+ ) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.
The stream ordering *must* be recent, otherwise this may throw an
@@ -1880,12 +1919,11 @@ class SyncHandler(object):
current token, which should be perfectly fine).
Args:
- user_id (str)
- stream_ordering (int)
+ user_id
+ stream_ordering
ReturnValue:
- Deferred[frozenset[str]]: Set of room_ids the user is in at given
- stream_ordering.
+ Set of room_ids the user is in at given stream_ordering.
"""
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
@@ -1912,11 +1950,10 @@ class SyncHandler(object):
if user_id in users_in_room:
joined_room_ids.add(room_id)
- joined_room_ids = frozenset(joined_room_ids)
- return joined_room_ids
+ return frozenset(joined_room_ids)
-def _action_has_highlight(actions):
+def _action_has_highlight(actions: List[JsonDict]) -> bool:
for action in actions:
try:
if action.get("set_tweak", None) == "highlight":
@@ -1928,22 +1965,23 @@ def _action_has_highlight(actions):
def _calculate_state(
- timeline_contains, timeline_start, previous, current, lazy_load_members
-):
+ timeline_contains: StateMap[str],
+ timeline_start: StateMap[str],
+ previous: StateMap[str],
+ current: StateMap[str],
+ lazy_load_members: bool,
+) -> StateMap[str]:
"""Works out what state to include in a sync response.
Args:
- timeline_contains (dict): state in the timeline
- timeline_start (dict): state at the start of the timeline
- previous (dict): state at the end of the previous sync (or empty dict
+ timeline_contains: state in the timeline
+ timeline_start: state at the start of the timeline
+ previous: state at the end of the previous sync (or empty dict
if this is an initial sync)
- current (dict): state at the end of the timeline
- lazy_load_members (bool): whether to return members from timeline_start
+ current: state at the end of the timeline
+ lazy_load_members: whether to return members from timeline_start
or not. assumes that timeline_start has already been filtered to
include only the members the client needs to know about.
-
- Returns:
- dict
"""
event_id_to_key = {
e: key
@@ -1980,15 +2018,16 @@ def _calculate_state(
return {event_id_to_key[e]: e for e in state_ids}
-class SyncResultBuilder(object):
+@attr.s
+class SyncResultBuilder:
"""Used to help build up a new SyncResult for a user
Attributes:
- sync_config (SyncConfig)
- full_state (bool)
- since_token (StreamToken)
- now_token (StreamToken)
- joined_room_ids (list[str])
+ sync_config
+ full_state: The full_state flag as specified by user
+ since_token: The token supplied by user, or None.
+ now_token: The token to sync up to.
+ joined_room_ids: List of rooms the user is joined to
# The following mirror the fields in a sync response
presence (list)
@@ -1996,61 +2035,45 @@ class SyncResultBuilder(object):
joined (list[JoinedSyncResult])
invited (list[InvitedSyncResult])
archived (list[ArchivedSyncResult])
- device (list)
groups (GroupsSyncResult|None)
to_device (list)
"""
- def __init__(
- self, sync_config, full_state, since_token, now_token, joined_room_ids
- ):
- """
- Args:
- sync_config (SyncConfig)
- full_state (bool): The full_state flag as specified by user
- since_token (StreamToken): The token supplied by user, or None.
- now_token (StreamToken): The token to sync up to.
- joined_room_ids (list[str]): List of rooms the user is joined to
- """
- self.sync_config = sync_config
- self.full_state = full_state
- self.since_token = since_token
- self.now_token = now_token
- self.joined_room_ids = joined_room_ids
-
- self.presence = []
- self.account_data = []
- self.joined = []
- self.invited = []
- self.archived = []
- self.device = []
- self.groups = None
- self.to_device = []
+ sync_config = attr.ib(type=SyncConfig)
+ full_state = attr.ib(type=bool)
+ since_token = attr.ib(type=Optional[StreamToken])
+ now_token = attr.ib(type=StreamToken)
+ joined_room_ids = attr.ib(type=FrozenSet[str])
+
+ presence = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+ account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+ joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list))
+ invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list))
+ archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list))
+ groups = attr.ib(type=Optional[GroupsSyncResult], default=None)
+ to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+@attr.s
class RoomSyncResultBuilder(object):
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
+
+ Attributes:
+ room_id
+ rtype: One of `"joined"` or `"archived"`
+ events: List of events to include in the room (more events may be added
+ when generating result).
+ newly_joined: If the user has newly joined the room
+ full_state: Whether the full state should be sent in result
+ since_token: Earliest point to return events from, or None
+ upto_token: Latest point to return events from.
"""
- def __init__(
- self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token
- ):
- """
- Args:
- room_id(str)
- rtype(str): One of `"joined"` or `"archived"`
- events(list[FrozenEvent]): List of events to include in the room
- (more events may be added when generating result).
- newly_joined(bool): If the user has newly joined the room
- full_state(bool): Whether the full state should be sent in result
- since_token(StreamToken): Earliest point to return events from, or None
- upto_token(StreamToken): Latest point to return events from.
- """
- self.room_id = room_id
- self.rtype = rtype
- self.events = events
- self.newly_joined = newly_joined
- self.full_state = full_state
- self.since_token = since_token
- self.upto_token = upto_token
+ room_id = attr.ib(type=str)
+ rtype = attr.ib(type=str)
+ events = attr.ib(type=Optional[List[EventBase]])
+ newly_joined = attr.ib(type=bool)
+ full_state = attr.ib(type=bool)
+ since_token = attr.ib(type=Optional[StreamToken])
+ upto_token = attr.ib(type=StreamToken)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 624f05ab5b..81aa58dc8c 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -149,7 +149,7 @@ class UserDirectoryHandler(StateDeltasHandler):
self.pos, room_max_stream_ordering
)
- logger.info("Handling %d state deltas", len(deltas))
+ logger.debug("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = max_pos
@@ -195,7 +195,7 @@ class UserDirectoryHandler(StateDeltasHandler):
room_id, self.server_name
)
if not is_in_room:
- logger.info("Server left room: %r", room_id)
+ logger.debug("Server left room: %r", room_id)
# Fetch all the users that we marked as being in user
# directory due to being in the room and then check if
# need to remove those users or not
|