summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-02-04 08:28:56 +0000
committerErik Johnston <erik@matrix.org>2017-02-04 08:28:56 +0000
commitfad3a8433535d7de321350bbd17373138c6fd3ec (patch)
tree70fe66754b86a7a6b1466030dcc8d7e260063f4d /synapse/handlers
parentBump version and changelog (diff)
parentBump changelog and version (diff)
downloadsynapse-fad3a8433535d7de321350bbd17373138c6fd3ec.tar.xz
Merge branch 'release-v0.19.0' of github.com:matrix-org/synapse v0.19.0
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py8
-rw-r--r--synapse/handlers/auth.py97
-rw-r--r--synapse/handlers/device.py201
-rw-r--r--synapse/handlers/e2e_keys.py52
-rw-r--r--synapse/handlers/federation.py7
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/presence.py48
-rw-r--r--synapse/handlers/register.py10
-rw-r--r--synapse/handlers/room.py1
-rw-r--r--synapse/handlers/room_list.py7
-rw-r--r--synapse/handlers/room_member.py22
-rw-r--r--synapse/handlers/sync.py34
12 files changed, 383 insertions, 114 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 90f96209f8..e83adc8339 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -88,9 +88,13 @@ class BaseHandler(object):
                     current_state = yield self.store.get_events(
                         context.current_state_ids.values()
                     )
-                    current_state = current_state.values()
                 else:
-                    current_state = yield self.store.get_current_state(event.room_id)
+                    current_state = yield self.state_handler.get_current_state(
+                        event.room_id
+                    )
+
+                current_state = current_state.values()
+
                 logger.info("maybe_kick_guest_users %r", current_state)
                 yield self.kick_guest_users(current_state)
 
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3b146f09d6..fffba34383 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -65,6 +65,7 @@ class AuthHandler(BaseHandler):
 
         self.hs = hs  # FIXME better possibility to access registrationHandler later?
         self.device_handler = hs.get_device_handler()
+        self.macaroon_gen = hs.get_macaroon_generator()
 
     @defer.inlineCallbacks
     def check_auth(self, flows, clientdict, clientip):
@@ -529,37 +530,11 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def issue_access_token(self, user_id, device_id=None):
-        access_token = self.generate_access_token(user_id)
+        access_token = self.macaroon_gen.generate_access_token(user_id)
         yield self.store.add_access_token_to_user(user_id, access_token,
                                                   device_id)
         defer.returnValue(access_token)
 
-    def generate_access_token(self, user_id, extra_caveats=None):
-        extra_caveats = extra_caveats or []
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = access")
-        # Include a nonce, to make sure that each login gets a different
-        # access token.
-        macaroon.add_first_party_caveat("nonce = %s" % (
-            stringutils.random_string_with_symbols(16),
-        ))
-        for caveat in extra_caveats:
-            macaroon.add_first_party_caveat(caveat)
-        return macaroon.serialize()
-
-    def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = login")
-        now = self.hs.get_clock().time_msec()
-        expiry = now + duration_in_ms
-        macaroon.add_first_party_caveat("time < %d" % (expiry,))
-        return macaroon.serialize()
-
-    def generate_delete_pusher_token(self, user_id):
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = delete_pusher")
-        return macaroon.serialize()
-
     def validate_short_term_login_token_and_get_user_id(self, login_token):
         auth_api = self.hs.get_auth()
         try:
@@ -570,15 +545,6 @@ class AuthHandler(BaseHandler):
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
 
-    def _generate_base_macaroon(self, user_id):
-        macaroon = pymacaroons.Macaroon(
-            location=self.hs.config.server_name,
-            identifier="key",
-            key=self.hs.config.macaroon_secret_key)
-        macaroon.add_first_party_caveat("gen = 1")
-        macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
-        return macaroon
-
     @defer.inlineCallbacks
     def set_password(self, user_id, newpassword, requester=None):
         password_hash = self.hash(newpassword)
@@ -607,7 +573,7 @@ class AuthHandler(BaseHandler):
         # types (mediums) of threepid. For now, we still use the existing
         # infrastructure, but this is the start of synapse gaining knowledge
         # of specific types of threepid (and fixes the fact that checking
-        # for the presenc eof an email address during password reset was
+        # for the presence of an email address during password reset was
         # case sensitive).
         if medium == 'email':
             address = address.lower()
@@ -617,6 +583,17 @@ class AuthHandler(BaseHandler):
             self.hs.get_clock().time_msec()
         )
 
+    @defer.inlineCallbacks
+    def delete_threepid(self, user_id, medium, address):
+        # 'Canonicalise' email addresses as per above
+        if medium == 'email':
+            address = address.lower()
+
+        ret = yield self.store.user_delete_threepid(
+            user_id, medium, address,
+        )
+        defer.returnValue(ret)
+
     def _save_session(self, session):
         # TODO: Persistent storage
         logger.debug("Saving session %s", session)
@@ -656,12 +633,54 @@ class AuthHandler(BaseHandler):
             Whether self.hash(password) == stored_hash (bool).
         """
         if stored_hash:
-            return bcrypt.hashpw(password + self.hs.config.password_pepper,
-                                 stored_hash.encode('utf-8')) == stored_hash
+            return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
+                                 stored_hash.encode('utf8')) == stored_hash
         else:
             return False
 
 
+class MacaroonGeneartor(object):
+    def __init__(self, hs):
+        self.clock = hs.get_clock()
+        self.server_name = hs.config.server_name
+        self.macaroon_secret_key = hs.config.macaroon_secret_key
+
+    def generate_access_token(self, user_id, extra_caveats=None):
+        extra_caveats = extra_caveats or []
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = access")
+        # Include a nonce, to make sure that each login gets a different
+        # access token.
+        macaroon.add_first_party_caveat("nonce = %s" % (
+            stringutils.random_string_with_symbols(16),
+        ))
+        for caveat in extra_caveats:
+            macaroon.add_first_party_caveat(caveat)
+        return macaroon.serialize()
+
+    def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = login")
+        now = self.clock.time_msec()
+        expiry = now + duration_in_ms
+        macaroon.add_first_party_caveat("time < %d" % (expiry,))
+        return macaroon.serialize()
+
+    def generate_delete_pusher_token(self, user_id):
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = delete_pusher")
+        return macaroon.serialize()
+
+    def _generate_base_macaroon(self, user_id):
+        macaroon = pymacaroons.Macaroon(
+            location=self.server_name,
+            identifier="key",
+            key=self.macaroon_secret_key)
+        macaroon.add_first_party_caveat("gen = 1")
+        macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
+        return macaroon
+
+
 class _AccountHandler(object):
     """A proxy object that gets passed to password auth providers so they
     can register new users etc if necessary.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index aa68755936..8cb47ac417 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -14,7 +14,11 @@
 # limitations under the License.
 
 from synapse.api import errors
+from synapse.api.constants import EventTypes
 from synapse.util import stringutils
+from synapse.util.async import Linearizer
+from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id, RoomStreamToken
 from twisted.internet import defer
 from ._base import BaseHandler
 
@@ -27,6 +31,21 @@ class DeviceHandler(BaseHandler):
     def __init__(self, hs):
         super(DeviceHandler, self).__init__(hs)
 
+        self.hs = hs
+        self.state = hs.get_state_handler()
+        self.federation_sender = hs.get_federation_sender()
+        self.federation = hs.get_replication_layer()
+        self._remote_edue_linearizer = Linearizer(name="remote_device_list")
+
+        self.federation.register_edu_handler(
+            "m.device_list_update", self._incoming_device_list_update,
+        )
+        self.federation.register_query_handler(
+            "user_devices", self.on_federation_query_user_devices,
+        )
+
+        hs.get_distributor().observe("user_left_room", self.user_left_room)
+
     @defer.inlineCallbacks
     def check_device_registered(self, user_id, device_id,
                                 initial_device_display_name=None):
@@ -45,29 +64,29 @@ class DeviceHandler(BaseHandler):
             str: device id (generated if none was supplied)
         """
         if device_id is not None:
-            yield self.store.store_device(
+            new_device = yield self.store.store_device(
                 user_id=user_id,
                 device_id=device_id,
                 initial_device_display_name=initial_device_display_name,
-                ignore_if_known=True,
             )
+            if new_device:
+                yield self.notify_device_update(user_id, [device_id])
             defer.returnValue(device_id)
 
         # if the device id is not specified, we'll autogen one, but loop a few
         # times in case of a clash.
         attempts = 0
         while attempts < 5:
-            try:
-                device_id = stringutils.random_string(10).upper()
-                yield self.store.store_device(
-                    user_id=user_id,
-                    device_id=device_id,
-                    initial_device_display_name=initial_device_display_name,
-                    ignore_if_known=False,
-                )
+            device_id = stringutils.random_string(10).upper()
+            new_device = yield self.store.store_device(
+                user_id=user_id,
+                device_id=device_id,
+                initial_device_display_name=initial_device_display_name,
+            )
+            if new_device:
+                yield self.notify_device_update(user_id, [device_id])
                 defer.returnValue(device_id)
-            except errors.StoreError:
-                attempts += 1
+            attempts += 1
 
         raise errors.StoreError(500, "Couldn't generate a device ID.")
 
@@ -147,6 +166,8 @@ class DeviceHandler(BaseHandler):
             user_id=user_id, device_id=device_id
         )
 
+        yield self.notify_device_update(user_id, [device_id])
+
     @defer.inlineCallbacks
     def update_device(self, user_id, device_id, content):
         """ Update the given device
@@ -166,12 +187,168 @@ class DeviceHandler(BaseHandler):
                 device_id,
                 new_display_name=content.get("display_name")
             )
+            yield self.notify_device_update(user_id, [device_id])
         except errors.StoreError, e:
             if e.code == 404:
                 raise errors.NotFoundError()
             else:
                 raise
 
+    @measure_func("notify_device_update")
+    @defer.inlineCallbacks
+    def notify_device_update(self, user_id, device_ids):
+        """Notify that a user's device(s) has changed. Pokes the notifier, and
+        remote servers if the user is local.
+        """
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
+
+        hosts = set()
+        if self.hs.is_mine_id(user_id):
+            hosts.update(get_domain_from_id(u) for u in users_who_share_room)
+            hosts.discard(self.server_name)
+
+        position = yield self.store.add_device_change_to_streams(
+            user_id, device_ids, list(hosts)
+        )
+
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        room_ids = [r.room_id for r in rooms]
+
+        yield self.notifier.on_new_event(
+            "device_list_key", position, rooms=room_ids,
+        )
+
+        if hosts:
+            logger.info("Sending device list update notif to: %r", hosts)
+            for host in hosts:
+                self.federation_sender.send_device_messages(host)
+
+    @measure_func("device.get_user_ids_changed")
+    @defer.inlineCallbacks
+    def get_user_ids_changed(self, user_id, from_token):
+        """Get list of users that have had the devices updated, or have newly
+        joined a room, that `user_id` may be interested in.
+
+        Args:
+            user_id (str)
+            from_token (StreamToken)
+        """
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        room_ids = set(r.room_id for r in rooms)
+
+        # First we check if any devices have changed
+        changed = yield self.store.get_user_whose_devices_changed(
+            from_token.device_list_key
+        )
+
+        # Then work out if any users have since joined
+        rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+
+        possibly_changed = set(changed)
+        for room_id in rooms_changed:
+            # Fetch  the current state at the time.
+            stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
+
+            try:
+                event_ids = yield self.store.get_forward_extremeties_for_room(
+                    room_id, stream_ordering=stream_ordering
+                )
+                prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+            except:
+                prev_state_ids = {}
+
+            current_state_ids = yield self.state.get_current_state_ids(room_id)
+
+            # If there has been any change in membership, include them in the
+            # possibly changed list. We'll check if they are joined below,
+            # and we're not toooo worried about spuriously adding users.
+            for key, event_id in current_state_ids.iteritems():
+                etype, state_key = key
+                if etype == EventTypes.Member:
+                    prev_event_id = prev_state_ids.get(key, None)
+                    if not prev_event_id or prev_event_id != event_id:
+                        possibly_changed.add(state_key)
+
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
+
+        # Take the intersection of the users whose devices may have changed
+        # and those that actually still share a room with the user
+        defer.returnValue(users_who_share_room & possibly_changed)
+
+    @measure_func("_incoming_device_list_update")
+    @defer.inlineCallbacks
+    def _incoming_device_list_update(self, origin, edu_content):
+        user_id = edu_content["user_id"]
+        device_id = edu_content["device_id"]
+        stream_id = edu_content["stream_id"]
+        prev_ids = edu_content.get("prev_id", [])
+
+        if get_domain_from_id(user_id) != origin:
+            # TODO: Raise?
+            logger.warning("Got device list update edu for %r from %r", user_id, origin)
+            return
+
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        if not rooms:
+            # We don't share any rooms with this user. Ignore update, as we
+            # probably won't get any further updates.
+            return
+
+        with (yield self._remote_edue_linearizer.queue(user_id)):
+            # If the prev id matches whats in our cache table, then we don't need
+            # to resync the users device list, otherwise we do.
+            resync = True
+            if len(prev_ids) == 1:
+                extremity = yield self.store.get_device_list_last_stream_id_for_remote(
+                    user_id
+                )
+                logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
+                if str(extremity) == str(prev_ids[0]):
+                    resync = False
+
+            if resync:
+                # Fetch all devices for the user.
+                result = yield self.federation.query_user_devices(origin, user_id)
+                stream_id = result["stream_id"]
+                devices = result["devices"]
+                yield self.store.update_remote_device_list_cache(
+                    user_id, devices, stream_id,
+                )
+                device_ids = [device["device_id"] for device in devices]
+                yield self.notify_device_update(user_id, device_ids)
+            else:
+                # Simply update the single device, since we know that is the only
+                # change (becuase of the single prev_id matching the current cache)
+                content = dict(edu_content)
+                for key in ("user_id", "device_id", "stream_id", "prev_ids"):
+                    content.pop(key, None)
+                yield self.store.update_remote_device_list_cache_entry(
+                    user_id, device_id, content, stream_id,
+                )
+                yield self.notify_device_update(user_id, [device_id])
+
+    @defer.inlineCallbacks
+    def on_federation_query_user_devices(self, user_id):
+        stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
+        defer.returnValue({
+            "user_id": user_id,
+            "stream_id": stream_id,
+            "devices": devices,
+        })
+
+    @defer.inlineCallbacks
+    def user_left_room(self, user, room_id):
+        user_id = user.to_string()
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        if not rooms:
+            # We no longer share rooms with this user, so we'll no longer
+            # receive device updates. Mark this in DB.
+            yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
+
 
 def _update_device_from_client_ips(device, client_ips):
     ip = client_ips.get((device["user_id"], device["device_id"]), {})
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index b63a660c06..e40495d1ab 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -73,10 +73,9 @@ class E2eKeysHandler(object):
             if self.is_mine_id(user_id):
                 local_query[user_id] = device_ids
             else:
-                domain = get_domain_from_id(user_id)
-                remote_queries.setdefault(domain, {})[user_id] = device_ids
+                remote_queries[user_id] = device_ids
 
-        # do the queries
+        # Firt get local devices.
         failures = {}
         results = {}
         if local_query:
@@ -85,9 +84,42 @@ class E2eKeysHandler(object):
                 if user_id in local_query:
                     results[user_id] = keys
 
+        # Now attempt to get any remote devices from our local cache.
+        remote_queries_not_in_cache = {}
+        if remote_queries:
+            query_list = []
+            for user_id, device_ids in remote_queries.iteritems():
+                if device_ids:
+                    query_list.extend((user_id, device_id) for device_id in device_ids)
+                else:
+                    query_list.append((user_id, None))
+
+            user_ids_not_in_cache, remote_results = (
+                yield self.store.get_user_devices_from_cache(
+                    query_list
+                )
+            )
+            for user_id, devices in remote_results.iteritems():
+                user_devices = results.setdefault(user_id, {})
+                for device_id, device in devices.iteritems():
+                    keys = device.get("keys", None)
+                    device_display_name = device.get("device_display_name", None)
+                    if keys:
+                        result = dict(keys)
+                        unsigned = result.setdefault("unsigned", {})
+                        if device_display_name:
+                            unsigned["device_display_name"] = device_display_name
+                        user_devices[device_id] = result
+
+            for user_id in user_ids_not_in_cache:
+                domain = get_domain_from_id(user_id)
+                r = remote_queries_not_in_cache.setdefault(domain, {})
+                r[user_id] = remote_queries[user_id]
+
+        # Now fetch any devices that we don't have in our cache
         @defer.inlineCallbacks
         def do_remote_query(destination):
-            destination_query = remote_queries[destination]
+            destination_query = remote_queries_not_in_cache[destination]
             try:
                 limiter = yield get_retry_limiter(
                     destination, self.clock, self.store
@@ -119,7 +151,7 @@ class E2eKeysHandler(object):
 
         yield preserve_context_over_deferred(defer.gatherResults([
             preserve_fn(do_remote_query)(destination)
-            for destination in remote_queries
+            for destination in remote_queries_not_in_cache
         ]))
 
         defer.returnValue({
@@ -162,7 +194,7 @@ class E2eKeysHandler(object):
         # "unsigned" section
         for user_id, device_keys in results.items():
             for device_id, device_info in device_keys.items():
-                r = json.loads(device_info["key_json"])
+                r = dict(device_info["keys"])
                 r["unsigned"] = {}
                 display_name = device_info["device_display_name"]
                 if display_name is not None:
@@ -255,10 +287,12 @@ class E2eKeysHandler(object):
                 device_id, user_id, time_now
             )
             # TODO: Sign the JSON with the server key
-            yield self.store.set_e2e_device_keys(
-                user_id, device_id, time_now,
-                encode_canonical_json(device_keys)
+            changed = yield self.store.set_e2e_device_keys(
+                user_id, device_id, time_now, device_keys,
             )
+            if changed:
+                # Only notify about device updates *if* the keys actually changed
+                yield self.device_handler.notify_device_update(user_id, [device_id])
 
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1021bcc405..996bfd0e23 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -591,12 +591,12 @@ class FederationHandler(BaseHandler):
 
         event_ids = list(extremities.keys())
 
-        logger.info("calling resolve_state_groups in _maybe_backfill")
+        logger.debug("calling resolve_state_groups in _maybe_backfill")
         states = yield preserve_context_over_deferred(defer.gatherResults([
             preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
             for e in event_ids
         ]))
-        states = dict(zip(event_ids, [s[1] for s in states]))
+        states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = yield self.store.get_events(
             [e_id for ids in states.values() for e_id in ids],
@@ -1319,7 +1319,6 @@ class FederationHandler(BaseHandler):
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event, new_event_context,
-            current_state=state,
         )
 
         defer.returnValue((event_stream_id, max_stream_id))
@@ -1530,7 +1529,7 @@ class FederationHandler(BaseHandler):
                     (d.type, d.state_key): d for d in different_events if d
                 })
 
-                new_state, prev_state = self.state_handler.resolve_events(
+                new_state = self.state_handler.resolve_events(
                     [local_view.values(), remote_view.values()],
                     event
                 )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7a57a69bd3..7a498af5a2 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -208,8 +208,10 @@ class MessageHandler(BaseHandler):
                     content = builder.content
 
                     try:
-                        content["displayname"] = yield profile.get_displayname(target)
-                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                        if "displayname" not in content:
+                            content["displayname"] = yield profile.get_displayname(target)
+                        if "avatar_url" not in content:
+                            content["avatar_url"] = yield profile.get_avatar_url(target)
                     except Exception as e:
                         logger.info(
                             "Failed to get profile information for %r: %s",
@@ -279,7 +281,9 @@ class MessageHandler(BaseHandler):
 
         if event.type == EventTypes.Message:
             presence = self.hs.get_presence_handler()
-            yield presence.bump_presence_active_time(user)
+            # We don't want to block sending messages on any presence code. This
+            # matters as sometimes presence code can take a while.
+            preserve_fn(presence.bump_presence_active_time)(user)
 
     @defer.inlineCallbacks
     def deduplicate_state_event(self, event, context):
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1b89dc6274..fdfce2a88c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -574,7 +574,7 @@ class PresenceHandler(object):
                 if not local_states:
                     continue
 
-                users = yield self.state.get_current_user_in_room(room_id)
+                users = yield self.store.get_users_in_room(room_id)
                 hosts = set(get_domain_from_id(u) for u in users)
 
                 for host in hosts:
@@ -766,7 +766,7 @@ class PresenceHandler(object):
         # don't need to send to local clients here, as that is done as part
         # of the event stream/sync.
         # TODO: Only send to servers not already in the room.
-        user_ids = yield self.state.get_current_user_in_room(room_id)
+        user_ids = yield self.store.get_users_in_room(room_id)
         if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
@@ -1011,7 +1011,7 @@ class PresenceEventSource(object):
     @defer.inlineCallbacks
     @log_function
     def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
-                       **kwargs):
+                       explicit_room_id=None, **kwargs):
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1028,22 +1028,24 @@ class PresenceEventSource(object):
             user_id = user.to_string()
             if from_key is not None:
                 from_key = int(from_key)
-            room_ids = room_ids or []
 
             presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
 
-            if not room_ids:
-                rooms = yield self.store.get_rooms_for_user(user_id)
-                room_ids = set(e.room_id for e in rooms)
-            else:
-                room_ids = set(room_ids)
-
             max_token = self.store.get_current_presence_token()
 
             plist = yield self.store.get_presence_list_accepted(user.localpart)
-            friends = set(row["observed_user_id"] for row in plist)
-            friends.add(user_id)  # So that we receive our own presence
+            users_interested_in = set(row["observed_user_id"] for row in plist)
+            users_interested_in.add(user_id)  # So that we receive our own presence
+
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
+            users_interested_in.update(users_who_share_room)
+
+            if explicit_room_id:
+                user_ids = yield self.store.get_users_in_room(explicit_room_id)
+                users_interested_in.update(user_ids)
 
             user_ids_changed = set()
             changed = None
@@ -1055,35 +1057,19 @@ class PresenceEventSource(object):
                 # work out if we share a room or they're in our presence list
                 get_updates_counter.inc("stream")
                 for other_user_id in changed:
-                    if other_user_id in friends:
+                    if other_user_id in users_interested_in:
                         user_ids_changed.add(other_user_id)
-                        continue
-                    other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-                    if room_ids.intersection(e.room_id for e in other_rooms):
-                        user_ids_changed.add(other_user_id)
-                        continue
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
                 get_updates_counter.inc("full")
 
-                user_ids_to_check = set()
-                for room_id in room_ids:
-                    users = yield self.state.get_current_user_in_room(room_id)
-                    user_ids_to_check.update(users)
-
-                user_ids_to_check.update(friends)
-
-                # Always include yourself. Only really matters for when the user is
-                # not in any rooms, but still.
-                user_ids_to_check.add(user_id)
-
                 if from_key:
                     user_ids_changed = stream_change_cache.get_entities_changed(
-                        user_ids_to_check, from_key,
+                        users_interested_in, from_key,
                     )
                 else:
-                    user_ids_changed = user_ids_to_check
+                    user_ids_changed = users_interested_in
 
             updates = yield presence.current_state_for_users(user_ids_changed)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 286f0cef0a..03c6a85fc6 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -40,6 +40,8 @@ class RegistrationHandler(BaseHandler):
 
         self._next_generated_user_id = None
 
+        self.macaroon_gen = hs.get_macaroon_generator()
+
     @defer.inlineCallbacks
     def check_username(self, localpart, guest_access_token=None,
                        assigned_user_id=None):
@@ -143,7 +145,7 @@ class RegistrationHandler(BaseHandler):
 
             token = None
             if generate_token:
-                token = self.auth_handler().generate_access_token(user_id)
+                token = self.macaroon_gen.generate_access_token(user_id)
             yield self.store.register(
                 user_id=user_id,
                 token=token,
@@ -167,7 +169,7 @@ class RegistrationHandler(BaseHandler):
                 user_id = user.to_string()
                 yield self.check_user_id_not_appservice_exclusive(user_id)
                 if generate_token:
-                    token = self.auth_handler().generate_access_token(user_id)
+                    token = self.macaroon_gen.generate_access_token(user_id)
                 try:
                     yield self.store.register(
                         user_id=user_id,
@@ -254,7 +256,7 @@ class RegistrationHandler(BaseHandler):
         user_id = user.to_string()
 
         yield self.check_user_id_not_appservice_exclusive(user_id)
-        token = self.auth_handler().generate_access_token(user_id)
+        token = self.macaroon_gen.generate_access_token(user_id)
         try:
             yield self.store.register(
                 user_id=user_id,
@@ -399,7 +401,7 @@ class RegistrationHandler(BaseHandler):
 
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
-        token = self.auth_handler().generate_access_token(user_id)
+        token = self.macaroon_gen.generate_access_token(user_id)
 
         if need_register:
             yield self.store.register(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5f18007e90..7e7671c9a2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -437,6 +437,7 @@ class RoomEventSource(object):
             limit,
             room_ids,
             is_guest,
+            explicit_room_id=None,
     ):
         # We just ignore the key for now.
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 667223df0c..19eebbd43f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -62,17 +62,18 @@ class RoomListHandler(BaseHandler):
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
         """
-        if search_filter or (network_tuple and network_tuple.appservice_id is not None):
+        if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
             return self._get_public_room_list(
                 limit, since_token, search_filter, network_tuple=network_tuple,
             )
 
-        result = self.response_cache.get((limit, since_token))
+        key = (limit, since_token, network_tuple)
+        result = self.response_cache.get(key)
         if not result:
             result = self.response_cache.set(
-                (limit, since_token),
+                key,
                 self._get_public_room_list(
                     limit, since_token, network_tuple=network_tuple
                 )
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 2f8782e522..b2806555cf 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -45,7 +45,7 @@ class RoomMemberHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomMemberHandler, self).__init__(hs)
 
-        self.member_linearizer = Linearizer()
+        self.member_linearizer = Linearizer(name="member")
 
         self.clock = hs.get_clock()
 
@@ -89,7 +89,7 @@ class RoomMemberHandler(BaseHandler):
         duplicate = yield msg_handler.deduplicate_state_event(event, context)
         if duplicate is not None:
             # Discard the new event since this membership change is a no-op.
-            return
+            defer.returnValue(duplicate)
 
         yield msg_handler.handle_new_client_event(
             requester,
@@ -120,6 +120,8 @@ class RoomMemberHandler(BaseHandler):
                 if prev_member_event.membership == Membership.JOIN:
                     user_left_room(self.distributor, target, room_id)
 
+        defer.returnValue(event)
+
     @defer.inlineCallbacks
     def remote_join(self, remote_room_hosts, room_id, user, content):
         if len(remote_room_hosts) == 0:
@@ -187,6 +189,7 @@ class RoomMemberHandler(BaseHandler):
             ratelimit=True,
             content=None,
     ):
+        content_specified = bool(content)
         if content is None:
             content = {}
 
@@ -229,6 +232,13 @@ class RoomMemberHandler(BaseHandler):
                     errcode=Codes.BAD_STATE
                 )
 
+            if old_state:
+                same_content = content == old_state.content
+                same_membership = old_membership == effective_membership_state
+                same_sender = requester.user.to_string() == old_state.sender
+                if same_sender and same_membership and same_content:
+                    defer.returnValue(old_state)
+
         is_host_in_room = yield self._is_host_in_room(current_state_ids)
 
         if effective_membership_state == Membership.JOIN:
@@ -247,8 +257,9 @@ class RoomMemberHandler(BaseHandler):
                 content["membership"] = Membership.JOIN
 
                 profile = self.hs.get_handlers().profile_handler
-                content["displayname"] = yield profile.get_displayname(target)
-                content["avatar_url"] = yield profile.get_avatar_url(target)
+                if not content_specified:
+                    content["displayname"] = yield profile.get_displayname(target)
+                    content["avatar_url"] = yield profile.get_avatar_url(target)
 
                 if requester.is_guest:
                     content["kind"] = "guest"
@@ -290,7 +301,7 @@ class RoomMemberHandler(BaseHandler):
 
                         defer.returnValue({})
 
-        yield self._local_membership_update(
+        res = yield self._local_membership_update(
             requester=requester,
             target=target,
             room_id=room_id,
@@ -300,6 +311,7 @@ class RoomMemberHandler(BaseHandler):
             prev_event_ids=latest_event_ids,
             content=content,
         )
+        defer.returnValue(res)
 
     @defer.inlineCallbacks
     def send_membership_event(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c880f61685..d7dcd1ce5b 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -16,7 +16,7 @@
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
 from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
@@ -115,6 +115,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
     "invited",  # InvitedSyncResult for each invited room.
     "archived",  # ArchivedSyncResult for each archived room.
     "to_device",  # List of direct messages for the device.
+    "device_lists",  # List of user_ids whose devices have chanegd
 ])):
     __slots__ = []
 
@@ -129,7 +130,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
             self.invited or
             self.archived or
             self.account_data or
-            self.to_device
+            self.to_device or
+            self.device_lists
         )
 
 
@@ -544,6 +546,10 @@ class SyncHandler(object):
 
         yield self._generate_sync_entry_for_to_device(sync_result_builder)
 
+        device_lists = yield self._generate_sync_entry_for_device_list(
+            sync_result_builder
+        )
+
         defer.returnValue(SyncResult(
             presence=sync_result_builder.presence,
             account_data=sync_result_builder.account_data,
@@ -551,9 +557,33 @@ class SyncHandler(object):
             invited=sync_result_builder.invited,
             archived=sync_result_builder.archived,
             to_device=sync_result_builder.to_device,
+            device_lists=device_lists,
             next_batch=sync_result_builder.now_token,
         ))
 
+    @measure_func("_generate_sync_entry_for_device_list")
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_device_list(self, sync_result_builder):
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+
+        if since_token and since_token.device_list_key:
+            rooms = yield self.store.get_rooms_for_user(user_id)
+            room_ids = set(r.room_id for r in rooms)
+
+            user_ids_changed = set()
+            changed = yield self.store.get_user_whose_devices_changed(
+                since_token.device_list_key
+            )
+            for other_user_id in changed:
+                other_rooms = yield self.store.get_rooms_for_user(other_user_id)
+                if room_ids.intersection(e.room_id for e in other_rooms):
+                    user_ids_changed.add(other_user_id)
+
+            defer.returnValue(user_ids_changed)
+        else:
+            defer.returnValue([])
+
     @defer.inlineCallbacks
     def _generate_sync_entry_for_to_device(self, sync_result_builder):
         """Generates the portion of the sync response. Populates