summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py433
1 files changed, 255 insertions, 178 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 649ca1f08a..8a7b4916cd 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,14 +15,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Optional, Tuple
+import random
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
-from six import iteritems, itervalues, string_types
+from canonicaljson import encode_canonical_json
 
-from canonicaljson import encode_canonical_json, json
-
-from twisted.internet import defer
-from twisted.internet.defer import succeed
 from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
@@ -38,18 +35,22 @@ from synapse.api.errors import (
     Codes,
     ConsentNotGivenError,
     NotFoundError,
+    ShadowBanError,
     SynapseError,
 )
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events import EventBase
+from synapse.events.builder import EventBuilder
+from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
-from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
-from synapse.types import Collection, RoomAlias, UserID, create_requester
+from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
+from synapse.util import json_decoder
 from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.metrics import measure_func
@@ -57,10 +58,13 @@ from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
-class MessageHandler(object):
+class MessageHandler:
     """Contains some read only APIs to get state about a room
     """
 
@@ -83,46 +87,57 @@ class MessageHandler(object):
                 "_schedule_next_expiry", self._schedule_next_expiry
             )
 
-    @defer.inlineCallbacks
-    def get_room_data(
-        self, user_id=None, room_id=None, event_type=None, state_key="", is_guest=False
-    ):
+    async def get_room_data(
+        self, user_id: str, room_id: str, event_type: str, state_key: str,
+    ) -> dict:
         """ Get data from a room.
 
         Args:
-            event : The room path event
+            user_id
+            room_id
+            event_type
+            state_key
         Returns:
             The path data content.
         Raises:
-            SynapseError if something went wrong.
+            SynapseError or AuthError if the user is not in the room
         """
         (
             membership,
             membership_event_id,
-        ) = yield self.auth.check_user_in_room_or_world_readable(
+        ) = await self.auth.check_user_in_room_or_world_readable(
             room_id, user_id, allow_departed_users=True
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state.get_current_state(room_id, event_type, state_key)
+            data = await self.state.get_current_state(room_id, event_type, state_key)
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
-            room_state = yield self.state_store.get_state_for_events(
+            room_state = await self.state_store.get_state_for_events(
                 [membership_event_id], StateFilter.from_types([key])
             )
             data = room_state[membership_event_id].get(key)
+        else:
+            # check_user_in_room_or_world_readable, if it doesn't raise an AuthError, should
+            # only ever return a Membership.JOIN/LEAVE object
+            #
+            # Safeguard in case it returned something else
+            logger.error(
+                "Attempted to retrieve data from a room for a user that has never been in it. "
+                "This should not have happened."
+            )
+            raise SynapseError(403, "User not in room", errcode=Codes.FORBIDDEN)
 
         return data
 
-    @defer.inlineCallbacks
-    def get_state_events(
+    async def get_state_events(
         self,
-        user_id,
-        room_id,
-        state_filter=StateFilter.all(),
-        at_token=None,
-        is_guest=False,
-    ):
+        user_id: str,
+        room_id: str,
+        state_filter: StateFilter = StateFilter.all(),
+        at_token: Optional[StreamToken] = None,
+        is_guest: bool = False,
+    ) -> List[dict]:
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
         left the room return the state events from when they left. If an explicit
@@ -130,15 +145,14 @@ class MessageHandler(object):
         visible.
 
         Args:
-            user_id(str): The user requesting state events.
-            room_id(str): The room ID to get all state events from.
-            state_filter (StateFilter): The state filter used to fetch state
-                from the database.
-            at_token(StreamToken|None): the stream token of the at which we are requesting
+            user_id: The user requesting state events.
+            room_id: The room ID to get all state events from.
+            state_filter: The state filter used to fetch state from the database.
+            at_token: the stream token of the at which we are requesting
                 the stats. If the user is not allowed to view the state as of that
                 stream token, we raise a 403 SynapseError. If None, returns the current
                 state based on the current_state_events table.
-            is_guest(bool): whether this user is a guest
+            is_guest: whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         Raises:
@@ -152,20 +166,20 @@ class MessageHandler(object):
             # get_recent_events_for_room operates by topo ordering. This therefore
             # does not reliably give you the state at the given stream position.
             # (https://github.com/matrix-org/synapse/issues/3305)
-            last_events, _ = yield self.store.get_recent_events_for_room(
+            last_events, _ = await self.store.get_recent_events_for_room(
                 room_id, end_token=at_token.room_key, limit=1
             )
 
             if not last_events:
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
 
-            visible_events = yield filter_events_for_client(
+            visible_events = await filter_events_for_client(
                 self.storage, user_id, last_events, filter_send_to_client=False
             )
 
             event = last_events[0]
             if visible_events:
-                room_state = yield self.state_store.get_state_for_events(
+                room_state = await self.state_store.get_state_for_events(
                     [event.event_id], state_filter=state_filter
                 )
                 room_state = room_state[event.event_id]
@@ -179,23 +193,23 @@ class MessageHandler(object):
             (
                 membership,
                 membership_event_id,
-            ) = yield self.auth.check_user_in_room_or_world_readable(
+            ) = await self.auth.check_user_in_room_or_world_readable(
                 room_id, user_id, allow_departed_users=True
             )
 
             if membership == Membership.JOIN:
-                state_ids = yield self.store.get_filtered_current_state_ids(
+                state_ids = await self.store.get_filtered_current_state_ids(
                     room_id, state_filter=state_filter
                 )
-                room_state = yield self.store.get_events(state_ids.values())
+                room_state = await self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
-                room_state = yield self.state_store.get_state_for_events(
+                room_state = await self.state_store.get_state_for_events(
                     [membership_event_id], state_filter=state_filter
                 )
                 room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
-        events = yield self._event_serializer.serialize_events(
+        events = await self._event_serializer.serialize_events(
             room_state.values(),
             now,
             # We don't bother bundling aggregations in when asked for state
@@ -204,15 +218,14 @@ class MessageHandler(object):
         )
         return events
 
-    @defer.inlineCallbacks
-    def get_joined_members(self, requester, room_id):
+    async def get_joined_members(self, requester: Requester, room_id: str) -> dict:
         """Get all the joined members in the room and their profile information.
 
         If the user has left the room return the state events from when they left.
 
         Args:
-            requester(Requester): The user requesting state events.
-            room_id(str): The room ID to get all state events from.
+            requester: The user requesting state events.
+            room_id: The room ID to get all state events from.
         Returns:
             A dict of user_id to profile info
         """
@@ -220,7 +233,7 @@ class MessageHandler(object):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self.auth.check_user_in_room_or_world_readable(
+            membership, _ = await self.auth.check_user_in_room_or_world_readable(
                 room_id, user_id, allow_departed_users=True
             )
             if membership != Membership.JOIN:
@@ -228,7 +241,7 @@ class MessageHandler(object):
                     "Getting joined members after leaving is not implemented"
                 )
 
-        users_with_profile = yield self.state.get_current_users_in_room(room_id)
+        users_with_profile = await self.state.get_current_users_in_room(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
@@ -246,10 +259,10 @@ class MessageHandler(object):
                 "avatar_url": profile.avatar_url,
                 "display_name": profile.display_name,
             }
-            for user_id, profile in iteritems(users_with_profile)
+            for user_id, profile in users_with_profile.items()
         }
 
-    def maybe_schedule_expiry(self, event):
+    def maybe_schedule_expiry(self, event: EventBase):
         """Schedule the expiry of an event if there's not already one scheduled,
         or if the one running is for an event that will expire after the provided
         timestamp.
@@ -258,7 +271,7 @@ class MessageHandler(object):
         the master process, and therefore needs to be run on there.
 
         Args:
-            event (EventBase): The event to schedule the expiry of.
+            event: The event to schedule the expiry of.
         """
 
         expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
@@ -269,8 +282,7 @@ class MessageHandler(object):
         # a task scheduled for a timestamp that's sooner than the provided one.
         self._schedule_expiry_for_event(event.event_id, expiry_ts)
 
-    @defer.inlineCallbacks
-    def _schedule_next_expiry(self):
+    async def _schedule_next_expiry(self):
         """Retrieve the ID and the expiry timestamp of the next event to be expired,
         and schedule an expiry task for it.
 
@@ -278,18 +290,18 @@ class MessageHandler(object):
         future call to save_expiry_ts can schedule a new expiry task.
         """
         # Try to get the expiry timestamp of the next event to expire.
-        res = yield self.store.get_next_event_to_expire()
+        res = await self.store.get_next_event_to_expire()
         if res:
             event_id, expiry_ts = res
             self._schedule_expiry_for_event(event_id, expiry_ts)
 
-    def _schedule_expiry_for_event(self, event_id, expiry_ts):
+    def _schedule_expiry_for_event(self, event_id: str, expiry_ts: int):
         """Schedule an expiry task for the provided event if there's not already one
         scheduled at a timestamp that's sooner than the provided one.
 
         Args:
-            event_id (str): The ID of the event to expire.
-            expiry_ts (int): The timestamp at which to expire the event.
+            event_id: The ID of the event to expire.
+            expiry_ts: The timestamp at which to expire the event.
         """
         if self._scheduled_expiry:
             # If the provided timestamp refers to a time before the scheduled time of the
@@ -319,8 +331,7 @@ class MessageHandler(object):
             event_id,
         )
 
-    @defer.inlineCallbacks
-    def _expire_event(self, event_id):
+    async def _expire_event(self, event_id: str):
         """Retrieve and expire an event that needs to be expired from the database.
 
         If the event doesn't exist in the database, log it and delete the expiry date
@@ -335,12 +346,12 @@ class MessageHandler(object):
         try:
             # Expire the event if we know about it. This function also deletes the expiry
             # date from the database in the same database transaction.
-            yield self.store.expire_event(event_id)
+            await self.store.expire_event(event_id)
         except Exception as e:
             logger.error("Could not expire event %s: %r", event_id, e)
 
         # Schedule the expiry of the next event to expire.
-        yield self._schedule_next_expiry()
+        await self._schedule_next_expiry()
 
 
 # The duration (in ms) after which rooms should be removed
@@ -350,8 +361,8 @@ class MessageHandler(object):
 _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
 
 
-class EventCreationHandler(object):
-    def __init__(self, hs):
+class EventCreationHandler:
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
@@ -396,7 +407,7 @@ class EventCreationHandler(object):
         #
         # map from room id to time-of-last-attempt.
         #
-        self._rooms_to_exclude_from_dummy_event_insertion = {}  # type: dict[str, int]
+        self._rooms_to_exclude_from_dummy_event_insertion = {}  # type: Dict[str, int]
 
         # we need to construct a ConsentURIBuilder here, as it checks that the necessary
         # config options, but *only* if we have a configuration for which we are
@@ -422,16 +433,15 @@ class EventCreationHandler(object):
 
         self._dummy_events_threshold = hs.config.dummy_events_threshold
 
-    @defer.inlineCallbacks
-    def create_event(
+    async def create_event(
         self,
-        requester,
-        event_dict,
-        token_id=None,
-        txn_id=None,
-        prev_event_ids: Optional[Collection[str]] = None,
-        require_consent=True,
-    ):
+        requester: Requester,
+        event_dict: dict,
+        token_id: Optional[str] = None,
+        txn_id: Optional[str] = None,
+        prev_event_ids: Optional[List[str]] = None,
+        require_consent: bool = True,
+    ) -> Tuple[EventBase, EventContext]:
         """
         Given a dict from a client, create a new event.
 
@@ -442,31 +452,29 @@ class EventCreationHandler(object):
 
         Args:
             requester
-            event_dict (dict): An entire event
-            token_id (str)
-            txn_id (str)
-
+            event_dict: An entire event
+            token_id
+            txn_id
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
 
                 If None, they will be requested from the database.
-
-            require_consent (bool): Whether to check if the requester has
-                consented to privacy policy.
+            require_consent: Whether to check if the requester has
+                consented to the privacy policy.
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
         Returns:
-            Tuple of created event (FrozenEvent), Context
+            Tuple of created event, Context
         """
-        yield self.auth.check_auth_blocking(requester.user.to_string())
+        await self.auth.check_auth_blocking(requester.user.to_string())
 
         if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
             room_version = event_dict["content"]["room_version"]
         else:
             try:
-                room_version = yield self.store.get_room_version_id(
+                room_version = await self.store.get_room_version_id(
                     event_dict["room_id"]
                 )
             except NotFoundError:
@@ -487,11 +495,11 @@ class EventCreationHandler(object):
 
                 try:
                     if "displayname" not in content:
-                        displayname = yield profile.get_displayname(target)
+                        displayname = await profile.get_displayname(target)
                         if displayname is not None:
                             content["displayname"] = displayname
                     if "avatar_url" not in content:
-                        avatar_url = yield profile.get_avatar_url(target)
+                        avatar_url = await profile.get_avatar_url(target)
                         if avatar_url is not None:
                             content["avatar_url"] = avatar_url
                 except Exception as e:
@@ -499,9 +507,9 @@ class EventCreationHandler(object):
                         "Failed to get profile information for %r: %s", target, e
                     )
 
-        is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
+        is_exempt = await self._is_exempt_from_privacy_policy(builder, requester)
         if require_consent and not is_exempt:
-            yield self.assert_accepted_privacy_policy(requester)
+            await self.assert_accepted_privacy_policy(requester)
 
         if token_id is not None:
             builder.internal_metadata.token_id = token_id
@@ -509,7 +517,7 @@ class EventCreationHandler(object):
         if txn_id is not None:
             builder.internal_metadata.txn_id = txn_id
 
-        event, context = yield self.create_new_client_event(
+        event, context = await self.create_new_client_event(
             builder=builder, requester=requester, prev_event_ids=prev_event_ids,
         )
 
@@ -525,10 +533,10 @@ class EventCreationHandler(object):
             # federation as well as those created locally. As of room v3, aliases events
             # can be created by users that are not in the room, therefore we have to
             # tolerate them in event_auth.check().
-            prev_state_ids = yield context.get_prev_state_ids()
+            prev_state_ids = await context.get_prev_state_ids()
             prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
             prev_event = (
-                yield self.store.get_event(prev_event_id, allow_none=True)
+                await self.store.get_event(prev_event_id, allow_none=True)
                 if prev_event_id
                 else None
             )
@@ -551,37 +559,36 @@ class EventCreationHandler(object):
 
         return (event, context)
 
-    def _is_exempt_from_privacy_policy(self, builder, requester):
+    async def _is_exempt_from_privacy_policy(
+        self, builder: EventBuilder, requester: Requester
+    ) -> bool:
         """"Determine if an event to be sent is exempt from having to consent
         to the privacy policy
 
         Args:
-            builder (synapse.events.builder.EventBuilder): event being created
-            requester (Requster): user requesting this event
+            builder: event being created
+            requester: user requesting this event
 
         Returns:
-            Deferred[bool]: true if the event can be sent without the user
-                consenting
+            true if the event can be sent without the user consenting
         """
         # the only thing the user can do is join the server notices room.
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
-                return self._is_server_notices_room(builder.room_id)
+                return await self._is_server_notices_room(builder.room_id)
             elif membership == Membership.LEAVE:
                 # the user is always allowed to leave (but not kick people)
                 return builder.state_key == requester.user.to_string()
-        return succeed(False)
+        return False
 
-    @defer.inlineCallbacks
-    def _is_server_notices_room(self, room_id):
+    async def _is_server_notices_room(self, room_id: str) -> bool:
         if self.config.server_notices_mxid is None:
             return False
-        user_ids = yield self.store.get_users_in_room(room_id)
+        user_ids = await self.store.get_users_in_room(room_id)
         return self.config.server_notices_mxid in user_ids
 
-    @defer.inlineCallbacks
-    def assert_accepted_privacy_policy(self, requester):
+    async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
         """Check if a user has accepted the privacy policy
 
         Called when the given user is about to do something that requires
@@ -590,12 +597,10 @@ class EventCreationHandler(object):
         raised.
 
         Args:
-            requester (synapse.types.Requester):
-                The user making the request
+            requester: The user making the request
 
         Returns:
-            Deferred[None]: returns normally if the user has consented or is
-                exempt
+            Returns normally if the user has consented or is exempt
 
         Raises:
             ConsentNotGivenError: if the user has not given consent yet
@@ -616,7 +621,7 @@ class EventCreationHandler(object):
         ):
             return
 
-        u = yield self.store.get_user_by_id(user_id)
+        u = await self.store.get_user_by_id(user_id)
         assert u is not None
         if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
             # support and bot users are not required to consent
@@ -634,74 +639,115 @@ class EventCreationHandler(object):
         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
 
     async def send_nonmember_event(
-        self, requester, event, context, ratelimit=True
+        self,
+        requester: Requester,
+        event: EventBase,
+        context: EventContext,
+        ratelimit: bool = True,
+        ignore_shadow_ban: bool = False,
     ) -> int:
         """
         Persists and notifies local clients and federation of an event.
 
         Args:
-            event (FrozenEvent) the event to send.
-            context (Context) the context of the event.
-            ratelimit (bool): Whether to rate limit this send.
-            is_guest (bool): Whether the sender is a guest.
+            requester: The requester sending the event.
+            event: The event to send.
+            context: The context of the event.
+            ratelimit: Whether to rate limit this send.
+            ignore_shadow_ban: True if shadow-banned users should be allowed to
+                send this event.
 
         Return:
             The stream_id of the persisted event.
+
+        Raises:
+            ShadowBanError if the requester has been shadow-banned.
         """
         if event.type == EventTypes.Member:
             raise SynapseError(
                 500, "Tried to send member event through non-member codepath"
             )
 
+        if not ignore_shadow_ban and requester.shadow_banned:
+            # We randomly sleep a bit just to annoy the requester.
+            await self.clock.sleep(random.randint(1, 10))
+            raise ShadowBanError()
+
         user = UserID.from_string(event.sender)
 
         assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
 
         if event.is_state():
-            prev_state = await self.deduplicate_state_event(event, context)
-            if prev_state is not None:
+            prev_event = await self.deduplicate_state_event(event, context)
+            if prev_event is not None:
                 logger.info(
                     "Not bothering to persist state event %s duplicated by %s",
                     event.event_id,
-                    prev_state.event_id,
+                    prev_event.event_id,
                 )
-                return prev_state
+                return await self.store.get_stream_id_for_event(prev_event.event_id)
 
         return await self.handle_new_client_event(
             requester=requester, event=event, context=context, ratelimit=ratelimit
         )
 
-    @defer.inlineCallbacks
-    def deduplicate_state_event(self, event, context):
+    async def deduplicate_state_event(
+        self, event: EventBase, context: EventContext
+    ) -> Optional[EventBase]:
         """
         Checks whether event is in the latest resolved state in context.
 
-        If so, returns the version of the event in context.
-        Otherwise, returns None.
+        Args:
+            event: The event to check for duplication.
+            context: The event context.
+
+        Returns:
+            The previous verion of the event is returned, if it is found in the
+            event context. Otherwise, None is returned.
         """
-        prev_state_ids = yield context.get_prev_state_ids()
+        prev_state_ids = await context.get_prev_state_ids()
         prev_event_id = prev_state_ids.get((event.type, event.state_key))
         if not prev_event_id:
-            return
-        prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+            return None
+        prev_event = await self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
-            return
+            return None
 
         if prev_event and event.user_id == prev_event.user_id:
             prev_content = encode_canonical_json(prev_event.content)
             next_content = encode_canonical_json(event.content)
             if prev_content == next_content:
                 return prev_event
-        return
+        return None
 
     async def create_and_send_nonmember_event(
-        self, requester, event_dict, ratelimit=True, txn_id=None
+        self,
+        requester: Requester,
+        event_dict: dict,
+        ratelimit: bool = True,
+        txn_id: Optional[str] = None,
+        ignore_shadow_ban: bool = False,
     ) -> Tuple[EventBase, int]:
         """
         Creates an event, then sends it.
 
         See self.create_event and self.send_nonmember_event.
+
+        Args:
+            requester: The requester sending the event.
+            event_dict: An entire event.
+            ratelimit: Whether to rate limit this send.
+            txn_id: The transaction ID.
+            ignore_shadow_ban: True if shadow-banned users should be allowed to
+                send this event.
+
+        Raises:
+            ShadowBanError if the requester has been shadow-banned.
         """
+        if not ignore_shadow_ban and requester.shadow_banned:
+            # We randomly sleep a bit just to annoy the requester.
+            await self.clock.sleep(random.randint(1, 10))
+            raise ShadowBanError()
 
         # We limit the number of concurrent event sends in a room so that we
         # don't fork the DAG too much. If we don't limit then we can end up in
@@ -715,27 +761,31 @@ class EventCreationHandler(object):
 
             spam_error = self.spam_checker.check_event_for_spam(event)
             if spam_error:
-                if not isinstance(spam_error, string_types):
+                if not isinstance(spam_error, str):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
             stream_id = await self.send_nonmember_event(
-                requester, event, context, ratelimit=ratelimit
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+                ignore_shadow_ban=ignore_shadow_ban,
             )
         return event, stream_id
 
     @measure_func("create_new_client_event")
-    @defer.inlineCallbacks
-    def create_new_client_event(
-        self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None
-    ):
+    async def create_new_client_event(
+        self,
+        builder: EventBuilder,
+        requester: Optional[Requester] = None,
+        prev_event_ids: Optional[List[str]] = None,
+    ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
         Args:
-            builder (EventBuilder):
-
-            requester (synapse.types.Requester|None):
-
+            builder:
+            requester:
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -743,7 +793,7 @@ class EventCreationHandler(object):
                 If None, they will be requested from the database.
 
         Returns:
-            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+            Tuple of created event, context
         """
 
         if prev_event_ids is not None:
@@ -752,10 +802,19 @@ class EventCreationHandler(object):
                 % (len(prev_event_ids),)
             )
         else:
-            prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id)
+            prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
 
-        event = yield builder.build(prev_event_ids=prev_event_ids)
-        context = yield self.state.compute_event_context(event)
+        # we now ought to have some prev_events (unless it's a create event).
+        #
+        # do a quick sanity check here, rather than waiting until we've created the
+        # event and then try to auth it (which fails with a somewhat confusing "No
+        # create event in auth events")
+        assert (
+            builder.type == EventTypes.Create or len(prev_event_ids) > 0
+        ), "Attempting to create an event with no prev_events"
+
+        event = await builder.build(prev_event_ids=prev_event_ids)
+        context = await self.state.compute_event_context(event)
         if requester:
             context.app_service = requester.app_service
 
@@ -769,7 +828,7 @@ class EventCreationHandler(object):
             relates_to = relation["event_id"]
             aggregation_key = relation["key"]
 
-            already_exists = yield self.store.has_user_annotated_event(
+            already_exists = await self.store.has_user_annotated_event(
                 relates_to, event.type, aggregation_key, event.sender
             )
             if already_exists:
@@ -781,7 +840,12 @@ class EventCreationHandler(object):
 
     @measure_func("handle_new_client_event")
     async def handle_new_client_event(
-        self, requester, event, context, ratelimit=True, extra_users=[]
+        self,
+        requester: Requester,
+        event: EventBase,
+        context: EventContext,
+        ratelimit: bool = True,
+        extra_users: List[UserID] = [],
     ) -> int:
         """Processes a new event. This includes checking auth, persisting it,
         notifying users, sending to remote servers, etc.
@@ -790,11 +854,11 @@ class EventCreationHandler(object):
         processing.
 
         Args:
-            requester (Requester)
-            event (FrozenEvent)
-            context (EventContext)
-            ratelimit (bool)
-            extra_users (list(UserID)): Any extra users to notify about event
+            requester
+            event
+            context
+            ratelimit
+            extra_users: Any extra users to notify about event
 
         Return:
             The stream_id of the persisted event.
@@ -816,25 +880,28 @@ class EventCreationHandler(object):
                 403, "This event is not allowed in this context", Codes.FORBIDDEN
             )
 
-        try:
-            await self.auth.check_from_context(room_version, event, context)
-        except AuthError as err:
-            logger.warning("Denying new event %r because %s", event, err)
-            raise err
+        if event.internal_metadata.is_out_of_band_membership():
+            # the only sort of out-of-band-membership events we expect to see here
+            # are invite rejections we have generated ourselves.
+            assert event.type == EventTypes.Member
+            assert event.content["membership"] == Membership.LEAVE
+        else:
+            try:
+                await self.auth.check_from_context(room_version, event, context)
+            except AuthError as err:
+                logger.warning("Denying new event %r because %s", event, err)
+                raise err
 
         # Ensure that we can round trip before trying to persist in db
         try:
             dump = frozendict_json_encoder.encode(event.content)
-            json.loads(dump)
+            json_decoder.decode(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
             raise
 
         await self.action_generator.handle_push_actions_for_event(event, context)
 
-        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
-        # hack around with a try/finally instead.
-        success = False
         try:
             # If we're a worker we need to hit out to the master.
             if not self._is_event_writer:
@@ -850,27 +917,22 @@ class EventCreationHandler(object):
                 )
                 stream_id = result["stream_id"]
                 event.internal_metadata.stream_ordering = stream_id
-                success = True
                 return stream_id
 
             stream_id = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
-            success = True
             return stream_id
-        finally:
-            if not success:
-                # Ensure that we actually remove the entries in the push actions
-                # staging area, if we calculated them.
-                run_in_background(
-                    self.store.remove_push_actions_from_staging, event.event_id
-                )
+        except Exception:
+            # Ensure that we actually remove the entries in the push actions
+            # staging area, if we calculated them.
+            await self.store.remove_push_actions_from_staging(event.event_id)
+            raise
 
-    @defer.inlineCallbacks
-    def _validate_canonical_alias(
-        self, directory_handler, room_alias_str, expected_room_id
-    ):
+    async def _validate_canonical_alias(
+        self, directory_handler, room_alias_str: str, expected_room_id: str
+    ) -> None:
         """
         Ensure that the given room alias points to the expected room ID.
 
@@ -881,7 +943,7 @@ class EventCreationHandler(object):
         """
         room_alias = RoomAlias.from_string(room_alias_str)
         try:
-            mapping = yield directory_handler.get_association(room_alias)
+            mapping = await directory_handler.get_association(room_alias)
         except SynapseError as e:
             # Turn M_NOT_FOUND errors into M_BAD_ALIAS errors.
             if e.errcode == Codes.NOT_FOUND:
@@ -900,7 +962,12 @@ class EventCreationHandler(object):
             )
 
     async def persist_and_notify_client_event(
-        self, requester, event, context, ratelimit=True, extra_users=[]
+        self,
+        requester: Requester,
+        event: EventBase,
+        context: EventContext,
+        ratelimit: bool = True,
+        extra_users: List[UserID] = [],
     ) -> int:
         """Called when we have fully built the event, have already
         calculated the push actions for the event, and checked auth.
@@ -924,7 +991,7 @@ class EventCreationHandler(object):
                     allow_none=True,
                 )
 
-                is_admin_redaction = (
+                is_admin_redaction = bool(
                     original_event and event.sender != original_event.sender
                 )
 
@@ -938,7 +1005,7 @@ class EventCreationHandler(object):
             # Validate a newly added alias or newly added alt_aliases.
 
             original_alias = None
-            original_alt_aliases = set()
+            original_alt_aliases = []  # type: List[str]
 
             original_event_id = event.unsigned.get("replaces_state")
             if original_event_id:
@@ -986,9 +1053,13 @@ class EventCreationHandler(object):
 
                 current_state_ids = await context.get_current_state_ids()
 
+                # We know this event is not an outlier, so this must be
+                # non-None.
+                assert current_state_ids is not None
+
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(current_state_ids)
+                    for k, e_id in current_state_ids.items()
                     if k[0] in self.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -1002,7 +1073,7 @@ class EventCreationHandler(object):
                         "content": e.content,
                         "sender": e.sender,
                     }
-                    for e in itervalues(state_to_include)
+                    for e in state_to_include.values()
                 ]
 
                 invitee = UserID.from_string(event.state_key)
@@ -1037,11 +1108,11 @@ class EventCreationHandler(object):
                     raise SynapseError(400, "Cannot redact event from a different room")
 
             prev_state_ids = await context.get_prev_state_ids()
-            auth_events_ids = await self.auth.compute_auth_events(
+            auth_events_ids = self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=True
             )
-            auth_events = await self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
+            auth_events_map = await self.store.get_events(auth_events_ids)
+            auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()}
 
             room_version = await self.store.get_room_version_id(event.room_id)
             room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
@@ -1093,7 +1164,7 @@ class EventCreationHandler(object):
 
         return event_stream_id
 
-    async def _bump_active_time(self, user):
+    async def _bump_active_time(self, user: UserID) -> None:
         try:
             presence = self.hs.get_presence_handler()
             await presence.bump_presence_active_time(user)
@@ -1139,8 +1210,14 @@ class EventCreationHandler(object):
 
                     event.internal_metadata.proactively_send = False
 
+                    # Since this is a dummy-event it is OK if it is sent by a
+                    # shadow-banned user.
                     await self.send_nonmember_event(
-                        requester, event, context, ratelimit=False
+                        requester,
+                        event,
+                        context,
+                        ratelimit=False,
+                        ignore_shadow_ban=True,
                     )
                     dummy_event_sent = True
                     break