summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py521
1 files changed, 383 insertions, 138 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1f8272784e..649ca1f08a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import Optional, Tuple
 
 from six import iteritems, itervalues, string_types
 
@@ -22,9 +23,16 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 from twisted.internet.defer import succeed
+from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
+from synapse.api.constants import (
+    EventContentFields,
+    EventTypes,
+    Membership,
+    RelationTypes,
+    UserTypes,
+)
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -32,14 +40,16 @@ from synapse.api.errors import (
     NotFoundError,
     SynapseError,
 )
-from synapse.api.room_versions import RoomVersions
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.api.urls import ConsentURIBuilder
+from synapse.events import EventBase
 from synapse.events.validator import EventValidator
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
-from synapse.types import RoomAlias, UserID, create_requester
+from synapse.types import Collection, RoomAlias, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.metrics import measure_func
@@ -59,7 +69,19 @@ class MessageHandler(object):
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
         self._event_serializer = hs.get_event_client_serializer()
+        self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+
+        # The scheduled call to self._expire_event. None if no call is currently
+        # scheduled.
+        self._scheduled_expiry = None  # type: Optional[IDelayedCall]
+
+        if not hs.config.worker_app:
+            run_as_background_process(
+                "_schedule_next_expiry", self._schedule_next_expiry
+            )
 
     @defer.inlineCallbacks
     def get_room_data(
@@ -74,15 +96,18 @@ class MessageHandler(object):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
-            room_id, user_id
+        (
+            membership,
+            membership_event_id,
+        ) = yield self.auth.check_user_in_room_or_world_readable(
+            room_id, user_id, allow_departed_users=True
         )
 
         if membership == Membership.JOIN:
             data = yield self.state.get_current_state(room_id, event_type, state_key)
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
-            room_state = yield self.store.get_state_for_events(
+            room_state = yield self.state_store.get_state_for_events(
                 [membership_event_id], StateFilter.from_types([key])
             )
             data = room_state[membership_event_id].get(key)
@@ -135,12 +160,12 @@ class MessageHandler(object):
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
 
             visible_events = yield filter_events_for_client(
-                self.store, user_id, last_events
+                self.storage, user_id, last_events, filter_send_to_client=False
             )
 
             event = last_events[0]
             if visible_events:
-                room_state = yield self.store.get_state_for_events(
+                room_state = yield self.state_store.get_state_for_events(
                     [event.event_id], state_filter=state_filter
                 )
                 room_state = room_state[event.event_id]
@@ -151,8 +176,11 @@ class MessageHandler(object):
                     % (user_id, room_id, at_token),
                 )
         else:
-            membership, membership_event_id = (
-                yield self.auth.check_in_room_or_world_readable(room_id, user_id)
+            (
+                membership,
+                membership_event_id,
+            ) = yield self.auth.check_user_in_room_or_world_readable(
+                room_id, user_id, allow_departed_users=True
             )
 
             if membership == Membership.JOIN:
@@ -161,7 +189,7 @@ class MessageHandler(object):
                 )
                 room_state = yield self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
-                room_state = yield self.store.get_state_for_events(
+                room_state = yield self.state_store.get_state_for_events(
                     [membership_event_id], state_filter=state_filter
                 )
                 room_state = room_state[membership_event_id]
@@ -192,8 +220,8 @@ class MessageHandler(object):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self.auth.check_in_room_or_world_readable(
-                room_id, user_id
+            membership, _ = yield self.auth.check_user_in_room_or_world_readable(
+                room_id, user_id, allow_departed_users=True
             )
             if membership != Membership.JOIN:
                 raise NotImplementedError(
@@ -221,24 +249,129 @@ class MessageHandler(object):
             for user_id, profile in iteritems(users_with_profile)
         }
 
+    def maybe_schedule_expiry(self, event):
+        """Schedule the expiry of an event if there's not already one scheduled,
+        or if the one running is for an event that will expire after the provided
+        timestamp.
+
+        This function needs to invalidate the event cache, which is only possible on
+        the master process, and therefore needs to be run on there.
+
+        Args:
+            event (EventBase): The event to schedule the expiry of.
+        """
+
+        expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
+        if not isinstance(expiry_ts, int) or event.is_state():
+            return
+
+        # _schedule_expiry_for_event won't actually schedule anything if there's already
+        # a task scheduled for a timestamp that's sooner than the provided one.
+        self._schedule_expiry_for_event(event.event_id, expiry_ts)
+
+    @defer.inlineCallbacks
+    def _schedule_next_expiry(self):
+        """Retrieve the ID and the expiry timestamp of the next event to be expired,
+        and schedule an expiry task for it.
+
+        If there's no event left to expire, set _expiry_scheduled to None so that a
+        future call to save_expiry_ts can schedule a new expiry task.
+        """
+        # Try to get the expiry timestamp of the next event to expire.
+        res = yield self.store.get_next_event_to_expire()
+        if res:
+            event_id, expiry_ts = res
+            self._schedule_expiry_for_event(event_id, expiry_ts)
+
+    def _schedule_expiry_for_event(self, event_id, expiry_ts):
+        """Schedule an expiry task for the provided event if there's not already one
+        scheduled at a timestamp that's sooner than the provided one.
+
+        Args:
+            event_id (str): The ID of the event to expire.
+            expiry_ts (int): The timestamp at which to expire the event.
+        """
+        if self._scheduled_expiry:
+            # If the provided timestamp refers to a time before the scheduled time of the
+            # next expiry task, cancel that task and reschedule it for this timestamp.
+            next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000
+            if expiry_ts < next_scheduled_expiry_ts:
+                self._scheduled_expiry.cancel()
+            else:
+                return
+
+        # Figure out how many seconds we need to wait before expiring the event.
+        now_ms = self.clock.time_msec()
+        delay = (expiry_ts - now_ms) / 1000
+
+        # callLater doesn't support negative delays, so trim the delay to 0 if we're
+        # in that case.
+        if delay < 0:
+            delay = 0
+
+        logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)
+
+        self._scheduled_expiry = self.clock.call_later(
+            delay,
+            run_as_background_process,
+            "_expire_event",
+            self._expire_event,
+            event_id,
+        )
+
+    @defer.inlineCallbacks
+    def _expire_event(self, event_id):
+        """Retrieve and expire an event that needs to be expired from the database.
+
+        If the event doesn't exist in the database, log it and delete the expiry date
+        from the database (so that we don't try to expire it again).
+        """
+        assert self._ephemeral_events_enabled
+
+        self._scheduled_expiry = None
+
+        logger.info("Expiring event %s", event_id)
+
+        try:
+            # Expire the event if we know about it. This function also deletes the expiry
+            # date from the database in the same database transaction.
+            yield self.store.expire_event(event_id)
+        except Exception as e:
+            logger.error("Could not expire event %s: %r", event_id, e)
+
+        # Schedule the expiry of the next event to expire.
+        yield self._schedule_next_expiry()
+
+
+# The duration (in ms) after which rooms should be removed
+# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
+# to generate a dummy event for them once more)
+#
+_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
+
 
 class EventCreationHandler(object):
     def __init__(self, hs):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
         self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
         self.validator = EventValidator()
         self.profile_handler = hs.get_profile_handler()
         self.event_builder_factory = hs.get_event_builder_factory()
         self.server_name = hs.hostname
-        self.ratelimiter = hs.get_ratelimiter()
         self.notifier = hs.get_notifier()
         self.config = hs.config
         self.require_membership_for_aliases = hs.config.require_membership_for_aliases
+        self._is_event_writer = (
+            self.config.worker.writers.events == hs.get_instance_name()
+        )
 
-        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
+        self.room_invite_state_types = self.hs.config.room_invite_state_types
+
+        self.send_event = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -258,6 +391,13 @@ class EventCreationHandler(object):
             self.config.block_events_without_consent_error
         )
 
+        # Rooms which should be excluded from dummy insertion. (For instance,
+        # those without local users who can send events into the room).
+        #
+        # map from room id to time-of-last-attempt.
+        #
+        self._rooms_to_exclude_from_dummy_event_insertion = {}  # type: dict[str, int]
+
         # we need to construct a ConsentURIBuilder here, as it checks that the necessary
         # config options, but *only* if we have a configuration for which we are
         # going to need it.
@@ -276,6 +416,12 @@ class EventCreationHandler(object):
                 5 * 60 * 1000,
             )
 
+        self._message_handler = hs.get_message_handler()
+
+        self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+
+        self._dummy_events_threshold = hs.config.dummy_events_threshold
+
     @defer.inlineCallbacks
     def create_event(
         self,
@@ -283,7 +429,7 @@ class EventCreationHandler(object):
         event_dict,
         token_id=None,
         txn_id=None,
-        prev_events_and_hashes=None,
+        prev_event_ids: Optional[Collection[str]] = None,
         require_consent=True,
     ):
         """
@@ -300,10 +446,9 @@ class EventCreationHandler(object):
             token_id (str)
             txn_id (str)
 
-            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+            prev_event_ids:
                 the forward extremities to use as the prev_events for the
-                new event. For each event, a tuple of (event_id, hashes, depth)
-                where *hashes* is a map from algorithm to hash.
+                new event.
 
                 If None, they will be requested from the database.
 
@@ -321,7 +466,9 @@ class EventCreationHandler(object):
             room_version = event_dict["content"]["room_version"]
         else:
             try:
-                room_version = yield self.store.get_room_version(event_dict["room_id"])
+                room_version = yield self.store.get_room_version_id(
+                    event_dict["room_id"]
+                )
             except NotFoundError:
                 raise AuthError(403, "Unknown room")
 
@@ -340,9 +487,13 @@ class EventCreationHandler(object):
 
                 try:
                     if "displayname" not in content:
-                        content["displayname"] = yield profile.get_displayname(target)
+                        displayname = yield profile.get_displayname(target)
+                        if displayname is not None:
+                            content["displayname"] = displayname
                     if "avatar_url" not in content:
-                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                        avatar_url = yield profile.get_avatar_url(target)
+                        if avatar_url is not None:
+                            content["avatar_url"] = avatar_url
                 except Exception as e:
                     logger.info(
                         "Failed to get profile information for %r: %s", target, e
@@ -359,9 +510,7 @@ class EventCreationHandler(object):
             builder.internal_metadata.txn_id = txn_id
 
         event, context = yield self.create_new_client_event(
-            builder=builder,
-            requester=requester,
-            prev_events_and_hashes=prev_events_and_hashes,
+            builder=builder, requester=requester, prev_event_ids=prev_event_ids,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -376,7 +525,7 @@ class EventCreationHandler(object):
             # federation as well as those created locally. As of room v3, aliases events
             # can be created by users that are not in the room, therefore we have to
             # tolerate them in event_auth.check().
-            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            prev_state_ids = yield context.get_prev_state_ids()
             prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
             prev_event = (
                 yield self.store.get_event(prev_event_id, allow_none=True)
@@ -398,7 +547,7 @@ class EventCreationHandler(object):
                     403, "You must be in the room to create an alias for it"
                 )
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         return (event, context)
 
@@ -484,8 +633,9 @@ class EventCreationHandler(object):
         msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
 
-    @defer.inlineCallbacks
-    def send_nonmember_event(self, requester, event, context, ratelimit=True):
+    async def send_nonmember_event(
+        self, requester, event, context, ratelimit=True
+    ) -> int:
         """
         Persists and notifies local clients and federation of an event.
 
@@ -494,6 +644,9 @@ class EventCreationHandler(object):
             context (Context) the context of the event.
             ratelimit (bool): Whether to rate limit this send.
             is_guest (bool): Whether the sender is a guest.
+
+        Return:
+            The stream_id of the persisted event.
         """
         if event.type == EventTypes.Member:
             raise SynapseError(
@@ -505,7 +658,7 @@ class EventCreationHandler(object):
         assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
 
         if event.is_state():
-            prev_state = yield self.deduplicate_state_event(event, context)
+            prev_state = await self.deduplicate_state_event(event, context)
             if prev_state is not None:
                 logger.info(
                     "Not bothering to persist state event %s duplicated by %s",
@@ -514,7 +667,7 @@ class EventCreationHandler(object):
                 )
                 return prev_state
 
-        yield self.handle_new_client_event(
+        return await self.handle_new_client_event(
             requester=requester, event=event, context=context, ratelimit=ratelimit
         )
 
@@ -526,7 +679,7 @@ class EventCreationHandler(object):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_state_ids = yield context.get_prev_state_ids()
         prev_event_id = prev_state_ids.get((event.type, event.state_key))
         if not prev_event_id:
             return
@@ -541,10 +694,9 @@ class EventCreationHandler(object):
                 return prev_event
         return
 
-    @defer.inlineCallbacks
-    def create_and_send_nonmember_event(
+    async def create_and_send_nonmember_event(
         self, requester, event_dict, ratelimit=True, txn_id=None
-    ):
+    ) -> Tuple[EventBase, int]:
         """
         Creates an event, then sends it.
 
@@ -556,8 +708,8 @@ class EventCreationHandler(object):
         # a situation where event persistence can't keep up, causing
         # extremities to pile up, which in turn leads to state resolution
         # taking longer.
-        with (yield self.limiter.queue(event_dict["room_id"])):
-            event, context = yield self.create_event(
+        with (await self.limiter.queue(event_dict["room_id"])):
+            event, context = await self.create_event(
                 requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
             )
 
@@ -567,15 +719,15 @@ class EventCreationHandler(object):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
-            yield self.send_nonmember_event(
+            stream_id = await self.send_nonmember_event(
                 requester, event, context, ratelimit=ratelimit
             )
-        return event
+        return event, stream_id
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
     def create_new_client_event(
-        self, builder, requester=None, prev_events_and_hashes=None
+        self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None
     ):
         """Create a new event for a local client
 
@@ -584,10 +736,9 @@ class EventCreationHandler(object):
 
             requester (synapse.types.Requester|None):
 
-            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+            prev_event_ids:
                 the forward extremities to use as the prev_events for the
-                new event. For each event, a tuple of (event_id, hashes, depth)
-                where *hashes* is a map from algorithm to hash.
+                new event.
 
                 If None, they will be requested from the database.
 
@@ -595,27 +746,20 @@ class EventCreationHandler(object):
             Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
         """
 
-        if prev_events_and_hashes is not None:
-            assert len(prev_events_and_hashes) <= 10, (
+        if prev_event_ids is not None:
+            assert len(prev_event_ids) <= 10, (
                 "Attempting to create an event with %i prev_events"
-                % (len(prev_events_and_hashes),)
+                % (len(prev_event_ids),)
             )
         else:
-            prev_events_and_hashes = yield self.store.get_prev_events_for_room(
-                builder.room_id
-            )
-
-        prev_events = [
-            (event_id, prev_hashes)
-            for event_id, prev_hashes, _ in prev_events_and_hashes
-        ]
+            prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id)
 
-        event = yield builder.build(prev_event_ids=[p for p, _ in prev_events])
+        event = yield builder.build(prev_event_ids=prev_event_ids)
         context = yield self.state.compute_event_context(event)
         if requester:
             context.app_service = requester.app_service
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         # If this event is an annotation then we check that that the sender
         # can't annotate the same way twice (e.g. stops users from liking an
@@ -636,10 +780,9 @@ class EventCreationHandler(object):
         return (event, context)
 
     @measure_func("handle_new_client_event")
-    @defer.inlineCallbacks
-    def handle_new_client_event(
+    async def handle_new_client_event(
         self, requester, event, context, ratelimit=True, extra_users=[]
-    ):
+    ) -> int:
         """Processes a new event. This includes checking auth, persisting it,
         notifying users, sending to remote servers, etc.
 
@@ -652,6 +795,9 @@ class EventCreationHandler(object):
             context (EventContext)
             ratelimit (bool)
             extra_users (list(UserID)): Any extra users to notify about event
+
+        Return:
+            The stream_id of the persisted event.
         """
 
         if event.is_state() and (event.type, event.state_key) == (
@@ -660,9 +806,9 @@ class EventCreationHandler(object):
         ):
             room_version = event.content.get("room_version", RoomVersions.V1.identifier)
         else:
-            room_version = yield self.store.get_room_version(event.room_id)
+            room_version = await self.store.get_room_version_id(event.room_id)
 
-        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+        event_allowed = await self.third_party_event_rules.check_event_allowed(
             event, context
         )
         if not event_allowed:
@@ -671,9 +817,9 @@ class EventCreationHandler(object):
             )
 
         try:
-            yield self.auth.check_from_context(room_version, event, context)
+            await self.auth.check_from_context(room_version, event, context)
         except AuthError as err:
-            logger.warn("Denying new event %r because %s", event, err)
+            logger.warning("Denying new event %r because %s", event, err)
             raise err
 
         # Ensure that we can round trip before trying to persist in db
@@ -684,15 +830,16 @@ class EventCreationHandler(object):
             logger.exception("Failed to encode content: %r", event.content)
             raise
 
-        yield self.action_generator.handle_push_actions_for_event(event, context)
+        await self.action_generator.handle_push_actions_for_event(event, context)
 
         # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
         # hack around with a try/finally instead.
         success = False
         try:
             # If we're a worker we need to hit out to the master.
-            if self.config.worker_app:
-                yield self.send_event_to_master(
+            if not self._is_event_writer:
+                result = await self.send_event(
+                    instance_name=self.config.worker.writers.events,
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -701,14 +848,17 @@ class EventCreationHandler(object):
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                stream_id = result["stream_id"]
+                event.internal_metadata.stream_ordering = stream_id
                 success = True
-                return
+                return stream_id
 
-            yield self.persist_and_notify_client_event(
+            stream_id = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
             success = True
+            return stream_id
         finally:
             if not success:
                 # Ensure that we actually remove the entries in the push actions
@@ -718,15 +868,46 @@ class EventCreationHandler(object):
                 )
 
     @defer.inlineCallbacks
-    def persist_and_notify_client_event(
-        self, requester, event, context, ratelimit=True, extra_users=[]
+    def _validate_canonical_alias(
+        self, directory_handler, room_alias_str, expected_room_id
     ):
+        """
+        Ensure that the given room alias points to the expected room ID.
+
+        Args:
+            directory_handler: The directory handler object.
+            room_alias_str: The room alias to check.
+            expected_room_id: The room ID that the alias should point to.
+        """
+        room_alias = RoomAlias.from_string(room_alias_str)
+        try:
+            mapping = yield directory_handler.get_association(room_alias)
+        except SynapseError as e:
+            # Turn M_NOT_FOUND errors into M_BAD_ALIAS errors.
+            if e.errcode == Codes.NOT_FOUND:
+                raise SynapseError(
+                    400,
+                    "Room alias %s does not point to the room" % (room_alias_str,),
+                    Codes.BAD_ALIAS,
+                )
+            raise
+
+        if mapping["room_id"] != expected_room_id:
+            raise SynapseError(
+                400,
+                "Room alias %s does not point to the room" % (room_alias_str,),
+                Codes.BAD_ALIAS,
+            )
+
+    async def persist_and_notify_client_event(
+        self, requester, event, context, ratelimit=True, extra_users=[]
+    ) -> int:
         """Called when we have fully built the event, have already
         calculated the push actions for the event, and checked auth.
 
-        This should only be run on master.
+        This should only be run on the instance in charge of persisting events.
         """
-        assert not self.config.worker_app
+        assert self._is_event_writer
 
         if ratelimit:
             # We check if this is a room admin redacting an event so that we
@@ -735,9 +916,9 @@ class EventCreationHandler(object):
             # user is actually admin or not).
             is_admin_redaction = False
             if event.type == EventTypes.Redaction:
-                original_event = yield self.store.get_event(
+                original_event = await self.store.get_event(
                     event.redacts,
-                    check_redacted=False,
+                    redact_behaviour=EventRedactBehaviour.AS_IS,
                     get_prev_content=False,
                     allow_rejected=False,
                     allow_none=True,
@@ -747,24 +928,52 @@ class EventCreationHandler(object):
                     original_event and event.sender != original_event.sender
                 )
 
-            yield self.base_handler.ratelimit(
+            await self.base_handler.ratelimit(
                 requester, is_admin_redaction=is_admin_redaction
             )
 
-        yield self.base_handler.maybe_kick_guest_users(event, context)
+        await self.base_handler.maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:
-            # Check the alias is acually valid (at this time at least)
+            # Validate a newly added alias or newly added alt_aliases.
+
+            original_alias = None
+            original_alt_aliases = set()
+
+            original_event_id = event.unsigned.get("replaces_state")
+            if original_event_id:
+                original_event = await self.store.get_event(original_event_id)
+
+                if original_event:
+                    original_alias = original_event.content.get("alias", None)
+                    original_alt_aliases = original_event.content.get("alt_aliases", [])
+
+            # Check the alias is currently valid (if it has changed).
             room_alias_str = event.content.get("alias", None)
-            if room_alias_str:
-                room_alias = RoomAlias.from_string(room_alias_str)
-                directory_handler = self.hs.get_handlers().directory_handler
-                mapping = yield directory_handler.get_association(room_alias)
-
-                if mapping["room_id"] != event.room_id:
-                    raise SynapseError(
-                        400,
-                        "Room alias %s does not point to the room" % (room_alias_str,),
+            directory_handler = self.hs.get_handlers().directory_handler
+            if room_alias_str and room_alias_str != original_alias:
+                await self._validate_canonical_alias(
+                    directory_handler, room_alias_str, event.room_id
+                )
+
+            # Check that alt_aliases is the proper form.
+            alt_aliases = event.content.get("alt_aliases", [])
+            if not isinstance(alt_aliases, (list, tuple)):
+                raise SynapseError(
+                    400, "The alt_aliases property must be a list.", Codes.INVALID_PARAM
+                )
+
+            # If the old version of alt_aliases is of an unknown form,
+            # completely replace it.
+            if not isinstance(original_alt_aliases, (list, tuple)):
+                original_alt_aliases = []
+
+            # Check that each alias is currently valid.
+            new_alt_aliases = set(alt_aliases) - set(original_alt_aliases)
+            if new_alt_aliases:
+                for alias_str in new_alt_aliases:
+                    await self._validate_canonical_alias(
+                        directory_handler, alias_str, event.room_id
                     )
 
         federation_handler = self.hs.get_handlers().federation_handler
@@ -775,16 +984,16 @@ class EventCreationHandler(object):
                 def is_inviter_member_event(e):
                     return e.type == EventTypes.Member and e.sender == event.sender
 
-                current_state_ids = yield context.get_current_state_ids(self.store)
+                current_state_ids = await context.get_current_state_ids()
 
                 state_to_include_ids = [
                     e_id
                     for k, e_id in iteritems(current_state_ids)
-                    if k[0] in self.hs.config.room_invite_state_types
+                    if k[0] in self.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
 
-                state_to_include = yield self.store.get_events(state_to_include_ids)
+                state_to_include = await self.store.get_events(state_to_include_ids)
 
                 event.unsigned["invite_room_state"] = [
                     {
@@ -802,19 +1011,18 @@ class EventCreationHandler(object):
                     # way? If we have been invited by a remote server, we need
                     # to get them to sign the event.
 
-                    returned_invite = yield federation_handler.send_invite(
+                    returned_invite = await federation_handler.send_invite(
                         invitee.domain, event
                     )
-
                     event.unsigned.pop("room_state", None)
 
                     # TODO: Make sure the signatures actually are correct.
                     event.signatures.update(returned_invite.signatures)
 
         if event.type == EventTypes.Redaction:
-            original_event = yield self.store.get_event(
+            original_event = await self.store.get_event(
                 event.redacts,
-                check_redacted=False,
+                redact_behaviour=EventRedactBehaviour.AS_IS,
                 get_prev_content=False,
                 allow_rejected=False,
                 allow_none=True,
@@ -828,15 +1036,19 @@ class EventCreationHandler(object):
                 if original_event.room_id != event.room_id:
                     raise SynapseError(400, "Cannot redact event from a different room")
 
-            prev_state_ids = yield context.get_prev_state_ids(self.store)
-            auth_events_ids = yield self.auth.compute_auth_events(
+            prev_state_ids = await context.get_prev_state_ids()
+            auth_events_ids = await self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=True
             )
-            auth_events = yield self.store.get_events(auth_events_ids)
+            auth_events = await self.store.get_events(auth_events_ids)
             auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
-            room_version = yield self.store.get_room_version(event.room_id)
 
-            if event_auth.check_redaction(room_version, event, auth_events=auth_events):
+            room_version = await self.store.get_room_version_id(event.room_id)
+            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+            if event_auth.check_redaction(
+                room_version_obj, event, auth_events=auth_events
+            ):
                 # this user doesn't have 'redact' rights, so we need to do some more
                 # checks on the original event. Let's start by checking the original
                 # event exists.
@@ -850,15 +1062,19 @@ class EventCreationHandler(object):
                 event.internal_metadata.recheck_redaction = False
 
         if event.type == EventTypes.Create:
-            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            prev_state_ids = await context.get_prev_state_ids()
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+        event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
             event, context=context
         )
 
-        yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
+        if self._ephemeral_events_enabled:
+            # If there's an expiry timestamp on the event, schedule its expiry.
+            self._message_handler.maybe_schedule_expiry(event)
+
+        await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
 
         def _notify():
             try:
@@ -875,61 +1091,90 @@ class EventCreationHandler(object):
             # matters as sometimes presence code can take a while.
             run_in_background(self._bump_active_time, requester.user)
 
-    @defer.inlineCallbacks
-    def _bump_active_time(self, user):
+        return event_stream_id
+
+    async def _bump_active_time(self, user):
         try:
             presence = self.hs.get_presence_handler()
-            yield presence.bump_presence_active_time(user)
+            await presence.bump_presence_active_time(user)
         except Exception:
             logger.exception("Error bumping presence active time")
 
-    @defer.inlineCallbacks
-    def _send_dummy_events_to_fill_extremities(self):
+    async def _send_dummy_events_to_fill_extremities(self):
         """Background task to send dummy events into rooms that have a large
         number of extremities
         """
-
-        room_ids = yield self.store.get_rooms_with_many_extremities(
-            min_count=10, limit=5
+        self._expire_rooms_to_exclude_from_dummy_event_insertion()
+        room_ids = await self.store.get_rooms_with_many_extremities(
+            min_count=self._dummy_events_threshold,
+            limit=5,
+            room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(),
         )
 
         for room_id in room_ids:
             # For each room we need to find a joined member we can use to send
             # the dummy event with.
 
-            prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
-
-            latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
+            latest_event_ids = await self.store.get_prev_events_for_room(room_id)
 
-            members = yield self.state.get_current_users_in_room(
+            members = await self.state.get_current_users_in_room(
                 room_id, latest_event_ids=latest_event_ids
             )
+            dummy_event_sent = False
+            for user_id in members:
+                if not self.hs.is_mine_id(user_id):
+                    continue
+                requester = create_requester(user_id)
+                try:
+                    event, context = await self.create_event(
+                        requester,
+                        {
+                            "type": "org.matrix.dummy_event",
+                            "content": {},
+                            "room_id": room_id,
+                            "sender": user_id,
+                        },
+                        prev_event_ids=latest_event_ids,
+                    )
+
+                    event.internal_metadata.proactively_send = False
 
-            user_id = None
-            for member in members:
-                if self.hs.is_mine_id(member):
-                    user_id = member
+                    await self.send_nonmember_event(
+                        requester, event, context, ratelimit=False
+                    )
+                    dummy_event_sent = True
                     break
+                except ConsentNotGivenError:
+                    logger.info(
+                        "Failed to send dummy event into room %s for user %s due to "
+                        "lack of consent. Will try another user" % (room_id, user_id)
+                    )
+                except AuthError:
+                    logger.info(
+                        "Failed to send dummy event into room %s for user %s due to "
+                        "lack of power. Will try another user" % (room_id, user_id)
+                    )
 
-            if not user_id:
-                # We don't have a joined user.
-                # TODO: We should do something here to stop the room from
-                # appearing next time.
-                continue
-
-            requester = create_requester(user_id)
-
-            event, context = yield self.create_event(
-                requester,
-                {
-                    "type": "org.matrix.dummy_event",
-                    "content": {},
-                    "room_id": room_id,
-                    "sender": user_id,
-                },
-                prev_events_and_hashes=prev_events_and_hashes,
+            if not dummy_event_sent:
+                # Did not find a valid user in the room, so remove from future attempts
+                # Exclusion is time limited, so the room will be rechecked in the future
+                # dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+                logger.info(
+                    "Failed to send dummy event into room %s. Will exclude it from "
+                    "future attempts until cache expires" % (room_id,)
+                )
+                now = self.clock.time_msec()
+                self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
+
+    def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
+        expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+        to_expire = set()
+        for room_id, time in self._rooms_to_exclude_from_dummy_event_insertion.items():
+            if time < expire_before:
+                to_expire.add(room_id)
+        for room_id in to_expire:
+            logger.debug(
+                "Expiring room id %s from dummy event insertion exclusion cache",
+                room_id,
             )
-
-            event.internal_metadata.proactively_send = False
-
-            yield self.send_nonmember_event(requester, event, context, ratelimit=False)
+            del self._rooms_to_exclude_from_dummy_event_insertion[room_id]