summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py11
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/api/errors.py22
-rw-r--r--synapse/app/homeserver.py2
-rw-r--r--synapse/appservice/__init__.py2
-rw-r--r--synapse/config/_base.py3
-rw-r--r--synapse/config/password.py38
-rw-r--r--synapse/config/ratelimiting.py9
-rw-r--r--synapse/config/registration.py75
-rw-r--r--synapse/config/repository.py30
-rw-r--r--synapse/config/server.py205
-rw-r--r--synapse/config/user_directory.py9
-rw-r--r--synapse/events/spamcheck.py63
-rw-r--r--synapse/events/validator.py100
-rw-r--r--synapse/federation/sender/__init__.py2
-rw-r--r--synapse/handlers/account_validity.py28
-rw-r--r--synapse/handlers/auth.py2
-rw-r--r--synapse/handlers/deactivate_account.py41
-rw-r--r--synapse/handlers/federation.py22
-rw-r--r--synapse/handlers/identity.py161
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/handlers/pagination.py108
-rw-r--r--synapse/handlers/password_policy.py93
-rw-r--r--synapse/handlers/profile.py219
-rw-r--r--synapse/handlers/register.py79
-rw-r--r--synapse/handlers/room.py37
-rw-r--r--synapse/handlers/room_member.py176
-rw-r--r--synapse/handlers/set_password.py6
-rw-r--r--synapse/http/client.py17
-rw-r--r--synapse/http/connectproxyclient.py195
-rw-r--r--synapse/http/proxyagent.py195
-rw-r--r--synapse/push/httppusher.py2
-rw-r--r--synapse/rest/__init__.py2
-rw-r--r--synapse/rest/client/v1/login.py2
-rw-r--r--synapse/rest/client/v1/profile.py44
-rw-r--r--synapse/rest/client/v1/room.py1
-rw-r--r--synapse/rest/client/v2_alpha/account.py204
-rw-r--r--synapse/rest/client/v2_alpha/account_data.py7
-rw-r--r--synapse/rest/client/v2_alpha/account_validity.py2
-rw-r--r--synapse/rest/client/v2_alpha/password_policy.py58
-rw-r--r--synapse/rest/client/v2_alpha/register.py207
-rw-r--r--synapse/rest/client/v2_alpha/user_directory.py96
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py2
-rw-r--r--synapse/rulecheck/__init__.py0
-rw-r--r--synapse/rulecheck/domain_rule_checker.py181
-rw-r--r--synapse/server.py14
-rw-r--r--synapse/server.pyi9
-rw-r--r--synapse/storage/_base.py9
-rw-r--r--synapse/storage/appservice.py2
-rw-r--r--synapse/storage/devices.py38
-rw-r--r--synapse/storage/end_to_end_keys.py2
-rw-r--r--synapse/storage/events.py3
-rw-r--r--synapse/storage/profile.py100
-rw-r--r--synapse/storage/registration.py22
-rw-r--r--synapse/storage/room.py274
-rw-r--r--synapse/storage/schema/delta/48/profiles_batch.sql36
-rw-r--r--synapse/storage/schema/delta/50/profiles_deactivated_users.sql23
-rw-r--r--synapse/storage/schema/delta/55/profile_replication_status_index.sql17
-rw-r--r--synapse/storage/schema/delta/55/room_retention.sql33
-rw-r--r--synapse/storage/schema/delta/56/hidden_devices.sql18
-rw-r--r--synapse/storage/schema/full_schemas/54/full.sql.postgres15
-rw-r--r--synapse/storage/schema/full_schemas/54/full.sql.sqlite4
-rw-r--r--synapse/third_party_rules/access_rules.py586
-rw-r--r--synapse/types.py15
-rw-r--r--synapse/util/stringutils.py14
-rw-r--r--synapse/util/threepids.py40
-rw-r--r--synapse/visibility.py32
67 files changed, 3892 insertions, 181 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index ddc195bc32..7f604e3744 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -209,6 +209,7 @@ class Auth(object):
             access_token = self.get_access_token_from_request(request)
 
             user_id, app_service = yield self._get_appservice_user_id(request)
+
             if user_id:
                 request.authenticated_entity = user_id
                 opentracing.set_tag("authenticated_entity", user_id)
@@ -270,11 +271,11 @@ class Auth(object):
         except KeyError:
             raise MissingClientTokenError()
 
-    @defer.inlineCallbacks
     def _get_appservice_user_id(self, request):
         app_service = self.store.get_app_service_by_token(
             self.get_access_token_from_request(request)
         )
+
         if app_service is None:
             return None, None
 
@@ -292,8 +293,12 @@ class Auth(object):
 
         if not app_service.is_interested_in_user(user_id):
             raise AuthError(403, "Application service cannot masquerade as this user.")
-        if not (yield self.store.get_user_by_id(user_id)):
-            raise AuthError(403, "Application service has not registered this user")
+        # Let ASes manipulate nonexistent users (e.g. to shadow-register them)
+        # if not (yield self.store.get_user_by_id(user_id)):
+        #     raise AuthError(
+        #         403,
+        #         "Application service has not registered this user"
+        #     )
         return user_id, app_service
 
     @defer.inlineCallbacks
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index f29bce560c..956b86f6cf 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -85,6 +85,7 @@ class EventTypes(object):
     RoomAvatar = "m.room.avatar"
     RoomEncryption = "m.room.encryption"
     GuestAccess = "m.room.guest_access"
+    Encryption = "m.room.encryption"
 
     # These are used for validation
     Message = "m.room.message"
@@ -94,6 +95,8 @@ class EventTypes(object):
     ServerACL = "m.room.server_acl"
     Pinned = "m.room.pinned_events"
 
+    Retention = "m.room.retention"
+
 
 class RejectedReason(object):
     AUTH_ERROR = "auth_error"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index cf1ebf1af2..d160df0bfd 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -62,6 +63,13 @@ class Codes(object):
     WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION"
     EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT"
     USER_DEACTIVATED = "M_USER_DEACTIVATED"
+    PASSWORD_TOO_SHORT = "M_PASSWORD_TOO_SHORT"
+    PASSWORD_NO_DIGIT = "M_PASSWORD_NO_DIGIT"
+    PASSWORD_NO_UPPERCASE = "M_PASSWORD_NO_UPPERCASE"
+    PASSWORD_NO_LOWERCASE = "M_PASSWORD_NO_LOWERCASE"
+    PASSWORD_NO_SYMBOL = "M_PASSWORD_NO_SYMBOL"
+    PASSWORD_IN_DICTIONARY = "M_PASSWORD_IN_DICTIONARY"
+    WEAK_PASSWORD = "M_WEAK_PASSWORD"
 
 
 class CodeMessageException(RuntimeError):
@@ -419,6 +427,18 @@ class IncompatibleRoomVersionError(SynapseError):
         return cs_error(self.msg, self.errcode, room_version=self._room_version)
 
 
+class PasswordRefusedError(SynapseError):
+    """A password has been refused, either during password reset/change or registration.
+    """
+
+    def __init__(
+        self,
+        msg="This password doesn't comply with the server's policy",
+        errcode=Codes.WEAK_PASSWORD,
+    ):
+        super(PasswordRefusedError, self).__init__(code=400, msg=msg, errcode=errcode)
+
+
 class RequestSendFailed(RuntimeError):
     """Sending a HTTP request over federation failed due to not being able to
     talk to the remote server for some reason.
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 04f1ed14f3..baeafd9e04 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -563,7 +563,7 @@ def run(hs):
         stats["database_server_version"] = hs.get_datastore().get_server_version()
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
-            yield hs.get_simple_http_client().put_json(
+            yield hs.get_proxied_http_client().put_json(
                 "https://matrix.org/report-usage-stats/push", stats
             )
         except Exception as e:
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 33b3579425..65cbff95b9 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -268,7 +268,7 @@ class ApplicationService(object):
     def is_exclusive_room(self, room_id):
         return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
 
-    def get_exlusive_user_regexes(self):
+    def get_exclusive_user_regexes(self):
         """Get the list of regexes used to determine if a user is exclusively
         registered by the AS
         """
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 31f6530978..2674a62aa5 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -18,6 +18,7 @@
 import argparse
 import errno
 import os
+from io import open as io_open
 from textwrap import dedent
 
 from six import integer_types
@@ -133,7 +134,7 @@ class Config(object):
     @classmethod
     def read_file(cls, file_path, config_name):
         cls.check_file(file_path, config_name)
-        with open(file_path) as file_stream:
+        with io_open(file_path, encoding="utf-8") as file_stream:
             return file_stream.read()
 
     def invoke_all(self, name, *args, **kargs):
diff --git a/synapse/config/password.py b/synapse/config/password.py
index d5b5953f2f..47df98f41a 100644
--- a/synapse/config/password.py
+++ b/synapse/config/password.py
@@ -1,5 +1,7 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2015-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,6 +31,10 @@ class PasswordConfig(Config):
         self.password_localdb_enabled = password_config.get("localdb_enabled", True)
         self.password_pepper = password_config.get("pepper", "")
 
+        # Password policy
+        self.password_policy = password_config.get("policy", {})
+        self.password_policy_enabled = self.password_policy.pop("enabled", False)
+
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """\
         password_config:
@@ -46,4 +52,34 @@ class PasswordConfig(Config):
            # DO NOT CHANGE THIS AFTER INITIAL SETUP!
            #
            #pepper: "EVEN_MORE_SECRET"
+
+           # Define and enforce a password policy. Each parameter is optional, boolean
+           # parameters default to 'false' and integer parameters default to 0.
+           # This is an early implementation of MSC2000.
+           #
+           #policy:
+              # Whether to enforce the password policy.
+              #
+              #enabled: true
+
+              # Minimum accepted length for a password.
+              #
+              #minimum_length: 15
+
+              # Whether a password must contain at least one digit.
+              #
+              #require_digit: true
+
+              # Whether a password must contain at least one symbol.
+              # A symbol is any character that's not a number or a letter.
+              #
+              #require_symbol: true
+
+              # Whether a password must contain at least one lowercase letter.
+              #
+              #require_lowercase: true
+
+              # Whether a password must contain at least one lowercase letter.
+              #
+              #require_uppercase: true
         """
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 33f31cf213..a1ea4fe02d 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -68,6 +68,9 @@ class RatelimitConfig(Config):
             )
 
         self.rc_registration = RateLimitConfig(config.get("rc_registration", {}))
+        self.rc_third_party_invite = RateLimitConfig(
+            config.get("rc_third_party_invite", {})
+        )
 
         rc_login_config = config.get("rc_login", {})
         self.rc_login_address = RateLimitConfig(rc_login_config.get("address", {}))
@@ -102,6 +105,8 @@ class RatelimitConfig(Config):
         #   - one for login that ratelimits login requests based on the account the
         #     client is attempting to log into, based on the amount of failed login
         #     attempts for this account.
+        #   - one that ratelimits third-party invites requests based on the account
+        #     that's making the requests.
         #
         # The defaults are as shown below.
         #
@@ -123,6 +128,10 @@ class RatelimitConfig(Config):
         #  failed_attempts:
         #    per_second: 0.17
         #    burst_count: 3
+        #
+        #rc_third_party_invite:
+        #  per_second: 0.2
+        #  burst_count: 10
 
 
         # Ratelimiting settings for incoming federation
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index e2bee3c116..3240e30f70 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -92,8 +92,19 @@ class RegistrationConfig(Config):
 
         self.registrations_require_3pid = config.get("registrations_require_3pid", [])
         self.allowed_local_3pids = config.get("allowed_local_3pids", [])
+        self.check_is_for_allowed_local_3pids = config.get(
+            "check_is_for_allowed_local_3pids", None
+        )
+        self.allow_invited_3pids = config.get("allow_invited_3pids", False)
+
+        self.disable_3pid_changes = config.get("disable_3pid_changes", False)
+
         self.enable_3pid_lookup = config.get("enable_3pid_lookup", True)
         self.registration_shared_secret = config.get("registration_shared_secret")
+        self.register_mxid_from_3pid = config.get("register_mxid_from_3pid")
+        self.register_just_use_email_for_display_name = config.get(
+            "register_just_use_email_for_display_name", False
+        )
 
         self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
         self.trusted_third_party_id_servers = config.get(
@@ -111,6 +122,18 @@ class RegistrationConfig(Config):
                 raise ConfigError("Invalid auto_join_rooms entry %s" % (room_alias,))
         self.autocreate_auto_join_rooms = config.get("autocreate_auto_join_rooms", True)
 
+        self.disable_set_displayname = config.get("disable_set_displayname", False)
+        self.disable_set_avatar_url = config.get("disable_set_avatar_url", False)
+
+        self.replicate_user_profiles_to = config.get("replicate_user_profiles_to", [])
+        if not isinstance(self.replicate_user_profiles_to, list):
+            self.replicate_user_profiles_to = [self.replicate_user_profiles_to]
+
+        self.shadow_server = config.get("shadow_server", None)
+        self.rewrite_identity_server_urls = config.get(
+            "rewrite_identity_server_urls", {}
+        )
+
         self.disable_msisdn_registration = config.get(
             "disable_msisdn_registration", False
         )
@@ -209,9 +232,32 @@ class RegistrationConfig(Config):
         #
         #disable_msisdn_registration: true
 
+        # Derive the user's matrix ID from a type of 3PID used when registering.
+        # This overrides any matrix ID the user proposes when calling /register
+        # The 3PID type should be present in registrations_require_3pid to avoid
+        # users failing to register if they don't specify the right kind of 3pid.
+        #
+        #register_mxid_from_3pid: email
+
+        # Uncomment to set the display name of new users to their email address,
+        # rather than using the default heuristic.
+        #
+        #register_just_use_email_for_display_name: true
+
         # Mandate that users are only allowed to associate certain formats of
         # 3PIDs with accounts on this server.
         #
+        # Use an Identity Server to establish which 3PIDs are allowed to register?
+        # Overrides allowed_local_3pids below.
+        #
+        #check_is_for_allowed_local_3pids: matrix.org
+        #
+        # If you are using an IS you can also check whether that IS registers
+        # pending invites for the given 3PID (and then allow it to sign up on
+        # the platform):
+        #
+        #allow_invited_3pids: False
+        #
         #allowed_local_3pids:
         #  - medium: email
         #    pattern: '.*@matrix\\.org'
@@ -220,6 +266,11 @@ class RegistrationConfig(Config):
         #  - medium: msisdn
         #    pattern: '\\+44'
 
+        # If true, stop users from trying to change the 3PIDs associated with
+        # their accounts.
+        #
+        #disable_3pid_changes: False
+
         # Enable 3PIDs lookup requests to identity servers from this server.
         #
         #enable_3pid_lookup: true
@@ -261,6 +312,30 @@ class RegistrationConfig(Config):
         #  - matrix.org
         #  - vector.im
 
+        # If enabled, user IDs, display names and avatar URLs will be replicated
+        # to this server whenever they change.
+        # This is an experimental API currently implemented by sydent to support
+        # cross-homeserver user directories.
+        #
+        #replicate_user_profiles_to: example.com
+
+        # If specified, attempt to replay registrations, profile changes & 3pid
+        # bindings on the given target homeserver via the AS API. The HS is authed
+        # via a given AS token.
+        #
+        #shadow_server:
+        #  hs_url: https://shadow.example.com
+        #  hs: shadow.example.com
+        #  as_token: 12u394refgbdhivsia
+
+        # If enabled, don't let users set their own display names/avatars
+        # other than for the very first time (unless they are a server admin).
+        # Useful when provisioning users based on the contents of a 3rd party
+        # directory and to avoid ambiguities.
+        #
+        #disable_set_displayname: False
+        #disable_set_avatar_url: False
+
         # Users who register on this homeserver will automatically be joined
         # to these rooms
         #
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index fdb1f246d0..d3690733f7 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -104,6 +104,12 @@ class ContentRepositoryConfig(Config):
         self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
         self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
 
+        self.max_avatar_size = config.get("max_avatar_size")
+        if self.max_avatar_size:
+            self.max_avatar_size = self.parse_size(self.max_avatar_size)
+
+        self.allowed_avatar_mimetypes = config.get("allowed_avatar_mimetypes", [])
+
         self.media_store_path = self.ensure_directory(
             config.get("media_store_path", "media_store")
         )
@@ -249,6 +255,30 @@ class ContentRepositoryConfig(Config):
         #
         #max_upload_size: 10M
 
+        # The largest allowed size for a user avatar. If not defined, no
+        # restriction will be imposed.
+        #
+        # Note that this only applies when an avatar is changed globally.
+        # Per-room avatar changes are not affected. See allow_per_room_profiles
+        # for disabling that functionality.
+        #
+        # Note that user avatar changes will not work if this is set without
+        # using Synapse's local media repo.
+        #
+        #max_avatar_size: 10M
+
+        # Allow mimetypes for a user avatar. If not defined, no restriction will
+        # be imposed.
+        #
+        # Note that this only applies when an avatar is changed globally.
+        # Per-room avatar changes are not affected. See allow_per_room_profiles
+        # for disabling that functionality.
+        #
+        # Note that user avatar changes will not work if this is set without
+        # using Synapse's local media repo.
+        #
+        #allowed_avatar_mimetypes: ["image/png", "image/jpeg", "image/gif"]
+
         # Maximum number of pixels that will be thumbnailed
         #
         #max_image_pixels: 32M
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 2abdef0971..6c1e505777 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -92,6 +92,12 @@ class ServerConfig(Config):
             "require_auth_for_profile_requests", False
         )
 
+        # Whether to require sharing a room with a user to retrieve their
+        # profile data
+        self.limit_profile_requests_to_known_users = config.get(
+            "limit_profile_requests_to_known_users", False
+        )
+
         if "restrict_public_rooms_to_local_users" in config and (
             "allow_public_rooms_without_auth" in config
             or "allow_public_rooms_over_federation" in config
@@ -219,6 +225,130 @@ class ServerConfig(Config):
         # events with profile information that differ from the target's global profile.
         self.allow_per_room_profiles = config.get("allow_per_room_profiles", True)
 
+        # Whether to show the users on this homeserver in the user directory. Defaults to
+        # True.
+        self.show_users_in_user_directory = config.get(
+            "show_users_in_user_directory", True
+        )
+
+        retention_config = config.get("retention")
+        if retention_config is None:
+            retention_config = {}
+
+        self.retention_enabled = retention_config.get("enabled", False)
+
+        retention_default_policy = retention_config.get("default_policy")
+
+        if retention_default_policy is not None:
+            self.retention_default_min_lifetime = retention_default_policy.get(
+                "min_lifetime"
+            )
+            if self.retention_default_min_lifetime is not None:
+                self.retention_default_min_lifetime = self.parse_duration(
+                    self.retention_default_min_lifetime
+                )
+
+            self.retention_default_max_lifetime = retention_default_policy.get(
+                "max_lifetime"
+            )
+            if self.retention_default_max_lifetime is not None:
+                self.retention_default_max_lifetime = self.parse_duration(
+                    self.retention_default_max_lifetime
+                )
+
+            if (
+                self.retention_default_min_lifetime is not None
+                and self.retention_default_max_lifetime is not None
+                and (
+                    self.retention_default_min_lifetime
+                    > self.retention_default_max_lifetime
+                )
+            ):
+                raise ConfigError(
+                    "The default retention policy's 'min_lifetime' can not be greater"
+                    " than its 'max_lifetime'"
+                )
+        else:
+            self.retention_default_min_lifetime = None
+            self.retention_default_max_lifetime = None
+
+        self.retention_allowed_lifetime_min = retention_config.get(
+            "allowed_lifetime_min"
+        )
+        if self.retention_allowed_lifetime_min is not None:
+            self.retention_allowed_lifetime_min = self.parse_duration(
+                self.retention_allowed_lifetime_min
+            )
+
+        self.retention_allowed_lifetime_max = retention_config.get(
+            "allowed_lifetime_max"
+        )
+        if self.retention_allowed_lifetime_max is not None:
+            self.retention_allowed_lifetime_max = self.parse_duration(
+                self.retention_allowed_lifetime_max
+            )
+
+        if (
+            self.retention_allowed_lifetime_min is not None
+            and self.retention_allowed_lifetime_max is not None
+            and self.retention_allowed_lifetime_min
+            > self.retention_allowed_lifetime_max
+        ):
+            raise ConfigError(
+                "Invalid retention policy limits: 'allowed_lifetime_min' can not be"
+                " greater than 'allowed_lifetime_max'"
+            )
+
+        self.retention_purge_jobs = []
+        for purge_job_config in retention_config.get("purge_jobs", []):
+            interval_config = purge_job_config.get("interval")
+
+            if interval_config is None:
+                raise ConfigError(
+                    "A retention policy's purge jobs configuration must have the"
+                    " 'interval' key set."
+                )
+
+            interval = self.parse_duration(interval_config)
+
+            shortest_max_lifetime = purge_job_config.get("shortest_max_lifetime")
+
+            if shortest_max_lifetime is not None:
+                shortest_max_lifetime = self.parse_duration(shortest_max_lifetime)
+
+            longest_max_lifetime = purge_job_config.get("longest_max_lifetime")
+
+            if longest_max_lifetime is not None:
+                longest_max_lifetime = self.parse_duration(longest_max_lifetime)
+
+            if (
+                shortest_max_lifetime is not None
+                and longest_max_lifetime is not None
+                and shortest_max_lifetime > longest_max_lifetime
+            ):
+                raise ConfigError(
+                    "A retention policy's purge jobs configuration's"
+                    " 'shortest_max_lifetime' value can not be greater than its"
+                    " 'longest_max_lifetime' value."
+                )
+
+            self.retention_purge_jobs.append(
+                {
+                    "interval": interval,
+                    "shortest_max_lifetime": shortest_max_lifetime,
+                    "longest_max_lifetime": longest_max_lifetime,
+                }
+            )
+
+        if not self.retention_purge_jobs:
+            self.retention_purge_jobs = [
+                {
+                    "interval": self.parse_duration("1d"),
+                    "shortest_max_lifetime": None,
+                    "longest_max_lifetime": None,
+                }
+            ]
+
         self.listeners = []
         for listener in config.get("listeners", []):
             if not isinstance(listener.get("port", None), int):
@@ -479,6 +609,13 @@ class ServerConfig(Config):
         #
         #require_auth_for_profile_requests: true
 
+        # Whether to require a user to share a room with another user in order
+        # to retrieve their profile information. Only checked on Client-Server
+        # requests. Profile requests from other servers should be checked by the
+        # requesting server. Defaults to 'false'.
+        #
+        # limit_profile_requests_to_known_users: true
+
         # If set to 'false', requires authentication to access the server's public rooms
         # directory through the client API. Defaults to 'true'.
         #
@@ -718,6 +855,74 @@ class ServerConfig(Config):
         # Defaults to 'true'.
         #
         #allow_per_room_profiles: false
+
+        # Whether to show the users on this homeserver in the user directory. Defaults to
+        # 'true'.
+        #
+        #show_users_in_user_directory: false
+
+        # Message retention policy at the server level.
+        #
+        # Room admins and mods can define a retention period for their rooms using the
+        # 'm.room.retention' state event, and server admins can cap this period by setting
+        # the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
+        #
+        # If this feature is enabled, Synapse will regularly look for and purge events
+        # which are older than the room's maximum retention period. Synapse will also
+        # filter events received over federation so that events that should have been
+        # purged are ignored and not stored again.
+        #
+        retention:
+          # The message retention policies feature is disabled by default. Uncomment the
+          # following line to enable it.
+          #
+          #enabled: true
+
+          # Default retention policy. If set, Synapse will apply it to rooms that lack the
+          # 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
+          # matter much because Synapse doesn't take it into account yet.
+          #
+          #default_policy:
+          #  min_lifetime: 1d
+          #  max_lifetime: 1y
+
+          # Retention policy limits. If set, a user won't be able to send a
+          # 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
+          # that's not within this range. This is especially useful in closed federations,
+          # in which server admins can make sure every federating server applies the same
+          # rules.
+          #
+          #allowed_lifetime_min: 1d
+          #allowed_lifetime_max: 1y
+
+          # Server admins can define the settings of the background jobs purging the
+          # events which lifetime has expired under the 'purge_jobs' section.
+          #
+          # If no configuration is provided, a single job will be set up to delete expired
+          # events in every room daily.
+          #
+          # Each job's configuration defines which range of message lifetimes the job
+          # takes care of. For example, if 'shortest_max_lifetime' is '2d' and
+          # 'longest_max_lifetime' is '3d', the job will handle purging expired events in
+          # rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
+          # lower than or equal to 3 days. Both the minimum and the maximum value of a
+          # range are optional, e.g. a job with no 'shortest_max_lifetime' and a
+          # 'longest_max_lifetime' of '3d' will handle every room with a retention policy
+          # which 'max_lifetime' is lower than or equal to three days.
+          #
+          # The rationale for this per-job configuration is that some rooms might have a
+          # retention policy with a low 'max_lifetime', where history needs to be purged
+          # of outdated messages on a very frequent basis (e.g. every 5min), but not want
+          # that purge to be performed by a job that's iterating over every room it knows,
+          # which would be quite heavy on the server.
+          #
+          #purge_jobs:
+          #  - shortest_max_lifetime: 1d
+          #    longest_max_lifetime: 3d
+          #    interval: 5m:
+          #  - shortest_max_lifetime: 3d
+          #    longest_max_lifetime: 1y
+          #    interval: 24h
         """
             % locals()
         )
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
index f6313e17d4..96493a5dcc 100644
--- a/synapse/config/user_directory.py
+++ b/synapse/config/user_directory.py
@@ -24,6 +24,7 @@ class UserDirectoryConfig(Config):
     def read_config(self, config, **kwargs):
         self.user_directory_search_enabled = True
         self.user_directory_search_all_users = False
+        self.user_directory_defer_to_id_server = None
         user_directory_config = config.get("user_directory", None)
         if user_directory_config:
             self.user_directory_search_enabled = user_directory_config.get(
@@ -32,6 +33,9 @@ class UserDirectoryConfig(Config):
             self.user_directory_search_all_users = user_directory_config.get(
                 "search_all_users", False
             )
+            self.user_directory_defer_to_id_server = user_directory_config.get(
+                "defer_to_id_server", None
+            )
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """
@@ -50,4 +54,9 @@ class UserDirectoryConfig(Config):
         #user_directory:
         #  enabled: true
         #  search_all_users: false
+        #
+        #  # If this is set, user search will be delegated to this ID server instead
+        #  # of synapse performing the search itself.
+        #  # This is an experimental API.
+        #  defer_to_id_server: https://id.example.com
         """
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index 129771f183..f0de4d961f 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -46,13 +46,33 @@ class SpamChecker(object):
 
         return self.spam_checker.check_event_for_spam(event)
 
-    def user_may_invite(self, inviter_userid, invitee_userid, room_id):
+    def user_may_invite(
+        self,
+        inviter_userid,
+        invitee_userid,
+        third_party_invite,
+        room_id,
+        new_room,
+        published_room,
+    ):
         """Checks if a given user may send an invite
 
         If this method returns false, the invite will be rejected.
 
         Args:
-            userid (string): The sender's user ID
+            inviter_userid (str)
+            invitee_userid (str|None): The user ID of the invitee. Is None
+                if this is a third party invite and the 3PID is not bound to a
+                user ID.
+            third_party_invite (dict|None): If a third party invite then is a
+                dict containing the medium and address of the invitee.
+            room_id (str)
+            new_room (bool): Whether the user is being invited to the room as
+                part of a room creation, if so the invitee would have been
+                included in the call to `user_may_create_room`.
+            published_room (bool): Whether the room the user is being invited
+                to has been published in the local homeserver's public room
+                directory.
 
         Returns:
             bool: True if the user may send an invite, otherwise False
@@ -61,16 +81,29 @@ class SpamChecker(object):
             return True
 
         return self.spam_checker.user_may_invite(
-            inviter_userid, invitee_userid, room_id
+            inviter_userid,
+            invitee_userid,
+            third_party_invite,
+            room_id,
+            new_room,
+            published_room,
         )
 
-    def user_may_create_room(self, userid):
+    def user_may_create_room(
+        self, userid, invite_list, third_party_invite_list, cloning
+    ):
         """Checks if a given user may create a room
 
         If this method returns false, the creation request will be rejected.
 
         Args:
             userid (string): The sender's user ID
+            invite_list (list[str]): List of user IDs that would be invited to
+                the new room.
+            third_party_invite_list (list[dict]): List of third party invites
+                for the new room.
+            cloning (bool): Whether the user is cloning an existing room, e.g.
+                upgrading a room.
 
         Returns:
             bool: True if the user may create a room, otherwise False
@@ -78,7 +111,9 @@ class SpamChecker(object):
         if self.spam_checker is None:
             return True
 
-        return self.spam_checker.user_may_create_room(userid)
+        return self.spam_checker.user_may_create_room(
+            userid, invite_list, third_party_invite_list, cloning
+        )
 
     def user_may_create_room_alias(self, userid, room_alias):
         """Checks if a given user may create a room alias
@@ -113,3 +148,21 @@ class SpamChecker(object):
             return True
 
         return self.spam_checker.user_may_publish_room(userid, room_id)
+
+    def user_may_join_room(self, userid, room_id, is_invited):
+        """Checks if a given users is allowed to join a room.
+
+        Is not called when the user creates a room.
+
+        Args:
+            userid (str)
+            room_id (str)
+            is_invited (bool): Whether the user is invited into the room
+
+        Returns:
+            bool: Whether the user may join the room
+        """
+        if self.spam_checker is None:
+            return True
+
+        return self.spam_checker.user_may_join_room(userid, room_id, is_invited)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 272426e105..9b90c9ce04 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six import string_types
+from six import integer_types, string_types
 
 from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
 from synapse.api.errors import Codes, SynapseError
@@ -22,11 +22,12 @@ from synapse.types import EventID, RoomID, UserID
 
 
 class EventValidator(object):
-    def validate_new(self, event):
+    def validate_new(self, event, config):
         """Validates the event has roughly the right format
 
         Args:
-            event (FrozenEvent)
+            event (FrozenEvent): The event to validate.
+            config (Config): The homeserver's configuration.
         """
         self.validate_builder(event)
 
@@ -67,6 +68,99 @@ class EventValidator(object):
                             Codes.INVALID_PARAM,
                         )
 
+        if event.type == EventTypes.Retention:
+            self._validate_retention(event, config)
+
+    def _validate_retention(self, event, config):
+        """Checks that an event that defines the retention policy for a room respects the
+        boundaries imposed by the server's administrator.
+
+        Args:
+            event (FrozenEvent): The event to validate.
+            config (Config): The homeserver's configuration.
+        """
+        min_lifetime = event.content.get("min_lifetime")
+        max_lifetime = event.content.get("max_lifetime")
+
+        if min_lifetime is not None:
+            if not isinstance(min_lifetime, integer_types):
+                raise SynapseError(
+                    code=400,
+                    msg="'min_lifetime' must be an integer",
+                    errcode=Codes.BAD_JSON,
+                )
+
+            if (
+                config.retention_allowed_lifetime_min is not None
+                and min_lifetime < config.retention_allowed_lifetime_min
+            ):
+                raise SynapseError(
+                    code=400,
+                    msg=(
+                        "'min_lifetime' can't be lower than the minimum allowed"
+                        " value enforced by the server's administrator"
+                    ),
+                    errcode=Codes.BAD_JSON,
+                )
+
+            if (
+                config.retention_allowed_lifetime_max is not None
+                and min_lifetime > config.retention_allowed_lifetime_max
+            ):
+                raise SynapseError(
+                    code=400,
+                    msg=(
+                        "'min_lifetime' can't be greater than the maximum allowed"
+                        " value enforced by the server's administrator"
+                    ),
+                    errcode=Codes.BAD_JSON,
+                )
+
+        if max_lifetime is not None:
+            if not isinstance(max_lifetime, integer_types):
+                raise SynapseError(
+                    code=400,
+                    msg="'max_lifetime' must be an integer",
+                    errcode=Codes.BAD_JSON,
+                )
+
+            if (
+                config.retention_allowed_lifetime_min is not None
+                and max_lifetime < config.retention_allowed_lifetime_min
+            ):
+                raise SynapseError(
+                    code=400,
+                    msg=(
+                        "'max_lifetime' can't be lower than the minimum allowed value"
+                        " enforced by the server's administrator"
+                    ),
+                    errcode=Codes.BAD_JSON,
+                )
+
+            if (
+                config.retention_allowed_lifetime_max is not None
+                and max_lifetime > config.retention_allowed_lifetime_max
+            ):
+                raise SynapseError(
+                    code=400,
+                    msg=(
+                        "'max_lifetime' can't be greater than the maximum allowed"
+                        " value enforced by the server's administrator"
+                    ),
+                    errcode=Codes.BAD_JSON,
+                )
+
+        if (
+            min_lifetime is not None
+            and max_lifetime is not None
+            and min_lifetime > max_lifetime
+        ):
+            raise SynapseError(
+                code=400,
+                msg="'min_lifetime' can't be greater than 'max_lifetime",
+                errcode=Codes.BAD_JSON,
+            )
+
     def validate_builder(self, event):
         """Validates that the builder/event has roughly the right format. Only
         checks values that we expect a proto event to have, rather than all the
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d46f4aaeb1..36f6d470dc 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter(
 
 sent_pdus_destination_dist_total = Counter(
     "synapse_federation_client_sent_pdu_destinations:total",
-    "" "Total number of PDUs queued for sending across all destinations",
+    "Total number of PDUs queued for sending across all destinations",
 )
 
 
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 34574f1a12..51305b0c90 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -43,6 +43,8 @@ class AccountValidityHandler(object):
         self.clock = self.hs.get_clock()
 
         self._account_validity = self.hs.config.account_validity
+        self._show_users_in_user_directory = self.hs.config.show_users_in_user_directory
+        self.profile_handler = self.hs.get_profile_handler()
 
         if self._account_validity.renew_by_email_enabled and load_jinja2_templates:
             # Don't do email-specific configuration if renewal by email is disabled.
@@ -77,6 +79,9 @@ class AccountValidityHandler(object):
 
             self.clock.looping_call(send_emails, 30 * 60 * 1000)
 
+        # Check every hour to remove expired users from the user directory
+        self.clock.looping_call(self._mark_expired_users_as_inactive, 60 * 60 * 1000)
+
     @defer.inlineCallbacks
     def send_renewal_emails(self):
         """Gets the list of users whose account is expiring in the amount of time
@@ -262,4 +267,27 @@ class AccountValidityHandler(object):
             user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent
         )
 
+        # Check if renewed users should be reintroduced to the user directory
+        if self._show_users_in_user_directory:
+            # Show the user in the directory again by setting them to active
+            yield self.profile_handler.set_active(
+                UserID.from_string(user_id), True, True
+            )
+
         return expiration_ts
+
+    @defer.inlineCallbacks
+    def _mark_expired_users_as_inactive(self):
+        """Iterate over expired users. Mark them as inactive in order to hide them from the
+        user directory.
+
+        Returns:
+            Deferred
+        """
+        # Get expired users
+        expired_user_ids = yield self.store.get_expired_users()
+        expired_users = [UserID.from_string(user_id) for user_id in expired_user_ids]
+
+        # Mark each one as non-active
+        for user in expired_users:
+            yield self.profile_handler.set_active(user, False, True)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index f844409d21..75e743458b 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -409,7 +409,7 @@ class AuthHandler(BaseHandler):
         # TODO: get this from the homeserver rather than creating a new one for
         # each request
         try:
-            client = self.hs.get_simple_http_client()
+            client = self.hs.get_proxied_http_client()
             resp_body = yield client.post_urlencoded_get_json(
                 self.hs.config.recaptcha_siteverify_api,
                 args={
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 5f804d1f13..ad00dcecfd 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -35,6 +35,7 @@ class DeactivateAccountHandler(BaseHandler):
         self._device_handler = hs.get_device_handler()
         self._room_member_handler = hs.get_room_member_handler()
         self._identity_handler = hs.get_handlers().identity_handler
+        self._profile_handler = hs.get_profile_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
 
         # Flag that indicates whether the process to part users from rooms is running
@@ -102,6 +103,9 @@ class DeactivateAccountHandler(BaseHandler):
 
         yield self.store.user_set_password_hash(user_id, None)
 
+        user = UserID.from_string(user_id)
+        yield self._profile_handler.set_active(user, False, False)
+
         # Add the user to a table of users pending deactivation (ie.
         # removal from all the rooms they're a member of)
         yield self.store.add_user_pending_deactivation(user_id)
@@ -118,6 +122,10 @@ class DeactivateAccountHandler(BaseHandler):
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        # Reject all pending invites for the user, so that the user doesn't show up in the
+        # "invited" section of rooms' members list.
+        yield self._reject_pending_invites_for_user(user_id)
+
         # Remove all information on the user from the account_validity table.
         if self._account_validity_enabled:
             yield self.store.delete_account_validity_for_user(user_id)
@@ -127,6 +135,39 @@ class DeactivateAccountHandler(BaseHandler):
 
         return identity_server_supports_unbinding
 
+    @defer.inlineCallbacks
+    def _reject_pending_invites_for_user(self, user_id):
+        """Reject pending invites addressed to a given user ID.
+
+        Args:
+            user_id (str): The user ID to reject pending invites for.
+        """
+        user = UserID.from_string(user_id)
+        pending_invites = yield self.store.get_invited_rooms_for_user(user_id)
+
+        for room in pending_invites:
+            try:
+                yield self._room_member_handler.update_membership(
+                    create_requester(user),
+                    user,
+                    room.room_id,
+                    "leave",
+                    ratelimit=False,
+                    require_consent=False,
+                )
+                logger.info(
+                    "Rejected invite for deactivated user %r in room %r",
+                    user_id,
+                    room.room_id,
+                )
+            except Exception:
+                logger.exception(
+                    "Failed to reject invite for user %r in room %r:"
+                    " ignoring and continuing",
+                    user_id,
+                    room.room_id,
+                )
+
     def _start_user_parting(self):
         """
         Start the process that goes through the table of users
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 538b16efd6..807b38eb16 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1349,8 +1349,15 @@ class FederationHandler(BaseHandler):
         if self.hs.config.block_non_admin_invites:
             raise SynapseError(403, "This server does not accept room invites")
 
+        is_published = yield self.store.is_room_published(event.room_id)
+
         if not self.spam_checker.user_may_invite(
-            event.sender, event.state_key, event.room_id
+            event.sender,
+            event.state_key,
+            None,
+            room_id=event.room_id,
+            new_room=False,
+            published_room=is_published,
         ):
             raise SynapseError(
                 403, "This user is not permitted to send invites to this server/user"
@@ -2507,7 +2514,7 @@ class FederationHandler(BaseHandler):
                 room_version, event_dict, event, context
             )
 
-            EventValidator().validate_new(event)
+            EventValidator().validate_new(event, self.config)
 
             # We need to tell the transaction queue to send this out, even
             # though the sender isn't a local user.
@@ -2565,7 +2572,7 @@ class FederationHandler(BaseHandler):
         )
 
         try:
-            self.auth.check_from_context(room_version, event, context)
+            yield self.auth.check_from_context(room_version, event, context)
         except AuthError as e:
             logger.warn("Denying third party invite %r because %s", event, e)
             raise e
@@ -2594,7 +2601,12 @@ class FederationHandler(BaseHandler):
                 original_invite_id, allow_none=True
             )
         if original_invite:
-            display_name = original_invite.content["display_name"]
+            # If the m.room.third_party_invite event's content is empty, it means the
+            # invite has been revoked. In this case, we don't have to raise an error here
+            # because the auth check will fail on the invite (because it's not able to
+            # fetch public keys from the m.room.third_party_invite event's content, which
+            # is empty).
+            display_name = original_invite.content.get("display_name")
             event_dict["content"]["third_party_invite"]["display_name"] = display_name
         else:
             logger.info(
@@ -2609,7 +2621,7 @@ class FederationHandler(BaseHandler):
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder
         )
-        EventValidator().validate_new(event)
+        EventValidator().validate_new(event, self.config)
         return (event, context)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index d199521b58..339e0dd04d 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018, 2019 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -20,13 +20,18 @@
 import logging
 
 from canonicaljson import json
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
+from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
 from synapse.api.errors import (
+    AuthError,
     CodeMessageException,
     Codes,
     HttpResponseException,
+    ProxiedRequestError,
     SynapseError,
 )
 
@@ -46,6 +51,8 @@ class IdentityHandler(BaseHandler):
         self.trust_any_id_server_just_for_testing_do_not_use = (
             hs.config.use_insecure_ssl_client_just_for_testing_do_not_use
         )
+        self.rewrite_identity_server_urls = hs.config.rewrite_identity_server_urls
+        self._enable_lookup = hs.config.enable_3pid_lookup
 
     def _should_trust_id_server(self, id_server):
         if id_server not in self.trusted_id_servers:
@@ -83,7 +90,10 @@ class IdentityHandler(BaseHandler):
                 id_server,
             )
             return None
-
+        # if we have a rewrite rule set for the identity server,
+        # apply it now.
+        if id_server in self.rewrite_identity_server_urls:
+            id_server = self.rewrite_identity_server_urls[id_server]
         try:
             data = yield self.http_client.get_json(
                 "https://%s%s"
@@ -117,9 +127,17 @@ class IdentityHandler(BaseHandler):
         else:
             raise SynapseError(400, "No client_secret in creds")
 
+        # if we have a rewrite rule set for the identity server,
+        # apply it now, but only for sending the request (not
+        # storing in the database).
+        if id_server in self.rewrite_identity_server_urls:
+            id_server_host = self.rewrite_identity_server_urls[id_server]
+        else:
+            id_server_host = id_server
+
         try:
             data = yield self.http_client.post_json_get_json(
-                "https://%s%s" % (id_server, "/_matrix/identity/api/v1/3pid/bind"),
+                "https://%s%s" % (id_server_host, "/_matrix/identity/api/v1/3pid/bind"),
                 {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid},
             )
             logger.debug("bound threepid %r to %s", creds, mxid)
@@ -205,6 +223,16 @@ class IdentityHandler(BaseHandler):
         )
         headers = {b"Authorization": auth_headers}
 
+        # if we have a rewrite rule set for the identity server,
+        # apply it now.
+        #
+        # Note that destination_is has to be the real id_server, not
+        # the server we connect to.
+        if id_server in self.rewrite_identity_server_urls:
+            id_server = self.rewrite_identity_server_urls[id_server]
+
+        url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+
         try:
             yield self.http_client.post_json_get_json(url, content, headers)
             changed = True
@@ -241,6 +269,11 @@ class IdentityHandler(BaseHandler):
             "send_attempt": send_attempt,
         }
 
+        # if we have a rewrite rule set for the identity server,
+        # apply it now.
+        if id_server in self.rewrite_identity_server_urls:
+            id_server = self.rewrite_identity_server_urls[id_server]
+
         if next_link:
             params.update({"next_link": next_link})
 
@@ -271,7 +304,10 @@ class IdentityHandler(BaseHandler):
             "send_attempt": send_attempt,
         }
         params.update(kwargs)
-
+        # if we have a rewrite rule set for the identity server,
+        # apply it now.
+        if id_server in self.rewrite_identity_server_urls:
+            id_server = self.rewrite_identity_server_urls[id_server]
         try:
             data = yield self.http_client.post_json_get_json(
                 "https://%s%s"
@@ -282,3 +318,120 @@ class IdentityHandler(BaseHandler):
         except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e.to_synapse_error()
+
+    @defer.inlineCallbacks
+    def lookup_3pid(self, id_server, medium, address):
+        """Looks up a 3pid in the passed identity server.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            Deferred[dict]: The result of the lookup. See
+            https://matrix.org/docs/spec/identity_service/r0.1.0.html#association-lookup
+            for details
+        """
+        if not self._should_trust_id_server(id_server):
+            raise SynapseError(
+                400, "Untrusted ID server '%s'" % id_server, Codes.SERVER_NOT_TRUSTED
+            )
+
+        if not self._enable_lookup:
+            raise AuthError(
+                403, "Looking up third-party identifiers is denied from this server"
+            )
+
+        target = self.rewrite_identity_server_urls.get(id_server, id_server)
+
+        try:
+            data = yield self.http_client.get_json(
+                "https://%s/_matrix/identity/api/v1/lookup" % (target,),
+                {"medium": medium, "address": address},
+            )
+
+            if "mxid" in data:
+                if "signatures" not in data:
+                    raise AuthError(401, "No signatures on 3pid binding")
+                yield self._verify_any_signature(data, id_server)
+
+        except HttpResponseException as e:
+            logger.info("Proxied lookup failed: %r", e)
+            raise e.to_synapse_error()
+        except IOError as e:
+            logger.info("Failed to contact %r: %s", id_server, e)
+            raise ProxiedRequestError(503, "Failed to contact identity server")
+
+        defer.returnValue(data)
+
+    @defer.inlineCallbacks
+    def bulk_lookup_3pid(self, id_server, threepids):
+        """Looks up given 3pids in the passed identity server.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            threepids ([[str, str]]): The third party identifiers to lookup, as
+                a list of 2-string sized lists ([medium, address]).
+
+        Returns:
+            Deferred[dict]: The result of the lookup. See
+            https://matrix.org/docs/spec/identity_service/r0.1.0.html#association-lookup
+            for details
+        """
+        if not self._should_trust_id_server(id_server):
+            raise SynapseError(
+                400, "Untrusted ID server '%s'" % id_server, Codes.SERVER_NOT_TRUSTED
+            )
+
+        if not self._enable_lookup:
+            raise AuthError(
+                403, "Looking up third-party identifiers is denied from this server"
+            )
+
+        target = self.rewrite_identity_server_urls.get(id_server, id_server)
+
+        try:
+            data = yield self.http_client.post_json_get_json(
+                "https://%s/_matrix/identity/api/v1/bulk_lookup" % (target,),
+                {"threepids": threepids},
+            )
+
+        except HttpResponseException as e:
+            logger.info("Proxied lookup failed: %r", e)
+            raise e.to_synapse_error()
+        except IOError as e:
+            logger.info("Failed to contact %r: %s", id_server, e)
+            raise ProxiedRequestError(503, "Failed to contact identity server")
+
+        defer.returnValue(data)
+
+    @defer.inlineCallbacks
+    def _verify_any_signature(self, data, server_hostname):
+        if server_hostname not in data["signatures"]:
+            raise AuthError(401, "No signature from server %s" % (server_hostname,))
+
+        for key_name, signature in data["signatures"][server_hostname].items():
+            target = self.rewrite_identity_server_urls.get(
+                server_hostname, server_hostname
+            )
+
+            key_data = yield self.http_client.get_json(
+                "https://%s/_matrix/identity/api/v1/pubkey/%s" % (target, key_name)
+            )
+            if "public_key" not in key_data:
+                raise AuthError(
+                    401, "No public key named %s from %s" % (key_name, server_hostname)
+                )
+            verify_signed_json(
+                data,
+                server_hostname,
+                decode_verify_key_bytes(
+                    key_name, decode_base64(key_data["public_key"])
+                ),
+            )
+            return
+
+        raise AuthError(401, "No signature from server %s" % (server_hostname,))
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 111f7c7e2f..291eb9cb15 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -135,7 +135,7 @@ class MessageHandler(object):
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
 
             visible_events = yield filter_events_for_client(
-                self.store, user_id, last_events
+                self.store, user_id, last_events, apply_retention_policies=False
             )
 
             event = last_events[0]
@@ -398,7 +398,7 @@ class EventCreationHandler(object):
                     403, "You must be in the room to create an alias for it"
                 )
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         return (event, context)
 
@@ -615,7 +615,7 @@ class EventCreationHandler(object):
         if requester:
             context.app_service = requester.app_service
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         # If this event is an annotation then we check that that the sender
         # can't annotate the same way twice (e.g. stops users from liking an
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5744f4579d..d8c3feff16 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import logging
 
+from six import iteritems
+
 from twisted.internet import defer
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.logging.context import run_in_background
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import ReadWriteLock
@@ -78,6 +81,111 @@ class PaginationHandler(object):
         self._purges_by_id = {}
         self._event_serializer = hs.get_event_client_serializer()
 
+        self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
+
+        if hs.config.retention_enabled:
+            # Run the purge jobs described in the configuration file.
+            for job in hs.config.retention_purge_jobs:
+                self.clock.looping_call(
+                    run_as_background_process,
+                    job["interval"],
+                    "purge_history_for_rooms_in_range",
+                    self.purge_history_for_rooms_in_range,
+                    job["shortest_max_lifetime"],
+                    job["longest_max_lifetime"],
+                )
+
+    @defer.inlineCallbacks
+    def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+        """Purge outdated events from rooms within the given retention range.
+
+        If a default retention policy is defined in the server's configuration and its
+        'max_lifetime' is within this range, also targets rooms which don't have a
+        retention policy.
+
+        Args:
+            min_ms (int|None): Duration in milliseconds that define the lower limit of
+                the range to handle (exclusive). If None, it means that the range has no
+                lower limit.
+            max_ms (int|None): Duration in milliseconds that define the upper limit of
+                the range to handle (inclusive). If None, it means that the range has no
+                upper limit.
+        """
+        # We want the storage layer to to include rooms with no retention policy in its
+        # return value only if a default retention policy is defined in the server's
+        # configuration and that policy's 'max_lifetime' is either lower (or equal) than
+        # max_ms or higher than min_ms (or both).
+        if self._retention_default_max_lifetime is not None:
+            include_null = True
+
+            if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
+                # The default max_lifetime is lower than (or equal to) min_ms.
+                include_null = False
+
+            if max_ms is not None and max_ms < self._retention_default_max_lifetime:
+                # The default max_lifetime is higher than max_ms.
+                include_null = False
+        else:
+            include_null = False
+
+        rooms = yield self.store.get_rooms_for_retention_period_in_range(
+            min_ms, max_ms, include_null
+        )
+
+        for room_id, retention_policy in iteritems(rooms):
+            if room_id in self._purges_in_progress_by_room:
+                logger.warning(
+                    "[purge] not purging room %s as there's an ongoing purge running"
+                    " for this room",
+                    room_id,
+                )
+                continue
+
+            max_lifetime = retention_policy["max_lifetime"]
+
+            if max_lifetime is None:
+                # If max_lifetime is None, it means that include_null equals True,
+                # therefore we can safely assume that there is a default policy defined
+                # in the server's configuration.
+                max_lifetime = self._retention_default_max_lifetime
+
+            # Figure out what token we should start purging at.
+            ts = self.clock.time_msec() - max_lifetime
+
+            stream_ordering = (yield self.store.find_first_stream_ordering_after_ts(ts))
+
+            r = (
+                yield self.store.get_room_event_after_stream_ordering(
+                    room_id, stream_ordering
+                )
+            )
+            if not r:
+                logger.warning(
+                    "[purge] purging events not possible: No event found "
+                    "(ts %i => stream_ordering %i)",
+                    ts,
+                    stream_ordering,
+                )
+                continue
+
+            (stream, topo, _event_id) = r
+            token = "t%d-%d" % (topo, stream)
+
+            purge_id = random_string(16)
+
+            self._purges_by_id[purge_id] = PurgeStatus()
+
+            logger.info(
+                "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
+            )
+
+            # We want to purge everything, including local events, and to run the purge in
+            # the background so that it's not blocking any other operation apart from
+            # other purges in the same room.
+            run_as_background_process(
+                "_purge_history", self._purge_history, purge_id, room_id, token, True
+            )
+
     def start_purge_history(self, room_id, token, delete_local_events=False):
         """Start off a history purge on a room.
 
diff --git a/synapse/handlers/password_policy.py b/synapse/handlers/password_policy.py
new file mode 100644
index 0000000000..d06b110269
--- /dev/null
+++ b/synapse/handlers/password_policy.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+from synapse.api.errors import Codes, PasswordRefusedError
+
+logger = logging.getLogger(__name__)
+
+
+class PasswordPolicyHandler(object):
+    def __init__(self, hs):
+        self.policy = hs.config.password_policy
+        self.enabled = hs.config.password_policy_enabled
+
+        # Regexps for the spec'd policy parameters.
+        self.regexp_digit = re.compile("[0-9]")
+        self.regexp_symbol = re.compile("[^a-zA-Z0-9]")
+        self.regexp_uppercase = re.compile("[A-Z]")
+        self.regexp_lowercase = re.compile("[a-z]")
+
+    def validate_password(self, password):
+        """Checks whether a given password complies with the server's policy.
+
+        Args:
+            password (str): The password to check against the server's policy.
+
+        Raises:
+            PasswordRefusedError: The password doesn't comply with the server's policy.
+        """
+
+        if not self.enabled:
+            return
+
+        minimum_accepted_length = self.policy.get("minimum_length", 0)
+        if len(password) < minimum_accepted_length:
+            raise PasswordRefusedError(
+                msg=(
+                    "The password must be at least %d characters long"
+                    % minimum_accepted_length
+                ),
+                errcode=Codes.PASSWORD_TOO_SHORT,
+            )
+
+        if (
+            self.policy.get("require_digit", False)
+            and self.regexp_digit.search(password) is None
+        ):
+            raise PasswordRefusedError(
+                msg="The password must include at least one digit",
+                errcode=Codes.PASSWORD_NO_DIGIT,
+            )
+
+        if (
+            self.policy.get("require_symbol", False)
+            and self.regexp_symbol.search(password) is None
+        ):
+            raise PasswordRefusedError(
+                msg="The password must include at least one symbol",
+                errcode=Codes.PASSWORD_NO_SYMBOL,
+            )
+
+        if (
+            self.policy.get("require_uppercase", False)
+            and self.regexp_uppercase.search(password) is None
+        ):
+            raise PasswordRefusedError(
+                msg="The password must include at least one uppercase letter",
+                errcode=Codes.PASSWORD_NO_UPPERCASE,
+            )
+
+        if (
+            self.policy.get("require_lowercase", False)
+            and self.regexp_lowercase.search(password) is None
+        ):
+            raise PasswordRefusedError(
+                msg="The password must include at least one lowercase letter",
+                errcode=Codes.PASSWORD_NO_LOWERCASE,
+            )
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 8690f69d45..fb31711b29 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,8 +17,11 @@
 import logging
 
 from six import raise_from
+from six.moves import range
 
-from twisted.internet import defer
+from signedjson.sign import sign_json
+
+from twisted.internet import defer, reactor
 
 from synapse.api.errors import (
     AuthError,
@@ -27,6 +31,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, get_domain_from_id
 
@@ -46,6 +51,8 @@ class BaseProfileHandler(BaseHandler):
     subclass MasterProfileHandler
     """
 
+    PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000
+
     def __init__(self, hs):
         super(BaseProfileHandler, self).__init__(hs)
 
@@ -56,6 +63,87 @@ class BaseProfileHandler(BaseHandler):
 
         self.user_directory_handler = hs.get_user_directory_handler()
 
+        self.http_client = hs.get_simple_http_client()
+
+        self.max_avatar_size = hs.config.max_avatar_size
+        self.allowed_avatar_mimetypes = hs.config.allowed_avatar_mimetypes
+
+        if hs.config.worker_app is None:
+            self.clock.looping_call(
+                self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
+            )
+
+            if len(self.hs.config.replicate_user_profiles_to) > 0:
+                reactor.callWhenRunning(self._assign_profile_replication_batches)
+                reactor.callWhenRunning(self._replicate_profiles)
+                # Add a looping call to replicate_profiles: this handles retries
+                # if the replication is unsuccessful when the user updated their
+                # profile.
+                self.clock.looping_call(
+                    self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
+                )
+
+    @defer.inlineCallbacks
+    def _assign_profile_replication_batches(self):
+        """If no profile replication has been done yet, allocate replication batch
+        numbers to each profile to start the replication process.
+        """
+        logger.info("Assigning profile batch numbers...")
+        total = 0
+        while True:
+            assigned = yield self.store.assign_profile_batch()
+            total += assigned
+            if assigned == 0:
+                break
+        logger.info("Assigned %d profile batch numbers", total)
+
+    @defer.inlineCallbacks
+    def _replicate_profiles(self):
+        """If any profile data has been updated and not pushed to the replication targets,
+        replicate it.
+        """
+        host_batches = yield self.store.get_replication_hosts()
+        latest_batch = yield self.store.get_latest_profile_replication_batch_number()
+        if latest_batch is None:
+            latest_batch = -1
+        for repl_host in self.hs.config.replicate_user_profiles_to:
+            if repl_host not in host_batches:
+                host_batches[repl_host] = -1
+            try:
+                for i in range(host_batches[repl_host] + 1, latest_batch + 1):
+                    yield self._replicate_host_profile_batch(repl_host, i)
+            except Exception:
+                logger.exception(
+                    "Exception while replicating to %s: aborting for now", repl_host
+                )
+
+    @defer.inlineCallbacks
+    def _replicate_host_profile_batch(self, host, batchnum):
+        logger.info("Replicating profile batch %d to %s", batchnum, host)
+        batch_rows = yield self.store.get_profile_batch(batchnum)
+        batch = {
+            UserID(r["user_id"], self.hs.hostname).to_string(): (
+                {"display_name": r["displayname"], "avatar_url": r["avatar_url"]}
+                if r["active"]
+                else None
+            )
+            for r in batch_rows
+        }
+
+        url = "https://%s/_matrix/identity/api/v1/replicate_profiles" % (host,)
+        body = {"batchnum": batchnum, "batch": batch, "origin_server": self.hs.hostname}
+        signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0])
+        try:
+            yield self.http_client.post_json_get_json(url, signed_body)
+            yield self.store.update_replication_batch_for_host(host, batchnum)
+            logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
+        except Exception:
+            # This will get retried when the looping call next comes around
+            logger.exception(
+                "Failed to replicate profile batch %d to %s", batchnum, host
+            )
+            raise
+
     @defer.inlineCallbacks
     def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)
@@ -154,9 +242,16 @@ class BaseProfileHandler(BaseHandler):
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if not by_admin and target_user != requester.user:
+        if not by_admin and requester and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
 
+        if not by_admin and self.hs.config.disable_set_displayname:
+            profile = yield self.store.get_profileinfo(target_user.localpart)
+            if profile.display_name:
+                raise SynapseError(
+                    400, "Changing displayname is disabled on this server"
+                )
+
         if len(new_displayname) > MAX_DISPLAYNAME_LEN:
             raise SynapseError(
                 400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN,)
@@ -165,7 +260,17 @@ class BaseProfileHandler(BaseHandler):
         if new_displayname == "":
             new_displayname = None
 
-        yield self.store.set_profile_displayname(target_user.localpart, new_displayname)
+        if len(self.hs.config.replicate_user_profiles_to) > 0:
+            cur_batchnum = (
+                yield self.store.get_latest_profile_replication_batch_number()
+            )
+            new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+        else:
+            new_batchnum = None
+
+        yield self.store.set_profile_displayname(
+            target_user.localpart, new_displayname, new_batchnum
+        )
 
         if self.hs.config.user_directory_search_all_users:
             profile = yield self.store.get_profileinfo(target_user.localpart)
@@ -173,7 +278,39 @@ class BaseProfileHandler(BaseHandler):
                 target_user.to_string(), profile
             )
 
-        yield self._update_join_states(requester, target_user)
+        if requester:
+            yield self._update_join_states(requester, target_user)
+
+        # start a profile replication push
+        run_in_background(self._replicate_profiles)
+
+    @defer.inlineCallbacks
+    def set_active(self, target_user, active, hide):
+        """
+        Sets the 'active' flag on a user profile. If set to false, the user
+        account is considered deactivated or hidden.
+
+        If 'hide' is true, then we interpret active=False as a request to try to
+        hide the user rather than deactivating it.  This means withholding the
+        profile from replication (and mark it as inactive) rather than clearing
+        the profile from the HS DB. Note that unlike set_displayname and
+        set_avatar_url, this does *not* perform authorization checks! This is
+        because the only place it's used currently is in account deactivation
+        where we've already done these checks anyway.
+        """
+        if len(self.hs.config.replicate_user_profiles_to) > 0:
+            cur_batchnum = (
+                yield self.store.get_latest_profile_replication_batch_number()
+            )
+            new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+        else:
+            new_batchnum = None
+        yield self.store.set_profile_active(
+            target_user.localpart, active, hide, new_batchnum
+        )
+
+        # start a profile replication push
+        run_in_background(self._replicate_profiles)
 
     @defer.inlineCallbacks
     def get_avatar_url(self, target_user):
@@ -212,12 +349,59 @@ class BaseProfileHandler(BaseHandler):
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
 
+        if not by_admin and self.hs.config.disable_set_avatar_url:
+            profile = yield self.store.get_profileinfo(target_user.localpart)
+            if profile.avatar_url:
+                raise SynapseError(
+                    400, "Changing avatar url is disabled on this server"
+                )
+
+        if len(self.hs.config.replicate_user_profiles_to) > 0:
+            cur_batchnum = (
+                yield self.store.get_latest_profile_replication_batch_number()
+            )
+            new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1
+        else:
+            new_batchnum = None
+
         if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
             raise SynapseError(
                 400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN,)
             )
 
-        yield self.store.set_profile_avatar_url(target_user.localpart, new_avatar_url)
+        # Enforce a max avatar size if one is defined
+        if self.max_avatar_size or self.allowed_avatar_mimetypes:
+            media_id = self._validate_and_parse_media_id_from_avatar_url(new_avatar_url)
+
+            # Check that this media exists locally
+            media_info = yield self.store.get_local_media(media_id)
+            if not media_info:
+                raise SynapseError(
+                    400, "Unknown media id supplied", errcode=Codes.NOT_FOUND
+                )
+
+            # Ensure avatar does not exceed max allowed avatar size
+            media_size = media_info["media_length"]
+            if self.max_avatar_size and media_size > self.max_avatar_size:
+                raise SynapseError(
+                    400,
+                    "Avatars must be less than %s bytes in size"
+                    % (self.max_avatar_size,),
+                    errcode=Codes.TOO_LARGE,
+                )
+
+            # Ensure the avatar's file type is allowed
+            if (
+                self.allowed_avatar_mimetypes
+                and media_info["media_type"] not in self.allowed_avatar_mimetypes
+            ):
+                raise SynapseError(
+                    400, "Avatar file type '%s' not allowed" % media_info["media_type"]
+                )
+
+        yield self.store.set_profile_avatar_url(
+            target_user.localpart, new_avatar_url, new_batchnum
+        )
 
         if self.hs.config.user_directory_search_all_users:
             profile = yield self.store.get_profileinfo(target_user.localpart)
@@ -227,6 +411,23 @@ class BaseProfileHandler(BaseHandler):
 
         yield self._update_join_states(requester, target_user)
 
+        # start a profile replication push
+        run_in_background(self._replicate_profiles)
+
+    def _validate_and_parse_media_id_from_avatar_url(self, mxc):
+        """Validate and parse a provided avatar url and return the local media id
+
+        Args:
+            mxc (str): A mxc URL
+
+        Returns:
+            str: The ID of the media
+        """
+        avatar_pieces = mxc.split("/")
+        if len(avatar_pieces) != 4 or avatar_pieces[0] != "mxc:":
+            raise SynapseError(400, "Invalid avatar URL '%s' supplied" % mxc)
+        return avatar_pieces[-1]
+
     @defer.inlineCallbacks
     def on_profile_query(self, args):
         user = UserID.from_string(args["user_id"])
@@ -282,7 +483,7 @@ class BaseProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def check_profile_query_allowed(self, target_user, requester=None):
         """Checks whether a profile query is allowed. If the
-        'require_auth_for_profile_requests' config flag is set to True and a
+        'limit_profile_requests_to_known_users' config flag is set to True and a
         'requester' is provided, the query is only allowed if the two users
         share a room.
 
@@ -300,7 +501,11 @@ class BaseProfileHandler(BaseHandler):
         # be None when this function is called outside of a profile query, e.g.
         # when building a membership event. In this case, we must allow the
         # lookup.
-        if not self.hs.config.require_auth_for_profile_requests or not requester:
+        if not self.hs.config.limit_profile_requests_to_known_users or not requester:
+            return
+
+        # Always allow the user to query their own profile.
+        if target_user.to_string() == requester.to_string():
             return
 
         # Always allow the user to query their own profile.
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 3142d85788..98950b9628 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -56,6 +56,7 @@ class RegistrationHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self.profile_handler = hs.get_profile_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
+        self.http_client = hs.get_simple_http_client()
         self.identity_handler = self.hs.get_handlers().identity_handler
         self.ratelimiter = hs.get_registration_ratelimiter()
 
@@ -68,6 +69,8 @@ class RegistrationHandler(BaseHandler):
         )
         self._server_notices_mxid = hs.config.server_notices_mxid
 
+        self._show_in_user_directory = self.hs.config.show_users_in_user_directory
+
         if hs.config.worker_app:
             self._register_client = ReplicationRegisterServlet.make_client(hs)
             self._register_device_client = RegisterDeviceReplicationServlet.make_client(
@@ -209,6 +212,11 @@ class RegistrationHandler(BaseHandler):
                 address=address,
             )
 
+            if default_display_name:
+                yield self.profile_handler.set_displayname(
+                    user, None, default_display_name, by_admin=True
+                )
+
             if self.hs.config.user_directory_search_all_users:
                 profile = yield self.store.get_profileinfo(localpart)
                 yield self.user_directory_handler.handle_local_profile_change(
@@ -234,6 +242,11 @@ class RegistrationHandler(BaseHandler):
                         create_profile_with_displayname=default_display_name,
                         address=address,
                     )
+
+                    yield self.profile_handler.set_displayname(
+                        user, None, default_display_name, by_admin=True
+                    )
+
                 except SynapseError:
                     # if user id is taken, just generate another
                     user = None
@@ -261,6 +274,14 @@ class RegistrationHandler(BaseHandler):
             # Bind email to new account
             yield self._register_email_threepid(user_id, threepid_dict, None, False)
 
+        # Prevent the new user from showing up in the user directory if the server
+        # mandates it.
+        if not self._show_in_user_directory:
+            yield self.store.add_account_data_for_user(
+                user_id, "im.vector.hide_profile", {"hide_profile": True}
+            )
+            yield self.profile_handler.set_active(user, False, True)
+
         return user_id
 
     @defer.inlineCallbacks
@@ -331,7 +352,9 @@ class RegistrationHandler(BaseHandler):
         yield self._auto_join_rooms(user_id)
 
     @defer.inlineCallbacks
-    def appservice_register(self, user_localpart, as_token):
+    def appservice_register(self, user_localpart, as_token, password, display_name):
+        # FIXME: this should be factored out and merged with normal register()
+
         user = UserID(user_localpart, self.hs.hostname)
         user_id = user.to_string()
         service = self.store.get_app_service_by_token(as_token)
@@ -350,12 +373,29 @@ class RegistrationHandler(BaseHandler):
             user_id, allowed_appservice=service
         )
 
+        password_hash = ""
+        if password:
+            password_hash = yield self.auth_handler().hash(password)
+
+        display_name = display_name or user.localpart
+
         yield self.register_with_store(
             user_id=user_id,
-            password_hash="",
+            password_hash=password_hash,
             appservice_id=service_id,
-            create_profile_with_displayname=user.localpart,
+            create_profile_with_displayname=display_name,
+        )
+
+        yield self.profile_handler.set_displayname(
+            user, None, display_name, by_admin=True
         )
+
+        if self.hs.config.user_directory_search_all_users:
+            profile = yield self.store.get_profileinfo(user_localpart)
+            yield self.user_directory_handler.handle_local_profile_change(
+                user_id, profile
+            )
+
         return user_id
 
     def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
@@ -383,6 +423,39 @@ class RegistrationHandler(BaseHandler):
                 )
 
     @defer.inlineCallbacks
+    def shadow_register(self, localpart, display_name, auth_result, params):
+        """Invokes the current registration on another server, using
+        shared secret registration, passing in any auth_results from
+        other registration UI auth flows (e.g. validated 3pids)
+        Useful for setting up shadow/backup accounts on a parallel deployment.
+        """
+
+        # TODO: retries
+        shadow_hs_url = self.hs.config.shadow_server.get("hs_url")
+        as_token = self.hs.config.shadow_server.get("as_token")
+
+        yield self.http_client.post_json_get_json(
+            "%s/_matrix/client/r0/register?access_token=%s" % (shadow_hs_url, as_token),
+            {
+                # XXX: auth_result is an unspecified extension for shadow registration
+                "auth_result": auth_result,
+                # XXX: another unspecified extension for shadow registration to ensure
+                # that the displayname is correctly set by the masters erver
+                "display_name": display_name,
+                "username": localpart,
+                "password": params.get("password"),
+                "bind_email": params.get("bind_email"),
+                "bind_msisdn": params.get("bind_msisdn"),
+                "device_id": params.get("device_id"),
+                "initial_device_display_name": params.get(
+                    "initial_device_display_name"
+                ),
+                "inhibit_login": False,
+                "access_token": as_token,
+            },
+        )
+
+    @defer.inlineCallbacks
     def _generate_user_id(self, reseed=False):
         if reseed or self._next_generated_user_id is None:
             with (yield self._generate_user_id_linearizer.queue(())):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a509e11d69..6bfd9f1231 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -52,12 +52,14 @@ class RoomCreationHandler(BaseHandler):
             "history_visibility": "shared",
             "original_invitees_have_ops": False,
             "guest_can_join": True,
+            "encryption_alg": "m.megolm.v1.aes-sha2",
         },
         RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
             "join_rules": JoinRules.INVITE,
             "history_visibility": "shared",
             "original_invitees_have_ops": True,
             "guest_can_join": True,
+            "encryption_alg": "m.megolm.v1.aes-sha2",
         },
         RoomCreationPreset.PUBLIC_CHAT: {
             "join_rules": JoinRules.PUBLIC,
@@ -294,7 +296,19 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()
 
-        if not self.spam_checker.user_may_create_room(user_id):
+        if (
+            self._server_notices_mxid is not None
+            and requester.user.to_string() == self._server_notices_mxid
+        ):
+            # allow the server notices mxid to create rooms
+            is_requester_admin = True
+
+        else:
+            is_requester_admin = yield self.auth.is_server_admin(requester.user)
+
+        if not is_requester_admin and not self.spam_checker.user_may_create_room(
+            user_id, invite_list=[], third_party_invite_list=[], cloning=True
+        ):
             raise SynapseError(403, "You are not permitted to create rooms")
 
         creation_content = {
@@ -516,8 +530,14 @@ class RoomCreationHandler(BaseHandler):
             requester, config, is_requester_admin=is_requester_admin
         )
 
+        invite_list = config.get("invite", [])
+        invite_3pid_list = config.get("invite_3pid", [])
+
         if not is_requester_admin and not self.spam_checker.user_may_create_room(
-            user_id
+            user_id,
+            invite_list=invite_list,
+            third_party_invite_list=invite_3pid_list,
+            cloning=False,
         ):
             raise SynapseError(403, "You are not permitted to create rooms")
 
@@ -551,7 +571,6 @@ class RoomCreationHandler(BaseHandler):
         else:
             room_alias = None
 
-        invite_list = config.get("invite", [])
         for i in invite_list:
             try:
                 UserID.from_string(i)
@@ -572,8 +591,6 @@ class RoomCreationHandler(BaseHandler):
                 % (user_id,),
             )
 
-        invite_3pid_list = config.get("invite_3pid", [])
-
         visibility = config.get("visibility", None)
         is_public = visibility == "public"
 
@@ -661,6 +678,7 @@ class RoomCreationHandler(BaseHandler):
                 "invite",
                 ratelimit=False,
                 content=content,
+                new_room=True,
             )
 
         for invite_3pid in invite_3pid_list:
@@ -675,6 +693,7 @@ class RoomCreationHandler(BaseHandler):
                 id_server,
                 requester,
                 txn_id=None,
+                new_room=True,
             )
 
         result = {"room_id": room_id}
@@ -731,6 +750,7 @@ class RoomCreationHandler(BaseHandler):
             "join",
             ratelimit=False,
             content=creator_join_profile,
+            new_room=True,
         )
 
         # We treat the power levels override specially as this needs to be one
@@ -792,6 +812,13 @@ class RoomCreationHandler(BaseHandler):
         for (etype, state_key), content in initial_state.items():
             yield send(etype=etype, state_key=state_key, content=content)
 
+        if "encryption_alg" in config:
+            yield send(
+                etype=EventTypes.Encryption,
+                state_key="",
+                content={"algorithm": config["encryption_alg"]},
+            )
+
     @defer.inlineCallbacks
     def _generate_room_id(self, creator_id, is_public):
         # autogen room IDs and try to create it. We may clash, so just
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f03a2bd540..d3d4967ea3 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -20,15 +20,18 @@ import logging
 
 from six.moves import http_client
 
-from signedjson.key import decode_verify_key_bytes
-from signedjson.sign import verify_signed_json
-from unpaddedbase64 import decode_base64
-
 from twisted.internet import defer
 
 from synapse import types
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
+from synapse.api.ratelimiting import Ratelimiter
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    ProxiedRequestError,
+    HttpResponseException,
+    SynapseError,
+)
 from synapse.types import RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
@@ -66,6 +69,7 @@ class RoomMemberHandler(object):
         self.registration_handler = hs.get_registration_handler()
         self.profile_handler = hs.get_profile_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.identity_handler = hs.get_handlers().identity_handler
 
         self.member_linearizer = Linearizer(name="member")
 
@@ -73,13 +77,10 @@ class RoomMemberHandler(object):
         self.spam_checker = hs.get_spam_checker()
         self.third_party_event_rules = hs.get_third_party_event_rules()
         self._server_notices_mxid = self.config.server_notices_mxid
+        self.rewrite_identity_server_urls = self.config.rewrite_identity_server_urls
         self._enable_lookup = hs.config.enable_3pid_lookup
         self.allow_per_room_profiles = self.config.allow_per_room_profiles
-
-        # This is only used to get at ratelimit function, and
-        # maybe_kick_guest_users. It's fine there are multiple of these as
-        # it doesn't store state.
-        self.base_handler = BaseHandler(hs)
+        self.ratelimiter = Ratelimiter()
 
     @abc.abstractmethod
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -284,8 +285,31 @@ class RoomMemberHandler(object):
         third_party_signed=None,
         ratelimit=True,
         content=None,
+        new_room=False,
         require_consent=True,
     ):
+        """Update a users membership in a room
+
+        Args:
+            requester (Requester)
+            target (UserID)
+            room_id (str)
+            action (str): The "action" the requester is performing against the
+                target. One of join/leave/kick/ban/invite/unban.
+            txn_id (str|None): The transaction ID associated with the request,
+                or None not provided.
+            remote_room_hosts (list[str]|None): List of remote servers to try
+                and join via if server isn't already in the room.
+            third_party_signed (dict|None): The signed object for third party
+                invites.
+            ratelimit (bool): Whether to apply ratelimiting to this request.
+            content (dict|None): Fields to include in the new events content.
+            new_room (bool): Whether these membership changes are happening
+                as part of a room creation (e.g. initial joins and invites)
+
+        Returns:
+            Deferred[FrozenEvent]
+        """
         key = (room_id,)
 
         with (yield self.member_linearizer.queue(key)):
@@ -299,6 +323,7 @@ class RoomMemberHandler(object):
                 third_party_signed=third_party_signed,
                 ratelimit=ratelimit,
                 content=content,
+                new_room=new_room,
                 require_consent=require_consent,
             )
 
@@ -316,6 +341,7 @@ class RoomMemberHandler(object):
         third_party_signed=None,
         ratelimit=True,
         content=None,
+        new_room=False,
         require_consent=True,
     ):
         content_specified = bool(content)
@@ -380,8 +406,15 @@ class RoomMemberHandler(object):
                     )
                     block_invite = True
 
+                is_published = yield self.store.is_room_published(room_id)
+
                 if not self.spam_checker.user_may_invite(
-                    requester.user.to_string(), target.to_string(), room_id
+                    requester.user.to_string(),
+                    target.to_string(),
+                    third_party_invite=None,
+                    room_id=room_id,
+                    new_room=new_room,
+                    published_room=is_published,
                 ):
                     logger.info("Blocking invite due to spam checker")
                     block_invite = True
@@ -454,8 +487,26 @@ class RoomMemberHandler(object):
                     # so don't really fit into the general auth process.
                     raise AuthError(403, "Guest access not allowed")
 
+            if (
+                self._server_notices_mxid is not None
+                and requester.user.to_string() == self._server_notices_mxid
+            ):
+                # allow the server notices mxid to join rooms
+                is_requester_admin = True
+
+            else:
+                is_requester_admin = yield self.auth.is_server_admin(requester.user)
+
+            inviter = yield self._get_inviter(target.to_string(), room_id)
+            if not is_requester_admin:
+                # We assume that if the spam checker allowed the user to create
+                # a room then they're allowed to join it.
+                if not new_room and not self.spam_checker.user_may_join_room(
+                    target.to_string(), room_id, is_invited=inviter is not None
+                ):
+                    raise SynapseError(403, "Not allowed to join this room")
+
             if not is_host_in_room:
-                inviter = yield self._get_inviter(target.to_string(), room_id)
                 if inviter and not self.hs.is_mine(inviter):
                     remote_room_hosts.append(inviter.domain)
 
@@ -646,7 +697,15 @@ class RoomMemberHandler(object):
 
     @defer.inlineCallbacks
     def do_3pid_invite(
-        self, room_id, inviter, medium, address, id_server, requester, txn_id
+        self,
+        room_id,
+        inviter,
+        medium,
+        address,
+        id_server,
+        requester,
+        txn_id,
+        new_room=False,
     ):
         if self.config.block_non_admin_invites:
             is_requester_admin = yield self.auth.is_server_admin(requester.user)
@@ -657,7 +716,23 @@ class RoomMemberHandler(object):
 
         # We need to rate limit *before* we send out any 3PID invites, so we
         # can't just rely on the standard ratelimiting of events.
-        yield self.base_handler.ratelimit(requester)
+        self.ratelimiter.ratelimit(
+            requester.user.to_string(),
+            time_now_s=self.hs.clock.time(),
+            rate_hz=self.hs.config.rc_third_party_invite.per_second,
+            burst_count=self.hs.config.rc_third_party_invite.burst_count,
+            update=True,
+        )
+
+        can_invite = yield self.third_party_event_rules.check_threepid_can_be_invited(
+            medium, address, room_id
+        )
+        if not can_invite:
+            raise SynapseError(
+                403,
+                "This third-party identifier can not be invited in this room",
+                Codes.FORBIDDEN,
+            )
 
         can_invite = yield self.third_party_event_rules.check_threepid_can_be_invited(
             medium, address, room_id
@@ -671,6 +746,19 @@ class RoomMemberHandler(object):
 
         invitee = yield self._lookup_3pid(id_server, medium, address)
 
+        is_published = yield self.store.is_room_published(room_id)
+
+        if not self.spam_checker.user_may_invite(
+            requester.user.to_string(),
+            invitee,
+            third_party_invite={"medium": medium, "address": address},
+            room_id=room_id,
+            new_room=new_room,
+            published_room=is_published,
+        ):
+            logger.info("Blocking invite due to spam checker")
+            raise SynapseError(403, "Invites have been disabled on this server")
+
         if invitee:
             yield self.update_membership(
                 requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
@@ -680,6 +768,20 @@ class RoomMemberHandler(object):
                 requester, id_server, medium, address, room_id, inviter, txn_id=txn_id
             )
 
+    def _get_id_server_target(self, id_server):
+        """Looks up an id_server's actual http endpoint
+
+        Args:
+            id_server (str): the server name to lookup.
+
+        Returns:
+            the http endpoint to connect to.
+        """
+        if id_server in self.rewrite_identity_server_urls:
+            return self.rewrite_identity_server_urls[id_server]
+
+        return id_server
+
     @defer.inlineCallbacks
     def _lookup_3pid(self, id_server, medium, address):
         """Looks up a 3pid in the passed identity server.
@@ -693,49 +795,14 @@ class RoomMemberHandler(object):
         Returns:
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
-        if not self._enable_lookup:
-            raise SynapseError(
-                403, "Looking up third-party identifiers is denied from this server"
-            )
         try:
-            data = yield self.simple_http_client.get_json(
-                "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
-                {"medium": medium, "address": address},
-            )
-
-            if "mxid" in data:
-                if "signatures" not in data:
-                    raise AuthError(401, "No signatures on 3pid binding")
-                yield self._verify_any_signature(data, id_server)
-                return data["mxid"]
-
-        except IOError as e:
+            data = yield self.identity_handler.lookup_3pid(id_server, medium, address)
+            return data.get("mxid")
+        except ProxiedRequestError as e:
             logger.warn("Error from identity server lookup: %s" % (e,))
             return None
 
     @defer.inlineCallbacks
-    def _verify_any_signature(self, data, server_hostname):
-        if server_hostname not in data["signatures"]:
-            raise AuthError(401, "No signature from server %s" % (server_hostname,))
-        for key_name, signature in data["signatures"][server_hostname].items():
-            key_data = yield self.simple_http_client.get_json(
-                "%s%s/_matrix/identity/api/v1/pubkey/%s"
-                % (id_server_scheme, server_hostname, key_name)
-            )
-            if "public_key" not in key_data:
-                raise AuthError(
-                    401, "No public key named %s from %s" % (key_name, server_hostname)
-                )
-            verify_signed_json(
-                data,
-                server_hostname,
-                decode_verify_key_bytes(
-                    key_name, decode_base64(key_data["public_key"])
-                ),
-            )
-            return
-
-    @defer.inlineCallbacks
     def _make_and_store_3pid_invite(
         self, requester, id_server, medium, address, room_id, user, txn_id
     ):
@@ -853,9 +920,10 @@ class RoomMemberHandler(object):
                     user.
         """
 
+        target = self._get_id_server_target(id_server)
         is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
             id_server_scheme,
-            id_server,
+            target,
         )
 
         invite_config = {
@@ -895,7 +963,7 @@ class RoomMemberHandler(object):
             fallback_public_key = {
                 "public_key": data["public_key"],
                 "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid"
-                % (id_server_scheme, id_server),
+                % (id_server_scheme, target),
             }
         else:
             fallback_public_key = public_keys[0]
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index d90c9e0108..3f50d6de47 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-# Copyright 2017 New Vector Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -30,12 +31,15 @@ class SetPasswordHandler(BaseHandler):
         super(SetPasswordHandler, self).__init__(hs)
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
+        self._password_policy_handler = hs.get_password_policy_handler()
 
     @defer.inlineCallbacks
     def set_password(self, user_id, newpassword, requester=None):
         if not self.hs.config.password_localdb_enabled:
             raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)
 
+        self._password_policy_handler.validate_password(newpassword)
+
         password_hash = yield self._auth_handler.hash(newpassword)
 
         except_device_id = requester.device_id if requester else None
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 0ae6db8ea7..c973244c1f 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -45,6 +45,7 @@ from synapse.http import (
     cancelled_to_request_timed_out_error,
     redact_uri,
 )
+from synapse.http.proxyagent import ProxyAgent
 from synapse.logging.context import make_deferred_yieldable
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -182,7 +183,15 @@ class SimpleHttpClient(object):
     using HTTP in Matrix
     """
 
-    def __init__(self, hs, treq_args={}, ip_whitelist=None, ip_blacklist=None):
+    def __init__(
+        self,
+        hs,
+        treq_args={},
+        ip_whitelist=None,
+        ip_blacklist=None,
+        http_proxy=None,
+        https_proxy=None,
+    ):
         """
         Args:
             hs (synapse.server.HomeServer)
@@ -191,6 +200,8 @@ class SimpleHttpClient(object):
                 we may not request.
             ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can
                request if it were otherwise caught in a blacklist.
+            http_proxy (bytes): proxy server to use for http connections. host[:port]
+            https_proxy (bytes): proxy server to use for https connections. host[:port]
         """
         self.hs = hs
 
@@ -235,11 +246,13 @@ class SimpleHttpClient(object):
         # The default context factory in Twisted 14.0.0 (which we require) is
         # BrowserLikePolicyForHTTPS which will do regular cert validation
         # 'like a browser'
-        self.agent = Agent(
+        self.agent = ProxyAgent(
             self.reactor,
             connectTimeout=15,
             contextFactory=self.hs.get_http_client_context_factory(),
             pool=pool,
+            http_proxy=http_proxy,
+            https_proxy=https_proxy,
         )
 
         if self._ip_blacklist:
diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py
new file mode 100644
index 0000000000..be7b2ceb8e
--- /dev/null
+++ b/synapse/http/connectproxyclient.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from zope.interface import implementer
+
+from twisted.internet import defer, protocol
+from twisted.internet.error import ConnectError
+from twisted.internet.interfaces import IStreamClientEndpoint
+from twisted.internet.protocol import connectionDone
+from twisted.web import http
+
+logger = logging.getLogger(__name__)
+
+
+class ProxyConnectError(ConnectError):
+    pass
+
+
+@implementer(IStreamClientEndpoint)
+class HTTPConnectProxyEndpoint(object):
+    """An Endpoint implementation which will send a CONNECT request to an http proxy
+
+    Wraps an existing HostnameEndpoint for the proxy.
+
+    When we get the connect() request from the connection pool (via the TLS wrapper),
+    we'll first connect to the proxy endpoint with a ProtocolFactory which will make the
+    CONNECT request. Once that completes, we invoke the protocolFactory which was passed
+    in.
+
+    Args:
+        reactor: the Twisted reactor to use for the connection
+        proxy_endpoint (IStreamClientEndpoint): the endpoint to use to connect to the
+            proxy
+        host (bytes): hostname that we want to CONNECT to
+        port (int): port that we want to connect to
+    """
+
+    def __init__(self, reactor, proxy_endpoint, host, port):
+        self._reactor = reactor
+        self._proxy_endpoint = proxy_endpoint
+        self._host = host
+        self._port = port
+
+    def __repr__(self):
+        return "<HTTPConnectProxyEndpoint %s>" % (self._proxy_endpoint,)
+
+    def connect(self, protocolFactory):
+        f = HTTPProxiedClientFactory(self._host, self._port, protocolFactory)
+        d = self._proxy_endpoint.connect(f)
+        # once the tcp socket connects successfully, we need to wait for the
+        # CONNECT to complete.
+        d.addCallback(lambda conn: f.on_connection)
+        return d
+
+
+class HTTPProxiedClientFactory(protocol.ClientFactory):
+    """ClientFactory wrapper that triggers an HTTP proxy CONNECT on connect.
+
+    Once the CONNECT completes, invokes the original ClientFactory to build the
+    HTTP Protocol object and run the rest of the connection.
+
+    Args:
+        dst_host (bytes): hostname that we want to CONNECT to
+        dst_port (int): port that we want to connect to
+        wrapped_factory (protocol.ClientFactory): The original Factory
+    """
+
+    def __init__(self, dst_host, dst_port, wrapped_factory):
+        self.dst_host = dst_host
+        self.dst_port = dst_port
+        self.wrapped_factory = wrapped_factory
+        self.on_connection = defer.Deferred()
+
+    def startedConnecting(self, connector):
+        return self.wrapped_factory.startedConnecting(connector)
+
+    def buildProtocol(self, addr):
+        wrapped_protocol = self.wrapped_factory.buildProtocol(addr)
+
+        return HTTPConnectProtocol(
+            self.dst_host, self.dst_port, wrapped_protocol, self.on_connection
+        )
+
+    def clientConnectionFailed(self, connector, reason):
+        logger.debug("Connection to proxy failed: %s", reason)
+        if not self.on_connection.called:
+            self.on_connection.errback(reason)
+        return self.wrapped_factory.clientConnectionFailed(connector, reason)
+
+    def clientConnectionLost(self, connector, reason):
+        logger.debug("Connection to proxy lost: %s", reason)
+        if not self.on_connection.called:
+            self.on_connection.errback(reason)
+        return self.wrapped_factory.clientConnectionLost(connector, reason)
+
+
+class HTTPConnectProtocol(protocol.Protocol):
+    """Protocol that wraps an existing Protocol to do a CONNECT handshake at connect
+
+    Args:
+        host (bytes): The original HTTP(s) hostname or IPv4 or IPv6 address literal
+            to put in the CONNECT request
+
+        port (int): The original HTTP(s) port to put in the CONNECT request
+
+        wrapped_protocol (interfaces.IProtocol): the original protocol (probably
+            HTTPChannel or TLSMemoryBIOProtocol, but could be anything really)
+
+        connected_deferred (Deferred): a Deferred which will be callbacked with
+            wrapped_protocol when the CONNECT completes
+    """
+
+    def __init__(self, host, port, wrapped_protocol, connected_deferred):
+        self.host = host
+        self.port = port
+        self.wrapped_protocol = wrapped_protocol
+        self.connected_deferred = connected_deferred
+        self.http_setup_client = HTTPConnectSetupClient(self.host, self.port)
+        self.http_setup_client.on_connected.addCallback(self.proxyConnected)
+
+    def connectionMade(self):
+        self.http_setup_client.makeConnection(self.transport)
+
+    def connectionLost(self, reason=connectionDone):
+        if self.wrapped_protocol.connected:
+            self.wrapped_protocol.connectionLost(reason)
+
+        self.http_setup_client.connectionLost(reason)
+
+        if not self.connected_deferred.called:
+            self.connected_deferred.errback(reason)
+
+    def proxyConnected(self, _):
+        self.wrapped_protocol.makeConnection(self.transport)
+
+        self.connected_deferred.callback(self.wrapped_protocol)
+
+        # Get any pending data from the http buf and forward it to the original protocol
+        buf = self.http_setup_client.clearLineBuffer()
+        if buf:
+            self.wrapped_protocol.dataReceived(buf)
+
+    def dataReceived(self, data):
+        # if we've set up the HTTP protocol, we can send the data there
+        if self.wrapped_protocol.connected:
+            return self.wrapped_protocol.dataReceived(data)
+
+        # otherwise, we must still be setting up the connection: send the data to the
+        # setup client
+        return self.http_setup_client.dataReceived(data)
+
+
+class HTTPConnectSetupClient(http.HTTPClient):
+    """HTTPClient protocol to send a CONNECT message for proxies and read the response.
+
+    Args:
+        host (bytes): The hostname to send in the CONNECT message
+        port (int): The port to send in the CONNECT message
+    """
+
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.on_connected = defer.Deferred()
+
+    def connectionMade(self):
+        logger.debug("Connected to proxy, sending CONNECT")
+        self.sendCommand(b"CONNECT", b"%s:%d" % (self.host, self.port))
+        self.endHeaders()
+
+    def handleStatus(self, version, status, message):
+        logger.debug("Got Status: %s %s %s", status, message, version)
+        if status != b"200":
+            raise ProxyConnectError("Unexpected status on CONNECT: %s" % status)
+
+    def handleEndHeaders(self):
+        logger.debug("End Headers")
+        self.on_connected.callback(None)
+
+    def handleResponse(self, body):
+        pass
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
new file mode 100644
index 0000000000..332da02a8d
--- /dev/null
+++ b/synapse/http/proxyagent.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import re
+
+from zope.interface import implementer
+
+from twisted.internet import defer
+from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.python.failure import Failure
+from twisted.web.client import URI, BrowserLikePolicyForHTTPS, _AgentBase
+from twisted.web.error import SchemeNotSupported
+from twisted.web.iweb import IAgent
+
+from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint
+
+logger = logging.getLogger(__name__)
+
+_VALID_URI = re.compile(br"\A[\x21-\x7e]+\Z")
+
+
+@implementer(IAgent)
+class ProxyAgent(_AgentBase):
+    """An Agent implementation which will use an HTTP proxy if one was requested
+
+    Args:
+        reactor: twisted reactor to place outgoing
+            connections.
+
+        contextFactory (IPolicyForHTTPS): A factory for TLS contexts, to control the
+            verification parameters of OpenSSL.  The default is to use a
+            `BrowserLikePolicyForHTTPS`, so unless you have special
+            requirements you can leave this as-is.
+
+        connectTimeout (float): The amount of time that this Agent will wait
+            for the peer to accept a connection.
+
+        bindAddress (bytes): The local address for client sockets to bind to.
+
+        pool (HTTPConnectionPool|None): connection pool to be used. If None, a
+            non-persistent pool instance will be created.
+    """
+
+    def __init__(
+        self,
+        reactor,
+        contextFactory=BrowserLikePolicyForHTTPS(),
+        connectTimeout=None,
+        bindAddress=None,
+        pool=None,
+        http_proxy=None,
+        https_proxy=None,
+    ):
+        _AgentBase.__init__(self, reactor, pool)
+
+        self._endpoint_kwargs = {}
+        if connectTimeout is not None:
+            self._endpoint_kwargs["timeout"] = connectTimeout
+        if bindAddress is not None:
+            self._endpoint_kwargs["bindAddress"] = bindAddress
+
+        self.http_proxy_endpoint = _http_proxy_endpoint(
+            http_proxy, reactor, **self._endpoint_kwargs
+        )
+
+        self.https_proxy_endpoint = _http_proxy_endpoint(
+            https_proxy, reactor, **self._endpoint_kwargs
+        )
+
+        self._policy_for_https = contextFactory
+        self._reactor = reactor
+
+    def request(self, method, uri, headers=None, bodyProducer=None):
+        """
+        Issue a request to the server indicated by the given uri.
+
+        Supports `http` and `https` schemes.
+
+        An existing connection from the connection pool may be used or a new one may be
+        created.
+
+        See also: twisted.web.iweb.IAgent.request
+
+        Args:
+            method (bytes): The request method to use, such as `GET`, `POST`, etc
+
+            uri (bytes): The location of the resource to request.
+
+            headers (Headers|None): Extra headers to send with the request
+
+            bodyProducer (IBodyProducer|None): An object which can generate bytes to
+                make up the body of this request (for example, the properly encoded
+                contents of a file for a file upload). Or, None if the request is to
+                have no body.
+
+        Returns:
+            Deferred[IResponse]: completes when the header of the response has
+                 been received (regardless of the response status code).
+        """
+        uri = uri.strip()
+        if not _VALID_URI.match(uri):
+            raise ValueError("Invalid URI {!r}".format(uri))
+
+        parsed_uri = URI.fromBytes(uri)
+        pool_key = (parsed_uri.scheme, parsed_uri.host, parsed_uri.port)
+        request_path = parsed_uri.originForm
+
+        if parsed_uri.scheme == b"http" and self.http_proxy_endpoint:
+            # Cache *all* connections under the same key, since we are only
+            # connecting to a single destination, the proxy:
+            pool_key = ("http-proxy", self.http_proxy_endpoint)
+            endpoint = self.http_proxy_endpoint
+            request_path = uri
+        elif parsed_uri.scheme == b"https" and self.https_proxy_endpoint:
+            endpoint = HTTPConnectProxyEndpoint(
+                self._reactor,
+                self.https_proxy_endpoint,
+                parsed_uri.host,
+                parsed_uri.port,
+            )
+        else:
+            # not using a proxy
+            endpoint = HostnameEndpoint(
+                self._reactor, parsed_uri.host, parsed_uri.port, **self._endpoint_kwargs
+            )
+
+        logger.debug("Requesting %s via %s", uri, endpoint)
+
+        if parsed_uri.scheme == b"https":
+            tls_connection_creator = self._policy_for_https.creatorForNetloc(
+                parsed_uri.host, parsed_uri.port
+            )
+            endpoint = wrapClientTLS(tls_connection_creator, endpoint)
+        elif parsed_uri.scheme == b"http":
+            pass
+        else:
+            return defer.fail(
+                Failure(
+                    SchemeNotSupported("Unsupported scheme: %r" % (parsed_uri.scheme,))
+                )
+            )
+
+        return self._requestWithEndpoint(
+            pool_key, endpoint, method, parsed_uri, headers, bodyProducer, request_path
+        )
+
+
+def _http_proxy_endpoint(proxy, reactor, **kwargs):
+    """Parses an http proxy setting and returns an endpoint for the proxy
+
+    Args:
+        proxy (bytes|None):  the proxy setting
+        reactor: reactor to be used to connect to the proxy
+        kwargs: other args to be passed to HostnameEndpoint
+
+    Returns:
+        interfaces.IStreamClientEndpoint|None: endpoint to use to connect to the proxy,
+            or None
+    """
+    if proxy is None:
+        return None
+
+    # currently we only support hostname:port. Some apps also support
+    # protocol://<host>[:port], which allows a way of requiring a TLS connection to the
+    # proxy.
+
+    host, port = parse_host_port(proxy, default_port=1080)
+    return HostnameEndpoint(reactor, host, port, **kwargs)
+
+
+def parse_host_port(hostport, default_port=None):
+    # could have sworn we had one of these somewhere else...
+    if b":" in hostport:
+        host, port = hostport.rsplit(b":", 1)
+        try:
+            port = int(port)
+            return host, port
+        except ValueError:
+            # the thing after the : wasn't a valid port; presumably this is an
+            # IPv6 address.
+            pass
+
+    return hostport, default_port
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bd5d53af91..9eaf73fc76 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -101,7 +101,7 @@ class HttpPusher(object):
         if "url" not in self.data:
             raise PusherConfigException("'url' required in data for HTTP pusher")
         self.url = self.data["url"]
-        self.http_client = hs.get_simple_http_client()
+        self.http_client = hs.get_proxied_http_client()
         self.data_minus_url = {}
         self.data_minus_url.update(self.data)
         del self.data_minus_url["url"]
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 1d20b96d03..f161bc51a5 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -41,6 +41,7 @@ from synapse.rest.client.v2_alpha import (
     keys,
     notifications,
     openid,
+    password_policy,
     read_marker,
     receipts,
     register,
@@ -117,6 +118,7 @@ class ClientRestResource(JsonResource):
         room_upgrade_rest_servlet.register_servlets(hs, client_resource)
         capabilities.register_servlets(hs, client_resource)
         account_validity.register_servlets(hs, client_resource)
+        password_policy.register_servlets(hs, client_resource)
         relations.register_servlets(hs, client_resource)
 
         # moving to /_synapse/admin
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 25a1b67092..b74cb15c1f 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -378,7 +378,7 @@ class CasTicketServlet(RestServlet):
         self.cas_service_url = hs.config.cas_service_url
         self.cas_required_attributes = hs.config.cas_required_attributes
         self._sso_auth_handler = SSOAuthHandler(hs)
-        self._http_client = hs.get_simple_http_client()
+        self._http_client = hs.get_proxied_http_client()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index bbce2e2b71..400a0a7592 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -14,12 +14,16 @@
 # limitations under the License.
 
 """ This module contains REST servlets to do with profile: /profile/<paths> """
+import logging
+
 from twisted.internet import defer
 
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.rest.client.v2_alpha._base import client_patterns
 from synapse.types import UserID
 
+logger = logging.getLogger(__name__)
+
 
 class ProfileDisplaynameRestServlet(RestServlet):
     PATTERNS = client_patterns("/profile/(?P<user_id>[^/]*)/displayname", v1=True)
@@ -28,6 +32,7 @@ class ProfileDisplaynameRestServlet(RestServlet):
         super(ProfileDisplaynameRestServlet, self).__init__()
         self.hs = hs
         self.profile_handler = hs.get_profile_handler()
+        self.http_client = hs.get_simple_http_client()
         self.auth = hs.get_auth()
 
     @defer.inlineCallbacks
@@ -65,11 +70,27 @@ class ProfileDisplaynameRestServlet(RestServlet):
 
         yield self.profile_handler.set_displayname(user, requester, new_name, is_admin)
 
+        if self.hs.config.shadow_server:
+            shadow_user = UserID(user.localpart, self.hs.config.shadow_server.get("hs"))
+            self.shadow_displayname(shadow_user.to_string(), content)
+
         return 200, {}
 
     def on_OPTIONS(self, request, user_id):
         return 200, {}
 
+    @defer.inlineCallbacks
+    def shadow_displayname(self, user_id, body):
+        # TODO: retries
+        shadow_hs_url = self.hs.config.shadow_server.get("hs_url")
+        as_token = self.hs.config.shadow_server.get("as_token")
+
+        yield self.http_client.put_json(
+            "%s/_matrix/client/r0/profile/%s/displayname?access_token=%s&user_id=%s"
+            % (shadow_hs_url, user_id, as_token, user_id),
+            body,
+        )
+
 
 class ProfileAvatarURLRestServlet(RestServlet):
     PATTERNS = client_patterns("/profile/(?P<user_id>[^/]*)/avatar_url", v1=True)
@@ -78,6 +99,7 @@ class ProfileAvatarURLRestServlet(RestServlet):
         super(ProfileAvatarURLRestServlet, self).__init__()
         self.hs = hs
         self.profile_handler = hs.get_profile_handler()
+        self.http_client = hs.get_simple_http_client()
         self.auth = hs.get_auth()
 
     @defer.inlineCallbacks
@@ -108,17 +130,35 @@ class ProfileAvatarURLRestServlet(RestServlet):
 
         content = parse_json_object_from_request(request)
         try:
-            new_name = content["avatar_url"]
+            new_avatar_url = content["avatar_url"]
         except Exception:
             return 400, "Unable to parse name"
 
-        yield self.profile_handler.set_avatar_url(user, requester, new_name, is_admin)
+        yield self.profile_handler.set_avatar_url(
+            user, requester, new_avatar_url, is_admin
+        )
+
+        if self.hs.config.shadow_server:
+            shadow_user = UserID(user.localpart, self.hs.config.shadow_server.get("hs"))
+            self.shadow_avatar_url(shadow_user.to_string(), content)
 
         return 200, {}
 
     def on_OPTIONS(self, request, user_id):
         return 200, {}
 
+    @defer.inlineCallbacks
+    def shadow_avatar_url(self, user_id, body):
+        # TODO: retries
+        shadow_hs_url = self.hs.config.shadow_server.get("hs_url")
+        as_token = self.hs.config.shadow_server.get("as_token")
+
+        yield self.http_client.put_json(
+            "%s/_matrix/client/r0/profile/%s/avatar_url?access_token=%s&user_id=%s"
+            % (shadow_hs_url, user_id, as_token, user_id),
+            body,
+        )
+
 
 class ProfileRestServlet(RestServlet):
     PATTERNS = client_patterns("/profile/(?P<user_id>[^/]*)", v1=True)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 3582259026..011208fc1f 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -701,6 +701,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
                 content["id_server"],
                 requester,
                 txn_id,
+                new_room=False,
             )
             return 200, {}
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 0620a4d0cf..d559b05fbc 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018, 2019 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import re
 
 from six.moves import http_client
 
@@ -31,8 +32,9 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
+from synapse.types import UserID
 from synapse.util.msisdn import phone_number_to_msisdn
-from synapse.util.stringutils import random_string
+from synapse.util.stringutils import assert_valid_client_secret, random_string
 from synapse.util.threepids import check_3pid_allowed
 
 from ._base import client_patterns, interactive_auth_handler
@@ -82,6 +84,8 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
 
         # Extract params from body
         client_secret = body["client_secret"]
+        assert_valid_client_secret(client_secret)
+
         email = body["email"]
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param
@@ -89,7 +93,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
         if not check_3pid_allowed(self.hs, "email", email):
             raise SynapseError(
                 403,
-                "Your email domain is not authorized on this server",
+                "Your email is not authorized on this server",
                 Codes.THREEPID_DENIED,
             )
 
@@ -208,13 +212,15 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body["country"], body["phone_number"])
 
-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        if not (yield check_3pid_allowed(self.hs, "msisdn", msisdn)):
             raise SynapseError(
                 403,
                 "Account phone numbers are not authorized on this server",
                 Codes.THREEPID_DENIED,
             )
 
+        assert_valid_client_secret(body["client_secret"])
+
         existingUid = yield self.datastore.get_user_id_by_threepid("msisdn", msisdn)
 
         if existingUid is None:
@@ -260,6 +266,9 @@ class PasswordResetSubmitTokenServlet(RestServlet):
 
         sid = parse_string(request, "sid")
         client_secret = parse_string(request, "client_secret")
+
+        assert_valid_client_secret(client_secret)
+
         token = parse_string(request, "token")
 
         # Attempt to validate a 3PID sesssion
@@ -325,7 +334,9 @@ class PasswordResetSubmitTokenServlet(RestServlet):
         body = parse_json_object_from_request(request)
         assert_params_in_dict(body, ["sid", "client_secret", "token"])
 
-        valid, _ = yield self.datastore.validate_threepid_validation_token(
+        assert_valid_client_secret(body["client_secret"])
+
+        valid, _ = yield self.datastore.validate_threepid_session(
             body["sid"], body["client_secret"], body["token"], self.clock.time_msec()
         )
         response_code = 200 if valid else 400
@@ -343,6 +354,7 @@ class PasswordRestServlet(RestServlet):
         self.auth_handler = hs.get_auth_handler()
         self.datastore = self.hs.get_datastore()
         self._set_password_handler = hs.get_set_password_handler()
+        self.http_client = hs.get_simple_http_client()
 
     @interactive_auth_handler
     @defer.inlineCallbacks
@@ -361,9 +373,13 @@ class PasswordRestServlet(RestServlet):
 
         if self.auth.has_access_token(request):
             requester = yield self.auth.get_user_by_req(request)
-            params = yield self.auth_handler.validate_user_via_ui_auth(
-                requester, body, self.hs.get_ip_from_request(request)
-            )
+            # blindly trust ASes without UI-authing them
+            if requester.app_service:
+                params = body
+            else:
+                params = yield self.auth_handler.validate_user_via_ui_auth(
+                    requester, body, self.hs.get_ip_from_request(request)
+                )
             user_id = requester.user.to_string()
         else:
             requester = None
@@ -399,11 +415,29 @@ class PasswordRestServlet(RestServlet):
 
         yield self._set_password_handler.set_password(user_id, new_password, requester)
 
+        if self.hs.config.shadow_server:
+            shadow_user = UserID(
+                requester.user.localpart, self.hs.config.shadow_server.get("hs")
+            )
+            self.shadow_password(params, shadow_user.to_string())
+
         return 200, {}
 
     def on_OPTIONS(self, _):
         return 200, {}
 
+    @defer.inlineCallbacks
+    def shadow_password(self, body, user_id):
+        # TODO: retries
+        shadow_hs_url = self.hs.config.shadow_server.get("hs_url")
+        as_token = self.hs.config.shadow_server.get("as_token")
+
+        yield self.http_client.post_json_get_json(
+            "%s/_matrix/client/r0/account/password?access_token=%s&user_id=%s"
+            % (shadow_hs_url, as_token, user_id),
+            body,
+        )
+
 
 class DeactivateAccountRestServlet(RestServlet):
     PATTERNS = client_patterns("/account/deactivate$")
@@ -466,13 +500,15 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
             body, ["id_server", "client_secret", "email", "send_attempt"]
         )
 
-        if not check_3pid_allowed(self.hs, "email", body["email"]):
+        if not (yield check_3pid_allowed(self.hs, "email", body["email"])):
             raise SynapseError(
                 403,
-                "Your email domain is not authorized on this server",
+                "Your email is not authorized on this server",
                 Codes.THREEPID_DENIED,
             )
 
+        assert_valid_client_secret(body["client_secret"])
+
         existingUid = yield self.datastore.get_user_id_by_threepid(
             "email", body["email"]
         )
@@ -503,13 +539,15 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body["country"], body["phone_number"])
 
-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        if not (yield check_3pid_allowed(self.hs, "msisdn", msisdn)):
             raise SynapseError(
                 403,
                 "Account phone numbers are not authorized on this server",
                 Codes.THREEPID_DENIED,
             )
 
+        assert_valid_client_secret(body["client_secret"])
+
         existingUid = yield self.datastore.get_user_id_by_threepid("msisdn", msisdn)
 
         if existingUid is not None:
@@ -528,7 +566,8 @@ class ThreepidRestServlet(RestServlet):
         self.identity_handler = hs.get_handlers().identity_handler
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
-        self.datastore = self.hs.get_datastore()
+        self.datastore = hs.get_datastore()
+        self.http_client = hs.get_simple_http_client()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
@@ -540,47 +579,83 @@ class ThreepidRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        body = parse_json_object_from_request(request)
+        if self.hs.config.disable_3pid_changes:
+            raise SynapseError(400, "3PID changes disabled on this server")
 
-        threePidCreds = body.get("threePidCreds")
-        threePidCreds = body.get("three_pid_creds", threePidCreds)
-        if threePidCreds is None:
-            raise SynapseError(400, "Missing param", Codes.MISSING_PARAM)
+        body = parse_json_object_from_request(request)
 
         requester = yield self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()
 
-        threepid = yield self.identity_handler.threepid_from_creds(threePidCreds)
+        # skip validation if this is a shadow 3PID from an AS
+        if not requester.app_service:
+            threePidCreds = body.get("threePidCreds")
+            threePidCreds = body.get("three_pid_creds", threePidCreds)
+            if threePidCreds is None:
+                raise SynapseError(400, "Missing param", Codes.MISSING_PARAM)
 
-        if not threepid:
-            raise SynapseError(400, "Failed to auth 3pid", Codes.THREEPID_AUTH_FAILED)
+            threepid = yield self.identity_handler.threepid_from_creds(threePidCreds)
 
-        for reqd in ["medium", "address", "validated_at"]:
-            if reqd not in threepid:
-                logger.warn("Couldn't add 3pid: invalid response from ID server")
-                raise SynapseError(500, "Invalid response from ID Server")
+            if not threepid:
+                raise SynapseError(
+                    400, "Failed to auth 3pid", Codes.THREEPID_AUTH_FAILED
+                )
+
+            for reqd in ["medium", "address", "validated_at"]:
+                if reqd not in threepid:
+                    logger.warn("Couldn't add 3pid: invalid response from ID server")
+                    raise SynapseError(500, "Invalid response from ID Server")
+        else:
+            # XXX: ASes pass in a validated threepid directly to bypass the IS.
+            # This makes the API entirely change shape when we have an AS token;
+            # it really should be an entirely separate API - perhaps
+            # /account/3pid/replicate or something.
+            threepid = body.get("threepid")
 
         yield self.auth_handler.add_threepid(
             user_id, threepid["medium"], threepid["address"], threepid["validated_at"]
         )
 
-        if "bind" in body and body["bind"]:
+        if not requester.app_service and ("bind" in body and body["bind"]):
             logger.debug("Binding threepid %s to %s", threepid, user_id)
             yield self.identity_handler.bind_threepid(threePidCreds, user_id)
 
+        if self.hs.config.shadow_server:
+            shadow_user = UserID(
+                requester.user.localpart, self.hs.config.shadow_server.get("hs")
+            )
+            self.shadow_3pid({"threepid": threepid}, shadow_user.to_string())
+
         return 200, {}
 
+    @defer.inlineCallbacks
+    def shadow_3pid(self, body, user_id):
+        # TODO: retries
+        shadow_hs_url = self.hs.config.shadow_server.get("hs_url")
+        as_token = self.hs.config.shadow_server.get("as_token")
+
+        yield self.http_client.post_json_get_json(
+            "%s/_matrix/client/r0/account/3pid?access_token=%s&user_id=%s"
+            % (shadow_hs_url, as_token, user_id),
+            body,
+        )
+
 
 class ThreepidDeleteRestServlet(RestServlet):
     PATTERNS = client_patterns("/account/3pid/delete$")
 
     def __init__(self, hs):
         super(ThreepidDeleteRestServlet, self).__init__()
+        self.hs = hs
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
+        self.http_client = hs.get_simple_http_client()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
+        if self.hs.config.disable_3pid_changes:
+            raise SynapseError(400, "3PID changes disabled on this server")
+
         body = parse_json_object_from_request(request)
         assert_params_in_dict(body, ["medium", "address"])
 
@@ -598,6 +673,12 @@ class ThreepidDeleteRestServlet(RestServlet):
             logger.exception("Failed to remove threepid")
             raise SynapseError(500, "Failed to remove threepid")
 
+        if self.hs.config.shadow_server:
+            shadow_user = UserID(
+                requester.user.localpart, self.hs.config.shadow_server.get("hs")
+            )
+            self.shadow_3pid_delete(body, shadow_user.to_string())
+
         if ret:
             id_server_unbind_result = "success"
         else:
@@ -605,6 +686,77 @@ class ThreepidDeleteRestServlet(RestServlet):
 
         return 200, {"id_server_unbind_result": id_server_unbind_result}
 
+    @defer.inlineCallbacks
+    def shadow_3pid_delete(self, body, user_id):
+        # TODO: retries
+        shadow_hs_url = self.hs.config.shadow_server.get("hs_url")
+        as_token = self.hs.config.shadow_server.get("as_token")
+
+        yield self.http_client.post_json_get_json(
+            "%s/_matrix/client/r0/account/3pid/delete?access_token=%s&user_id=%s"
+            % (shadow_hs_url, as_token, user_id),
+            body,
+        )
+
+
+class ThreepidLookupRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_matrix/client/unstable/account/3pid/lookup$")]
+
+    def __init__(self, hs):
+        super(ThreepidLookupRestServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.identity_handler = hs.get_handlers().identity_handler
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        """Proxy a /_matrix/identity/api/v1/lookup request to an identity
+        server
+        """
+        yield self.auth.get_user_by_req(request)
+
+        # Verify query parameters
+        query_params = request.args
+        assert_params_in_dict(query_params, [b"medium", b"address", b"id_server"])
+
+        # Retrieve needed information from query parameters
+        medium = parse_string(request, "medium")
+        address = parse_string(request, "address")
+        id_server = parse_string(request, "id_server")
+
+        # Proxy the request to the identity server. lookup_3pid handles checking
+        # if the lookup is allowed so we don't need to do it here.
+        ret = yield self.identity_handler.lookup_3pid(id_server, medium, address)
+
+        defer.returnValue((200, ret))
+
+
+class ThreepidBulkLookupRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_matrix/client/unstable/account/3pid/bulk_lookup$")]
+
+    def __init__(self, hs):
+        super(ThreepidBulkLookupRestServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.identity_handler = hs.get_handlers().identity_handler
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        """Proxy a /_matrix/identity/api/v1/bulk_lookup request to an identity
+        server
+        """
+        yield self.auth.get_user_by_req(request)
+
+        body = parse_json_object_from_request(request)
+
+        assert_params_in_dict(body, ["threepids", "id_server"])
+
+        # Proxy the request to the identity server. lookup_3pid handles checking
+        # if the lookup is allowed so we don't need to do it here.
+        ret = yield self.identity_handler.bulk_lookup_3pid(
+            body["id_server"], body["threepids"]
+        )
+
+        defer.returnValue((200, ret))
+
 
 class WhoamiRestServlet(RestServlet):
     PATTERNS = client_patterns("/account/whoami$")
@@ -630,4 +782,6 @@ def register_servlets(hs, http_server):
     MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
     ThreepidRestServlet(hs).register(http_server)
     ThreepidDeleteRestServlet(hs).register(http_server)
+    ThreepidLookupRestServlet(hs).register(http_server)
+    ThreepidBulkLookupRestServlet(hs).register(http_server)
     WhoamiRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py
index f0db204ffa..34ade04ff8 100644
--- a/synapse/rest/client/v2_alpha/account_data.py
+++ b/synapse/rest/client/v2_alpha/account_data.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import AuthError, NotFoundError, SynapseError
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import UserID
 
 from ._base import client_patterns
 
@@ -40,6 +41,7 @@ class AccountDataServlet(RestServlet):
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
+        self._profile_handler = hs.get_profile_handler()
 
     @defer.inlineCallbacks
     def on_PUT(self, request, user_id, account_data_type):
@@ -49,6 +51,11 @@ class AccountDataServlet(RestServlet):
 
         body = parse_json_object_from_request(request)
 
+        if account_data_type == "im.vector.hide_profile":
+            user = UserID.from_string(user_id)
+            hide_profile = body.get("hide_profile")
+            yield self._profile_handler.set_active(user, not hide_profile, True)
+
         max_id = yield self.store.add_account_data_for_user(
             user_id, account_data_type, body
         )
diff --git a/synapse/rest/client/v2_alpha/account_validity.py b/synapse/rest/client/v2_alpha/account_validity.py
index 33f6a23028..9a62f938af 100644
--- a/synapse/rest/client/v2_alpha/account_validity.py
+++ b/synapse/rest/client/v2_alpha/account_validity.py
@@ -15,6 +15,8 @@
 
 import logging
 
+from six import ensure_binary
+
 from twisted.internet import defer
 
 from synapse.api.errors import AuthError, SynapseError
diff --git a/synapse/rest/client/v2_alpha/password_policy.py b/synapse/rest/client/v2_alpha/password_policy.py
new file mode 100644
index 0000000000..968403cca4
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/password_policy.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.http.servlet import RestServlet
+
+from ._base import client_patterns
+
+logger = logging.getLogger(__name__)
+
+
+class PasswordPolicyServlet(RestServlet):
+    PATTERNS = client_patterns("/password_policy$")
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        super(PasswordPolicyServlet, self).__init__()
+
+        self.policy = hs.config.password_policy
+        self.enabled = hs.config.password_policy_enabled
+
+    def on_GET(self, request):
+        if not self.enabled or not self.policy:
+            return (200, {})
+
+        policy = {}
+
+        for param in [
+            "minimum_length",
+            "require_digit",
+            "require_symbol",
+            "require_lowercase",
+            "require_uppercase",
+        ]:
+            if param in self.policy:
+                policy["m.%s" % param] = self.policy[param]
+
+        return (200, policy)
+
+
+def register_servlets(hs, http_server):
+    PasswordPolicyServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 107854c669..c6c53188ab 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
-# Copyright 2017 Vector Creations Ltd
+# Copyright 2015-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
 
 import hmac
 import logging
+import re
 
 from six import string_types
 
@@ -40,6 +42,7 @@ from synapse.http.servlet import (
 )
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.ratelimitutils import FederationRateLimiter
+from synapse.util.stringutils import assert_valid_client_secret
 from synapse.util.threepids import check_3pid_allowed
 
 from ._base import client_patterns, interactive_auth_handler
@@ -79,13 +82,15 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
             body, ["id_server", "client_secret", "email", "send_attempt"]
         )
 
-        if not check_3pid_allowed(self.hs, "email", body["email"]):
+        if not (yield check_3pid_allowed(self.hs, "email", body["email"])):
             raise SynapseError(
                 403,
-                "Your email domain is not authorized to register on this server",
+                "Your email is not authorized to register on this server",
                 Codes.THREEPID_DENIED,
             )
 
+        assert_params_in_dict(body["client_secret"])
+
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             "email", body["email"]
         )
@@ -120,7 +125,9 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body["country"], body["phone_number"])
 
-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        assert_valid_client_secret(body["client_secret"])
+
+        if not (yield check_3pid_allowed(self.hs, "msisdn", msisdn)):
             raise SynapseError(
                 403,
                 "Phone numbers are not authorized to register on this server",
@@ -199,6 +206,7 @@ class RegisterRestServlet(RestServlet):
         self.room_member_handler = hs.get_room_member_handler()
         self.macaroon_gen = hs.get_macaroon_generator()
         self.ratelimiter = hs.get_registration_ratelimiter()
+        self.password_policy_handler = hs.get_password_policy_handler()
         self.clock = hs.get_clock()
 
     @interactive_auth_handler
@@ -237,12 +245,15 @@ class RegisterRestServlet(RestServlet):
 
         # we do basic sanity checks here because the auth layer will store these
         # in sessions. Pull out the username/password provided to us.
+        desired_password = None
         if "password" in body:
             if (
                 not isinstance(body["password"], string_types)
                 or len(body["password"]) > 512
             ):
                 raise SynapseError(400, "Invalid password")
+            self.password_policy_handler.validate_password(body["password"])
+            desired_password = body["password"]
 
         desired_username = None
         if "username" in body:
@@ -253,6 +264,8 @@ class RegisterRestServlet(RestServlet):
                 raise SynapseError(400, "Invalid username")
             desired_username = body["username"]
 
+        desired_display_name = body.get("display_name")
+
         appservice = None
         if self.auth.has_access_token(request):
             appservice = yield self.auth.get_appservice_by_req(request)
@@ -276,7 +289,11 @@ class RegisterRestServlet(RestServlet):
 
             if isinstance(desired_username, string_types):
                 result = yield self._do_appservice_registration(
-                    desired_username, access_token, body
+                    desired_username,
+                    desired_password,
+                    desired_display_name,
+                    access_token,
+                    body,
                 )
             return 200, result  # we throw for non 200 responses
 
@@ -398,7 +415,7 @@ class RegisterRestServlet(RestServlet):
                     medium = auth_result[login_type]["medium"]
                     address = auth_result[login_type]["address"]
 
-                    if not check_3pid_allowed(self.hs, medium, address):
+                    if not (yield check_3pid_allowed(self.hs, medium, address)):
                         raise SynapseError(
                             403,
                             "Third party identifiers (email/phone numbers)"
@@ -406,6 +423,80 @@ class RegisterRestServlet(RestServlet):
                             Codes.THREEPID_DENIED,
                         )
 
+                    existingUid = yield self.store.get_user_id_by_threepid(
+                        medium, address
+                    )
+
+                    if existingUid is not None:
+                        raise SynapseError(
+                            400, "%s is already in use" % medium, Codes.THREEPID_IN_USE
+                        )
+
+        if self.hs.config.register_mxid_from_3pid:
+            # override the desired_username based on the 3PID if any.
+            # reset it first to avoid folks picking their own username.
+            desired_username = None
+
+            # we should have an auth_result at this point if we're going to progress
+            # to register the user (i.e. we haven't picked up a registered_user_id
+            # from our session store), in which case get ready and gen the
+            # desired_username
+            if auth_result:
+                if (
+                    self.hs.config.register_mxid_from_3pid == "email"
+                    and LoginType.EMAIL_IDENTITY in auth_result
+                ):
+                    address = auth_result[LoginType.EMAIL_IDENTITY]["address"]
+                    desired_username = synapse.types.strip_invalid_mxid_characters(
+                        address.replace("@", "-").lower()
+                    )
+
+                    # find a unique mxid for the account, suffixing numbers
+                    # if needed
+                    while True:
+                        try:
+                            yield self.registration_handler.check_username(
+                                desired_username,
+                                guest_access_token=guest_access_token,
+                                assigned_user_id=registered_user_id,
+                            )
+                            # if we got this far we passed the check.
+                            break
+                        except SynapseError as e:
+                            if e.errcode == Codes.USER_IN_USE:
+                                m = re.match(r"^(.*?)(\d+)$", desired_username)
+                                if m:
+                                    desired_username = m.group(1) + str(
+                                        int(m.group(2)) + 1
+                                    )
+                                else:
+                                    desired_username += "1"
+                            else:
+                                # something else went wrong.
+                                break
+
+                    if self.hs.config.register_just_use_email_for_display_name:
+                        desired_display_name = address
+                    else:
+                        # Custom mapping between email address and display name
+                        desired_display_name = self._map_email_to_displayname(address)
+                elif (
+                    self.hs.config.register_mxid_from_3pid == "msisdn"
+                    and LoginType.MSISDN in auth_result
+                ):
+                    desired_username = auth_result[LoginType.MSISDN]["address"]
+                else:
+                    raise SynapseError(
+                        400, "Cannot derive mxid from 3pid; no recognised 3pid"
+                    )
+
+        if desired_username is not None:
+            yield self.registration_handler.check_username(
+                desired_username,
+                guest_access_token=guest_access_token,
+                assigned_user_id=registered_user_id,
+            )
+
         if registered_user_id is not None:
             logger.info(
                 "Already registered user ID %r for this session", registered_user_id
@@ -416,9 +507,16 @@ class RegisterRestServlet(RestServlet):
             # NB: This may be from the auth handler and NOT from the POST
             assert_params_in_dict(params, ["password"])
 
-            desired_username = params.get("username", None)
+            if not self.hs.config.register_mxid_from_3pid:
+                desired_username = params.get("username", None)
+            else:
+                # we keep the original desired_username derived from the 3pid above
+                pass
+
             guest_access_token = params.get("guest_access_token", None)
-            new_password = params.get("password", None)
+
+            # XXX: don't we need to validate these for length etc like we did on
+            # the ones from the JSON body earlier on in the method?
 
             if desired_username is not None:
                 desired_username = desired_username.lower()
@@ -451,8 +549,9 @@ class RegisterRestServlet(RestServlet):
 
             registered_user_id = yield self.registration_handler.register_user(
                 localpart=desired_username,
-                password=new_password,
+                password=params.get("password", None),
                 guest_access_token=guest_access_token,
+                default_display_name=desired_display_name,
                 threepid=threepid,
                 address=client_addr,
             )
@@ -464,6 +563,14 @@ class RegisterRestServlet(RestServlet):
                 ):
                     yield self.store.upsert_monthly_active_user(registered_user_id)
 
+            if self.hs.config.shadow_server:
+                yield self.registration_handler.shadow_register(
+                    localpart=desired_username,
+                    display_name=desired_display_name,
+                    auth_result=auth_result,
+                    params=params,
+                )
+
             # remember that we've now registered that user account, and with
             #  what user ID (since the user may not have specified)
             self.auth_handler.set_session_data(
@@ -491,11 +598,31 @@ class RegisterRestServlet(RestServlet):
         return 200, {}
 
     @defer.inlineCallbacks
-    def _do_appservice_registration(self, username, as_token, body):
+    def _do_appservice_registration(
+        self, username, password, display_name, as_token, body
+    ):
+
+        # FIXME: appservice_register() is horribly duplicated with register()
+        # and they should probably just be combined together with a config flag.
         user_id = yield self.registration_handler.appservice_register(
-            username, as_token
+            username, as_token, password, display_name
         )
-        return (yield self._create_registration_details(user_id, body))
+        result = yield self._create_registration_details(user_id, body)
+
+        auth_result = body.get("auth_result")
+        if auth_result and LoginType.EMAIL_IDENTITY in auth_result:
+            threepid = auth_result[LoginType.EMAIL_IDENTITY]
+            yield self._register_email_threepid(
+                user_id, threepid, result["access_token"], body.get("bind_email")
+            )
+
+        if auth_result and LoginType.MSISDN in auth_result:
+            threepid = auth_result[LoginType.MSISDN]
+            yield self._register_msisdn_threepid(
+                user_id, threepid, result["access_token"], body.get("bind_msisdn")
+            )
+
+        return result
 
     @defer.inlineCallbacks
     def _create_registration_details(self, user_id, params):
@@ -548,6 +675,60 @@ class RegisterRestServlet(RestServlet):
         )
 
 
+def cap(name):
+    """Capitalise parts of a name containing different words, including those
+    separated by hyphens.
+    For example, 'John-Doe'
+
+    Args:
+        name (str): The name to parse
+    """
+    if not name:
+        return name
+
+    # Split the name by whitespace then hyphens, capitalizing each part then
+    # joining it back together.
+    capatilized_name = " ".join(
+        "-".join(part.capitalize() for part in space_part.split("-"))
+        for space_part in name.split()
+    )
+    return capatilized_name
+
+
+def _map_email_to_displayname(address):
+    """Custom mapping from an email address to a user displayname
+
+    Args:
+        address (str): The email address to process
+    Returns:
+        str: The new displayname
+    """
+    # Split the part before and after the @ in the email.
+    # Replace all . with spaces in the first part
+    parts = address.replace(".", " ").split("@")
+
+    # Figure out which org this email address belongs to
+    org_parts = parts[1].split(" ")
+
+    # If this is a ...matrix.org email, mark them as an Admin
+    if org_parts[-2] == "matrix" and org_parts[-1] == "org":
+        org = "Tchap Admin"
+
+    # Is this is a ...gouv.fr address, set the org to whatever is before
+    # gouv.fr. If there isn't anything (a @gouv.fr email) simply mark their
+    # org as "gouv"
+    elif org_parts[-2] == "gouv" and org_parts[-1] == "fr":
+        org = org_parts[-3] if len(org_parts) > 2 else org_parts[-2]
+
+    # Otherwise, mark their org as the email's second-level domain name
+    else:
+        org = org_parts[-2]
+
+    desired_display_name = cap(parts[0]) + " [" + cap(org) + "]"
+
+    return desired_display_name
+
+
 def register_servlets(hs, http_server):
     EmailRegisterRequestTokenRestServlet(hs).register(http_server)
     MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py
index 2863affbab..e586fc595f 100644
--- a/synapse/rest/client/v2_alpha/user_directory.py
+++ b/synapse/rest/client/v2_alpha/user_directory.py
@@ -15,10 +15,13 @@
 
 import logging
 
+from signedjson.sign import sign_json
+
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import UserID
 
 from ._base import client_patterns
 
@@ -37,6 +40,7 @@ class UserDirectorySearchRestServlet(RestServlet):
         self.hs = hs
         self.auth = hs.get_auth()
         self.user_directory_handler = hs.get_user_directory_handler()
+        self.http_client = hs.get_simple_http_client()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -64,6 +68,16 @@ class UserDirectorySearchRestServlet(RestServlet):
 
         body = parse_json_object_from_request(request)
 
+        if self.hs.config.user_directory_defer_to_id_server:
+            signed_body = sign_json(
+                body, self.hs.hostname, self.hs.config.signing_key[0]
+            )
+            url = "%s/_matrix/identity/api/v1/user_directory/search" % (
+                self.hs.config.user_directory_defer_to_id_server,
+            )
+            resp = yield self.http_client.post_json_get_json(url, signed_body)
+            defer.returnValue((200, resp))
+
         limit = body.get("limit", 10)
         limit = min(limit, 50)
 
@@ -79,5 +93,87 @@ class UserDirectorySearchRestServlet(RestServlet):
         return 200, results
 
 
+class UserInfoServlet(RestServlet):
+    """
+    GET /user/{user_id}/info HTTP/1.1
+    """
+
+    PATTERNS = client_patterns("/user/(?P<user_id>[^/]*)/info$")
+
+    def __init__(self, hs):
+        super(UserInfoServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+        self.clock = hs.get_clock()
+        self.transport_layer = hs.get_federation_transport_client()
+        registry = hs.get_federation_registry()
+
+        if not registry.query_handlers.get("user_info"):
+            registry.register_query_handler("user_info", self._on_federation_query)
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, user_id):
+        # Ensure the user is authenticated
+        yield self.auth.get_user_by_req(request, allow_guest=False)
+
+        user = UserID.from_string(user_id)
+        if not self.hs.is_mine(user):
+            # Attempt to make a federation request to the server that owns this user
+            args = {"user_id": user_id}
+            res = yield self.transport_layer.make_query(
+                user.domain, "user_info", args, retry_on_dns_fail=True
+            )
+            defer.returnValue((200, res))
+
+        res = yield self._get_user_info(user_id)
+        defer.returnValue((200, res))
+
+    @defer.inlineCallbacks
+    def _on_federation_query(self, args):
+        """Called when a request for user information appears over federation
+
+        Args:
+            args (dict): Dictionary of query arguments provided by the request
+
+        Returns:
+            Deferred[dict]: Deactivation and expiration information for a given user
+        """
+        user_id = args.get("user_id")
+        if not user_id:
+            raise SynapseError(400, "user_id not provided")
+
+        user = UserID.from_string(user_id)
+        if not self.hs.is_mine(user):
+            raise SynapseError(400, "User is not hosted on this homeserver")
+
+        res = yield self._get_user_info(user_id)
+        defer.returnValue(res)
+
+    @defer.inlineCallbacks
+    def _get_user_info(self, user_id):
+        """Retrieve information about a given user
+
+        Args:
+            user_id (str): The User ID of a given user on this homeserver
+
+        Returns:
+            Deferred[dict]: Deactivation and expiration information for a given user
+        """
+        # Check whether user is deactivated
+        is_deactivated = yield self.store.get_user_deactivated_status(user_id)
+
+        # Check whether user is expired
+        expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
+        is_expired = (
+            expiration_ts is not None and self.clock.time_msec() >= expiration_ts
+        )
+
+        res = {"expired": is_expired, "deactivated": is_deactivated}
+        defer.returnValue(res)
+
+
 def register_servlets(hs, http_server):
     UserDirectorySearchRestServlet(hs).register(http_server)
+    UserInfoServlet(hs).register(http_server)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 7a56cd4b6c..fbc2fc3a2f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -74,6 +74,8 @@ class PreviewUrlResource(DirectServeResource):
             treq_args={"browser_like_redirects": True},
             ip_whitelist=hs.config.url_preview_ip_range_whitelist,
             ip_blacklist=hs.config.url_preview_ip_range_blacklist,
+            http_proxy=os.getenv("http_proxy"),
+            https_proxy=os.getenv("HTTPS_PROXY"),
         )
         self.media_repo = media_repo
         self.primary_base_path = media_repo.primary_base_path
diff --git a/synapse/rulecheck/__init__.py b/synapse/rulecheck/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/synapse/rulecheck/__init__.py
diff --git a/synapse/rulecheck/domain_rule_checker.py b/synapse/rulecheck/domain_rule_checker.py
new file mode 100644
index 0000000000..6f2a1931c5
--- /dev/null
+++ b/synapse/rulecheck/domain_rule_checker.py
@@ -0,0 +1,181 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.config._base import ConfigError
+
+logger = logging.getLogger(__name__)
+
+
+class DomainRuleChecker(object):
+    """
+    A re-implementation of the SpamChecker that prevents users in one domain from
+    inviting users in other domains to rooms, based on a configuration.
+
+    Takes a config in the format:
+
+    spam_checker:
+        module: "rulecheck.DomainRuleChecker"
+        config:
+          domain_mapping:
+            "inviter_domain": [ "invitee_domain_permitted", "other_domain_permitted" ]
+            "other_inviter_domain": [ "invitee_domain_permitted" ]
+          default: False
+
+          # Only let local users join rooms if they were explicitly invited.
+          can_only_join_rooms_with_invite: false
+
+          # Only let local users create rooms if they are inviting only one
+          # other user, and that user matches the rules above.
+          can_only_create_one_to_one_rooms: false
+
+          # Only let local users invite during room creation, regardless of the
+          # domain mapping rules above.
+          can_only_invite_during_room_creation: false
+
+          # Prevent local users from inviting users from certain domains to
+          # rooms published in the room directory.
+          domains_prevented_from_being_invited_to_published_rooms: []
+
+          # Allow third party invites
+          can_invite_by_third_party_id: true
+
+    Don't forget to consider if you can invite users from your own domain.
+    """
+
+    def __init__(self, config):
+        self.domain_mapping = config["domain_mapping"] or {}
+        self.default = config["default"]
+
+        self.can_only_join_rooms_with_invite = config.get(
+            "can_only_join_rooms_with_invite", False
+        )
+        self.can_only_create_one_to_one_rooms = config.get(
+            "can_only_create_one_to_one_rooms", False
+        )
+        self.can_only_invite_during_room_creation = config.get(
+            "can_only_invite_during_room_creation", False
+        )
+        self.can_invite_by_third_party_id = config.get(
+            "can_invite_by_third_party_id", True
+        )
+        self.domains_prevented_from_being_invited_to_published_rooms = config.get(
+            "domains_prevented_from_being_invited_to_published_rooms", []
+        )
+
+    def check_event_for_spam(self, event):
+        """Implements synapse.events.SpamChecker.check_event_for_spam
+        """
+        return False
+
+    def user_may_invite(
+        self,
+        inviter_userid,
+        invitee_userid,
+        third_party_invite,
+        room_id,
+        new_room,
+        published_room=False,
+    ):
+        """Implements synapse.events.SpamChecker.user_may_invite
+        """
+        if self.can_only_invite_during_room_creation and not new_room:
+            return False
+
+        if not self.can_invite_by_third_party_id and third_party_invite:
+            return False
+
+        # This is a third party invite (without a bound mxid), so unless we have
+        # banned all third party invites (above) we allow it.
+        if not invitee_userid:
+            return True
+
+        inviter_domain = self._get_domain_from_id(inviter_userid)
+        invitee_domain = self._get_domain_from_id(invitee_userid)
+
+        if inviter_domain not in self.domain_mapping:
+            return self.default
+
+        if (
+            published_room
+            and invitee_domain
+            in self.domains_prevented_from_being_invited_to_published_rooms
+        ):
+            return False
+
+        return invitee_domain in self.domain_mapping[inviter_domain]
+
+    def user_may_create_room(
+        self, userid, invite_list, third_party_invite_list, cloning
+    ):
+        """Implements synapse.events.SpamChecker.user_may_create_room
+        """
+
+        if cloning:
+            return True
+
+        if not self.can_invite_by_third_party_id and third_party_invite_list:
+            return False
+
+        number_of_invites = len(invite_list) + len(third_party_invite_list)
+
+        if self.can_only_create_one_to_one_rooms and number_of_invites != 1:
+            return False
+
+        return True
+
+    def user_may_create_room_alias(self, userid, room_alias):
+        """Implements synapse.events.SpamChecker.user_may_create_room_alias
+        """
+        return True
+
+    def user_may_publish_room(self, userid, room_id):
+        """Implements synapse.events.SpamChecker.user_may_publish_room
+        """
+        return True
+
+    def user_may_join_room(self, userid, room_id, is_invited):
+        """Implements synapse.events.SpamChecker.user_may_join_room
+        """
+        if self.can_only_join_rooms_with_invite and not is_invited:
+            return False
+
+        return True
+
+    @staticmethod
+    def parse_config(config):
+        """Implements synapse.events.SpamChecker.parse_config
+        """
+        if "default" in config:
+            return config
+        else:
+            raise ConfigError("No default set for spam_config DomainRuleChecker")
+
+    @staticmethod
+    def _get_domain_from_id(mxid):
+        """Parses a string and returns the domain part of the mxid.
+
+        Args:
+           mxid (str): a valid mxid
+
+        Returns:
+           str: the domain part of the mxid
+
+        """
+        idx = mxid.find(":")
+        if idx == -1:
+            raise Exception("Invalid ID: %r" % (mxid,))
+        return mxid[idx + 1 :]
diff --git a/synapse/server.py b/synapse/server.py
index 9e28dba2b1..23be3560fa 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -23,6 +23,7 @@
 # Imports required for the default HomeServer() implementation
 import abc
 import logging
+import os
 
 from twisted.enterprise import adbapi
 from twisted.mail.smtp import sendmail
@@ -65,6 +66,7 @@ from synapse.handlers.groups_local import GroupsLocalHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
 from synapse.handlers.message import EventCreationHandler, MessageHandler
 from synapse.handlers.pagination import PaginationHandler
+from synapse.handlers.password_policy import PasswordPolicyHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler
 from synapse.handlers.read_marker import ReadMarkerHandler
@@ -166,6 +168,7 @@ class HomeServer(object):
         "event_builder_factory",
         "filtering",
         "http_client_context_factory",
+        "proxied_http_client",
         "simple_http_client",
         "media_repository",
         "media_repository_resource",
@@ -196,6 +199,7 @@ class HomeServer(object):
         "account_validity_handler",
         "saml_handler",
         "event_client_serializer",
+        "password_policy_handler",
     ]
 
     REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
@@ -304,6 +308,13 @@ class HomeServer(object):
     def build_simple_http_client(self):
         return SimpleHttpClient(self)
 
+    def build_proxied_http_client(self):
+        return SimpleHttpClient(
+            self,
+            http_proxy=os.getenv("http_proxy"),
+            https_proxy=os.getenv("HTTPS_PROXY"),
+        )
+
     def build_room_creation_handler(self):
         return RoomCreationHandler(self)
 
@@ -533,6 +544,9 @@ class HomeServer(object):
     def build_event_client_serializer(self):
         return EventClientSerializer(self)
 
+    def build_password_policy_handler(self):
+        return PasswordPolicyHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 16f8f6b573..56f9cd06e5 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -12,6 +12,7 @@ import synapse.handlers.message
 import synapse.handlers.room
 import synapse.handlers.room_member
 import synapse.handlers.set_password
+import synapse.http.client
 import synapse.rest.media.v1.media_repository
 import synapse.server_notices.server_notices_manager
 import synapse.server_notices.server_notices_sender
@@ -38,6 +39,14 @@ class HomeServer(object):
         pass
     def get_state_resolution_handler(self) -> synapse.state.StateResolutionHandler:
         pass
+    def get_simple_http_client(self) -> synapse.http.client.SimpleHttpClient:
+        """Fetch an HTTP client implementation which doesn't do any blacklisting
+        or support any HTTP_PROXY settings"""
+        pass
+    def get_proxied_http_client(self) -> synapse.http.client.SimpleHttpClient:
+        """Fetch an HTTP client implementation which doesn't do any blacklisting
+        but does support HTTP_PROXY settings"""
+        pass
     def get_deactivate_account_handler(
         self
     ) -> synapse.handlers.deactivate_account.DeactivateAccountHandler:
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index abe16334ec..1ef5662c31 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -360,14 +360,11 @@ class SQLBaseStore(object):
                 expiration_ts,
             )
 
-        self._simple_insert_txn(
+        self._simple_upsert_txn(
             txn,
             "account_validity",
-            values={
-                "user_id": user_id,
-                "expiration_ts_ms": expiration_ts,
-                "email_sent": False,
-            },
+            keyvalues={"user_id": user_id},
+            values={"expiration_ts_ms": expiration_ts, "email_sent": False},
         )
 
     def start_profiling(self):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 435b2acd4d..ec19ce5ca4 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -35,7 +35,7 @@ def _make_exclusive_regex(services_cache):
     exclusive_user_regexes = [
         regex.pattern
         for service in services_cache
-        for regex in service.get_exlusive_user_regexes()
+        for regex in service.get_exclusive_user_regexes()
     ]
     if exclusive_user_regexes:
         exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 41f62828bd..cc2852026f 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -1,5 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -20,7 +22,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.api.errors import StoreError
+from synapse.api.errors import Codes, StoreError
 from synapse.logging.opentracing import (
     get_active_span_text_map,
     set_tag,
@@ -42,7 +44,8 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
 
 class DeviceWorkerStore(SQLBaseStore):
     def get_device(self, user_id, device_id):
-        """Retrieve a device.
+        """Retrieve a device. Only returns devices that are not marked as
+        hidden.
 
         Args:
             user_id (str): The ID of the user which owns the device
@@ -54,14 +57,15 @@ class DeviceWorkerStore(SQLBaseStore):
         """
         return self._simple_select_one(
             table="devices",
-            keyvalues={"user_id": user_id, "device_id": device_id},
+            keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
             retcols=("user_id", "device_id", "display_name"),
             desc="get_device",
         )
 
     @defer.inlineCallbacks
     def get_devices_by_user(self, user_id):
-        """Retrieve all of a user's registered devices.
+        """Retrieve all of a user's registered devices. Only returns devices
+        that are not marked as hidden.
 
         Args:
             user_id (str):
@@ -72,7 +76,7 @@ class DeviceWorkerStore(SQLBaseStore):
         """
         devices = yield self._simple_select_list(
             table="devices",
-            keyvalues={"user_id": user_id},
+            keyvalues={"user_id": user_id, "hidden": False},
             retcols=("user_id", "device_id", "display_name"),
             desc="get_devices_by_user",
         )
@@ -567,6 +571,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
         Returns:
             defer.Deferred: boolean whether the device was inserted or an
                 existing device existed with that ID.
+        Raises:
+            StoreError: if the device is already in use
         """
         key = (user_id, device_id)
         if self.device_id_exists_cache.get(key, None):
@@ -579,12 +585,25 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
                     "user_id": user_id,
                     "device_id": device_id,
                     "display_name": initial_device_display_name,
+                    "hidden": False,
                 },
                 desc="store_device",
                 or_ignore=True,
             )
+            if not inserted:
+                # if the device already exists, check if it's a real device, or
+                # if the device ID is reserved by something else
+                hidden = yield self._simple_select_one_onecol(
+                    "devices",
+                    keyvalues={"user_id": user_id, "device_id": device_id},
+                    retcol="hidden",
+                )
+                if hidden:
+                    raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)
             self.device_id_exists_cache.prefill(key, True)
             return inserted
+        except StoreError:
+            raise
         except Exception as e:
             logger.error(
                 "store_device with device_id=%s(%r) user_id=%s(%r)"
@@ -611,7 +630,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
         """
         yield self._simple_delete_one(
             table="devices",
-            keyvalues={"user_id": user_id, "device_id": device_id},
+            keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
             desc="delete_device",
         )
 
@@ -631,14 +650,15 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
             table="devices",
             column="device_id",
             iterable=device_ids,
-            keyvalues={"user_id": user_id},
+            keyvalues={"user_id": user_id, "hidden": False},
             desc="delete_devices",
         )
         for device_id in device_ids:
             self.device_id_exists_cache.invalidate((user_id, device_id))
 
     def update_device(self, user_id, device_id, new_display_name=None):
-        """Update a device.
+        """Update a device. Only updates the device if it is not marked as
+        hidden.
 
         Args:
             user_id (str): The ID of the user which owns the device
@@ -657,7 +677,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
             return defer.succeed(None)
         return self._simple_update_one(
             table="devices",
-            keyvalues={"user_id": user_id, "device_id": device_id},
+            keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
             updatevalues=updates,
             desc="update_device",
         )
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 33e3a84933..b6f2538e84 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -92,7 +92,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             "    k.key_json"
             " FROM devices d"
             "    %s JOIN e2e_device_keys_json k USING (user_id, device_id)"
-            " WHERE %s"
+            " WHERE %s AND NOT d.hidden"
         ) % (
             "LEFT" if include_all_devices else "INNER",
             " OR ".join("(" + q + ")" for q in query_clauses),
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 32050868ff..8b9e3f9e73 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1461,6 +1461,9 @@ class EventsStore(
             elif event.type == EventTypes.Redaction:
                 # Insert into the redactions table.
                 self._store_redaction(txn, event)
+            elif event.type == EventTypes.Retention:
+                # Update the room_retention table.
+                self._store_retention_policy_for_room_txn(txn, event)
 
             self._handle_event_relations(txn, event)
 
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 912c1df6be..0a36c9cb34 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,8 +19,11 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.storage.roommember import ProfileInfo
 
+from . import background_updates
 from ._base import SQLBaseStore
 
+BATCH_SIZE = 100
+
 
 class ProfileWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
@@ -58,6 +62,54 @@ class ProfileWorkerStore(SQLBaseStore):
             desc="get_profile_avatar_url",
         )
 
+    def get_latest_profile_replication_batch_number(self):
+        def f(txn):
+            txn.execute("SELECT MAX(batch) as maxbatch FROM profiles")
+            rows = self.cursor_to_dict(txn)
+            return rows[0]["maxbatch"]
+
+        return self.runInteraction("get_latest_profile_replication_batch_number", f)
+
+    def get_profile_batch(self, batchnum):
+        return self._simple_select_list(
+            table="profiles",
+            keyvalues={"batch": batchnum},
+            retcols=("user_id", "displayname", "avatar_url", "active"),
+            desc="get_profile_batch",
+        )
+
+    def assign_profile_batch(self):
+        def f(txn):
+            sql = (
+                "UPDATE profiles SET batch = "
+                "(SELECT COALESCE(MAX(batch), -1) + 1 FROM profiles) "
+                "WHERE user_id in ("
+                "    SELECT user_id FROM profiles WHERE batch is NULL limit ?"
+                ")"
+            )
+            txn.execute(sql, (BATCH_SIZE,))
+            return txn.rowcount
+
+        return self.runInteraction("assign_profile_batch", f)
+
+    def get_replication_hosts(self):
+        def f(txn):
+            txn.execute(
+                "SELECT host, last_synced_batch FROM profile_replication_status"
+            )
+            rows = self.cursor_to_dict(txn)
+            return {r["host"]: r["last_synced_batch"] for r in rows}
+
+        return self.runInteraction("get_replication_hosts", f)
+
+    def update_replication_batch_for_host(self, host, last_synced_batch):
+        return self._simple_upsert(
+            table="profile_replication_status",
+            keyvalues={"host": host},
+            values={"last_synced_batch": last_synced_batch},
+            desc="update_replication_batch_for_host",
+        )
+
     def get_from_remote_profile_cache(self, user_id):
         return self._simple_select_one(
             table="remote_profile_cache",
@@ -67,29 +119,53 @@ class ProfileWorkerStore(SQLBaseStore):
             desc="get_from_remote_profile_cache",
         )
 
-    def create_profile(self, user_localpart):
-        return self._simple_insert(
-            table="profiles", values={"user_id": user_localpart}, desc="create_profile"
-        )
-
-    def set_profile_displayname(self, user_localpart, new_displayname):
-        return self._simple_update_one(
+    def set_profile_displayname(self, user_localpart, new_displayname, batchnum):
+        return self._simple_upsert(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"displayname": new_displayname},
+            values={"displayname": new_displayname, "batch": batchnum},
             desc="set_profile_displayname",
+            lock=False,  # we can do this because user_id has a unique index
         )
 
-    def set_profile_avatar_url(self, user_localpart, new_avatar_url):
-        return self._simple_update_one(
+    def set_profile_avatar_url(self, user_localpart, new_avatar_url, batchnum):
+        return self._simple_upsert(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"avatar_url": new_avatar_url},
+            values={"avatar_url": new_avatar_url, "batch": batchnum},
             desc="set_profile_avatar_url",
+            lock=False,  # we can do this because user_id has a unique index
+        )
+
+    def set_profile_active(self, user_localpart, active, hide, batchnum):
+        values = {"active": int(active), "batch": batchnum}
+        if not active and not hide:
+            # we are deactivating for real (not in hide mode)
+            # so clear the profile.
+            values["avatar_url"] = None
+            values["displayname"] = None
+        return self._simple_upsert(
+            table="profiles",
+            keyvalues={"user_id": user_localpart},
+            values=values,
+            desc="set_profile_active",
+            lock=False,  # we can do this because user_id has a unique index
         )
 
 
-class ProfileStore(ProfileWorkerStore):
+class ProfileStore(ProfileWorkerStore, background_updates.BackgroundUpdateStore):
+    def __init__(self, db_conn, hs):
+
+        super(ProfileStore, self).__init__(db_conn, hs)
+
+        self.register_background_index_update(
+            "profile_replication_status_host_index",
+            index_name="profile_replication_status_idx",
+            table="profile_replication_status",
+            columns=["host"],
+            unique=True,
+        )
+
     def add_remote_profile_cache(self, user_id, displayname, avatar_url):
         """Ensure we are caching the remote user's profiles.
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 3f50324253..6e7db564bb 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -156,6 +156,28 @@ class RegistrationWorkerStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
+    def get_expired_users(self):
+        """Get IDs of all expired users
+
+        Returns:
+            Deferred[list[str]]: List of expired user IDs
+        """
+
+        def get_expired_users_txn(txn, now_ms):
+            sql = """
+                SELECT user_id from account_validity
+                WHERE expiration_ts_ms <= ?
+            """
+            txn.execute(sql, (now_ms,))
+            rows = txn.fetchall()
+            return [row[0] for row in rows]
+
+        res = yield self.runInteraction(
+            "get_expired_users", get_expired_users_txn, self.clock.time_msec()
+        )
+        defer.returnValue(res)
+
+    @defer.inlineCallbacks
     def set_renewal_token_for_user(self, user_id, renewal_token):
         """Defines a renewal token for a given user.
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 08e13f3a3b..89d6ecd23f 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -17,10 +17,13 @@ import collections
 import logging
 import re
 
+from six import integer_types
+
 from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.search import SearchStore
@@ -171,6 +174,24 @@ class RoomWorkerStore(SQLBaseStore):
             desc="is_room_blocked",
         )
 
+    @defer.inlineCallbacks
+    def is_room_published(self, room_id):
+        """Check whether a room has been published in the local public room
+        directory.
+
+        Args:
+            room_id (str)
+        Returns:
+            bool: Whether the room is currently published in the room directory
+        """
+        # Get room information
+        room_info = yield self.get_room(room_id)
+        if not room_info:
+            defer.returnValue(False)
+
+        # Check the is_public value
+        defer.returnValue(room_info.get("is_public", False))
+
     @cachedInlineCallbacks(max_entries=10000)
     def get_ratelimit_for_user(self, user_id):
         """Check if there are any overrides for ratelimiting for the given
@@ -200,8 +221,146 @@ class RoomWorkerStore(SQLBaseStore):
         else:
             return None
 
+    @cachedInlineCallbacks()
+    def get_retention_policy_for_room(self, room_id):
+        """Get the retention policy for a given room.
+
+        If no retention policy has been found for this room, returns a policy defined
+        by the configured default policy (which has None as both the 'min_lifetime' and
+        the 'max_lifetime' if no default policy has been defined in the server's
+        configuration).
+
+        Args:
+            room_id (str): The ID of the room to get the retention policy of.
+
+        Returns:
+            dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
+        """
+        # If the room retention feature is disabled, return a policy with no minimum nor
+        # maximum, in order not to filter out events we should filter out when sending to
+        # the client.
+        if not self.config.retention_enabled:
+            defer.returnValue({"min_lifetime": None, "max_lifetime": None})
+
+        def get_retention_policy_for_room_txn(txn):
+            txn.execute(
+                """
+                SELECT min_lifetime, max_lifetime FROM room_retention
+                INNER JOIN current_state_events USING (event_id, room_id)
+                WHERE room_id = ?;
+                """,
+                (room_id,),
+            )
+
+            return self.cursor_to_dict(txn)
+
+        ret = yield self.runInteraction(
+            "get_retention_policy_for_room", get_retention_policy_for_room_txn
+        )
+
+        # If we don't know this room ID, ret will be None, in this case return the default
+        # policy.
+        if not ret:
+            defer.returnValue(
+                {
+                    "min_lifetime": self.config.retention_default_min_lifetime,
+                    "max_lifetime": self.config.retention_default_max_lifetime,
+                }
+            )
+
+        row = ret[0]
+
+        # If one of the room's policy's attributes isn't defined, use the matching
+        # attribute from the default policy.
+        # The default values will be None if no default policy has been defined, or if one
+        # of the attributes is missing from the default policy.
+        if row["min_lifetime"] is None:
+            row["min_lifetime"] = self.config.retention_default_min_lifetime
+
+        if row["max_lifetime"] is None:
+            row["max_lifetime"] = self.config.retention_default_max_lifetime
+
+        defer.returnValue(row)
+
 
 class RoomStore(RoomWorkerStore, SearchStore):
+    def __init__(self, db_conn, hs):
+        super(RoomStore, self).__init__(db_conn, hs)
+
+        self.config = hs.config
+
+        self.register_background_update_handler(
+            "insert_room_retention", self._background_insert_retention
+        )
+
+    @defer.inlineCallbacks
+    def _background_insert_retention(self, progress, batch_size):
+        """Retrieves a list of all rooms within a range and inserts an entry for each of
+        them into the room_retention table.
+        NULLs the property's columns if missing from the retention event in the room's
+        state (or NULLs all of them if there's no retention event in the room's state),
+        so that we fall back to the server's retention policy.
+        """
+
+        last_room = progress.get("room_id", "")
+
+        def _background_insert_retention_txn(txn):
+            txn.execute(
+                """
+                SELECT state.room_id, state.event_id, events.json
+                FROM current_state_events as state
+                LEFT JOIN event_json AS events ON (state.event_id = events.event_id)
+                WHERE state.room_id > ? AND state.type = '%s'
+                ORDER BY state.room_id ASC
+                LIMIT ?;
+                """
+                % EventTypes.Retention,
+                (last_room, batch_size),
+            )
+
+            rows = self.cursor_to_dict(txn)
+
+            if not rows:
+                return True
+
+            for row in rows:
+                if not row["json"]:
+                    retention_policy = {}
+                else:
+                    ev = json.loads(row["json"])
+                    retention_policy = json.dumps(ev["content"])
+
+                self._simple_insert_txn(
+                    txn=txn,
+                    table="room_retention",
+                    values={
+                        "room_id": row["room_id"],
+                        "event_id": row["event_id"],
+                        "min_lifetime": retention_policy.get("min_lifetime"),
+                        "max_lifetime": retention_policy.get("max_lifetime"),
+                    },
+                )
+
+            logger.info("Inserted %d rows into room_retention", len(rows))
+
+            self._background_update_progress_txn(
+                txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
+            )
+
+            if batch_size > len(rows):
+                return True
+            else:
+                return False
+
+        end = yield self.runInteraction(
+            "insert_room_retention", _background_insert_retention_txn
+        )
+
+        if end:
+            yield self._end_background_update("insert_room_retention")
+
+        defer.returnValue(batch_size)
+
     @defer.inlineCallbacks
     def store_room(self, room_id, room_creator_user_id, is_public):
         """Stores a room.
@@ -402,6 +561,35 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 txn, event, "content.body", event.content["body"]
             )
 
+    def _store_retention_policy_for_room_txn(self, txn, event):
+        if hasattr(event, "content") and (
+            "min_lifetime" in event.content or "max_lifetime" in event.content
+        ):
+            if (
+                "min_lifetime" in event.content
+                and not isinstance(event.content.get("min_lifetime"), integer_types)
+            ) or (
+                "max_lifetime" in event.content
+                and not isinstance(event.content.get("max_lifetime"), integer_types)
+            ):
+                # Ignore the event if one of the value isn't an integer.
+                return
+
+            self._simple_insert_txn(
+                txn=txn,
+                table="room_retention",
+                values={
+                    "room_id": event.room_id,
+                    "event_id": event.event_id,
+                    "min_lifetime": event.content.get("min_lifetime"),
+                    "max_lifetime": event.content.get("max_lifetime"),
+                },
+            )
+
+            self._invalidate_cache_and_stream(
+                txn, self.get_retention_policy_for_room, (event.room_id,)
+            )
+
     def add_event_report(
         self, room_id, event_id, user_id, reason, content, received_ts
     ):
@@ -583,3 +771,89 @@ class RoomStore(RoomWorkerStore, SearchStore):
                             remote_media_mxcs.append((hostname, media_id))
 
         return local_media_mxcs, remote_media_mxcs
+
+    @defer.inlineCallbacks
+    def get_rooms_for_retention_period_in_range(
+        self, min_ms, max_ms, include_null=False
+    ):
+        """Retrieves all of the rooms within the given retention range.
+
+        Optionally includes the rooms which don't have a retention policy.
+
+        Args:
+            min_ms (int|None): Duration in milliseconds that define the lower limit of
+                the range to handle (exclusive). If None, doesn't set a lower limit.
+            max_ms (int|None): Duration in milliseconds that define the upper limit of
+                the range to handle (inclusive). If None, doesn't set an upper limit.
+            include_null (bool): Whether to include rooms which retention policy is NULL
+                in the returned set.
+
+        Returns:
+            dict[str, dict]: The rooms within this range, along with their retention
+                policy. The key is "room_id", and maps to a dict describing the retention
+                policy associated with this room ID. The keys for this nested dict are
+                "min_lifetime" (int|None), and "max_lifetime" (int|None).
+        """
+
+        def get_rooms_for_retention_period_in_range_txn(txn):
+            range_conditions = []
+            args = []
+
+            if min_ms is not None:
+                range_conditions.append("max_lifetime > ?")
+                args.append(min_ms)
+
+            if max_ms is not None:
+                range_conditions.append("max_lifetime <= ?")
+                args.append(max_ms)
+
+            # Do a first query which will retrieve the rooms that have a retention policy
+            # in their current state.
+            sql = """
+                SELECT room_id, min_lifetime, max_lifetime FROM room_retention
+                INNER JOIN current_state_events USING (event_id, room_id)
+                """
+
+            if len(range_conditions):
+                sql += " WHERE (" + " AND ".join(range_conditions) + ")"
+
+                if include_null:
+                    sql += " OR max_lifetime IS NULL"
+
+            txn.execute(sql, args)
+
+            rows = self.cursor_to_dict(txn)
+            rooms_dict = {}
+
+            for row in rows:
+                rooms_dict[row["room_id"]] = {
+                    "min_lifetime": row["min_lifetime"],
+                    "max_lifetime": row["max_lifetime"],
+                }
+
+            if include_null:
+                # If required, do a second query that retrieves all of the rooms we know
+                # of so we can handle rooms with no retention policy.
+                sql = "SELECT DISTINCT room_id FROM current_state_events"
+
+                txn.execute(sql)
+
+                rows = self.cursor_to_dict(txn)
+
+                # If a room isn't already in the dict (i.e. it doesn't have a retention
+                # policy in its state), add it with a null policy.
+                for row in rows:
+                    if row["room_id"] not in rooms_dict:
+                        rooms_dict[row["room_id"]] = {
+                            "min_lifetime": None,
+                            "max_lifetime": None,
+                        }
+
+            return rooms_dict
+
+        rooms = yield self.runInteraction(
+            "get_rooms_for_retention_period_in_range",
+            get_rooms_for_retention_period_in_range_txn,
+        )
+
+        defer.returnValue(rooms)
diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql
new file mode 100644
index 0000000000..e744c02fe8
--- /dev/null
+++ b/synapse/storage/schema/delta/48/profiles_batch.sql
@@ -0,0 +1,36 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Add a batch number to track changes to profiles and the
+ * order they're made in so we can replicate user profiles
+ * to other hosts as they change
+ */
+ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL;
+
+/*
+ * Index on the batch number so we can get profiles
+ * by their batch
+ */
+CREATE INDEX profiles_batch_idx ON profiles(batch);
+
+/*
+ * A table to track what batch of user profiles has been
+ * synced to what profile replication target.
+ */
+CREATE TABLE profile_replication_status (
+    host TEXT NOT NULL,
+    last_synced_batch BIGINT NOT NULL
+);
diff --git a/synapse/storage/schema/delta/50/profiles_deactivated_users.sql b/synapse/storage/schema/delta/50/profiles_deactivated_users.sql
new file mode 100644
index 0000000000..c8893ecbe8
--- /dev/null
+++ b/synapse/storage/schema/delta/50/profiles_deactivated_users.sql
@@ -0,0 +1,23 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * A flag saying whether the user owning the profile has been deactivated
+ * This really belongs on the users table, not here, but the users table
+ * stores users by their full user_id and profiles stores them by localpart,
+ * so we can't easily join between the two tables. Plus, the batch number
+ * realy ought to represent data in this table that has changed.
+ */
+ALTER TABLE profiles ADD COLUMN active SMALLINT DEFAULT 1 NOT NULL;
diff --git a/synapse/storage/schema/delta/55/profile_replication_status_index.sql b/synapse/storage/schema/delta/55/profile_replication_status_index.sql
new file mode 100644
index 0000000000..18a0f7e10c
--- /dev/null
+++ b/synapse/storage/schema/delta/55/profile_replication_status_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('profile_replication_status_host_index', '{}');
diff --git a/synapse/storage/schema/delta/55/room_retention.sql b/synapse/storage/schema/delta/55/room_retention.sql
new file mode 100644
index 0000000000..ee6cdf7a14
--- /dev/null
+++ b/synapse/storage/schema/delta/55/room_retention.sql
@@ -0,0 +1,33 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Tracks the retention policy of a room.
+-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in
+-- the room's retention policy state event.
+-- If a room doesn't have a retention policy state event in its state, both max_lifetime
+-- and min_lifetime are NULL.
+CREATE TABLE IF NOT EXISTS room_retention(
+    room_id TEXT,
+    event_id TEXT,
+    min_lifetime BIGINT,
+    max_lifetime BIGINT,
+
+    PRIMARY KEY(room_id, event_id)
+);
+
+CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('insert_room_retention', '{}');
diff --git a/synapse/storage/schema/delta/56/hidden_devices.sql b/synapse/storage/schema/delta/56/hidden_devices.sql
new file mode 100644
index 0000000000..67f8b20297
--- /dev/null
+++ b/synapse/storage/schema/delta/56/hidden_devices.sql
@@ -0,0 +1,18 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- device list needs to know which ones are "real" devices, and which ones are
+-- just used to avoid collisions
+ALTER TABLE devices ADD COLUMN hidden BOOLEAN DEFAULT FALSE;
diff --git a/synapse/storage/schema/full_schemas/54/full.sql.postgres b/synapse/storage/schema/full_schemas/54/full.sql.postgres
index 098434356f..01a2b0e024 100644
--- a/synapse/storage/schema/full_schemas/54/full.sql.postgres
+++ b/synapse/storage/schema/full_schemas/54/full.sql.postgres
@@ -667,10 +667,19 @@ CREATE TABLE presence_stream (
 
 
 
+CREATE TABLE profile_replication_status (
+    host text NOT NULL,
+    last_synced_batch bigint NOT NULL
+);
+
+
+
 CREATE TABLE profiles (
     user_id text NOT NULL,
     displayname text,
-    avatar_url text
+    avatar_url text,
+    batch bigint,
+    active smallint DEFAULT 1 NOT NULL
 );
 
 
@@ -1842,6 +1851,10 @@ CREATE INDEX presence_stream_user_id ON presence_stream USING btree (user_id);
 
 
 
+CREATE INDEX profiles_batch_idx ON profiles USING btree (batch);
+
+
+
 CREATE INDEX public_room_index ON rooms USING btree (is_public);
 
 
diff --git a/synapse/storage/schema/full_schemas/54/full.sql.sqlite b/synapse/storage/schema/full_schemas/54/full.sql.sqlite
index be9295e4c9..f1a71627f0 100644
--- a/synapse/storage/schema/full_schemas/54/full.sql.sqlite
+++ b/synapse/storage/schema/full_schemas/54/full.sql.sqlite
@@ -6,7 +6,7 @@ CREATE TABLE presence_allow_inbound( observed_user_id TEXT NOT NULL, observer_us
 CREATE TABLE users( name TEXT, password_hash TEXT, creation_ts BIGINT, admin SMALLINT DEFAULT 0 NOT NULL, upgrade_ts BIGINT, is_guest SMALLINT DEFAULT 0 NOT NULL, appservice_id TEXT, consent_version TEXT, consent_server_notice_sent TEXT, user_type TEXT DEFAULT NULL, UNIQUE(name) );
 CREATE TABLE access_tokens( id BIGINT PRIMARY KEY, user_id TEXT NOT NULL, device_id TEXT, token TEXT NOT NULL, last_used BIGINT, UNIQUE(token) );
 CREATE TABLE user_ips ( user_id TEXT NOT NULL, access_token TEXT NOT NULL, device_id TEXT, ip TEXT NOT NULL, user_agent TEXT NOT NULL, last_seen BIGINT NOT NULL );
-CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, UNIQUE(user_id) );
+CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, batch BIGINT DEFAULT NULL, active SMALLINT DEFAULT 1 NOT NULL, UNIQUE(user_id) );
 CREATE TABLE received_transactions( transaction_id TEXT, origin TEXT, ts BIGINT, response_code INTEGER, response_json bytea, has_been_referenced smallint default 0, UNIQUE (transaction_id, origin) );
 CREATE TABLE destinations( destination TEXT PRIMARY KEY, retry_last_ts BIGINT, retry_interval INTEGER );
 CREATE TABLE events( stream_ordering INTEGER PRIMARY KEY, topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, content TEXT, unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, origin_server_ts BIGINT, received_ts BIGINT, sender TEXT, contains_url BOOLEAN, UNIQUE (event_id) );
@@ -208,6 +208,8 @@ CREATE INDEX group_users_u_idx ON group_users(user_id);
 CREATE INDEX group_invites_u_idx ON group_invites(user_id);
 CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
 CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
+CREATE INDEX profiles_batch_idx ON profiles(batch);
+CREATE TABLE profile_replication_status ( host TEXT NOT NULL, last_synced_batch BIGINT NOT NULL );
 CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL, device_id TEXT, timestamp BIGINT NOT NULL );
 CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp);
 CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp);
diff --git a/synapse/third_party_rules/access_rules.py b/synapse/third_party_rules/access_rules.py
new file mode 100644
index 0000000000..253bba664b
--- /dev/null
+++ b/synapse/third_party_rules/access_rules.py
@@ -0,0 +1,586 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import email.utils
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership, RoomCreationPreset
+from synapse.api.errors import SynapseError
+from synapse.config._base import ConfigError
+from synapse.types import get_domain_from_id
+
+ACCESS_RULES_TYPE = "im.vector.room.access_rules"
+ACCESS_RULE_RESTRICTED = "restricted"
+ACCESS_RULE_UNRESTRICTED = "unrestricted"
+ACCESS_RULE_DIRECT = "direct"
+
+VALID_ACCESS_RULES = (
+    ACCESS_RULE_DIRECT,
+    ACCESS_RULE_RESTRICTED,
+    ACCESS_RULE_UNRESTRICTED,
+)
+
+# Rules to which we need to apply the power levels restrictions.
+#
+# These are all of the rules that neither:
+#  * forbid users from joining based on a server blacklist (which means that there
+#     is no need to apply power level restrictions), nor
+#  * target direct chats (since we allow both users to be room admins in this case).
+#
+# The power-level restrictions, when they are applied, prevent the following:
+#  * the default power level for users (users_default) being set to anything other than 0.
+#  * a non-default power level being assigned to any user which would be forbidden from
+#     joining a restricted room.
+RULES_WITH_RESTRICTED_POWER_LEVELS = (ACCESS_RULE_UNRESTRICTED,)
+
+
+class RoomAccessRules(object):
+    """Implementation of the ThirdPartyEventRules module API that allows federation admins
+    to define custom rules for specific events and actions.
+    Implements the custom behaviour for the "im.vector.room.access_rules" state event.
+
+    Takes a config in the format:
+
+    third_party_event_rules:
+        module: third_party_rules.RoomAccessRules
+        config:
+            # List of domains (server names) that can't be invited to rooms if the
+            # "restricted" rule is set. Defaults to an empty list.
+            domains_forbidden_when_restricted: []
+
+            # Identity server to use when checking the HS an email address belongs to
+            # using the /info endpoint. Required.
+            id_server: "vector.im"
+
+    Don't forget to consider if you can invite users from your own domain.
+    """
+
+    def __init__(self, config, http_client):
+        self.http_client = http_client
+
+        self.id_server = config["id_server"]
+
+        self.domains_forbidden_when_restricted = config.get(
+            "domains_forbidden_when_restricted", []
+        )
+
+    @staticmethod
+    def parse_config(config):
+        if "id_server" in config:
+            return config
+        else:
+            raise ConfigError("No IS for event rules TchapEventRules")
+
+    def on_create_room(self, requester, config, is_requester_admin):
+        """Implements synapse.events.ThirdPartyEventRules.on_create_room
+
+        Checks if a im.vector.room.access_rules event is being set during room creation.
+        If yes, make sure the event is correct. Otherwise, append an event with the
+        default rule to the initial state.
+        """
+        is_direct = config.get("is_direct")
+        preset = config.get("preset")
+        access_rule = None
+        join_rule = None
+
+        # If there's a rules event in the initial state, check if it complies with the
+        # spec for im.vector.room.access_rules and deny the request if not.
+        for event in config.get("initial_state", []):
+            if event["type"] == ACCESS_RULES_TYPE:
+                access_rule = event["content"].get("rule")
+
+                # Make sure the event has a valid content.
+                if access_rule is None:
+                    raise SynapseError(400, "Invalid access rule")
+
+                # Make sure the rule name is valid.
+                if access_rule not in VALID_ACCESS_RULES:
+                    raise SynapseError(400, "Invalid access rule")
+
+                # Make sure the rule is "direct" if the room is a direct chat.
+                if (is_direct and access_rule != ACCESS_RULE_DIRECT) or (
+                    access_rule == ACCESS_RULE_DIRECT and not is_direct
+                ):
+                    raise SynapseError(400, "Invalid access rule")
+
+            if event["type"] == EventTypes.JoinRules:
+                join_rule = event["content"].get("join_rule")
+
+        if access_rule is None:
+            # If there's no access rules event in the initial state, create one with the
+            # default setting.
+            if is_direct:
+                default_rule = ACCESS_RULE_DIRECT
+            else:
+                # If the default value for non-direct chat changes, we should make another
+                # case here for rooms created with either a "public" join_rule or the
+                # "public_chat" preset to make sure those keep defaulting to "restricted"
+                default_rule = ACCESS_RULE_RESTRICTED
+
+            if not config.get("initial_state"):
+                config["initial_state"] = []
+
+            config["initial_state"].append(
+                {
+                    "type": ACCESS_RULES_TYPE,
+                    "state_key": "",
+                    "content": {"rule": default_rule},
+                }
+            )
+
+            access_rule = default_rule
+
+        # Check that the preset or the join rule in use is compatible with the access
+        # rule, whether it's a user-defined one or the default one (i.e. if it involves
+        # a "public" join rule, the access rule must be "restricted").
+        if (
+            join_rule == JoinRules.PUBLIC or preset == RoomCreationPreset.PUBLIC_CHAT
+        ) and access_rule != ACCESS_RULE_RESTRICTED:
+            raise SynapseError(400, "Invalid access rule")
+
+        # Check if the creator can override values for the power levels.
+        allowed = self._is_power_level_content_allowed(
+            config.get("power_level_content_override", {}), access_rule
+        )
+        if not allowed:
+            raise SynapseError(400, "Invalid power levels content override")
+
+        # Second loop for events we need to know the current rule to process.
+        for event in config.get("initial_state", []):
+            if event["type"] == EventTypes.PowerLevels:
+                allowed = self._is_power_level_content_allowed(
+                    event["content"], access_rule
+                )
+                if not allowed:
+                    raise SynapseError(400, "Invalid power levels content")
+
+    @defer.inlineCallbacks
+    def check_threepid_can_be_invited(self, medium, address, state_events):
+        """Implements synapse.events.ThirdPartyEventRules.check_threepid_can_be_invited
+
+        Check if a threepid can be invited to the room via a 3PID invite given the current
+        rules and the threepid's address, by retrieving the HS it's mapped to from the
+        configured identity server, and checking if we can invite users from it.
+        """
+        rule = self._get_rule_from_state(state_events)
+
+        if medium != "email":
+            defer.returnValue(False)
+
+        if rule != ACCESS_RULE_RESTRICTED:
+            # Only "restricted" requires filtering 3PID invites. We don't need to do
+            # anything for "direct" here, because only "restricted" requires filtering
+            # based on the HS the address is mapped to.
+            defer.returnValue(True)
+
+        parsed_address = email.utils.parseaddr(address)[1]
+        if parsed_address != address:
+            # Avoid reproducing the security issue described here:
+            # https://matrix.org/blog/2019/04/18/security-update-sydent-1-0-2
+            # It's probably not worth it but let's just be overly safe here.
+            defer.returnValue(False)
+
+        # Get the HS this address belongs to from the identity server.
+        res = yield self.http_client.get_json(
+            "https://%s/_matrix/identity/api/v1/info" % (self.id_server,),
+            {"medium": medium, "address": address},
+        )
+
+        # Look for a domain that's not forbidden from being invited.
+        if not res.get("hs"):
+            defer.returnValue(False)
+        if res.get("hs") in self.domains_forbidden_when_restricted:
+            defer.returnValue(False)
+
+        defer.returnValue(True)
+
+    def check_event_allowed(self, event, state_events):
+        """Implements synapse.events.ThirdPartyEventRules.check_event_allowed
+
+        Checks the event's type and the current rule and calls the right function to
+        determine whether the event can be allowed.
+        """
+        if event.type == ACCESS_RULES_TYPE:
+            return self._on_rules_change(event, state_events)
+
+        # We need to know the rule to apply when processing the event types below.
+        rule = self._get_rule_from_state(state_events)
+
+        if event.type == EventTypes.PowerLevels:
+            return self._is_power_level_content_allowed(event.content, rule)
+
+        if event.type == EventTypes.Member or event.type == EventTypes.ThirdPartyInvite:
+            return self._on_membership_or_invite(event, rule, state_events)
+
+        if event.type == EventTypes.JoinRules:
+            return self._on_join_rule_change(event, rule)
+
+        if event.type == EventTypes.RoomAvatar:
+            return self._on_room_avatar_change(event, rule)
+
+        if event.type == EventTypes.Name:
+            return self._on_room_name_change(event, rule)
+
+        if event.type == EventTypes.Topic:
+            return self._on_room_topic_change(event, rule)
+
+        return True
+
+    def _on_rules_change(self, event, state_events):
+        """Implement the checks and behaviour specified on allowing or forbidding a new
+        im.vector.room.access_rules event.
+
+        Args:
+            event (synapse.events.EventBase): The event to check.
+            state_events (dict[tuple[event type, state key], EventBase]): The state of the
+                room before the event was sent.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        new_rule = event.content.get("rule")
+
+        # Check for invalid values.
+        if new_rule not in VALID_ACCESS_RULES:
+            return False
+
+        # We must not allow rooms with the "public" join rule to be given any other access
+        # rule than "restricted".
+        join_rule = self._get_join_rule_from_state(state_events)
+        if join_rule == JoinRules.PUBLIC and new_rule != ACCESS_RULE_RESTRICTED:
+            return False
+
+        # Make sure we don't apply "direct" if the room has more than two members.
+        if new_rule == ACCESS_RULE_DIRECT:
+            existing_members, threepid_tokens = self._get_members_and_tokens_from_state(
+                state_events
+            )
+
+            if len(existing_members) > 2 or len(threepid_tokens) > 1:
+                return False
+
+        prev_rules_event = state_events.get((ACCESS_RULES_TYPE, ""))
+
+        # Now that we know the new rule doesn't break the "direct" case, we can allow any
+        # new rule in rooms that had none before.
+        if prev_rules_event is None:
+            return True
+
+        prev_rule = prev_rules_event.content.get("rule")
+
+        # Currently, we can only go from "restricted" to "unrestricted".
+        if prev_rule == ACCESS_RULE_RESTRICTED and new_rule == ACCESS_RULE_UNRESTRICTED:
+            return True
+
+        return False
+
+    def _on_membership_or_invite(self, event, rule, state_events):
+        """Applies the correct rule for incoming m.room.member and
+        m.room.third_party_invite events.
+
+        Args:
+            event (synapse.events.EventBase): The event to check.
+            rule (str): The name of the rule to apply.
+            state_events (dict[tuple[event type, state key], EventBase]): The state of the
+                room before the event was sent.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        if rule == ACCESS_RULE_RESTRICTED:
+            ret = self._on_membership_or_invite_restricted(event)
+        elif rule == ACCESS_RULE_UNRESTRICTED:
+            ret = self._on_membership_or_invite_unrestricted()
+        elif rule == ACCESS_RULE_DIRECT:
+            ret = self._on_membership_or_invite_direct(event, state_events)
+        else:
+            # We currently apply the default (restricted) if we don't know the rule, we
+            # might want to change that in the future.
+            ret = self._on_membership_or_invite_restricted(event)
+
+        return ret
+
+    def _on_membership_or_invite_restricted(self, event):
+        """Implements the checks and behaviour specified for the "restricted" rule.
+
+        "restricted" currently means that users can only invite users if their server is
+        included in a limited list of domains.
+
+        Args:
+            event (synapse.events.EventBase): The event to check.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        # We're not applying the rules on m.room.third_party_member events here because
+        # the filtering on threepids is done in check_threepid_can_be_invited, which is
+        # called before check_event_allowed.
+        if event.type == EventTypes.ThirdPartyInvite:
+            return True
+
+        # We only need to process "join" and "invite" memberships, in order to be backward
+        # compatible, e.g. if a user from a blacklisted server joined a restricted room
+        # before the rules started being enforced on the server, that user must be able to
+        # leave it.
+        if event.membership not in [Membership.JOIN, Membership.INVITE]:
+            return True
+
+        invitee_domain = get_domain_from_id(event.state_key)
+        return invitee_domain not in self.domains_forbidden_when_restricted
+
+    def _on_membership_or_invite_unrestricted(self):
+        """Implements the checks and behaviour specified for the "unrestricted" rule.
+
+        "unrestricted" currently means that every event is allowed.
+
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        return True
+
+    def _on_membership_or_invite_direct(self, event, state_events):
+        """Implements the checks and behaviour specified for the "direct" rule.
+
+        "direct" currently means that no member is allowed apart from the two initial
+        members the room was created for (i.e. the room's creator and their first
+        invitee).
+
+        Args:
+            event (synapse.events.EventBase): The event to check.
+            state_events (dict[tuple[event type, state key], EventBase]): The state of the
+                room before the event was sent.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        # Get the room memberships and 3PID invite tokens from the room's state.
+        existing_members, threepid_tokens = self._get_members_and_tokens_from_state(
+            state_events
+        )
+
+        # There should never be more than one 3PID invite in the room state: if the second
+        # original user came and left, and we're inviting them using their email address,
+        # given we know they have a Matrix account binded to the address (so they could
+        # join the first time), Synapse will successfully look it up before attempting to
+        # store an invite on the IS.
+        if len(threepid_tokens) == 1 and event.type == EventTypes.ThirdPartyInvite:
+            # If we already have a 3PID invite in flight, don't accept another one, unless
+            # the new one has the same invite token as its state key. This is because 3PID
+            # invite revocations must be allowed, and a revocation is basically a new 3PID
+            # invite event with an empty content and the same token as the invite it
+            # revokes.
+            return event.state_key in threepid_tokens
+
+        if len(existing_members) == 2:
+            # If the user was within the two initial user of the room, Synapse would have
+            # looked it up successfully and thus sent a m.room.member here instead of
+            # m.room.third_party_invite.
+            if event.type == EventTypes.ThirdPartyInvite:
+                return False
+
+            # We can only have m.room.member events here. The rule in this case is to only
+            # allow the event if its target is one of the initial two members in the room,
+            # i.e. the state key of one of the two m.room.member states in the room.
+            return event.state_key in existing_members
+
+        # We're alone in the room (and always have been) and there's one 3PID invite in
+        # flight.
+        if len(existing_members) == 1 and len(threepid_tokens) == 1:
+            # We can only have m.room.member events here. In this case, we can only allow
+            # the event if it's either a m.room.member from the joined user (we can assume
+            # that the only m.room.member event is a join otherwise we wouldn't be able to
+            # send an event to the room) or an an invite event which target is the invited
+            # user.
+            target = event.state_key
+            is_from_threepid_invite = self._is_invite_from_threepid(
+                event, threepid_tokens[0]
+            )
+            if is_from_threepid_invite or target == existing_members[0]:
+                return True
+
+            return False
+
+        return True
+
+    def _is_power_level_content_allowed(self, content, access_rule):
+        """Check if a given power levels event is permitted under the given access rule.
+
+        It shouldn't be allowed if it either changes the default PL to a non-0 value or
+        gives a non-0 PL to a user that would have been forbidden from joining the room
+        under a more restrictive access rule.
+
+        Args:
+            content (dict[]): The content of the m.room.power_levels event to check.
+            access_rule (str): The access rule in place in this room.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        # Check if we need to apply the restrictions with the current rule.
+        if access_rule not in RULES_WITH_RESTRICTED_POWER_LEVELS:
+            return True
+
+        # If users_default is explicitly set to a non-0 value, deny the event.
+        users_default = content.get("users_default", 0)
+        if users_default:
+            return False
+
+        users = content.get("users", {})
+        for user_id, power_level in users.items():
+            server_name = get_domain_from_id(user_id)
+            # Check the domain against the blacklist. If found, and the PL isn't 0, deny
+            # the event.
+            if (
+                server_name in self.domains_forbidden_when_restricted
+                and power_level != 0
+            ):
+                return False
+
+        return True
+
+    def _on_join_rule_change(self, event, rule):
+        """Check whether a join rule change is allowed. A join rule change is always
+        allowed unless the new join rule is "public" and the current access rule isn't
+        "restricted".
+        The rationale is that external users (those whose server would be denied access
+        to rooms enforcing the "restricted" access rule) should always rely on non-
+        external users for access to rooms, therefore they shouldn't be able to access
+        rooms that don't require an invite to be joined.
+
+        Note that we currently rely on the default access rule being "restricted": during
+        room creation, the m.room.join_rules event will be sent *before* the
+        im.vector.room.access_rules one, so the access rule that will be considered here
+        in this case will be the default "restricted" one. This is fine since the
+        "restricted" access rule allows any value for the join rule, but we should keep
+        that in mind if we need to change the default access rule in the future.
+
+        Args:
+            event (synapse.events.EventBase): The event to check.
+            rule (str): The name of the rule to apply.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        if event.content.get("join_rule") == JoinRules.PUBLIC:
+            return rule == ACCESS_RULE_RESTRICTED
+
+        return True
+
+    def _on_room_avatar_change(self, event, rule):
+        """Check whether a change of room avatar is allowed.
+        The current rule is to forbid such a change in direct chats but allow it
+        everywhere else.
+
+        Args:
+            event (synapse.events.EventBase): The event to check.
+            rule (str): The name of the rule to apply.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        return rule != ACCESS_RULE_DIRECT
+
+    def _on_room_name_change(self, event, rule):
+        """Check whether a change of room name is allowed.
+        The current rule is to forbid such a change in direct chats but allow it
+        everywhere else.
+
+        Args:
+            event (synapse.events.EventBase): The event to check.
+            rule (str): The name of the rule to apply.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        return rule != ACCESS_RULE_DIRECT
+
+    def _on_room_topic_change(self, event, rule):
+        """Check whether a change of room topic is allowed.
+        The current rule is to forbid such a change in direct chats but allow it
+        everywhere else.
+
+        Args:
+            event (synapse.events.EventBase): The event to check.
+            rule (str): The name of the rule to apply.
+        Returns:
+            bool, True if the event can be allowed, False otherwise.
+        """
+        return rule != ACCESS_RULE_DIRECT
+
+    @staticmethod
+    def _get_rule_from_state(state_events):
+        """Extract the rule to be applied from the given set of state events.
+
+        Args:
+            state_events (dict[tuple[event type, state key], EventBase]): The set of state
+                events.
+        Returns:
+            str, the name of the rule (either "direct", "restricted" or "unrestricted")
+        """
+        access_rules = state_events.get((ACCESS_RULES_TYPE, ""))
+        if access_rules is None:
+            rule = ACCESS_RULE_RESTRICTED
+        else:
+            rule = access_rules.content.get("rule")
+        return rule
+
+    @staticmethod
+    def _get_join_rule_from_state(state_events):
+        """Extract the room's join rule from the given set of state events.
+
+        Args:
+            state_events (dict[tuple[event type, state key], EventBase]): The set of state
+                events.
+        Returns:
+            str, the name of the join rule (either "public", or "invite")
+        """
+        join_rule_event = state_events.get((EventTypes.JoinRules, ""))
+        if join_rule_event is None:
+            return None
+        return join_rule_event.content.get("join_rule")
+
+    @staticmethod
+    def _get_members_and_tokens_from_state(state_events):
+        """Retrieves from a list of state events the list of users that have a
+        m.room.member event in the room, and the tokens of 3PID invites in the room.
+
+        Args:
+            state_events (dict[tuple[event type, state key], EventBase]): The set of state
+                events.
+        Returns:
+            existing_members (list[str]): List of targets of the m.room.member events in
+                the state.
+            threepid_invite_tokens (list[str]): List of tokens of the 3PID invites in the
+                state.
+        """
+        existing_members = []
+        threepid_invite_tokens = []
+        for key, state_event in state_events.items():
+            if key[0] == EventTypes.Member and state_event.content:
+                existing_members.append(state_event.state_key)
+            if key[0] == EventTypes.ThirdPartyInvite and state_event.content:
+                # Don't include revoked invites.
+                threepid_invite_tokens.append(state_event.state_key)
+
+        return existing_members, threepid_invite_tokens
+
+    @staticmethod
+    def _is_invite_from_threepid(invite, threepid_invite_token):
+        """Checks whether the given invite follows the given 3PID invite.
+
+        Args:
+             invite (EventBase): The m.room.member event with "invite" membership.
+             threepid_invite_token (str): The state key from the 3PID invite.
+        """
+        token = (
+            invite.content.get("third_party_invite", {})
+            .get("signed", {})
+            .get("token", "")
+        )
+
+        return token == threepid_invite_token
diff --git a/synapse/types.py b/synapse/types.py
index 51eadb6ad4..94c01b0a18 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -16,6 +16,8 @@ import re
 import string
 from collections import namedtuple
 
+from six.moves import filter
+
 import attr
 
 from synapse.api.errors import SynapseError
@@ -235,6 +237,19 @@ def contains_invalid_mxid_characters(localpart):
     return any(c not in mxid_localpart_allowed_characters for c in localpart)
 
 
+def strip_invalid_mxid_characters(localpart):
+    """Removes any invalid characters from an mxid
+
+    Args:
+        localpart (basestring): the localpart to be stripped
+
+    Returns:
+        localpart (basestring): the localpart having been stripped
+    """
+    filtered = filter(lambda c: c in mxid_localpart_allowed_characters, localpart)
+    return "".join(filtered)
+
+
 UPPER_CASE_PATTERN = re.compile(b"[A-Z_]")
 
 # the following is a pattern which matches '=', and bytes which are not allowed in a mxid
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index 982c6d81ca..6a2464cab3 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,12 +15,15 @@
 # limitations under the License.
 
 import random
+import re
 import string
 
 import six
 from six import PY2, PY3
 from six.moves import range
 
+from synapse.api.errors import Codes, SynapseError
+
 _string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
 
 # random_string and random_string_with_symbols are used for a range of things,
@@ -27,6 +31,8 @@ _string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
 # we get cryptographically-secure randoms.
 rand = random.SystemRandom()
 
+client_secret_regex = re.compile(r"^[0-9a-zA-Z.=_-]+$")
+
 
 def random_string(length):
     return "".join(rand.choice(string.ascii_letters) for _ in range(length))
@@ -109,3 +115,11 @@ def exception_to_unicode(e):
         return msg.decode("utf-8", errors="replace")
     else:
         return msg
+
+
+def assert_valid_client_secret(client_secret):
+    """Validate that a given string matches the client_secret regex defined by the spec"""
+    if client_secret_regex.match(client_secret) is None:
+        raise SynapseError(
+            400, "Invalid client_secret parameter", errcode=Codes.INVALID_PARAM
+        )
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
index 3ec1dfb0c2..34ce7cac16 100644
--- a/synapse/util/threepids.py
+++ b/synapse/util/threepids.py
@@ -16,11 +16,14 @@
 import logging
 import re
 
+from twisted.internet import defer
+
 logger = logging.getLogger(__name__)
 
 
+@defer.inlineCallbacks
 def check_3pid_allowed(hs, medium, address):
-    """Checks whether a given format of 3PID is allowed to be used on this HS
+    """Checks whether a given 3PID is allowed to be used on this HS
 
     Args:
         hs (synapse.server.HomeServer): server
@@ -28,9 +31,36 @@ def check_3pid_allowed(hs, medium, address):
         address (str): address within that medium (e.g. "wotan@matrix.org")
             msisdns need to first have been canonicalised
     Returns:
-        bool: whether the 3PID medium/address is allowed to be added to this HS
+        defered bool: whether the 3PID medium/address is allowed to be added to this HS
     """
 
+    if hs.config.check_is_for_allowed_local_3pids:
+        data = yield hs.get_simple_http_client().get_json(
+            "https://%s%s"
+            % (
+                hs.config.check_is_for_allowed_local_3pids,
+                "/_matrix/identity/api/v1/internal-info",
+            ),
+            {"medium": medium, "address": address},
+        )
+
+        # Check for invalid response
+        if "hs" not in data and "shadow_hs" not in data:
+            defer.returnValue(False)
+
+        # Check if this user is intended to register for this homeserver
+        if (
+            data.get("hs") != hs.config.server_name
+            and data.get("shadow_hs") != hs.config.server_name
+        ):
+            defer.returnValue(False)
+
+        if data.get("requires_invite", False) and not data.get("invited", False):
+            # Requires an invite but hasn't been invited
+            defer.returnValue(False)
+
+        defer.returnValue(True)
+
     if hs.config.allowed_local_3pids:
         for constraint in hs.config.allowed_local_3pids:
             logger.debug(
@@ -43,8 +73,8 @@ def check_3pid_allowed(hs, medium, address):
             if medium == constraint["medium"] and re.match(
                 constraint["pattern"], address
             ):
-                return True
+                defer.returnValue(True)
     else:
-        return True
+        defer.returnValue(True)
 
-    return False
+    defer.returnValue(False)
diff --git a/synapse/visibility.py b/synapse/visibility.py
index bf0f1eebd8..a19011b793 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -43,7 +43,12 @@ MEMBERSHIP_PRIORITY = (
 
 @defer.inlineCallbacks
 def filter_events_for_client(
-    store, user_id, events, is_peeking=False, always_include_ids=frozenset()
+    store,
+    user_id,
+    events,
+    is_peeking=False,
+    always_include_ids=frozenset(),
+    apply_retention_policies=True,
 ):
     """
     Check which events a user is allowed to see
@@ -59,6 +64,10 @@ def filter_events_for_client(
             events
         always_include_ids (set(event_id)): set of event ids to specifically
             include (unless sender is ignored)
+        apply_retention_policies (bool): Whether to filter out events that's older than
+            allowed by the room's retention policy. Useful when this function is called
+            to e.g. check whether a user should be allowed to see the state at a given
+            event rather than to know if it should send an event to a user's client(s).
 
     Returns:
         Deferred[list[synapse.events.EventBase]]
@@ -86,6 +95,15 @@ def filter_events_for_client(
 
     erased_senders = yield store.are_users_erased((e.sender for e in events))
 
+    if apply_retention_policies:
+        room_ids = set(e.room_id for e in events)
+        retention_policies = {}
+
+        for room_id in room_ids:
+            retention_policies[room_id] = (
+                yield store.get_retention_policy_for_room(room_id)
+            )
+
     def allowed(event):
         """
         Args:
@@ -103,6 +121,18 @@ def filter_events_for_client(
         if not event.is_state() and event.sender in ignore_list:
             return None
 
+        # Don't try to apply the room's retention policy if the event is a state event, as
+        # MSC1763 states that retention is only considered for non-state events.
+        if apply_retention_policies and not event.is_state():
+            retention_policy = retention_policies[event.room_id]
+            max_lifetime = retention_policy.get("max_lifetime")
+
+            if max_lifetime is not None:
+                oldest_allowed_ts = store.clock.time_msec() - max_lifetime
+
+                if event.origin_server_ts < oldest_allowed_ts:
+                    return None
+
         if event.event_id in always_include_ids:
             return event