diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 53b49bc15f..e51e1c32fe 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,15 +17,16 @@
import abc
import logging
+from typing import Dict, Iterable, List, Optional, Tuple, Union
from six.moves import http_client
-from twisted.internet import defer
-
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.types import Collection, RoomID, UserID
+from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
+from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -76,84 +77,84 @@ class RoomMemberHandler(object):
self.base_handler = BaseHandler(hs)
@abc.abstractmethod
- def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+ async def _remote_join(
+ self,
+ requester: Requester,
+ remote_room_hosts: List[str],
+ room_id: str,
+ user: UserID,
+ content: dict,
+ ) -> Optional[dict]:
"""Try and join a room that this server is not in
Args:
- requester (Requester)
- remote_room_hosts (list[str]): List of servers that can be used
- to join via.
- room_id (str): Room that we are trying to join
- user (UserID): User who is trying to join
- content (dict): A dict that should be used as the content of the
- join event.
-
- Returns:
- Deferred
+ requester
+ remote_room_hosts: List of servers that can be used to join via.
+ room_id: Room that we are trying to join
+ user: User who is trying to join
+ content: A dict that should be used as the content of the join event.
"""
raise NotImplementedError()
@abc.abstractmethod
- def _remote_reject_invite(
- self, requester, remote_room_hosts, room_id, target, content
- ):
+ async def _remote_reject_invite(
+ self,
+ requester: Requester,
+ remote_room_hosts: List[str],
+ room_id: str,
+ target: UserID,
+ content: dict,
+ ) -> dict:
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
Args:
- requester (Requester)
- remote_room_hosts (list[str]): List of servers to use to try and
- reject invite
- room_id (str)
- target (UserID): The user rejecting the invite
- content (dict): The content for the rejection event
+ requester
+ remote_room_hosts: List of servers to use to try and reject invite
+ room_id
+ target: The user rejecting the invite
+ content: The content for the rejection event
Returns:
- Deferred[dict]: A dictionary to be returned to the client, may
+ A dictionary to be returned to the client, may
include event_id etc, or nothing if we locally rejected
"""
raise NotImplementedError()
@abc.abstractmethod
- def _user_joined_room(self, target, room_id):
+ async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
room.
Args:
- target (UserID)
- room_id (str)
-
- Returns:
- Deferred|None
+ target
+ room_id
"""
raise NotImplementedError()
@abc.abstractmethod
- def _user_left_room(self, target, room_id):
+ async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has left the
room.
Args:
- target (UserID)
- room_id (str)
-
- Returns:
- Deferred|None
+ target
+ room_id
"""
raise NotImplementedError()
async def _local_membership_update(
self,
- requester,
- target,
- room_id,
- membership,
+ requester: Requester,
+ target: UserID,
+ room_id: str,
+ membership: str,
prev_event_ids: Collection[str],
- txn_id=None,
- ratelimit=True,
- content=None,
- require_consent=True,
- ):
+ txn_id: Optional[str] = None,
+ ratelimit: bool = True,
+ content: Optional[dict] = None,
+ require_consent: bool = True,
+ ) -> EventBase:
user_id = target.to_string()
if content is None:
@@ -214,20 +215,18 @@ class RoomMemberHandler(object):
return event
- @defer.inlineCallbacks
- def copy_room_tags_and_direct_to_room(self, old_room_id, new_room_id, user_id):
+ async def copy_room_tags_and_direct_to_room(
+ self, old_room_id, new_room_id, user_id
+ ) -> None:
"""Copies the tags and direct room state from one room to another.
Args:
- old_room_id (str)
- new_room_id (str)
- user_id (str)
-
- Returns:
- Deferred[None]
+ old_room_id: The room ID of the old room.
+ new_room_id: The room ID of the new room.
+ user_id: The user's ID.
"""
# Retrieve user account data for predecessor room
- user_account_data, _ = yield self.store.get_account_data_for_user(user_id)
+ user_account_data, _ = await self.store.get_account_data_for_user(user_id)
# Copy direct message state if applicable
direct_rooms = user_account_data.get("m.direct", {})
@@ -240,31 +239,31 @@ class RoomMemberHandler(object):
direct_rooms[key].append(new_room_id)
# Save back to user's m.direct account data
- yield self.store.add_account_data_for_user(
+ await self.store.add_account_data_for_user(
user_id, "m.direct", direct_rooms
)
break
# Copy room tags if applicable
- room_tags = yield self.store.get_tags_for_room(user_id, old_room_id)
+ room_tags = await self.store.get_tags_for_room(user_id, old_room_id)
# Copy each room tag to the new room
for tag, tag_content in room_tags.items():
- yield self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content)
+ await self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content)
async def update_membership(
self,
- requester,
- target,
- room_id,
- action,
- txn_id=None,
- remote_room_hosts=None,
- third_party_signed=None,
- ratelimit=True,
- content=None,
- require_consent=True,
- ):
+ requester: Requester,
+ target: UserID,
+ room_id: str,
+ action: str,
+ txn_id: Optional[str] = None,
+ remote_room_hosts: Optional[List[str]] = None,
+ third_party_signed: Optional[dict] = None,
+ ratelimit: bool = True,
+ content: Optional[dict] = None,
+ require_consent: bool = True,
+ ) -> Union[EventBase, Optional[dict]]:
key = (room_id,)
with (await self.member_linearizer.queue(key)):
@@ -285,17 +284,17 @@ class RoomMemberHandler(object):
async def _update_membership(
self,
- requester,
- target,
- room_id,
- action,
- txn_id=None,
- remote_room_hosts=None,
- third_party_signed=None,
- ratelimit=True,
- content=None,
- require_consent=True,
- ):
+ requester: Requester,
+ target: UserID,
+ room_id: str,
+ action: str,
+ txn_id: Optional[str] = None,
+ remote_room_hosts: Optional[List[str]] = None,
+ third_party_signed: Optional[dict] = None,
+ ratelimit: bool = True,
+ content: Optional[dict] = None,
+ require_consent: bool = True,
+ ) -> Union[EventBase, Optional[dict]]:
content_specified = bool(content)
if content is None:
content = {}
@@ -469,12 +468,11 @@ class RoomMemberHandler(object):
else:
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
- res = await self._remote_reject_invite(
+ return await self._remote_reject_invite(
requester, remote_room_hosts, room_id, target, content,
)
- return res
- res = await self._local_membership_update(
+ return await self._local_membership_update(
requester=requester,
target=target,
room_id=room_id,
@@ -485,10 +483,10 @@ class RoomMemberHandler(object):
content=content,
require_consent=require_consent,
)
- return res
- @defer.inlineCallbacks
- def transfer_room_state_on_room_upgrade(self, old_room_id, room_id):
+ async def transfer_room_state_on_room_upgrade(
+ self, old_room_id: str, room_id: str
+ ) -> None:
"""Upon our server becoming aware of an upgraded room, either by upgrading a room
ourselves or joining one, we can transfer over information from the previous room.
@@ -496,50 +494,44 @@ class RoomMemberHandler(object):
well as migrating the room directory state.
Args:
- old_room_id (str): The ID of the old room
-
- room_id (str): The ID of the new room
-
- Returns:
- Deferred
+ old_room_id: The ID of the old room
+ room_id: The ID of the new room
"""
logger.info("Transferring room state from %s to %s", old_room_id, room_id)
# Find all local users that were in the old room and copy over each user's state
- users = yield self.store.get_users_in_room(old_room_id)
- yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
+ users = await self.store.get_users_in_room(old_room_id)
+ await self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
# Add new room to the room directory if the old room was there
# Remove old room from the room directory
- old_room = yield self.store.get_room(old_room_id)
+ old_room = await self.store.get_room(old_room_id)
if old_room and old_room["is_public"]:
- yield self.store.set_room_is_public(old_room_id, False)
- yield self.store.set_room_is_public(room_id, True)
+ await self.store.set_room_is_public(old_room_id, False)
+ await self.store.set_room_is_public(room_id, True)
# Transfer alias mappings in the room directory
- yield self.store.update_aliases_for_room(old_room_id, room_id)
+ await self.store.update_aliases_for_room(old_room_id, room_id)
# Check if any groups we own contain the predecessor room
- local_group_ids = yield self.store.get_local_groups_for_room(old_room_id)
+ local_group_ids = await self.store.get_local_groups_for_room(old_room_id)
for group_id in local_group_ids:
# Add new the new room to those groups
- yield self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
+ await self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
# Remove the old room from those groups
- yield self.store.remove_room_from_group(group_id, old_room_id)
+ await self.store.remove_room_from_group(group_id, old_room_id)
- @defer.inlineCallbacks
- def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
+ async def copy_user_state_on_room_upgrade(
+ self, old_room_id: str, new_room_id: str, user_ids: Iterable[str]
+ ) -> None:
"""Copy user-specific information when they join a new room when that new room is the
result of a room upgrade
Args:
- old_room_id (str): The ID of upgraded room
- new_room_id (str): The ID of the new room
- user_ids (Iterable[str]): User IDs to copy state for
-
- Returns:
- Deferred
+ old_room_id: The ID of upgraded room
+ new_room_id: The ID of the new room
+ user_ids: User IDs to copy state for
"""
logger.debug(
@@ -552,11 +544,11 @@ class RoomMemberHandler(object):
for user_id in user_ids:
try:
# It is an upgraded room. Copy over old tags
- yield self.copy_room_tags_and_direct_to_room(
+ await self.copy_room_tags_and_direct_to_room(
old_room_id, new_room_id, user_id
)
# Copy over push rules
- yield self.store.copy_push_rules_from_room_to_room_for_user(
+ await self.store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)
except Exception:
@@ -569,17 +561,23 @@ class RoomMemberHandler(object):
)
continue
- async def send_membership_event(self, requester, event, context, ratelimit=True):
+ async def send_membership_event(
+ self,
+ requester: Requester,
+ event: EventBase,
+ context: EventContext,
+ ratelimit: bool = True,
+ ):
"""
Change the membership status of a user in a room.
Args:
- requester (Requester): The local user who requested the membership
+ requester: The local user who requested the membership
event. If None, certain checks, like whether this homeserver can
act as the sender, will be skipped.
- event (SynapseEvent): The membership event.
+ event: The membership event.
context: The context of the event.
- ratelimit (bool): Whether to rate limit this request.
+ ratelimit: Whether to rate limit this request.
Raises:
SynapseError if there was a problem changing the membership.
"""
@@ -639,8 +637,9 @@ class RoomMemberHandler(object):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target_user, room_id)
- @defer.inlineCallbacks
- def _can_guest_join(self, current_state_ids):
+ async def _can_guest_join(
+ self, current_state_ids: Dict[Tuple[str, str], str]
+ ) -> bool:
"""
Returns whether a guest can join a room based on its current state.
"""
@@ -648,7 +647,7 @@ class RoomMemberHandler(object):
if not guest_access_id:
return False
- guest_access = yield self.store.get_event(guest_access_id)
+ guest_access = await self.store.get_event(guest_access_id)
return (
guest_access
@@ -657,13 +656,14 @@ class RoomMemberHandler(object):
and guest_access.content["guest_access"] == "can_join"
)
- @defer.inlineCallbacks
- def lookup_room_alias(self, room_alias):
+ async def lookup_room_alias(
+ self, room_alias: RoomAlias
+ ) -> Tuple[RoomID, List[str]]:
"""
Get the room ID associated with a room alias.
Args:
- room_alias (RoomAlias): The alias to look up.
+ room_alias: The alias to look up.
Returns:
A tuple of:
The room ID as a RoomID object.
@@ -672,7 +672,7 @@ class RoomMemberHandler(object):
SynapseError if room alias could not be found.
"""
directory_handler = self.directory_handler
- mapping = yield directory_handler.get_association(room_alias)
+ mapping = await directory_handler.get_association(room_alias)
if not mapping:
raise SynapseError(404, "No such room alias")
@@ -687,25 +687,25 @@ class RoomMemberHandler(object):
return RoomID.from_string(room_id), servers
- @defer.inlineCallbacks
- def _get_inviter(self, user_id, room_id):
- invite = yield self.store.get_invite_for_local_user_in_room(
+ async def _get_inviter(self, user_id: str, room_id: str) -> Optional[UserID]:
+ invite = await self.store.get_invite_for_local_user_in_room(
user_id=user_id, room_id=room_id
)
if invite:
return UserID.from_string(invite.sender)
+ return None
async def do_3pid_invite(
self,
- room_id,
- inviter,
- medium,
- address,
- id_server,
- requester,
- txn_id,
- id_access_token=None,
- ):
+ room_id: str,
+ inviter: UserID,
+ medium: str,
+ address: str,
+ id_server: str,
+ requester: Requester,
+ txn_id: Optional[str],
+ id_access_token: Optional[str] = None,
+ ) -> None:
if self.config.block_non_admin_invites:
is_requester_admin = await self.auth.is_server_admin(requester.user)
if not is_requester_admin:
@@ -754,15 +754,15 @@ class RoomMemberHandler(object):
async def _make_and_store_3pid_invite(
self,
- requester,
- id_server,
- medium,
- address,
- room_id,
- user,
- txn_id,
- id_access_token=None,
- ):
+ requester: Requester,
+ id_server: str,
+ medium: str,
+ address: str,
+ room_id: str,
+ user: UserID,
+ txn_id: Optional[str],
+ id_access_token: Optional[str] = None,
+ ) -> None:
room_state = await self.state_handler.get_current_state(room_id)
inviter_display_name = ""
@@ -836,8 +836,9 @@ class RoomMemberHandler(object):
txn_id=txn_id,
)
- @defer.inlineCallbacks
- def _is_host_in_room(self, current_state_ids):
+ async def _is_host_in_room(
+ self, current_state_ids: Dict[Tuple[str, str], str]
+ ) -> bool:
# Have we just created the room, and is this about to be the very
# first member event?
create_event_id = current_state_ids.get(("m.room.create", ""))
@@ -850,7 +851,7 @@ class RoomMemberHandler(object):
continue
event_id = current_state_ids[(etype, state_key)]
- event = yield self.store.get_event(event_id, allow_none=True)
+ event = await self.store.get_event(event_id, allow_none=True)
if not event:
continue
@@ -859,11 +860,10 @@ class RoomMemberHandler(object):
return False
- @defer.inlineCallbacks
- def _is_server_notice_room(self, room_id):
+ async def _is_server_notice_room(self, room_id: str) -> bool:
if self._server_notices_mxid is None:
return False
- user_ids = yield self.store.get_users_in_room(room_id)
+ user_ids = await self.store.get_users_in_room(room_id)
return self._server_notices_mxid in user_ids
@@ -875,20 +875,21 @@ class RoomMemberMasterHandler(RoomMemberHandler):
self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")
- @defer.inlineCallbacks
- def _is_remote_room_too_complex(self, room_id, remote_room_hosts):
+ async def _is_remote_room_too_complex(
+ self, room_id: str, remote_room_hosts: List[str]
+ ) -> Optional[bool]:
"""
Check if complexity of a remote room is too great.
Args:
- room_id (str)
- remote_room_hosts (list[str])
+ room_id
+ remote_room_hosts
Returns: bool of whether the complexity is too great, or None
if unable to be fetched
"""
max_complexity = self.hs.config.limit_remote_rooms.complexity
- complexity = yield self.federation_handler.get_room_complexity(
+ complexity = await self.federation_handler.get_room_complexity(
remote_room_hosts, room_id
)
@@ -896,22 +897,26 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return complexity["v1"] > max_complexity
return None
- @defer.inlineCallbacks
- def _is_local_room_too_complex(self, room_id):
+ async def _is_local_room_too_complex(self, room_id: str) -> bool:
"""
Check if the complexity of a local room is too great.
Args:
- room_id (str)
-
- Returns: bool
+ room_id: The room ID to check for complexity.
"""
max_complexity = self.hs.config.limit_remote_rooms.complexity
- complexity = yield self.store.get_room_complexity(room_id)
+ complexity = await self.store.get_room_complexity(room_id)
return complexity["v1"] > max_complexity
- async def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+ async def _remote_join(
+ self,
+ requester: Requester,
+ remote_room_hosts: List[str],
+ room_id: str,
+ user: UserID,
+ content: dict,
+ ) -> None:
"""Implements RoomMemberHandler._remote_join
"""
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
@@ -970,18 +975,20 @@ class RoomMemberMasterHandler(RoomMemberHandler):
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
)
- @defer.inlineCallbacks
- def _remote_reject_invite(
- self, requester, remote_room_hosts, room_id, target, content
- ):
+ async def _remote_reject_invite(
+ self,
+ requester: Requester,
+ remote_room_hosts: List[str],
+ room_id: str,
+ target: UserID,
+ content: dict,
+ ) -> dict:
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
- ret = yield defer.ensureDeferred(
- fed_handler.do_remotely_reject_invite(
- remote_room_hosts, room_id, target.to_string(), content=content,
- )
+ ret = await fed_handler.do_remotely_reject_invite(
+ remote_room_hosts, room_id, target.to_string(), content=content,
)
return ret
except Exception as e:
@@ -993,24 +1000,23 @@ class RoomMemberMasterHandler(RoomMemberHandler):
#
logger.warning("Failed to reject invite: %s", e)
- yield self.store.locally_reject_invite(target.to_string(), room_id)
+ await self.store.locally_reject_invite(target.to_string(), room_id)
return {}
- def _user_joined_room(self, target, room_id):
+ async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
"""
- return defer.succeed(user_joined_room(self.distributor, target, room_id))
+ user_joined_room(self.distributor, target, room_id)
- def _user_left_room(self, target, room_id):
+ async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
- return defer.succeed(user_left_room(self.distributor, target, room_id))
+ user_left_room(self.distributor, target, room_id)
- @defer.inlineCallbacks
- def forget(self, user, room_id):
+ async def forget(self, user: UserID, room_id: str) -> None:
user_id = user.to_string()
- member = yield self.state_handler.get_current_state(
+ member = await self.state_handler.get_current_state(
room_id=room_id, event_type=EventTypes.Member, state_key=user_id
)
membership = member.membership if member else None
@@ -1022,4 +1028,4 @@ class RoomMemberMasterHandler(RoomMemberHandler):
raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
if membership:
- yield self.store.forget(user_id, room_id)
+ await self.store.forget(user_id, room_id)
|