summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py4
-rw-r--r--synapse/_scripts/register_new_matrix_user.py6
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/api/errors.py2
-rw-r--r--synapse/app/federation_sender.py2
-rw-r--r--synapse/app/homeserver.py6
-rw-r--r--synapse/appservice/api.py2
-rw-r--r--synapse/config/appservice.py2
-rw-r--r--synapse/config/captcha.py4
-rw-r--r--synapse/config/emailconfig.py21
-rw-r--r--synapse/config/registration.py7
-rw-r--r--synapse/config/room_directory.py2
-rw-r--r--synapse/config/server.py195
-rw-r--r--synapse/events/snapshot.py159
-rw-r--r--synapse/events/validator.py100
-rw-r--r--synapse/federation/federation_client.py6
-rw-r--r--synapse/federation/federation_server.py26
-rw-r--r--synapse/federation/persistence.py4
-rw-r--r--synapse/federation/sender/__init__.py2
-rw-r--r--synapse/federation/sender/transaction_manager.py4
-rw-r--r--synapse/federation/transport/__init__.py4
-rw-r--r--synapse/federation/transport/client.py6
-rw-r--r--synapse/federation/transport/server.py8
-rw-r--r--synapse/handlers/auth.py88
-rw-r--r--synapse/handlers/deactivate_account.py3
-rw-r--r--synapse/handlers/directory.py4
-rw-r--r--synapse/handlers/e2e_keys.py19
-rw-r--r--synapse/handlers/e2e_room_keys.py130
-rw-r--r--synapse/handlers/federation.py100
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/handlers/pagination.py112
-rw-r--r--synapse/handlers/profile.py6
-rw-r--r--synapse/handlers/register.py52
-rw-r--r--synapse/handlers/room.py36
-rw-r--r--synapse/handlers/room_member.py22
-rw-r--r--synapse/handlers/room_member_worker.py5
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/http/matrixfederationclient.py2
-rw-r--r--synapse/http/servlet.py2
-rw-r--r--synapse/logging/_structured.py14
-rw-r--r--synapse/logging/_terse_json.py109
-rw-r--r--synapse/push/httppusher.py5
-rw-r--r--synapse/push/mailer.py4
-rw-r--r--synapse/replication/http/__init__.py10
-rw-r--r--synapse/replication/http/devices.py73
-rw-r--r--synapse/replication/http/membership.py7
-rw-r--r--synapse/replication/http/register.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py7
-rw-r--r--synapse/rest/admin/__init__.py567
-rw-r--r--synapse/rest/admin/groups.py46
-rw-r--r--synapse/rest/admin/rooms.py157
-rw-r--r--synapse/rest/admin/users.py406
-rw-r--r--synapse/rest/client/v1/login.py111
-rw-r--r--synapse/rest/client/v1/room.py2
-rw-r--r--synapse/rest/client/v2_alpha/account.py5
-rw-r--r--synapse/rest/client/v2_alpha/room_keys.py8
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py27
-rw-r--r--synapse/rest/media/v1/thumbnailer.py5
-rw-r--r--synapse/server.py4
-rw-r--r--synapse/server_notices/consent_server_notices.py2
-rw-r--r--synapse/state/__init__.py175
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py62
-rw-r--r--synapse/storage/data_stores/main/account_data.py6
-rw-r--r--synapse/storage/data_stores/main/deviceinbox.py19
-rw-r--r--synapse/storage/data_stores/main/e2e_room_keys.py226
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py6
-rw-r--r--synapse/storage/data_stores/main/events.py360
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py70
-rw-r--r--synapse/storage/data_stores/main/filtering.py2
-rw-r--r--synapse/storage/data_stores/main/group_server.py15
-rw-r--r--synapse/storage/data_stores/main/media_repository.py6
-rw-r--r--synapse/storage/data_stores/main/receipts.py2
-rw-r--r--synapse/storage/data_stores/main/registration.py39
-rw-r--r--synapse/storage/data_stores/main/room.py251
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql17
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql17
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/room_retention.sql33
-rw-r--r--synapse/storage/data_stores/main/state.py31
-rw-r--r--synapse/storage/data_stores/main/stream.py42
-rw-r--r--synapse/storage/data_stores/main/tags.py4
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/purge_events.py117
-rw-r--r--synapse/streams/config.py9
-rw-r--r--synapse/util/caches/descriptors.py48
-rw-r--r--synapse/util/httpresourcetree.py2
-rw-r--r--synapse/visibility.py32
87 files changed, 3016 insertions, 1315 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 8587ffa76f..f99de2f3f3 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-""" This is a reference implementation of a Matrix home server.
+""" This is a reference implementation of a Matrix homeserver.
 """
 
 import os
@@ -36,7 +36,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.5.0"
+__version__ = "1.6.1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py
index bdcd915bbe..d528450c78 100644
--- a/synapse/_scripts/register_new_matrix_user.py
+++ b/synapse/_scripts/register_new_matrix_user.py
@@ -144,8 +144,8 @@ def main():
     logging.captureWarnings(True)
 
     parser = argparse.ArgumentParser(
-        description="Used to register new users with a given home server when"
-        " registration has been disabled. The home server must be"
+        description="Used to register new users with a given homeserver when"
+        " registration has been disabled. The homeserver must be"
         " configured with the 'registration_shared_secret' option"
         " set."
     )
@@ -202,7 +202,7 @@ def main():
         "server_url",
         default="https://localhost:8448",
         nargs="?",
-        help="URL to use to talk to the home server. Defaults to "
+        help="URL to use to talk to the homeserver. Defaults to "
         " 'https://localhost:8448'.",
     )
 
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 312acff3d6..cf08a1cd55 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -95,6 +95,8 @@ class EventTypes(object):
     ServerACL = "m.room.server_acl"
     Pinned = "m.room.pinned_events"
 
+    Retention = "m.room.retention"
+
 
 class RejectedReason(object):
     AUTH_ERROR = "auth_error"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index cca92c34ba..5853a54c95 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -457,7 +457,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
 
 
 class FederationError(RuntimeError):
-    """  This class is used to inform remote home servers about erroneous
+    """  This class is used to inform remote homeservers about erroneous
     PDUs they sent us.
 
     FATAL: The remote server could not interpret the source event.
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 139221ad34..448e45e00f 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -69,7 +69,7 @@ class FederationSenderSlaveStore(
         self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
 
     def _get_federation_out_pos(self, db_conn):
-        sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
+        sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
         sql = self.database_engine.convert_param_style(sql)
 
         txn = db_conn.cursor()
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 00a7f8330e..883b3fb70b 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -585,7 +585,7 @@ def run(hs):
     def performance_stats_init():
         _stats_process.clear()
         _stats_process.append(
-            (int(hs.get_clock().time(), resource.getrusage(resource.RUSAGE_SELF)))
+            (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
         )
 
     def start_phone_stats_home():
@@ -636,7 +636,7 @@ def run(hs):
 
     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
+        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
 
         # We need to defer this init for the cases that we daemonize
         # otherwise the process ID we get is that of the non-daemon process
@@ -644,7 +644,7 @@ def run(hs):
 
         # We wait 5 minutes to send the first set of stats as the server can
         # be quite busy the first few minutes
-        clock.call_later(5 * 60, start_phone_stats_home, hs, stats)
+        clock.call_later(5 * 60, start_phone_stats_home)
 
     _base.start_reactor(
         "synapse-homeserver",
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 3e25bf5747..57174da021 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -185,7 +185,7 @@ class ApplicationServiceApi(SimpleHttpClient):
 
                 if not _is_valid_3pe_metadata(info):
                     logger.warning(
-                        "query_3pe_protocol to %s did not return a" " valid result", uri
+                        "query_3pe_protocol to %s did not return a valid result", uri
                     )
                     return None
 
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index e77d3387ff..ca43e96bd1 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename):
             for regex_obj in as_info["namespaces"][ns]:
                 if not isinstance(regex_obj, dict):
                     raise ValueError(
-                        "Expected namespace entry in %s to be an object," " but got %s",
+                        "Expected namespace entry in %s to be an object, but got %s",
                         ns,
                         regex_obj,
                     )
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index 44bd5c6799..f0171bb5b2 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -35,11 +35,11 @@ class CaptchaConfig(Config):
         ## Captcha ##
         # See docs/CAPTCHA_SETUP for full details of configuring this.
 
-        # This Home Server's ReCAPTCHA public key.
+        # This homeserver's ReCAPTCHA public key.
         #
         #recaptcha_public_key: "YOUR_PUBLIC_KEY"
 
-        # This Home Server's ReCAPTCHA private key.
+        # This homeserver's ReCAPTCHA private key.
         #
         #recaptcha_private_key: "YOUR_PRIVATE_KEY"
 
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 39e7a1dddb..18f42a87f9 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -146,6 +146,8 @@ class EmailConfig(Config):
                 if k not in email_config:
                     missing.append("email." + k)
 
+            # public_baseurl is required to build password reset and validation links that
+            # will be emailed to users
             if config.get("public_baseurl") is None:
                 missing.append("public_baseurl")
 
@@ -305,8 +307,23 @@ class EmailConfig(Config):
         #   smtp_user: "exampleusername"
         #   smtp_pass: "examplepassword"
         #   require_transport_security: false
-        #   notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
-        #   app_name: Matrix
+        #
+        #   # notif_from defines the "From" address to use when sending emails.
+        #   # It must be set if email sending is enabled.
+        #   #
+        #   # The placeholder '%(app)s' will be replaced by the application name,
+        #   # which is normally 'app_name' (below), but may be overridden by the
+        #   # Matrix client application.
+        #   #
+        #   # Note that the placeholder must be written '%(app)s', including the
+        #   # trailing 's'.
+        #   #
+        #   notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
+        #
+        #   # app_name defines the default value for '%(app)s' in notif_from. It
+        #   # defaults to 'Matrix'.
+        #   #
+        #   #app_name: my_branded_matrix_server
         #
         #   # Enable email notifications by default
         #   #
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 1f6dac69da..ee9614c5f7 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -106,6 +106,13 @@ class RegistrationConfig(Config):
         account_threepid_delegates = config.get("account_threepid_delegates") or {}
         self.account_threepid_delegate_email = account_threepid_delegates.get("email")
         self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
+        if self.account_threepid_delegate_msisdn and not self.public_baseurl:
+            raise ConfigError(
+                "The configuration option `public_baseurl` is required if "
+                "`account_threepid_delegate.msisdn` is set, such that "
+                "clients know where to submit validation tokens to. Please "
+                "configure `public_baseurl`."
+            )
 
         self.default_identity_server = config.get("default_identity_server")
         self.allow_guest_access = config.get("allow_guest_access", False)
diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py
index 7c9f05bde4..7ac7699676 100644
--- a/synapse/config/room_directory.py
+++ b/synapse/config/room_directory.py
@@ -170,7 +170,7 @@ class _RoomDirectoryRule(object):
             self.action = action
         else:
             raise ConfigError(
-                "%s rules can only have action of 'allow'" " or 'deny'" % (option_name,)
+                "%s rules can only have action of 'allow' or 'deny'" % (option_name,)
             )
 
         self._alias_matches_all = alias == "*"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index d556df308d..7a9d711669 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -19,7 +19,7 @@ import logging
 import os.path
 import re
 from textwrap import indent
-from typing import List
+from typing import Dict, List, Optional
 
 import attr
 import yaml
@@ -41,7 +41,7 @@ logger = logging.Logger(__name__)
 # in the list.
 DEFAULT_BIND_ADDRESSES = ["::", "0.0.0.0"]
 
-DEFAULT_ROOM_VERSION = "4"
+DEFAULT_ROOM_VERSION = "5"
 
 ROOM_COMPLEXITY_TOO_GREAT = (
     "Your homeserver is unable to join rooms this large or complex. "
@@ -223,7 +223,7 @@ class ServerConfig(Config):
             self.federation_ip_range_blacklist.update(["0.0.0.0", "::"])
         except Exception as e:
             raise ConfigError(
-                "Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e
+                "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
             )
 
         if self.public_baseurl is not None:
@@ -246,6 +246,124 @@ class ServerConfig(Config):
         # events with profile information that differ from the target's global profile.
         self.allow_per_room_profiles = config.get("allow_per_room_profiles", True)
 
+        retention_config = config.get("retention")
+        if retention_config is None:
+            retention_config = {}
+
+        self.retention_enabled = retention_config.get("enabled", False)
+
+        retention_default_policy = retention_config.get("default_policy")
+
+        if retention_default_policy is not None:
+            self.retention_default_min_lifetime = retention_default_policy.get(
+                "min_lifetime"
+            )
+            if self.retention_default_min_lifetime is not None:
+                self.retention_default_min_lifetime = self.parse_duration(
+                    self.retention_default_min_lifetime
+                )
+
+            self.retention_default_max_lifetime = retention_default_policy.get(
+                "max_lifetime"
+            )
+            if self.retention_default_max_lifetime is not None:
+                self.retention_default_max_lifetime = self.parse_duration(
+                    self.retention_default_max_lifetime
+                )
+
+            if (
+                self.retention_default_min_lifetime is not None
+                and self.retention_default_max_lifetime is not None
+                and (
+                    self.retention_default_min_lifetime
+                    > self.retention_default_max_lifetime
+                )
+            ):
+                raise ConfigError(
+                    "The default retention policy's 'min_lifetime' can not be greater"
+                    " than its 'max_lifetime'"
+                )
+        else:
+            self.retention_default_min_lifetime = None
+            self.retention_default_max_lifetime = None
+
+        self.retention_allowed_lifetime_min = retention_config.get(
+            "allowed_lifetime_min"
+        )
+        if self.retention_allowed_lifetime_min is not None:
+            self.retention_allowed_lifetime_min = self.parse_duration(
+                self.retention_allowed_lifetime_min
+            )
+
+        self.retention_allowed_lifetime_max = retention_config.get(
+            "allowed_lifetime_max"
+        )
+        if self.retention_allowed_lifetime_max is not None:
+            self.retention_allowed_lifetime_max = self.parse_duration(
+                self.retention_allowed_lifetime_max
+            )
+
+        if (
+            self.retention_allowed_lifetime_min is not None
+            and self.retention_allowed_lifetime_max is not None
+            and self.retention_allowed_lifetime_min
+            > self.retention_allowed_lifetime_max
+        ):
+            raise ConfigError(
+                "Invalid retention policy limits: 'allowed_lifetime_min' can not be"
+                " greater than 'allowed_lifetime_max'"
+            )
+
+        self.retention_purge_jobs = []  # type: List[Dict[str, Optional[int]]]
+        for purge_job_config in retention_config.get("purge_jobs", []):
+            interval_config = purge_job_config.get("interval")
+
+            if interval_config is None:
+                raise ConfigError(
+                    "A retention policy's purge jobs configuration must have the"
+                    " 'interval' key set."
+                )
+
+            interval = self.parse_duration(interval_config)
+
+            shortest_max_lifetime = purge_job_config.get("shortest_max_lifetime")
+
+            if shortest_max_lifetime is not None:
+                shortest_max_lifetime = self.parse_duration(shortest_max_lifetime)
+
+            longest_max_lifetime = purge_job_config.get("longest_max_lifetime")
+
+            if longest_max_lifetime is not None:
+                longest_max_lifetime = self.parse_duration(longest_max_lifetime)
+
+            if (
+                shortest_max_lifetime is not None
+                and longest_max_lifetime is not None
+                and shortest_max_lifetime > longest_max_lifetime
+            ):
+                raise ConfigError(
+                    "A retention policy's purge jobs configuration's"
+                    " 'shortest_max_lifetime' value can not be greater than its"
+                    " 'longest_max_lifetime' value."
+                )
+
+            self.retention_purge_jobs.append(
+                {
+                    "interval": interval,
+                    "shortest_max_lifetime": shortest_max_lifetime,
+                    "longest_max_lifetime": longest_max_lifetime,
+                }
+            )
+
+        if not self.retention_purge_jobs:
+            self.retention_purge_jobs = [
+                {
+                    "interval": self.parse_duration("1d"),
+                    "shortest_max_lifetime": None,
+                    "longest_max_lifetime": None,
+                }
+            ]
+
         self.listeners = []  # type: List[dict]
         for listener in config.get("listeners", []):
             if not isinstance(listener.get("port", None), int):
@@ -721,7 +839,7 @@ class ServerConfig(Config):
         # Used by phonehome stats to group together related servers.
         #server_context: context
 
-        # Resource-constrained Homeserver Settings
+        # Resource-constrained homeserver Settings
         #
         # If limit_remote_rooms.enabled is True, the room complexity will be
         # checked before a user joins a new remote room. If it is above
@@ -761,6 +879,69 @@ class ServerConfig(Config):
         # Defaults to `28d`. Set to `null` to disable clearing out of old rows.
         #
         #user_ips_max_age: 14d
+
+        # Message retention policy at the server level.
+        #
+        # Room admins and mods can define a retention period for their rooms using the
+        # 'm.room.retention' state event, and server admins can cap this period by setting
+        # the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
+        #
+        # If this feature is enabled, Synapse will regularly look for and purge events
+        # which are older than the room's maximum retention period. Synapse will also
+        # filter events received over federation so that events that should have been
+        # purged are ignored and not stored again.
+        #
+        retention:
+          # The message retention policies feature is disabled by default. Uncomment the
+          # following line to enable it.
+          #
+          #enabled: true
+
+          # Default retention policy. If set, Synapse will apply it to rooms that lack the
+          # 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
+          # matter much because Synapse doesn't take it into account yet.
+          #
+          #default_policy:
+          #  min_lifetime: 1d
+          #  max_lifetime: 1y
+
+          # Retention policy limits. If set, a user won't be able to send a
+          # 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
+          # that's not within this range. This is especially useful in closed federations,
+          # in which server admins can make sure every federating server applies the same
+          # rules.
+          #
+          #allowed_lifetime_min: 1d
+          #allowed_lifetime_max: 1y
+
+          # Server admins can define the settings of the background jobs purging the
+          # events which lifetime has expired under the 'purge_jobs' section.
+          #
+          # If no configuration is provided, a single job will be set up to delete expired
+          # events in every room daily.
+          #
+          # Each job's configuration defines which range of message lifetimes the job
+          # takes care of. For example, if 'shortest_max_lifetime' is '2d' and
+          # 'longest_max_lifetime' is '3d', the job will handle purging expired events in
+          # rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
+          # lower than or equal to 3 days. Both the minimum and the maximum value of a
+          # range are optional, e.g. a job with no 'shortest_max_lifetime' and a
+          # 'longest_max_lifetime' of '3d' will handle every room with a retention policy
+          # which 'max_lifetime' is lower than or equal to three days.
+          #
+          # The rationale for this per-job configuration is that some rooms might have a
+          # retention policy with a low 'max_lifetime', where history needs to be purged
+          # of outdated messages on a very frequent basis (e.g. every 5min), but not want
+          # that purge to be performed by a job that's iterating over every room it knows,
+          # which would be quite heavy on the server.
+          #
+          #purge_jobs:
+          #  - shortest_max_lifetime: 1d
+          #    longest_max_lifetime: 3d
+          #    interval: 5m:
+          #  - shortest_max_lifetime: 3d
+          #    longest_max_lifetime: 1y
+          #    interval: 24h
         """
             % locals()
         )
@@ -781,20 +962,20 @@ class ServerConfig(Config):
             "--daemonize",
             action="store_true",
             default=None,
-            help="Daemonize the home server",
+            help="Daemonize the homeserver",
         )
         server_group.add_argument(
             "--print-pidfile",
             action="store_true",
             default=None,
-            help="Print the path to the pidfile just" " before daemonizing",
+            help="Print the path to the pidfile just before daemonizing",
         )
         server_group.add_argument(
             "--manhole",
             metavar="PORT",
             dest="manhole",
             type=int,
-            help="Turn on the twisted telnet manhole" " service on the given port.",
+            help="Turn on the twisted telnet manhole service on the given port.",
         )
 
 
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index a269de5482..64e898f40c 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -12,6 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Dict, Optional, Tuple, Union
+
 from six import iteritems
 
 import attr
@@ -19,54 +21,113 @@ from frozendict import frozendict
 
 from twisted.internet import defer
 
+from synapse.appservice import ApplicationService
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 
 
 @attr.s(slots=True)
 class EventContext:
     """
+    Holds information relevant to persisting an event
+
     Attributes:
-        state_group (int|None): state group id, if the state has been stored
-            as a state group. This is usually only None if e.g. the event is
-            an outlier.
-        rejected (bool|str): A rejection reason if the event was rejected, else
-            False
-
-        prev_group (int): Previously persisted state group. ``None`` for an
-            outlier.
-        delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
-            (type, state_key) -> event_id. ``None`` for an outlier.
-
-        app_service: FIXME
-
-        _current_state_ids (dict[(str, str), str]|None):
-            The current state map including the current event. None if outlier
-            or we haven't fetched the state from DB yet.
+        rejected: A rejection reason if the event was rejected, else False
+
+        _state_group: The ID of the state group for this event. Note that state events
+            are persisted with a state group which includes the new event, so this is
+            effectively the state *after* the event in question.
+
+            For a *rejected* state event, where the state of the rejected event is
+            ignored, this state_group should never make it into the
+            event_to_state_groups table. Indeed, inspecting this value for a rejected
+            state event is almost certainly incorrect.
+
+            For an outlier, where we don't have the state at the event, this will be
+            None.
+
+            Note that this is a private attribute: it should be accessed via
+            the ``state_group`` property.
+
+        state_group_before_event: The ID of the state group representing the state
+            of the room before this event.
+
+            If this is a non-state event, this will be the same as ``state_group``. If
+            it's a state event, it will be the same as ``prev_group``.
+
+            If ``state_group`` is None (ie, the event is an outlier),
+            ``state_group_before_event`` will always also be ``None``.
+
+        prev_group: If it is known, ``state_group``'s prev_group. Note that this being
+            None does not necessarily mean that ``state_group`` does not have
+            a prev_group!
+
+            If the event is a state event, this is normally the same as ``prev_group``.
+
+            If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
+            will always also be ``None``.
+
+            Note that this *not* (necessarily) the state group associated with
+            ``_prev_state_ids``.
+
+        delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
+            and ``state_group``.
+
+        app_service: If this event is being sent by a (local) application service, that
+            app service.
+
+        _current_state_ids: The room state map, including this event - ie, the state
+            in ``state_group``.
+
             (type, state_key) -> event_id
 
-        _prev_state_ids (dict[(str, str), str]|None):
-            The current state map excluding the current event. None if outlier
-            or we haven't fetched the state from DB yet.
+            FIXME: what is this for an outlier? it seems ill-defined. It seems like
+            it could be either {}, or the state we were given by the remote
+            server, depending on $THINGS
+
+            Note that this is a private attribute: it should be accessed via
+            ``get_current_state_ids``. _AsyncEventContext impl calculates this
+            on-demand: it will be None until that happens.
+
+        _prev_state_ids: The room state map, excluding this event - ie, the state
+            in ``state_group_before_event``. For a non-state
+            event, this will be the same as _current_state_events.
+
+            Note that it is a completely different thing to prev_group!
+
             (type, state_key) -> event_id
+
+            FIXME: again, what is this for an outlier?
+
+            As with _current_state_ids, this is a private attribute. It should be
+            accessed via get_prev_state_ids.
     """
 
-    state_group = attr.ib(default=None)
-    rejected = attr.ib(default=False)
-    prev_group = attr.ib(default=None)
-    delta_ids = attr.ib(default=None)
-    app_service = attr.ib(default=None)
+    rejected = attr.ib(default=False, type=Union[bool, str])
+    _state_group = attr.ib(default=None, type=Optional[int])
+    state_group_before_event = attr.ib(default=None, type=Optional[int])
+    prev_group = attr.ib(default=None, type=Optional[int])
+    delta_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
+    app_service = attr.ib(default=None, type=Optional[ApplicationService])
 
-    _prev_state_ids = attr.ib(default=None)
-    _current_state_ids = attr.ib(default=None)
+    _current_state_ids = attr.ib(
+        default=None, type=Optional[Dict[Tuple[str, str], str]]
+    )
+    _prev_state_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
 
     @staticmethod
     def with_state(
-        state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None
+        state_group,
+        state_group_before_event,
+        current_state_ids,
+        prev_state_ids,
+        prev_group=None,
+        delta_ids=None,
     ):
         return EventContext(
             current_state_ids=current_state_ids,
             prev_state_ids=prev_state_ids,
             state_group=state_group,
+            state_group_before_event=state_group_before_event,
             prev_group=prev_group,
             delta_ids=delta_ids,
         )
@@ -97,7 +158,8 @@ class EventContext:
             "prev_state_id": prev_state_id,
             "event_type": event.type,
             "event_state_key": event.state_key if event.is_state() else None,
-            "state_group": self.state_group,
+            "state_group": self._state_group,
+            "state_group_before_event": self.state_group_before_event,
             "rejected": self.rejected,
             "prev_group": self.prev_group,
             "delta_ids": _encode_state_dict(self.delta_ids),
@@ -123,6 +185,7 @@ class EventContext:
             event_type=input["event_type"],
             event_state_key=input["event_state_key"],
             state_group=input["state_group"],
+            state_group_before_event=input["state_group_before_event"],
             prev_group=input["prev_group"],
             delta_ids=_decode_state_dict(input["delta_ids"]),
             rejected=input["rejected"],
@@ -134,22 +197,52 @@ class EventContext:
 
         return context
 
+    @property
+    def state_group(self) -> Optional[int]:
+        """The ID of the state group for this event.
+
+        Note that state events are persisted with a state group which includes the new
+        event, so this is effectively the state *after* the event in question.
+
+        For an outlier, where we don't have the state at the event, this will be None.
+
+        It is an error to access this for a rejected event, since rejected state should
+        not make it into the room state. Accessing this property will raise an exception
+        if ``rejected`` is set.
+        """
+        if self.rejected:
+            raise RuntimeError("Attempt to access state_group of rejected event")
+
+        return self._state_group
+
     @defer.inlineCallbacks
     def get_current_state_ids(self, store):
-        """Gets the current state IDs
+        """
+        Gets the room state map, including this event - ie, the state in ``state_group``
+
+        It is an error to access this for a rejected event, since rejected state should
+        not make it into the room state. This method will raise an exception if
+        ``rejected`` is set.
 
         Returns:
             Deferred[dict[(str, str), str]|None]: Returns None if state_group
                 is None, which happens when the associated event is an outlier.
+
                 Maps a (type, state_key) to the event ID of the state event matching
                 this tuple.
         """
+        if self.rejected:
+            raise RuntimeError("Attempt to access state_ids of rejected event")
+
         yield self._ensure_fetched(store)
         return self._current_state_ids
 
     @defer.inlineCallbacks
     def get_prev_state_ids(self, store):
-        """Gets the prev state IDs
+        """
+        Gets the room state map, excluding this event.
+
+        For a non-state event, this will be the same as get_current_state_ids().
 
         Returns:
             Deferred[dict[(str, str), str]|None]: Returns None if state_group
@@ -163,11 +256,17 @@ class EventContext:
     def get_cached_current_state_ids(self):
         """Gets the current state IDs if we have them already cached.
 
+        It is an error to access this for a rejected event, since rejected state should
+        not make it into the room state. This method will raise an exception if
+        ``rejected`` is set.
+
         Returns:
             dict[(str, str), str]|None: Returns None if we haven't cached the
             state or if state_group is None, which happens when the associated
             event is an outlier.
         """
+        if self.rejected:
+            raise RuntimeError("Attempt to access state_ids of rejected event")
 
         return self._current_state_ids
 
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 272426e105..9b90c9ce04 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six import string_types
+from six import integer_types, string_types
 
 from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
 from synapse.api.errors import Codes, SynapseError
@@ -22,11 +22,12 @@ from synapse.types import EventID, RoomID, UserID
 
 
 class EventValidator(object):
-    def validate_new(self, event):
+    def validate_new(self, event, config):
         """Validates the event has roughly the right format
 
         Args:
-            event (FrozenEvent)
+            event (FrozenEvent): The event to validate.
+            config (Config): The homeserver's configuration.
         """
         self.validate_builder(event)
 
@@ -67,6 +68,99 @@ class EventValidator(object):
                             Codes.INVALID_PARAM,
                         )
 
+        if event.type == EventTypes.Retention:
+            self._validate_retention(event, config)
+
+    def _validate_retention(self, event, config):
+        """Checks that an event that defines the retention policy for a room respects the
+        boundaries imposed by the server's administrator.
+
+        Args:
+            event (FrozenEvent): The event to validate.
+            config (Config): The homeserver's configuration.
+        """
+        min_lifetime = event.content.get("min_lifetime")
+        max_lifetime = event.content.get("max_lifetime")
+
+        if min_lifetime is not None:
+            if not isinstance(min_lifetime, integer_types):
+                raise SynapseError(
+                    code=400,
+                    msg="'min_lifetime' must be an integer",
+                    errcode=Codes.BAD_JSON,
+                )
+
+            if (
+                config.retention_allowed_lifetime_min is not None
+                and min_lifetime < config.retention_allowed_lifetime_min
+            ):
+                raise SynapseError(
+                    code=400,
+                    msg=(
+                        "'min_lifetime' can't be lower than the minimum allowed"
+                        " value enforced by the server's administrator"
+                    ),
+                    errcode=Codes.BAD_JSON,
+                )
+
+            if (
+                config.retention_allowed_lifetime_max is not None
+                and min_lifetime > config.retention_allowed_lifetime_max
+            ):
+                raise SynapseError(
+                    code=400,
+                    msg=(
+                        "'min_lifetime' can't be greater than the maximum allowed"
+                        " value enforced by the server's administrator"
+                    ),
+                    errcode=Codes.BAD_JSON,
+                )
+
+        if max_lifetime is not None:
+            if not isinstance(max_lifetime, integer_types):
+                raise SynapseError(
+                    code=400,
+                    msg="'max_lifetime' must be an integer",
+                    errcode=Codes.BAD_JSON,
+                )
+
+            if (
+                config.retention_allowed_lifetime_min is not None
+                and max_lifetime < config.retention_allowed_lifetime_min
+            ):
+                raise SynapseError(
+                    code=400,
+                    msg=(
+                        "'max_lifetime' can't be lower than the minimum allowed value"
+                        " enforced by the server's administrator"
+                    ),
+                    errcode=Codes.BAD_JSON,
+                )
+
+            if (
+                config.retention_allowed_lifetime_max is not None
+                and max_lifetime > config.retention_allowed_lifetime_max
+            ):
+                raise SynapseError(
+                    code=400,
+                    msg=(
+                        "'max_lifetime' can't be greater than the maximum allowed"
+                        " value enforced by the server's administrator"
+                    ),
+                    errcode=Codes.BAD_JSON,
+                )
+
+        if (
+            min_lifetime is not None
+            and max_lifetime is not None
+            and min_lifetime > max_lifetime
+        ):
+            raise SynapseError(
+                code=400,
+                msg="'min_lifetime' can't be greater than 'max_lifetime",
+                errcode=Codes.BAD_JSON,
+            )
+
     def validate_builder(self, event):
         """Validates that the builder/event has roughly the right format. Only
         checks values that we expect a proto event to have, rather than all the
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 545d719652..27f6aff004 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -177,7 +177,7 @@ class FederationClient(FederationBase):
         given destination server.
 
         Args:
-            dest (str): The remote home server to ask.
+            dest (str): The remote homeserver to ask.
             room_id (str): The room_id to backfill.
             limit (int): The maximum number of PDUs to return.
             extremities (list): List of PDU id and origins of the first pdus
@@ -227,7 +227,7 @@ class FederationClient(FederationBase):
         one succeeds.
 
         Args:
-            destinations (list): Which home servers to query
+            destinations (list): Which homeservers to query
             event_id (str): event to fetch
             room_version (str): version of the room
             outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
@@ -312,7 +312,7 @@ class FederationClient(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def get_state_for_room(self, destination, room_id, event_id):
-        """Requests all of the room state at a given event from a remote home server.
+        """Requests all of the room state at a given event from a remote homeserver.
 
         Args:
             destination (str): The remote homeserver to query for the state.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d942d77a72..84d4eca041 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Federation C.I.C
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -73,6 +74,7 @@ class FederationServer(FederationBase):
 
         self.auth = hs.get_auth()
         self.handler = hs.get_handlers().federation_handler
+        self.state = hs.get_state_handler()
 
         self._server_linearizer = Linearizer("fed_server")
         self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -264,9 +266,6 @@ class FederationServer(FederationBase):
         await self.registry.on_edu(edu_type, origin, content)
 
     async def on_context_state_request(self, origin, room_id, event_id):
-        if not event_id:
-            raise NotImplementedError("Specify an event")
-
         origin_host, _ = parse_server_name(origin)
         await self.check_server_matches_acl(origin_host, room_id)
 
@@ -280,13 +279,18 @@ class FederationServer(FederationBase):
         # - but that's non-trivial to get right, and anyway somewhat defeats
         # the point of the linearizer.
         with (await self._server_linearizer.queue((origin, room_id))):
-            resp = await self._state_resp_cache.wrap(
-                (room_id, event_id),
-                self._on_context_state_request_compute,
-                room_id,
-                event_id,
+            resp = dict(
+                await self._state_resp_cache.wrap(
+                    (room_id, event_id),
+                    self._on_context_state_request_compute,
+                    room_id,
+                    event_id,
+                )
             )
 
+        room_version = await self.store.get_room_version(room_id)
+        resp["room_version"] = room_version
+
         return 200, resp
 
     async def on_state_ids_request(self, origin, room_id, event_id):
@@ -306,7 +310,11 @@ class FederationServer(FederationBase):
         return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
 
     async def _on_context_state_request_compute(self, room_id, event_id):
-        pdus = await self.handler.get_state_for_pdu(room_id, event_id)
+        if event_id:
+            pdus = await self.handler.get_state_for_pdu(room_id, event_id)
+        else:
+            pdus = (await self.state.get_current_state(room_id)).values()
+
         auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus])
 
         return {
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 44edcabed4..d68b4bd670 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -44,7 +44,7 @@ class TransactionActions(object):
             response code and response body.
         """
         if not transaction.transaction_id:
-            raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
+            raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
         return self.store.get_received_txn_response(transaction.transaction_id, origin)
 
@@ -56,7 +56,7 @@ class TransactionActions(object):
             Deferred
         """
         if not transaction.transaction_id:
-            raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
+            raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
         return self.store.set_received_txn_response(
             transaction.transaction_id, origin, code, response
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 2b2ee8612a..4ebb0e8bc0 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter(
 
 sent_pdus_destination_dist_total = Counter(
     "synapse_federation_client_sent_pdu_destinations:total",
-    "" "Total number of PDUs queued for sending across all destinations",
+    "Total number of PDUs queued for sending across all destinations",
 )
 
 
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 67b3e1ab6e..5fed626d5b 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -84,7 +84,7 @@ class TransactionManager(object):
             txn_id = str(self._next_txn_id)
 
             logger.debug(
-                "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+                "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
                 destination,
                 txn_id,
                 len(pdus),
@@ -103,7 +103,7 @@ class TransactionManager(object):
             self._next_txn_id += 1
 
             logger.info(
-                "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+                "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
                 destination,
                 txn_id,
                 transaction.transaction_id,
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
index d9fcc520a0..5db733af98 100644
--- a/synapse/federation/transport/__init__.py
+++ b/synapse/federation/transport/__init__.py
@@ -14,9 +14,9 @@
 # limitations under the License.
 
 """The transport layer is responsible for both sending transactions to remote
-home servers and receiving a variety of requests from other home servers.
+homeservers and receiving a variety of requests from other homeservers.
 
-By default this is done over HTTPS (and all home servers are required to
+By default this is done over HTTPS (and all homeservers are required to
 support HTTPS), however individual pairings of servers may decide to
 communicate over a different (albeit still reliable) protocol.
 """
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 920fa86853..dc95ab2113 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -44,7 +44,7 @@ class TransportLayerClient(object):
         given event.
 
         Args:
-            destination (str): The host name of the remote home server we want
+            destination (str): The host name of the remote homeserver we want
                 to get the state from.
             context (str): The name of the context we want the state of
             event_id (str): The event we want the context at.
@@ -68,7 +68,7 @@ class TransportLayerClient(object):
         given event. Returns the state's event_id's
 
         Args:
-            destination (str): The host name of the remote home server we want
+            destination (str): The host name of the remote homeserver we want
                 to get the state from.
             context (str): The name of the context we want the state of
             event_id (str): The event we want the context at.
@@ -91,7 +91,7 @@ class TransportLayerClient(object):
         """ Requests the pdu with give id and origin from the given server.
 
         Args:
-            destination (str): The host name of the remote home server we want
+            destination (str): The host name of the remote homeserver we want
                 to get the state from.
             event_id (str): The id of the event being requested.
             timeout (int): How long to try (in ms) the destination for before
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d6c23f22bd..fefc789c85 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -421,7 +421,7 @@ class FederationEventServlet(BaseFederationServlet):
         return await self.handler.on_pdu_request(origin, event_id)
 
 
-class FederationStateServlet(BaseFederationServlet):
+class FederationStateV1Servlet(BaseFederationServlet):
     PATH = "/state/(?P<context>[^/]*)/?"
 
     # This is when someone asks for all data for a given context.
@@ -429,7 +429,7 @@ class FederationStateServlet(BaseFederationServlet):
         return await self.handler.on_context_state_request(
             origin,
             context,
-            parse_string_from_args(query, "event_id", None, required=True),
+            parse_string_from_args(query, "event_id", None, required=False),
         )
 
 
@@ -714,7 +714,7 @@ class PublicRoomList(BaseFederationServlet):
 
     This API returns information in the same format as /publicRooms on the
     client API, but will only ever include local public rooms and hence is
-    intended for consumption by other home servers.
+    intended for consumption by other homeservers.
 
     GET /publicRooms HTTP/1.1
 
@@ -1360,7 +1360,7 @@ class RoomComplexityServlet(BaseFederationServlet):
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationEventServlet,
-    FederationStateServlet,
+    FederationStateV1Servlet,
     FederationStateIdsServlet,
     FederationBackfillServlet,
     FederationQueryServlet,
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7a0f54ca24..54a71c49d2 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -102,8 +102,9 @@ class AuthHandler(BaseHandler):
                         login_types.append(t)
         self._supported_login_types = login_types
 
-        self._account_ratelimiter = Ratelimiter()
-        self._failed_attempts_ratelimiter = Ratelimiter()
+        # Ratelimiter for failed auth during UIA. Uses same ratelimit config
+        # as per `rc_login.failed_attempts`.
+        self._failed_uia_attempts_ratelimiter = Ratelimiter()
 
         self._clock = self.hs.get_clock()
 
@@ -133,12 +134,38 @@ class AuthHandler(BaseHandler):
 
             AuthError if the client has completed a login flow, and it gives
                 a different user to `requester`
+
+            LimitExceededError if the ratelimiter's failed request count for this
+                user is too high to proceed
+
         """
 
+        user_id = requester.user.to_string()
+
+        # Check if we should be ratelimited due to too many previous failed attempts
+        self._failed_uia_attempts_ratelimiter.ratelimit(
+            user_id,
+            time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=False,
+        )
+
         # build a list of supported flows
         flows = [[login_type] for login_type in self._supported_login_types]
 
-        result, params, _ = yield self.check_auth(flows, request_body, clientip)
+        try:
+            result, params, _ = yield self.check_auth(flows, request_body, clientip)
+        except LoginError:
+            # Update the ratelimite to say we failed (`can_do_action` doesn't raise).
+            self._failed_uia_attempts_ratelimiter.can_do_action(
+                user_id,
+                time_now_s=self._clock.time(),
+                rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                update=True,
+            )
+            raise
 
         # find the completed login type
         for login_type in self._supported_login_types:
@@ -223,7 +250,7 @@ class AuthHandler(BaseHandler):
             # could continue registration from your phone having clicked the
             # email auth link on there). It's probably too open to abuse
             # because it lets unauthenticated clients store arbitrary objects
-            # on a home server.
+            # on a homeserver.
             # Revisit: Assumimg the REST APIs do sensible validation, the data
             # isn't arbintrary.
             session["clientdict"] = clientdict
@@ -501,11 +528,8 @@ class AuthHandler(BaseHandler):
             multiple matches
 
         Raises:
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
             UserDeactivatedError if a user is found but is deactivated.
         """
-        self.ratelimit_login_per_account(user_id)
         res = yield self._find_user_id_and_pwd_hash(user_id)
         if res is not None:
             return res[0]
@@ -572,8 +596,6 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem accessing the database
             SynapseError if there was a problem with the request
             LoginError if there was an authentication problem.
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
         """
 
         if username.startswith("@"):
@@ -581,8 +603,6 @@ class AuthHandler(BaseHandler):
         else:
             qualified_user_id = UserID(username, self.hs.hostname).to_string()
 
-        self.ratelimit_login_per_account(qualified_user_id)
-
         login_type = login_submission.get("type")
         known_login_type = False
 
@@ -650,15 +670,6 @@ class AuthHandler(BaseHandler):
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
 
-        # unknown username or invalid password.
-        self._failed_attempts_ratelimiter.ratelimit(
-            qualified_user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
-            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
-            update=True,
-        )
-
         # We raise a 403 here, but note that if we're doing user-interactive
         # login, it turns all LoginErrors into a 401 anyway.
         raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN)
@@ -710,10 +721,6 @@ class AuthHandler(BaseHandler):
         Returns:
             Deferred[unicode] the canonical_user_id, or Deferred[None] if
                 unknown user/bad password
-
-        Raises:
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -742,7 +749,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
-        self.ratelimit_login_per_account(user_id)
+
         yield self.auth.check_auth_blocking(user_id)
         return user_id
 
@@ -810,7 +817,7 @@ class AuthHandler(BaseHandler):
     @defer.inlineCallbacks
     def add_threepid(self, user_id, medium, address, validated_at):
         # 'Canonicalise' email addresses down to lower case.
-        # We've now moving towards the Home Server being the entity that
+        # We've now moving towards the homeserver being the entity that
         # is responsible for validating threepids used for resetting passwords
         # on accounts, so in future Synapse will gain knowledge of specific
         # types (mediums) of threepid. For now, we still use the existing
@@ -912,35 +919,6 @@ class AuthHandler(BaseHandler):
         else:
             return defer.succeed(False)
 
-    def ratelimit_login_per_account(self, user_id):
-        """Checks whether the process must be stopped because of ratelimiting.
-
-        Checks against two ratelimiters: the generic one for login attempts per
-        account and the one specific to failed attempts.
-
-        Args:
-            user_id (unicode): complete @user:id
-
-        Raises:
-            LimitExceededError if one of the ratelimiters' login requests count
-                for this user is too high too proceed.
-        """
-        self._failed_attempts_ratelimiter.ratelimit(
-            user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
-            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
-            update=False,
-        )
-
-        self._account_ratelimiter.ratelimit(
-            user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_account.per_second,
-            burst_count=self.hs.config.rc_login_account.burst_count,
-            update=True,
-        )
-
 
 @attr.s
 class MacaroonGenerator(object):
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 63267a0a4c..6dedaaff8d 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -95,6 +95,9 @@ class DeactivateAccountHandler(BaseHandler):
                 user_id, threepid["medium"], threepid["address"]
             )
 
+        # Remove all 3PIDs this user has bound to the homeserver
+        yield self.store.user_delete_threepids(user_id)
+
         # delete any devices belonging to the user, which will also
         # delete corresponding access tokens.
         yield self._device_handler.delete_all_devices_for_user(user_id)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c4632f8984..a07d2f1a17 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
             if not service.is_interested_in_alias(room_alias.to_string()):
                 raise SynapseError(
                     400,
-                    "This application service has not reserved" " this kind of alias.",
+                    "This application service has not reserved this kind of alias.",
                     errcode=Codes.EXCLUSIVE,
                 )
         else:
@@ -283,7 +283,7 @@ class DirectoryHandler(BaseHandler):
     def on_directory_query(self, args):
         room_alias = RoomAlias.from_string(args["room_alias"])
         if not self.hs.is_mine(room_alias):
-            raise SynapseError(400, "Room Alias is not hosted on this Home Server")
+            raise SynapseError(400, "Room Alias is not hosted on this homeserver")
 
         result = yield self.get_association_from_room_alias(room_alias)
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f09a0b73c8..28c12753c1 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -30,6 +30,7 @@ from twisted.internet import defer
 from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.types import (
     UserID,
     get_domain_from_id,
@@ -53,6 +54,12 @@ class E2eKeysHandler(object):
 
         self._edu_updater = SigningKeyEduUpdater(hs, self)
 
+        self._is_master = hs.config.worker_app is None
+        if not self._is_master:
+            self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
+                hs
+            )
+
         federation_registry = hs.get_federation_registry()
 
         # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -191,9 +198,15 @@ class E2eKeysHandler(object):
                 # probably be tracking their device lists. However, we haven't
                 # done an initial sync on the device list so we do it now.
                 try:
-                    user_devices = yield self.device_handler.device_list_updater.user_device_resync(
-                        user_id
-                    )
+                    if self._is_master:
+                        user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+                            user_id
+                        )
+                    else:
+                        user_devices = yield self._user_device_resync_client(
+                            user_id=user_id
+                        )
+
                     user_devices = user_devices["devices"]
                     for device in user_devices:
                         results[user_id] = {device["device_id"]: device["keys"]}
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 0cea445f0d..f1b4424a02 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object):
                 rooms
             session_id(string): session ID to delete keys for, for None to delete keys
                 for all sessions
+        Raises:
+            NotFoundError: if the backup version does not exist
         Returns:
-            A deferred of the deletion transaction
+            A dict containing the count and etag for the backup version
         """
 
         # lock for consistency with uploading
         with (yield self._upload_linearizer.queue(user_id)):
+            # make sure the backup version exists
+            try:
+                version_info = yield self.store.get_e2e_room_keys_version_info(
+                    user_id, version
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Unknown backup version")
+                else:
+                    raise
+
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+            version_etag = version_info["etag"] + 1
+            yield self.store.update_e2e_room_keys_version(
+                user_id, version, None, version_etag
+            )
+
+            count = yield self.store.count_e2e_room_keys(user_id, version)
+            return {"etag": str(version_etag), "count": count}
+
     @trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
@@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object):
             }
         }
 
+        Returns:
+            A dict containing the count and etag for the backup version
+
         Raises:
             NotFoundError: if there are no versions defined
             RoomKeysVersionError: if the uploaded version is not the current version
@@ -171,59 +196,62 @@ class E2eRoomKeysHandler(object):
                     else:
                         raise
 
-            # go through the room_keys.
-            # XXX: this should/could be done concurrently, given we're in a lock.
+            # Fetch any existing room keys for the sessions that have been
+            # submitted.  Then compare them with the submitted keys.  If the
+            # key is new, insert it; if the key should be updated, then update
+            # it; otherwise, drop it.
+            existing_keys = yield self.store.get_e2e_room_keys_multi(
+                user_id, version, room_keys["rooms"]
+            )
+            to_insert = []  # batch the inserts together
+            changed = False  # if anything has changed, we need to update the etag
             for room_id, room in iteritems(room_keys["rooms"]):
-                for session_id, session in iteritems(room["sessions"]):
-                    yield self._upload_room_key(
-                        user_id, version, room_id, session_id, session
+                for session_id, room_key in iteritems(room["sessions"]):
+                    log_kv(
+                        {
+                            "message": "Trying to upload room key",
+                            "room_id": room_id,
+                            "session_id": session_id,
+                            "user_id": user_id,
+                        }
                     )
-
-    @defer.inlineCallbacks
-    def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
-        """Upload a given room_key for a given room and session into a given
-        version of the backup.  Merges the key with any which might already exist.
-
-        Args:
-            user_id(str): the user whose backup we're setting
-            version(str): the version ID of the backup we're updating
-            room_id(str): the ID of the room whose keys we're setting
-            session_id(str): the session whose room_key we're setting
-            room_key(dict): the room_key being set
-        """
-        log_kv(
-            {
-                "message": "Trying to upload room key",
-                "room_id": room_id,
-                "session_id": session_id,
-                "user_id": user_id,
-            }
-        )
-        # get the room_key for this particular row
-        current_room_key = None
-        try:
-            current_room_key = yield self.store.get_e2e_room_key(
-                user_id, version, room_id, session_id
-            )
-        except StoreError as e:
-            if e.code == 404:
-                log_kv(
-                    {
-                        "message": "Room key not found.",
-                        "room_id": room_id,
-                        "user_id": user_id,
-                    }
+                    current_room_key = existing_keys.get(room_id, {}).get(session_id)
+                    if current_room_key:
+                        if self._should_replace_room_key(current_room_key, room_key):
+                            log_kv({"message": "Replacing room key."})
+                            # updates are done one at a time in the DB, so send
+                            # updates right away rather than batching them up,
+                            # like we do with the inserts
+                            yield self.store.update_e2e_room_key(
+                                user_id, version, room_id, session_id, room_key
+                            )
+                            changed = True
+                        else:
+                            log_kv({"message": "Not replacing room_key."})
+                    else:
+                        log_kv(
+                            {
+                                "message": "Room key not found.",
+                                "room_id": room_id,
+                                "user_id": user_id,
+                            }
+                        )
+                        log_kv({"message": "Replacing room key."})
+                        to_insert.append((room_id, session_id, room_key))
+                        changed = True
+
+            if len(to_insert):
+                yield self.store.add_e2e_room_keys(user_id, version, to_insert)
+
+            version_etag = version_info["etag"]
+            if changed:
+                version_etag = version_etag + 1
+                yield self.store.update_e2e_room_keys_version(
+                    user_id, version, None, version_etag
                 )
-            else:
-                raise
 
-        if self._should_replace_room_key(current_room_key, room_key):
-            log_kv({"message": "Replacing room key."})
-            yield self.store.set_e2e_room_key(
-                user_id, version, room_id, session_id, room_key
-            )
-        else:
-            log_kv({"message": "Not replacing room_key."})
+            count = yield self.store.count_e2e_room_keys(user_id, version)
+            return {"etag": str(version_etag), "count": count}
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object):
                     raise NotFoundError("Unknown backup version")
                 else:
                     raise
+
+            res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
             return res
 
     @trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8cafcfdab0..d3267734f7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -97,9 +97,9 @@ class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
         a) handling received Pdus before handing them on as Events to the rest
-        of the home server (including auth and state conflict resoultion)
+        of the homeserver (including auth and state conflict resoultion)
         b) converting events that were produced by local clients that may need
-        to be sent to remote home servers.
+        to be sent to remote homeservers.
         c) doing the necessary dances to invite remote users and join remote
         rooms.
     """
@@ -1428,9 +1428,9 @@ class FederationHandler(BaseHandler):
         return event
 
     @defer.inlineCallbacks
-    def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
+    def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content):
         origin, event, event_format_version = yield self._make_and_verify_event(
-            target_hosts, room_id, user_id, "leave"
+            target_hosts, room_id, user_id, "leave", content=content,
         )
         # Mark as outlier as we don't have any state for this event; we're not
         # even in the room.
@@ -1688,7 +1688,11 @@ class FederationHandler(BaseHandler):
         # hack around with a try/finally instead.
         success = False
         try:
-            if not event.internal_metadata.is_outlier() and not backfilled:
+            if (
+                not event.internal_metadata.is_outlier()
+                and not backfilled
+                and not context.rejected
+            ):
                 yield self.action_generator.handle_push_actions_for_event(
                     event, context
                 )
@@ -2036,8 +2040,10 @@ class FederationHandler(BaseHandler):
             auth_events (dict[(str, str)->synapse.events.EventBase]):
                 Map from (event_type, state_key) to event
 
-                What we expect the event's auth_events to be, based on the event's
-                position in the dag. I think? maybe??
+                Normally, our calculated auth_events based on the state of the room
+                at the event's position in the DAG, though occasionally (eg if the
+                event is an outlier), may be the auth events claimed by the remote
+                server.
 
                 Also NB that this function adds entries to it.
         Returns:
@@ -2087,30 +2093,35 @@ class FederationHandler(BaseHandler):
             origin (str):
             event (synapse.events.EventBase):
             context (synapse.events.snapshot.EventContext):
+
             auth_events (dict[(str, str)->synapse.events.EventBase]):
+                Map from (event_type, state_key) to event
+
+                Normally, our calculated auth_events based on the state of the room
+                at the event's position in the DAG, though occasionally (eg if the
+                event is an outlier), may be the auth events claimed by the remote
+                server.
+
+                Also NB that this function adds entries to it.
 
         Returns:
             defer.Deferred[EventContext]: updated context
         """
         event_auth_events = set(event.auth_event_ids())
 
-        if event.is_state():
-            event_key = (event.type, event.state_key)
-        else:
-            event_key = None
-
-        # if the event's auth_events refers to events which are not in our
-        # calculated auth_events, we need to fetch those events from somewhere.
-        #
-        # we start by fetching them from the store, and then try calling /event_auth/.
+        # missing_auth is the set of the event's auth_events which we don't yet have
+        # in auth_events.
         missing_auth = event_auth_events.difference(
             e.event_id for e in auth_events.values()
         )
 
+        # if we have missing events, we need to fetch those events from somewhere.
+        #
+        # we start by checking if they are in the store, and then try calling /event_auth/.
         if missing_auth:
             # TODO: can we use store.have_seen_events here instead?
             have_events = yield self.store.get_seen_events_with_rejections(missing_auth)
-            logger.debug("Got events %s from store", have_events)
+            logger.debug("Found events %s in the store", have_events)
             missing_auth.difference_update(have_events.keys())
         else:
             have_events = {}
@@ -2165,15 +2176,17 @@ class FederationHandler(BaseHandler):
                     event.auth_event_ids()
                 )
             except Exception:
-                # FIXME:
                 logger.exception("Failed to get auth chain")
 
         if event.internal_metadata.is_outlier():
+            # XXX: given that, for an outlier, we'll be working with the
+            # event's *claimed* auth events rather than those we calculated:
+            # (a) is there any point in this test, since different_auth below will
+            # obviously be empty
+            # (b) alternatively, why don't we do it earlier?
             logger.info("Skipping auth_event fetch for outlier")
             return context
 
-        # FIXME: Assumes we have and stored all the state for all the
-        # prev_events
         different_auth = event_auth_events.difference(
             e.event_id for e in auth_events.values()
         )
@@ -2187,27 +2200,22 @@ class FederationHandler(BaseHandler):
             different_auth,
         )
 
+        # now we state-resolve between our own idea of the auth events, and the remote's
+        # idea of them.
+
         room_version = yield self.store.get_room_version(event.room_id)
+        different_event_ids = [
+            d for d in different_auth if d in have_events and not have_events[d]
+        ]
 
-        different_events = yield make_deferred_yieldable(
-            defer.gatherResults(
-                [
-                    run_in_background(
-                        self.store.get_event, d, allow_none=True, allow_rejected=False
-                    )
-                    for d in different_auth
-                    if d in have_events and not have_events[d]
-                ],
-                consumeErrors=True,
-            )
-        ).addErrback(unwrapFirstError)
+        if different_event_ids:
+            # XXX: currently this checks for redactions but I'm not convinced that is
+            # necessary?
+            different_events = yield self.store.get_events_as_list(different_event_ids)
 
-        if different_events:
             local_view = dict(auth_events)
             remote_view = dict(auth_events)
-            remote_view.update(
-                {(d.type, d.state_key): d for d in different_events if d}
-            )
+            remote_view.update({(d.type, d.state_key): d for d in different_events})
 
             new_state = yield self.state_handler.resolve_events(
                 room_version,
@@ -2227,13 +2235,13 @@ class FederationHandler(BaseHandler):
             auth_events.update(new_state)
 
             context = yield self._update_context_for_auth_events(
-                event, context, auth_events, event_key
+                event, context, auth_events
             )
 
         return context
 
     @defer.inlineCallbacks
-    def _update_context_for_auth_events(self, event, context, auth_events, event_key):
+    def _update_context_for_auth_events(self, event, context, auth_events):
         """Update the state_ids in an event context after auth event resolution,
         storing the changes as a new state group.
 
@@ -2242,18 +2250,21 @@ class FederationHandler(BaseHandler):
 
             context (synapse.events.snapshot.EventContext): initial event context
 
-            auth_events (dict[(str, str)->str]): Events to update in the event
+            auth_events (dict[(str, str)->EventBase]): Events to update in the event
                 context.
 
-            event_key ((str, str)): (type, state_key) for the current event.
-                this will not be included in the current_state in the context.
-
         Returns:
             Deferred[EventContext]: new event context
         """
+        # exclude the state key of the new event from the current_state in the context.
+        if event.is_state():
+            event_key = (event.type, event.state_key)
+        else:
+            event_key = None
         state_updates = {
             k: a.event_id for k, a in iteritems(auth_events) if k != event_key
         }
+
         current_state_ids = yield context.get_current_state_ids(self.store)
         current_state_ids = dict(current_state_ids)
 
@@ -2276,6 +2287,7 @@ class FederationHandler(BaseHandler):
 
         return EventContext.with_state(
             state_group=state_group,
+            state_group_before_event=context.state_group_before_event,
             current_state_ids=current_state_ids,
             prev_state_ids=prev_state_ids,
             prev_group=prev_group,
@@ -2454,7 +2466,7 @@ class FederationHandler(BaseHandler):
                 room_version, event_dict, event, context
             )
 
-            EventValidator().validate_new(event)
+            EventValidator().validate_new(event, self.config)
 
             # We need to tell the transaction queue to send this out, even
             # though the sender isn't a local user.
@@ -2569,7 +2581,7 @@ class FederationHandler(BaseHandler):
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder
         )
-        EventValidator().validate_new(event)
+        EventValidator().validate_new(event, self.config)
         return (event, context)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d682dc2b7a..3b0156f516 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -138,7 +138,7 @@ class MessageHandler(object):
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
 
             visible_events = yield filter_events_for_client(
-                self.storage, user_id, last_events
+                self.storage, user_id, last_events, apply_retention_policies=False
             )
 
             event = last_events[0]
@@ -417,7 +417,7 @@ class EventCreationHandler(object):
                     403, "You must be in the room to create an alias for it"
                 )
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         return (event, context)
 
@@ -634,7 +634,7 @@ class EventCreationHandler(object):
         if requester:
             context.app_service = requester.app_service
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         # If this event is an annotation then we check that that the sender
         # can't annotate the same way twice (e.g. stops users from liking an
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 97f15a1c32..8514ddc600 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import logging
 
+from six import iteritems
+
 from twisted.internet import defer
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.logging.context import run_in_background
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import ReadWriteLock
@@ -80,6 +83,109 @@ class PaginationHandler(object):
         self._purges_by_id = {}
         self._event_serializer = hs.get_event_client_serializer()
 
+        self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
+
+        if hs.config.retention_enabled:
+            # Run the purge jobs described in the configuration file.
+            for job in hs.config.retention_purge_jobs:
+                self.clock.looping_call(
+                    run_as_background_process,
+                    job["interval"],
+                    "purge_history_for_rooms_in_range",
+                    self.purge_history_for_rooms_in_range,
+                    job["shortest_max_lifetime"],
+                    job["longest_max_lifetime"],
+                )
+
+    @defer.inlineCallbacks
+    def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+        """Purge outdated events from rooms within the given retention range.
+
+        If a default retention policy is defined in the server's configuration and its
+        'max_lifetime' is within this range, also targets rooms which don't have a
+        retention policy.
+
+        Args:
+            min_ms (int|None): Duration in milliseconds that define the lower limit of
+                the range to handle (exclusive). If None, it means that the range has no
+                lower limit.
+            max_ms (int|None): Duration in milliseconds that define the upper limit of
+                the range to handle (inclusive). If None, it means that the range has no
+                upper limit.
+        """
+        # We want the storage layer to to include rooms with no retention policy in its
+        # return value only if a default retention policy is defined in the server's
+        # configuration and that policy's 'max_lifetime' is either lower (or equal) than
+        # max_ms or higher than min_ms (or both).
+        if self._retention_default_max_lifetime is not None:
+            include_null = True
+
+            if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
+                # The default max_lifetime is lower than (or equal to) min_ms.
+                include_null = False
+
+            if max_ms is not None and max_ms < self._retention_default_max_lifetime:
+                # The default max_lifetime is higher than max_ms.
+                include_null = False
+        else:
+            include_null = False
+
+        rooms = yield self.store.get_rooms_for_retention_period_in_range(
+            min_ms, max_ms, include_null
+        )
+
+        for room_id, retention_policy in iteritems(rooms):
+            if room_id in self._purges_in_progress_by_room:
+                logger.warning(
+                    "[purge] not purging room %s as there's an ongoing purge running"
+                    " for this room",
+                    room_id,
+                )
+                continue
+
+            max_lifetime = retention_policy["max_lifetime"]
+
+            if max_lifetime is None:
+                # If max_lifetime is None, it means that include_null equals True,
+                # therefore we can safely assume that there is a default policy defined
+                # in the server's configuration.
+                max_lifetime = self._retention_default_max_lifetime
+
+            # Figure out what token we should start purging at.
+            ts = self.clock.time_msec() - max_lifetime
+
+            stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+
+            r = yield self.store.get_room_event_after_stream_ordering(
+                room_id, stream_ordering,
+            )
+            if not r:
+                logger.warning(
+                    "[purge] purging events not possible: No event found "
+                    "(ts %i => stream_ordering %i)",
+                    ts,
+                    stream_ordering,
+                )
+                continue
+
+            (stream, topo, _event_id) = r
+            token = "t%d-%d" % (topo, stream)
+
+            purge_id = random_string(16)
+
+            self._purges_by_id[purge_id] = PurgeStatus()
+
+            logger.info(
+                "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
+            )
+
+            # We want to purge everything, including local events, and to run the purge in
+            # the background so that it's not blocking any other operation apart from
+            # other purges in the same room.
+            run_as_background_process(
+                "_purge_history", self._purge_history, purge_id, room_id, token, True,
+            )
+
     def start_purge_history(self, room_id, token, delete_local_events=False):
         """Start off a history purge on a room.
 
@@ -127,7 +233,9 @@ class PaginationHandler(object):
         self._purges_in_progress_by_room.add(room_id)
         try:
             with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(room_id, token, delete_local_events)
+                yield self.storage.purge_events.purge_history(
+                    room_id, token, delete_local_events
+                )
             logger.info("[purge] complete")
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
         except Exception:
@@ -170,7 +278,7 @@ class PaginationHandler(object):
             if joined:
                 raise SynapseError(400, "Users are still joined to this room")
 
-            await self.store.purge_room(room_id)
+            await self.storage.purge_events.purge_room(room_id)
 
     @defer.inlineCallbacks
     def get_messages(
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 22e0a04da4..1e5a4613c9 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -152,7 +152,7 @@ class BaseProfileHandler(BaseHandler):
             by_admin (bool): Whether this change was made by an administrator.
         """
         if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
@@ -207,7 +207,7 @@ class BaseProfileHandler(BaseHandler):
         """target_user is the user whose avatar_url is to be changed;
         auth_user is the user attempting to make this change."""
         if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
@@ -231,7 +231,7 @@ class BaseProfileHandler(BaseHandler):
     def on_profile_query(self, args):
         user = UserID.from_string(args["user_id"])
         if not self.hs.is_mine(user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         just_field = args.get("field", None)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index cff6b0d375..95806af41e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,7 +24,6 @@ from synapse.api.errors import (
     AuthError,
     Codes,
     ConsentNotGivenError,
-    LimitExceededError,
     RegistrationError,
     SynapseError,
 )
@@ -168,6 +167,7 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
+        yield self.check_registration_ratelimit(address)
 
         yield self.auth.check_auth_blocking(threepid=threepid)
         password_hash = None
@@ -217,8 +217,13 @@ class RegistrationHandler(BaseHandler):
 
         else:
             # autogen a sequential user ID
+            fail_count = 0
             user = None
             while not user:
+                # Fail after being unable to find a suitable ID a few times
+                if fail_count > 10:
+                    raise SynapseError(500, "Unable to find a suitable guest user ID")
+
                 localpart = yield self._generate_user_id()
                 user = UserID(localpart, self.hs.hostname)
                 user_id = user.to_string()
@@ -233,10 +238,14 @@ class RegistrationHandler(BaseHandler):
                         create_profile_with_displayname=default_display_name,
                         address=address,
                     )
+
+                    # Successfully registered
+                    break
                 except SynapseError:
                     # if user id is taken, just generate another
                     user = None
                     user_id = None
+                    fail_count += 1
 
         if not self.hs.config.user_consent_at_registration:
             yield self._auto_join_rooms(user_id)
@@ -414,6 +423,29 @@ class RegistrationHandler(BaseHandler):
             ratelimit=False,
         )
 
+    def check_registration_ratelimit(self, address):
+        """A simple helper method to check whether the registration rate limit has been hit
+        for a given IP address
+
+        Args:
+            address (str|None): the IP address used to perform the registration. If this is
+                None, no ratelimiting will be performed.
+
+        Raises:
+            LimitExceededError: If the rate limit has been exceeded.
+        """
+        if not address:
+            return
+
+        time_now = self.clock.time()
+
+        self.ratelimiter.ratelimit(
+            address,
+            time_now_s=time_now,
+            rate_hz=self.hs.config.rc_registration.per_second,
+            burst_count=self.hs.config.rc_registration.burst_count,
+        )
+
     def register_with_store(
         self,
         user_id,
@@ -446,22 +478,6 @@ class RegistrationHandler(BaseHandler):
         Returns:
             Deferred
         """
-        # Don't rate limit for app services
-        if appservice_id is None and address is not None:
-            time_now = self.clock.time()
-
-            allowed, time_allowed = self.ratelimiter.can_do_action(
-                address,
-                time_now_s=time_now,
-                rate_hz=self.hs.config.rc_registration.per_second,
-                burst_count=self.hs.config.rc_registration.burst_count,
-            )
-
-            if not allowed:
-                raise LimitExceededError(
-                    retry_after_ms=int(1000 * (time_allowed - time_now))
-                )
-
         if self.hs.config.worker_app:
             return self._register_client(
                 user_id=user_id,
@@ -614,7 +630,7 @@ class RegistrationHandler(BaseHandler):
         # And we add an email pusher for them by default, but only
         # if email notifications are enabled (so people don't start
         # getting mail spam where they weren't before if email
-        # notifs are set up on a home server)
+        # notifs are set up on a homeserver)
         if (
             self.hs.config.email_enable_notifs
             and self.hs.config.email_notif_for_new_users
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index fd3ea8daf8..22768e97ff 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -199,21 +199,21 @@ class RoomCreationHandler(BaseHandler):
         # finally, shut down the PLs in the old room, and update them in the new
         # room.
         yield self._update_upgraded_room_pls(
-            requester, old_room_id, new_room_id, old_room_state
+            requester, old_room_id, new_room_id, old_room_state,
         )
 
         return new_room_id
 
     @defer.inlineCallbacks
     def _update_upgraded_room_pls(
-        self, requester, old_room_id, new_room_id, old_room_state
+        self, requester, old_room_id, new_room_id, old_room_state,
     ):
         """Send updated power levels in both rooms after an upgrade
 
         Args:
             requester (synapse.types.Requester): the user requesting the upgrade
-            old_room_id (unicode): the id of the room to be replaced
-            new_room_id (unicode): the id of the replacement room
+            old_room_id (str): the id of the room to be replaced
+            new_room_id (str): the id of the replacement room
             old_room_state (dict[tuple[str, str], str]): the state map for the old room
 
         Returns:
@@ -299,7 +299,7 @@ class RoomCreationHandler(BaseHandler):
             tombstone_event_id (unicode|str): the ID of the tombstone event in the old
                 room.
         Returns:
-            Deferred[None]
+            Deferred
         """
         user_id = requester.user.to_string()
 
@@ -334,6 +334,7 @@ class RoomCreationHandler(BaseHandler):
             (EventTypes.Encryption, ""),
             (EventTypes.ServerACL, ""),
             (EventTypes.RelatedGroups, ""),
+            (EventTypes.PowerLevels, ""),
         )
 
         old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -347,6 +348,31 @@ class RoomCreationHandler(BaseHandler):
             if old_event:
                 initial_state[k] = old_event.content
 
+        # Resolve the minimum power level required to send any state event
+        # We will give the upgrading user this power level temporarily (if necessary) such that
+        # they are able to copy all of the state events over, then revert them back to their
+        # original power level afterwards in _update_upgraded_room_pls
+
+        # Copy over user power levels now as this will not be possible with >100PL users once
+        # the room has been created
+
+        power_levels = initial_state[(EventTypes.PowerLevels, "")]
+
+        # Calculate the minimum power level needed to clone the room
+        event_power_levels = power_levels.get("events", {})
+        state_default = power_levels.get("state_default", 0)
+        ban = power_levels.get("ban")
+        needed_power_level = max(state_default, ban, max(event_power_levels.values()))
+
+        # Raise the requester's power level in the new room if necessary
+        current_power_level = power_levels["users"][requester.user.to_string()]
+        if current_power_level < needed_power_level:
+            # Assign this power level to the requester
+            power_levels["users"][requester.user.to_string()] = needed_power_level
+
+        # Set the power levels to the modified state
+        initial_state[(EventTypes.PowerLevels, "")] = power_levels
+
         yield self._send_events_for_new_room(
             requester,
             new_room_id,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 06d09c2947..7b7270fc61 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -94,7 +94,9 @@ class RoomMemberHandler(object):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+    def _remote_reject_invite(
+        self, requester, remote_room_hosts, room_id, target, content
+    ):
         """Attempt to reject an invite for a room this server is not in. If we
         fail to do so we locally mark the invite as rejected.
 
@@ -104,6 +106,7 @@ class RoomMemberHandler(object):
                 reject invite
             room_id (str)
             target (UserID): The user rejecting the invite
+            content (dict): The content for the rejection event
 
         Returns:
             Deferred[dict]: A dictionary to be returned to the client, may
@@ -471,7 +474,7 @@ class RoomMemberHandler(object):
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
                     res = yield self._remote_reject_invite(
-                        requester, remote_room_hosts, room_id, target
+                        requester, remote_room_hosts, room_id, target, content,
                     )
                     return res
 
@@ -515,6 +518,15 @@ class RoomMemberHandler(object):
             yield self.store.set_room_is_public(old_room_id, False)
             yield self.store.set_room_is_public(room_id, True)
 
+        # Check if any groups we own contain the predecessor room
+        local_group_ids = yield self.store.get_local_groups_for_room(old_room_id)
+        for group_id in local_group_ids:
+            # Add new the new room to those groups
+            yield self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
+
+            # Remove the old room from those groups
+            yield self.store.remove_room_from_group(group_id, old_room_id)
+
     @defer.inlineCallbacks
     def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
         """Copy user-specific information when they join a new room when that new room is the
@@ -962,13 +974,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             )
 
     @defer.inlineCallbacks
-    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+    def _remote_reject_invite(
+        self, requester, remote_room_hosts, room_id, target, content
+    ):
         """Implements RoomMemberHandler._remote_reject_invite
         """
         fed_handler = self.federation_handler
         try:
             ret = yield fed_handler.do_remotely_reject_invite(
-                remote_room_hosts, room_id, target.to_string()
+                remote_room_hosts, room_id, target.to_string(), content=content,
             )
             return ret
         except Exception as e:
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 75e96ae1a2..69be86893b 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -55,7 +55,9 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
 
         return ret
 
-    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+    def _remote_reject_invite(
+        self, requester, remote_room_hosts, room_id, target, content
+    ):
         """Implements RoomMemberHandler._remote_reject_invite
         """
         return self._remote_reject_client(
@@ -63,6 +65,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
             user_id=target.to_string(),
+            content=content,
         )
 
     def _user_joined_room(self, target, room_id):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index ca8ae9fb5b..856337b7e2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -120,7 +120,7 @@ class TypingHandler(object):
         auth_user_id = auth_user.to_string()
 
         if not self.is_mine_id(target_user_id):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
@@ -150,7 +150,7 @@ class TypingHandler(object):
         auth_user_id = auth_user.to_string()
 
         if not self.is_mine_id(target_user_id):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 691380abda..16765d54e0 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -530,7 +530,7 @@ class MatrixFederationHttpClient(object):
         """
         Builds the Authorization headers for a federation request
         Args:
-            destination (bytes|None): The desination home server of the request.
+            destination (bytes|None): The desination homeserver of the request.
                 May be None if the destination is an identity server, in which case
                 destination_is must be non-None.
             method (bytes): The HTTP method of the request
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index e9a5e46ced..13fcb408a6 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False):
             return {b"true": True, b"false": False}[args[name][0]]
         except Exception:
             message = (
-                "Boolean query parameter %r must be one of" " ['true', 'false']"
+                "Boolean query parameter %r must be one of ['true', 'false']"
             ) % (name,)
             raise SynapseError(400, message)
     else:
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
index 334ddaf39a..ffa7b20ca8 100644
--- a/synapse/logging/_structured.py
+++ b/synapse/logging/_structured.py
@@ -261,6 +261,18 @@ def parse_drain_configs(
             )
 
 
+class StoppableLogPublisher(LogPublisher):
+    """
+    A log publisher that can tell its observers to shut down any external
+    communications.
+    """
+
+    def stop(self):
+        for obs in self._observers:
+            if hasattr(obs, "stop"):
+                obs.stop()
+
+
 def setup_structured_logging(
     hs,
     config,
@@ -336,7 +348,7 @@ def setup_structured_logging(
             # We should never get here, but, just in case, throw an error.
             raise ConfigError("%s drain type cannot be configured" % (observer.type,))
 
-    publisher = LogPublisher(*observers)
+    publisher = StoppableLogPublisher(*observers)
     log_filter = LogLevelFilterPredicate()
 
     for namespace, namespace_config in log_config.get(
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 0ebbde06f2..03934956f4 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -17,25 +17,29 @@
 Log formatters that output terse JSON.
 """
 
+import json
 import sys
+import traceback
 from collections import deque
 from ipaddress import IPv4Address, IPv6Address, ip_address
 from math import floor
-from typing import IO
+from typing import IO, Optional
 
 import attr
-from simplejson import dumps
 from zope.interface import implementer
 
 from twisted.application.internet import ClientService
+from twisted.internet.defer import Deferred
 from twisted.internet.endpoints import (
     HostnameEndpoint,
     TCP4ClientEndpoint,
     TCP6ClientEndpoint,
 )
+from twisted.internet.interfaces import IPushProducer, ITransport
 from twisted.internet.protocol import Factory, Protocol
 from twisted.logger import FileLogObserver, ILogObserver, Logger
-from twisted.python.failure import Failure
+
+_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
 
 
 def flatten_event(event: dict, metadata: dict, include_time: bool = False):
@@ -141,19 +145,57 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb
 
     def formatEvent(_event: dict) -> str:
         flattened = flatten_event(_event, metadata)
-        return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
+        return _encoder.encode(flattened) + "\n"
 
     return FileLogObserver(outFile, formatEvent)
 
 
 @attr.s
+@implementer(IPushProducer)
+class LogProducer(object):
+    """
+    An IPushProducer that writes logs from its buffer to its transport when it
+    is resumed.
+
+    Args:
+        buffer: Log buffer to read logs from.
+        transport: Transport to write to.
+    """
+
+    transport = attr.ib(type=ITransport)
+    _buffer = attr.ib(type=deque)
+    _paused = attr.ib(default=False, type=bool, init=False)
+
+    def pauseProducing(self):
+        self._paused = True
+
+    def stopProducing(self):
+        self._paused = True
+        self._buffer = None
+
+    def resumeProducing(self):
+        self._paused = False
+
+        while self._paused is False and (self._buffer and self.transport.connected):
+            try:
+                event = self._buffer.popleft()
+                self.transport.write(_encoder.encode(event).encode("utf8"))
+                self.transport.write(b"\n")
+            except Exception:
+                # Something has gone wrong writing to the transport -- log it
+                # and break out of the while.
+                traceback.print_exc(file=sys.__stderr__)
+                break
+
+
+@attr.s
 @implementer(ILogObserver)
 class TerseJSONToTCPLogObserver(object):
     """
     An IObserver that writes JSON logs to a TCP target.
 
     Args:
-        hs (HomeServer): The Homeserver that is being logged for.
+        hs (HomeServer): The homeserver that is being logged for.
         host: The host of the logging target.
         port: The logging target's port.
         metadata: Metadata to be added to each log entry.
@@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object):
     metadata = attr.ib(type=dict)
     maximum_buffer = attr.ib(type=int)
     _buffer = attr.ib(default=attr.Factory(deque), type=deque)
-    _writer = attr.ib(default=None)
+    _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
     _logger = attr.ib(default=attr.Factory(Logger))
+    _producer = attr.ib(default=None, type=Optional[LogProducer])
 
     def start(self) -> None:
 
@@ -187,38 +230,44 @@ class TerseJSONToTCPLogObserver(object):
         factory = Factory.forProtocol(Protocol)
         self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
         self._service.startService()
+        self._connect()
 
-    def _write_loop(self) -> None:
+    def stop(self):
+        self._service.stopService()
+
+    def _connect(self) -> None:
         """
-        Implement the write loop.
+        Triggers an attempt to connect then write to the remote if not already writing.
         """
-        if self._writer:
+        if self._connection_waiter:
             return
 
-        self._writer = self._service.whenConnected()
+        self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
+
+        @self._connection_waiter.addErrback
+        def fail(r):
+            r.printTraceback(file=sys.__stderr__)
+            self._connection_waiter = None
+            self._connect()
 
-        @self._writer.addBoth
+        @self._connection_waiter.addCallback
         def writer(r):
-            if isinstance(r, Failure):
-                r.printTraceback(file=sys.__stderr__)
-                self._writer = None
-                self.hs.get_reactor().callLater(1, self._write_loop)
+            # We have a connection. If we already have a producer, and its
+            # transport is the same, just trigger a resumeProducing.
+            if self._producer and r.transport is self._producer.transport:
+                self._producer.resumeProducing()
+                self._connection_waiter = None
                 return
 
-            try:
-                for event in self._buffer:
-                    r.transport.write(
-                        dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
-                            "utf8"
-                        )
-                    )
-                    r.transport.write(b"\n")
-                self._buffer.clear()
-            except Exception as e:
-                sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
-
-            self._writer = False
-            self.hs.get_reactor().callLater(1, self._write_loop)
+            # If the producer is still producing, stop it.
+            if self._producer:
+                self._producer.stopProducing()
+
+            # Make a new producer and start it.
+            self._producer = LogProducer(buffer=self._buffer, transport=r.transport)
+            r.transport.registerProducer(self._producer, True)
+            self._producer.resumeProducing()
+            self._connection_waiter = None
 
     def _handle_pressure(self) -> None:
         """
@@ -277,4 +326,4 @@ class TerseJSONToTCPLogObserver(object):
             self._logger.failure("Failed clearing backpressure")
 
         # Try and write immediately.
-        self._write_loop()
+        self._connect()
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index e994037be6..d0879b0490 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -246,7 +246,7 @@ class HttpPusher(object):
                     # fixed, we don't suddenly deliver a load
                     # of old notifications.
                     logger.warning(
-                        "Giving up on a notification to user %s, " "pushkey %s",
+                        "Giving up on a notification to user %s, pushkey %s",
                         self.user_id,
                         self.pushkey,
                     )
@@ -299,8 +299,7 @@ class HttpPusher(object):
                     # for sanity, we only remove the pushkey if it
                     # was the one we actually sent...
                     logger.warning(
-                        ("Ignoring rejected pushkey %s because we" " didn't send it"),
-                        pk,
+                        ("Ignoring rejected pushkey %s because we didn't send it"), pk,
                     )
                 else:
                     logger.info("Pushkey %s was rejected: removing", pk)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 1d15a06a58..b13b646bfd 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
 
 
 MESSAGE_FROM_PERSON_IN_ROOM = (
-    "You have a message on %(app)s from %(person)s " "in the %(room)s room..."
+    "You have a message on %(app)s from %(person)s in the %(room)s room..."
 )
 MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
 MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
@@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = (
     "You have messages on %(app)s from %(person)s and others..."
 )
 INVITE_FROM_PERSON_TO_ROOM = (
-    "%(person)s has invited you to join the " "%(room)s room on %(app)s..."
+    "%(person)s has invited you to join the %(room)s room on %(app)s..."
 )
 INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
 
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 81b85352b1..28dbc6fcba 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,14 @@
 # limitations under the License.
 
 from synapse.http.server import JsonResource
-from synapse.replication.http import federation, login, membership, register, send_event
+from synapse.replication.http import (
+    devices,
+    federation,
+    login,
+    membership,
+    register,
+    send_event,
+)
 
 REPLICATION_PREFIX = "/_synapse/replication"
 
@@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource):
         federation.register_servlets(hs, self)
         login.register_servlets(hs, self)
         register.register_servlets(hs, self)
+        devices.register_servlets(hs, self)
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
new file mode 100644
index 0000000000..e32aac0a25
--- /dev/null
+++ b/synapse/replication/http/devices.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
+    """Ask master to resync the device list for a user by contacting their
+    server.
+
+    This must happen on master so that the results can be correctly cached in
+    the database and streamed to workers.
+
+    Request format:
+
+        POST /_synapse/replication/user_device_resync/:user_id
+
+        {}
+
+    Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
+    response, e.g.:
+
+        {
+            "user_id": "@alice:example.org",
+            "devices": [
+                {
+                    "device_id": "JLAFKJWSCS",
+                    "keys": { ... },
+                    "device_display_name": "Alice's Mobile Phone"
+                }
+            ]
+        }
+    """
+
+    NAME = "user_device_resync"
+    PATH_ARGS = ("user_id",)
+    CACHE = False
+
+    def __init__(self, hs):
+        super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
+
+        self.device_list_updater = hs.get_device_handler().device_list_updater
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @staticmethod
+    def _serialize_payload(user_id):
+        return {}
+
+    async def _handle_request(self, request, user_id):
+        user_devices = await self.device_list_updater.user_device_resync(user_id)
+
+        return 200, user_devices
+
+
+def register_servlets(hs, http_server):
+    ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index cc1f249740..3577611fd7 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -93,6 +93,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         {
             "requester": ...,
             "remote_room_hosts": [...],
+            "content": { ... }
         }
     """
 
@@ -107,7 +108,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
+    def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
         """
         Args:
             requester(Requester)
@@ -118,12 +119,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         return {
             "requester": requester.serialize(),
             "remote_room_hosts": remote_room_hosts,
+            "content": content,
         }
 
     async def _handle_request(self, request, room_id, user_id):
         content = parse_json_object_from_request(request)
 
         remote_room_hosts = content["remote_room_hosts"]
+        event_content = content["content"]
 
         requester = Requester.deserialize(self.store, content["requester"])
 
@@ -134,7 +137,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
 
         try:
             event = await self.federation_handler.do_remotely_reject_invite(
-                remote_room_hosts, room_id, user_id
+                remote_room_hosts, room_id, user_id, event_content,
             )
             ret = event.get_pdu_json()
         except Exception as e:
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 915cfb9430..0c4aca1291 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -75,6 +75,8 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
     async def _handle_request(self, request, user_id):
         content = parse_json_object_from_request(request)
 
+        self.registration_handler.check_registration_ratelimit(content["address"])
+
         await self.registration_handler.register_with_store(
             user_id=user_id,
             password_hash=content["password_hash"],
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 9e45429d49..8512923eae 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -88,8 +88,7 @@ TagAccountDataStreamRow = namedtuple(
     "TagAccountDataStreamRow", ("user_id", "room_id", "data")  # str  # str  # dict
 )
 AccountDataStreamRow = namedtuple(
-    "AccountDataStream",
-    ("user_id", "room_id", "data_type", "data"),  # str  # str  # str  # dict
+    "AccountDataStream", ("user_id", "room_id", "data_type")  # str  # str  # str
 )
 GroupsStreamRow = namedtuple(
     "GroupsStreamRow",
@@ -421,8 +420,8 @@ class AccountDataStream(Stream):
 
         results = list(room_results)
         results.extend(
-            (stream_id, user_id, None, account_data_type, content)
-            for stream_id, user_id, account_data_type, content in global_results
+            (stream_id, user_id, None, account_data_type)
+            for stream_id, user_id, account_data_type in global_results
         )
 
         return results
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 5c2a2eb593..68a59a3424 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -14,62 +14,39 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import hashlib
-import hmac
 import logging
 import platform
 import re
 
-from six import text_type
-from six.moves import http_client
-
 import synapse
-from synapse.api.constants import Membership, UserTypes
 from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.server import JsonResource
-from synapse.http.servlet import (
-    RestServlet,
-    assert_params_in_dict,
-    parse_integer,
-    parse_json_object_from_request,
-    parse_string,
-)
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.rest.admin._base import (
     assert_requester_is_admin,
-    assert_user_is_admin,
     historical_admin_path_patterns,
 )
+from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
 from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
 from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
+from synapse.rest.admin.rooms import ShutdownRoomRestServlet
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
-from synapse.rest.admin.users import UserAdminServlet
-from synapse.types import UserID, create_requester
-from synapse.util.async_helpers import maybe_awaitable
+from synapse.rest.admin.users import (
+    AccountValidityRenewServlet,
+    DeactivateAccountRestServlet,
+    GetUsersPaginatedRestServlet,
+    ResetPasswordRestServlet,
+    SearchUsersRestServlet,
+    UserAdminServlet,
+    UserRegisterServlet,
+    UsersRestServlet,
+    WhoisRestServlet,
+)
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
 
 
-class UsersRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
-
-    def __init__(self, hs):
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
-
-    async def on_GET(self, request, user_id):
-        target_user = UserID.from_string(user_id)
-        await assert_requester_is_admin(self.auth, request)
-
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "Can only users a local user")
-
-        ret = await self.handlers.admin_handler.get_users()
-
-        return 200, ret
-
-
 class VersionServlet(RestServlet):
     PATTERNS = (re.compile("^/_synapse/admin/v1/server_version$"),)
 
@@ -83,159 +60,6 @@ class VersionServlet(RestServlet):
         return 200, self.res
 
 
-class UserRegisterServlet(RestServlet):
-    """
-    Attributes:
-         NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
-         nonces (dict[str, int]): The nonces that we will accept. A dict of
-             nonce to the time it was generated, in int seconds.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/register")
-    NONCE_TIMEOUT = 60
-
-    def __init__(self, hs):
-        self.handlers = hs.get_handlers()
-        self.reactor = hs.get_reactor()
-        self.nonces = {}
-        self.hs = hs
-
-    def _clear_old_nonces(self):
-        """
-        Clear out old nonces that are older than NONCE_TIMEOUT.
-        """
-        now = int(self.reactor.seconds())
-
-        for k, v in list(self.nonces.items()):
-            if now - v > self.NONCE_TIMEOUT:
-                del self.nonces[k]
-
-    def on_GET(self, request):
-        """
-        Generate a new nonce.
-        """
-        self._clear_old_nonces()
-
-        nonce = self.hs.get_secrets().token_hex(64)
-        self.nonces[nonce] = int(self.reactor.seconds())
-        return 200, {"nonce": nonce}
-
-    async def on_POST(self, request):
-        self._clear_old_nonces()
-
-        if not self.hs.config.registration_shared_secret:
-            raise SynapseError(400, "Shared secret registration is not enabled")
-
-        body = parse_json_object_from_request(request)
-
-        if "nonce" not in body:
-            raise SynapseError(400, "nonce must be specified", errcode=Codes.BAD_JSON)
-
-        nonce = body["nonce"]
-
-        if nonce not in self.nonces:
-            raise SynapseError(400, "unrecognised nonce")
-
-        # Delete the nonce, so it can't be reused, even if it's invalid
-        del self.nonces[nonce]
-
-        if "username" not in body:
-            raise SynapseError(
-                400, "username must be specified", errcode=Codes.BAD_JSON
-            )
-        else:
-            if (
-                not isinstance(body["username"], text_type)
-                or len(body["username"]) > 512
-            ):
-                raise SynapseError(400, "Invalid username")
-
-            username = body["username"].encode("utf-8")
-            if b"\x00" in username:
-                raise SynapseError(400, "Invalid username")
-
-        if "password" not in body:
-            raise SynapseError(
-                400, "password must be specified", errcode=Codes.BAD_JSON
-            )
-        else:
-            if (
-                not isinstance(body["password"], text_type)
-                or len(body["password"]) > 512
-            ):
-                raise SynapseError(400, "Invalid password")
-
-            password = body["password"].encode("utf-8")
-            if b"\x00" in password:
-                raise SynapseError(400, "Invalid password")
-
-        admin = body.get("admin", None)
-        user_type = body.get("user_type", None)
-
-        if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
-            raise SynapseError(400, "Invalid user type")
-
-        got_mac = body["mac"]
-
-        want_mac = hmac.new(
-            key=self.hs.config.registration_shared_secret.encode(),
-            digestmod=hashlib.sha1,
-        )
-        want_mac.update(nonce.encode("utf8"))
-        want_mac.update(b"\x00")
-        want_mac.update(username)
-        want_mac.update(b"\x00")
-        want_mac.update(password)
-        want_mac.update(b"\x00")
-        want_mac.update(b"admin" if admin else b"notadmin")
-        if user_type:
-            want_mac.update(b"\x00")
-            want_mac.update(user_type.encode("utf8"))
-        want_mac = want_mac.hexdigest()
-
-        if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
-            raise SynapseError(403, "HMAC incorrect")
-
-        # Reuse the parts of RegisterRestServlet to reduce code duplication
-        from synapse.rest.client.v2_alpha.register import RegisterRestServlet
-
-        register = RegisterRestServlet(self.hs)
-
-        user_id = await register.registration_handler.register_user(
-            localpart=body["username"].lower(),
-            password=body["password"],
-            admin=bool(admin),
-            user_type=user_type,
-        )
-
-        result = await register._create_registration_details(user_id, body)
-        return 200, result
-
-
-class WhoisRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/whois/(?P<user_id>[^/]*)")
-
-    def __init__(self, hs):
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
-
-    async def on_GET(self, request, user_id):
-        target_user = UserID.from_string(user_id)
-        requester = await self.auth.get_user_by_req(request)
-        auth_user = requester.user
-
-        if target_user != auth_user:
-            await assert_user_is_admin(self.auth, auth_user)
-
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "Can only whois a local user")
-
-        ret = await self.handlers.admin_handler.get_whois(target_user)
-
-        return 200, ret
-
-
 class PurgeHistoryRestServlet(RestServlet):
     PATTERNS = historical_admin_path_patterns(
         "/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
@@ -342,369 +166,6 @@ class PurgeHistoryStatusRestServlet(RestServlet):
         return 200, purge_status.asdict()
 
 
-class DeactivateAccountRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/deactivate/(?P<target_user_id>[^/]*)")
-
-    def __init__(self, hs):
-        self._deactivate_account_handler = hs.get_deactivate_account_handler()
-        self.auth = hs.get_auth()
-
-    async def on_POST(self, request, target_user_id):
-        await assert_requester_is_admin(self.auth, request)
-        body = parse_json_object_from_request(request, allow_empty_body=True)
-        erase = body.get("erase", False)
-        if not isinstance(erase, bool):
-            raise SynapseError(
-                http_client.BAD_REQUEST,
-                "Param 'erase' must be a boolean, if given",
-                Codes.BAD_JSON,
-            )
-
-        UserID.from_string(target_user_id)
-
-        result = await self._deactivate_account_handler.deactivate_account(
-            target_user_id, erase
-        )
-        if result:
-            id_server_unbind_result = "success"
-        else:
-            id_server_unbind_result = "no-support"
-
-        return 200, {"id_server_unbind_result": id_server_unbind_result}
-
-
-class ShutdownRoomRestServlet(RestServlet):
-    """Shuts down a room by removing all local users from the room and blocking
-    all future invites and joins to the room. Any local aliases will be repointed
-    to a new room created by `new_room_user_id` and kicked users will be auto
-    joined to the new room.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/shutdown_room/(?P<room_id>[^/]+)")
-
-    DEFAULT_MESSAGE = (
-        "Sharing illegal content on this server is not permitted and rooms in"
-        " violation will be blocked."
-    )
-
-    def __init__(self, hs):
-        self.hs = hs
-        self.store = hs.get_datastore()
-        self.state = hs.get_state_handler()
-        self._room_creation_handler = hs.get_room_creation_handler()
-        self.event_creation_handler = hs.get_event_creation_handler()
-        self.room_member_handler = hs.get_room_member_handler()
-        self.auth = hs.get_auth()
-
-    async def on_POST(self, request, room_id):
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
-
-        content = parse_json_object_from_request(request)
-        assert_params_in_dict(content, ["new_room_user_id"])
-        new_room_user_id = content["new_room_user_id"]
-
-        room_creator_requester = create_requester(new_room_user_id)
-
-        message = content.get("message", self.DEFAULT_MESSAGE)
-        room_name = content.get("room_name", "Content Violation Notification")
-
-        info = await self._room_creation_handler.create_room(
-            room_creator_requester,
-            config={
-                "preset": "public_chat",
-                "name": room_name,
-                "power_level_content_override": {"users_default": -10},
-            },
-            ratelimit=False,
-        )
-        new_room_id = info["room_id"]
-
-        requester_user_id = requester.user.to_string()
-
-        logger.info(
-            "Shutting down room %r, joining to new room: %r", room_id, new_room_id
-        )
-
-        # This will work even if the room is already blocked, but that is
-        # desirable in case the first attempt at blocking the room failed below.
-        await self.store.block_room(room_id, requester_user_id)
-
-        users = await self.state.get_current_users_in_room(room_id)
-        kicked_users = []
-        failed_to_kick_users = []
-        for user_id in users:
-            if not self.hs.is_mine_id(user_id):
-                continue
-
-            logger.info("Kicking %r from %r...", user_id, room_id)
-
-            try:
-                target_requester = create_requester(user_id)
-                await self.room_member_handler.update_membership(
-                    requester=target_requester,
-                    target=target_requester.user,
-                    room_id=room_id,
-                    action=Membership.LEAVE,
-                    content={},
-                    ratelimit=False,
-                    require_consent=False,
-                )
-
-                await self.room_member_handler.forget(target_requester.user, room_id)
-
-                await self.room_member_handler.update_membership(
-                    requester=target_requester,
-                    target=target_requester.user,
-                    room_id=new_room_id,
-                    action=Membership.JOIN,
-                    content={},
-                    ratelimit=False,
-                    require_consent=False,
-                )
-
-                kicked_users.append(user_id)
-            except Exception:
-                logger.exception(
-                    "Failed to leave old room and join new room for %r", user_id
-                )
-                failed_to_kick_users.append(user_id)
-
-        await self.event_creation_handler.create_and_send_nonmember_event(
-            room_creator_requester,
-            {
-                "type": "m.room.message",
-                "content": {"body": message, "msgtype": "m.text"},
-                "room_id": new_room_id,
-                "sender": new_room_user_id,
-            },
-            ratelimit=False,
-        )
-
-        aliases_for_room = await maybe_awaitable(
-            self.store.get_aliases_for_room(room_id)
-        )
-
-        await self.store.update_aliases_for_room(
-            room_id, new_room_id, requester_user_id
-        )
-
-        return (
-            200,
-            {
-                "kicked_users": kicked_users,
-                "failed_to_kick_users": failed_to_kick_users,
-                "local_aliases": aliases_for_room,
-                "new_room_id": new_room_id,
-            },
-        )
-
-
-class ResetPasswordRestServlet(RestServlet):
-    """Post request to allow an administrator reset password for a user.
-    This needs user to have administrator access in Synapse.
-        Example:
-            http://localhost:8008/_synapse/admin/v1/reset_password/
-            @user:to_reset_password?access_token=admin_access_token
-        JsonBodyToSend:
-            {
-                "new_password": "secret"
-            }
-        Returns:
-            200 OK with empty object if success otherwise an error.
-        """
-
-    PATTERNS = historical_admin_path_patterns(
-        "/reset_password/(?P<target_user_id>[^/]*)"
-    )
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self._set_password_handler = hs.get_set_password_handler()
-
-    async def on_POST(self, request, target_user_id):
-        """Post request to allow an administrator reset password for a user.
-        This needs user to have administrator access in Synapse.
-        """
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
-
-        UserID.from_string(target_user_id)
-
-        params = parse_json_object_from_request(request)
-        assert_params_in_dict(params, ["new_password"])
-        new_password = params["new_password"]
-
-        await self._set_password_handler.set_password(
-            target_user_id, new_password, requester
-        )
-        return 200, {}
-
-
-class GetUsersPaginatedRestServlet(RestServlet):
-    """Get request to get specific number of users from Synapse.
-    This needs user to have administrator access in Synapse.
-        Example:
-            http://localhost:8008/_synapse/admin/v1/users_paginate/
-            @admin:user?access_token=admin_access_token&start=0&limit=10
-        Returns:
-            200 OK with json object {list[dict[str, Any]], count} or empty object.
-        """
-
-    PATTERNS = historical_admin_path_patterns(
-        "/users_paginate/(?P<target_user_id>[^/]*)"
-    )
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
-
-    async def on_GET(self, request, target_user_id):
-        """Get request to get specific number of users from Synapse.
-        This needs user to have administrator access in Synapse.
-        """
-        await assert_requester_is_admin(self.auth, request)
-
-        target_user = UserID.from_string(target_user_id)
-
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "Can only users a local user")
-
-        order = "name"  # order by name in user table
-        start = parse_integer(request, "start", required=True)
-        limit = parse_integer(request, "limit", required=True)
-
-        logger.info("limit: %s, start: %s", limit, start)
-
-        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
-        return 200, ret
-
-    async def on_POST(self, request, target_user_id):
-        """Post request to get specific number of users from Synapse..
-        This needs user to have administrator access in Synapse.
-        Example:
-            http://localhost:8008/_synapse/admin/v1/users_paginate/
-            @admin:user?access_token=admin_access_token
-        JsonBodyToSend:
-            {
-                "start": "0",
-                "limit": "10
-            }
-        Returns:
-            200 OK with json object {list[dict[str, Any]], count} or empty object.
-        """
-        await assert_requester_is_admin(self.auth, request)
-        UserID.from_string(target_user_id)
-
-        order = "name"  # order by name in user table
-        params = parse_json_object_from_request(request)
-        assert_params_in_dict(params, ["limit", "start"])
-        limit = params["limit"]
-        start = params["start"]
-        logger.info("limit: %s, start: %s", limit, start)
-
-        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
-        return 200, ret
-
-
-class SearchUsersRestServlet(RestServlet):
-    """Get request to search user table for specific users according to
-    search term.
-    This needs user to have administrator access in Synapse.
-        Example:
-            http://localhost:8008/_synapse/admin/v1/search_users/
-            @admin:user?access_token=admin_access_token&term=alice
-        Returns:
-            200 OK with json object {list[dict[str, Any]], count} or empty object.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/search_users/(?P<target_user_id>[^/]*)")
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
-
-    async def on_GET(self, request, target_user_id):
-        """Get request to search user table for specific users according to
-        search term.
-        This needs user to have a administrator access in Synapse.
-        """
-        await assert_requester_is_admin(self.auth, request)
-
-        target_user = UserID.from_string(target_user_id)
-
-        # To allow all users to get the users list
-        # if not is_admin and target_user != auth_user:
-        #     raise AuthError(403, "You are not a server admin")
-
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "Can only users a local user")
-
-        term = parse_string(request, "term", required=True)
-        logger.info("term: %s ", term)
-
-        ret = await self.handlers.admin_handler.search_users(term)
-        return 200, ret
-
-
-class DeleteGroupAdminRestServlet(RestServlet):
-    """Allows deleting of local groups
-    """
-
-    PATTERNS = historical_admin_path_patterns("/delete_group/(?P<group_id>[^/]*)")
-
-    def __init__(self, hs):
-        self.group_server = hs.get_groups_server_handler()
-        self.is_mine_id = hs.is_mine_id
-        self.auth = hs.get_auth()
-
-    async def on_POST(self, request, group_id):
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
-
-        if not self.is_mine_id(group_id):
-            raise SynapseError(400, "Can only delete local groups")
-
-        await self.group_server.delete_group(group_id, requester.user.to_string())
-        return 200, {}
-
-
-class AccountValidityRenewServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/account_validity/validity$")
-
-    def __init__(self, hs):
-        """
-        Args:
-            hs (synapse.server.HomeServer): server
-        """
-        self.hs = hs
-        self.account_activity_handler = hs.get_account_validity_handler()
-        self.auth = hs.get_auth()
-
-    async def on_POST(self, request):
-        await assert_requester_is_admin(self.auth, request)
-
-        body = parse_json_object_from_request(request)
-
-        if "user_id" not in body:
-            raise SynapseError(400, "Missing property 'user_id' in the request body")
-
-        expiration_ts = await self.account_activity_handler.renew_account_for_user(
-            body["user_id"],
-            body.get("expiration_ts"),
-            not body.get("enable_renewal_emails", True),
-        )
-
-        res = {"expiration_ts": expiration_ts}
-        return 200, res
-
-
 ########################################################################################
 #
 # please don't add more servlets here: this file is already long and unwieldy. Put
diff --git a/synapse/rest/admin/groups.py b/synapse/rest/admin/groups.py
new file mode 100644
index 0000000000..0b54ca09f4
--- /dev/null
+++ b/synapse/rest/admin/groups.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import RestServlet
+from synapse.rest.admin._base import (
+    assert_user_is_admin,
+    historical_admin_path_patterns,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class DeleteGroupAdminRestServlet(RestServlet):
+    """Allows deleting of local groups
+    """
+
+    PATTERNS = historical_admin_path_patterns("/delete_group/(?P<group_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.group_server = hs.get_groups_server_handler()
+        self.is_mine_id = hs.is_mine_id
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request, group_id):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        if not self.is_mine_id(group_id):
+            raise SynapseError(400, "Can only delete local groups")
+
+        await self.group_server.delete_group(group_id, requester.user.to_string())
+        return 200, {}
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
new file mode 100644
index 0000000000..f7cc5e9be9
--- /dev/null
+++ b/synapse/rest/admin/rooms.py
@@ -0,0 +1,157 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from synapse.api.constants import Membership
+from synapse.http.servlet import (
+    RestServlet,
+    assert_params_in_dict,
+    parse_json_object_from_request,
+)
+from synapse.rest.admin._base import (
+    assert_user_is_admin,
+    historical_admin_path_patterns,
+)
+from synapse.types import create_requester
+from synapse.util.async_helpers import maybe_awaitable
+
+logger = logging.getLogger(__name__)
+
+
+class ShutdownRoomRestServlet(RestServlet):
+    """Shuts down a room by removing all local users from the room and blocking
+    all future invites and joins to the room. Any local aliases will be repointed
+    to a new room created by `new_room_user_id` and kicked users will be auto
+    joined to the new room.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/shutdown_room/(?P<room_id>[^/]+)")
+
+    DEFAULT_MESSAGE = (
+        "Sharing illegal content on this server is not permitted and rooms in"
+        " violation will be blocked."
+    )
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self._room_creation_handler = hs.get_room_creation_handler()
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request, room_id):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        content = parse_json_object_from_request(request)
+        assert_params_in_dict(content, ["new_room_user_id"])
+        new_room_user_id = content["new_room_user_id"]
+
+        room_creator_requester = create_requester(new_room_user_id)
+
+        message = content.get("message", self.DEFAULT_MESSAGE)
+        room_name = content.get("room_name", "Content Violation Notification")
+
+        info = await self._room_creation_handler.create_room(
+            room_creator_requester,
+            config={
+                "preset": "public_chat",
+                "name": room_name,
+                "power_level_content_override": {"users_default": -10},
+            },
+            ratelimit=False,
+        )
+        new_room_id = info["room_id"]
+
+        requester_user_id = requester.user.to_string()
+
+        logger.info(
+            "Shutting down room %r, joining to new room: %r", room_id, new_room_id
+        )
+
+        # This will work even if the room is already blocked, but that is
+        # desirable in case the first attempt at blocking the room failed below.
+        await self.store.block_room(room_id, requester_user_id)
+
+        users = await self.state.get_current_users_in_room(room_id)
+        kicked_users = []
+        failed_to_kick_users = []
+        for user_id in users:
+            if not self.hs.is_mine_id(user_id):
+                continue
+
+            logger.info("Kicking %r from %r...", user_id, room_id)
+
+            try:
+                target_requester = create_requester(user_id)
+                await self.room_member_handler.update_membership(
+                    requester=target_requester,
+                    target=target_requester.user,
+                    room_id=room_id,
+                    action=Membership.LEAVE,
+                    content={},
+                    ratelimit=False,
+                    require_consent=False,
+                )
+
+                await self.room_member_handler.forget(target_requester.user, room_id)
+
+                await self.room_member_handler.update_membership(
+                    requester=target_requester,
+                    target=target_requester.user,
+                    room_id=new_room_id,
+                    action=Membership.JOIN,
+                    content={},
+                    ratelimit=False,
+                    require_consent=False,
+                )
+
+                kicked_users.append(user_id)
+            except Exception:
+                logger.exception(
+                    "Failed to leave old room and join new room for %r", user_id
+                )
+                failed_to_kick_users.append(user_id)
+
+        await self.event_creation_handler.create_and_send_nonmember_event(
+            room_creator_requester,
+            {
+                "type": "m.room.message",
+                "content": {"body": message, "msgtype": "m.text"},
+                "room_id": new_room_id,
+                "sender": new_room_user_id,
+            },
+            ratelimit=False,
+        )
+
+        aliases_for_room = await maybe_awaitable(
+            self.store.get_aliases_for_room(room_id)
+        )
+
+        await self.store.update_aliases_for_room(
+            room_id, new_room_id, requester_user_id
+        )
+
+        return (
+            200,
+            {
+                "kicked_users": kicked_users,
+                "failed_to_kick_users": failed_to_kick_users,
+                "local_aliases": aliases_for_room,
+                "new_room_id": new_room_id,
+            },
+        )
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index d5d124a0dc..58a83f93af 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -12,17 +12,419 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import hashlib
+import hmac
+import logging
 import re
 
-from synapse.api.errors import SynapseError
+from six import text_type
+from six.moves import http_client
+
+from synapse.api.constants import UserTypes
+from synapse.api.errors import Codes, SynapseError
 from synapse.http.servlet import (
     RestServlet,
     assert_params_in_dict,
+    parse_integer,
     parse_json_object_from_request,
+    parse_string,
+)
+from synapse.rest.admin._base import (
+    assert_requester_is_admin,
+    assert_user_is_admin,
+    historical_admin_path_patterns,
 )
-from synapse.rest.admin import assert_requester_is_admin, assert_user_is_admin
 from synapse.types import UserID
 
+logger = logging.getLogger(__name__)
+
+
+class UsersRestServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.admin_handler = hs.get_handlers().admin_handler
+
+    async def on_GET(self, request, user_id):
+        target_user = UserID.from_string(user_id)
+        await assert_requester_is_admin(self.auth, request)
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        ret = await self.admin_handler.get_users()
+
+        return 200, ret
+
+
+class GetUsersPaginatedRestServlet(RestServlet):
+    """Get request to get specific number of users from Synapse.
+    This needs user to have administrator access in Synapse.
+        Example:
+            http://localhost:8008/_synapse/admin/v1/users_paginate/
+            @admin:user?access_token=admin_access_token&start=0&limit=10
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+        """
+
+    PATTERNS = historical_admin_path_patterns(
+        "/users_paginate/(?P<target_user_id>[^/]*)"
+    )
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    async def on_GET(self, request, target_user_id):
+        """Get request to get specific number of users from Synapse.
+        This needs user to have administrator access in Synapse.
+        """
+        await assert_requester_is_admin(self.auth, request)
+
+        target_user = UserID.from_string(target_user_id)
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        order = "name"  # order by name in user table
+        start = parse_integer(request, "start", required=True)
+        limit = parse_integer(request, "limit", required=True)
+
+        logger.info("limit: %s, start: %s", limit, start)
+
+        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
+        return 200, ret
+
+    async def on_POST(self, request, target_user_id):
+        """Post request to get specific number of users from Synapse..
+        This needs user to have administrator access in Synapse.
+        Example:
+            http://localhost:8008/_synapse/admin/v1/users_paginate/
+            @admin:user?access_token=admin_access_token
+        JsonBodyToSend:
+            {
+                "start": "0",
+                "limit": "10
+            }
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+        """
+        await assert_requester_is_admin(self.auth, request)
+        UserID.from_string(target_user_id)
+
+        order = "name"  # order by name in user table
+        params = parse_json_object_from_request(request)
+        assert_params_in_dict(params, ["limit", "start"])
+        limit = params["limit"]
+        start = params["start"]
+        logger.info("limit: %s, start: %s", limit, start)
+
+        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
+        return 200, ret
+
+
+class UserRegisterServlet(RestServlet):
+    """
+    Attributes:
+         NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
+         nonces (dict[str, int]): The nonces that we will accept. A dict of
+             nonce to the time it was generated, in int seconds.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/register")
+    NONCE_TIMEOUT = 60
+
+    def __init__(self, hs):
+        self.handlers = hs.get_handlers()
+        self.reactor = hs.get_reactor()
+        self.nonces = {}
+        self.hs = hs
+
+    def _clear_old_nonces(self):
+        """
+        Clear out old nonces that are older than NONCE_TIMEOUT.
+        """
+        now = int(self.reactor.seconds())
+
+        for k, v in list(self.nonces.items()):
+            if now - v > self.NONCE_TIMEOUT:
+                del self.nonces[k]
+
+    def on_GET(self, request):
+        """
+        Generate a new nonce.
+        """
+        self._clear_old_nonces()
+
+        nonce = self.hs.get_secrets().token_hex(64)
+        self.nonces[nonce] = int(self.reactor.seconds())
+        return 200, {"nonce": nonce}
+
+    async def on_POST(self, request):
+        self._clear_old_nonces()
+
+        if not self.hs.config.registration_shared_secret:
+            raise SynapseError(400, "Shared secret registration is not enabled")
+
+        body = parse_json_object_from_request(request)
+
+        if "nonce" not in body:
+            raise SynapseError(400, "nonce must be specified", errcode=Codes.BAD_JSON)
+
+        nonce = body["nonce"]
+
+        if nonce not in self.nonces:
+            raise SynapseError(400, "unrecognised nonce")
+
+        # Delete the nonce, so it can't be reused, even if it's invalid
+        del self.nonces[nonce]
+
+        if "username" not in body:
+            raise SynapseError(
+                400, "username must be specified", errcode=Codes.BAD_JSON
+            )
+        else:
+            if (
+                not isinstance(body["username"], text_type)
+                or len(body["username"]) > 512
+            ):
+                raise SynapseError(400, "Invalid username")
+
+            username = body["username"].encode("utf-8")
+            if b"\x00" in username:
+                raise SynapseError(400, "Invalid username")
+
+        if "password" not in body:
+            raise SynapseError(
+                400, "password must be specified", errcode=Codes.BAD_JSON
+            )
+        else:
+            if (
+                not isinstance(body["password"], text_type)
+                or len(body["password"]) > 512
+            ):
+                raise SynapseError(400, "Invalid password")
+
+            password = body["password"].encode("utf-8")
+            if b"\x00" in password:
+                raise SynapseError(400, "Invalid password")
+
+        admin = body.get("admin", None)
+        user_type = body.get("user_type", None)
+
+        if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
+            raise SynapseError(400, "Invalid user type")
+
+        got_mac = body["mac"]
+
+        want_mac = hmac.new(
+            key=self.hs.config.registration_shared_secret.encode(),
+            digestmod=hashlib.sha1,
+        )
+        want_mac.update(nonce.encode("utf8"))
+        want_mac.update(b"\x00")
+        want_mac.update(username)
+        want_mac.update(b"\x00")
+        want_mac.update(password)
+        want_mac.update(b"\x00")
+        want_mac.update(b"admin" if admin else b"notadmin")
+        if user_type:
+            want_mac.update(b"\x00")
+            want_mac.update(user_type.encode("utf8"))
+        want_mac = want_mac.hexdigest()
+
+        if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
+            raise SynapseError(403, "HMAC incorrect")
+
+        # Reuse the parts of RegisterRestServlet to reduce code duplication
+        from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+
+        register = RegisterRestServlet(self.hs)
+
+        user_id = await register.registration_handler.register_user(
+            localpart=body["username"].lower(),
+            password=body["password"],
+            admin=bool(admin),
+            user_type=user_type,
+        )
+
+        result = await register._create_registration_details(user_id, body)
+        return 200, result
+
+
+class WhoisRestServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/whois/(?P<user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    async def on_GET(self, request, user_id):
+        target_user = UserID.from_string(user_id)
+        requester = await self.auth.get_user_by_req(request)
+        auth_user = requester.user
+
+        if target_user != auth_user:
+            await assert_user_is_admin(self.auth, auth_user)
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only whois a local user")
+
+        ret = await self.handlers.admin_handler.get_whois(target_user)
+
+        return 200, ret
+
+
+class DeactivateAccountRestServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/deactivate/(?P<target_user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self._deactivate_account_handler = hs.get_deactivate_account_handler()
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request, target_user_id):
+        await assert_requester_is_admin(self.auth, request)
+        body = parse_json_object_from_request(request, allow_empty_body=True)
+        erase = body.get("erase", False)
+        if not isinstance(erase, bool):
+            raise SynapseError(
+                http_client.BAD_REQUEST,
+                "Param 'erase' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
+        UserID.from_string(target_user_id)
+
+        result = await self._deactivate_account_handler.deactivate_account(
+            target_user_id, erase
+        )
+        if result:
+            id_server_unbind_result = "success"
+        else:
+            id_server_unbind_result = "no-support"
+
+        return 200, {"id_server_unbind_result": id_server_unbind_result}
+
+
+class AccountValidityRenewServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/account_validity/validity$")
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        self.hs = hs
+        self.account_activity_handler = hs.get_account_validity_handler()
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request):
+        await assert_requester_is_admin(self.auth, request)
+
+        body = parse_json_object_from_request(request)
+
+        if "user_id" not in body:
+            raise SynapseError(400, "Missing property 'user_id' in the request body")
+
+        expiration_ts = await self.account_activity_handler.renew_account_for_user(
+            body["user_id"],
+            body.get("expiration_ts"),
+            not body.get("enable_renewal_emails", True),
+        )
+
+        res = {"expiration_ts": expiration_ts}
+        return 200, res
+
+
+class ResetPasswordRestServlet(RestServlet):
+    """Post request to allow an administrator reset password for a user.
+    This needs user to have administrator access in Synapse.
+        Example:
+            http://localhost:8008/_synapse/admin/v1/reset_password/
+            @user:to_reset_password?access_token=admin_access_token
+        JsonBodyToSend:
+            {
+                "new_password": "secret"
+            }
+        Returns:
+            200 OK with empty object if success otherwise an error.
+        """
+
+    PATTERNS = historical_admin_path_patterns(
+        "/reset_password/(?P<target_user_id>[^/]*)"
+    )
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self._set_password_handler = hs.get_set_password_handler()
+
+    async def on_POST(self, request, target_user_id):
+        """Post request to allow an administrator reset password for a user.
+        This needs user to have administrator access in Synapse.
+        """
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        UserID.from_string(target_user_id)
+
+        params = parse_json_object_from_request(request)
+        assert_params_in_dict(params, ["new_password"])
+        new_password = params["new_password"]
+
+        await self._set_password_handler.set_password(
+            target_user_id, new_password, requester
+        )
+        return 200, {}
+
+
+class SearchUsersRestServlet(RestServlet):
+    """Get request to search user table for specific users according to
+    search term.
+    This needs user to have administrator access in Synapse.
+        Example:
+            http://localhost:8008/_synapse/admin/v1/search_users/
+            @admin:user?access_token=admin_access_token&term=alice
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/search_users/(?P<target_user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    async def on_GET(self, request, target_user_id):
+        """Get request to search user table for specific users according to
+        search term.
+        This needs user to have a administrator access in Synapse.
+        """
+        await assert_requester_is_admin(self.auth, request)
+
+        target_user = UserID.from_string(target_user_id)
+
+        # To allow all users to get the users list
+        # if not is_admin and target_user != auth_user:
+        #     raise AuthError(403, "You are not a server admin")
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        term = parse_string(request, "term", required=True)
+        logger.info("term: %s ", term)
+
+        ret = await self.handlers.admin_handler.search_users(term)
+        return 200, ret
+
 
 class UserAdminServlet(RestServlet):
     """
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 24a0ce74f2..19eb15003d 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -92,8 +92,11 @@ class LoginRestServlet(RestServlet):
         self.auth_handler = self.hs.get_auth_handler()
         self.registration_handler = hs.get_registration_handler()
         self.handlers = hs.get_handlers()
+        self._clock = hs.get_clock()
         self._well_known_builder = WellKnownBuilder(hs)
         self._address_ratelimiter = Ratelimiter()
+        self._account_ratelimiter = Ratelimiter()
+        self._failed_attempts_ratelimiter = Ratelimiter()
 
     def on_GET(self, request):
         flows = []
@@ -202,6 +205,16 @@ class LoginRestServlet(RestServlet):
                 # (See add_threepid in synapse/handlers/auth.py)
                 address = address.lower()
 
+            # We also apply account rate limiting using the 3PID as a key, as
+            # otherwise using 3PID bypasses the ratelimiting based on user ID.
+            self._failed_attempts_ratelimiter.ratelimit(
+                (medium, address),
+                time_now_s=self._clock.time(),
+                rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                update=False,
+            )
+
             # Check for login providers that support 3pid login types
             (
                 canonical_user_id,
@@ -211,7 +224,8 @@ class LoginRestServlet(RestServlet):
             )
             if canonical_user_id:
                 # Authentication through password provider and 3pid succeeded
-                result = yield self._register_device_with_callback(
+
+                result = yield self._complete_login(
                     canonical_user_id, login_submission, callback_3pid
                 )
                 return result
@@ -225,6 +239,21 @@ class LoginRestServlet(RestServlet):
                 logger.warning(
                     "unknown 3pid identifier medium %s, address %r", medium, address
                 )
+                # We mark that we've failed to log in here, as
+                # `check_password_provider_3pid` might have returned `None` due
+                # to an incorrect password, rather than the account not
+                # existing.
+                #
+                # If it returned None but the 3PID was bound then we won't hit
+                # this code path, which is fine as then the per-user ratelimit
+                # will kick in below.
+                self._failed_attempts_ratelimiter.can_do_action(
+                    (medium, address),
+                    time_now_s=self._clock.time(),
+                    rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                    burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                    update=True,
+                )
                 raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
             identifier = {"type": "m.id.user", "user": user_id}
@@ -236,29 +265,84 @@ class LoginRestServlet(RestServlet):
         if "user" not in identifier:
             raise SynapseError(400, "User identifier is missing 'user' key")
 
-        canonical_user_id, callback = yield self.auth_handler.validate_login(
-            identifier["user"], login_submission
+        if identifier["user"].startswith("@"):
+            qualified_user_id = identifier["user"]
+        else:
+            qualified_user_id = UserID(identifier["user"], self.hs.hostname).to_string()
+
+        # Check if we've hit the failed ratelimit (but don't update it)
+        self._failed_attempts_ratelimiter.ratelimit(
+            qualified_user_id.lower(),
+            time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=False,
         )
 
-        result = yield self._register_device_with_callback(
+        try:
+            canonical_user_id, callback = yield self.auth_handler.validate_login(
+                identifier["user"], login_submission
+            )
+        except LoginError:
+            # The user has failed to log in, so we need to update the rate
+            # limiter. Using `can_do_action` avoids us raising a ratelimit
+            # exception and masking the LoginError. The actual ratelimiting
+            # should have happened above.
+            self._failed_attempts_ratelimiter.can_do_action(
+                qualified_user_id.lower(),
+                time_now_s=self._clock.time(),
+                rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                update=True,
+            )
+            raise
+
+        result = yield self._complete_login(
             canonical_user_id, login_submission, callback
         )
         return result
 
     @defer.inlineCallbacks
-    def _register_device_with_callback(self, user_id, login_submission, callback=None):
-        """ Registers a device with a given user_id. Optionally run a callback
-        function after registration has completed.
+    def _complete_login(
+        self, user_id, login_submission, callback=None, create_non_existant_users=False
+    ):
+        """Called when we've successfully authed the user and now need to
+        actually login them in (e.g. create devices). This gets called on
+        all succesful logins.
+
+        Applies the ratelimiting for succesful login attempts against an
+        account.
 
         Args:
             user_id (str): ID of the user to register.
             login_submission (dict): Dictionary of login information.
             callback (func|None): Callback function to run after registration.
+            create_non_existant_users (bool): Whether to create the user if
+                they don't exist. Defaults to False.
 
         Returns:
             result (Dict[str,str]): Dictionary of account information after
                 successful registration.
         """
+
+        # Before we actually log them in we check if they've already logged in
+        # too often. This happens here rather than before as we don't
+        # necessarily know the user before now.
+        self._account_ratelimiter.ratelimit(
+            user_id.lower(),
+            time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_account.per_second,
+            burst_count=self.hs.config.rc_login_account.burst_count,
+            update=True,
+        )
+
+        if create_non_existant_users:
+            user_id = yield self.auth_handler.check_user_exists(user_id)
+            if not user_id:
+                user_id = yield self.registration_handler.register_user(
+                    localpart=UserID.from_string(user_id).localpart
+                )
+
         device_id = login_submission.get("device_id")
         initial_display_name = login_submission.get("initial_device_display_name")
         device_id, access_token = yield self.registration_handler.register_device(
@@ -285,7 +369,7 @@ class LoginRestServlet(RestServlet):
             token
         )
 
-        result = yield self._register_device_with_callback(user_id, login_submission)
+        result = yield self._complete_login(user_id, login_submission)
         return result
 
     @defer.inlineCallbacks
@@ -313,15 +397,8 @@ class LoginRestServlet(RestServlet):
             raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
 
         user_id = UserID(user, self.hs.hostname).to_string()
-
-        registered_user_id = yield self.auth_handler.check_user_exists(user_id)
-        if not registered_user_id:
-            registered_user_id = yield self.registration_handler.register_user(
-                localpart=user
-            )
-
-        result = yield self._register_device_with_callback(
-            registered_user_id, login_submission
+        result = yield self._complete_login(
+            user_id, login_submission, create_non_existant_users=True
         )
         return result
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 86bbcc0eea..711d4ad304 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -714,7 +714,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
             target = UserID.from_string(content["user_id"])
 
         event_content = None
-        if "reason" in content and membership_action in ["kick", "ban"]:
+        if "reason" in content:
             event_content = {"reason": content["reason"]}
 
         await self.room_member_handler.update_membership(
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index f26eae794c..ad674239ab 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -642,6 +642,7 @@ class ThreepidAddRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
 
+    @interactive_auth_handler
     @defer.inlineCallbacks
     def on_POST(self, request):
         requester = yield self.auth.get_user_by_req(request)
@@ -652,6 +653,10 @@ class ThreepidAddRestServlet(RestServlet):
         client_secret = body["client_secret"]
         sid = body["sid"]
 
+        yield self.auth_handler.validate_user_via_ui_auth(
+            requester, body, self.hs.get_ip_from_request(request)
+        )
+
         validation_session = yield self.identity_handler.validate_threepid_session(
             client_secret, sid
         )
diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py
index d596786430..d83ac8e3c5 100644
--- a/synapse/rest/client/v2_alpha/room_keys.py
+++ b/synapse/rest/client/v2_alpha/room_keys.py
@@ -134,8 +134,8 @@ class RoomKeysServlet(RestServlet):
         if room_id:
             body = {"rooms": {room_id: body}}
 
-        yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body)
-        return 200, {}
+        ret = yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body)
+        return 200, ret
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id, session_id):
@@ -239,10 +239,10 @@ class RoomKeysServlet(RestServlet):
         user_id = requester.user.to_string()
         version = parse_string(request, "version")
 
-        yield self.e2e_room_keys_handler.delete_room_keys(
+        ret = yield self.e2e_room_keys_handler.delete_room_keys(
             user_id, version, room_id, session_id
         )
-        return 200, {}
+        return 200, ret
 
 
 class RoomKeysNewVersionServlet(RestServlet):
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 531d923f76..fb0d02aa83 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -56,6 +56,9 @@ logger = logging.getLogger(__name__)
 _charset_match = re.compile(br"<\s*meta[^>]*charset\s*=\s*([a-z0-9-]+)", flags=re.I)
 _content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)
 
+OG_TAG_NAME_MAXLEN = 50
+OG_TAG_VALUE_MAXLEN = 1000
+
 
 class PreviewUrlResource(DirectServeResource):
     isLeaf = True
@@ -74,8 +77,8 @@ class PreviewUrlResource(DirectServeResource):
             treq_args={"browser_like_redirects": True},
             ip_whitelist=hs.config.url_preview_ip_range_whitelist,
             ip_blacklist=hs.config.url_preview_ip_range_blacklist,
-            http_proxy=os.getenv("http_proxy"),
-            https_proxy=os.getenv("HTTPS_PROXY"),
+            http_proxy=os.getenvb(b"http_proxy"),
+            https_proxy=os.getenvb(b"HTTPS_PROXY"),
         )
         self.media_repo = media_repo
         self.primary_base_path = media_repo.primary_base_path
@@ -119,7 +122,7 @@ class PreviewUrlResource(DirectServeResource):
                 pattern = entry[attrib]
                 value = getattr(url_tuple, attrib)
                 logger.debug(
-                    "Matching attrib '%s' with value '%s' against" " pattern '%s'",
+                    "Matching attrib '%s' with value '%s' against pattern '%s'",
                     attrib,
                     value,
                     pattern,
@@ -171,7 +174,7 @@ class PreviewUrlResource(DirectServeResource):
             ts (int):
 
         Returns:
-            Deferred[str]: json-encoded og data
+            Deferred[bytes]: json-encoded og data
         """
         # check the URL cache in the DB (which will also provide us with
         # historical previews, if we have any)
@@ -272,6 +275,18 @@ class PreviewUrlResource(DirectServeResource):
             logger.warning("Failed to find any OG data in %s", url)
             og = {}
 
+        # filter out any stupidly long values
+        keys_to_remove = []
+        for k, v in og.items():
+            # values can be numeric as well as strings, hence the cast to str
+            if len(k) > OG_TAG_NAME_MAXLEN or len(str(v)) > OG_TAG_VALUE_MAXLEN:
+                logger.warning(
+                    "Pruning overlong tag %s from OG data", k[:OG_TAG_NAME_MAXLEN]
+                )
+                keys_to_remove.append(k)
+        for k in keys_to_remove:
+            del og[k]
+
         logger.debug("Calculated OG for %s as %s", url, og)
 
         jsonog = json.dumps(og)
@@ -506,6 +521,10 @@ def _calc_og(tree, media_uri):
     og = {}
     for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
         if "content" in tag.attrib:
+            # if we've got more than 50 tags, someone is taking the piss
+            if len(og) >= 50:
+                logger.warning("Skipping OG for page with too many 'og:' tags")
+                return {}
             og[tag.attrib["property"]] = tag.attrib["content"]
 
     # TODO: grab article: meta tags too, e.g.:
diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py
index 8cf415e29d..c234ea7421 100644
--- a/synapse/rest/media/v1/thumbnailer.py
+++ b/synapse/rest/media/v1/thumbnailer.py
@@ -129,5 +129,8 @@ class Thumbnailer(object):
 
     def _encode_image(self, output_image, output_type):
         output_bytes_io = BytesIO()
-        output_image.save(output_bytes_io, self.FORMATS[output_type], quality=80)
+        fmt = self.FORMATS[output_type]
+        if fmt == "JPEG":
+            output_image = output_image.convert("RGB")
+        output_image.save(output_bytes_io, fmt, quality=80)
         return output_bytes_io
diff --git a/synapse/server.py b/synapse/server.py
index 90c3b072e8..be9af7f986 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -318,8 +318,8 @@ class HomeServer(object):
     def build_proxied_http_client(self):
         return SimpleHttpClient(
             self,
-            http_proxy=os.getenv("http_proxy"),
-            https_proxy=os.getenv("HTTPS_PROXY"),
+            http_proxy=os.getenvb(b"http_proxy"),
+            https_proxy=os.getenvb(b"HTTPS_PROXY"),
         )
 
     def build_room_creation_handler(self):
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index 415e9c17d8..5736c56032 100644
--- a/synapse/server_notices/consent_server_notices.py
+++ b/synapse/server_notices/consent_server_notices.py
@@ -54,7 +54,7 @@ class ConsentServerNotices(object):
                 )
             if "body" not in self._server_notice_content:
                 raise ConfigError(
-                    "user_consent server_notice_consent must contain a 'body' " "key."
+                    "user_consent server_notice_consent must contain a 'body' key."
                 )
 
             self._consent_uri_builder = ConsentURIBuilder(hs.config)
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 4e91eb66fe..139beef8ed 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -16,6 +16,7 @@
 
 import logging
 from collections import namedtuple
+from typing import Iterable, Optional
 
 from six import iteritems, itervalues
 
@@ -27,6 +28,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
+from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.logging.utils import log_function
 from synapse.state import v1, v2
@@ -212,15 +214,17 @@ class StateHandler(object):
         return joined_hosts
 
     @defer.inlineCallbacks
-    def compute_event_context(self, event, old_state=None):
+    def compute_event_context(
+        self, event: EventBase, old_state: Optional[Iterable[EventBase]] = None
+    ):
         """Build an EventContext structure for the event.
 
         This works out what the current state should be for the event, and
         generates a new state group if necessary.
 
         Args:
-            event (synapse.events.EventBase):
-            old_state (dict|None): The state at the event if it can't be
+            event:
+            old_state: The state at the event if it can't be
                 calculated from existing events. This is normally only specified
                 when receiving an event from federation where we don't have the
                 prev events for, e.g. when backfilling.
@@ -232,6 +236,9 @@ class StateHandler(object):
             # If this is an outlier, then we know it shouldn't have any current
             # state. Certainly store.get_current_state won't return any, and
             # persisting the event won't store the state group.
+
+            # FIXME: why do we populate current_state_ids? I thought the point was
+            # that we weren't supposed to have any state for outliers?
             if old_state:
                 prev_state_ids = {(s.type, s.state_key): s.event_id for s in old_state}
                 if event.is_state():
@@ -248,113 +255,103 @@ class StateHandler(object):
             # group for it.
             context = EventContext.with_state(
                 state_group=None,
+                state_group_before_event=None,
                 current_state_ids=current_state_ids,
                 prev_state_ids=prev_state_ids,
             )
 
             return context
 
+        #
+        # first of all, figure out the state before the event
+        #
+
         if old_state:
-            # We already have the state, so we don't need to calculate it.
-            # Let's just correctly fill out the context and create a
-            # new state group for it.
-
-            prev_state_ids = {(s.type, s.state_key): s.event_id for s in old_state}
-
-            if event.is_state():
-                key = (event.type, event.state_key)
-                if key in prev_state_ids:
-                    replaces = prev_state_ids[key]
-                    if replaces != event.event_id:  # Paranoia check
-                        event.unsigned["replaces_state"] = replaces
-                current_state_ids = dict(prev_state_ids)
-                current_state_ids[key] = event.event_id
-            else:
-                current_state_ids = prev_state_ids
+            # if we're given the state before the event, then we use that
+            state_ids_before_event = {
+                (s.type, s.state_key): s.event_id for s in old_state
+            }
+            state_group_before_event = None
+            state_group_before_event_prev_group = None
+            deltas_to_state_group_before_event = None
 
-            state_group = yield self.state_store.store_state_group(
-                event.event_id,
-                event.room_id,
-                prev_group=None,
-                delta_ids=None,
-                current_state_ids=current_state_ids,
-            )
+        else:
+            # otherwise, we'll need to resolve the state across the prev_events.
+            logger.debug("calling resolve_state_groups from compute_event_context")
 
-            context = EventContext.with_state(
-                state_group=state_group,
-                current_state_ids=current_state_ids,
-                prev_state_ids=prev_state_ids,
+            entry = yield self.resolve_state_groups_for_events(
+                event.room_id, event.prev_event_ids()
             )
 
-            return context
+            state_ids_before_event = entry.state
+            state_group_before_event = entry.state_group
+            state_group_before_event_prev_group = entry.prev_group
+            deltas_to_state_group_before_event = entry.delta_ids
 
-        logger.debug("calling resolve_state_groups from compute_event_context")
+        #
+        # make sure that we have a state group at that point. If it's not a state event,
+        # that will be the state group for the new event. If it *is* a state event,
+        # it might get rejected (in which case we'll need to persist it with the
+        # previous state group)
+        #
 
-        entry = yield self.resolve_state_groups_for_events(
-            event.room_id, event.prev_event_ids()
-        )
+        if not state_group_before_event:
+            state_group_before_event = yield self.state_store.store_state_group(
+                event.event_id,
+                event.room_id,
+                prev_group=state_group_before_event_prev_group,
+                delta_ids=deltas_to_state_group_before_event,
+                current_state_ids=state_ids_before_event,
+            )
 
-        prev_state_ids = entry.state
-        prev_group = None
-        delta_ids = None
+            # XXX: can we update the state cache entry for the new state group? or
+            # could we set a flag on resolve_state_groups_for_events to tell it to
+            # always make a state group?
+
+        #
+        # now if it's not a state event, we're done
+        #
+
+        if not event.is_state():
+            return EventContext.with_state(
+                state_group_before_event=state_group_before_event,
+                state_group=state_group_before_event,
+                current_state_ids=state_ids_before_event,
+                prev_state_ids=state_ids_before_event,
+                prev_group=state_group_before_event_prev_group,
+                delta_ids=deltas_to_state_group_before_event,
+            )
 
-        if event.is_state():
-            # If this is a state event then we need to create a new state
-            # group for the state after this event.
+        #
+        # otherwise, we'll need to create a new state group for after the event
+        #
 
-            key = (event.type, event.state_key)
-            if key in prev_state_ids:
-                replaces = prev_state_ids[key]
+        key = (event.type, event.state_key)
+        if key in state_ids_before_event:
+            replaces = state_ids_before_event[key]
+            if replaces != event.event_id:
                 event.unsigned["replaces_state"] = replaces
 
-            current_state_ids = dict(prev_state_ids)
-            current_state_ids[key] = event.event_id
-
-            if entry.state_group:
-                # If the state at the event has a state group assigned then
-                # we can use that as the prev group
-                prev_group = entry.state_group
-                delta_ids = {key: event.event_id}
-            elif entry.prev_group:
-                # If the state at the event only has a prev group, then we can
-                # use that as a prev group too.
-                prev_group = entry.prev_group
-                delta_ids = dict(entry.delta_ids)
-                delta_ids[key] = event.event_id
-
-            state_group = yield self.state_store.store_state_group(
-                event.event_id,
-                event.room_id,
-                prev_group=prev_group,
-                delta_ids=delta_ids,
-                current_state_ids=current_state_ids,
-            )
-        else:
-            current_state_ids = prev_state_ids
-            prev_group = entry.prev_group
-            delta_ids = entry.delta_ids
-
-            if entry.state_group is None:
-                entry.state_group = yield self.state_store.store_state_group(
-                    event.event_id,
-                    event.room_id,
-                    prev_group=entry.prev_group,
-                    delta_ids=entry.delta_ids,
-                    current_state_ids=current_state_ids,
-                )
-                entry.state_id = entry.state_group
-
-            state_group = entry.state_group
-
-        context = EventContext.with_state(
-            state_group=state_group,
-            current_state_ids=current_state_ids,
-            prev_state_ids=prev_state_ids,
-            prev_group=prev_group,
+        state_ids_after_event = dict(state_ids_before_event)
+        state_ids_after_event[key] = event.event_id
+        delta_ids = {key: event.event_id}
+
+        state_group_after_event = yield self.state_store.store_state_group(
+            event.event_id,
+            event.room_id,
+            prev_group=state_group_before_event,
             delta_ids=delta_ids,
+            current_state_ids=state_ids_after_event,
         )
 
-        return context
+        return EventContext.with_state(
+            state_group=state_group_after_event,
+            state_group_before_event=state_group_before_event,
+            current_state_ids=state_ids_after_event,
+            prev_state_ids=state_ids_before_event,
+            prev_group=state_group_before_event,
+            delta_ids=delta_ids,
+        )
 
     @measure_func()
     @defer.inlineCallbacks
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0a1a8cc1e5..0460fe8cc9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
 from synapse.storage.data_stores import DataStores
 from synapse.storage.data_stores.main import DataStore
 from synapse.storage.persist_events import EventsPersistenceStorage
+from synapse.storage.purge_events import PurgeEventsStorage
 from synapse.storage.state import StateGroupStorage
 
 __all__ = ["DataStores", "DataStore"]
@@ -46,6 +47,7 @@ class Storage(object):
         self.main = stores.main
 
         self.persistence = EventsPersistenceStorage(hs, stores)
+        self.purge_events = PurgeEventsStorage(hs, stores)
         self.state = StateGroupStorage(hs, stores)
 
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1a2b7ebe25..459901ac60 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -361,14 +361,11 @@ class SQLBaseStore(object):
                 expiration_ts,
             )
 
-        self._simple_insert_txn(
+        self._simple_upsert_txn(
             txn,
             "account_validity",
-            values={
-                "user_id": user_id,
-                "expiration_ts_ms": expiration_ts,
-                "email_sent": False,
-            },
+            keyvalues={"user_id": user_id},
+            values={"expiration_ts_ms": expiration_ts, "email_sent": False},
         )
 
     def start_profiling(self):
@@ -412,16 +409,15 @@ class SQLBaseStore(object):
             i = 0
             N = 5
             while True:
+                cursor = LoggingTransaction(
+                    conn.cursor(),
+                    name,
+                    self.database_engine,
+                    after_callbacks,
+                    exception_callbacks,
+                )
                 try:
-                    txn = conn.cursor()
-                    txn = LoggingTransaction(
-                        txn,
-                        name,
-                        self.database_engine,
-                        after_callbacks,
-                        exception_callbacks,
-                    )
-                    r = func(txn, *args, **kwargs)
+                    r = func(cursor, *args, **kwargs)
                     conn.commit()
                     return r
                 except self.database_engine.module.OperationalError as e:
@@ -459,6 +455,40 @@ class SQLBaseStore(object):
                                 )
                             continue
                     raise
+                finally:
+                    # we're either about to retry with a new cursor, or we're about to
+                    # release the connection. Once we release the connection, it could
+                    # get used for another query, which might do a conn.rollback().
+                    #
+                    # In the latter case, even though that probably wouldn't affect the
+                    # results of this transaction, python's sqlite will reset all
+                    # statements on the connection [1], which will make our cursor
+                    # invalid [2].
+                    #
+                    # In any case, continuing to read rows after commit()ing seems
+                    # dubious from the PoV of ACID transactional semantics
+                    # (sqlite explicitly says that once you commit, you may see rows
+                    # from subsequent updates.)
+                    #
+                    # In psycopg2, cursors are essentially a client-side fabrication -
+                    # all the data is transferred to the client side when the statement
+                    # finishes executing - so in theory we could go on streaming results
+                    # from the cursor, but attempting to do so would make us
+                    # incompatible with sqlite, so let's make sure we're not doing that
+                    # by closing the cursor.
+                    #
+                    # (*named* cursors in psycopg2 are different and are proper server-
+                    # side things, but (a) we don't use them and (b) they are implicitly
+                    # closed by ending the transaction anyway.)
+                    #
+                    # In short, if we haven't finished with the cursor yet, that's a
+                    # problem waiting to bite us.
+                    #
+                    # TL;DR: we're done with the cursor, so we can close it.
+                    #
+                    # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
+                    # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
+                    cursor.close()
         except Exception as e:
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
@@ -854,7 +884,7 @@ class SQLBaseStore(object):
             allvalues.update(values)
             latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
 
-        sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
+        sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
             table,
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues),
diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py
index 6afbfc0d74..22093484ed 100644
--- a/synapse/storage/data_stores/main/account_data.py
+++ b/synapse/storage/data_stores/main/account_data.py
@@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore):
             current_id(int): The position to fetch up to.
         Returns:
             A deferred pair of lists of tuples of stream_id int, user_id string,
-            room_id string, type string, and content string.
+            room_id string, and type string.
         """
         if last_room_id == current_id and last_global_id == current_id:
             return defer.succeed(([], []))
 
         def get_updated_account_data_txn(txn):
             sql = (
-                "SELECT stream_id, user_id, account_data_type, content"
+                "SELECT stream_id, user_id, account_data_type"
                 " FROM account_data WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC LIMIT ?"
             )
@@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             global_results = txn.fetchall()
 
             sql = (
-                "SELECT stream_id, user_id, room_id, account_data_type, content"
+                "SELECT stream_id, user_id, room_id, account_data_type"
                 " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC LIMIT ?"
             )
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index f04aad0743..a23744f11c 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -358,8 +358,21 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
     def _add_messages_to_local_device_inbox_txn(
         self, txn, stream_id, messages_by_user_then_device
     ):
-        sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
-        txn.execute(sql, (stream_id, stream_id))
+        # Compatible method of performing an upsert
+        sql = "SELECT stream_id FROM device_max_stream_id"
+
+        txn.execute(sql)
+        rows = txn.fetchone()
+        if rows:
+            db_stream_id = rows[0]
+            if db_stream_id < stream_id:
+                # Insert the new stream_id
+                sql = "UPDATE device_max_stream_id SET stream_id = ?"
+        else:
+            # No rows, perform an insert
+            sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)"
+
+        txn.execute(sql, (stream_id,))
 
         local_by_user_then_device = {}
         for user_id, messages_by_device in messages_by_user_then_device.items():
@@ -367,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             devices = list(messages_by_device.keys())
             if len(devices) == 1 and devices[0] == "*":
                 # Handle wildcard device_ids.
-                sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
+                sql = "SELECT device_id FROM devices WHERE user_id = ?"
                 txn.execute(sql, (user_id,))
                 message_json = json.dumps(messages_by_device["*"])
                 for row in txn:
diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py
index 1cbbae5b63..113224fd7c 100644
--- a/synapse/storage/data_stores/main/e2e_room_keys.py
+++ b/synapse/storage/data_stores/main/e2e_room_keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore
 
 class EndToEndRoomKeyStore(SQLBaseStore):
     @defer.inlineCallbacks
-    def get_e2e_room_key(self, user_id, version, room_id, session_id):
-        """Get the encrypted E2E room key for a given session from a given
-        backup version of room_keys.  We only store the 'best' room key for a given
-        session at a given time, as determined by the handler.
-
-        Args:
-            user_id(str): the user whose backup we're querying
-            version(str): the version ID of the backup for the set of keys we're querying
-            room_id(str): the ID of the room whose keys we're querying.
-                This is a bit redundant as it's implied by the session_id, but
-                we include for consistency with the rest of the API.
-            session_id(str): the session whose room_key we're querying.
-
-        Returns:
-            A deferred dict giving the session_data and message metadata for
-            this room key.
-        """
-
-        row = yield self._simple_select_one(
-            table="e2e_room_keys",
-            keyvalues={
-                "user_id": user_id,
-                "version": version,
-                "room_id": room_id,
-                "session_id": session_id,
-            },
-            retcols=(
-                "first_message_index",
-                "forwarded_count",
-                "is_verified",
-                "session_data",
-            ),
-            desc="get_e2e_room_key",
-        )
-
-        row["session_data"] = json.loads(row["session_data"])
-
-        return row
-
-    @defer.inlineCallbacks
-    def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
-        """Replaces or inserts the encrypted E2E room key for a given session in
-        a given backup
+    def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+        """Replaces the encrypted E2E room key for a given session in a given backup
 
         Args:
             user_id(str): the user whose backup we're setting
@@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             StoreError
         """
 
-        yield self._simple_upsert(
+        yield self._simple_update_one(
             table="e2e_room_keys",
             keyvalues={
                 "user_id": user_id,
@@ -86,21 +46,51 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 "room_id": room_id,
                 "session_id": session_id,
             },
-            values={
+            updatevalues={
                 "first_message_index": room_key["first_message_index"],
                 "forwarded_count": room_key["forwarded_count"],
                 "is_verified": room_key["is_verified"],
                 "session_data": json.dumps(room_key["session_data"]),
             },
-            lock=False,
+            desc="update_e2e_room_key",
         )
-        log_kv(
-            {
-                "message": "Set room key",
-                "room_id": room_id,
-                "session_id": session_id,
-                "room_key": room_key,
-            }
+
+    @defer.inlineCallbacks
+    def add_e2e_room_keys(self, user_id, version, room_keys):
+        """Bulk add room keys to a given backup.
+
+        Args:
+            user_id (str): the user whose backup we're adding to
+            version (str): the version ID of the backup for the set of keys we're adding to
+            room_keys (iterable[(str, str, dict)]): the keys to add, in the form
+                (roomID, sessionID, keyData)
+        """
+
+        values = []
+        for (room_id, session_id, room_key) in room_keys:
+            values.append(
+                {
+                    "user_id": user_id,
+                    "version": version,
+                    "room_id": room_id,
+                    "session_id": session_id,
+                    "first_message_index": room_key["first_message_index"],
+                    "forwarded_count": room_key["forwarded_count"],
+                    "is_verified": room_key["is_verified"],
+                    "session_data": json.dumps(room_key["session_data"]),
+                }
+            )
+            log_kv(
+                {
+                    "message": "Set room key",
+                    "room_id": room_id,
+                    "session_id": session_id,
+                    "room_key": room_key,
+                }
+            )
+
+        yield self._simple_insert_many(
+            table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
         )
 
     @trace
@@ -110,11 +100,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         room, or a given session.
 
         Args:
-            user_id(str): the user whose backup we're querying
-            version(str): the version ID of the backup for the set of keys we're querying
-            room_id(str): Optional. the ID of the room whose keys we're querying, if any.
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup for the set of keys we're querying
+            room_id (str): Optional. the ID of the room whose keys we're querying, if any.
                 If not specified, we return the keys for all the rooms in the backup.
-            session_id(str): Optional. the session whose room_key we're querying, if any.
+            session_id (str): Optional. the session whose room_key we're querying, if any.
                 If specified, we also require the room_id to be specified.
                 If not specified, we return all the keys in this version of
                 the backup (or for the specified room)
@@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         return sessions
 
+    def get_e2e_room_keys_multi(self, user_id, version, room_keys):
+        """Get multiple room keys at a time.  The difference between this function and
+        get_e2e_room_keys is that this function can be used to retrieve
+        multiple specific keys at a time, whereas get_e2e_room_keys is used for
+        getting all the keys in a backup version, all the keys for a room, or a
+        specific key.
+
+        Args:
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup we're querying about
+            room_keys (dict[str, dict[str, iterable[str]]]): a map from
+                room ID -> {"session": [session ids]} indicating the session IDs
+                that we want to query
+
+        Returns:
+           Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
+        """
+
+        return self.runInteraction(
+            "get_e2e_room_keys_multi",
+            self._get_e2e_room_keys_multi_txn,
+            user_id,
+            version,
+            room_keys,
+        )
+
+    @staticmethod
+    def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
+        if not room_keys:
+            return {}
+
+        where_clauses = []
+        params = [user_id, version]
+        for room_id, room in room_keys.items():
+            sessions = list(room["sessions"])
+            if not sessions:
+                continue
+            params.append(room_id)
+            params.extend(sessions)
+            where_clauses.append(
+                "(room_id = ? AND session_id IN (%s))"
+                % (",".join(["?" for _ in sessions]),)
+            )
+
+        # check if we're actually querying something
+        if not where_clauses:
+            return {}
+
+        sql = """
+        SELECT room_id, session_id, first_message_index, forwarded_count,
+               is_verified, session_data
+        FROM e2e_room_keys
+        WHERE user_id = ? AND version = ? AND (%s)
+        """ % (
+            " OR ".join(where_clauses)
+        )
+
+        txn.execute(sql, params)
+
+        ret = {}
+
+        for row in txn:
+            room_id = row[0]
+            session_id = row[1]
+            ret.setdefault(room_id, {})
+            ret[room_id][session_id] = {
+                "first_message_index": row[2],
+                "forwarded_count": row[3],
+                "is_verified": row[4],
+                "session_data": json.loads(row[5]),
+            }
+
+        return ret
+
+    def count_e2e_room_keys(self, user_id, version):
+        """Get the number of keys in a backup version.
+
+        Args:
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup we're querying about
+        """
+
+        return self._simple_select_one_onecol(
+            table="e2e_room_keys",
+            keyvalues={"user_id": user_id, "version": version},
+            retcol="COUNT(*)",
+            desc="count_e2e_room_keys",
+        )
+
     @trace
     @defer.inlineCallbacks
     def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
@@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 version(str)
                 algorithm(str)
                 auth_data(object): opaque dict supplied by the client
+                etag(int): tag of the keys in the backup
         """
 
         def _get_e2e_room_keys_version_info_txn(txn):
@@ -236,10 +316,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 txn,
                 table="e2e_room_keys_versions",
                 keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
-                retcols=("version", "algorithm", "auth_data"),
+                retcols=("version", "algorithm", "auth_data", "etag"),
             )
             result["auth_data"] = json.loads(result["auth_data"])
             result["version"] = str(result["version"])
+            if result["etag"] is None:
+                result["etag"] = 0
             return result
 
         return self.runInteraction(
@@ -288,21 +370,33 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         )
 
     @trace
-    def update_e2e_room_keys_version(self, user_id, version, info):
+    def update_e2e_room_keys_version(
+        self, user_id, version, info=None, version_etag=None
+    ):
         """Update a given backup version
 
         Args:
             user_id(str): the user whose backup version we're updating
             version(str): the version ID of the backup version we're updating
-            info(dict): the new backup version info to store
+            info (dict): the new backup version info to store.  If None, then
+                the backup version info is not updated
+            version_etag (Optional[int]): etag of the keys in the backup.  If
+                None, then the etag is not updated
         """
+        updatevalues = {}
 
-        return self._simple_update(
-            table="e2e_room_keys_versions",
-            keyvalues={"user_id": user_id, "version": version},
-            updatevalues={"auth_data": json.dumps(info["auth_data"])},
-            desc="update_e2e_room_keys_version",
-        )
+        if info is not None and "auth_data" in info:
+            updatevalues["auth_data"] = json.dumps(info["auth_data"])
+        if version_etag is not None:
+            updatevalues["etag"] = version_etag
+
+        if updatevalues:
+            return self._simple_update(
+                table="e2e_room_keys_versions",
+                keyvalues={"user_id": user_id, "version": version},
+                updatevalues=updatevalues,
+                desc="update_e2e_room_keys_version",
+            )
 
     @trace
     def delete_e2e_room_keys_version(self, user_id, version=None):
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 073412a78d..d8ad59ad93 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 result.setdefault(user_id, {})[device_id] = None
 
         # get signatures on the device
-        signature_sql = (
-            "SELECT * " "  FROM e2e_cross_signing_signatures " " WHERE %s"
-        ) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
+        signature_sql = ("SELECT *  FROM e2e_cross_signing_signatures WHERE %s") % (
+            " OR ".join("(" + q + ")" for q in signature_query_clauses)
+        )
 
         txn.execute(signature_sql, signature_query_params)
         rows = self.cursor_to_dict(txn)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 301f8ea128..2737a1d3ae 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -713,9 +713,7 @@ class EventsStore(
 
                 metadata_json = encode_json(event.internal_metadata.get_dict())
 
-                sql = (
-                    "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
-                )
+                sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
                 txn.execute(sql, (metadata_json, event.event_id))
 
                 # Add an entry to the ex_outlier_stream table to replicate the
@@ -732,7 +730,7 @@ class EventsStore(
                     },
                 )
 
-                sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
+                sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
                 txn.execute(sql, (False, event.event_id))
 
                 # Update the event_backward_extremities table now that this
@@ -929,6 +927,9 @@ class EventsStore(
             elif event.type == EventTypes.Redaction:
                 # Insert into the redactions table.
                 self._store_redaction(txn, event)
+            elif event.type == EventTypes.Retention:
+                # Update the room_retention table.
+                self._store_retention_policy_for_room_txn(txn, event)
 
             self._handle_event_relations(txn, event)
 
@@ -1375,6 +1376,10 @@ class EventsStore(
                 if True, we will delete local events as well as remote ones
                 (instead of just marking them as outliers and deleting their
                 state groups).
+
+        Returns:
+            Deferred[set[int]]: The set of state groups that are referenced by
+            deleted events.
         """
 
         return self.runInteraction(
@@ -1475,7 +1480,7 @@ class EventsStore(
 
         # We do joins against events_to_purge for e.g. calculating state
         # groups to purge, etc., so lets make an index.
-        txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
+        txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
 
         txn.execute("SELECT event_id, should_delete FROM events_to_purge")
         event_rows = txn.fetchall()
@@ -1511,11 +1516,10 @@ class EventsStore(
             [(room_id, event_id) for event_id, in new_backwards_extrems],
         )
 
-        logger.info("[purge] finding redundant state groups")
+        logger.info("[purge] finding state groups referenced by deleted events")
 
         # Get all state groups that are referenced by events that are to be
-        # deleted. We then go and check if they are referenced by other events
-        # or state groups, and if not we delete them.
+        # deleted.
         txn.execute(
             """
             SELECT DISTINCT state_group FROM events_to_purge
@@ -1528,60 +1532,6 @@ class EventsStore(
             "[purge] found %i referenced state groups", len(referenced_state_groups)
         )
 
-        logger.info("[purge] finding state groups that can be deleted")
-
-        _ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
-        state_groups_to_delete, remaining_state_groups = _
-
-        logger.info(
-            "[purge] found %i state groups to delete", len(state_groups_to_delete)
-        )
-
-        logger.info(
-            "[purge] de-delta-ing %i remaining state groups",
-            len(remaining_state_groups),
-        )
-
-        # Now we turn the state groups that reference to-be-deleted state
-        # groups to non delta versions.
-        for sg in remaining_state_groups:
-            logger.info("[purge] de-delta-ing remaining state group %s", sg)
-            curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
-            curr_state = curr_state[sg]
-
-            self._simple_delete_txn(
-                txn, table="state_groups_state", keyvalues={"state_group": sg}
-            )
-
-            self._simple_delete_txn(
-                txn, table="state_group_edges", keyvalues={"state_group": sg}
-            )
-
-            self._simple_insert_many_txn(
-                txn,
-                table="state_groups_state",
-                values=[
-                    {
-                        "state_group": sg,
-                        "room_id": room_id,
-                        "type": key[0],
-                        "state_key": key[1],
-                        "event_id": state_id,
-                    }
-                    for key, state_id in iteritems(curr_state)
-                ],
-            )
-
-        logger.info("[purge] removing redundant state groups")
-        txn.executemany(
-            "DELETE FROM state_groups_state WHERE state_group = ?",
-            ((sg,) for sg in state_groups_to_delete),
-        )
-        txn.executemany(
-            "DELETE FROM state_groups WHERE id = ?",
-            ((sg,) for sg in state_groups_to_delete),
-        )
-
         logger.info("[purge] removing events from event_to_state_groups")
         txn.execute(
             "DELETE FROM event_to_state_groups "
@@ -1668,138 +1618,35 @@ class EventsStore(
 
         logger.info("[purge] done")
 
-    def _find_unreferenced_groups_during_purge(self, txn, state_groups):
-        """Used when purging history to figure out which state groups can be
-        deleted and which need to be de-delta'ed (due to one of its prev groups
-        being scheduled for deletion).
-
-        Args:
-            txn
-            state_groups (set[int]): Set of state groups referenced by events
-                that are going to be deleted.
-
-        Returns:
-            tuple[set[int], set[int]]: The set of state groups that can be
-            deleted and the set of state groups that need to be de-delta'ed
-        """
-        # Graph of state group -> previous group
-        graph = {}
-
-        # Set of events that we have found to be referenced by events
-        referenced_groups = set()
-
-        # Set of state groups we've already seen
-        state_groups_seen = set(state_groups)
-
-        # Set of state groups to handle next.
-        next_to_search = set(state_groups)
-        while next_to_search:
-            # We bound size of groups we're looking up at once, to stop the
-            # SQL query getting too big
-            if len(next_to_search) < 100:
-                current_search = next_to_search
-                next_to_search = set()
-            else:
-                current_search = set(itertools.islice(next_to_search, 100))
-                next_to_search -= current_search
-
-            # Check if state groups are referenced
-            sql = """
-                SELECT DISTINCT state_group FROM event_to_state_groups
-                LEFT JOIN events_to_purge AS ep USING (event_id)
-                WHERE ep.event_id IS NULL AND
-            """
-            clause, args = make_in_list_sql_clause(
-                txn.database_engine, "state_group", current_search
-            )
-            txn.execute(sql + clause, list(args))
-
-            referenced = set(sg for sg, in txn)
-            referenced_groups |= referenced
-
-            # We don't continue iterating up the state group graphs for state
-            # groups that are referenced.
-            current_search -= referenced
-
-            rows = self._simple_select_many_txn(
-                txn,
-                table="state_group_edges",
-                column="prev_state_group",
-                iterable=current_search,
-                keyvalues={},
-                retcols=("prev_state_group", "state_group"),
-            )
-
-            prevs = set(row["state_group"] for row in rows)
-            # We don't bother re-handling groups we've already seen
-            prevs -= state_groups_seen
-            next_to_search |= prevs
-            state_groups_seen |= prevs
-
-            for row in rows:
-                # Note: Each state group can have at most one prev group
-                graph[row["state_group"]] = row["prev_state_group"]
-
-        to_delete = state_groups_seen - referenced_groups
-
-        to_dedelta = set()
-        for sg in referenced_groups:
-            prev_sg = graph.get(sg)
-            if prev_sg and prev_sg in to_delete:
-                to_dedelta.add(sg)
-
-        return to_delete, to_dedelta
+        return referenced_state_groups
 
     def purge_room(self, room_id):
         """Deletes all record of a room
 
         Args:
-            room_id (str):
+            room_id (str)
+
+        Returns:
+            Deferred[List[int]]: The list of state groups to delete.
         """
 
         return self.runInteraction("purge_room", self._purge_room_txn, room_id)
 
     def _purge_room_txn(self, txn, room_id):
-        # first we have to delete the state groups states
-        logger.info("[purge] removing %s from state_groups_state", room_id)
-
+        # First we fetch all the state groups that should be deleted, before
+        # we delete that information.
         txn.execute(
             """
-            DELETE FROM state_groups_state WHERE state_group IN (
-              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
-              WHERE events.room_id=?
-            )
+                SELECT DISTINCT state_group FROM events
+                INNER JOIN event_to_state_groups USING(event_id)
+                WHERE events.room_id = ?
             """,
             (room_id,),
         )
 
-        # ... and the state group edges
-        logger.info("[purge] removing %s from state_group_edges", room_id)
+        state_groups = [row[0] for row in txn]
 
-        txn.execute(
-            """
-            DELETE FROM state_group_edges WHERE state_group IN (
-              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
-              WHERE events.room_id=?
-            )
-            """,
-            (room_id,),
-        )
-
-        # ... and the state groups
-        logger.info("[purge] removing %s from state_groups", room_id)
-
-        txn.execute(
-            """
-            DELETE FROM state_groups WHERE id IN (
-              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
-              WHERE events.room_id=?
-            )
-            """,
-            (room_id,),
-        )
-
-        # and then tables which lack an index on room_id but have one on event_id
+        # Now we delete tables which lack an index on room_id but have one on event_id
         for table in (
             "event_auth",
             "event_edges",
@@ -1887,6 +1734,165 @@ class EventsStore(
 
         logger.info("[purge] done")
 
+        return state_groups
+
+    def purge_unreferenced_state_groups(
+        self, room_id: str, state_groups_to_delete
+    ) -> defer.Deferred:
+        """Deletes no longer referenced state groups and de-deltas any state
+        groups that reference them.
+
+        Args:
+            room_id: The room the state groups belong to (must all be in the
+                same room).
+            state_groups_to_delete (Collection[int]): Set of all state groups
+                to delete.
+        """
+
+        return self.runInteraction(
+            "purge_unreferenced_state_groups",
+            self._purge_unreferenced_state_groups,
+            room_id,
+            state_groups_to_delete,
+        )
+
+    def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
+        logger.info(
+            "[purge] found %i state groups to delete", len(state_groups_to_delete)
+        )
+
+        rows = self._simple_select_many_txn(
+            txn,
+            table="state_group_edges",
+            column="prev_state_group",
+            iterable=state_groups_to_delete,
+            keyvalues={},
+            retcols=("state_group",),
+        )
+
+        remaining_state_groups = set(
+            row["state_group"]
+            for row in rows
+            if row["state_group"] not in state_groups_to_delete
+        )
+
+        logger.info(
+            "[purge] de-delta-ing %i remaining state groups",
+            len(remaining_state_groups),
+        )
+
+        # Now we turn the state groups that reference to-be-deleted state
+        # groups to non delta versions.
+        for sg in remaining_state_groups:
+            logger.info("[purge] de-delta-ing remaining state group %s", sg)
+            curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
+            curr_state = curr_state[sg]
+
+            self._simple_delete_txn(
+                txn, table="state_groups_state", keyvalues={"state_group": sg}
+            )
+
+            self._simple_delete_txn(
+                txn, table="state_group_edges", keyvalues={"state_group": sg}
+            )
+
+            self._simple_insert_many_txn(
+                txn,
+                table="state_groups_state",
+                values=[
+                    {
+                        "state_group": sg,
+                        "room_id": room_id,
+                        "type": key[0],
+                        "state_key": key[1],
+                        "event_id": state_id,
+                    }
+                    for key, state_id in iteritems(curr_state)
+                ],
+            )
+
+        logger.info("[purge] removing redundant state groups")
+        txn.executemany(
+            "DELETE FROM state_groups_state WHERE state_group = ?",
+            ((sg,) for sg in state_groups_to_delete),
+        )
+        txn.executemany(
+            "DELETE FROM state_groups WHERE id = ?",
+            ((sg,) for sg in state_groups_to_delete),
+        )
+
+    @defer.inlineCallbacks
+    def get_previous_state_groups(self, state_groups):
+        """Fetch the previous groups of the given state groups.
+
+        Args:
+            state_groups (Iterable[int])
+
+        Returns:
+            Deferred[dict[int, int]]: mapping from state group to previous
+            state group.
+        """
+
+        rows = yield self._simple_select_many_batch(
+            table="state_group_edges",
+            column="prev_state_group",
+            iterable=state_groups,
+            keyvalues={},
+            retcols=("prev_state_group", "state_group"),
+            desc="get_previous_state_groups",
+        )
+
+        return {row["state_group"]: row["prev_state_group"] for row in rows}
+
+    def purge_room_state(self, room_id, state_groups_to_delete):
+        """Deletes all record of a room from state tables
+
+        Args:
+            room_id (str):
+            state_groups_to_delete (list[int]): State groups to delete
+        """
+
+        return self.runInteraction(
+            "purge_room_state",
+            self._purge_room_state_txn,
+            room_id,
+            state_groups_to_delete,
+        )
+
+    def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
+        # first we have to delete the state groups states
+        logger.info("[purge] removing %s from state_groups_state", room_id)
+
+        self._simple_delete_many_txn(
+            txn,
+            table="state_groups_state",
+            column="state_group",
+            iterable=state_groups_to_delete,
+            keyvalues={},
+        )
+
+        # ... and the state group edges
+        logger.info("[purge] removing %s from state_group_edges", room_id)
+
+        self._simple_delete_many_txn(
+            txn,
+            table="state_group_edges",
+            column="state_group",
+            iterable=state_groups_to_delete,
+            keyvalues={},
+        )
+
+        # ... and the state groups
+        logger.info("[purge] removing %s from state_groups", room_id)
+
+        self._simple_delete_many_txn(
+            txn,
+            table="state_groups",
+            column="id",
+            iterable=state_groups_to_delete,
+            keyvalues={},
+        )
+
     async def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream
         """
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 51352b9966..aa87f9abc5 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -21,6 +21,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.api.constants import EventContentFields
 from synapse.storage._base import make_in_list_sql_clause
 from synapse.storage.background_updates import BackgroundUpdateStore
 
@@ -85,6 +86,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             "event_fix_redactions_bytes", self._event_fix_redactions_bytes
         )
 
+        self.register_background_update_handler(
+            "event_store_labels", self._event_store_labels
+        )
+
     @defer.inlineCallbacks
     def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
@@ -503,3 +508,68 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         yield self._end_background_update("event_fix_redactions_bytes")
 
         return 1
+
+    @defer.inlineCallbacks
+    def _event_store_labels(self, progress, batch_size):
+        """Background update handler which will store labels for existing events."""
+        last_event_id = progress.get("last_event_id", "")
+
+        def _event_store_labels_txn(txn):
+            txn.execute(
+                """
+                SELECT event_id, json FROM event_json
+                LEFT JOIN event_labels USING (event_id)
+                WHERE event_id > ? AND label IS NULL
+                ORDER BY event_id LIMIT ?
+                """,
+                (last_event_id, batch_size),
+            )
+
+            results = list(txn)
+
+            nbrows = 0
+            last_row_event_id = ""
+            for (event_id, event_json_raw) in results:
+                try:
+                    event_json = json.loads(event_json_raw)
+
+                    self._simple_insert_many_txn(
+                        txn=txn,
+                        table="event_labels",
+                        values=[
+                            {
+                                "event_id": event_id,
+                                "label": label,
+                                "room_id": event_json["room_id"],
+                                "topological_ordering": event_json["depth"],
+                            }
+                            for label in event_json["content"].get(
+                                EventContentFields.LABELS, []
+                            )
+                            if isinstance(label, str)
+                        ],
+                    )
+                except Exception as e:
+                    logger.warning(
+                        "Unable to load event %s (no labels will be imported): %s",
+                        event_id,
+                        e,
+                    )
+
+                nbrows += 1
+                last_row_event_id = event_id
+
+            self._background_update_progress_txn(
+                txn, "event_store_labels", {"last_event_id": last_row_event_id}
+            )
+
+            return nbrows
+
+        num_rows = yield self.runInteraction(
+            desc="event_store_labels", func=_event_store_labels_txn
+        )
+
+        if not num_rows:
+            yield self._end_background_update("event_store_labels")
+
+        return num_rows
diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py
index a2a2a67927..f05ace299a 100644
--- a/synapse/storage/data_stores/main/filtering.py
+++ b/synapse/storage/data_stores/main/filtering.py
@@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
             if filter_id_response is not None:
                 return filter_id_response[0]
 
-            sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
+            sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
             txn.execute(sql, (user_localpart,))
             max_id = txn.fetchone()[0]
             if max_id is None:
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index b3a2771f1b..5ded539af8 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -553,6 +553,21 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_user_from_summary",
         )
 
+    def get_local_groups_for_room(self, room_id):
+        """Get all of the local group that contain a given room
+        Args:
+            room_id (str): The ID of a room
+        Returns:
+            Deferred[list[str]]: A twisted.Deferred containing a list of group ids
+                containing this room
+        """
+        return self._simple_select_onecol(
+            table="group_rooms",
+            keyvalues={"room_id": room_id},
+            retcol="group_id",
+            desc="get_local_groups_for_room",
+        )
+
     def get_users_for_summary_by_role(self, group_id, include_private=False):
         """Get the users and roles that should be included in a summary request
 
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 84b5f3ad5e..0f2887bdce 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         if len(media_ids) == 0:
             return
 
-        sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
+        sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
 
         def _delete_url_cache_txn(txn):
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
@@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             return
 
         def _delete_url_cache_media_txn(txn):
-            sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
+            sql = "DELETE FROM local_media_repository WHERE media_id = ?"
 
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
 
-            sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
+            sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
 
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
 
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0c24430f28..8b17334ff4 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 args.append(limit)
             txn.execute(sql, args)
 
-            return (r[0:5] + (json.loads(r[5]),) for r in txn)
+            return list(r[0:5] + (json.loads(r[5]),) for r in txn)
 
         return self.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index f70d41ecab..98cf6427c3 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -19,7 +19,6 @@ import logging
 import re
 
 from six import iterkeys
-from six.moves import range
 
 from twisted.internet import defer
 from twisted.internet.defer import Deferred
@@ -377,9 +376,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
 
         def f(txn):
-            sql = (
-                "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
-            )
+            sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
             txn.execute(sql, (user_id,))
             return dict(txn)
 
@@ -484,30 +481,25 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
         Gets the localpart of the next generated user ID.
 
-        Generated user IDs are integers, and we aim for them to be as small as
-        we can. Unfortunately, it's possible some of them are already taken by
-        existing users, and there may be gaps in the already taken range. This
-        function returns the start of the first allocatable gap. This is to
-        avoid the case of ID 10000000 being pre-allocated, so us wasting the
-        first (and shortest) many generated user IDs.
+        Generated user IDs are integers, so we find the largest integer user ID
+        already taken and return that plus one.
         """
 
         def _find_next_generated_user_id(txn):
-            # We bound between '@1' and '@a' to avoid pulling the entire table
+            # We bound between '@0' and '@a' to avoid pulling the entire table
             # out.
-            txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'")
+            txn.execute("SELECT name FROM users WHERE '@0' <= name AND name < '@a'")
 
             regex = re.compile(r"^@(\d+):")
 
-            found = set()
+            max_found = 0
 
             for (user_id,) in txn:
                 match = regex.search(user_id)
                 if match:
-                    found.add(int(match.group(1)))
-            for i in range(len(found) + 1):
-                if i not in found:
-                    return i
+                    max_found = max(int(match.group(1)), max_found)
+
+            return max_found + 1
 
         return (
             (
@@ -577,6 +569,19 @@ class RegistrationWorkerStore(SQLBaseStore):
         return self._simple_delete(
             "user_threepids",
             keyvalues={"user_id": user_id, "medium": medium, "address": address},
+            desc="user_delete_threepid",
+        )
+
+    def user_delete_threepids(self, user_id: str):
+        """Delete all threepid this user has bound
+
+        Args:
+             user_id: The user id to delete all threepids of
+
+        """
+        return self._simple_delete(
+            "user_threepids",
+            keyvalues={"user_id": user_id},
             desc="user_delete_threepids",
         )
 
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 67bb1b6f60..b7f9024811 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -19,10 +19,13 @@ import logging
 import re
 from typing import Optional, Tuple
 
+from six import integer_types
+
 from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.data_stores.main.search import SearchStore
@@ -300,8 +303,141 @@ class RoomWorkerStore(SQLBaseStore):
         else:
             return None
 
+    @cachedInlineCallbacks()
+    def get_retention_policy_for_room(self, room_id):
+        """Get the retention policy for a given room.
+
+        If no retention policy has been found for this room, returns a policy defined
+        by the configured default policy (which has None as both the 'min_lifetime' and
+        the 'max_lifetime' if no default policy has been defined in the server's
+        configuration).
+
+        Args:
+            room_id (str): The ID of the room to get the retention policy of.
+
+        Returns:
+            dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
+        """
+
+        def get_retention_policy_for_room_txn(txn):
+            txn.execute(
+                """
+                SELECT min_lifetime, max_lifetime FROM room_retention
+                INNER JOIN current_state_events USING (event_id, room_id)
+                WHERE room_id = ?;
+                """,
+                (room_id,),
+            )
+
+            return self.cursor_to_dict(txn)
+
+        ret = yield self.runInteraction(
+            "get_retention_policy_for_room", get_retention_policy_for_room_txn,
+        )
+
+        # If we don't know this room ID, ret will be None, in this case return the default
+        # policy.
+        if not ret:
+            defer.returnValue(
+                {
+                    "min_lifetime": self.config.retention_default_min_lifetime,
+                    "max_lifetime": self.config.retention_default_max_lifetime,
+                }
+            )
+
+        row = ret[0]
+
+        # If one of the room's policy's attributes isn't defined, use the matching
+        # attribute from the default policy.
+        # The default values will be None if no default policy has been defined, or if one
+        # of the attributes is missing from the default policy.
+        if row["min_lifetime"] is None:
+            row["min_lifetime"] = self.config.retention_default_min_lifetime
+
+        if row["max_lifetime"] is None:
+            row["max_lifetime"] = self.config.retention_default_max_lifetime
+
+        defer.returnValue(row)
+
 
 class RoomStore(RoomWorkerStore, SearchStore):
+    def __init__(self, db_conn, hs):
+        super(RoomStore, self).__init__(db_conn, hs)
+
+        self.config = hs.config
+
+        self.register_background_update_handler(
+            "insert_room_retention", self._background_insert_retention,
+        )
+
+    @defer.inlineCallbacks
+    def _background_insert_retention(self, progress, batch_size):
+        """Retrieves a list of all rooms within a range and inserts an entry for each of
+        them into the room_retention table.
+        NULLs the property's columns if missing from the retention event in the room's
+        state (or NULLs all of them if there's no retention event in the room's state),
+        so that we fall back to the server's retention policy.
+        """
+
+        last_room = progress.get("room_id", "")
+
+        def _background_insert_retention_txn(txn):
+            txn.execute(
+                """
+                SELECT state.room_id, state.event_id, events.json
+                FROM current_state_events as state
+                LEFT JOIN event_json AS events ON (state.event_id = events.event_id)
+                WHERE state.room_id > ? AND state.type = '%s'
+                ORDER BY state.room_id ASC
+                LIMIT ?;
+                """
+                % EventTypes.Retention,
+                (last_room, batch_size),
+            )
+
+            rows = self.cursor_to_dict(txn)
+
+            if not rows:
+                return True
+
+            for row in rows:
+                if not row["json"]:
+                    retention_policy = {}
+                else:
+                    ev = json.loads(row["json"])
+                    retention_policy = json.dumps(ev["content"])
+
+                self._simple_insert_txn(
+                    txn=txn,
+                    table="room_retention",
+                    values={
+                        "room_id": row["room_id"],
+                        "event_id": row["event_id"],
+                        "min_lifetime": retention_policy.get("min_lifetime"),
+                        "max_lifetime": retention_policy.get("max_lifetime"),
+                    },
+                )
+
+            logger.info("Inserted %d rows into room_retention", len(rows))
+
+            self._background_update_progress_txn(
+                txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
+            )
+
+            if batch_size > len(rows):
+                return True
+            else:
+                return False
+
+        end = yield self.runInteraction(
+            "insert_room_retention", _background_insert_retention_txn,
+        )
+
+        if end:
+            yield self._end_background_update("insert_room_retention")
+
+        defer.returnValue(batch_size)
+
     @defer.inlineCallbacks
     def store_room(self, room_id, room_creator_user_id, is_public):
         """Stores a room.
@@ -502,6 +638,35 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 txn, event, "content.body", event.content["body"]
             )
 
+    def _store_retention_policy_for_room_txn(self, txn, event):
+        if hasattr(event, "content") and (
+            "min_lifetime" in event.content or "max_lifetime" in event.content
+        ):
+            if (
+                "min_lifetime" in event.content
+                and not isinstance(event.content.get("min_lifetime"), integer_types)
+            ) or (
+                "max_lifetime" in event.content
+                and not isinstance(event.content.get("max_lifetime"), integer_types)
+            ):
+                # Ignore the event if one of the value isn't an integer.
+                return
+
+            self._simple_insert_txn(
+                txn=txn,
+                table="room_retention",
+                values={
+                    "room_id": event.room_id,
+                    "event_id": event.event_id,
+                    "min_lifetime": event.content.get("min_lifetime"),
+                    "max_lifetime": event.content.get("max_lifetime"),
+                },
+            )
+
+            self._invalidate_cache_and_stream(
+                txn, self.get_retention_policy_for_room, (event.room_id,)
+            )
+
     def add_event_report(
         self, room_id, event_id, user_id, reason, content, received_ts
     ):
@@ -683,3 +848,89 @@ class RoomStore(RoomWorkerStore, SearchStore):
                             remote_media_mxcs.append((hostname, media_id))
 
         return local_media_mxcs, remote_media_mxcs
+
+    @defer.inlineCallbacks
+    def get_rooms_for_retention_period_in_range(
+        self, min_ms, max_ms, include_null=False
+    ):
+        """Retrieves all of the rooms within the given retention range.
+
+        Optionally includes the rooms which don't have a retention policy.
+
+        Args:
+            min_ms (int|None): Duration in milliseconds that define the lower limit of
+                the range to handle (exclusive). If None, doesn't set a lower limit.
+            max_ms (int|None): Duration in milliseconds that define the upper limit of
+                the range to handle (inclusive). If None, doesn't set an upper limit.
+            include_null (bool): Whether to include rooms which retention policy is NULL
+                in the returned set.
+
+        Returns:
+            dict[str, dict]: The rooms within this range, along with their retention
+                policy. The key is "room_id", and maps to a dict describing the retention
+                policy associated with this room ID. The keys for this nested dict are
+                "min_lifetime" (int|None), and "max_lifetime" (int|None).
+        """
+
+        def get_rooms_for_retention_period_in_range_txn(txn):
+            range_conditions = []
+            args = []
+
+            if min_ms is not None:
+                range_conditions.append("max_lifetime > ?")
+                args.append(min_ms)
+
+            if max_ms is not None:
+                range_conditions.append("max_lifetime <= ?")
+                args.append(max_ms)
+
+            # Do a first query which will retrieve the rooms that have a retention policy
+            # in their current state.
+            sql = """
+                SELECT room_id, min_lifetime, max_lifetime FROM room_retention
+                INNER JOIN current_state_events USING (event_id, room_id)
+                """
+
+            if len(range_conditions):
+                sql += " WHERE (" + " AND ".join(range_conditions) + ")"
+
+                if include_null:
+                    sql += " OR max_lifetime IS NULL"
+
+            txn.execute(sql, args)
+
+            rows = self.cursor_to_dict(txn)
+            rooms_dict = {}
+
+            for row in rows:
+                rooms_dict[row["room_id"]] = {
+                    "min_lifetime": row["min_lifetime"],
+                    "max_lifetime": row["max_lifetime"],
+                }
+
+            if include_null:
+                # If required, do a second query that retrieves all of the rooms we know
+                # of so we can handle rooms with no retention policy.
+                sql = "SELECT DISTINCT room_id FROM current_state_events"
+
+                txn.execute(sql)
+
+                rows = self.cursor_to_dict(txn)
+
+                # If a room isn't already in the dict (i.e. it doesn't have a retention
+                # policy in its state), add it with a null policy.
+                for row in rows:
+                    if row["room_id"] not in rooms_dict:
+                        rooms_dict[row["room_id"]] = {
+                            "min_lifetime": None,
+                            "max_lifetime": None,
+                        }
+
+            return rooms_dict
+
+        rooms = yield self.runInteraction(
+            "get_rooms_for_retention_period_in_range",
+            get_rooms_for_retention_period_in_range_txn,
+        )
+
+        defer.returnValue(rooms)
diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql b/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql
new file mode 100644
index 0000000000..5f5e0499ae
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('event_store_labels', '{}');
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
new file mode 100644
index 0000000000..7d70dd071e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- store the current etag of backup version
+ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT;
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql b/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql
new file mode 100644
index 0000000000..ee6cdf7a14
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql
@@ -0,0 +1,33 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Tracks the retention policy of a room.
+-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in
+-- the room's retention policy state event.
+-- If a room doesn't have a retention policy state event in its state, both max_lifetime
+-- and min_lifetime are NULL.
+CREATE TABLE IF NOT EXISTS room_retention(
+    room_id TEXT,
+    event_id TEXT,
+    min_lifetime BIGINT,
+    max_lifetime BIGINT,
+
+    PRIMARY KEY(room_id, event_id)
+);
+
+CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('insert_room_retention', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 3132848034..6a90daea31 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -285,7 +285,11 @@ class StateGroupWorkerStore(
             room_id (str)
 
         Returns:
-            Deferred[unicode|None]: predecessor room id
+            Deferred[dict|None]: A dictionary containing the structure of the predecessor
+                field from the room's create event. The structure is subject to other servers,
+                but it is expected to be:
+                    * room_id (str): The room ID of the predecessor room
+                    * event_id (str): The ID of the tombstone event in the predecessor room
 
         Raises:
             NotFoundError if the room is unknown
@@ -991,6 +995,29 @@ class StateGroupWorkerStore(
 
         return self.runInteraction("store_state_group", _store_state_group_txn)
 
+    @defer.inlineCallbacks
+    def get_referenced_state_groups(self, state_groups):
+        """Check if the state groups are referenced by events.
+
+        Args:
+            state_groups (Iterable[int])
+
+        Returns:
+            Deferred[set[int]]: The subset of state groups that are
+            referenced.
+        """
+
+        rows = yield self._simple_select_many_batch(
+            table="event_to_state_groups",
+            column="state_group",
+            iterable=state_groups,
+            keyvalues={},
+            retcols=("DISTINCT state_group",),
+            desc="get_referenced_state_groups",
+        )
+
+        return set(row["state_group"] for row in rows)
+
 
 class StateBackgroundUpdateStore(
     StateGroupBackgroundUpdateStore, BackgroundUpdateStore
@@ -1231,7 +1258,7 @@ class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
             # if the event was rejected, just give it the same state as its
             # predecessor.
             if context.rejected:
-                state_groups[event.event_id] = context.prev_group
+                state_groups[event.event_id] = context.state_group_before_event
                 continue
 
             state_groups[event.event_id] = context.state_group
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 9cac664880..21a410afd0 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -619,7 +619,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     def _get_max_topological_txn(self, txn, room_id):
         txn.execute(
-            "SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
+            "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
             (room_id,),
         )
 
@@ -874,14 +874,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         args.append(int(limit))
 
-        sql = (
-            "SELECT DISTINCT event_id, topological_ordering, stream_ordering"
-            " FROM events"
-            " LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)"
-            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
-            " ORDER BY topological_ordering %(order)s,"
-            " stream_ordering %(order)s LIMIT ?"
-        ) % {"bounds": bounds, "order": order}
+        select_keywords = "SELECT"
+        join_clause = ""
+        if event_filter and event_filter.labels:
+            # If we're not filtering on a label, then joining on event_labels will
+            # return as many row for a single event as the number of labels it has. To
+            # avoid this, only join if we're filtering on at least one label.
+            join_clause = """
+                LEFT JOIN event_labels
+                USING (event_id, room_id, topological_ordering)
+            """
+            if len(event_filter.labels) > 1:
+                # Using DISTINCT in this SELECT query is quite expensive, because it
+                # requires the engine to sort on the entire (not limited) result set,
+                # i.e. the entire events table. We only need to use it when we're
+                # filtering on more than two labels, because that's the only scenario
+                # in which we can possibly to get multiple times the same event ID in
+                # the results.
+                select_keywords += "DISTINCT"
+
+        sql = """
+            %(select_keywords)s event_id, topological_ordering, stream_ordering
+            FROM events
+            %(join_clause)s
+            WHERE outlier = ? AND room_id = ? AND %(bounds)s
+            ORDER BY topological_ordering %(order)s,
+            stream_ordering %(order)s LIMIT ?
+        """ % {
+            "select_keywords": select_keywords,
+            "join_clause": join_clause,
+            "bounds": bounds,
+            "order": order,
+        }
 
         txn.execute(sql, args)
 
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 10d1887f75..aa24339717 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
         )
 
         def get_tag_content(txn, tag_ids):
-            sql = (
-                "SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
-            )
+            sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
             results = []
             for stream_id, user_id, room_id in tag_ids:
                 txn.execute(sql, (user_id, room_id))
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 2e7753820e..731e1c9d9c 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
         # Mark as done.
         cur.execute(
             database_engine.convert_param_style(
-                "INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
+                "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
             ),
             (modname, name),
         )
diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py
new file mode 100644
index 0000000000..a368182034
--- /dev/null
+++ b/synapse/storage/purge_events.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+
+from twisted.internet import defer
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeEventsStorage(object):
+    """High level interface for purging rooms and event history.
+    """
+
+    def __init__(self, hs, stores):
+        self.stores = stores
+
+    @defer.inlineCallbacks
+    def purge_room(self, room_id: str):
+        """Deletes all record of a room
+        """
+
+        state_groups_to_delete = yield self.stores.main.purge_room(room_id)
+        yield self.stores.main.purge_room_state(room_id, state_groups_to_delete)
+
+    @defer.inlineCallbacks
+    def purge_history(self, room_id, token, delete_local_events):
+        """Deletes room history before a certain point
+
+        Args:
+            room_id (str):
+
+            token (str): A topological token to delete events before
+
+            delete_local_events (bool):
+                if True, we will delete local events as well as remote ones
+                (instead of just marking them as outliers and deleting their
+                state groups).
+        """
+        state_groups = yield self.stores.main.purge_history(
+            room_id, token, delete_local_events
+        )
+
+        logger.info("[purge] finding state groups that can be deleted")
+
+        sg_to_delete = yield self._find_unreferenced_groups(state_groups)
+
+        yield self.stores.main.purge_unreferenced_state_groups(room_id, sg_to_delete)
+
+    @defer.inlineCallbacks
+    def _find_unreferenced_groups(self, state_groups):
+        """Used when purging history to figure out which state groups can be
+        deleted.
+
+        Args:
+            state_groups (set[int]): Set of state groups referenced by events
+                that are going to be deleted.
+
+        Returns:
+            Deferred[set[int]] The set of state groups that can be deleted.
+        """
+        # Graph of state group -> previous group
+        graph = {}
+
+        # Set of events that we have found to be referenced by events
+        referenced_groups = set()
+
+        # Set of state groups we've already seen
+        state_groups_seen = set(state_groups)
+
+        # Set of state groups to handle next.
+        next_to_search = set(state_groups)
+        while next_to_search:
+            # We bound size of groups we're looking up at once, to stop the
+            # SQL query getting too big
+            if len(next_to_search) < 100:
+                current_search = next_to_search
+                next_to_search = set()
+            else:
+                current_search = set(itertools.islice(next_to_search, 100))
+                next_to_search -= current_search
+
+            referenced = yield self.stores.main.get_referenced_state_groups(
+                current_search
+            )
+            referenced_groups |= referenced
+
+            # We don't continue iterating up the state group graphs for state
+            # groups that are referenced.
+            current_search -= referenced
+
+            edges = yield self.stores.main.get_previous_state_groups(current_search)
+
+            prevs = set(edges.values())
+            # We don't bother re-handling groups we've already seen
+            prevs -= state_groups_seen
+            next_to_search |= prevs
+            state_groups_seen |= prevs
+
+            graph.update(edges)
+
+        to_delete = state_groups_seen - referenced_groups
+
+        return to_delete
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 02994ab2a5..cd56cd91ed 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -88,9 +88,12 @@ class PaginationConfig(object):
             raise SynapseError(400, "Invalid request.")
 
     def __repr__(self):
-        return (
-            "PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)"
-        ) % (self.from_token, self.to_token, self.direction, self.limit)
+        return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % (
+            self.from_token,
+            self.to_token,
+            self.direction,
+            self.limit,
+        )
 
     def get_source_config(self, source_name):
         keyname = "%s_key" % source_name
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 0e8da27f53..84f5ae22c3 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -17,8 +17,8 @@ import functools
 import inspect
 import logging
 import threading
-from collections import namedtuple
-from typing import Any, cast
+from typing import Any, Tuple, Union, cast
+from weakref import WeakValueDictionary
 
 from six import itervalues
 
@@ -38,6 +38,8 @@ from . import register_cache
 
 logger = logging.getLogger(__name__)
 
+CacheKey = Union[Tuple, Any]
+
 
 class _CachedFunction(Protocol):
     invalidate = None  # type: Any
@@ -430,7 +432,7 @@ class CacheDescriptor(_CacheDescriptorBase):
             # Add our own `cache_context` to argument list if the wrapped function
             # has asked for one
             if self.add_cache_context:
-                kwargs["cache_context"] = _CacheContext(cache, cache_key)
+                kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key)
 
             try:
                 cached_result_d = cache.get(cache_key, callback=invalidate_callback)
@@ -624,14 +626,38 @@ class CacheListDescriptor(_CacheDescriptorBase):
         return wrapped
 
 
-class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
-    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
-    # which namedtuple does for us (i.e. two _CacheContext are the same if
-    # their caches and keys match). This is important in particular to
-    # dedupe when we add callbacks to lru cache nodes, otherwise the number
-    # of callbacks would grow.
-    def invalidate(self):
-        self.cache.invalidate(self.key)
+class _CacheContext:
+    """Holds cache information from the cached function higher in the calling order.
+
+    Can be used to invalidate the higher level cache entry if something changes
+    on a lower level.
+    """
+
+    _cache_context_objects = (
+        WeakValueDictionary()
+    )  # type: WeakValueDictionary[Tuple[Cache, CacheKey], _CacheContext]
+
+    def __init__(self, cache, cache_key):  # type: (Cache, CacheKey) -> None
+        self._cache = cache
+        self._cache_key = cache_key
+
+    def invalidate(self):  # type: () -> None
+        """Invalidates the cache entry referred to by the context."""
+        self._cache.invalidate(self._cache_key)
+
+    @classmethod
+    def get_instance(cls, cache, cache_key):  # type: (Cache, CacheKey) -> _CacheContext
+        """Returns an instance constructed with the given arguments.
+
+        A new instance is only created if none already exists.
+        """
+
+        # We make sure there are no identical _CacheContext instances. This is
+        # important in particular to dedupe when we add callbacks to lru cache
+        # nodes, otherwise the number of callbacks would grow.
+        return cls._cache_context_objects.setdefault(
+            (cache, cache_key), cls(cache, cache_key)
+        )
 
 
 def cached(
diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index 1a20c596bf..3c0e8469f3 100644
--- a/synapse/util/httpresourcetree.py
+++ b/synapse/util/httpresourcetree.py
@@ -20,7 +20,7 @@ logger = logging.getLogger(__name__)
 
 
 def create_resource_tree(desired_tree, root_resource):
-    """Create the resource tree for this Home Server.
+    """Create the resource tree for this homeserver.
 
     This in unduly complicated because Twisted does not support putting
     child resources more than 1 level deep at a time.
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 8c843febd8..dffe943b28 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -44,7 +44,12 @@ MEMBERSHIP_PRIORITY = (
 
 @defer.inlineCallbacks
 def filter_events_for_client(
-    storage: Storage, user_id, events, is_peeking=False, always_include_ids=frozenset()
+    storage: Storage,
+    user_id,
+    events,
+    is_peeking=False,
+    always_include_ids=frozenset(),
+    apply_retention_policies=True,
 ):
     """
     Check which events a user is allowed to see
@@ -59,6 +64,10 @@ def filter_events_for_client(
             events
         always_include_ids (set(event_id)): set of event ids to specifically
             include (unless sender is ignored)
+        apply_retention_policies (bool): Whether to filter out events that's older than
+            allowed by the room's retention policy. Useful when this function is called
+            to e.g. check whether a user should be allowed to see the state at a given
+            event rather than to know if it should send an event to a user's client(s).
 
     Returns:
         Deferred[list[synapse.events.EventBase]]
@@ -86,6 +95,15 @@ def filter_events_for_client(
 
     erased_senders = yield storage.main.are_users_erased((e.sender for e in events))
 
+    if apply_retention_policies:
+        room_ids = set(e.room_id for e in events)
+        retention_policies = {}
+
+        for room_id in room_ids:
+            retention_policies[
+                room_id
+            ] = yield storage.main.get_retention_policy_for_room(room_id)
+
     def allowed(event):
         """
         Args:
@@ -103,6 +121,18 @@ def filter_events_for_client(
         if not event.is_state() and event.sender in ignore_list:
             return None
 
+        # Don't try to apply the room's retention policy if the event is a state event, as
+        # MSC1763 states that retention is only considered for non-state events.
+        if apply_retention_policies and not event.is_state():
+            retention_policy = retention_policies[event.room_id]
+            max_lifetime = retention_policy.get("max_lifetime")
+
+            if max_lifetime is not None:
+                oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime
+
+                if event.origin_server_ts < oldest_allowed_ts:
+                    return None
+
         if event.event_id in always_include_ids:
             return event