Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/admin.py                 21
-rw-r--r--  synapse/handlers/auth.py                  88
-rw-r--r--  synapse/handlers/deactivate_account.py     3
-rw-r--r--  synapse/handlers/directory.py              4
-rw-r--r--  synapse/handlers/e2e_keys.py              19
-rw-r--r--  synapse/handlers/e2e_room_keys.py        130
-rw-r--r--  synapse/handlers/federation.py           242
-rw-r--r--  synapse/handlers/message.py              129
-rw-r--r--  synapse/handlers/pagination.py           106
-rw-r--r--  synapse/handlers/profile.py                6
-rw-r--r--  synapse/handlers/register.py               4
-rw-r--r--  synapse/handlers/room.py                  50
-rw-r--r--  synapse/handlers/room_member.py           13
-rw-r--r--  synapse/handlers/room_member_worker.py     5
-rw-r--r--  synapse/handlers/typing.py                 4
15 files changed, 586 insertions, 238 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 6407d56f8e..14449b9a1e 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -56,7 +56,7 @@ class AdminHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_users(self):
-        """Function to reterive a list of users in users table.
+        """Function to retrieve a list of users in the users table.
 
         Args:
         Returns:
@@ -67,19 +67,22 @@ class AdminHandler(BaseHandler):
         return ret
 
     @defer.inlineCallbacks
-    def get_users_paginate(self, order, start, limit):
-        """Function to reterive a paginated list of users from
-        users list. This will return a json object, which contains
-        list of users and the total number of users in users table.
+    def get_users_paginate(self, start, limit, name, guests, deactivated):
+        """Function to retrieve a paginated list of users from
+        the users table. This will return a json list of users.
 
         Args:
-            order (str): column name to order the select by this column
             start (int): start number to begin the query from
-            limit (int): number of rows to reterive
+            limit (int): number of rows to retrieve
+            name (string): filter for user names
+            guests (bool): whether to include guest users
+            deactivated (bool): whether to include deactivated users
         Returns:
-            defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+            defer.Deferred: resolves to json list[dict[str, Any]]
         """
-        ret = yield self.store.get_users_paginate(order, start, limit)
+        ret = yield self.store.get_users_paginate(
+            start, limit, name, guests, deactivated
+        )
 
         return ret
 
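
A minimal sketch of a caller driving the new get_users_paginate signature
from Twisted code; the handler instance and filter values are illustrative,
not part of the patch:

from twisted.internet import defer

@defer.inlineCallbacks
def list_matching_users(admin_handler):
    # Fetch the first 50 users whose name matches "alice", excluding guest
    # accounts but including deactivated ones.
    users = yield admin_handler.get_users_paginate(
        start=0, limit=50, name="alice", guests=False, deactivated=True
    )
    return users
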
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7a0f54ca24..54a71c49d2 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -102,8 +102,9 @@ class AuthHandler(BaseHandler):
                         login_types.append(t)
         self._supported_login_types = login_types
 
-        self._account_ratelimiter = Ratelimiter()
-        self._failed_attempts_ratelimiter = Ratelimiter()
+        # Ratelimiter for failed auth during UIA. Uses same ratelimit config
+        # as per `rc_login.failed_attempts`.
+        self._failed_uia_attempts_ratelimiter = Ratelimiter()
 
         self._clock = self.hs.get_clock()
 
@@ -133,12 +134,38 @@ class AuthHandler(BaseHandler):
 
             AuthError if the client has completed a login flow, and it gives
                 a different user to `requester`
+
+            LimitExceededError if the ratelimiter's failed request count for this
+                user is too high to proceed
+
         """
 
+        user_id = requester.user.to_string()
+
+        # Check if we should be ratelimited due to too many previous failed attempts
+        self._failed_uia_attempts_ratelimiter.ratelimit(
+            user_id,
+            time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=False,
+        )
+
         # build a list of supported flows
         flows = [[login_type] for login_type in self._supported_login_types]
 
-        result, params, _ = yield self.check_auth(flows, request_body, clientip)
+        try:
+            result, params, _ = yield self.check_auth(flows, request_body, clientip)
+        except LoginError:
+            # Update the ratelimiter to say we failed (`can_do_action` doesn't raise).
+            self._failed_uia_attempts_ratelimiter.can_do_action(
+                user_id,
+                time_now_s=self._clock.time(),
+                rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                update=True,
+            )
+            raise
 
         # find the completed login type
         for login_type in self._supported_login_types:
@@ -223,7 +250,7 @@ class AuthHandler(BaseHandler):
             # could continue registration from your phone having clicked the
             # email auth link on there). It's probably too open to abuse
             # because it lets unauthenticated clients store arbitrary objects
-            # on a home server.
+            # on a homeserver.
            # Revisit: Assuming the REST APIs do sensible validation, the data
            # isn't arbitrary.
             session["clientdict"] = clientdict
@@ -501,11 +528,8 @@ class AuthHandler(BaseHandler):
             multiple matches
 
         Raises:
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
             UserDeactivatedError if a user is found but is deactivated.
         """
-        self.ratelimit_login_per_account(user_id)
         res = yield self._find_user_id_and_pwd_hash(user_id)
         if res is not None:
             return res[0]
@@ -572,8 +596,6 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem accessing the database
             SynapseError if there was a problem with the request
             LoginError if there was an authentication problem.
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
         """
 
         if username.startswith("@"):
@@ -581,8 +603,6 @@ class AuthHandler(BaseHandler):
         else:
             qualified_user_id = UserID(username, self.hs.hostname).to_string()
 
-        self.ratelimit_login_per_account(qualified_user_id)
-
         login_type = login_submission.get("type")
         known_login_type = False
 
@@ -650,15 +670,6 @@ class AuthHandler(BaseHandler):
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
 
-        # unknown username or invalid password.
-        self._failed_attempts_ratelimiter.ratelimit(
-            qualified_user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
-            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
-            update=True,
-        )
-
         # We raise a 403 here, but note that if we're doing user-interactive
         # login, it turns all LoginErrors into a 401 anyway.
         raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN)
@@ -710,10 +721,6 @@ class AuthHandler(BaseHandler):
         Returns:
             Deferred[unicode] the canonical_user_id, or Deferred[None] if
                 unknown user/bad password
-
-        Raises:
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -742,7 +749,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
-        self.ratelimit_login_per_account(user_id)
+
         yield self.auth.check_auth_blocking(user_id)
         return user_id
 
@@ -810,7 +817,7 @@ class AuthHandler(BaseHandler):
     @defer.inlineCallbacks
     def add_threepid(self, user_id, medium, address, validated_at):
         # 'Canonicalise' email addresses down to lower case.
-        # We've now moving towards the Home Server being the entity that
+        # We're now moving towards the homeserver being the entity that
         # is responsible for validating threepids used for resetting passwords
         # on accounts, so in future Synapse will gain knowledge of specific
         # types (mediums) of threepid. For now, we still use the existing
@@ -912,35 +919,6 @@ class AuthHandler(BaseHandler):
         else:
             return defer.succeed(False)
 
-    def ratelimit_login_per_account(self, user_id):
-        """Checks whether the process must be stopped because of ratelimiting.
-
-        Checks against two ratelimiters: the generic one for login attempts per
-        account and the one specific to failed attempts.
-
-        Args:
-            user_id (unicode): complete @user:id
-
-        Raises:
-            LimitExceededError if one of the ratelimiters' login requests count
-                for this user is too high too proceed.
-        """
-        self._failed_attempts_ratelimiter.ratelimit(
-            user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
-            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
-            update=False,
-        )
-
-        self._account_ratelimiter.ratelimit(
-            user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_account.per_second,
-            burst_count=self.hs.config.rc_login_account.burst_count,
-            update=True,
-        )
-
 
 @attr.s
 class MacaroonGenerator(object):
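
The new ratelimiting flow is a check-then-record pattern: ratelimit(...,
update=False) raises LimitExceededError without consuming quota, while
can_do_action(..., update=True) records a failed attempt without raising.
A condensed sketch under those assumptions; the helper name and rc_config
shape are illustrative, and Ratelimiter is assumed to come from
synapse.api.ratelimiting:

from synapse.api.errors import LoginError
from synapse.api.ratelimiting import Ratelimiter

limiter = Ratelimiter()

def run_with_failure_ratelimit(user_id, clock, rc_config, attempt):
    # Pre-check: raises LimitExceededError if this user already has too
    # many recent failures; update=False means the check itself is free.
    limiter.ratelimit(
        user_id,
        time_now_s=clock.time(),
        rate_hz=rc_config.per_second,
        burst_count=rc_config.burst_count,
        update=False,
    )
    try:
        return attempt()
    except LoginError:
        # Record the failure; can_do_action never raises, it only bumps
        # the counter so a later pre-check may trip.
        limiter.can_do_action(
            user_id,
            time_now_s=clock.time(),
            rate_hz=rc_config.per_second,
            burst_count=rc_config.burst_count,
            update=True,
        )
        raise
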
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 63267a0a4c..6dedaaff8d 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -95,6 +95,9 @@ class DeactivateAccountHandler(BaseHandler):
                 user_id, threepid["medium"], threepid["address"]
             )
 
+        # Remove all 3PIDs this user has bound to the homeserver
+        yield self.store.user_delete_threepids(user_id)
+
         # delete any devices belonging to the user, which will also
         # delete corresponding access tokens.
         yield self._device_handler.delete_all_devices_for_user(user_id)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c4632f8984..a07d2f1a17 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
             if not service.is_interested_in_alias(room_alias.to_string()):
                 raise SynapseError(
                     400,
-                    "This application service has not reserved" " this kind of alias.",
+                    "This application service has not reserved this kind of alias.",
                     errcode=Codes.EXCLUSIVE,
                 )
         else:
@@ -283,7 +283,7 @@ class DirectoryHandler(BaseHandler):
     def on_directory_query(self, args):
         room_alias = RoomAlias.from_string(args["room_alias"])
         if not self.hs.is_mine(room_alias):
-            raise SynapseError(400, "Room Alias is not hosted on this Home Server")
+            raise SynapseError(400, "Room Alias is not hosted on this homeserver")
 
         result = yield self.get_association_from_room_alias(room_alias)
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f09a0b73c8..28c12753c1 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -30,6 +30,7 @@ from twisted.internet import defer
 from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.types import (
     UserID,
     get_domain_from_id,
@@ -53,6 +54,12 @@ class E2eKeysHandler(object):
 
         self._edu_updater = SigningKeyEduUpdater(hs, self)
 
+        self._is_master = hs.config.worker_app is None
+        if not self._is_master:
+            self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
+                hs
+            )
+
         federation_registry = hs.get_federation_registry()
 
         # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -191,9 +198,15 @@ class E2eKeysHandler(object):
                 # probably be tracking their device lists. However, we haven't
                 # done an initial sync on the device list so we do it now.
                 try:
-                    user_devices = yield self.device_handler.device_list_updater.user_device_resync(
-                        user_id
-                    )
+                    if self._is_master:
+                        user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+                            user_id
+                        )
+                    else:
+                        user_devices = yield self._user_device_resync_client(
+                            user_id=user_id
+                        )
+
                     user_devices = user_devices["devices"]
                     for device in user_devices:
                         results[user_id] = {device["device_id"]: device["keys"]}
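
The resync dispatch above can be read as a small factory: the master calls
the device-list updater in-process, while workers round-trip via the
replication HTTP endpoint. A hypothetical condensation (make_user_device_resync
is not part of the patch):

from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet

def make_user_device_resync(hs):
    if hs.config.worker_app is None:
        # Master process: call the updater directly.
        updater = hs.get_device_handler().device_list_updater
        return lambda user_id: updater.user_device_resync(user_id)
    # Worker process: proxy the resync to the master over replication.
    client = ReplicationUserDevicesResyncRestServlet.make_client(hs)
    return lambda user_id: client(user_id=user_id)
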
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 0cea445f0d..f1b4424a02 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object):
                 rooms
             session_id(string): session ID to delete keys for, for None to delete keys
                 for all sessions
+        Raises:
+            NotFoundError: if the backup version does not exist
         Returns:
-            A deferred of the deletion transaction
+            A dict containing the count and etag for the backup version
         """
 
         # lock for consistency with uploading
         with (yield self._upload_linearizer.queue(user_id)):
+            # make sure the backup version exists
+            try:
+                version_info = yield self.store.get_e2e_room_keys_version_info(
+                    user_id, version
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Unknown backup version")
+                else:
+                    raise
+
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+            version_etag = version_info["etag"] + 1
+            yield self.store.update_e2e_room_keys_version(
+                user_id, version, None, version_etag
+            )
+
+            count = yield self.store.count_e2e_room_keys(user_id, version)
+            return {"etag": str(version_etag), "count": count}
+
     @trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
@@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object):
             }
         }
 
+        Returns:
+            A dict containing the count and etag for the backup version
+
         Raises:
             NotFoundError: if there are no versions defined
             RoomKeysVersionError: if the uploaded version is not the current version
@@ -171,59 +196,62 @@ class E2eRoomKeysHandler(object):
                     else:
                         raise
 
-            # go through the room_keys.
-            # XXX: this should/could be done concurrently, given we're in a lock.
+            # Fetch any existing room keys for the sessions that have been
+            # submitted.  Then compare them with the submitted keys.  If the
+            # key is new, insert it; if the key should be updated, then update
+            # it; otherwise, drop it.
+            existing_keys = yield self.store.get_e2e_room_keys_multi(
+                user_id, version, room_keys["rooms"]
+            )
+            to_insert = []  # batch the inserts together
+            changed = False  # if anything has changed, we need to update the etag
             for room_id, room in iteritems(room_keys["rooms"]):
-                for session_id, session in iteritems(room["sessions"]):
-                    yield self._upload_room_key(
-                        user_id, version, room_id, session_id, session
+                for session_id, room_key in iteritems(room["sessions"]):
+                    log_kv(
+                        {
+                            "message": "Trying to upload room key",
+                            "room_id": room_id,
+                            "session_id": session_id,
+                            "user_id": user_id,
+                        }
                     )
-
-    @defer.inlineCallbacks
-    def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
-        """Upload a given room_key for a given room and session into a given
-        version of the backup.  Merges the key with any which might already exist.
-
-        Args:
-            user_id(str): the user whose backup we're setting
-            version(str): the version ID of the backup we're updating
-            room_id(str): the ID of the room whose keys we're setting
-            session_id(str): the session whose room_key we're setting
-            room_key(dict): the room_key being set
-        """
-        log_kv(
-            {
-                "message": "Trying to upload room key",
-                "room_id": room_id,
-                "session_id": session_id,
-                "user_id": user_id,
-            }
-        )
-        # get the room_key for this particular row
-        current_room_key = None
-        try:
-            current_room_key = yield self.store.get_e2e_room_key(
-                user_id, version, room_id, session_id
-            )
-        except StoreError as e:
-            if e.code == 404:
-                log_kv(
-                    {
-                        "message": "Room key not found.",
-                        "room_id": room_id,
-                        "user_id": user_id,
-                    }
+                    current_room_key = existing_keys.get(room_id, {}).get(session_id)
+                    if current_room_key:
+                        if self._should_replace_room_key(current_room_key, room_key):
+                            log_kv({"message": "Replacing room key."})
+                            # updates are done one at a time in the DB, so send
+                            # updates right away rather than batching them up,
+                            # like we do with the inserts
+                            yield self.store.update_e2e_room_key(
+                                user_id, version, room_id, session_id, room_key
+                            )
+                            changed = True
+                        else:
+                            log_kv({"message": "Not replacing room_key."})
+                    else:
+                        log_kv(
+                            {
+                                "message": "Room key not found.",
+                                "room_id": room_id,
+                                "user_id": user_id,
+                            }
+                        )
+                        log_kv({"message": "Replacing room key."})
+                        to_insert.append((room_id, session_id, room_key))
+                        changed = True
+
+            if len(to_insert):
+                yield self.store.add_e2e_room_keys(user_id, version, to_insert)
+
+            version_etag = version_info["etag"]
+            if changed:
+                version_etag = version_etag + 1
+                yield self.store.update_e2e_room_keys_version(
+                    user_id, version, None, version_etag
                 )
-            else:
-                raise
 
-        if self._should_replace_room_key(current_room_key, room_key):
-            log_kv({"message": "Replacing room key."})
-            yield self.store.set_e2e_room_key(
-                user_id, version, room_id, session_id, room_key
-            )
-        else:
-            log_kv({"message": "Not replacing room_key."})
+            count = yield self.store.count_e2e_room_keys(user_id, version)
+            return {"etag": str(version_etag), "count": count}
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object):
                     raise NotFoundError("Unknown backup version")
                 else:
                     raise
+
+            res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
             return res
 
     @trace
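
The rewritten upload path replaces per-key round trips with one bulk read
(get_e2e_room_keys_multi) followed by batched inserts, bumping the version
etag only when something actually changed. A standalone sketch of the merge
decision, assuming the same {room_id: {session_id: key}} shape;
should_replace stands in for _should_replace_room_key:

def plan_room_key_upload(existing, submitted, should_replace):
    # existing / submitted: dict[room_id] -> dict[session_id] -> room_key.
    to_insert = []  # brand-new keys, batched into a single insert
    to_update = []  # replacements; the handler updates these row by row
    for room_id, sessions in submitted.items():
        for session_id, room_key in sessions.items():
            current = existing.get(room_id, {}).get(session_id)
            if current is None:
                to_insert.append((room_id, session_id, room_key))
            elif should_replace(current, room_key):
                to_update.append((room_id, session_id, room_key))
    changed = bool(to_insert or to_update)  # only bump the etag on change
    return to_insert, to_update, changed
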
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 05dd8d2671..bc26921768 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -19,11 +19,13 @@
 
 import itertools
 import logging
+from typing import Dict, Iterable, Optional, Sequence, Tuple
 
 import six
 from six import iteritems, itervalues
 from six.moves import http_client, zip
 
+import attr
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -45,6 +47,7 @@ from synapse.api.errors import (
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.event_auth import auth_types_for_event
+from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
 from synapse.logging.context import (
@@ -72,6 +75,23 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
+@attr.s
+class _NewEventInfo:
+    """Holds information about a received event, ready for passing to _handle_new_events
+
+    Attributes:
+        event: the received event
+
+        state: the state at that event
+
+        auth_events: the auth_event map for that event
+    """
+
+    event = attr.ib(type=EventBase)
+    state = attr.ib(type=Optional[Sequence[EventBase]], default=None)
+    auth_events = attr.ib(type=Optional[Dict[Tuple[str, str], EventBase]], default=None)
+
+
 def shortstr(iterable, maxitems=5):
     """If iterable has maxitems or fewer, return the stringification of a list
     containing those items.
@@ -97,9 +117,9 @@ class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
         a) handling received Pdus before handing them on as Events to the rest
-        of the home server (including auth and state conflict resoultion)
+        of the homeserver (including auth and state conflict resolution)
         b) converting events that were produced by local clients that may need
-        to be sent to remote home servers.
+        to be sent to remote homeservers.
         c) doing the necessary dances to invite remote users and join remote
         rooms.
     """
@@ -121,6 +141,7 @@ class FederationHandler(BaseHandler):
         self.pusher_pool = hs.get_pusherpool()
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self._message_handler = hs.get_message_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
         self.config = hs.config
         self.http_client = hs.get_simple_http_client()
@@ -141,6 +162,8 @@ class FederationHandler(BaseHandler):
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
+        self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
+
     @defer.inlineCallbacks
     def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False):
         """ Process a PDU received via a federation /send/ transaction, or
@@ -594,14 +617,14 @@ class FederationHandler(BaseHandler):
                     for e in auth_chain
                     if e.event_id in auth_ids or e.type == EventTypes.Create
                 }
-                event_infos.append({"event": e, "auth_events": auth})
+                event_infos.append(_NewEventInfo(event=e, auth_events=auth))
                 seen_ids.add(e.event_id)
 
             logger.info(
                 "[%s %s] persisting newly-received auth/state events %s",
                 room_id,
                 event_id,
-                [e["event"].event_id for e in event_infos],
+                [e.event.event_id for e in event_infos],
             )
             yield self._handle_new_events(origin, event_infos)
 
@@ -792,9 +815,9 @@ class FederationHandler(BaseHandler):
 
             a.internal_metadata.outlier = True
             ev_infos.append(
-                {
-                    "event": a,
-                    "auth_events": {
+                _NewEventInfo(
+                    event=a,
+                    auth_events={
                         (
                             auth_events[a_id].type,
                             auth_events[a_id].state_key,
@@ -802,7 +825,7 @@ class FederationHandler(BaseHandler):
                         for a_id in a.auth_event_ids()
                         if a_id in auth_events
                     },
-                }
+                )
             )
 
         # Step 1b: persist the events in the chunk we fetched state for (i.e.
@@ -814,10 +837,10 @@ class FederationHandler(BaseHandler):
             assert not ev.internal_metadata.is_outlier()
 
             ev_infos.append(
-                {
-                    "event": ev,
-                    "state": events_to_state[e_id],
-                    "auth_events": {
+                _NewEventInfo(
+                    event=ev,
+                    state=events_to_state[e_id],
+                    auth_events={
                         (
                             auth_events[a_id].type,
                             auth_events[a_id].state_key,
@@ -825,7 +848,7 @@ class FederationHandler(BaseHandler):
                         for a_id in ev.auth_event_ids()
                         if a_id in auth_events
                     },
-                }
+                )
             )
 
         yield self._handle_new_events(dest, ev_infos, backfilled=True)
@@ -1428,9 +1451,9 @@ class FederationHandler(BaseHandler):
         return event
 
     @defer.inlineCallbacks
-    def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
+    def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content):
         origin, event, event_format_version = yield self._make_and_verify_event(
-            target_hosts, room_id, user_id, "leave"
+            target_hosts, room_id, user_id, "leave", content=content,
         )
         # Mark as outlier as we don't have any state for this event; we're not
         # even in the room.
@@ -1710,7 +1733,12 @@ class FederationHandler(BaseHandler):
         return context
 
     @defer.inlineCallbacks
-    def _handle_new_events(self, origin, event_infos, backfilled=False):
+    def _handle_new_events(
+        self,
+        origin: str,
+        event_infos: Iterable[_NewEventInfo],
+        backfilled: bool = False,
+    ):
         """Creates the appropriate contexts and persists events. The events
         should not depend on one another, e.g. this should be used to persist
         a bunch of outliers, but not a chunk of individual events that depend
@@ -1720,14 +1748,14 @@ class FederationHandler(BaseHandler):
         """
 
         @defer.inlineCallbacks
-        def prep(ev_info):
-            event = ev_info["event"]
+        def prep(ev_info: _NewEventInfo):
+            event = ev_info.event
             with nested_logging_context(suffix=event.event_id):
                 res = yield self._prep_event(
                     origin,
                     event,
-                    state=ev_info.get("state"),
-                    auth_events=ev_info.get("auth_events"),
+                    state=ev_info.state,
+                    auth_events=ev_info.auth_events,
                     backfilled=backfilled,
                 )
             return res
@@ -1741,7 +1769,7 @@ class FederationHandler(BaseHandler):
 
         yield self.persist_events_and_notify(
             [
-                (ev_info["event"], context)
+                (ev_info.event, context)
                 for ev_info, context in zip(event_infos, contexts)
             ],
             backfilled=backfilled,
@@ -1843,7 +1871,14 @@ class FederationHandler(BaseHandler):
         yield self.persist_events_and_notify([(event, new_event_context)])
 
     @defer.inlineCallbacks
-    def _prep_event(self, origin, event, state, auth_events, backfilled):
+    def _prep_event(
+        self,
+        origin: str,
+        event: EventBase,
+        state: Optional[Iterable[EventBase]],
+        auth_events: Optional[Dict[Tuple[str, str], EventBase]],
+        backfilled: bool,
+    ):
         """
 
         Args:
@@ -1851,7 +1886,7 @@ class FederationHandler(BaseHandler):
             event:
             state:
             auth_events:
-            backfilled (bool)
+            backfilled:
 
         Returns:
             Deferred, which resolves to synapse.events.snapshot.EventContext
@@ -1887,15 +1922,16 @@ class FederationHandler(BaseHandler):
         return context
 
     @defer.inlineCallbacks
-    def _check_for_soft_fail(self, event, state, backfilled):
+    def _check_for_soft_fail(
+        self, event: EventBase, state: Optional[Iterable[EventBase]], backfilled: bool
+    ):
         """Checks if we should soft fail the event, if so marks the event as
         such.
 
         Args:
-            event (FrozenEvent)
-            state (dict|None): The state at the event if we don't have all the
-                event's prev events
-            backfilled (bool): Whether the event is from backfill
+            event
+            state: The state at the event if we don't have all the event's prev events
+            backfilled: Whether the event is from backfill
 
         Returns:
             Deferred
@@ -2040,8 +2076,10 @@ class FederationHandler(BaseHandler):
             auth_events (dict[(str, str)->synapse.events.EventBase]):
                 Map from (event_type, state_key) to event
 
-                What we expect the event's auth_events to be, based on the event's
-                position in the dag. I think? maybe??
+                Normally these are the auth_events we calculate from the state
+                of the room at the event's position in the DAG, though
+                occasionally (eg if the event is an outlier) they may be the
+                auth events claimed by the remote server.
 
                 Also NB that this function adds entries to it.
         Returns:
@@ -2091,35 +2129,35 @@ class FederationHandler(BaseHandler):
             origin (str):
             event (synapse.events.EventBase):
             context (synapse.events.snapshot.EventContext):
+
             auth_events (dict[(str, str)->synapse.events.EventBase]):
+                Map from (event_type, state_key) to event
+
+                Normally these are the auth_events we calculate from the state
+                of the room at the event's position in the DAG, though
+                occasionally (eg if the event is an outlier) they may be the
+                auth events claimed by the remote server.
+
+                Also NB that this function adds entries to it.
 
         Returns:
             defer.Deferred[EventContext]: updated context
         """
         event_auth_events = set(event.auth_event_ids())
 
-        if event.is_state():
-            event_key = (event.type, event.state_key)
-        else:
-            event_key = None
-
-        # if the event's auth_events refers to events which are not in our
-        # calculated auth_events, we need to fetch those events from somewhere.
-        #
-        # we start by fetching them from the store, and then try calling /event_auth/.
+        # missing_auth is the set of the event's auth_events which we don't yet have
+        # in auth_events.
         missing_auth = event_auth_events.difference(
             e.event_id for e in auth_events.values()
         )
 
+        # if we have missing events, we need to fetch those events from somewhere.
+        #
+        # we start by checking if they are in the store, and then try calling /event_auth/.
         if missing_auth:
-            # TODO: can we use store.have_seen_events here instead?
-            have_events = yield self.store.get_seen_events_with_rejections(missing_auth)
-            logger.debug("Got events %s from store", have_events)
-            missing_auth.difference_update(have_events.keys())
-        else:
-            have_events = {}
-
-        have_events.update({e.event_id: "" for e in auth_events.values()})
+            have_events = yield self.store.have_seen_events(missing_auth)
+            logger.debug("Events %s are in the store", have_events)
+            missing_auth.difference_update(have_events)
 
         if missing_auth:
             # If we don't have all the auth events, we need to get them.
@@ -2165,19 +2203,18 @@ class FederationHandler(BaseHandler):
                     except AuthError:
                         pass
 
-                have_events = yield self.store.get_seen_events_with_rejections(
-                    event.auth_event_ids()
-                )
             except Exception:
-                # FIXME:
                 logger.exception("Failed to get auth chain")
 
         if event.internal_metadata.is_outlier():
+            # XXX: given that, for an outlier, we'll be working with the
+            # event's *claimed* auth events rather than those we calculated:
+            # (a) is there any point in this test, since different_auth below will
+            # obviously be empty
+            # (b) alternatively, why don't we do it earlier?
             logger.info("Skipping auth_event fetch for outlier")
             return context
 
-        # FIXME: Assumes we have and stored all the state for all the
-        # prev_events
         different_auth = event_auth_events.difference(
             e.event_id for e in auth_events.values()
         )
@@ -2191,53 +2228,58 @@ class FederationHandler(BaseHandler):
             different_auth,
         )
 
-        room_version = yield self.store.get_room_version(event.room_id)
+        # XXX: currently this checks for redactions but I'm not convinced that is
+        # necessary?
+        different_events = yield self.store.get_events_as_list(different_auth)
 
-        different_events = yield make_deferred_yieldable(
-            defer.gatherResults(
-                [
-                    run_in_background(
-                        self.store.get_event, d, allow_none=True, allow_rejected=False
-                    )
-                    for d in different_auth
-                    if d in have_events and not have_events[d]
-                ],
-                consumeErrors=True,
-            )
-        ).addErrback(unwrapFirstError)
+        for d in different_events:
+            if d.room_id != event.room_id:
+                logger.warning(
+                    "Event %s refers to auth_event %s which is in a different room",
+                    event.event_id,
+                    d.event_id,
+                )
 
-        if different_events:
-            local_view = dict(auth_events)
-            remote_view = dict(auth_events)
-            remote_view.update(
-                {(d.type, d.state_key): d for d in different_events if d}
-            )
+                # don't attempt to resolve the claimed auth events against our own
+                # in this case: just use our own auth events.
+                #
+                # XXX: should we reject the event in this case? It feels like we should,
+                # but then shouldn't we also do so if we've failed to fetch any of the
+                # auth events?
+                return context
 
-            new_state = yield self.state_handler.resolve_events(
-                room_version,
-                [list(local_view.values()), list(remote_view.values())],
-                event,
-            )
+        # now we state-resolve between our own idea of the auth events, and the remote's
+        # idea of them.
 
-            logger.info(
-                "After state res: updating auth_events with new state %s",
-                {
-                    (d.type, d.state_key): d.event_id
-                    for d in new_state.values()
-                    if auth_events.get((d.type, d.state_key)) != d
-                },
-            )
+        local_state = auth_events.values()
+        remote_auth_events = dict(auth_events)
+        remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
+        remote_state = remote_auth_events.values()
+
+        room_version = yield self.store.get_room_version(event.room_id)
+        new_state = yield self.state_handler.resolve_events(
+            room_version, (local_state, remote_state), event
+        )
+
+        logger.info(
+            "After state res: updating auth_events with new state %s",
+            {
+                (d.type, d.state_key): d.event_id
+                for d in new_state.values()
+                if auth_events.get((d.type, d.state_key)) != d
+            },
+        )
 
-            auth_events.update(new_state)
+        auth_events.update(new_state)
 
-            context = yield self._update_context_for_auth_events(
-                event, context, auth_events, event_key
-            )
+        context = yield self._update_context_for_auth_events(
+            event, context, auth_events
+        )
 
         return context
 
     @defer.inlineCallbacks
-    def _update_context_for_auth_events(self, event, context, auth_events, event_key):
+    def _update_context_for_auth_events(self, event, context, auth_events):
         """Update the state_ids in an event context after auth event resolution,
         storing the changes as a new state group.
 
@@ -2246,18 +2288,21 @@ class FederationHandler(BaseHandler):
 
             context (synapse.events.snapshot.EventContext): initial event context
 
-            auth_events (dict[(str, str)->str]): Events to update in the event
+            auth_events (dict[(str, str)->EventBase]): Events to update in the event
                 context.
 
-            event_key ((str, str)): (type, state_key) for the current event.
-                this will not be included in the current_state in the context.
-
         Returns:
             Deferred[EventContext]: new event context
         """
+        # exclude the state key of the new event from the current_state in the context.
+        if event.is_state():
+            event_key = (event.type, event.state_key)
+        else:
+            event_key = None
         state_updates = {
             k: a.event_id for k, a in iteritems(auth_events) if k != event_key
         }
+
         current_state_ids = yield context.get_current_state_ids(self.store)
         current_state_ids = dict(current_state_ids)
 
@@ -2459,7 +2504,7 @@ class FederationHandler(BaseHandler):
                 room_version, event_dict, event, context
             )
 
-            EventValidator().validate_new(event)
+            EventValidator().validate_new(event, self.config)
 
             # We need to tell the transaction queue to send this out, even
             # though the sender isn't a local user.
@@ -2574,7 +2619,7 @@ class FederationHandler(BaseHandler):
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder
         )
-        EventValidator().validate_new(event)
+        EventValidator().validate_new(event, self.config)
         return (event, context)
 
     @defer.inlineCallbacks
@@ -2708,6 +2753,11 @@ class FederationHandler(BaseHandler):
                 event_and_contexts, backfilled=backfilled
             )
 
+            if self._ephemeral_messages_enabled:
+                for (event, context) in event_and_contexts:
+                    # If there's an expiry timestamp on the event, schedule its expiry.
+                    self._message_handler.maybe_schedule_expiry(event)
+
             if not backfilled:  # Never notify for backfilled events
                 for event, _ in event_and_contexts:
                     yield self._notify_persisted_event(event, max_stream_id)
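
The _NewEventInfo change swaps free-form dicts (ev_info["event"],
ev_info.get("state")) for a typed attrs record. A toy illustration of the
pattern, not taken from the patch:

import attr
from typing import Optional, Sequence

@attr.s
class _ExampleEventInfo:
    # Same shape as _NewEventInfo: required field first, optional fields
    # default to None instead of relying on dict.get().
    event = attr.ib(type=str)
    state = attr.ib(type=Optional[Sequence[str]], default=None)

info = _ExampleEventInfo(event="$some_event_id")
assert info.state is None
# A typo such as _ExampleEventInfo(evnt=...) raises TypeError immediately,
# where a dict like {"evnt": ...} would only fail later via .get().
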
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d682dc2b7a..4f53a5f5dc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import Optional
 
 from six import iteritems, itervalues, string_types
 
@@ -22,9 +23,16 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 from twisted.internet.defer import succeed
+from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
+from synapse.api.constants import (
+    EventContentFields,
+    EventTypes,
+    Membership,
+    RelationTypes,
+    UserTypes,
+)
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -62,6 +70,17 @@ class MessageHandler(object):
         self.storage = hs.get_storage()
         self.state_store = self.storage.state
         self._event_serializer = hs.get_event_client_serializer()
+        self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+        self._is_worker_app = bool(hs.config.worker_app)
+
+        # The scheduled call to self._expire_event. None if no call is currently
+        # scheduled.
+        self._scheduled_expiry = None  # type: Optional[IDelayedCall]
+
+        if not hs.config.worker_app:
+            run_as_background_process(
+                "_schedule_next_expiry", self._schedule_next_expiry
+            )
 
     @defer.inlineCallbacks
     def get_room_data(
@@ -138,7 +157,7 @@ class MessageHandler(object):
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
 
             visible_events = yield filter_events_for_client(
-                self.storage, user_id, last_events
+                self.storage, user_id, last_events, apply_retention_policies=False
             )
 
             event = last_events[0]
@@ -225,6 +244,100 @@ class MessageHandler(object):
             for user_id, profile in iteritems(users_with_profile)
         }
 
+    def maybe_schedule_expiry(self, event):
+        """Schedule the expiry of an event if there's not already one scheduled,
+        or if the one already scheduled is for an event that expires after the
+        provided timestamp.
+
+        This function needs to invalidate the event cache, which is only possible on
+        the master process, and therefore needs to be run on there.
+
+        Args:
+            event (EventBase): The event to schedule the expiry of.
+        """
+        assert not self._is_worker_app
+
+        expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
+        if not isinstance(expiry_ts, int) or event.is_state():
+            return
+
+        # _schedule_expiry_for_event won't actually schedule anything if there's already
+        # a task scheduled for a timestamp that's sooner than the provided one.
+        self._schedule_expiry_for_event(event.event_id, expiry_ts)
+
+    @defer.inlineCallbacks
+    def _schedule_next_expiry(self):
+        """Retrieve the ID and the expiry timestamp of the next event to be expired,
+        and schedule an expiry task for it.
+
+        If there's no event left to expire, leave _scheduled_expiry as None so
+        that a future call to maybe_schedule_expiry can schedule a new expiry
+        task.
+        """
+        # Try to get the expiry timestamp of the next event to expire.
+        res = yield self.store.get_next_event_to_expire()
+        if res:
+            event_id, expiry_ts = res
+            self._schedule_expiry_for_event(event_id, expiry_ts)
+
+    def _schedule_expiry_for_event(self, event_id, expiry_ts):
+        """Schedule an expiry task for the provided event if there's not already one
+        scheduled at a timestamp that's sooner than the provided one.
+
+        Args:
+            event_id (str): The ID of the event to expire.
+            expiry_ts (int): The timestamp at which to expire the event.
+        """
+        if self._scheduled_expiry:
+            # If the provided timestamp refers to a time before the scheduled time of the
+            # next expiry task, cancel that task and reschedule it for this timestamp.
+            next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000
+            if expiry_ts < next_scheduled_expiry_ts:
+                self._scheduled_expiry.cancel()
+            else:
+                return
+
+        # Figure out how many seconds we need to wait before expiring the event.
+        now_ms = self.clock.time_msec()
+        delay = (expiry_ts - now_ms) / 1000
+
+        # callLater doesn't support negative delays, so trim the delay to 0 if we're
+        # in that case.
+        if delay < 0:
+            delay = 0
+
+        logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)
+
+        self._scheduled_expiry = self.clock.call_later(
+            delay,
+            run_as_background_process,
+            "_expire_event",
+            self._expire_event,
+            event_id,
+        )
+
+    @defer.inlineCallbacks
+    def _expire_event(self, event_id):
+        """Retrieve and expire an event that needs to be expired from the database.
+
+        If the event doesn't exist in the database, log it and delete the expiry date
+        from the database (so that we don't try to expire it again).
+        """
+        assert self._ephemeral_events_enabled
+
+        self._scheduled_expiry = None
+
+        logger.info("Expiring event %s", event_id)
+
+        try:
+            # Expire the event if we know about it. This function also deletes the expiry
+            # date from the database in the same database transaction.
+            yield self.store.expire_event(event_id)
+        except Exception as e:
+            logger.error("Could not expire event %s: %r", event_id, e)
+
+        # Schedule the expiry of the next event to expire.
+        yield self._schedule_next_expiry()
+
 
 # The duration (in ms) after which rooms should be removed
 # `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
@@ -295,6 +408,10 @@ class EventCreationHandler(object):
                 5 * 60 * 1000,
             )
 
+        self._message_handler = hs.get_message_handler()
+
+        self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+
     @defer.inlineCallbacks
     def create_event(
         self,
@@ -417,7 +534,7 @@ class EventCreationHandler(object):
                     403, "You must be in the room to create an alias for it"
                 )
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         return (event, context)
 
@@ -634,7 +751,7 @@ class EventCreationHandler(object):
         if requester:
             context.app_service = requester.app_service
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         # If this event is an annotation then we check that that the sender
         # can't annotate the same way twice (e.g. stops users from liking an
@@ -877,6 +994,10 @@ class EventCreationHandler(object):
             event, context=context
         )
 
+        if self._ephemeral_events_enabled:
+            # If there's an expiry timestamp on the event, schedule its expiry.
+            self._message_handler.maybe_schedule_expiry(event)
+
         yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
 
         def _notify():
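
The expiry machinery keeps at most one outstanding delayed call and
reschedules it only when a new event expires sooner than the one already
queued. A self-contained sketch of that single-timer pattern, using the
Twisted reactor directly where the real handler goes through self.clock and
run_as_background_process:

from twisted.internet import reactor

class ExpiryScheduler:
    def __init__(self):
        self._scheduled = None  # IDelayedCall for the next expiry, if any

    def schedule(self, event_id, expiry_ts_ms, now_ms):
        if self._scheduled is not None:
            # getTime() is in seconds; compare in milliseconds, as above.
            if expiry_ts_ms >= self._scheduled.getTime() * 1000:
                return  # an earlier expiry is already queued
            self._scheduled.cancel()
        # callLater rejects negative delays, so clamp to zero.
        delay = max((expiry_ts_ms - now_ms) / 1000, 0)
        self._scheduled = reactor.callLater(delay, self._expire, event_id)

    def _expire(self, event_id):
        self._scheduled = None  # allow the next expiry to be scheduled
        print("expiring", event_id)  # the real code scrubs the event here
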
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 260a4351ca..8514ddc600 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import logging
 
+from six import iteritems
+
 from twisted.internet import defer
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.logging.context import run_in_background
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import ReadWriteLock
@@ -80,6 +83,109 @@ class PaginationHandler(object):
         self._purges_by_id = {}
         self._event_serializer = hs.get_event_client_serializer()
 
+        self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
+
+        if hs.config.retention_enabled:
+            # Run the purge jobs described in the configuration file.
+            for job in hs.config.retention_purge_jobs:
+                self.clock.looping_call(
+                    run_as_background_process,
+                    job["interval"],
+                    "purge_history_for_rooms_in_range",
+                    self.purge_history_for_rooms_in_range,
+                    job["shortest_max_lifetime"],
+                    job["longest_max_lifetime"],
+                )
+
+    @defer.inlineCallbacks
+    def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+        """Purge outdated events from rooms within the given retention range.
+
+        If a default retention policy is defined in the server's configuration and its
+        'max_lifetime' is within this range, also targets rooms which don't have a
+        retention policy.
+
+        Args:
+            min_ms (int|None): Duration in milliseconds that defines the lower limit of
+                the range to handle (exclusive). If None, it means that the range has no
+                lower limit.
+            max_ms (int|None): Duration in milliseconds that defines the upper limit of
+                the range to handle (inclusive). If None, it means that the range has no
+                upper limit.
+        """
+        # We want the storage layer to include rooms with no retention policy in its
+        # return value only if a default retention policy is defined in the server's
+        # configuration and that policy's 'max_lifetime' is either lower than
+        # (or equal to) max_ms or higher than min_ms (or both).
+        if self._retention_default_max_lifetime is not None:
+            include_null = True
+
+            if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
+                # The default max_lifetime is lower than (or equal to) min_ms.
+                include_null = False
+
+            if max_ms is not None and max_ms < self._retention_default_max_lifetime:
+                # The default max_lifetime is higher than max_ms.
+                include_null = False
+        else:
+            include_null = False
+
+        rooms = yield self.store.get_rooms_for_retention_period_in_range(
+            min_ms, max_ms, include_null
+        )
+
+        for room_id, retention_policy in iteritems(rooms):
+            if room_id in self._purges_in_progress_by_room:
+                logger.warning(
+                    "[purge] not purging room %s as there's an ongoing purge running"
+                    " for this room",
+                    room_id,
+                )
+                continue
+
+            max_lifetime = retention_policy["max_lifetime"]
+
+            if max_lifetime is None:
+                # If max_lifetime is None, it means that include_null equals True,
+                # therefore we can safely assume that there is a default policy defined
+                # in the server's configuration.
+                max_lifetime = self._retention_default_max_lifetime
+
+            # Figure out what token we should start purging at.
+            ts = self.clock.time_msec() - max_lifetime
+
+            stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+
+            r = yield self.store.get_room_event_after_stream_ordering(
+                room_id, stream_ordering,
+            )
+            if not r:
+                logger.warning(
+                    "[purge] purging events not possible: No event found "
+                    "(ts %i => stream_ordering %i)",
+                    ts,
+                    stream_ordering,
+                )
+                continue
+
+            (stream, topo, _event_id) = r
+            token = "t%d-%d" % (topo, stream)
+
+            purge_id = random_string(16)
+
+            self._purges_by_id[purge_id] = PurgeStatus()
+
+            logger.info(
+                "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
+            )
+
+            # We want to purge everything, including local events, and to run the purge in
+            # the background so that it's not blocking any other operation apart from
+            # other purges in the same room.
+            run_as_background_process(
+                "_purge_history", self._purge_history, purge_id, room_id, token, True,
+            )
+
     def start_purge_history(self, room_id, token, delete_local_events=False):
         """Start off a history purge on a room.
 
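
The include_null branching above reduces to a small predicate: rooms with no
retention policy inherit the server default, so they are only swept by the
purge job whose (min_ms, max_ms] range contains that default lifetime. An
equivalent pure-function sketch:

def should_include_null_policy(default_max_lifetime, min_ms, max_ms):
    if default_max_lifetime is None:
        return False  # no server default, so unpolicied rooms are never purged
    if min_ms is not None and min_ms >= default_max_lifetime:
        return False  # default lifetime falls below this job's range
    if max_ms is not None and max_ms < default_max_lifetime:
        return False  # default lifetime falls above this job's range
    return True

# e.g. a job covering lifetimes up to one day, with a 12h server default:
assert should_include_null_policy(12 * 3600 * 1000, None, 24 * 3600 * 1000)
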
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 22e0a04da4..1e5a4613c9 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -152,7 +152,7 @@ class BaseProfileHandler(BaseHandler):
             by_admin (bool): Whether this change was made by an administrator.
         """
         if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
@@ -207,7 +207,7 @@ class BaseProfileHandler(BaseHandler):
         """target_user is the user whose avatar_url is to be changed;
         auth_user is the user attempting to make this change."""
         if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
@@ -231,7 +231,7 @@ class BaseProfileHandler(BaseHandler):
     def on_profile_query(self, args):
         user = UserID.from_string(args["user_id"])
         if not self.hs.is_mine(user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         just_field = args.get("field", None)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 235f11c322..8a7d965feb 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -266,7 +266,7 @@ class RegistrationHandler(BaseHandler):
             }
 
             # Bind email to new account
-            yield self._register_email_threepid(user_id, threepid_dict, None, False)
+            yield self._register_email_threepid(user_id, threepid_dict, None)
 
         return user_id
 
@@ -630,7 +630,7 @@ class RegistrationHandler(BaseHandler):
         # And we add an email pusher for them by default, but only
         # if email notifications are enabled (so people don't start
         # getting mail spam where they weren't before if email
-        # notifs are set up on a home server)
+        # notifs are set up on a homeserver)
         if (
             self.hs.config.email_enable_notifs
             and self.hs.config.email_notif_for_new_users
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e92b2eafd5..22768e97ff 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -198,21 +199,21 @@ class RoomCreationHandler(BaseHandler):
         # finally, shut down the PLs in the old room, and update them in the new
         # room.
         yield self._update_upgraded_room_pls(
-            requester, old_room_id, new_room_id, old_room_state
+            requester, old_room_id, new_room_id, old_room_state,
         )
 
         return new_room_id
 
     @defer.inlineCallbacks
     def _update_upgraded_room_pls(
-        self, requester, old_room_id, new_room_id, old_room_state
+        self, requester, old_room_id, new_room_id, old_room_state,
     ):
         """Send updated power levels in both rooms after an upgrade
 
         Args:
             requester (synapse.types.Requester): the user requesting the upgrade
-            old_room_id (unicode): the id of the room to be replaced
-            new_room_id (unicode): the id of the replacement room
+            old_room_id (str): the id of the room to be replaced
+            new_room_id (str): the id of the replacement room
             old_room_state (dict[tuple[str, str], str]): the state map for the old room
 
         Returns:
@@ -298,7 +299,7 @@ class RoomCreationHandler(BaseHandler):
             tombstone_event_id (unicode|str): the ID of the tombstone event in the old
                 room.
         Returns:
-            Deferred[None]
+            Deferred
         """
         user_id = requester.user.to_string()
 
@@ -333,6 +334,7 @@ class RoomCreationHandler(BaseHandler):
             (EventTypes.Encryption, ""),
             (EventTypes.ServerACL, ""),
             (EventTypes.RelatedGroups, ""),
+            (EventTypes.PowerLevels, ""),
         )
 
         old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -346,6 +348,31 @@ class RoomCreationHandler(BaseHandler):
             if old_event:
                 initial_state[k] = old_event.content
 
+        # Resolve the minimum power level required to send any state event.
+        # We will give the upgrading user this power level temporarily (if
+        # necessary) so that they can copy all of the state events over, then
+        # revert them to their original power level afterwards in
+        # _update_upgraded_room_pls.
+
+        # Copy over user power levels now, as this will not be possible with
+        # >100 PL users once the room has been created.
+
+        power_levels = initial_state[(EventTypes.PowerLevels, "")]
+
+        # Calculate the minimum power level needed to clone the room
+        event_power_levels = power_levels.get("events", {})
+        state_default = power_levels.get("state_default", 0)
+        # "ban" defaults to 50 in m.room.power_levels if unspecified
+        ban = power_levels.get("ban", 50)
+        needed_power_level = max(
+            [state_default, ban] + list(event_power_levels.values())
+        )
+
+        # Raise the requester's power level in the new room if necessary
+        users = power_levels.setdefault("users", {})
+        current_power_level = users.get(
+            requester.user.to_string(), power_levels.get("users_default", 0)
+        )
+        if current_power_level < needed_power_level:
+            # Assign this power level to the requester
+            users[requester.user.to_string()] = needed_power_level
+
+        # Set the power levels to the modified state
+        initial_state[(EventTypes.PowerLevels, "")] = power_levels
+
         yield self._send_events_for_new_room(
             requester,
             new_room_id,
@@ -874,6 +901,10 @@ class RoomContextHandler(object):
             room_id, event_id, before_limit, after_limit, event_filter
         )
 
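+        # Apply the requester's event filter to the surrounding events before
+        # they go through the additional filtering in filter_evts below.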
+        if event_filter:
+            results["events_before"] = event_filter.filter(results["events_before"])
+            results["events_after"] = event_filter.filter(results["events_after"])
+
         results["events_before"] = yield filter_evts(results["events_before"])
         results["events_after"] = yield filter_evts(results["events_after"])
         results["event"] = event
@@ -902,7 +933,12 @@ class RoomContextHandler(object):
         state = yield self.state_store.get_state_for_events(
             [last_event_id], state_filter=state_filter
         )
-        results["state"] = list(state[last_event_id].values())
+
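+        # Run the state returned with the context through the same filter.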
+        state_events = list(state[last_event_id].values())
+        if event_filter:
+            state_events = event_filter.filter(state_events)
+
+        results["state"] = state_events
 
         # We use a dummy token here as we only care about the room portion of
         # the token, which we replace.
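
A standalone illustration of the power-level arithmetic used during the upgrade; the helper name is hypothetical, and the fallbacks mirror the handler above (with "ban" falling back to its usual default of 50):

    def minimum_upgrade_power_level(power_levels):
        # The upgrading user must be able to send every copied state event;
        # the ban level is included as a floor as well.
        event_levels = power_levels.get("events", {})
        state_default = power_levels.get("state_default", 0)
        ban = power_levels.get("ban", 50)
        return max([state_default, ban] + list(event_levels.values()))
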
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 6cfee4b361..7b7270fc61 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -94,7 +94,9 @@ class RoomMemberHandler(object):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+    def _remote_reject_invite(
+        self, requester, remote_room_hosts, room_id, target, content
+    ):
         """Attempt to reject an invite for a room this server is not in. If we
         fail to do so we locally mark the invite as rejected.
 
@@ -104,6 +106,7 @@ class RoomMemberHandler(object):
                 reject invite
             room_id (str)
             target (UserID): The user rejecting the invite
+            content (dict): The content for the rejection event
 
         Returns:
             Deferred[dict]: A dictionary to be returned to the client, may
@@ -471,7 +474,7 @@ class RoomMemberHandler(object):
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
                     res = yield self._remote_reject_invite(
-                        requester, remote_room_hosts, room_id, target
+                        requester, remote_room_hosts, room_id, target, content,
                     )
                     return res
 
@@ -971,13 +974,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             )
 
     @defer.inlineCallbacks
-    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+    def _remote_reject_invite(
+        self, requester, remote_room_hosts, room_id, target, content
+    ):
         """Implements RoomMemberHandler._remote_reject_invite
         """
         fed_handler = self.federation_handler
         try:
             ret = yield fed_handler.do_remotely_reject_invite(
-                remote_room_hosts, room_id, target.to_string()
+                remote_room_hosts, room_id, target.to_string(), content=content,
             )
             return ret
         except Exception as e:
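
The `content` parameter threaded through here lets a caller attach extra fields to the leave event generated when an invite is rejected remotely. A hedged sketch of such a caller; the reason field and exact call shape are illustrative rather than a documented contract:

    # Reject a pending invite, attaching a human-readable reason to the
    # resulting m.room.member leave event.
    content = {"reason": "no longer interested"}
    yield room_member_handler.update_membership(
        requester,
        target=requester.user,
        room_id=room_id,
        action="leave",
        content=content,
    )
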
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 75e96ae1a2..69be86893b 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -55,7 +55,9 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
 
         return ret
 
-    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+    def _remote_reject_invite(
+        self, requester, remote_room_hosts, room_id, target, content
+    ):
         """Implements RoomMemberHandler._remote_reject_invite
         """
         return self._remote_reject_client(
@@ -63,6 +65,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
             user_id=target.to_string(),
+            content=content,
         )
 
     def _user_joined_room(self, target, room_id):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index ca8ae9fb5b..856337b7e2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -120,7 +120,7 @@ class TypingHandler(object):
         auth_user_id = auth_user.to_string()
 
         if not self.is_mine_id(target_user_id):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
@@ -150,7 +150,7 @@ class TypingHandler(object):
         auth_user_id = auth_user.to_string()
 
         if not self.is_mine_id(target_user_id):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")