summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/account_validity.py253
-rw-r--r--synapse/handlers/auth.py7
-rw-r--r--synapse/handlers/deactivate_account.py6
-rw-r--r--synapse/handlers/directory.py26
-rw-r--r--synapse/handlers/events.py2
-rw-r--r--synapse/handlers/federation.py13
-rw-r--r--synapse/handlers/identity.py73
-rw-r--r--synapse/handlers/message.py9
-rw-r--r--synapse/handlers/presence.py342
-rw-r--r--synapse/handlers/register.py17
-rw-r--r--synapse/handlers/room.py15
-rw-r--r--synapse/handlers/room_list.py2
-rw-r--r--synapse/handlers/room_member.py14
-rw-r--r--synapse/handlers/sync.py8
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/handlers/user_directory.py4
16 files changed, 556 insertions, 239 deletions
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
new file mode 100644
index 0000000000..261446517d
--- /dev/null
+++ b/synapse/handlers/account_validity.py
@@ -0,0 +1,253 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import email.mime.multipart
+import email.utils
+import logging
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+from synapse.types import UserID
+from synapse.util import stringutils
+from synapse.util.logcontext import make_deferred_yieldable
+
+try:
+    from synapse.push.mailer import load_jinja2_templates
+except ImportError:
+    load_jinja2_templates = None
+
+logger = logging.getLogger(__name__)
+
+
+class AccountValidityHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = self.hs.get_datastore()
+        self.sendmail = self.hs.get_sendmail()
+        self.clock = self.hs.get_clock()
+
+        self._account_validity = self.hs.config.account_validity
+
+        if self._account_validity.renew_by_email_enabled and load_jinja2_templates:
+            # Don't do email-specific configuration if renewal by email is disabled.
+            try:
+                app_name = self.hs.config.email_app_name
+
+                self._subject = self._account_validity.renew_email_subject % {
+                    "app": app_name,
+                }
+
+                self._from_string = self.hs.config.email_notif_from % {
+                    "app": app_name,
+                }
+            except Exception:
+                # If substitution failed, fall back to the bare strings.
+                self._subject = self._account_validity.renew_email_subject
+                self._from_string = self.hs.config.email_notif_from
+
+            self._raw_from = email.utils.parseaddr(self._from_string)[1]
+
+            self._template_html, self._template_text = load_jinja2_templates(
+                config=self.hs.config,
+                template_html_name=self.hs.config.email_expiry_template_html,
+                template_text_name=self.hs.config.email_expiry_template_text,
+            )
+
+            # Check the renewal emails to send and send them every 30min.
+            self.clock.looping_call(
+                self.send_renewal_emails,
+                30 * 60 * 1000,
+            )
+
+    @defer.inlineCallbacks
+    def send_renewal_emails(self):
+        """Gets the list of users whose account is expiring in the amount of time
+        configured in the ``renew_at`` parameter from the ``account_validity``
+        configuration, and sends renewal emails to all of these users as long as they
+        have an email 3PID attached to their account.
+        """
+        expiring_users = yield self.store.get_users_expiring_soon()
+
+        if expiring_users:
+            for user in expiring_users:
+                yield self._send_renewal_email(
+                    user_id=user["user_id"],
+                    expiration_ts=user["expiration_ts_ms"],
+                )
+
+    @defer.inlineCallbacks
+    def send_renewal_email_to_user(self, user_id):
+        expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
+        yield self._send_renewal_email(user_id, expiration_ts)
+
+    @defer.inlineCallbacks
+    def _send_renewal_email(self, user_id, expiration_ts):
+        """Sends out a renewal email to every email address attached to the given user
+        with a unique link allowing them to renew their account.
+
+        Args:
+            user_id (str): ID of the user to send email(s) to.
+            expiration_ts (int): Timestamp in milliseconds for the expiration date of
+                this user's account (used in the email templates).
+        """
+        addresses = yield self._get_email_addresses_for_user(user_id)
+
+        # Stop right here if the user doesn't have at least one email address.
+        # In this case, they will have to ask their server admin to renew their
+        # account manually.
+        if not addresses:
+            return
+
+        try:
+            user_display_name = yield self.store.get_profile_displayname(
+                UserID.from_string(user_id).localpart
+            )
+            if user_display_name is None:
+                user_display_name = user_id
+        except StoreError:
+            user_display_name = user_id
+
+        renewal_token = yield self._get_renewal_token(user_id)
+        url = "%s_matrix/client/unstable/account_validity/renew?token=%s" % (
+            self.hs.config.public_baseurl,
+            renewal_token,
+        )
+
+        template_vars = {
+            "display_name": user_display_name,
+            "expiration_ts": expiration_ts,
+            "url": url,
+        }
+
+        html_text = self._template_html.render(**template_vars)
+        html_part = MIMEText(html_text, "html", "utf8")
+
+        plain_text = self._template_text.render(**template_vars)
+        text_part = MIMEText(plain_text, "plain", "utf8")
+
+        for address in addresses:
+            raw_to = email.utils.parseaddr(address)[1]
+
+            multipart_msg = MIMEMultipart('alternative')
+            multipart_msg['Subject'] = self._subject
+            multipart_msg['From'] = self._from_string
+            multipart_msg['To'] = address
+            multipart_msg['Date'] = email.utils.formatdate()
+            multipart_msg['Message-ID'] = email.utils.make_msgid()
+            multipart_msg.attach(text_part)
+            multipart_msg.attach(html_part)
+
+            logger.info("Sending renewal email to %s", address)
+
+            yield make_deferred_yieldable(self.sendmail(
+                self.hs.config.email_smtp_host,
+                self._raw_from, raw_to, multipart_msg.as_string().encode('utf8'),
+                reactor=self.hs.get_reactor(),
+                port=self.hs.config.email_smtp_port,
+                requireAuthentication=self.hs.config.email_smtp_user is not None,
+                username=self.hs.config.email_smtp_user,
+                password=self.hs.config.email_smtp_pass,
+                requireTransportSecurity=self.hs.config.require_transport_security
+            ))
+
+        yield self.store.set_renewal_mail_status(
+            user_id=user_id,
+            email_sent=True,
+        )
+
+    @defer.inlineCallbacks
+    def _get_email_addresses_for_user(self, user_id):
+        """Retrieve the list of email addresses attached to a user's account.
+
+        Args:
+            user_id (str): ID of the user to lookup email addresses for.
+
+        Returns:
+            defer.Deferred[list[str]]: Email addresses for this account.
+        """
+        threepids = yield self.store.user_get_threepids(user_id)
+
+        addresses = []
+        for threepid in threepids:
+            if threepid["medium"] == "email":
+                addresses.append(threepid["address"])
+
+        defer.returnValue(addresses)
+
+    @defer.inlineCallbacks
+    def _get_renewal_token(self, user_id):
+        """Generates a 32-byte long random string that will be inserted into the
+        user's renewal email's unique link, then saves it into the database.
+
+        Args:
+            user_id (str): ID of the user to generate a string for.
+
+        Returns:
+            defer.Deferred[str]: The generated string.
+
+        Raises:
+            StoreError(500): Couldn't generate a unique string after 5 attempts.
+        """
+        attempts = 0
+        while attempts < 5:
+            try:
+                renewal_token = stringutils.random_string(32)
+                yield self.store.set_renewal_token_for_user(user_id, renewal_token)
+                defer.returnValue(renewal_token)
+            except StoreError:
+                attempts += 1
+        raise StoreError(500, "Couldn't generate a unique string as refresh string.")
+
+    @defer.inlineCallbacks
+    def renew_account(self, renewal_token):
+        """Renews the account attached to a given renewal token by pushing back the
+        expiration date by the current validity period in the server's configuration.
+
+        Args:
+            renewal_token (str): Token sent with the renewal request.
+        """
+        user_id = yield self.store.get_user_from_renewal_token(renewal_token)
+        logger.debug("Renewing an account for user %s", user_id)
+        yield self.renew_account_for_user(user_id)
+
+    @defer.inlineCallbacks
+    def renew_account_for_user(self, user_id, expiration_ts=None, email_sent=False):
+        """Renews the account attached to a given user by pushing back the
+        expiration date by the current validity period in the server's
+        configuration.
+
+        Args:
+            renewal_token (str): Token sent with the renewal request.
+            expiration_ts (int): New expiration date. Defaults to now + validity period.
+            email_sent (bool): Whether an email has been sent for this validity period.
+                Defaults to False.
+
+        Returns:
+            defer.Deferred[int]: New expiration date for this account, as a timestamp
+                in milliseconds since epoch.
+        """
+        if expiration_ts is None:
+            expiration_ts = self.clock.time_msec() + self._account_validity.period
+
+        yield self.store.set_account_validity_for_user(
+            user_id=user_id,
+            expiration_ts=expiration_ts,
+            email_sent=email_sent,
+        )
+
+        defer.returnValue(expiration_ts)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4544de821d..aa5d89a9ac 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -912,7 +912,7 @@ class AuthHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def delete_threepid(self, user_id, medium, address):
+    def delete_threepid(self, user_id, medium, address, id_server=None):
         """Attempts to unbind the 3pid on the identity servers and deletes it
         from the local database.
 
@@ -920,6 +920,10 @@ class AuthHandler(BaseHandler):
             user_id (str)
             medium (str)
             address (str)
+            id_server (str|None): Use the given identity server when unbinding
+                any threepids. If None then will attempt to unbind using the
+                identity server specified when binding (if known).
+
 
         Returns:
             Deferred[bool]: Returns True if successfully unbound the 3pid on
@@ -937,6 +941,7 @@ class AuthHandler(BaseHandler):
             {
                 'medium': medium,
                 'address': address,
+                'id_server': id_server,
             },
         )
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 97d3f31d98..6a91f7698e 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -43,12 +43,15 @@ class DeactivateAccountHandler(BaseHandler):
         hs.get_reactor().callWhenRunning(self._start_user_parting)
 
     @defer.inlineCallbacks
-    def deactivate_account(self, user_id, erase_data):
+    def deactivate_account(self, user_id, erase_data, id_server=None):
         """Deactivate a user's account
 
         Args:
             user_id (str): ID of user to be deactivated
             erase_data (bool): whether to GDPR-erase the user's data
+            id_server (str|None): Use the given identity server when unbinding
+                any threepids. If None then will attempt to unbind using the
+                identity server specified when binding (if known).
 
         Returns:
             Deferred[bool]: True if identity server supports removing
@@ -74,6 +77,7 @@ class DeactivateAccountHandler(BaseHandler):
                     {
                         'medium': threepid['medium'],
                         'address': threepid['address'],
+                        'id_server': id_server,
                     },
                 )
                 identity_server_supports_unbinding &= result
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index fe128d9c88..50c587aa61 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -36,6 +36,7 @@ logger = logging.getLogger(__name__)
 
 
 class DirectoryHandler(BaseHandler):
+    MAX_ALIAS_LENGTH = 255
 
     def __init__(self, hs):
         super(DirectoryHandler, self).__init__(hs)
@@ -43,8 +44,10 @@ class DirectoryHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
         self.config = hs.config
         self.enable_room_list_search = hs.config.enable_room_list_search
+        self.require_membership = hs.config.require_membership_for_aliases
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -68,7 +71,7 @@ class DirectoryHandler(BaseHandler):
         # TODO(erikj): Add transactions.
         # TODO(erikj): Check if there is a current association.
         if not servers:
-            users = yield self.state.get_current_user_in_room(room_id)
+            users = yield self.state.get_current_users_in_room(room_id)
             servers = set(get_domain_from_id(u) for u in users)
 
         if not servers:
@@ -83,7 +86,7 @@ class DirectoryHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def create_association(self, requester, room_alias, room_id, servers=None,
-                           send_event=True):
+                           send_event=True, check_membership=True):
         """Attempt to create a new alias
 
         Args:
@@ -93,6 +96,8 @@ class DirectoryHandler(BaseHandler):
             servers (list[str]|None): List of servers that others servers
                 should try and join via
             send_event (bool): Whether to send an updated m.room.aliases event
+            check_membership (bool): Whether to check if the user is in the room
+                before the alias can be set (if the server's config requires it).
 
         Returns:
             Deferred
@@ -100,6 +105,13 @@ class DirectoryHandler(BaseHandler):
 
         user_id = requester.user.to_string()
 
+        if len(room_alias.to_string()) > self.MAX_ALIAS_LENGTH:
+            raise SynapseError(
+                400,
+                "Can't create aliases longer than %s characters" % self.MAX_ALIAS_LENGTH,
+                Codes.INVALID_PARAM,
+            )
+
         service = requester.app_service
         if service:
             if not service.is_interested_in_alias(room_alias.to_string()):
@@ -108,6 +120,14 @@ class DirectoryHandler(BaseHandler):
                     " this kind of alias.", errcode=Codes.EXCLUSIVE
                 )
         else:
+            if self.require_membership and check_membership:
+                rooms_for_user = yield self.store.get_rooms_for_user(user_id)
+                if room_id not in rooms_for_user:
+                    raise AuthError(
+                        403,
+                        "You must be in the room to create an alias for it",
+                    )
+
             if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
                 raise AuthError(
                     403, "This user is not permitted to create this alias",
@@ -268,7 +288,7 @@ class DirectoryHandler(BaseHandler):
                 Codes.NOT_FOUND
             )
 
-        users = yield self.state.get_current_user_in_room(room_id)
+        users = yield self.state.get_current_users_in_room(room_id)
         extra_servers = set(get_domain_from_id(u) for u in users)
         servers = set(extra_servers) | set(servers)
 
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d883e98381..1b4d8c74ae 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -102,7 +102,7 @@ class EventStreamHandler(BaseHandler):
                     # Send down presence.
                     if event.state_key == auth_user_id:
                         # Send down presence for everyone in the room.
-                        users = yield self.state.get_current_user_in_room(event.room_id)
+                        users = yield self.state.get_current_users_in_room(event.room_id)
                         states = yield presence_handler.get_states(
                             users,
                             as_event=True,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9eaf2d3e18..0684778882 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -29,13 +29,7 @@ from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
-from synapse.api.constants import (
-    KNOWN_ROOM_VERSIONS,
-    EventTypes,
-    Membership,
-    RejectedReason,
-    RoomVersions,
-)
+from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -44,6 +38,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.event_auth import auth_types_for_event
 from synapse.events.validator import EventValidator
@@ -1733,7 +1728,9 @@ class FederationHandler(BaseHandler):
             # invalid, and it would fail auth checks anyway.
             raise SynapseError(400, "No create event in state")
 
-        room_version = create_event.content.get("room_version", RoomVersions.V1)
+        room_version = create_event.content.get(
+            "room_version", RoomVersions.V1.identifier,
+        )
 
         missing_auth_events = set()
         for e in itertools.chain(auth_events, state, [event]):
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 39184f0e22..22469486d7 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -132,6 +132,14 @@ class IdentityHandler(BaseHandler):
                 }
             )
             logger.debug("bound threepid %r to %s", creds, mxid)
+
+            # Remember where we bound the threepid
+            yield self.store.add_user_bound_threepid(
+                user_id=mxid,
+                medium=data["medium"],
+                address=data["address"],
+                id_server=id_server,
+            )
         except CodeMessageException as e:
             data = json.loads(e.msg)  # XXX WAT?
         defer.returnValue(data)
@@ -142,30 +150,61 @@ class IdentityHandler(BaseHandler):
 
         Args:
             mxid (str): Matrix user ID of binding to be removed
-            threepid (dict): Dict with medium & address of binding to be removed
+            threepid (dict): Dict with medium & address of binding to be
+                removed, and an optional id_server.
 
         Raises:
             SynapseError: If we failed to contact the identity server
 
         Returns:
             Deferred[bool]: True on success, otherwise False if the identity
-            server doesn't support unbinding
+            server doesn't support unbinding (or no identity server found to
+            contact).
         """
-        logger.debug("unbinding threepid %r from %s", threepid, mxid)
-        if not self.trusted_id_servers:
-            logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+        if threepid.get("id_server"):
+            id_servers = [threepid["id_server"]]
+        else:
+            id_servers = yield self.store.get_id_servers_user_bound(
+                user_id=mxid,
+                medium=threepid["medium"],
+                address=threepid["address"],
+            )
+
+        # We don't know where to unbind, so we don't have a choice but to return
+        if not id_servers:
             defer.returnValue(False)
 
-        # We don't track what ID server we added 3pids on (perhaps we ought to)
-        # but we assume that any of the servers in the trusted list are in the
-        # same ID server federation, so we can pick any one of them to send the
-        # deletion request to.
-        id_server = next(iter(self.trusted_id_servers))
+        changed = True
+        for id_server in id_servers:
+            changed &= yield self.try_unbind_threepid_with_id_server(
+                mxid, threepid, id_server,
+            )
+
+        defer.returnValue(changed)
+
+    @defer.inlineCallbacks
+    def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server):
+        """Removes a binding from an identity server
 
+        Args:
+            mxid (str): Matrix user ID of binding to be removed
+            threepid (dict): Dict with medium & address of binding to be removed
+            id_server (str): Identity server to unbind from
+
+        Raises:
+            SynapseError: If we failed to contact the identity server
+
+        Returns:
+            Deferred[bool]: True on success, otherwise False if the identity
+            server doesn't support unbinding
+        """
         url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
         content = {
             "mxid": mxid,
-            "threepid": threepid,
+            "threepid": {
+                "medium": threepid["medium"],
+                "address": threepid["address"],
+            },
         }
 
         # we abuse the federation http client to sign the request, but we have to send it
@@ -188,16 +227,24 @@ class IdentityHandler(BaseHandler):
                 content,
                 headers,
             )
+            changed = True
         except HttpResponseException as e:
+            changed = False
             if e.code in (400, 404, 501,):
                 # The remote server probably doesn't support unbinding (yet)
                 logger.warn("Received %d response while unbinding threepid", e.code)
-                defer.returnValue(False)
             else:
                 logger.error("Failed to unbind threepid on identity server: %s", e)
                 raise SynapseError(502, "Failed to contact identity server")
 
-        defer.returnValue(True)
+        yield self.store.remove_user_bound_threepid(
+            user_id=mxid,
+            medium=threepid["medium"],
+            address=threepid["address"],
+            id_server=id_server,
+        )
+
+        defer.returnValue(changed)
 
     @defer.inlineCallbacks
     def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 9b41c7b205..224d34ef3a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from twisted.internet.defer import succeed
 
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -30,6 +30,7 @@ from synapse.api.errors import (
     NotFoundError,
     SynapseError,
 )
+from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
@@ -191,7 +192,7 @@ class MessageHandler(object):
                     "Getting joined members after leaving is not implemented"
                 )
 
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        users_with_profile = yield self.state.get_current_users_in_room(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
@@ -603,7 +604,9 @@ class EventCreationHandler(object):
         """
 
         if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
-            room_version = event.content.get("room_version", RoomVersions.V1)
+            room_version = event.content.get(
+                "room_version", RoomVersions.V1.identifier
+            )
         else:
             room_version = yield self.store.get_room_version(event.room_id)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..59d53f1050 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
         self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -110,30 +113,6 @@ class PresenceHandler(object):
         federation_registry.register_edu_handler(
             "m.presence", self.incoming_presence
         )
-        federation_registry.register_edu_handler(
-            "m.presence_invite",
-            lambda origin, content: self.invite_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-        federation_registry.register_edu_handler(
-            "m.presence_accept",
-            lambda origin, content: self.accept_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-        federation_registry.register_edu_handler(
-            "m.presence_deny",
-            lambda origin, content: self.deny_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-
-        distributor = hs.get_distributor()
-        distributor.observe("user_joined_room", self.user_joined_room)
 
         active_presence = self.store.take_presence_startup_info()
 
@@ -220,6 +199,15 @@ class PresenceHandler(object):
         LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
                    lambda: len(self.wheel_timer))
 
+        # Used to handle sending of presence to newly joined users/servers
+        if hs.config.use_presence:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+        # Presence is best effort and quickly heals itself, so lets just always
+        # stream from the current state when we restart.
+        self._event_pos = self.store.get_current_events_token()
+        self._event_processing = False
+
     @defer.inlineCallbacks
     def _on_shutdown(self):
         """Gets called when shutting down. This lets us persist any updates that
@@ -751,199 +739,178 @@ class PresenceHandler(object):
         yield self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     @defer.inlineCallbacks
-    def user_joined_room(self, user, room_id):
-        """Called (via the distributor) when a user joins a room. This funciton
-        sends presence updates to servers, either:
-            1. the joining user is a local user and we send their presence to
-               all servers in the room.
-            2. the joining user is a remote user and so we send presence for all
-               local users in the room.
+    def is_visible(self, observed_user, observer_user):
+        """Returns whether a user can see another user's presence.
         """
-        # We only need to send presence to servers that don't have it yet. We
-        # don't need to send to local clients here, as that is done as part
-        # of the event stream/sync.
-        # TODO: Only send to servers not already in the room.
-        if self.is_mine(user):
-            state = yield self.current_state_for_user(user.to_string())
-
-            self._push_to_remotes([state])
-        else:
-            user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = list(filter(self.is_mine_id, user_ids))
+        observer_room_ids = yield self.store.get_rooms_for_user(
+            observer_user.to_string()
+        )
+        observed_room_ids = yield self.store.get_rooms_for_user(
+            observed_user.to_string()
+        )
 
-            states = yield self.current_state_for_users(user_ids)
+        if observer_room_ids & observed_room_ids:
+            defer.returnValue(True)
 
-            self._push_to_remotes(list(states.values()))
+        defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def get_presence_list(self, observer_user, accepted=None):
-        """Returns the presence for all users in their presence list.
+    def get_all_presence_updates(self, last_id, current_id):
         """
-        if not self.is_mine(observer_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
-
-        presence_list = yield self.store.get_presence_list(
-            observer_user.localpart, accepted=accepted
-        )
+        Gets a list of presence update rows from between the given stream ids.
+        Each row has:
+        - stream_id(str)
+        - user_id(str)
+        - state(str)
+        - last_active_ts(int)
+        - last_federation_update_ts(int)
+        - last_user_sync_ts(int)
+        - status_msg(int)
+        - currently_active(int)
+        """
+        # TODO(markjh): replicate the unpersisted changes.
+        # This could use the in-memory stores for recent changes.
+        rows = yield self.store.get_all_presence_updates(last_id, current_id)
+        defer.returnValue(rows)
 
-        results = yield self.get_states(
-            target_user_ids=[row["observed_user_id"] for row in presence_list],
-            as_event=False,
-        )
+    def notify_new_event(self):
+        """Called when new events have happened. Handles users and servers
+        joining rooms and require being sent presence.
+        """
 
-        now = self.clock.time_msec()
-        results[:] = [format_user_presence_state(r, now) for r in results]
+        if self._event_processing:
+            return
 
-        is_accepted = {
-            row["observed_user_id"]: row["accepted"] for row in presence_list
-        }
+        @defer.inlineCallbacks
+        def _process_presence():
+            assert not self._event_processing
 
-        for result in results:
-            result.update({
-                "accepted": is_accepted,
-            })
+            self._event_processing = True
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._event_processing = False
 
-        defer.returnValue(results)
+        run_as_background_process("presence.notify_new_event", _process_presence)
 
     @defer.inlineCallbacks
-    def send_presence_invite(self, observer_user, observed_user):
-        """Sends a presence invite.
-        """
-        yield self.store.add_presence_list_pending(
-            observer_user.localpart, observed_user.to_string()
-        )
+    def _unsafe_process(self):
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "presence_delta"):
+                deltas = yield self.store.get_current_state_deltas(self._event_pos)
+                if not deltas:
+                    return
 
-        if self.is_mine(observed_user):
-            yield self.invite_presence(observed_user, observer_user)
-        else:
-            yield self.federation.build_and_send_edu(
-                destination=observed_user.domain,
-                edu_type="m.presence_invite",
-                content={
-                    "observed_user": observed_user.to_string(),
-                    "observer_user": observer_user.to_string(),
-                }
-            )
+                yield self._handle_state_delta(deltas)
+
+                self._event_pos = deltas[-1]["stream_id"]
+
+                # Expose current event processing position to prometheus
+                synapse.metrics.event_processing_positions.labels("presence").set(
+                    self._event_pos
+                )
 
     @defer.inlineCallbacks
-    def invite_presence(self, observed_user, observer_user):
-        """Handles new presence invites.
+    def _handle_state_delta(self, deltas):
+        """Process current state deltas to find new joins that need to be
+        handled.
         """
-        if not self.is_mine(observed_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
 
-        # TODO: Don't auto accept
-        if self.is_mine(observer_user):
-            yield self.accept_presence(observed_user, observer_user)
-        else:
-            self.federation.build_and_send_edu(
-                destination=observer_user.domain,
-                edu_type="m.presence_accept",
-                content={
-                    "observed_user": observed_user.to_string(),
-                    "observer_user": observer_user.to_string(),
-                }
-            )
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
 
-            state_dict = yield self.get_state(observed_user, as_event=False)
-            state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
+            if typ != EventTypes.Member:
+                continue
 
-            self.federation.build_and_send_edu(
-                destination=observer_user.domain,
-                edu_type="m.presence",
-                content={
-                    "push": [state_dict]
-                }
-            )
+            if event_id is None:
+                # state has been deleted, so this is not a join. We only care about
+                # joins.
+                continue
 
-    @defer.inlineCallbacks
-    def accept_presence(self, observed_user, observer_user):
-        """Handles a m.presence_accept EDU. Mark a presence invite from a
-        local or remote user as accepted in a local user's presence list.
-        Starts polling for presence updates from the local or remote user.
-        Args:
-            observed_user(UserID): The user to update in the presence list.
-            observer_user(UserID): The owner of the presence list to update.
-        """
-        yield self.store.set_presence_list_accepted(
-            observer_user.localpart, observed_user.to_string()
-        )
+            event = yield self.store.get_event(event_id)
+            if event.content.get("membership") != Membership.JOIN:
+                # We only care about joins
+                continue
 
-    @defer.inlineCallbacks
-    def deny_presence(self, observed_user, observer_user):
-        """Handle a m.presence_deny EDU. Removes a local or remote user from a
-        local user's presence list.
-        Args:
-            observed_user(UserID): The local or remote user to remove from the
-                list.
-            observer_user(UserID): The local owner of the presence list.
-        Returns:
-            A Deferred.
-        """
-        yield self.store.del_presence_list(
-            observer_user.localpart, observed_user.to_string()
-        )
+            if prev_event_id:
+                prev_event = yield self.store.get_event(prev_event_id)
+                if prev_event.content.get("membership") == Membership.JOIN:
+                    # Ignore changes to join events.
+                    continue
 
-        # TODO(paul): Inform the user somehow?
+            yield self._on_user_joined_room(room_id, state_key)
 
     @defer.inlineCallbacks
-    def drop(self, observed_user, observer_user):
-        """Remove a local or remote user from a local user's presence list and
-        unsubscribe the local user from updates that user.
+    def _on_user_joined_room(self, room_id, user_id):
+        """Called when we detect a user joining the room via the current state
+        delta stream.
+
         Args:
-            observed_user(UserId): The local or remote user to remove from the
-                list.
-            observer_user(UserId): The local owner of the presence list.
+            room_id (str)
+            user_id (str)
+
         Returns:
-            A Deferred.
+            Deferred
         """
-        if not self.is_mine(observer_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
 
-        yield self.store.del_presence_list(
-            observer_user.localpart, observed_user.to_string()
-        )
+        if self.is_mine_id(user_id):
+            # If this is a local user then we need to send their presence
+            # out to hosts in the room (who don't already have it)
 
-        # TODO: Inform the remote that we've dropped the presence list.
+            # TODO: We should be able to filter the hosts down to those that
+            # haven't previously seen the user
 
-    @defer.inlineCallbacks
-    def is_visible(self, observed_user, observer_user):
-        """Returns whether a user can see another user's presence.
-        """
-        observer_room_ids = yield self.store.get_rooms_for_user(
-            observer_user.to_string()
-        )
-        observed_room_ids = yield self.store.get_rooms_for_user(
-            observed_user.to_string()
-        )
+            state = yield self.current_state_for_user(user_id)
+            hosts = yield self.state.get_current_hosts_in_room(room_id)
 
-        if observer_room_ids & observed_room_ids:
-            defer.returnValue(True)
+            # Filter out ourselves.
+            hosts = set(host for host in hosts if host != self.server_name)
 
-        accepted_observers = yield self.store.get_presence_list_observers_accepted(
-            observed_user.to_string()
-        )
+            self.federation.send_presence_to_destinations(
+                states=[state],
+                destinations=hosts,
+            )
+        else:
+            # A remote user has joined the room, so we need to:
+            #   1. Check if this is a new server in the room
+            #   2. If so send any presence they don't already have for
+            #      local users in the room.
 
-        defer.returnValue(observer_user.to_string() in accepted_observers)
+            # TODO: We should be able to filter the users down to those that
+            # the server hasn't previously seen
 
-    @defer.inlineCallbacks
-    def get_all_presence_updates(self, last_id, current_id):
-        """
-        Gets a list of presence update rows from between the given stream ids.
-        Each row has:
-        - stream_id(str)
-        - user_id(str)
-        - state(str)
-        - last_active_ts(int)
-        - last_federation_update_ts(int)
-        - last_user_sync_ts(int)
-        - status_msg(int)
-        - currently_active(int)
-        """
-        # TODO(markjh): replicate the unpersisted changes.
-        # This could use the in-memory stores for recent changes.
-        rows = yield self.store.get_all_presence_updates(last_id, current_id)
-        defer.returnValue(rows)
+            # TODO: Check that this is actually a new server joining the
+            # room.
+
+            user_ids = yield self.state.get_current_users_in_room(room_id)
+            user_ids = list(filter(self.is_mine_id, user_ids))
+
+            states = yield self.current_state_for_users(user_ids)
+
+            # Filter out old presence, i.e. offline presence states where
+            # the user hasn't been active for a week. We can change this
+            # depending on what we want the UX to be, but at the least we
+            # should filter out offline presence where the state is just the
+            # default state.
+            now = self.clock.time_msec()
+            states = [
+                state for state in states.values()
+                if state.state != PresenceState.OFFLINE
+                or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+                or state.status_msg is not None
+            ]
+
+            if states:
+                self.federation.send_presence_to_destinations(
+                    states=states,
+                    destinations=[get_domain_from_id(user_id)],
+                )
 
 
 def should_notify(old_state, new_state):
@@ -1086,10 +1053,7 @@ class PresenceEventSource(object):
         updates for
         """
         user_id = user.to_string()
-        plist = yield self.store.get_presence_list_accepted(
-            user.localpart, on_invalidate=cache_context.invalidate,
-        )
-        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in = set()
         users_interested_in.add(user_id)  # So that we receive our own presence
 
         users_who_share_room = yield self.store.get_users_who_share_room_with_user(
@@ -1294,10 +1258,6 @@ def get_interested_parties(store, states):
         for room_id in room_ids:
             room_ids_to_states.setdefault(room_id, []).append(state)
 
-        plist = yield store.get_presence_list_observers_accepted(state.user_id)
-        for u in plist:
-            users_to_states.setdefault(u, []).append(state)
-
         # Always notify self
         users_to_states.setdefault(state.user_id, []).append(state)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 58940e0320..a51d11a257 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -153,6 +153,7 @@ class RegistrationHandler(BaseHandler):
         user_type=None,
         default_display_name=None,
         address=None,
+        bind_emails=[],
     ):
         """Registers a new client on the server.
 
@@ -172,6 +173,7 @@ class RegistrationHandler(BaseHandler):
             default_display_name (unicode|None): if set, the new user's displayname
               will be set to this. Defaults to 'localpart'.
             address (str|None): the IP address used to perform the registration.
+            bind_emails (List[str]): list of emails to bind to this account.
         Returns:
             A tuple of (user_id, access_token).
         Raises:
@@ -261,6 +263,21 @@ class RegistrationHandler(BaseHandler):
         if not self.hs.config.user_consent_at_registration:
             yield self._auto_join_rooms(user_id)
 
+        # Bind any specified emails to this account
+        current_time = self.hs.get_clock().time_msec()
+        for email in bind_emails:
+            # generate threepid dict
+            threepid_dict = {
+                "medium": "email",
+                "address": email,
+                "validated_at": current_time,
+            }
+
+            # Bind email to new account
+            yield self._register_email_threepid(
+                user_id, threepid_dict, None, False,
+            )
+
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 67b15697fd..e37ae96899 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,14 +25,9 @@ from six import iteritems, string_types
 
 from twisted.internet import defer
 
-from synapse.api.constants import (
-    DEFAULT_ROOM_VERSION,
-    KNOWN_ROOM_VERSIONS,
-    EventTypes,
-    JoinRules,
-    RoomCreationPreset,
-)
+from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
+from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
 from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
@@ -285,6 +280,7 @@ class RoomCreationHandler(BaseHandler):
             (EventTypes.RoomAvatar, ""),
             (EventTypes.Encryption, ""),
             (EventTypes.ServerACL, ""),
+            (EventTypes.RelatedGroups, ""),
         )
 
         old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -406,7 +402,7 @@ class RoomCreationHandler(BaseHandler):
                 yield directory_handler.create_association(
                     requester, RoomAlias.from_string(alias),
                     new_room_id, servers=(self.hs.hostname, ),
-                    send_event=False,
+                    send_event=False, check_membership=False,
                 )
                 logger.info("Moved alias %s to new room", alias)
             except SynapseError as e:
@@ -479,7 +475,7 @@ class RoomCreationHandler(BaseHandler):
         if ratelimit:
             yield self.ratelimit(requester)
 
-        room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+        room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
         if not isinstance(room_version, string_types):
             raise SynapseError(
                 400,
@@ -542,6 +538,7 @@ class RoomCreationHandler(BaseHandler):
                 room_alias=room_alias,
                 servers=[self.hs.hostname],
                 send_event=False,
+                check_membership=False,
             )
 
         preset_config = config.get(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index d6c9d56007..617d1c9ef8 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -167,7 +167,7 @@ class RoomListHandler(BaseHandler):
                 if not latest_event_ids:
                     return
 
-                joined_users = yield self.state_handler.get_current_user_in_room(
+                joined_users = yield self.state_handler.get_current_users_in_room(
                     room_id, latest_event_ids,
                 )
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 71ce5b54e5..024d6db27a 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -70,6 +70,7 @@ class RoomMemberHandler(object):
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
         self._server_notices_mxid = self.config.server_notices_mxid
+        self._enable_lookup = hs.config.enable_3pid_lookup
 
     @abc.abstractmethod
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -421,6 +422,9 @@ class RoomMemberHandler(object):
             room_id, latest_event_ids=latest_event_ids,
         )
 
+        # TODO: Refactor into dictionary of explicitly allowed transitions
+        # between old and new state, with specific error messages for some
+        # transitions and generic otherwise
         old_state_id = current_state_ids.get((EventTypes.Member, target.to_string()))
         if old_state_id:
             old_state = yield self.store.get_event(old_state_id, allow_none=True)
@@ -446,6 +450,9 @@ class RoomMemberHandler(object):
                 if same_sender and same_membership and same_content:
                     defer.returnValue(old_state)
 
+            if old_membership in ["ban", "leave"] and action == "kick":
+                raise AuthError(403, "The target user is not in the room")
+
             # we don't allow people to reject invites to the server notice
             # room, but they can leave it once they are joined.
             if (
@@ -459,6 +466,9 @@ class RoomMemberHandler(object):
                         "You cannot reject this invite",
                         errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
                     )
+        else:
+            if action == "kick":
+                raise AuthError(403, "The target user is not in the room")
 
         is_host_in_room = yield self._is_host_in_room(current_state_ids)
 
@@ -729,6 +739,10 @@ class RoomMemberHandler(object):
         Returns:
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
+        if not self._enable_lookup:
+            raise SynapseError(
+                403, "Looking up third-party identifiers is denied from this server",
+            )
         try:
             data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 57bb996245..153312e39f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1049,11 +1049,11 @@ class SyncHandler(object):
             # TODO: Be more clever than this, i.e. remove users who we already
             # share a room with?
             for room_id in newly_joined_rooms:
-                joined_users = yield self.state.get_current_user_in_room(room_id)
+                joined_users = yield self.state.get_current_users_in_room(room_id)
                 newly_joined_users.update(joined_users)
 
             for room_id in newly_left_rooms:
-                left_users = yield self.state.get_current_user_in_room(room_id)
+                left_users = yield self.state.get_current_users_in_room(room_id)
                 newly_left_users.update(left_users)
 
             # TODO: Check that these users are actually new, i.e. either they
@@ -1213,7 +1213,7 @@ class SyncHandler(object):
 
         extra_users_ids = set(newly_joined_users)
         for room_id in newly_joined_rooms:
-            users = yield self.state.get_current_user_in_room(room_id)
+            users = yield self.state.get_current_users_in_room(room_id)
             extra_users_ids.update(users)
         extra_users_ids.discard(user.to_string())
 
@@ -1855,7 +1855,7 @@ class SyncHandler(object):
             extrems = yield self.store.get_forward_extremeties_for_room(
                 room_id, stream_ordering,
             )
-            users_in_room = yield self.state.get_current_user_in_room(
+            users_in_room = yield self.state.get_current_users_in_room(
                 room_id, extrems,
             )
             if user_id in users_in_room:
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 39df960c31..972662eb48 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -218,7 +218,7 @@ class TypingHandler(object):
     @defer.inlineCallbacks
     def _push_remote(self, member, typing):
         try:
-            users = yield self.state.get_current_user_in_room(member.room_id)
+            users = yield self.state.get_current_users_in_room(member.room_id)
             self._member_last_federation_poke[member] = self.clock.time_msec()
 
             now = self.clock.time_msec()
@@ -261,7 +261,7 @@ class TypingHandler(object):
             )
             return
 
-        users = yield self.state.get_current_user_in_room(room_id)
+        users = yield self.state.get_current_users_in_room(room_id)
         domains = set(get_domain_from_id(u) for u in users)
 
         if self.server_name in domains:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index b689979b4b..5de9630950 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -276,7 +276,7 @@ class UserDirectoryHandler(StateDeltasHandler):
             # ignore the change
             return
 
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        users_with_profile = yield self.state.get_current_users_in_room(room_id)
 
         # Remove every user from the sharing tables for that room.
         for user_id in iterkeys(users_with_profile):
@@ -325,7 +325,7 @@ class UserDirectoryHandler(StateDeltasHandler):
             room_id
         )
         # Now we update users who share rooms with users.
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        users_with_profile = yield self.state.get_current_users_in_room(room_id)
 
         if is_public:
             yield self.store.add_users_in_public_rooms(room_id, (user_id,))