author     Erik Johnston <erik@matrix.org>    2019-06-18 16:11:43 +0100
committer  Erik Johnston <erik@matrix.org>    2019-06-18 16:11:43 +0100
commit     19b80fe68a9e594dd00878ae8a2d34f94000755b
tree       21708e4462b2dcf551f717e4c8188cd561171591 /synapse/handlers
parent     Newsfile
parent     Fix seven contrib files with Python syntax errors (#5446)

Merge branch 'develop' of github.com:matrix-org/synapse into erikj/fix_get_missing_events_error
Diffstat (limited to 'synapse/handlers')
 synapse/handlers/_base.py              |   4
 synapse/handlers/account_validity.py   | 256
 synapse/handlers/auth.py               |  65
 synapse/handlers/deactivate_account.py |  10
 synapse/handlers/directory.py          |  23
 synapse/handlers/events.py             |  11
 synapse/handlers/federation.py         | 421
 synapse/handlers/groups_local.py       |   4
 synapse/handlers/identity.py           |  13
 synapse/handlers/initial_sync.py       |  44
 synapse/handlers/message.py            |  70
 synapse/handlers/pagination.py         |  22
 synapse/handlers/presence.py           | 123
 synapse/handlers/profile.py            |  85
 synapse/handlers/register.py           |  13
 synapse/handlers/room.py               |  37
 synapse/handlers/room_member.py        |  32
 synapse/handlers/search.py             |  42
 synapse/handlers/stats.py              | 333
 synapse/handlers/sync.py               |  87
 20 files changed, 1372 insertions(+), 323 deletions(-)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index ac09d03ba9..dca337ec61 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -90,8 +90,8 @@ class BaseHandler(object):
             messages_per_second = override.messages_per_second
             burst_count = override.burst_count
         else:
-            messages_per_second = self.hs.config.rc_messages_per_second
-            burst_count = self.hs.config.rc_message_burst_count
+            messages_per_second = self.hs.config.rc_message.per_second
+            burst_count = self.hs.config.rc_message.burst_count
 
         allowed, time_allowed = self.ratelimiter.can_do_action(
             user_id, time_now,
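
The _base.py hunk only changes where the per-user message rate limit is read from (a nested rc_message config block instead of two flat attributes); the override-then-fallback pattern around the ratelimiter is untouched. A minimal standalone sketch of that pattern, using a toy token-bucket limiter rather than Synapse's real Ratelimiter (every name below is illustrative):

    import time

    class SimpleRatelimiter:
        """Toy per-user limiter: allow up to `burst_count` actions, refilled
        at `per_second` actions per second (token bucket)."""

        def __init__(self):
            self._buckets = {}  # user_id -> (tokens, last_update_ts)

        def can_do_action(self, user_id, time_now, per_second, burst_count):
            tokens, last_ts = self._buckets.get(user_id, (burst_count, time_now))
            # Refill for the elapsed time, capped at the burst size.
            tokens = min(burst_count, tokens + (time_now - last_ts) * per_second)
            if tokens >= 1:
                self._buckets[user_id] = (tokens - 1, time_now)
                return True, time_now
            self._buckets[user_id] = (tokens, time_now)
            # Report when the next action would be allowed.
            return False, time_now + (1 - tokens) / per_second

    class FakeRcMessageConfig:
        per_second = 0.2   # stands in for rc_message.per_second
        burst_count = 10   # stands in for rc_message.burst_count

    override = None  # or an object with .messages_per_second / .burst_count
    if override:
        per_second, burst = override.messages_per_second, override.burst_count
    else:
        per_second, burst = FakeRcMessageConfig.per_second, FakeRcMessageConfig.burst_count

    limiter = SimpleRatelimiter()
    print(limiter.can_do_action("@alice:example.com", time.time(), per_second, burst))
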
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
new file mode 100644
index 0000000000..5e0b92eb1c
--- /dev/null
+++ b/synapse/handlers/account_validity.py
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import email.mime.multipart
+import email.utils
+import logging
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+from synapse.types import UserID
+from synapse.util import stringutils
+from synapse.util.logcontext import make_deferred_yieldable
+
+try:
+    from synapse.push.mailer import load_jinja2_templates
+except ImportError:
+    load_jinja2_templates = None
+
+logger = logging.getLogger(__name__)
+
+
+class AccountValidityHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = self.hs.get_datastore()
+        self.sendmail = self.hs.get_sendmail()
+        self.clock = self.hs.get_clock()
+
+        self._account_validity = self.hs.config.account_validity
+
+        if self._account_validity.renew_by_email_enabled and load_jinja2_templates:
+            # Don't do email-specific configuration if renewal by email is disabled.
+            try:
+                app_name = self.hs.config.email_app_name
+
+                self._subject = self._account_validity.renew_email_subject % {
+                    "app": app_name,
+                }
+
+                self._from_string = self.hs.config.email_notif_from % {
+                    "app": app_name,
+                }
+            except Exception:
+                # If substitution failed, fall back to the bare strings.
+                self._subject = self._account_validity.renew_email_subject
+                self._from_string = self.hs.config.email_notif_from
+
+            self._raw_from = email.utils.parseaddr(self._from_string)[1]
+
+            self._template_html, self._template_text = load_jinja2_templates(
+                config=self.hs.config,
+                template_html_name=self.hs.config.email_expiry_template_html,
+                template_text_name=self.hs.config.email_expiry_template_text,
+            )
+
+            # Check the renewal emails to send and send them every 30min.
+            self.clock.looping_call(
+                self.send_renewal_emails,
+                30 * 60 * 1000,
+            )
+
+    @defer.inlineCallbacks
+    def send_renewal_emails(self):
+        """Gets the list of users whose account is expiring in the amount of time
+        configured in the ``renew_at`` parameter from the ``account_validity``
+        configuration, and sends renewal emails to all of these users as long as they
+        have an email 3PID attached to their account.
+        """
+        expiring_users = yield self.store.get_users_expiring_soon()
+
+        if expiring_users:
+            for user in expiring_users:
+                yield self._send_renewal_email(
+                    user_id=user["user_id"],
+                    expiration_ts=user["expiration_ts_ms"],
+                )
+
+    @defer.inlineCallbacks
+    def send_renewal_email_to_user(self, user_id):
+        expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
+        yield self._send_renewal_email(user_id, expiration_ts)
+
+    @defer.inlineCallbacks
+    def _send_renewal_email(self, user_id, expiration_ts):
+        """Sends out a renewal email to every email address attached to the given user
+        with a unique link allowing them to renew their account.
+
+        Args:
+            user_id (str): ID of the user to send email(s) to.
+            expiration_ts (int): Timestamp in milliseconds for the expiration date of
+                this user's account (used in the email templates).
+        """
+        addresses = yield self._get_email_addresses_for_user(user_id)
+
+        # Stop right here if the user doesn't have at least one email address.
+        # In this case, they will have to ask their server admin to renew their
+        # account manually.
+        # We don't need to do a specific check to make sure the account isn't
+        # deactivated, as a deactivated account isn't supposed to have any
+        # email address attached to it.
+        if not addresses:
+            return
+
+        try:
+            user_display_name = yield self.store.get_profile_displayname(
+                UserID.from_string(user_id).localpart
+            )
+            if user_display_name is None:
+                user_display_name = user_id
+        except StoreError:
+            user_display_name = user_id
+
+        renewal_token = yield self._get_renewal_token(user_id)
+        url = "%s_matrix/client/unstable/account_validity/renew?token=%s" % (
+            self.hs.config.public_baseurl,
+            renewal_token,
+        )
+
+        template_vars = {
+            "display_name": user_display_name,
+            "expiration_ts": expiration_ts,
+            "url": url,
+        }
+
+        html_text = self._template_html.render(**template_vars)
+        html_part = MIMEText(html_text, "html", "utf8")
+
+        plain_text = self._template_text.render(**template_vars)
+        text_part = MIMEText(plain_text, "plain", "utf8")
+
+        for address in addresses:
+            raw_to = email.utils.parseaddr(address)[1]
+
+            multipart_msg = MIMEMultipart('alternative')
+            multipart_msg['Subject'] = self._subject
+            multipart_msg['From'] = self._from_string
+            multipart_msg['To'] = address
+            multipart_msg['Date'] = email.utils.formatdate()
+            multipart_msg['Message-ID'] = email.utils.make_msgid()
+            multipart_msg.attach(text_part)
+            multipart_msg.attach(html_part)
+
+            logger.info("Sending renewal email to %s", address)
+
+            yield make_deferred_yieldable(self.sendmail(
+                self.hs.config.email_smtp_host,
+                self._raw_from, raw_to, multipart_msg.as_string().encode('utf8'),
+                reactor=self.hs.get_reactor(),
+                port=self.hs.config.email_smtp_port,
+                requireAuthentication=self.hs.config.email_smtp_user is not None,
+                username=self.hs.config.email_smtp_user,
+                password=self.hs.config.email_smtp_pass,
+                requireTransportSecurity=self.hs.config.require_transport_security
+            ))
+
+        yield self.store.set_renewal_mail_status(
+            user_id=user_id,
+            email_sent=True,
+        )
+
+    @defer.inlineCallbacks
+    def _get_email_addresses_for_user(self, user_id):
+        """Retrieve the list of email addresses attached to a user's account.
+
+        Args:
+            user_id (str): ID of the user to lookup email addresses for.
+
+        Returns:
+            defer.Deferred[list[str]]: Email addresses for this account.
+        """
+        threepids = yield self.store.user_get_threepids(user_id)
+
+        addresses = []
+        for threepid in threepids:
+            if threepid["medium"] == "email":
+                addresses.append(threepid["address"])
+
+        defer.returnValue(addresses)
+
+    @defer.inlineCallbacks
+    def _get_renewal_token(self, user_id):
+        """Generates a 32-byte long random string that will be inserted into the
+        user's renewal email's unique link, then saves it into the database.
+
+        Args:
+            user_id (str): ID of the user to generate a string for.
+
+        Returns:
+            defer.Deferred[str]: The generated string.
+
+        Raises:
+            StoreError(500): Couldn't generate a unique string after 5 attempts.
+        """
+        attempts = 0
+        while attempts < 5:
+            try:
+                renewal_token = stringutils.random_string(32)
+                yield self.store.set_renewal_token_for_user(user_id, renewal_token)
+                defer.returnValue(renewal_token)
+            except StoreError:
+                attempts += 1
+        raise StoreError(500, "Couldn't generate a unique renewal token.")
+
+    @defer.inlineCallbacks
+    def renew_account(self, renewal_token):
+        """Renews the account attached to a given renewal token by pushing back the
+        expiration date by the current validity period in the server's configuration.
+
+        Args:
+            renewal_token (str): Token sent with the renewal request.
+        """
+        user_id = yield self.store.get_user_from_renewal_token(renewal_token)
+        logger.debug("Renewing an account for user %s", user_id)
+        yield self.renew_account_for_user(user_id)
+
+    @defer.inlineCallbacks
+    def renew_account_for_user(self, user_id, expiration_ts=None, email_sent=False):
+        """Renews the account attached to a given user by pushing back the
+        expiration date by the current validity period in the server's
+        configuration.
+
+        Args:
+            user_id (str): ID of the user whose account to renew.
+            expiration_ts (int): New expiration date. Defaults to now + validity period.
+            email_sent (bool): Whether an email has been sent for this validity period.
+                Defaults to False.
+
+        Returns:
+            defer.Deferred[int]: New expiration date for this account, as a timestamp
+                in milliseconds since epoch.
+        """
+        if expiration_ts is None:
+            expiration_ts = self.clock.time_msec() + self._account_validity.period
+
+        yield self.store.set_account_validity_for_user(
+            user_id=user_id,
+            expiration_ts=expiration_ts,
+            email_sent=email_sent,
+        )
+
+        defer.returnValue(expiration_ts)
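
account_validity.py is entirely new: every 30 minutes it looks up accounts that are about to expire, renders the expiry templates, and mails each address a link carrying a per-user renewal token. The mail assembly itself is plain stdlib email handling. A standalone sketch of just that step, with made-up addresses and a hypothetical helper name (the real _send_renewal_email also handles templating, logging and SMTP delivery):

    import email.utils
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    def build_renewal_email(subject, from_string, to_address, plain_text, html_text):
        """Assemble a multipart/alternative message with text and HTML parts,
        much like _send_renewal_email does before handing off to sendmail."""
        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = from_string
        msg["To"] = to_address
        msg["Date"] = email.utils.formatdate()
        msg["Message-ID"] = email.utils.make_msgid()
        # Plain part first, HTML last: clients render the last part they support.
        msg.attach(MIMEText(plain_text, "plain", "utf8"))
        msg.attach(MIMEText(html_text, "html", "utf8"))
        return msg.as_string().encode("utf8")

    raw = build_renewal_email(
        subject="Renew your Example Chat account",
        from_string="Example Chat <noreply@example.com>",
        to_address="alice@example.com",
        plain_text="Renew here: https://example.com/renew?token=abc123",
        html_text="<p>Renew <a href='https://example.com/renew?token=abc123'>here</a>.</p>",
    )
    print(raw[:60])
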
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index aa5d89a9ac..a0cf37a9f9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -162,7 +162,7 @@ class AuthHandler(BaseHandler):
         defer.returnValue(params)
 
     @defer.inlineCallbacks
-    def check_auth(self, flows, clientdict, clientip):
+    def check_auth(self, flows, clientdict, clientip, password_servlet=False):
         """
         Takes a dictionary sent by the client in the login / registration
         protocol and handles the User-Interactive Auth flow.
@@ -186,6 +186,16 @@ class AuthHandler(BaseHandler):
 
             clientip (str): The IP address of the client.
 
+            password_servlet (bool): Whether the request originated from
+                PasswordRestServlet.
+                XXX: This is a temporary hack to distinguish between checking
+                for threepid validations locally (in the case of password
+                resets) and using the identity server (in the case of binding
+                a 3PID during registration). Once we start using the
+                homeserver for both tasks, this distinction will no longer be
+                necessary.
+
+
         Returns:
             defer.Deferred[dict, dict, str]: a deferred tuple of
                 (creds, params, session_id).
@@ -241,7 +251,9 @@ class AuthHandler(BaseHandler):
         if 'type' in authdict:
             login_type = authdict['type']
             try:
-                result = yield self._check_auth_dict(authdict, clientip)
+                result = yield self._check_auth_dict(
+                    authdict, clientip, password_servlet=password_servlet,
+                )
                 if result:
                     creds[login_type] = result
                     self._save_session(session)
@@ -351,7 +363,7 @@ class AuthHandler(BaseHandler):
         return sess.setdefault('serverdict', {}).get(key, default)
 
     @defer.inlineCallbacks
-    def _check_auth_dict(self, authdict, clientip):
+    def _check_auth_dict(self, authdict, clientip, password_servlet=False):
         """Attempt to validate the auth dict provided by a client
 
         Args:
@@ -369,7 +381,13 @@ class AuthHandler(BaseHandler):
         login_type = authdict['type']
         checker = self.checkers.get(login_type)
         if checker is not None:
-            res = yield checker(authdict, clientip)
+            # XXX: Temporary workaround for having Synapse handle password resets
+            # See AuthHandler.check_auth for further details
+            res = yield checker(
+                authdict,
+                clientip=clientip,
+                password_servlet=password_servlet,
+            )
             defer.returnValue(res)
 
         # build a v1-login-style dict out of the authdict and fall back to the
@@ -383,7 +401,7 @@ class AuthHandler(BaseHandler):
         defer.returnValue(canonical_id)
 
     @defer.inlineCallbacks
-    def _check_recaptcha(self, authdict, clientip):
+    def _check_recaptcha(self, authdict, clientip, **kwargs):
         try:
             user_response = authdict["response"]
         except KeyError:
@@ -429,20 +447,20 @@ class AuthHandler(BaseHandler):
                 defer.returnValue(True)
         raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
-    def _check_email_identity(self, authdict, _):
-        return self._check_threepid('email', authdict)
+    def _check_email_identity(self, authdict, **kwargs):
+        return self._check_threepid('email', authdict, **kwargs)
 
-    def _check_msisdn(self, authdict, _):
+    def _check_msisdn(self, authdict, **kwargs):
         return self._check_threepid('msisdn', authdict)
 
-    def _check_dummy_auth(self, authdict, _):
+    def _check_dummy_auth(self, authdict, **kwargs):
         return defer.succeed(True)
 
-    def _check_terms_auth(self, authdict, _):
+    def _check_terms_auth(self, authdict, **kwargs):
         return defer.succeed(True)
 
     @defer.inlineCallbacks
-    def _check_threepid(self, medium, authdict):
+    def _check_threepid(self, medium, authdict, password_servlet=False, **kwargs):
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -451,7 +469,30 @@ class AuthHandler(BaseHandler):
         identity_handler = self.hs.get_handlers().identity_handler
 
         logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
-        threepid = yield identity_handler.threepid_from_creds(threepid_creds)
+        if (
+            not password_servlet
+            or self.hs.config.email_password_reset_behaviour == "remote"
+        ):
+            threepid = yield identity_handler.threepid_from_creds(threepid_creds)
+        elif self.hs.config.email_password_reset_behaviour == "local":
+            row = yield self.store.get_threepid_validation_session(
+                medium,
+                threepid_creds["client_secret"],
+                sid=threepid_creds["sid"],
+                validated=True,
+            )
+
+            threepid = {
+                "medium": row["medium"],
+                "address": row["address"],
+                "validated_at": row["validated_at"],
+            } if row else None
+
+            if row:
+                # Valid threepid returned, delete from the db
+                yield self.store.delete_threepid_session(threepid_creds["sid"])
+        else:
+            raise SynapseError(400, "Password resets are not enabled on this homeserver")
 
         if not threepid:
             raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
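
The auth.py changes thread a password_servlet flag from check_auth down to _check_threepid so that password-reset validations can be checked against locally stored sessions instead of always asking the identity server, depending on email_password_reset_behaviour. A simplified synchronous sketch of that three-way branch (function and stub names are hypothetical; the real code is Deferred-based and raises SynapseError/LoginError):

    def validate_threepid(creds, from_password_servlet, reset_behaviour,
                          check_remote, check_local):
        """Decide where a 3PID validation session should be looked up."""
        if not from_password_servlet or reset_behaviour == "remote":
            # Registration-style flows, or servers configured for remote resets,
            # always go via the identity server.
            return check_remote(creds)
        if reset_behaviour == "local":
            row = check_local(creds["client_secret"], creds["sid"])
            if row is None:
                return None
            return {
                "medium": row["medium"],
                "address": row["address"],
                "validated_at": row["validated_at"],
            }
        raise ValueError("Password resets are not enabled on this homeserver")

    fake_sessions = {
        ("s3cret", "sid1"): {
            "medium": "email",
            "address": "alice@example.com",
            "validated_at": 1_560_000_000_000,
        },
    }

    print(validate_threepid(
        {"client_secret": "s3cret", "sid": "sid1"},
        from_password_servlet=True,
        reset_behaviour="local",
        check_remote=lambda creds: {"medium": "email", "address": "via-identity-server"},
        check_local=lambda secret, sid: fake_sessions.get((secret, sid)),
    ))
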
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 6a91f7698e..7378b56c1d 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -42,6 +43,8 @@ class DeactivateAccountHandler(BaseHandler):
         # it left off (if it has work left to do).
         hs.get_reactor().callWhenRunning(self._start_user_parting)
 
+        self._account_validity_enabled = hs.config.account_validity.enabled
+
     @defer.inlineCallbacks
     def deactivate_account(self, user_id, erase_data, id_server=None):
         """Deactivate a user's account
@@ -114,6 +117,13 @@ class DeactivateAccountHandler(BaseHandler):
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        # Remove all information on the user from the account_validity table.
+        if self._account_validity_enabled:
+            yield self.store.delete_account_validity_for_user(user_id)
+
+        # Mark the user as deactivated.
+        yield self.store.set_user_deactivated_status(user_id, True)
+
         defer.returnValue(identity_server_supports_unbinding)
 
     def _start_user_parting(self):
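
The deactivation change itself is small: stop tracking the account's expiry (so no further renewal emails are queued) and then record the user as deactivated. A toy in-memory version of those two store calls, with invented class names, just to show the ordering:

    class InMemoryStore:
        """Stand-in for the two datastore calls used during deactivation."""
        def __init__(self):
            self.account_validity = {"@alice:example.com": 1_560_000_000_000}
            self.deactivated = set()

        def delete_account_validity_for_user(self, user_id):
            self.account_validity.pop(user_id, None)

        def set_user_deactivated_status(self, user_id, deactivated):
            if deactivated:
                self.deactivated.add(user_id)

    def deactivate(store, user_id, account_validity_enabled):
        # Drop the expiry row first so the renewal loop stops caring about
        # this user, then flag the account as deactivated.
        if account_validity_enabled:
            store.delete_account_validity_for_user(user_id)
        store.set_user_deactivated_status(user_id, True)

    store = InMemoryStore()
    deactivate(store, "@alice:example.com", account_validity_enabled=True)
    print(store.account_validity, store.deactivated)
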
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 27bd06df5d..a12f9508d8 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -19,7 +19,7 @@ import string
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -43,8 +43,10 @@ class DirectoryHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
         self.config = hs.config
         self.enable_room_list_search = hs.config.enable_room_list_search
+        self.require_membership = hs.config.require_membership_for_aliases
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -83,7 +85,7 @@ class DirectoryHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def create_association(self, requester, room_alias, room_id, servers=None,
-                           send_event=True):
+                           send_event=True, check_membership=True):
         """Attempt to create a new alias
 
         Args:
@@ -93,6 +95,8 @@ class DirectoryHandler(BaseHandler):
             servers (list[str]|None): List of servers that others servers
                 should try and join via
             send_event (bool): Whether to send an updated m.room.aliases event
+            check_membership (bool): Whether to check if the user is in the room
+                before the alias can be set (if the server's config requires it).
 
         Returns:
             Deferred
@@ -100,6 +104,13 @@ class DirectoryHandler(BaseHandler):
 
         user_id = requester.user.to_string()
 
+        if len(room_alias.to_string()) > MAX_ALIAS_LENGTH:
+            raise SynapseError(
+                400,
+                "Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH,
+                Codes.INVALID_PARAM,
+            )
+
         service = requester.app_service
         if service:
             if not service.is_interested_in_alias(room_alias.to_string()):
@@ -108,6 +119,14 @@ class DirectoryHandler(BaseHandler):
                     " this kind of alias.", errcode=Codes.EXCLUSIVE
                 )
         else:
+            if self.require_membership and check_membership:
+                rooms_for_user = yield self.store.get_rooms_for_user(user_id)
+                if room_id not in rooms_for_user:
+                    raise AuthError(
+                        403,
+                        "You must be in the room to create an alias for it",
+                    )
+
             if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
                 raise AuthError(
                     403, "This user is not permitted to create this alias",
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1b4d8c74ae..eb525070cf 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -21,7 +21,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
-from synapse.events.utils import serialize_event
 from synapse.types import UserID
 from synapse.util.logutils import log_function
 from synapse.visibility import filter_events_for_client
@@ -50,6 +49,7 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
         self.state = hs.get_state_handler()
         self._server_notices_sender = hs.get_server_notices_sender()
+        self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
     @log_function
@@ -120,9 +120,12 @@ class EventStreamHandler(BaseHandler):
 
             time_now = self.clock.time_msec()
 
-            chunks = [
-                serialize_event(e, time_now, as_client_event) for e in events
-            ]
+            chunks = yield self._event_serializer.serialize_events(
+                events, time_now, as_client_event=as_client_event,
+                # We don't bundle "live" events, as otherwise clients
+                # will end up double counting annotations.
+                bundle_aggregations=False,
+            )
 
             chunk = {
                 "chunk": chunks,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 07a5dc182d..cc08e3e027 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -33,6 +34,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
+    Codes,
     FederationDeniedError,
     FederationError,
     RequestSendFailed,
@@ -127,6 +129,8 @@ class FederationHandler(BaseHandler):
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
+        self.third_party_event_rules = hs.get_third_party_event_rules()
+
     @defer.inlineCallbacks
     def on_receive_pdu(
             self, origin, pdu, sent_to_us_directly=False,
@@ -1268,6 +1272,15 @@ class FederationHandler(BaseHandler):
             logger.warn("Failed to create join %r because %s", event, e)
             raise e
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.info("Creation of join %s forbidden by third-party rules", event)
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         # The remote hasn't signed it yet, obviously. We'll do the full checks
         # when we get the event back in `on_send_join_request`
         yield self.auth.check_from_context(
@@ -1310,6 +1323,15 @@ class FederationHandler(BaseHandler):
             origin, event
         )
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.info("Sending of join %s forbidden by third-party rules", event)
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         logger.debug(
             "on_send_join_request: After _handle_new_event: %s, sigs: %s",
             event.event_id,
@@ -1468,6 +1490,15 @@ class FederationHandler(BaseHandler):
             builder=builder,
         )
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.warning("Creation of leave %s forbidden by third-party rules", event)
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_leave_request`
@@ -1494,10 +1525,19 @@ class FederationHandler(BaseHandler):
 
         event.internal_metadata.outlier = False
 
-        yield self._handle_new_event(
+        context = yield self._handle_new_event(
             origin, event
         )
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.info("Sending of leave %s forbidden by third-party rules", event)
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         logger.debug(
             "on_send_leave_request: After _handle_new_event: %s, sigs: %s",
             event.event_id,
@@ -1927,6 +1967,11 @@ class FederationHandler(BaseHandler):
                     event.room_id, latest_event_ids=extrem_ids,
                 )
 
+            logger.debug(
+                "Doing soft-fail check for %s: state %s",
+                event.event_id, current_state_ids,
+            )
+
             # Now check if event pass auth against said current state
             auth_types = auth_types_for_event(event)
             current_state_ids = [
@@ -1943,7 +1988,7 @@ class FederationHandler(BaseHandler):
                 self.auth.check(room_version, event, auth_events=current_auth_events)
             except AuthError as e:
                 logger.warn(
-                    "Failed current state auth resolution for %r because %s",
+                    "Soft-failing %r because %s",
                     event, e,
                 )
                 event.internal_metadata.soft_failed = True
@@ -2019,15 +2064,65 @@ class FederationHandler(BaseHandler):
 
         Args:
             origin (str):
-            event (synapse.events.FrozenEvent):
+            event (synapse.events.EventBase):
             context (synapse.events.snapshot.EventContext):
-            auth_events (dict[(str, str)->str]):
+            auth_events (dict[(str, str)->synapse.events.EventBase]):
+                Map from (event_type, state_key) to event
+
+                What we expect the event's auth_events to be, based on the event's
+                position in the dag. I think? maybe??
+
+                Also NB that this function adds entries to it.
+        Returns:
+            defer.Deferred[None]
+        """
+        room_version = yield self.store.get_room_version(event.room_id)
+
+        try:
+            yield self._update_auth_events_and_context_for_auth(
+                origin, event, context, auth_events
+            )
+        except Exception:
+            # We don't really mind if the above fails, so lets not fail
+            # processing if it does. However, it really shouldn't fail so
+            # let's still log as an exception since we'll still want to fix
+            # any bugs.
+            logger.exception(
+                "Failed to double check auth events for %s with remote. "
+                "Ignoring failure and continuing processing of event.",
+                event.event_id,
+            )
+
+        try:
+            self.auth.check(room_version, event, auth_events=auth_events)
+        except AuthError as e:
+            logger.warn("Failed auth resolution for %r because %s", event, e)
+            raise e
+
+    @defer.inlineCallbacks
+    def _update_auth_events_and_context_for_auth(
+        self, origin, event, context, auth_events
+    ):
+        """Helper for do_auth. See there for docs.
+
+        Checks whether a given event has the expected auth events. If it
+        doesn't then we talk to the remote server to compare state to see if
+        we can come to a consensus (e.g. if one server missed some valid
+        state).
+
+        This attempts to resolve any potential divergence of state between
+        servers, but is not essential and so failures should not block further
+        processing of the event.
+
+        Args:
+            origin (str):
+            event (synapse.events.EventBase):
+            context (synapse.events.snapshot.EventContext):
+            auth_events (dict[(str, str)->synapse.events.EventBase]):
 
         Returns:
             defer.Deferred[None]
         """
-        # Check if we have all the auth events.
-        current_state = set(e.event_id for e in auth_events.values())
         event_auth_events = set(event.auth_event_ids())
 
         if event.is_state():
@@ -2035,11 +2130,21 @@ class FederationHandler(BaseHandler):
         else:
             event_key = None
 
-        if event_auth_events - current_state:
+        # if the event's auth_events refers to events which are not in our
+        # calculated auth_events, we need to fetch those events from somewhere.
+        #
+        # we start by fetching them from the store, and then try calling /event_auth/.
+        missing_auth = event_auth_events.difference(
+            e.event_id for e in auth_events.values()
+        )
+
+        if missing_auth:
             # TODO: can we use store.have_seen_events here instead?
             have_events = yield self.store.get_seen_events_with_rejections(
-                event_auth_events - current_state
+                missing_auth
             )
+            logger.debug("Got events %s from store", have_events)
+            missing_auth.difference_update(have_events.keys())
         else:
             have_events = {}
 
@@ -2048,17 +2153,22 @@ class FederationHandler(BaseHandler):
             for e in auth_events.values()
         })
 
-        seen_events = set(have_events.keys())
-
-        missing_auth = event_auth_events - seen_events - current_state
-
         if missing_auth:
-            logger.info("Missing auth: %s", missing_auth)
             # If we don't have all the auth events, we need to get them.
+            logger.info(
+                "auth_events contains unknown events: %s",
+                missing_auth,
+            )
             try:
-                remote_auth_chain = yield self.federation_client.get_event_auth(
-                    origin, event.room_id, event.event_id
-                )
+                try:
+                    remote_auth_chain = yield self.federation_client.get_event_auth(
+                        origin, event.room_id, event.event_id
+                    )
+                except RequestSendFailed as e:
+                    # The other side isn't around or doesn't implement the
+                    # endpoint, so lets just bail out.
+                    logger.info("Failed to get event auth from remote: %s", e)
+                    return
 
                 seen_remotes = yield self.store.have_seen_events(
                     [e.event_id for e in remote_auth_chain]
@@ -2095,145 +2205,174 @@ class FederationHandler(BaseHandler):
                 have_events = yield self.store.get_seen_events_with_rejections(
                     event.auth_event_ids()
                 )
-                seen_events = set(have_events.keys())
             except Exception:
                 # FIXME:
                 logger.exception("Failed to get auth chain")
 
+        if event.internal_metadata.is_outlier():
+            logger.info("Skipping auth_event fetch for outlier")
+            return
+
         # FIXME: Assumes we have and stored all the state for all the
         # prev_events
-        current_state = set(e.event_id for e in auth_events.values())
-        different_auth = event_auth_events - current_state
+        different_auth = event_auth_events.difference(
+            e.event_id for e in auth_events.values()
+        )
 
-        room_version = yield self.store.get_room_version(event.room_id)
+        if not different_auth:
+            return
 
-        if different_auth and not event.internal_metadata.is_outlier():
-            # Do auth conflict res.
-            logger.info("Different auth: %s", different_auth)
-
-            different_events = yield logcontext.make_deferred_yieldable(
-                defer.gatherResults([
-                    logcontext.run_in_background(
-                        self.store.get_event,
-                        d,
-                        allow_none=True,
-                        allow_rejected=False,
-                    )
-                    for d in different_auth
-                    if d in have_events and not have_events[d]
-                ], consumeErrors=True)
-            ).addErrback(unwrapFirstError)
-
-            if different_events:
-                local_view = dict(auth_events)
-                remote_view = dict(auth_events)
-                remote_view.update({
-                    (d.type, d.state_key): d for d in different_events if d
-                })
+        logger.info(
+            "auth_events refers to events which are not in our calculated auth "
+            "chain: %s",
+            different_auth,
+        )
+
+        room_version = yield self.store.get_room_version(event.room_id)
 
-                new_state = yield self.state_handler.resolve_events(
-                    room_version,
-                    [list(local_view.values()), list(remote_view.values())],
-                    event
+        different_events = yield logcontext.make_deferred_yieldable(
+            defer.gatherResults([
+                logcontext.run_in_background(
+                    self.store.get_event,
+                    d,
+                    allow_none=True,
+                    allow_rejected=False,
                 )
+                for d in different_auth
+                if d in have_events and not have_events[d]
+            ], consumeErrors=True)
+        ).addErrback(unwrapFirstError)
+
+        if different_events:
+            local_view = dict(auth_events)
+            remote_view = dict(auth_events)
+            remote_view.update({
+                (d.type, d.state_key): d for d in different_events if d
+            })
 
-                auth_events.update(new_state)
+            new_state = yield self.state_handler.resolve_events(
+                room_version,
+                [list(local_view.values()), list(remote_view.values())],
+                event
+            )
 
-                current_state = set(e.event_id for e in auth_events.values())
-                different_auth = event_auth_events - current_state
+            logger.info(
+                "After state res: updating auth_events with new state %s",
+                {
+                    (d.type, d.state_key): d.event_id for d in new_state.values()
+                    if auth_events.get((d.type, d.state_key)) != d
+                },
+            )
 
-                yield self._update_context_for_auth_events(
-                    event, context, auth_events, event_key,
-                )
+            auth_events.update(new_state)
 
-        if different_auth and not event.internal_metadata.is_outlier():
-            logger.info("Different auth after resolution: %s", different_auth)
+            different_auth = event_auth_events.difference(
+                e.event_id for e in auth_events.values()
+            )
 
-            # Only do auth resolution if we have something new to say.
-            # We can't rove an auth failure.
-            do_resolution = False
+            yield self._update_context_for_auth_events(
+                event, context, auth_events, event_key,
+            )
 
-            provable = [
-                RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR,
-            ]
+        if not different_auth:
+            # we're done
+            return
 
-            for e_id in different_auth:
-                if e_id in have_events:
-                    if have_events[e_id] in provable:
-                        do_resolution = True
-                        break
+        logger.info(
+            "auth_events still refers to events which are not in the calculated auth "
+            "chain after state resolution: %s",
+            different_auth,
+        )
 
-            if do_resolution:
-                prev_state_ids = yield context.get_prev_state_ids(self.store)
-                # 1. Get what we think is the auth chain.
-                auth_ids = yield self.auth.compute_auth_events(
-                    event, prev_state_ids
-                )
-                local_auth_chain = yield self.store.get_auth_chain(
-                    auth_ids, include_given=True
-                )
+        # Only do auth resolution if we have something new to say.
+        # We can't prove an auth failure.
+        do_resolution = False
 
-                try:
-                    # 2. Get remote difference.
-                    result = yield self.federation_client.query_auth(
-                        origin,
-                        event.room_id,
-                        event.event_id,
-                        local_auth_chain,
-                    )
+        for e_id in different_auth:
+            if e_id in have_events:
+                if have_events[e_id] == RejectedReason.NOT_ANCESTOR:
+                    do_resolution = True
+                    break
 
-                    seen_remotes = yield self.store.have_seen_events(
-                        [e.event_id for e in result["auth_chain"]]
-                    )
+        if not do_resolution:
+            logger.info(
+                "Skipping auth resolution due to lack of provable rejection reasons"
+            )
+            return
 
-                    # 3. Process any remote auth chain events we haven't seen.
-                    for ev in result["auth_chain"]:
-                        if ev.event_id in seen_remotes:
-                            continue
+        logger.info("Doing auth resolution")
 
-                        if ev.event_id == event.event_id:
-                            continue
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
 
-                        try:
-                            auth_ids = ev.auth_event_ids()
-                            auth = {
-                                (e.type, e.state_key): e
-                                for e in result["auth_chain"]
-                                if e.event_id in auth_ids
-                                or event.type == EventTypes.Create
-                            }
-                            ev.internal_metadata.outlier = True
+        # 1. Get what we think is the auth chain.
+        auth_ids = yield self.auth.compute_auth_events(
+            event, prev_state_ids
+        )
+        local_auth_chain = yield self.store.get_auth_chain(
+            auth_ids, include_given=True
+        )
 
-                            logger.debug(
-                                "do_auth %s different_auth: %s",
-                                event.event_id, e.event_id
-                            )
+        try:
+            # 2. Get remote difference.
+            try:
+                result = yield self.federation_client.query_auth(
+                    origin,
+                    event.room_id,
+                    event.event_id,
+                    local_auth_chain,
+                )
+            except RequestSendFailed as e:
+                # The other side isn't around or doesn't implement the
+                # endpoint, so lets just bail out.
+                logger.info("Failed to query auth from remote: %s", e)
+                return
+
+            seen_remotes = yield self.store.have_seen_events(
+                [e.event_id for e in result["auth_chain"]]
+            )
 
-                            yield self._handle_new_event(
-                                origin, ev, auth_events=auth
-                            )
+            # 3. Process any remote auth chain events we haven't seen.
+            for ev in result["auth_chain"]:
+                if ev.event_id in seen_remotes:
+                    continue
 
-                            if ev.event_id in event_auth_events:
-                                auth_events[(ev.type, ev.state_key)] = ev
-                        except AuthError:
-                            pass
+                if ev.event_id == event.event_id:
+                    continue
 
-                except Exception:
-                    # FIXME:
-                    logger.exception("Failed to query auth chain")
+                try:
+                    auth_ids = ev.auth_event_ids()
+                    auth = {
+                        (e.type, e.state_key): e
+                        for e in result["auth_chain"]
+                        if e.event_id in auth_ids
+                        or event.type == EventTypes.Create
+                    }
+                    ev.internal_metadata.outlier = True
+
+                    logger.debug(
+                        "do_auth %s different_auth: %s",
+                        event.event_id, e.event_id
+                    )
 
-                # 4. Look at rejects and their proofs.
-                # TODO.
+                    yield self._handle_new_event(
+                        origin, ev, auth_events=auth
+                    )
 
-                yield self._update_context_for_auth_events(
-                    event, context, auth_events, event_key,
-                )
+                    if ev.event_id in event_auth_events:
+                        auth_events[(ev.type, ev.state_key)] = ev
+                except AuthError:
+                    pass
 
-        try:
-            self.auth.check(room_version, event, auth_events=auth_events)
-        except AuthError as e:
-            logger.warn("Failed auth resolution for %r because %s", event, e)
-            raise e
+        except Exception:
+            # FIXME:
+            logger.exception("Failed to query auth chain")
+
+        # 4. Look at rejects and their proofs.
+        # TODO.
+
+        yield self._update_context_for_auth_events(
+            event, context, auth_events, event_key,
+        )
 
     @defer.inlineCallbacks
     def _update_context_for_auth_events(self, event, context, auth_events,
@@ -2461,6 +2600,18 @@ class FederationHandler(BaseHandler):
                 builder=builder
             )
 
+            event_allowed = yield self.third_party_event_rules.check_event_allowed(
+                event, context,
+            )
+            if not event_allowed:
+                logger.info(
+                    "Creation of threepid invite %s forbidden by third-party rules",
+                    event,
+                )
+                raise SynapseError(
+                    403, "This event is not allowed in this context", Codes.FORBIDDEN,
+                )
+
             event, context = yield self.add_display_name_to_third_party_invite(
                 room_version, event_dict, event, context
             )
@@ -2509,6 +2660,18 @@ class FederationHandler(BaseHandler):
             builder=builder,
         )
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.warning(
+                "Exchange of threepid invite %s forbidden by third-party rules",
+                event,
+            )
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         event, context = yield self.add_display_name_to_third_party_invite(
             room_version, event_dict, event, context
         )
@@ -2524,12 +2687,6 @@ class FederationHandler(BaseHandler):
         # though the sender isn't a local user.
         event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
 
-        # XXX we send the invite here, but send_membership_event also sends it,
-        # so we end up making two requests. I think this is redundant.
-        returned_invite = yield self.send_invite(origin, event)
-        # TODO: Make sure the signatures actually are correct.
-        event.signatures.update(returned_invite.signatures)
-
         member_handler = self.hs.get_room_member_handler()
         yield member_handler.send_membership_event(None, event, context)
 
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 02c508acec..f60ace02e8 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -49,9 +49,7 @@ def _create_rerouter(func_name):
             def http_response_errback(failure):
                 failure.trap(HttpResponseException)
                 e = failure.value
-                if e.code == 403:
-                    raise e.to_synapse_error()
-                return failure
+                raise e.to_synapse_error()
 
             def request_failed_errback(failure):
                 failure.trap(RequestSendFailed)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 22469486d7..04caf65793 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -247,7 +247,14 @@ class IdentityHandler(BaseHandler):
         defer.returnValue(changed)
 
     @defer.inlineCallbacks
-    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
+    def requestEmailToken(
+        self,
+        id_server,
+        email,
+        client_secret,
+        send_attempt,
+        next_link=None,
+    ):
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
@@ -259,7 +266,9 @@ class IdentityHandler(BaseHandler):
             'client_secret': client_secret,
             'send_attempt': send_attempt,
         }
-        params.update(kwargs)
+
+        if next_link:
+            params.update({'next_link': next_link})
 
         try:
             data = yield self.http_client.post_json_get_json(
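
requestEmailToken now takes next_link as an explicit optional argument instead of arbitrary **kwargs, and only includes it in the request body when the caller supplies one. A small sketch of that body construction (the identity-server POST itself is omitted):

    def build_email_token_request(email, client_secret, send_attempt, next_link=None):
        """Build the requestToken body the way the updated requestEmailToken does."""
        params = {
            "email": email,
            "client_secret": client_secret,
            "send_attempt": send_attempt,
        }
        if next_link:
            params["next_link"] = next_link
        return params

    print(build_email_token_request("alice@example.com", "s3cret", 1))
    print(build_email_token_request(
        "alice@example.com", "s3cret", 2, next_link="https://example.com/after",
    ))
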
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 7dfae78db0..aaee5db0b7 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
@@ -43,6 +42,7 @@ class InitialSyncHandler(BaseHandler):
         self.clock = hs.get_clock()
         self.validator = EventValidator()
         self.snapshot_cache = SnapshotCache()
+        self._event_serializer = hs.get_event_client_serializer()
 
     def snapshot_all_rooms(self, user_id=None, pagin_config=None,
                            as_client_event=True, include_archived=False):
@@ -138,7 +138,9 @@ class InitialSyncHandler(BaseHandler):
                 d["inviter"] = event.sender
 
                 invite_event = yield self.store.get_event(event.event_id)
-                d["invite"] = serialize_event(invite_event, time_now, as_client_event)
+                d["invite"] = yield self._event_serializer.serialize_event(
+                    invite_event, time_now, as_client_event,
+                )
 
             rooms_ret.append(d)
 
@@ -185,18 +187,21 @@ class InitialSyncHandler(BaseHandler):
                 time_now = self.clock.time_msec()
 
                 d["messages"] = {
-                    "chunk": [
-                        serialize_event(m, time_now, as_client_event)
-                        for m in messages
-                    ],
+                    "chunk": (
+                        yield self._event_serializer.serialize_events(
+                            messages, time_now=time_now,
+                            as_client_event=as_client_event,
+                        )
+                    ),
                     "start": start_token.to_string(),
                     "end": end_token.to_string(),
                 }
 
-                d["state"] = [
-                    serialize_event(c, time_now, as_client_event)
-                    for c in current_state.values()
-                ]
+                d["state"] = yield self._event_serializer.serialize_events(
+                    current_state.values(),
+                    time_now=time_now,
+                    as_client_event=as_client_event
+                )
 
                 account_data_events = []
                 tags = tags_by_room.get(event.room_id)
@@ -337,11 +342,15 @@ class InitialSyncHandler(BaseHandler):
             "membership": membership,
             "room_id": room_id,
             "messages": {
-                "chunk": [serialize_event(m, time_now) for m in messages],
+                "chunk": (yield self._event_serializer.serialize_events(
+                    messages, time_now,
+                )),
                 "start": start_token.to_string(),
                 "end": end_token.to_string(),
             },
-            "state": [serialize_event(s, time_now) for s in room_state.values()],
+            "state": (yield self._event_serializer.serialize_events(
+                room_state.values(), time_now,
+            )),
             "presence": [],
             "receipts": [],
         })
@@ -355,10 +364,9 @@ class InitialSyncHandler(BaseHandler):
 
         # TODO: These concurrently
         time_now = self.clock.time_msec()
-        state = [
-            serialize_event(x, time_now)
-            for x in current_state.values()
-        ]
+        state = yield self._event_serializer.serialize_events(
+            current_state.values(), time_now,
+        )
 
         now_token = yield self.hs.get_event_sources().get_current_token()
 
@@ -425,7 +433,9 @@ class InitialSyncHandler(BaseHandler):
         ret = {
             "room_id": room_id,
             "messages": {
-                "chunk": [serialize_event(m, time_now) for m in messages],
+                "chunk": (yield self._event_serializer.serialize_events(
+                    messages, time_now,
+                )),
                 "start": start_token.to_string(),
                 "end": end_token.to_string(),
             },
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 224d34ef3a..11650dc80c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2017 - 2018 New Vector Ltd
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -22,7 +23,7 @@ from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from twisted.internet.defer import succeed
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, RelationTypes
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -32,7 +33,6 @@ from synapse.api.errors import (
 )
 from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import ConsentURIBuilder
-from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.storage.state import StateFilter
@@ -57,6 +57,7 @@ class MessageHandler(object):
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
         self.store = hs.get_datastore()
+        self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -164,9 +165,13 @@ class MessageHandler(object):
                 room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
-        defer.returnValue(
-            [serialize_event(c, now) for c in room_state.values()]
+        events = yield self._event_serializer.serialize_events(
+            room_state.values(), now,
+            # We don't bother bundling aggregations in when asked for state
+            # events, as clients won't use them.
+            bundle_aggregations=False,
         )
+        defer.returnValue(events)
 
     @defer.inlineCallbacks
     def get_joined_members(self, requester, room_id):
@@ -228,6 +233,7 @@ class EventCreationHandler(object):
         self.ratelimiter = hs.get_ratelimiter()
         self.notifier = hs.get_notifier()
         self.config = hs.config
+        self.require_membership_for_aliases = hs.config.require_membership_for_aliases
 
         self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
 
@@ -243,6 +249,7 @@ class EventCreationHandler(object):
         self.action_generator = hs.get_action_generator()
 
         self.spam_checker = hs.get_spam_checker()
+        self.third_party_event_rules = hs.get_third_party_event_rules()
 
         self._block_events_without_consent_error = (
             self.config.block_events_without_consent_error
@@ -336,6 +343,35 @@ class EventCreationHandler(object):
             prev_events_and_hashes=prev_events_and_hashes,
         )
 
+        # In an ideal world we wouldn't need the second part of this condition.
+        # However, this behaviour isn't spec'd yet, so we need to be able to turn
+        # it off. Also, this code is evaluated each time a new m.room.aliases
+        # event is created, which includes hitting a /directory route. Without
+        # this condition here, the similar check in synapse.handlers.directory
+        # would be pointless.
+        if builder.type == EventTypes.Aliases and self.require_membership_for_aliases:
+            # Ideally we'd do the membership check in event_auth.check(), which
+            # describes a spec'd algorithm for authenticating events received over
+            # federation as well as those created locally. As of room v3, aliases events
+            # can be created by users that are not in the room, therefore we have to
+            # tolerate them in event_auth.check().
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
+            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+            if not prev_event or prev_event.membership != Membership.JOIN:
+                logger.warning(
+                    ("Attempt to send `m.room.aliases` in room %s by user %s but"
+                     " membership is %s"),
+                    event.room_id,
+                    event.sender,
+                    prev_event.membership if prev_event else None,
+                )
+
+                raise AuthError(
+                    403,
+                    "You must be in the room to create an alias for it",
+                )
+
         self.validator.validate_new(event)
 
         defer.returnValue((event, context))
@@ -570,6 +606,20 @@ class EventCreationHandler(object):
 
         self.validator.validate_new(event)
 
+        # If this event is an annotation then we check that the sender can't
+        # annotate the same way twice (e.g. this stops users from liking an
+        # event multiple times).
+        relation = event.content.get("m.relates_to", {})
+        if relation.get("rel_type") == RelationTypes.ANNOTATION:
+            relates_to = relation["event_id"]
+            aggregation_key = relation["key"]
+
+            already_exists = yield self.store.has_user_annotated_event(
+                relates_to, event.type, aggregation_key, event.sender,
+            )
+            if already_exists:
+                raise SynapseError(400, "Can't send same reaction twice")
+
         logger.debug(
             "Created event %s",
             event.event_id,
@@ -610,6 +660,14 @@ class EventCreationHandler(object):
         else:
             room_version = yield self.store.get_room_version(event.room_id)
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         try:
             yield self.auth.check_from_context(room_version, event, context)
         except AuthError as err:
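
As a side note on the duplicate-annotation guard added to EventCreationHandler above, the sketch below is a minimal, self-contained illustration of the shape of data it inspects. The in-memory seen_annotations set is a stand-in for the real store.has_user_annotated_event() lookup, and the event/user IDs are made up.

ANNOTATION = "m.annotation"  # mirrors RelationTypes.ANNOTATION

seen_annotations = set()  # {(relates_to_event_id, event_type, key, sender)}

def check_annotation_allowed(event_type, content, sender):
    relation = content.get("m.relates_to", {})
    if relation.get("rel_type") != ANNOTATION:
        return  # not an annotation, nothing to check

    key = (relation["event_id"], event_type, relation["key"], sender)
    if key in seen_annotations:
        raise ValueError("Can't send same reaction twice")
    seen_annotations.add(key)

# The first reaction from @alice is accepted; an identical second one is rejected.
reaction = {"m.relates_to": {"rel_type": ANNOTATION, "event_id": "$abc", "key": "+1"}}
check_annotation_allowed("m.reaction", reaction, "@alice:example.com")
try:
    check_annotation_allowed("m.reaction", reaction, "@alice:example.com")
except ValueError as e:
    print(e)
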
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index e4fdae9266..8f811e24fe 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -20,7 +20,6 @@ from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
-from synapse.events.utils import serialize_event
 from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import ReadWriteLock
@@ -78,6 +77,7 @@ class PaginationHandler(object):
         self._purges_in_progress_by_room = set()
         # map from purge id to PurgeStatus
         self._purges_by_id = {}
+        self._event_serializer = hs.get_event_client_serializer()
 
     def start_purge_history(self, room_id, token,
                             delete_local_events=False):
@@ -278,18 +278,22 @@ class PaginationHandler(object):
         time_now = self.clock.time_msec()
 
         chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
+            "chunk": (
+                yield self._event_serializer.serialize_events(
+                    events, time_now,
+                    as_client_event=as_client_event,
+                )
+            ),
             "start": pagin_config.from_token.to_string(),
             "end": next_token.to_string(),
         }
 
         if state:
-            chunk["state"] = [
-                serialize_event(e, time_now, as_client_event)
-                for e in state
-            ]
+            chunk["state"] = (
+                yield self._event_serializer.serialize_events(
+                    state, time_now,
+                    as_client_event=as_client_event,
+                )
+            )
 
         defer.returnValue(chunk)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index bd1285b15c..557fb5f83d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -158,7 +158,13 @@ class PresenceHandler(object):
         # have not yet been persisted
         self.unpersisted_users_changes = set()
 
-        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        hs.get_reactor().addSystemEventTrigger(
+            "before",
+            "shutdown",
+            run_as_background_process,
+            "presence.on_shutdown",
+            self._on_shutdown,
+        )
 
         self.serial_to_user = {}
         self._next_serial = 1
@@ -182,17 +188,27 @@ class PresenceHandler(object):
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
+        def run_timeout_handler():
+            return run_as_background_process(
+                "handle_presence_timeouts", self._handle_timeouts
+            )
+
         self.clock.call_later(
             30,
             self.clock.looping_call,
-            self._handle_timeouts,
+            run_timeout_handler,
             5000,
         )
 
+        def run_persister():
+            return run_as_background_process(
+                "persist_presence_changes", self._persist_unpersisted_changes
+            )
+
         self.clock.call_later(
             60,
             self.clock.looping_call,
-            self._persist_unpersisted_changes,
+            run_persister,
             60 * 1000,
         )
 
@@ -229,6 +245,7 @@ class PresenceHandler(object):
         )
 
         if self.unpersisted_users_changes:
+
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in self.unpersisted_users_changes
@@ -240,30 +257,18 @@ class PresenceHandler(object):
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
         """
-        logger.info(
-            "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
-            len(self.unpersisted_users_changes)
-        )
-
         unpersisted = self.unpersisted_users_changes
         self.unpersisted_users_changes = set()
 
         if unpersisted:
+            logger.info(
+                "Persisting %d upersisted presence updates", len(unpersisted)
+            )
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in unpersisted
             ])
 
-        logger.info("Finished _persist_unpersisted_changes")
-
-    @defer.inlineCallbacks
-    def _update_states_and_catch_exception(self, new_states):
-        try:
-            res = yield self._update_states(new_states)
-            defer.returnValue(res)
-        except Exception:
-            logger.exception("Error updating presence")
-
     @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -338,45 +343,41 @@ class PresenceHandler(object):
         logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        try:
-            with Measure(self.clock, "presence_handle_timeouts"):
-                # Fetch the list of users that *may* have timed out. Things may have
-                # changed since the timeout was set, so we won't necessarily have to
-                # take any action.
-                users_to_check = set(self.wheel_timer.fetch(now))
-
-                # Check whether the lists of syncing processes from an external
-                # process have expired.
-                expired_process_ids = [
-                    process_id for process_id, last_update
-                    in self.external_process_last_updated_ms.items()
-                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
-                ]
-                for process_id in expired_process_ids:
-                    users_to_check.update(
-                        self.external_process_last_updated_ms.pop(process_id, ())
-                    )
-                    self.external_process_last_update.pop(process_id)
+        # Fetch the list of users that *may* have timed out. Things may have
+        # changed since the timeout was set, so we won't necessarily have to
+        # take any action.
+        users_to_check = set(self.wheel_timer.fetch(now))
+
+        # Check whether the lists of syncing processes from an external
+        # process have expired.
+        expired_process_ids = [
+            process_id for process_id, last_update
+            in self.external_process_last_updated_ms.items()
+            if now - last_update > EXTERNAL_PROCESS_EXPIRY
+        ]
+        for process_id in expired_process_ids:
+            users_to_check.update(
+                self.external_process_last_updated_ms.pop(process_id, ())
+            )
+            self.external_process_last_update.pop(process_id)
 
-                states = [
-                    self.user_to_current_state.get(
-                        user_id, UserPresenceState.default(user_id)
-                    )
-                    for user_id in users_to_check
-                ]
+        states = [
+            self.user_to_current_state.get(
+                user_id, UserPresenceState.default(user_id)
+            )
+            for user_id in users_to_check
+        ]
 
-                timers_fired_counter.inc(len(states))
+        timers_fired_counter.inc(len(states))
 
-                changes = handle_timeouts(
-                    states,
-                    is_mine_fn=self.is_mine_id,
-                    syncing_user_ids=self.get_currently_syncing_users(),
-                    now=now,
-                )
+        changes = handle_timeouts(
+            states,
+            is_mine_fn=self.is_mine_id,
+            syncing_user_ids=self.get_currently_syncing_users(),
+            now=now,
+        )
 
-            run_in_background(self._update_states_and_catch_exception, changes)
-        except Exception:
-            logger.exception("Exception in _handle_timeouts loop")
+        return self._update_states(changes)
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
@@ -828,14 +829,22 @@ class PresenceHandler(object):
             if typ != EventTypes.Member:
                 continue
 
-            event = yield self.store.get_event(event_id)
-            if event.content.get("membership") != Membership.JOIN:
+            if event_id is None:
+                # state has been deleted, so this is not a join. We only care about
+                # joins.
+                continue
+
+            event = yield self.store.get_event(event_id, allow_none=True)
+            if not event or event.content.get("membership") != Membership.JOIN:
                 # We only care about joins
                 continue
 
             if prev_event_id:
-                prev_event = yield self.store.get_event(prev_event_id)
-                if prev_event.content.get("membership") == Membership.JOIN:
+                prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+                if (
+                    prev_event
+                    and prev_event.content.get("membership") == Membership.JOIN
+                ):
                     # Ignore changes to join events.
                     continue
 
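
The presence changes above replace direct looping_call targets with wrappers that run each tick as a named background process. The sketch below mirrors that pattern in plain Twisted; run_as_background_process here is a simplified stand-in for synapse's helper of the same name, and the 30s/5s timings copy the handler's values.

from twisted.internet import reactor, task

def run_as_background_process(desc, func):
    # Stand-in: the real helper runs `func` in its own logcontext and records
    # per-process metrics under `desc`.
    print("background process: %s" % (desc,))
    return func()

def _handle_timeouts():
    print("checking presence timeouts")

def run_timeout_handler():
    return run_as_background_process("handle_presence_timeouts", _handle_timeouts)

def start_timeout_loop():
    # Fire every 5 seconds once started.
    return task.LoopingCall(run_timeout_handler).start(5.0, now=True)

# Delay the first run by 30s so recently-disconnected clients can reconnect
# before being treated as offline.
reactor.callLater(30, start_timeout_loop)
# reactor.run()  # in a real program the reactor would be started here
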
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a65c98ff5c..3e04233394 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,12 +15,15 @@
 
 import logging
 
+from six import raise_from
+
 from twisted.internet import defer
 
 from synapse.api.errors import (
     AuthError,
-    CodeMessageException,
     Codes,
+    HttpResponseException,
+    RequestSendFailed,
     StoreError,
     SynapseError,
 )
@@ -31,6 +34,9 @@ from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
+MAX_DISPLAYNAME_LEN = 100
+MAX_AVATAR_URL_LEN = 1000
+
 
 class BaseProfileHandler(BaseHandler):
     """Handles fetching and updating user profile information.
@@ -53,6 +59,7 @@ class BaseProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)
+
         if self.hs.is_mine(target_user):
             try:
                 displayname = yield self.store.get_profile_displayname(
@@ -81,10 +88,10 @@ class BaseProfileHandler(BaseHandler):
                     ignore_backoff=True,
                 )
                 defer.returnValue(result)
-            except CodeMessageException as e:
-                if e.code != 404:
-                    logger.exception("Failed to get displayname")
-                raise
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+            except HttpResponseException as e:
+                raise e.to_synapse_error()
 
     @defer.inlineCallbacks
     def get_profile_from_cache(self, user_id):
@@ -138,10 +145,10 @@ class BaseProfileHandler(BaseHandler):
                     },
                     ignore_backoff=True,
                 )
-            except CodeMessageException as e:
-                if e.code != 404:
-                    logger.exception("Failed to get displayname")
-                raise
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+            except HttpResponseException as e:
+                raise e.to_synapse_error()
 
             defer.returnValue(result["displayname"])
 
@@ -161,6 +168,11 @@ class BaseProfileHandler(BaseHandler):
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
 
+        if len(new_displayname) > MAX_DISPLAYNAME_LEN:
+            raise SynapseError(
+                400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN, ),
+            )
+
         if new_displayname == '':
             new_displayname = None
 
@@ -199,10 +211,10 @@ class BaseProfileHandler(BaseHandler):
                     },
                     ignore_backoff=True,
                 )
-            except CodeMessageException as e:
-                if e.code != 404:
-                    logger.exception("Failed to get avatar_url")
-                raise
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+            except HttpResponseException as e:
+                raise e.to_synapse_error()
 
             defer.returnValue(result["avatar_url"])
 
@@ -216,6 +228,11 @@ class BaseProfileHandler(BaseHandler):
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
 
+        if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
+            raise SynapseError(
+                400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN, ),
+            )
+
         yield self.store.set_profile_avatar_url(
             target_user.localpart, new_avatar_url
         )
@@ -283,6 +300,48 @@ class BaseProfileHandler(BaseHandler):
                     room_id, str(e)
                 )
 
+    @defer.inlineCallbacks
+    def check_profile_query_allowed(self, target_user, requester=None):
+        """Checks whether a profile query is allowed. If the
+        'require_auth_for_profile_requests' config flag is set to True and a
+        'requester' is provided, the query is only allowed if the two users
+        share a room.
+
+        Args:
+            target_user (UserID): The owner of the queried profile.
+            requester (None|UserID): The user querying for the profile.
+
+        Raises:
+            SynapseError(403): The two users share no room, or one of the users
+                couldn't be found in any room the server is in, and therefore
+                the query is denied.
+        """
+        # Implementation of MSC1301: don't allow looking up profiles if the
+        # requester isn't in the same room as the target. We expect requester to
+        # be None when this function is called outside of a profile query, e.g.
+        # when building a membership event. In this case, we must allow the
+        # lookup.
+        if not self.hs.config.require_auth_for_profile_requests or not requester:
+            return
+
+        try:
+            requester_rooms = yield self.store.get_rooms_for_user(
+                requester.to_string()
+            )
+            target_user_rooms = yield self.store.get_rooms_for_user(
+                target_user.to_string(),
+            )
+
+            # Check if the room lists have no elements in common.
+            if requester_rooms.isdisjoint(target_user_rooms):
+                raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
+        except StoreError as e:
+            if e.code == 404:
+                # This likely means that one of the users doesn't exist,
+                # so we act as if we couldn't find the profile.
+                raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
+            raise
+
 
 class MasterProfileHandler(BaseProfileHandler):
     PROFILE_UPDATE_MS = 60 * 1000
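
To make the MSC1301 behaviour added above concrete, here is a small sketch of the shared-room test at its core: a profile query is only served when the requester and the target have at least one room in common. The room sets are hypothetical stand-ins for what store.get_rooms_for_user() would return.

def profile_query_allowed(requester_rooms, target_rooms):
    # The query is denied when the two room sets have nothing in common.
    return not requester_rooms.isdisjoint(target_rooms)

requester_rooms = {"!abc:example.com", "!def:example.com"}
target_rooms = {"!def:example.com", "!xyz:example.com"}

print(profile_query_allowed(requester_rooms, target_rooms))          # True: share !def
print(profile_query_allowed(requester_rooms, {"!zzz:example.com"}))  # False -> 403
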
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index a51d11a257..9a388ea013 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -19,7 +19,7 @@ import logging
 from twisted.internet import defer
 
 from synapse import types
-from synapse.api.constants import LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, LoginType
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -123,6 +123,15 @@ class RegistrationHandler(BaseHandler):
 
         self.check_user_id_not_appservice_exclusive(user_id)
 
+        if len(user_id) > MAX_USERID_LENGTH:
+            raise SynapseError(
+                400,
+                "User ID may not be longer than %s characters" % (
+                    MAX_USERID_LENGTH,
+                ),
+                Codes.INVALID_USERNAME
+            )
+
         users = yield self.store.get_users_by_id_case_insensitive(user_id)
         if users:
             if not guest_access_token:
@@ -522,6 +531,8 @@ class RegistrationHandler(BaseHandler):
             A tuple of (user_id, access_token).
         Raises:
             RegistrationError if there was a problem registering.
+
+        NB this is only used in tests. TODO: move it to the test package!
         """
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 17628e2684..74793bab33 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -27,7 +27,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
@@ -70,10 +70,15 @@ class RoomCreationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
+        self.config = hs.config
 
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
 
+        self._server_notices_mxid = hs.config.server_notices_mxid
+
+        self.third_party_event_rules = hs.get_third_party_event_rules()
+
     @defer.inlineCallbacks
     def upgrade_room(self, requester, old_room_id, new_version):
         """Replace a room with a new room with a different version
@@ -402,7 +407,7 @@ class RoomCreationHandler(BaseHandler):
                 yield directory_handler.create_association(
                     requester, RoomAlias.from_string(alias),
                     new_room_id, servers=(self.hs.hostname, ),
-                    send_event=False,
+                    send_event=False, check_membership=False,
                 )
                 logger.info("Moved alias %s to new room", alias)
             except SynapseError as e:
@@ -469,13 +474,36 @@ class RoomCreationHandler(BaseHandler):
 
         yield self.auth.check_auth_blocking(user_id)
 
-        if not self.spam_checker.user_may_create_room(user_id):
+        if (self._server_notices_mxid is not None and
+                requester.user.to_string() == self._server_notices_mxid):
+            # allow the server notices mxid to create rooms
+            is_requester_admin = True
+        else:
+            is_requester_admin = yield self.auth.is_server_admin(
+                requester.user,
+            )
+
+        # Check whether the third party rules allows/changes the room create
+        # request.
+        yield self.third_party_event_rules.on_create_room(
+            requester,
+            config,
+            is_requester_admin=is_requester_admin,
+        )
+
+        if not is_requester_admin and not self.spam_checker.user_may_create_room(
+            user_id,
+        ):
             raise SynapseError(403, "You are not permitted to create rooms")
 
         if ratelimit:
             yield self.ratelimit(requester)
 
-        room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
+        room_version = config.get(
+            "room_version",
+            self.config.default_room_version.identifier,
+        )
+
         if not isinstance(room_version, string_types):
             raise SynapseError(
                 400,
@@ -538,6 +566,7 @@ class RoomCreationHandler(BaseHandler):
                 room_alias=room_alias,
                 servers=[self.hs.hostname],
                 send_event=False,
+                check_membership=False,
             )
 
         preset_config = config.get(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 024d6db27a..458902bb7e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
 # Copyright 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -33,6 +34,8 @@ from synapse.types import RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
 
+from ._base import BaseHandler
+
 logger = logging.getLogger(__name__)
 
 id_server_scheme = "https://"
@@ -69,8 +72,15 @@ class RoomMemberHandler(object):
 
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
+        self.third_party_event_rules = hs.get_third_party_event_rules()
         self._server_notices_mxid = self.config.server_notices_mxid
         self._enable_lookup = hs.config.enable_3pid_lookup
+        self.allow_per_room_profiles = self.config.allow_per_room_profiles
+
+        # This is only used to get at the ratelimit function and
+        # maybe_kick_guest_users. It's fine that there are multiple of these,
+        # as it doesn't store state.
+        self.base_handler = BaseHandler(hs)
 
     @abc.abstractmethod
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -350,6 +360,13 @@ class RoomMemberHandler(object):
             # later on.
             content = dict(content)
 
+        if not self.allow_per_room_profiles:
+            # Strip profile data, knowing that new profile data will be added to the
+            # event's content in event_creation_handler.create_event() using the target's
+            # global profile.
+            content.pop("displayname", None)
+            content.pop("avatar_url", None)
+
         effective_membership_state = action
         if action in ["kick", "unban"]:
             effective_membership_state = "leave"
@@ -703,6 +720,19 @@ class RoomMemberHandler(object):
                     Codes.FORBIDDEN,
                 )
 
+        # We need to rate limit *before* we send out any 3PID invites, so we
+        # can't just rely on the standard ratelimiting of events.
+        yield self.base_handler.ratelimit(requester)
+
+        can_invite = yield self.third_party_event_rules.check_threepid_can_be_invited(
+            medium, address, room_id,
+        )
+        if not can_invite:
+            raise SynapseError(
+                403, "This third-party identifier can not be invited in this room",
+                Codes.FORBIDDEN,
+            )
+
         invitee = yield self._lookup_3pid(
             id_server, medium, address
         )
@@ -924,7 +954,7 @@ class RoomMemberHandler(object):
         }
 
         if self.config.invite_3pid_guest:
-            guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
+            guest_user_id, guest_access_token = yield self.get_or_register_3pid_guest(
                 requester=requester,
                 medium=medium,
                 address=address,
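
The per-room profile change above simply strips any displayname/avatar_url supplied with a membership action when allow_per_room_profiles is disabled, so the member event falls back to the user's global profile. A tiny sketch, with a made-up content payload:

def strip_per_room_profile(content, allow_per_room_profiles):
    content = dict(content)  # copy before mutating, as the handler does
    if not allow_per_room_profiles:
        content.pop("displayname", None)
        content.pop("avatar_url", None)
    return content

content = {"membership": "join", "displayname": "Roomy Alice", "avatar_url": "mxc://x/y"}
print(strip_per_room_profile(content, allow_per_room_profiles=False))
# {'membership': 'join'}
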
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 49c439313e..9bba74d6c9 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -23,7 +23,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
-from synapse.events.utils import serialize_event
 from synapse.storage.state import StateFilter
 from synapse.visibility import filter_events_for_client
 
@@ -36,6 +35,7 @@ class SearchHandler(BaseHandler):
 
     def __init__(self, hs):
         super(SearchHandler, self).__init__(hs)
+        self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
     def get_old_rooms_from_upgraded_room(self, room_id):
@@ -401,14 +401,16 @@ class SearchHandler(BaseHandler):
         time_now = self.clock.time_msec()
 
         for context in contexts.values():
-            context["events_before"] = [
-                serialize_event(e, time_now)
-                for e in context["events_before"]
-            ]
-            context["events_after"] = [
-                serialize_event(e, time_now)
-                for e in context["events_after"]
-            ]
+            context["events_before"] = (
+                yield self._event_serializer.serialize_events(
+                    context["events_before"], time_now,
+                )
+            )
+            context["events_after"] = (
+                yield self._event_serializer.serialize_events(
+                    context["events_after"], time_now,
+                )
+            )
 
         state_results = {}
         if include_state:
@@ -422,14 +424,13 @@ class SearchHandler(BaseHandler):
         # We're now about to serialize the events. We should not make any
         # blocking calls after this. Otherwise the 'age' will be wrong
 
-        results = [
-            {
+        results = []
+        for e in allowed_events:
+            results.append({
                 "rank": rank_map[e.event_id],
-                "result": serialize_event(e, time_now),
+                "result": (yield self._event_serializer.serialize_event(e, time_now)),
                 "context": contexts.get(e.event_id, {}),
-            }
-            for e in allowed_events
-        ]
+            })
 
         rooms_cat_res = {
             "results": results,
@@ -438,10 +439,13 @@ class SearchHandler(BaseHandler):
         }
 
         if state_results:
-            rooms_cat_res["state"] = {
-                room_id: [serialize_event(e, time_now) for e in state]
-                for room_id, state in state_results.items()
-            }
+            s = {}
+            for room_id, state in state_results.items():
+                s[room_id] = yield self._event_serializer.serialize_events(
+                    state, time_now,
+                )
+
+            rooms_cat_res["state"] = s
 
         if room_groups and "room_id" in group_keys:
             rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
new file mode 100644
index 0000000000..7ad16c8566
--- /dev/null
+++ b/synapse/handlers/stats.py
@@ -0,0 +1,333 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import UserID
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class StatsHandler(StateDeltasHandler):
+    """Handles keeping the *_stats tables updated with a simple time-series of
+    information about the users, rooms and media on the server, such that admins
+    have some idea of who is consuming their resources.
+
+    Heavily derived from UserDirectoryHandler
+    """
+
+    def __init__(self, hs):
+        super(StatsHandler, self).__init__(hs)
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.server_name = hs.hostname
+        self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
+        self.stats_bucket_size = hs.config.stats_bucket_size
+
+        # The current position in the current_state_delta stream
+        self.pos = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        if hs.config.stats_enabled:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+            # We kick this off so that we don't have to wait for a change before
+            # we start populating stats
+            self.clock.call_later(0, self.notify_new_event)
+
+    def notify_new_event(self):
+        """Called when there may be more deltas to process
+        """
+        if not self.hs.config.stats_enabled:
+            return
+
+        if self._is_processing:
+            return
+
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
+        self._is_processing = True
+        run_as_background_process("stats.notify_new_event", process)
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # If self.pos is None then it means we haven't fetched it from the DB yet
+        if self.pos is None:
+            self.pos = yield self.store.get_stats_stream_pos()
+
+        # If still None then the initial background update hasn't happened yet
+        if self.pos is None:
+            defer.returnValue(None)
+
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "stats_delta"):
+                deltas = yield self.store.get_current_state_deltas(self.pos)
+                if not deltas:
+                    return
+
+                logger.info("Handling %d state deltas", len(deltas))
+                yield self._handle_deltas(deltas)
+
+                self.pos = deltas[-1]["stream_id"]
+                yield self.store.update_stats_stream_pos(self.pos)
+
+                event_processing_positions.labels("stats").set(self.pos)
+
+    @defer.inlineCallbacks
+    def _handle_deltas(self, deltas):
+        """
+        Called with the state deltas to process
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            stream_id = delta["stream_id"]
+            prev_event_id = delta["prev_event_id"]
+            stream_pos = delta["stream_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+
+            # If the earliest token to begin from is larger than our current
+            # stream ID, skip processing this delta.
+            if token is not None and token >= stream_id:
+                logger.debug(
+                    "Ignoring: %s as earlier than this room's initial ingestion event",
+                    event_id,
+                )
+                continue
+
+            if event_id is None and prev_event_id is None:
+                # Neither the new event nor the previous one exists; nothing to do.
+                continue
+
+            event_content = {}
+
+            if event_id is not None:
+                event = yield self.store.get_event(event_id, allow_none=True)
+                if event:
+                    event_content = event.content or {}
+
+            # We use stream_pos here rather than fetch by event_id as event_id
+            # may be None
+            now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+
+            # quantise time to the nearest bucket
+            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+
+            if typ == EventTypes.Member:
+                # we could use _get_key_change here but it's a bit inefficient
+                # given we're not testing for a specific result; might as well
+                # just grab the prev_membership and membership strings and
+                # compare them.
+                prev_event_content = {}
+                if prev_event_id is not None:
+                    prev_event = yield self.store.get_event(
+                        prev_event_id, allow_none=True,
+                    )
+                    if prev_event:
+                        prev_event_content = prev_event.content
+
+                membership = event_content.get("membership", Membership.LEAVE)
+                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
+
+                if prev_membership == membership:
+                    continue
+
+                if prev_membership == Membership.JOIN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "joined_members", -1
+                    )
+                elif prev_membership == Membership.INVITE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "invited_members", -1
+                    )
+                elif prev_membership == Membership.LEAVE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "left_members", -1
+                    )
+                elif prev_membership == Membership.BAN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "banned_members", -1
+                    )
+                else:
+                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
+                    logger.error(err)
+                    raise ValueError(err)
+
+                if membership == Membership.JOIN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "joined_members", +1
+                    )
+                elif membership == Membership.INVITE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "invited_members", +1
+                    )
+                elif membership == Membership.LEAVE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "left_members", +1
+                    )
+                elif membership == Membership.BAN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "banned_members", +1
+                    )
+                else:
+                    err = "%s is not a valid membership" % (repr(membership),)
+                    logger.error(err)
+                    raise ValueError(err)
+
+                user_id = state_key
+                if self.is_mine_id(user_id):
+                    # update user_stats as it's one of our users
+                    public = yield self._is_public_room(room_id)
+
+                    if membership == Membership.LEAVE:
+                        yield self.store.update_stats_delta(
+                            now,
+                            "user",
+                            user_id,
+                            "public_rooms" if public else "private_rooms",
+                            -1,
+                        )
+                    elif membership == Membership.JOIN:
+                        yield self.store.update_stats_delta(
+                            now,
+                            "user",
+                            user_id,
+                            "public_rooms" if public else "private_rooms",
+                            +1,
+                        )
+
+            elif typ == EventTypes.Create:
+                # Newly created room. Add it with all blank portions.
+                yield self.store.update_room_state(
+                    room_id,
+                    {
+                        "join_rules": None,
+                        "history_visibility": None,
+                        "encryption": None,
+                        "name": None,
+                        "topic": None,
+                        "avatar": None,
+                        "canonical_alias": None,
+                    },
+                )
+
+            elif typ == EventTypes.JoinRules:
+                yield self.store.update_room_state(
+                    room_id, {"join_rules": event_content.get("join_rule")}
+                )
+
+                is_public = yield self._get_key_change(
+                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                )
+                if is_public is not None:
+                    yield self.update_public_room_stats(now, room_id, is_public)
+
+            elif typ == EventTypes.RoomHistoryVisibility:
+                yield self.store.update_room_state(
+                    room_id,
+                    {"history_visibility": event_content.get("history_visibility")},
+                )
+
+                is_public = yield self._get_key_change(
+                    prev_event_id, event_id, "history_visibility", "world_readable"
+                )
+                if is_public is not None:
+                    yield self.update_public_room_stats(now, room_id, is_public)
+
+            elif typ == EventTypes.Encryption:
+                yield self.store.update_room_state(
+                    room_id, {"encryption": event_content.get("algorithm")}
+                )
+            elif typ == EventTypes.Name:
+                yield self.store.update_room_state(
+                    room_id, {"name": event_content.get("name")}
+                )
+            elif typ == EventTypes.Topic:
+                yield self.store.update_room_state(
+                    room_id, {"topic": event_content.get("topic")}
+                )
+            elif typ == EventTypes.RoomAvatar:
+                yield self.store.update_room_state(
+                    room_id, {"avatar": event_content.get("url")}
+                )
+            elif typ == EventTypes.CanonicalAlias:
+                yield self.store.update_room_state(
+                    room_id, {"canonical_alias": event_content.get("alias")}
+                )
+
+    @defer.inlineCallbacks
+    def update_public_room_stats(self, ts, room_id, is_public):
+        """
+        Increment/decrement a user's number of public rooms when a room they are
+        in changes to/from public visibility.
+
+        Args:
+            ts (int): Timestamp in seconds
+            room_id (str)
+            is_public (bool)
+        """
+        # For now, blindly iterate over all local users in the room so that
+        # we can handle the whole problem of copying buckets over as needed
+        user_ids = yield self.store.get_users_in_room(room_id)
+
+        for user_id in user_ids:
+            if self.hs.is_mine(UserID.from_string(user_id)):
+                yield self.store.update_stats_delta(
+                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
+                )
+                yield self.store.update_stats_delta(
+                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
+                )
+
+    @defer.inlineCallbacks
+    def _is_public_room(self, room_id):
+        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
+        history_visibility = yield self.state.get_current_state(
+            room_id, EventTypes.RoomHistoryVisibility
+        )
+
+        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
+            (
+                history_visibility
+                and history_visibility.content.get("history_visibility")
+                == "world_readable"
+            )
+        ):
+            defer.returnValue(True)
+        else:
+            defer.returnValue(False)
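
As a worked example of the bucket quantisation used throughout StatsHandler above, the snippet below floors a millisecond receive timestamp to the start of its stats bucket (in seconds). The bucket size of 86400s (one day) is just an illustrative value for stats_bucket_size.

def quantise_to_bucket(received_ts_ms, stats_bucket_size):
    # Mirrors: now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
    return (received_ts_ms // 1000 // stats_bucket_size) * stats_bucket_size

STATS_BUCKET_SIZE = 86400  # one day, in seconds

print(quantise_to_bucket(1560000000000, STATS_BUCKET_SIZE))  # 1559952000
print(quantise_to_bucket(1560030000000, STATS_BUCKET_SIZE))  # 1559952000 (same bucket)
print(quantise_to_bucket(1560040000000, STATS_BUCKET_SIZE))  # 1560038400 (next bucket)
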
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 153312e39f..62fda0c664 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -583,30 +583,42 @@ class SyncHandler(object):
         )
 
         # if the room has a name or canonical_alias set, we can skip
-        # calculating heroes.  we assume that if the event has contents, it'll
-        # be a valid name or canonical_alias - i.e. we're checking that they
-        # haven't been "deleted" by blatting {} over the top.
+        # calculating heroes. Empty strings are falsey, so checking the "name"
+        # value (defaulting to "") also treats a blanked-out name as unset.
         if name_id:
             name = yield self.store.get_event(name_id, allow_none=True)
-            if name and name.content:
+            if name and name.content.get("name"):
                 defer.returnValue(summary)
 
         if canonical_alias_id:
             canonical_alias = yield self.store.get_event(
                 canonical_alias_id, allow_none=True,
             )
-            if canonical_alias and canonical_alias.content:
+            if canonical_alias and canonical_alias.content.get("alias"):
                 defer.returnValue(summary)
 
+        me = sync_config.user.to_string()
+
         joined_user_ids = [
-            r[0] for r in details.get(Membership.JOIN, empty_ms).members
+            r[0]
+            for r in details.get(Membership.JOIN, empty_ms).members
+            if r[0] != me
         ]
         invited_user_ids = [
-            r[0] for r in details.get(Membership.INVITE, empty_ms).members
+            r[0]
+            for r in details.get(Membership.INVITE, empty_ms).members
+            if r[0] != me
         ]
         gone_user_ids = (
-            [r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
-            [r[0] for r in details.get(Membership.BAN, empty_ms).members]
+            [
+                r[0]
+                for r in details.get(Membership.LEAVE, empty_ms).members
+                if r[0] != me
+            ] + [
+                r[0]
+                for r in details.get(Membership.BAN, empty_ms).members
+                if r[0] != me
+            ]
         )
 
         # FIXME: only build up a member_ids list for our heroes
@@ -621,22 +633,13 @@ class SyncHandler(object):
                 member_ids[user_id] = event_id
 
         # FIXME: order by stream ordering rather than as returned by SQL
-        me = sync_config.user.to_string()
         if (joined_user_ids or invited_user_ids):
             summary['m.heroes'] = sorted(
-                [
-                    user_id
-                    for user_id in (joined_user_ids + invited_user_ids)
-                    if user_id != me
-                ]
+                [user_id for user_id in (joined_user_ids + invited_user_ids)]
             )[0:5]
         else:
             summary['m.heroes'] = sorted(
-                [
-                    user_id
-                    for user_id in gone_user_ids
-                    if user_id != me
-                ]
+                [user_id for user_id in gone_user_ids]
             )[0:5]
 
         if not sync_config.filter_collection.lazy_load_members():
@@ -934,7 +937,7 @@ class SyncHandler(object):
         res = yield self._generate_sync_entry_for_rooms(
             sync_result_builder, account_data_by_room
         )
-        newly_joined_rooms, newly_joined_users, _, _ = res
+        newly_joined_rooms, newly_joined_or_invited_users, _, _ = res
         _, _, newly_left_rooms, newly_left_users = res
 
         block_all_presence_data = (
@@ -943,7 +946,7 @@ class SyncHandler(object):
         )
         if self.hs_config.use_presence and not block_all_presence_data:
             yield self._generate_sync_entry_for_presence(
-                sync_result_builder, newly_joined_rooms, newly_joined_users
+                sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
             )
 
         yield self._generate_sync_entry_for_to_device(sync_result_builder)
@@ -951,7 +954,7 @@ class SyncHandler(object):
         device_lists = yield self._generate_sync_entry_for_device_list(
             sync_result_builder,
             newly_joined_rooms=newly_joined_rooms,
-            newly_joined_users=newly_joined_users,
+            newly_joined_or_invited_users=newly_joined_or_invited_users,
             newly_left_rooms=newly_left_rooms,
             newly_left_users=newly_left_users,
         )
@@ -1036,7 +1039,8 @@ class SyncHandler(object):
     @measure_func("_generate_sync_entry_for_device_list")
     @defer.inlineCallbacks
     def _generate_sync_entry_for_device_list(self, sync_result_builder,
-                                             newly_joined_rooms, newly_joined_users,
+                                             newly_joined_rooms,
+                                             newly_joined_or_invited_users,
                                              newly_left_rooms, newly_left_users):
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
@@ -1050,7 +1054,7 @@ class SyncHandler(object):
             # share a room with?
             for room_id in newly_joined_rooms:
                 joined_users = yield self.state.get_current_users_in_room(room_id)
-                newly_joined_users.update(joined_users)
+                newly_joined_or_invited_users.update(joined_users)
 
             for room_id in newly_left_rooms:
                 left_users = yield self.state.get_current_users_in_room(room_id)
@@ -1058,7 +1062,7 @@ class SyncHandler(object):
 
             # TODO: Check that these users are actually new, i.e. either they
             # weren't in the previous sync *or* they left and rejoined.
-            changed.update(newly_joined_users)
+            changed.update(newly_joined_or_invited_users)
 
             if not changed and not newly_left_users:
                 defer.returnValue(DeviceLists(
@@ -1176,7 +1180,7 @@ class SyncHandler(object):
 
     @defer.inlineCallbacks
     def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
-                                          newly_joined_users):
+                                          newly_joined_or_invited_users):
         """Generates the presence portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
@@ -1184,8 +1188,9 @@ class SyncHandler(object):
             sync_result_builder(SyncResultBuilder)
             newly_joined_rooms(list): List of rooms that the user has joined
                 since the last sync (or empty if an initial sync)
-            newly_joined_users(list): List of users that have joined rooms
-                since the last sync (or empty if an initial sync)
+            newly_joined_or_invited_users(list): List of users that have joined
+                or been invited to rooms since the last sync (or empty if an initial
+                sync)
         """
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
@@ -1211,7 +1216,7 @@ class SyncHandler(object):
             "presence_key", presence_key
         )
 
-        extra_users_ids = set(newly_joined_users)
+        extra_users_ids = set(newly_joined_or_invited_users)
         for room_id in newly_joined_rooms:
             users = yield self.state.get_current_users_in_room(room_id)
             extra_users_ids.update(users)
@@ -1243,7 +1248,8 @@ class SyncHandler(object):
 
         Returns:
             Deferred(tuple): Returns a 4-tuple of
-            `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
+            `(newly_joined_rooms, newly_joined_or_invited_users,
+            newly_left_rooms, newly_left_users)`
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         block_all_room_ephemeral = (
@@ -1314,8 +1320,8 @@ class SyncHandler(object):
 
         sync_result_builder.invited.extend(invited)
 
-        # Now we want to get any newly joined users
-        newly_joined_users = set()
+        # Now we want to get any newly joined or invited users
+        newly_joined_or_invited_users = set()
         newly_left_users = set()
         if since_token:
             for joined_sync in sync_result_builder.joined:
@@ -1324,19 +1330,22 @@ class SyncHandler(object):
                 )
                 for event in it:
                     if event.type == EventTypes.Member:
-                        if event.membership == Membership.JOIN:
-                            newly_joined_users.add(event.state_key)
+                        if (
+                            event.membership == Membership.JOIN or
+                            event.membership == Membership.INVITE
+                        ):
+                            newly_joined_or_invited_users.add(event.state_key)
                         else:
                             prev_content = event.unsigned.get("prev_content", {})
                             prev_membership = prev_content.get("membership", None)
                             if prev_membership == Membership.JOIN:
                                 newly_left_users.add(event.state_key)
 
-        newly_left_users -= newly_joined_users
+        newly_left_users -= newly_joined_or_invited_users
 
         defer.returnValue((
             newly_joined_rooms,
-            newly_joined_users,
+            newly_joined_or_invited_users,
             newly_left_rooms,
             newly_left_users,
         ))
@@ -1381,7 +1390,7 @@ class SyncHandler(object):
             where:
                 room_entries is a list [RoomSyncResultBuilder]
                 invited_rooms is a list [InvitedSyncResult]
-                newly_joined rooms is a list[str] of room ids
+                newly_joined_rooms is a list[str] of room ids
                 newly_left_rooms is a list[str] of room ids
         """
         user_id = sync_result_builder.sync_config.user.to_string()
@@ -1422,7 +1431,7 @@ class SyncHandler(object):
             if room_id in sync_result_builder.joined_room_ids and non_joins:
                 # Always include if the user (re)joined the room, especially
                 # important so that device list changes are calculated correctly.
-                # If there are non join member events, but we are still in the room,
+                # If there are non-join member events, but we are still in the room,
                 # then the user must have left and joined
                 newly_joined_rooms.append(room_id)
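
Finally, the sync.py changes above move the "exclude the syncing user" filtering into the joined/invited/gone lists themselves before picking m.heroes. A condensed sketch with made-up member lists:

def compute_heroes(me, joined_user_ids, invited_user_ids, gone_user_ids):
    joined = [u for u in joined_user_ids if u != me]
    invited = [u for u in invited_user_ids if u != me]
    gone = [u for u in gone_user_ids if u != me]
    if joined or invited:
        return sorted(joined + invited)[0:5]
    return sorted(gone)[0:5]

print(compute_heroes(
    "@me:example.com",
    joined_user_ids=["@me:example.com", "@bob:example.com", "@carol:example.com"],
    invited_user_ids=["@dave:example.com"],
    gone_user_ids=[],
))  # ['@bob:example.com', '@carol:example.com', '@dave:example.com']
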