summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/auth.py7
-rw-r--r--synapse/handlers/deactivate_account.py6
-rw-r--r--synapse/handlers/directory.py4
-rw-r--r--synapse/handlers/events.py2
-rw-r--r--synapse/handlers/federation.py13
-rw-r--r--synapse/handlers/identity.py73
-rw-r--r--synapse/handlers/message.py9
-rw-r--r--synapse/handlers/presence.py337
-rw-r--r--synapse/handlers/register.py17
-rw-r--r--synapse/handlers/room.py12
-rw-r--r--synapse/handlers/room_list.py2
-rw-r--r--synapse/handlers/room_member.py14
-rw-r--r--synapse/handlers/sync.py8
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/handlers/user_directory.py4
15 files changed, 275 insertions, 237 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4544de821d..aa5d89a9ac 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -912,7 +912,7 @@ class AuthHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def delete_threepid(self, user_id, medium, address):
+    def delete_threepid(self, user_id, medium, address, id_server=None):
         """Attempts to unbind the 3pid on the identity servers and deletes it
         from the local database.
 
@@ -920,6 +920,10 @@ class AuthHandler(BaseHandler):
             user_id (str)
             medium (str)
             address (str)
+            id_server (str|None): Use the given identity server when unbinding
+                any threepids. If None then will attempt to unbind using the
+                identity server specified when binding (if known).
+
 
         Returns:
             Deferred[bool]: Returns True if successfully unbound the 3pid on
@@ -937,6 +941,7 @@ class AuthHandler(BaseHandler):
             {
                 'medium': medium,
                 'address': address,
+                'id_server': id_server,
             },
         )
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 97d3f31d98..6a91f7698e 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -43,12 +43,15 @@ class DeactivateAccountHandler(BaseHandler):
         hs.get_reactor().callWhenRunning(self._start_user_parting)
 
     @defer.inlineCallbacks
-    def deactivate_account(self, user_id, erase_data):
+    def deactivate_account(self, user_id, erase_data, id_server=None):
         """Deactivate a user's account
 
         Args:
             user_id (str): ID of user to be deactivated
             erase_data (bool): whether to GDPR-erase the user's data
+            id_server (str|None): Use the given identity server when unbinding
+                any threepids. If None then will attempt to unbind using the
+                identity server specified when binding (if known).
 
         Returns:
             Deferred[bool]: True if identity server supports removing
@@ -74,6 +77,7 @@ class DeactivateAccountHandler(BaseHandler):
                     {
                         'medium': threepid['medium'],
                         'address': threepid['address'],
+                        'id_server': id_server,
                     },
                 )
                 identity_server_supports_unbinding &= result
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index fe128d9c88..27bd06df5d 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -68,7 +68,7 @@ class DirectoryHandler(BaseHandler):
         # TODO(erikj): Add transactions.
         # TODO(erikj): Check if there is a current association.
         if not servers:
-            users = yield self.state.get_current_user_in_room(room_id)
+            users = yield self.state.get_current_users_in_room(room_id)
             servers = set(get_domain_from_id(u) for u in users)
 
         if not servers:
@@ -268,7 +268,7 @@ class DirectoryHandler(BaseHandler):
                 Codes.NOT_FOUND
             )
 
-        users = yield self.state.get_current_user_in_room(room_id)
+        users = yield self.state.get_current_users_in_room(room_id)
         extra_servers = set(get_domain_from_id(u) for u in users)
         servers = set(extra_servers) | set(servers)
 
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d883e98381..1b4d8c74ae 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -102,7 +102,7 @@ class EventStreamHandler(BaseHandler):
                     # Send down presence.
                     if event.state_key == auth_user_id:
                         # Send down presence for everyone in the room.
-                        users = yield self.state.get_current_user_in_room(event.room_id)
+                        users = yield self.state.get_current_users_in_room(event.room_id)
                         states = yield presence_handler.get_states(
                             users,
                             as_event=True,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9eaf2d3e18..0684778882 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -29,13 +29,7 @@ from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
-from synapse.api.constants import (
-    KNOWN_ROOM_VERSIONS,
-    EventTypes,
-    Membership,
-    RejectedReason,
-    RoomVersions,
-)
+from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -44,6 +38,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.event_auth import auth_types_for_event
 from synapse.events.validator import EventValidator
@@ -1733,7 +1728,9 @@ class FederationHandler(BaseHandler):
             # invalid, and it would fail auth checks anyway.
             raise SynapseError(400, "No create event in state")
 
-        room_version = create_event.content.get("room_version", RoomVersions.V1)
+        room_version = create_event.content.get(
+            "room_version", RoomVersions.V1.identifier,
+        )
 
         missing_auth_events = set()
         for e in itertools.chain(auth_events, state, [event]):
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 39184f0e22..22469486d7 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -132,6 +132,14 @@ class IdentityHandler(BaseHandler):
                 }
             )
             logger.debug("bound threepid %r to %s", creds, mxid)
+
+            # Remember where we bound the threepid
+            yield self.store.add_user_bound_threepid(
+                user_id=mxid,
+                medium=data["medium"],
+                address=data["address"],
+                id_server=id_server,
+            )
         except CodeMessageException as e:
             data = json.loads(e.msg)  # XXX WAT?
         defer.returnValue(data)
@@ -142,30 +150,61 @@ class IdentityHandler(BaseHandler):
 
         Args:
             mxid (str): Matrix user ID of binding to be removed
-            threepid (dict): Dict with medium & address of binding to be removed
+            threepid (dict): Dict with medium & address of binding to be
+                removed, and an optional id_server.
 
         Raises:
             SynapseError: If we failed to contact the identity server
 
         Returns:
             Deferred[bool]: True on success, otherwise False if the identity
-            server doesn't support unbinding
+            server doesn't support unbinding (or no identity server found to
+            contact).
         """
-        logger.debug("unbinding threepid %r from %s", threepid, mxid)
-        if not self.trusted_id_servers:
-            logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+        if threepid.get("id_server"):
+            id_servers = [threepid["id_server"]]
+        else:
+            id_servers = yield self.store.get_id_servers_user_bound(
+                user_id=mxid,
+                medium=threepid["medium"],
+                address=threepid["address"],
+            )
+
+        # We don't know where to unbind, so we don't have a choice but to return
+        if not id_servers:
             defer.returnValue(False)
 
-        # We don't track what ID server we added 3pids on (perhaps we ought to)
-        # but we assume that any of the servers in the trusted list are in the
-        # same ID server federation, so we can pick any one of them to send the
-        # deletion request to.
-        id_server = next(iter(self.trusted_id_servers))
+        changed = True
+        for id_server in id_servers:
+            changed &= yield self.try_unbind_threepid_with_id_server(
+                mxid, threepid, id_server,
+            )
+
+        defer.returnValue(changed)
+
+    @defer.inlineCallbacks
+    def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server):
+        """Removes a binding from an identity server
 
+        Args:
+            mxid (str): Matrix user ID of binding to be removed
+            threepid (dict): Dict with medium & address of binding to be removed
+            id_server (str): Identity server to unbind from
+
+        Raises:
+            SynapseError: If we failed to contact the identity server
+
+        Returns:
+            Deferred[bool]: True on success, otherwise False if the identity
+            server doesn't support unbinding
+        """
         url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
         content = {
             "mxid": mxid,
-            "threepid": threepid,
+            "threepid": {
+                "medium": threepid["medium"],
+                "address": threepid["address"],
+            },
         }
 
         # we abuse the federation http client to sign the request, but we have to send it
@@ -188,16 +227,24 @@ class IdentityHandler(BaseHandler):
                 content,
                 headers,
             )
+            changed = True
         except HttpResponseException as e:
+            changed = False
             if e.code in (400, 404, 501,):
                 # The remote server probably doesn't support unbinding (yet)
                 logger.warn("Received %d response while unbinding threepid", e.code)
-                defer.returnValue(False)
             else:
                 logger.error("Failed to unbind threepid on identity server: %s", e)
                 raise SynapseError(502, "Failed to contact identity server")
 
-        defer.returnValue(True)
+        yield self.store.remove_user_bound_threepid(
+            user_id=mxid,
+            medium=threepid["medium"],
+            address=threepid["address"],
+            id_server=id_server,
+        )
+
+        defer.returnValue(changed)
 
     @defer.inlineCallbacks
     def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 9b41c7b205..224d34ef3a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from twisted.internet.defer import succeed
 
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -30,6 +30,7 @@ from synapse.api.errors import (
     NotFoundError,
     SynapseError,
 )
+from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
@@ -191,7 +192,7 @@ class MessageHandler(object):
                     "Getting joined members after leaving is not implemented"
                 )
 
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        users_with_profile = yield self.state.get_current_users_in_room(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
@@ -603,7 +604,9 @@ class EventCreationHandler(object):
         """
 
         if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
-            room_version = event.content.get("room_version", RoomVersions.V1)
+            room_version = event.content.get(
+                "room_version", RoomVersions.V1.identifier
+            )
         else:
             room_version = yield self.store.get_room_version(event.room_id)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..bd1285b15c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
         self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -110,30 +113,6 @@ class PresenceHandler(object):
         federation_registry.register_edu_handler(
             "m.presence", self.incoming_presence
         )
-        federation_registry.register_edu_handler(
-            "m.presence_invite",
-            lambda origin, content: self.invite_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-        federation_registry.register_edu_handler(
-            "m.presence_accept",
-            lambda origin, content: self.accept_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-        federation_registry.register_edu_handler(
-            "m.presence_deny",
-            lambda origin, content: self.deny_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-
-        distributor = hs.get_distributor()
-        distributor.observe("user_joined_room", self.user_joined_room)
 
         active_presence = self.store.take_presence_startup_info()
 
@@ -220,6 +199,15 @@ class PresenceHandler(object):
         LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
                    lambda: len(self.wheel_timer))
 
+        # Used to handle sending of presence to newly joined users/servers
+        if hs.config.use_presence:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+        # Presence is best effort and quickly heals itself, so lets just always
+        # stream from the current state when we restart.
+        self._event_pos = self.store.get_current_events_token()
+        self._event_processing = False
+
     @defer.inlineCallbacks
     def _on_shutdown(self):
         """Gets called when shutting down. This lets us persist any updates that
@@ -751,199 +739,173 @@ class PresenceHandler(object):
         yield self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     @defer.inlineCallbacks
-    def user_joined_room(self, user, room_id):
-        """Called (via the distributor) when a user joins a room. This funciton
-        sends presence updates to servers, either:
-            1. the joining user is a local user and we send their presence to
-               all servers in the room.
-            2. the joining user is a remote user and so we send presence for all
-               local users in the room.
+    def is_visible(self, observed_user, observer_user):
+        """Returns whether a user can see another user's presence.
         """
-        # We only need to send presence to servers that don't have it yet. We
-        # don't need to send to local clients here, as that is done as part
-        # of the event stream/sync.
-        # TODO: Only send to servers not already in the room.
-        if self.is_mine(user):
-            state = yield self.current_state_for_user(user.to_string())
-
-            self._push_to_remotes([state])
-        else:
-            user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = list(filter(self.is_mine_id, user_ids))
+        observer_room_ids = yield self.store.get_rooms_for_user(
+            observer_user.to_string()
+        )
+        observed_room_ids = yield self.store.get_rooms_for_user(
+            observed_user.to_string()
+        )
 
-            states = yield self.current_state_for_users(user_ids)
+        if observer_room_ids & observed_room_ids:
+            defer.returnValue(True)
 
-            self._push_to_remotes(list(states.values()))
+        defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def get_presence_list(self, observer_user, accepted=None):
-        """Returns the presence for all users in their presence list.
+    def get_all_presence_updates(self, last_id, current_id):
         """
-        if not self.is_mine(observer_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
-
-        presence_list = yield self.store.get_presence_list(
-            observer_user.localpart, accepted=accepted
-        )
+        Gets a list of presence update rows from between the given stream ids.
+        Each row has:
+        - stream_id(str)
+        - user_id(str)
+        - state(str)
+        - last_active_ts(int)
+        - last_federation_update_ts(int)
+        - last_user_sync_ts(int)
+        - status_msg(int)
+        - currently_active(int)
+        """
+        # TODO(markjh): replicate the unpersisted changes.
+        # This could use the in-memory stores for recent changes.
+        rows = yield self.store.get_all_presence_updates(last_id, current_id)
+        defer.returnValue(rows)
 
-        results = yield self.get_states(
-            target_user_ids=[row["observed_user_id"] for row in presence_list],
-            as_event=False,
-        )
+    def notify_new_event(self):
+        """Called when new events have happened. Handles users and servers
+        joining rooms and require being sent presence.
+        """
 
-        now = self.clock.time_msec()
-        results[:] = [format_user_presence_state(r, now) for r in results]
+        if self._event_processing:
+            return
 
-        is_accepted = {
-            row["observed_user_id"]: row["accepted"] for row in presence_list
-        }
+        @defer.inlineCallbacks
+        def _process_presence():
+            assert not self._event_processing
 
-        for result in results:
-            result.update({
-                "accepted": is_accepted,
-            })
+            self._event_processing = True
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._event_processing = False
 
-        defer.returnValue(results)
+        run_as_background_process("presence.notify_new_event", _process_presence)
 
     @defer.inlineCallbacks
-    def send_presence_invite(self, observer_user, observed_user):
-        """Sends a presence invite.
-        """
-        yield self.store.add_presence_list_pending(
-            observer_user.localpart, observed_user.to_string()
-        )
+    def _unsafe_process(self):
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "presence_delta"):
+                deltas = yield self.store.get_current_state_deltas(self._event_pos)
+                if not deltas:
+                    return
 
-        if self.is_mine(observed_user):
-            yield self.invite_presence(observed_user, observer_user)
-        else:
-            yield self.federation.build_and_send_edu(
-                destination=observed_user.domain,
-                edu_type="m.presence_invite",
-                content={
-                    "observed_user": observed_user.to_string(),
-                    "observer_user": observer_user.to_string(),
-                }
-            )
+                yield self._handle_state_delta(deltas)
+
+                self._event_pos = deltas[-1]["stream_id"]
+
+                # Expose current event processing position to prometheus
+                synapse.metrics.event_processing_positions.labels("presence").set(
+                    self._event_pos
+                )
 
     @defer.inlineCallbacks
-    def invite_presence(self, observed_user, observer_user):
-        """Handles new presence invites.
+    def _handle_state_delta(self, deltas):
+        """Process current state deltas to find new joins that need to be
+        handled.
         """
-        if not self.is_mine(observed_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
 
-        # TODO: Don't auto accept
-        if self.is_mine(observer_user):
-            yield self.accept_presence(observed_user, observer_user)
-        else:
-            self.federation.build_and_send_edu(
-                destination=observer_user.domain,
-                edu_type="m.presence_accept",
-                content={
-                    "observed_user": observed_user.to_string(),
-                    "observer_user": observer_user.to_string(),
-                }
-            )
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
 
-            state_dict = yield self.get_state(observed_user, as_event=False)
-            state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
+            if typ != EventTypes.Member:
+                continue
 
-            self.federation.build_and_send_edu(
-                destination=observer_user.domain,
-                edu_type="m.presence",
-                content={
-                    "push": [state_dict]
-                }
-            )
+            event = yield self.store.get_event(event_id)
+            if event.content.get("membership") != Membership.JOIN:
+                # We only care about joins
+                continue
 
-    @defer.inlineCallbacks
-    def accept_presence(self, observed_user, observer_user):
-        """Handles a m.presence_accept EDU. Mark a presence invite from a
-        local or remote user as accepted in a local user's presence list.
-        Starts polling for presence updates from the local or remote user.
-        Args:
-            observed_user(UserID): The user to update in the presence list.
-            observer_user(UserID): The owner of the presence list to update.
-        """
-        yield self.store.set_presence_list_accepted(
-            observer_user.localpart, observed_user.to_string()
-        )
+            if prev_event_id:
+                prev_event = yield self.store.get_event(prev_event_id)
+                if prev_event.content.get("membership") == Membership.JOIN:
+                    # Ignore changes to join events.
+                    continue
+
+            yield self._on_user_joined_room(room_id, state_key)
 
     @defer.inlineCallbacks
-    def deny_presence(self, observed_user, observer_user):
-        """Handle a m.presence_deny EDU. Removes a local or remote user from a
-        local user's presence list.
+    def _on_user_joined_room(self, room_id, user_id):
+        """Called when we detect a user joining the room via the current state
+        delta stream.
+
         Args:
-            observed_user(UserID): The local or remote user to remove from the
-                list.
-            observer_user(UserID): The local owner of the presence list.
+            room_id (str)
+            user_id (str)
+
         Returns:
-            A Deferred.
+            Deferred
         """
-        yield self.store.del_presence_list(
-            observer_user.localpart, observed_user.to_string()
-        )
 
-        # TODO(paul): Inform the user somehow?
+        if self.is_mine_id(user_id):
+            # If this is a local user then we need to send their presence
+            # out to hosts in the room (who don't already have it)
 
-    @defer.inlineCallbacks
-    def drop(self, observed_user, observer_user):
-        """Remove a local or remote user from a local user's presence list and
-        unsubscribe the local user from updates that user.
-        Args:
-            observed_user(UserId): The local or remote user to remove from the
-                list.
-            observer_user(UserId): The local owner of the presence list.
-        Returns:
-            A Deferred.
-        """
-        if not self.is_mine(observer_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            # TODO: We should be able to filter the hosts down to those that
+            # haven't previously seen the user
 
-        yield self.store.del_presence_list(
-            observer_user.localpart, observed_user.to_string()
-        )
+            state = yield self.current_state_for_user(user_id)
+            hosts = yield self.state.get_current_hosts_in_room(room_id)
 
-        # TODO: Inform the remote that we've dropped the presence list.
+            # Filter out ourselves.
+            hosts = set(host for host in hosts if host != self.server_name)
 
-    @defer.inlineCallbacks
-    def is_visible(self, observed_user, observer_user):
-        """Returns whether a user can see another user's presence.
-        """
-        observer_room_ids = yield self.store.get_rooms_for_user(
-            observer_user.to_string()
-        )
-        observed_room_ids = yield self.store.get_rooms_for_user(
-            observed_user.to_string()
-        )
+            self.federation.send_presence_to_destinations(
+                states=[state],
+                destinations=hosts,
+            )
+        else:
+            # A remote user has joined the room, so we need to:
+            #   1. Check if this is a new server in the room
+            #   2. If so send any presence they don't already have for
+            #      local users in the room.
 
-        if observer_room_ids & observed_room_ids:
-            defer.returnValue(True)
+            # TODO: We should be able to filter the users down to those that
+            # the server hasn't previously seen
 
-        accepted_observers = yield self.store.get_presence_list_observers_accepted(
-            observed_user.to_string()
-        )
+            # TODO: Check that this is actually a new server joining the
+            # room.
 
-        defer.returnValue(observer_user.to_string() in accepted_observers)
+            user_ids = yield self.state.get_current_users_in_room(room_id)
+            user_ids = list(filter(self.is_mine_id, user_ids))
 
-    @defer.inlineCallbacks
-    def get_all_presence_updates(self, last_id, current_id):
-        """
-        Gets a list of presence update rows from between the given stream ids.
-        Each row has:
-        - stream_id(str)
-        - user_id(str)
-        - state(str)
-        - last_active_ts(int)
-        - last_federation_update_ts(int)
-        - last_user_sync_ts(int)
-        - status_msg(int)
-        - currently_active(int)
-        """
-        # TODO(markjh): replicate the unpersisted changes.
-        # This could use the in-memory stores for recent changes.
-        rows = yield self.store.get_all_presence_updates(last_id, current_id)
-        defer.returnValue(rows)
+            states = yield self.current_state_for_users(user_ids)
+
+            # Filter out old presence, i.e. offline presence states where
+            # the user hasn't been active for a week. We can change this
+            # depending on what we want the UX to be, but at the least we
+            # should filter out offline presence where the state is just the
+            # default state.
+            now = self.clock.time_msec()
+            states = [
+                state for state in states.values()
+                if state.state != PresenceState.OFFLINE
+                or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+                or state.status_msg is not None
+            ]
+
+            if states:
+                self.federation.send_presence_to_destinations(
+                    states=states,
+                    destinations=[get_domain_from_id(user_id)],
+                )
 
 
 def should_notify(old_state, new_state):
@@ -1086,10 +1048,7 @@ class PresenceEventSource(object):
         updates for
         """
         user_id = user.to_string()
-        plist = yield self.store.get_presence_list_accepted(
-            user.localpart, on_invalidate=cache_context.invalidate,
-        )
-        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in = set()
         users_interested_in.add(user_id)  # So that we receive our own presence
 
         users_who_share_room = yield self.store.get_users_who_share_room_with_user(
@@ -1294,10 +1253,6 @@ def get_interested_parties(store, states):
         for room_id in room_ids:
             room_ids_to_states.setdefault(room_id, []).append(state)
 
-        plist = yield store.get_presence_list_observers_accepted(state.user_id)
-        for u in plist:
-            users_to_states.setdefault(u, []).append(state)
-
         # Always notify self
         users_to_states.setdefault(state.user_id, []).append(state)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 58940e0320..a51d11a257 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -153,6 +153,7 @@ class RegistrationHandler(BaseHandler):
         user_type=None,
         default_display_name=None,
         address=None,
+        bind_emails=[],
     ):
         """Registers a new client on the server.
 
@@ -172,6 +173,7 @@ class RegistrationHandler(BaseHandler):
             default_display_name (unicode|None): if set, the new user's displayname
               will be set to this. Defaults to 'localpart'.
             address (str|None): the IP address used to perform the registration.
+            bind_emails (List[str]): list of emails to bind to this account.
         Returns:
             A tuple of (user_id, access_token).
         Raises:
@@ -261,6 +263,21 @@ class RegistrationHandler(BaseHandler):
         if not self.hs.config.user_consent_at_registration:
             yield self._auto_join_rooms(user_id)
 
+        # Bind any specified emails to this account
+        current_time = self.hs.get_clock().time_msec()
+        for email in bind_emails:
+            # generate threepid dict
+            threepid_dict = {
+                "medium": "email",
+                "address": email,
+                "validated_at": current_time,
+            }
+
+            # Bind email to new account
+            yield self._register_email_threepid(
+                user_id, threepid_dict, None, False,
+            )
+
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 67b15697fd..17628e2684 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,14 +25,9 @@ from six import iteritems, string_types
 
 from twisted.internet import defer
 
-from synapse.api.constants import (
-    DEFAULT_ROOM_VERSION,
-    KNOWN_ROOM_VERSIONS,
-    EventTypes,
-    JoinRules,
-    RoomCreationPreset,
-)
+from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
+from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
 from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
@@ -285,6 +280,7 @@ class RoomCreationHandler(BaseHandler):
             (EventTypes.RoomAvatar, ""),
             (EventTypes.Encryption, ""),
             (EventTypes.ServerACL, ""),
+            (EventTypes.RelatedGroups, ""),
         )
 
         old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -479,7 +475,7 @@ class RoomCreationHandler(BaseHandler):
         if ratelimit:
             yield self.ratelimit(requester)
 
-        room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+        room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
         if not isinstance(room_version, string_types):
             raise SynapseError(
                 400,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index d6c9d56007..617d1c9ef8 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -167,7 +167,7 @@ class RoomListHandler(BaseHandler):
                 if not latest_event_ids:
                     return
 
-                joined_users = yield self.state_handler.get_current_user_in_room(
+                joined_users = yield self.state_handler.get_current_users_in_room(
                     room_id, latest_event_ids,
                 )
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 71ce5b54e5..024d6db27a 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -70,6 +70,7 @@ class RoomMemberHandler(object):
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
         self._server_notices_mxid = self.config.server_notices_mxid
+        self._enable_lookup = hs.config.enable_3pid_lookup
 
     @abc.abstractmethod
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -421,6 +422,9 @@ class RoomMemberHandler(object):
             room_id, latest_event_ids=latest_event_ids,
         )
 
+        # TODO: Refactor into dictionary of explicitly allowed transitions
+        # between old and new state, with specific error messages for some
+        # transitions and generic otherwise
         old_state_id = current_state_ids.get((EventTypes.Member, target.to_string()))
         if old_state_id:
             old_state = yield self.store.get_event(old_state_id, allow_none=True)
@@ -446,6 +450,9 @@ class RoomMemberHandler(object):
                 if same_sender and same_membership and same_content:
                     defer.returnValue(old_state)
 
+            if old_membership in ["ban", "leave"] and action == "kick":
+                raise AuthError(403, "The target user is not in the room")
+
             # we don't allow people to reject invites to the server notice
             # room, but they can leave it once they are joined.
             if (
@@ -459,6 +466,9 @@ class RoomMemberHandler(object):
                         "You cannot reject this invite",
                         errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
                     )
+        else:
+            if action == "kick":
+                raise AuthError(403, "The target user is not in the room")
 
         is_host_in_room = yield self._is_host_in_room(current_state_ids)
 
@@ -729,6 +739,10 @@ class RoomMemberHandler(object):
         Returns:
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
+        if not self._enable_lookup:
+            raise SynapseError(
+                403, "Looking up third-party identifiers is denied from this server",
+            )
         try:
             data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 57bb996245..153312e39f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1049,11 +1049,11 @@ class SyncHandler(object):
             # TODO: Be more clever than this, i.e. remove users who we already
             # share a room with?
             for room_id in newly_joined_rooms:
-                joined_users = yield self.state.get_current_user_in_room(room_id)
+                joined_users = yield self.state.get_current_users_in_room(room_id)
                 newly_joined_users.update(joined_users)
 
             for room_id in newly_left_rooms:
-                left_users = yield self.state.get_current_user_in_room(room_id)
+                left_users = yield self.state.get_current_users_in_room(room_id)
                 newly_left_users.update(left_users)
 
             # TODO: Check that these users are actually new, i.e. either they
@@ -1213,7 +1213,7 @@ class SyncHandler(object):
 
         extra_users_ids = set(newly_joined_users)
         for room_id in newly_joined_rooms:
-            users = yield self.state.get_current_user_in_room(room_id)
+            users = yield self.state.get_current_users_in_room(room_id)
             extra_users_ids.update(users)
         extra_users_ids.discard(user.to_string())
 
@@ -1855,7 +1855,7 @@ class SyncHandler(object):
             extrems = yield self.store.get_forward_extremeties_for_room(
                 room_id, stream_ordering,
             )
-            users_in_room = yield self.state.get_current_user_in_room(
+            users_in_room = yield self.state.get_current_users_in_room(
                 room_id, extrems,
             )
             if user_id in users_in_room:
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 39df960c31..972662eb48 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -218,7 +218,7 @@ class TypingHandler(object):
     @defer.inlineCallbacks
     def _push_remote(self, member, typing):
         try:
-            users = yield self.state.get_current_user_in_room(member.room_id)
+            users = yield self.state.get_current_users_in_room(member.room_id)
             self._member_last_federation_poke[member] = self.clock.time_msec()
 
             now = self.clock.time_msec()
@@ -261,7 +261,7 @@ class TypingHandler(object):
             )
             return
 
-        users = yield self.state.get_current_user_in_room(room_id)
+        users = yield self.state.get_current_users_in_room(room_id)
         domains = set(get_domain_from_id(u) for u in users)
 
         if self.server_name in domains:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index b689979b4b..5de9630950 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -276,7 +276,7 @@ class UserDirectoryHandler(StateDeltasHandler):
             # ignore the change
             return
 
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        users_with_profile = yield self.state.get_current_users_in_room(room_id)
 
         # Remove every user from the sharing tables for that room.
         for user_id in iterkeys(users_with_profile):
@@ -325,7 +325,7 @@ class UserDirectoryHandler(StateDeltasHandler):
             room_id
         )
         # Now we update users who share rooms with users.
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        users_with_profile = yield self.state.get_current_users_in_room(room_id)
 
         if is_public:
             yield self.store.add_users_in_public_rooms(room_id, (user_id,))