summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--README.rst24
-rw-r--r--UPGRADE.rst2
-rw-r--r--synapse/api/auth.py13
-rw-r--r--synapse/appservice/__init__.py10
-rw-r--r--synapse/federation/transaction_queue.py1
-rw-r--r--synapse/handlers/federation.py28
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/handlers/sync.py11
-rw-r--r--synapse/handlers/typing.py12
-rw-r--r--synapse/handlers/user_directory.py446
-rw-r--r--synapse/notifier.py3
-rw-r--r--synapse/push/action_generator.py14
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py336
-rw-r--r--synapse/push/emailpusher.py15
-rw-r--r--synapse/push/mailer.py87
-rw-r--r--synapse/push/pusher.py56
-rw-r--r--synapse/push/pusherpool.py7
-rw-r--r--synapse/replication/slave/storage/devices.py2
-rw-r--r--synapse/replication/slave/storage/events.py5
-rw-r--r--synapse/replication/tcp/resource.py3
-rw-r--r--synapse/rest/__init__.py2
-rw-r--r--synapse/rest/client/v2_alpha/sync.py1
-rw-r--r--synapse/rest/client/v2_alpha/user_directory.py75
-rw-r--r--synapse/rest/media/v1/media_repository.py1
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py2
-rw-r--r--synapse/server.py10
-rw-r--r--synapse/state.py61
-rw-r--r--synapse/storage/__init__.py17
-rw-r--r--synapse/storage/_base.py34
-rw-r--r--synapse/storage/appservice.py26
-rw-r--r--synapse/storage/client_ips.py7
-rw-r--r--synapse/storage/devices.py45
-rw-r--r--synapse/storage/end_to_end_keys.py35
-rw-r--r--synapse/storage/event_federation.py91
-rw-r--r--synapse/storage/events.py44
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/push_rule.py4
-rw-r--r--synapse/storage/receipts.py4
-rw-r--r--synapse/storage/roommember.py159
-rw-r--r--synapse/storage/schema/delta/42/current_state_delta.sql26
-rw-r--r--synapse/storage/schema/delta/42/device_list_last_id.sql33
-rw-r--r--synapse/storage/schema/delta/42/event_auth_state_only.sql17
-rw-r--r--synapse/storage/schema/delta/42/user_dir.py84
-rw-r--r--synapse/storage/state.py104
-rw-r--r--synapse/storage/user_directory.py506
-rw-r--r--synapse/types.py7
-rw-r--r--synapse/util/caches/descriptors.py56
-rw-r--r--synapse/util/caches/dictionary_cache.py57
-rw-r--r--synapse/util/caches/stream_change_cache.py15
-rw-r--r--tests/test_state.py2
-rw-r--r--tests/util/test_dict_cache.py2
-rw-r--r--tests/utils.py1
52 files changed, 2267 insertions, 344 deletions
diff --git a/README.rst b/README.rst
index 35141ac71b..12f0c0c51a 100644
--- a/README.rst
+++ b/README.rst
@@ -528,6 +528,30 @@ fix try re-installing from PyPI or directly from
     # Install from github
     pip install --user https://github.com/pyca/pynacl/tarball/master
 
+Running out of File Handles
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If synapse runs out of filehandles, it typically fails badly - live-locking
+at 100% CPU, and/or failing to accept new TCP connections (blocking the
+connecting client).  Matrix currently can legitimately use a lot of file handles,
+thanks to busy rooms like #matrix:matrix.org containing hundreds of participating
+servers.  The first time a server talks in a room it will try to connect
+simultaneously to all participating servers, which could exhaust the available
+file descriptors between DNS queries & HTTPS sockets, especially if DNS is slow
+to respond.  (We need to improve the routing algorithm used to be better than
+full mesh, but as of June 2017 this hasn't happened yet).
+
+If you hit this failure mode, we recommend increasing the maximum number of
+open file handles to be at least 4096 (assuming a default of 1024 or 256).
+This is typically done by editing ``/etc/security/limits.conf``
+
+Separately, Synapse may leak file handles if inbound HTTP requests get stuck
+during processing - e.g. blocked behind a lock or talking to a remote server etc.
+This is best diagnosed by matching up the 'Received request' and 'Processed request'
+log lines and looking for any 'Processed request' lines which take more than
+a few seconds to execute.  Please let us know at #matrix-dev:matrix.org if
+you see this failure mode so we can help debug it, however.
+
 ArchLinux
 ~~~~~~~~~
 
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 6164df8833..62b22e9108 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -33,7 +33,7 @@ To check whether your update was sucessfull, run:
 
 .. code:: bash
 
-	 # replace your.server.domain with ther domain of your synaspe homeserver
+	 # replace your.server.domain with ther domain of your synapse homeserver
 	 curl https://<your.server.domain>/_matrix/federation/v1/version 
 
 So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version.
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 9dbc7993df..0c297cb022 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -144,17 +144,8 @@ class Auth(object):
     @defer.inlineCallbacks
     def check_host_in_room(self, room_id, host):
         with Measure(self.clock, "check_host_in_room"):
-            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
-
-            logger.debug("calling resolve_state_groups from check_host_in_room")
-            entry = yield self.state.resolve_state_groups(
-                room_id, latest_event_ids
-            )
-
-            ret = yield self.store.is_host_joined(
-                room_id, host, entry.state_group, entry.state
-            )
-            defer.returnValue(ret)
+            latest_event_ids = yield self.store.is_host_joined(room_id, host)
+            defer.returnValue(latest_event_ids)
 
     def _check_joined_room(self, member, user_id, room_id):
         if not member or member.membership != Membership.JOIN:
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 7346206bb1..b989007314 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -241,6 +241,16 @@ class ApplicationService(object):
     def is_exclusive_room(self, room_id):
         return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
 
+    def get_exlusive_user_regexes(self):
+        """Get the list of regexes used to determine if a user is exclusively
+        registered by the AS
+        """
+        return [
+            regex_obj["regex"]
+            for regex_obj in self.namespaces[ApplicationService.NS_USERS]
+            if regex_obj["exclusive"]
+        ]
+
     def is_rate_limited(self):
         return self.rate_limited
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a15198e05d..003eaba893 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -187,6 +187,7 @@ class TransactionQueue(object):
                             prev_id for prev_id, _ in event.prev_events
                         ],
                     )
+                    destinations = set(destinations)
 
                     if send_on_behalf_of is not None:
                         # If we are sending the event on behalf of another server
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 52d97dfbf3..39d2bee8da 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -43,7 +43,6 @@ from synapse.events.utils import prune_event
 
 from synapse.util.retryutils import NotRetryingDestination
 
-from synapse.push.action_generator import ActionGenerator
 from synapse.util.distributor import user_joined_room
 
 from twisted.internet import defer
@@ -75,6 +74,7 @@ class FederationHandler(BaseHandler):
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
+        self.action_generator = hs.get_action_generator()
 
         self.replication_layer.set_handler(self)
 
@@ -832,7 +832,11 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_event_auth(self, event_id):
-        auth = yield self.store.get_auth_chain([event_id])
+        event = yield self.store.get_event(event_id)
+        auth = yield self.store.get_auth_chain(
+            [auth_id for auth_id, _ in event.auth_events],
+            include_given=True
+        )
 
         for event in auth:
             event.signatures.update(
@@ -1047,9 +1051,7 @@ class FederationHandler(BaseHandler):
                 yield user_joined_room(self.distributor, user, event.room_id)
 
         state_ids = context.prev_state_ids.values()
-        auth_chain = yield self.store.get_auth_chain(set(
-            [event.event_id] + state_ids
-        ))
+        auth_chain = yield self.store.get_auth_chain(state_ids)
 
         state = yield self.store.get_events(context.prev_state_ids.values())
 
@@ -1100,6 +1102,9 @@ class FederationHandler(BaseHandler):
             user_id,
             "leave"
         )
+        # Mark as outlier as we don't have any state for this event; we're not
+        # even in the room.
+        event.internal_metadata.outlier = True
         event = self._sign_event(event)
 
         # Try the host that we succesfully called /make_leave/ on first for
@@ -1389,8 +1394,7 @@ class FederationHandler(BaseHandler):
         )
 
         if not event.internal_metadata.is_outlier():
-            action_generator = ActionGenerator(self.hs)
-            yield action_generator.handle_push_actions_for_event(
+            yield self.action_generator.handle_push_actions_for_event(
                 event, context
             )
 
@@ -1599,7 +1603,11 @@ class FederationHandler(BaseHandler):
                 pass
 
         # Now get the current auth_chain for the event.
-        local_auth_chain = yield self.store.get_auth_chain([event_id])
+        event = yield self.store.get_event(event_id)
+        local_auth_chain = yield self.store.get_auth_chain(
+            [auth_id for auth_id, _ in event.auth_events],
+            include_given=True
+        )
 
         # TODO: Check if we would now reject event_id. If so we need to tell
         # everyone.
@@ -1792,7 +1800,9 @@ class FederationHandler(BaseHandler):
                 auth_ids = yield self.auth.compute_auth_events(
                     event, context.prev_state_ids
                 )
-                local_auth_chain = yield self.store.get_auth_chain(auth_ids)
+                local_auth_chain = yield self.store.get_auth_chain(
+                    auth_ids, include_given=True
+                )
 
                 try:
                     # 2. Get remote difference.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 196925edad..a04f634c5c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
@@ -54,6 +53,8 @@ class MessageHandler(BaseHandler):
         # This is to stop us from diverging history *too* much.
         self.limiter = Limiter(max_count=5)
 
+        self.action_generator = hs.get_action_generator()
+
     @defer.inlineCallbacks
     def purge_history(self, room_id, event_id):
         event = yield self.store.get_event(event_id)
@@ -590,8 +591,7 @@ class MessageHandler(BaseHandler):
                 "Changing the room create event is forbidden",
             )
 
-        action_generator = ActionGenerator(self.hs)
-        yield action_generator.handle_push_actions_for_event(
+        yield self.action_generator.handle_push_actions_for_event(
             event, context
         )
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c0205da1a9..91c6c6be3c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -117,6 +117,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
     "archived",  # ArchivedSyncResult for each archived room.
     "to_device",  # List of direct messages for the device.
     "device_lists",  # List of user_ids whose devices have chanegd
+    "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
+                                   # for this device
 ])):
     __slots__ = []
 
@@ -550,6 +552,14 @@ class SyncHandler(object):
             sync_result_builder
         )
 
+        device_id = sync_config.device_id
+        one_time_key_counts = {}
+        if device_id:
+            user_id = sync_config.user.to_string()
+            one_time_key_counts = yield self.store.count_e2e_one_time_keys(
+                user_id, device_id
+            )
+
         defer.returnValue(SyncResult(
             presence=sync_result_builder.presence,
             account_data=sync_result_builder.account_data,
@@ -558,6 +568,7 @@ class SyncHandler(object):
             archived=sync_result_builder.archived,
             to_device=sync_result_builder.to_device,
             device_lists=device_lists,
+            device_one_time_keys_count=one_time_key_counts,
             next_batch=sync_result_builder.now_token,
         ))
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3b7818af5c..82dedbbc99 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -89,7 +89,7 @@ class TypingHandler(object):
             until = self._member_typing_until.get(member, None)
             if not until or until <= now:
                 logger.info("Timing out typing for: %s", member.user_id)
-                preserve_fn(self._stopped_typing)(member)
+                self._stopped_typing(member)
                 continue
 
             # Check if we need to resend a keep alive over federation for this
@@ -147,7 +147,7 @@ class TypingHandler(object):
             # No point sending another notification
             defer.returnValue(None)
 
-        yield self._push_update(
+        self._push_update(
             member=member,
             typing=True,
         )
@@ -171,7 +171,7 @@ class TypingHandler(object):
 
         member = RoomMember(room_id=room_id, user_id=target_user_id)
 
-        yield self._stopped_typing(member)
+        self._stopped_typing(member)
 
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
@@ -180,7 +180,6 @@ class TypingHandler(object):
             member = RoomMember(room_id=room_id, user_id=user_id)
             yield self._stopped_typing(member)
 
-    @defer.inlineCallbacks
     def _stopped_typing(self, member):
         if member.user_id not in self._room_typing.get(member.room_id, set()):
             # No point
@@ -189,16 +188,15 @@ class TypingHandler(object):
         self._member_typing_until.pop(member, None)
         self._member_last_federation_poke.pop(member, None)
 
-        yield self._push_update(
+        self._push_update(
             member=member,
             typing=False,
         )
 
-    @defer.inlineCallbacks
     def _push_update(self, member, typing):
         if self.hs.is_mine_id(member.user_id):
             # Only send updates for changes to our own users.
-            yield self._push_remote(member, typing)
+            preserve_fn(self._push_remote)(member, typing)
 
         self._push_update_local(
             member=member,
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
new file mode 100644
index 0000000000..f4451e5dfb
--- /dev/null
+++ b/synapse/handlers/user_directory.py
@@ -0,0 +1,446 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.storage.roommember import ProfileInfo
+from synapse.util.metrics import Measure
+
+
+logger = logging.getLogger(__name__)
+
+
+class UserDirectoyHandler(object):
+    """Handles querying of and keeping updated the user_directory.
+
+    N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
+
+    The user directory is filled with users who this server can see are joined to a
+    world_readable or publically joinable room. We keep a database table up to date
+    by streaming changes of the current state and recalculating whether users should
+    be in the directory or not when necessary.
+
+    For each user in the directory we also store a room_id which is public and that the
+    user is joined to. This allows us to ignore history_visibility and join_rules changes
+    for that user in all other public rooms, as we know they'll still be in at least
+    one public room.
+    """
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.server_name = hs.hostname
+        self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
+
+        self.notifier.add_replication_callback(self.notify_new_event)
+
+        # When start up for the first time we need to populate the user_directory.
+        # This is a set of user_id's we've inserted already
+        self.initially_handled_users = set()
+        self.initially_handled_users_in_public = set()
+
+        # The current position in the current_state_delta stream
+        self.pos = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        # We kick this off so that we don't have to wait for a change before
+        # we start populating the user directory
+        self.clock.call_later(0, self.notify_new_event)
+
+    def search_users(self, search_term, limit):
+        """Searches for users in directory
+
+        Returns:
+            dict of the form::
+
+                {
+                    "limited": <bool>,  # whether there were more results or not
+                    "results": [  # Ordered by best match first
+                        {
+                            "user_id": <user_id>,
+                            "display_name": <display_name>,
+                            "avatar_url": <avatar_url>
+                        }
+                    ]
+                }
+        """
+        return self.store.search_user_dir(search_term, limit)
+
+    @defer.inlineCallbacks
+    def notify_new_event(self):
+        """Called when there may be more deltas to process
+        """
+        if self._is_processing:
+            return
+
+        self._is_processing = True
+        try:
+            yield self._unsafe_process()
+        finally:
+            self._is_processing = False
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # If self.pos is None then means we haven't fetched it from DB
+        if self.pos is None:
+            self.pos = yield self.store.get_user_directory_stream_pos()
+
+        # If still None then we need to do the initial fill of directory
+        if self.pos is None:
+            yield self._do_initial_spam()
+            self.pos = yield self.store.get_user_directory_stream_pos()
+
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "user_dir_delta"):
+                deltas = yield self.store.get_current_state_deltas(self.pos)
+                if not deltas:
+                    return
+
+                yield self._handle_deltas(deltas)
+
+                self.pos = deltas[-1]["stream_id"]
+                yield self.store.update_user_directory_stream_pos(self.pos)
+
+    @defer.inlineCallbacks
+    def _do_initial_spam(self):
+        """Populates the user_directory from the current state of the DB, used
+        when synapse first starts with user_directory support
+        """
+        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
+
+        # Delete any existing entries just in case there are any
+        yield self.store.delete_all_from_user_dir()
+
+        # We process by going through each existing room at a time.
+        room_ids = yield self.store.get_all_rooms()
+
+        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
+        num_processed_rooms = 1
+
+        for room_id in room_ids:
+            logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
+            yield self._handle_intial_room(room_id)
+            num_processed_rooms += 1
+
+        logger.info("Processed all rooms.")
+
+        self.initially_handled_users = None
+
+        yield self.store.update_user_directory_stream_pos(new_pos)
+
+    @defer.inlineCallbacks
+    def _handle_intial_room(self, room_id):
+        """Called when we initially fill out user_directory one room at a time
+        """
+        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
+        if not is_in_room:
+            return
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
+
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        unhandled_users = set(users_with_profile) - self.initially_handled_users
+
+        yield self.store.add_profiles_to_user_dir(
+            room_id, {
+                user_id: users_with_profile[user_id] for user_id in unhandled_users
+            }
+        )
+
+        self.initially_handled_users |= unhandled_users
+
+        if is_public:
+            yield self.store.add_users_to_public_room(
+                room_id,
+                user_ids=unhandled_users - self.initially_handled_users_in_public
+            )
+            self.initially_handled_users_in_public != unhandled_users
+
+    @defer.inlineCallbacks
+    def _handle_deltas(self, deltas):
+        """Called with the state deltas to process
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            # For join rule and visibility changes we need to check if the room
+            # may have become public or not and add/remove the users in said room
+            if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
+                yield self._handle_room_publicity_change(
+                    room_id, prev_event_id, event_id, typ,
+                )
+            elif typ == EventTypes.Member:
+                change = yield self._get_key_change(
+                    prev_event_id, event_id,
+                    key_name="membership",
+                    public_value=Membership.JOIN,
+                )
+
+                if change is None:
+                    # Handle any profile changes
+                    yield self._handle_profile_change(
+                        state_key, room_id, prev_event_id, event_id,
+                    )
+                    continue
+
+                if not change:
+                    # Need to check if the server left the room entirely, if so
+                    # we might need to remove all the users in that room
+                    is_in_room = yield self.store.is_host_joined(
+                        room_id, self.server_name,
+                    )
+                    if not is_in_room:
+                        logger.debug("Server left room: %r", room_id)
+                        # Fetch all the users that we marked as being in user
+                        # directory due to being in the room and then check if
+                        # need to remove those users or not
+                        user_ids = yield self.store.get_users_in_dir_due_to_room(room_id)
+                        for user_id in user_ids:
+                            yield self._handle_remove_user(room_id, user_id)
+                        return
+                    else:
+                        logger.debug("Server is still in room: %r", room_id)
+
+                if change:  # The user joined
+                    event = yield self.store.get_event(event_id, allow_none=True)
+                    profile = ProfileInfo(
+                        avatar_url=event.content.get("avatar_url"),
+                        display_name=event.content.get("displayname"),
+                    )
+
+                    yield self._handle_new_user(room_id, state_key, profile)
+                else:  # The user left
+                    yield self._handle_remove_user(room_id, state_key)
+            else:
+                logger.debug("Ignoring irrelevant type: %r", typ)
+
+    @defer.inlineCallbacks
+    def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ):
+        """Handle a room having potentially changed from/to world_readable/publically
+        joinable.
+
+        Args:
+            room_id (str)
+            prev_event_id (str|None): The previous event before the state change
+            event_id (str|None): The new event after the state change
+            typ (str): Type of the event
+        """
+        logger.debug("Handling change for %s", typ)
+
+        if typ == EventTypes.RoomHistoryVisibility:
+            change = yield self._get_key_change(
+                prev_event_id, event_id,
+                key_name="history_visibility",
+                public_value="world_readable",
+            )
+        elif typ == EventTypes.JoinRules:
+            change = yield self._get_key_change(
+                prev_event_id, event_id,
+                key_name="join_rule",
+                public_value=JoinRules.PUBLIC,
+            )
+        else:
+            raise Exception("Invalid event type")
+        # If change is None, no change. True => become world_readable/public,
+        # False => was world_readable/public
+        if change is None:
+            logger.debug("No change")
+            return
+
+        # There's been a change to or from being world readable.
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        logger.debug("Change: %r, is_public: %r", change, is_public)
+
+        if change and not is_public:
+            # If we became world readable but room isn't currently public then
+            # we ignore the change
+            return
+        elif not change and is_public:
+            # If we stopped being world readable but are still public,
+            # ignore the change
+            return
+
+        if change:
+            users_with_profile = yield self.state.get_current_user_in_room(room_id)
+            for user_id, profile in users_with_profile.iteritems():
+                yield self._handle_new_user(room_id, user_id, profile)
+        else:
+            users = yield self.store.get_users_in_public_due_to_room(room_id)
+            for user_id in users:
+                yield self._handle_remove_user(room_id, user_id)
+
+    @defer.inlineCallbacks
+    def _handle_new_user(self, room_id, user_id, profile):
+        """Called when we might need to add user to directory
+
+        Args:
+            room_id (str): room_id that user joined or started being public that
+            user_id (str)
+        """
+        logger.debug("Adding user to dir, %r", user_id)
+
+        row = yield self.store.get_user_in_directory(user_id)
+        if not row:
+            yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        if not is_public:
+            return
+
+        row = yield self.store.get_user_in_public_room(user_id)
+        if not row:
+            yield self.store.add_users_to_public_room(room_id, [user_id])
+
+    @defer.inlineCallbacks
+    def _handle_remove_user(self, room_id, user_id):
+        """Called when we might need to remove user to directory
+
+        Args:
+            room_id (str): room_id that user left or stopped being public that
+            user_id (str)
+        """
+        logger.debug("Maybe removing user %r", user_id)
+
+        row = yield self.store.get_user_in_directory(user_id)
+        update_user_dir = row and row["room_id"] == room_id
+
+        row = yield self.store.get_user_in_public_room(user_id)
+        update_user_in_public = row and row["room_id"] == room_id
+
+        if not update_user_in_public and not update_user_dir:
+            return
+
+        # XXX: Make this faster?
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        for j_room_id in rooms:
+            if not update_user_in_public and not update_user_dir:
+                break
+
+            is_in_room = yield self.store.is_host_joined(
+                j_room_id, self.server_name,
+            )
+
+            if not is_in_room:
+                continue
+
+            if update_user_dir:
+                update_user_dir = False
+                yield self.store.update_user_in_user_dir(user_id, j_room_id)
+
+            if update_user_in_public:
+                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+                    j_room_id
+                )
+
+                if is_public:
+                    yield self.store.update_user_in_public_user_list(user_id, j_room_id)
+                    update_user_in_public = False
+
+        if update_user_dir:
+            yield self.store.remove_from_user_dir(user_id)
+        elif update_user_in_public:
+            yield self.store.remove_from_user_in_public_room(user_id)
+
+    @defer.inlineCallbacks
+    def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
+        """Check member event changes for any profile changes and update the
+        database if there are.
+        """
+        if not prev_event_id or not event_id:
+            return
+
+        prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+        event = yield self.store.get_event(event_id, allow_none=True)
+
+        if not prev_event or not event:
+            return
+
+        if event.membership != Membership.JOIN:
+            return
+
+        prev_name = prev_event.content.get("displayname")
+        new_name = event.content.get("displayname")
+
+        prev_avatar = prev_event.content.get("avatar_url")
+        new_avatar = event.content.get("avatar_url")
+
+        if prev_name != new_name or prev_avatar != new_avatar:
+            yield self.store.update_profile_in_user_dir(
+                user_id, new_name, new_avatar, room_id,
+            )
+
+    @defer.inlineCallbacks
+    def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
+        """Given two events check if the `key_name` field in content changed
+        from not matching `public_value` to doing so.
+
+        For example, check if `history_visibility` (`key_name`) changed from
+        `shared` to `world_readable` (`public_value`).
+
+        Returns:
+            None if the field in the events either both match `public_value`
+            or if neither do, i.e. there has been no change.
+            True if it didnt match `public_value` but now does
+            False if it did match `public_value` but now doesn't
+        """
+        prev_event = None
+        event = None
+        if prev_event_id:
+            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+
+        if event_id:
+            event = yield self.store.get_event(event_id, allow_none=True)
+
+        if not event and not prev_event:
+            logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
+            defer.returnValue(None)
+
+        prev_value = None
+        value = None
+
+        if prev_event:
+            prev_value = prev_event.content.get(key_name)
+
+        if event:
+            value = event.content.get(key_name)
+
+        logger.debug("prev_value: %r -> value: %r", prev_value, value)
+
+        if value == public_value and prev_value != public_value:
+            defer.returnValue(True)
+        elif value != public_value and prev_value == public_value:
+            defer.returnValue(False)
+        else:
+            defer.returnValue(None)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 48566187ab..385208b574 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -251,7 +251,8 @@ class Notifier(object):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         preserve_fn(self.appservice_handler.notify_interested_services)(
-            room_stream_id)
+            room_stream_id
+        )
 
         if self.federation_sender:
             preserve_fn(self.federation_sender.notify_new_events)(
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 3f75d3f921..fe09d50d55 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -15,7 +15,7 @@
 
 from twisted.internet import defer
 
-from .bulk_push_rule_evaluator import evaluator_for_event
+from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
 
 from synapse.util.metrics import Measure
 
@@ -24,11 +24,12 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class ActionGenerator:
+class ActionGenerator(object):
     def __init__(self, hs):
         self.hs = hs
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
+        self.bulk_evaluator = BulkPushRuleEvaluator(hs)
         # really we want to get all user ids and all profile tags too,
         # since we want the actions for each profile tag for every user and
         # also actions for a client with no profile tag for each user.
@@ -38,16 +39,11 @@ class ActionGenerator:
 
     @defer.inlineCallbacks
     def handle_push_actions_for_event(self, event, context):
-        with Measure(self.clock, "evaluator_for_event"):
-            bulk_evaluator = yield evaluator_for_event(
-                event, self.hs, self.store, context
-            )
-
         with Measure(self.clock, "action_for_event_by_user"):
-            actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+            actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
                 event, context
             )
 
         context.push_actions = [
-            (uid, actions) for uid, actions in actions_by_user.items()
+            (uid, actions) for uid, actions in actions_by_user.iteritems()
         ]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index f943ff640f..9a96e6fe8f 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -19,60 +19,83 @@ from twisted.internet import defer
 
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
-from synapse.api.constants import EventTypes
 from synapse.visibility import filter_events_for_clients_context
+from synapse.api.constants import EventTypes, Membership
+from synapse.util.caches.descriptors import cached
+from synapse.util.async import Linearizer
 
+from collections import namedtuple
 
-logger = logging.getLogger(__name__)
 
+logger = logging.getLogger(__name__)
 
-@defer.inlineCallbacks
-def evaluator_for_event(event, hs, store, context):
-    rules_by_user = yield store.bulk_get_push_rules_for_room(
-        event, context
-    )
-
-    # if this event is an invite event, we may need to run rules for the user
-    # who's been invited, otherwise they won't get told they've been invited
-    if event.type == 'm.room.member' and event.content['membership'] == 'invite':
-        invited_user = event.state_key
-        if invited_user and hs.is_mine_id(invited_user):
-            has_pusher = yield store.user_has_pusher(invited_user)
-            if has_pusher:
-                rules_by_user = dict(rules_by_user)
-                rules_by_user[invited_user] = yield store.get_push_rules_for_user(
-                    invited_user
-                )
 
-    defer.returnValue(BulkPushRuleEvaluator(
-        event.room_id, rules_by_user, store
-    ))
+rules_by_room = {}
 
 
-class BulkPushRuleEvaluator:
+class BulkPushRuleEvaluator(object):
+    """Calculates the outcome of push rules for an event for all users in the
+    room at once.
     """
-    Runs push rules for all users in a room.
-    This is faster than running PushRuleEvaluator for each user because it
-    fetches all the rules for all the users in one (batched) db query
-    rather than doing multiple queries per-user. It currently uses
-    the same logic to run the actual rules, but could be optimised further
-    (see https://matrix.org/jira/browse/SYN-562)
-    """
-    def __init__(self, room_id, rules_by_user, store):
-        self.room_id = room_id
-        self.rules_by_user = rules_by_user
-        self.store = store
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def _get_rules_for_event(self, event, context):
+        """This gets the rules for all users in the room at the time of the event,
+        as well as the push rules for the invitee if the event is an invite.
+
+        Returns:
+            dict of user_id -> push_rules
+        """
+        room_id = event.room_id
+        rules_for_room = self._get_rules_for_room(room_id)
+
+        rules_by_user = yield rules_for_room.get_rules(event, context)
+
+        # if this event is an invite event, we may need to run rules for the user
+        # who's been invited, otherwise they won't get told they've been invited
+        if event.type == 'm.room.member' and event.content['membership'] == 'invite':
+            invited = event.state_key
+            if invited and self.hs.is_mine_id(invited):
+                has_pusher = yield self.store.user_has_pusher(invited)
+                if has_pusher:
+                    rules_by_user = dict(rules_by_user)
+                    rules_by_user[invited] = yield self.store.get_push_rules_for_user(
+                        invited
+                    )
+
+        defer.returnValue(rules_by_user)
+
+    @cached()
+    def _get_rules_for_room(self, room_id):
+        """Get the current RulesForRoom object for the given room id
+
+        Returns:
+            RulesForRoom
+        """
+        # It's important that RulesForRoom gets added to self._get_rules_for_room.cache
+        # before any lookup methods get called on it as otherwise there may be
+        # a race if invalidate_all gets called (which assumes its in the cache)
+        return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
 
     @defer.inlineCallbacks
     def action_for_event_by_user(self, event, context):
+        """Given an event and context, evaluate the push rules and return
+        the results
+
+        Returns:
+            dict of user_id -> action
+        """
+        rules_by_user = yield self._get_rules_for_event(event, context)
         actions_by_user = {}
 
         # None of these users can be peeking since this list of users comes
         # from the set of users in the room, so we know for sure they're all
         # actually in the room.
-        user_tuples = [
-            (u, False) for u in self.rules_by_user.keys()
-        ]
+        user_tuples = [(u, False) for u in rules_by_user]
 
         filtered_by_user = yield filter_events_for_clients_context(
             self.store, user_tuples, [event], {event.event_id: context}
@@ -86,7 +109,7 @@ class BulkPushRuleEvaluator:
 
         condition_cache = {}
 
-        for uid, rules in self.rules_by_user.items():
+        for uid, rules in rules_by_user.iteritems():
             display_name = None
             profile_info = room_members.get(uid)
             if profile_info:
@@ -138,3 +161,240 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache):
             return False
 
     return True
+
+
+class RulesForRoom(object):
+    """Caches push rules for users in a room.
+
+    This efficiently handles users joining/leaving the room by not invalidating
+    the entire cache for the room.
+    """
+
+    def __init__(self, hs, room_id, rules_for_room_cache):
+        """
+        Args:
+            hs (HomeServer)
+            room_id (str)
+            rules_for_room_cache(Cache): The cache object that caches these
+                RoomsForUser objects.
+        """
+        self.room_id = room_id
+        self.is_mine_id = hs.is_mine_id
+        self.store = hs.get_datastore()
+
+        self.linearizer = Linearizer(name="rules_for_room")
+
+        self.member_map = {}  # event_id -> (user_id, state)
+        self.rules_by_user = {}  # user_id -> rules
+
+        # The last state group we updated the caches for. If the state_group of
+        # a new event comes along, we know that we can just return the cached
+        # result.
+        # On invalidation of the rules themselves (if the user changes them),
+        # we invalidate everything and set state_group to `object()`
+        self.state_group = object()
+
+        # A sequence number to keep track of when we're allowed to update the
+        # cache. We bump the sequence number when we invalidate the cache. If
+        # the sequence number changes while we're calculating stuff we should
+        # not update the cache with it.
+        self.sequence = 0
+
+        # A cache of user_ids that we *know* aren't interesting, e.g. user_ids
+        # owned by AS's, or remote users, etc. (I.e. users we will never need to
+        # calculate push for)
+        # These never need to be invalidated as we will never set up push for
+        # them.
+        self.uninteresting_user_set = set()
+
+        # We need to be clever on the invalidating caches callbacks, as
+        # otherwise the invalidation callback holds a reference to the object,
+        # potentially causing it to leak.
+        # To get around this we pass a function that on invalidations looks ups
+        # the RoomsForUser entry in the cache, rather than keeping a reference
+        # to self around in the callback.
+        self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)
+
+    @defer.inlineCallbacks
+    def get_rules(self, event, context):
+        """Given an event context return the rules for all users who are
+        currently in the room.
+        """
+        state_group = context.state_group
+
+        with (yield self.linearizer.queue(())):
+            if state_group and self.state_group == state_group:
+                logger.debug("Using cached rules for %r", self.room_id)
+                defer.returnValue(self.rules_by_user)
+
+            ret_rules_by_user = {}
+            missing_member_event_ids = {}
+            if state_group and self.state_group == context.prev_group:
+                # If we have a simple delta then we can reuse most of the previous
+                # results.
+                ret_rules_by_user = self.rules_by_user
+                current_state_ids = context.delta_ids
+            else:
+                current_state_ids = context.current_state_ids
+
+            logger.debug(
+                "Looking for member changes in %r %r", state_group, current_state_ids
+            )
+
+            # Loop through to see which member events we've seen and have rules
+            # for and which we need to fetch
+            for key in current_state_ids:
+                typ, user_id = key
+                if typ != EventTypes.Member:
+                    continue
+
+                if user_id in self.uninteresting_user_set:
+                    continue
+
+                if not self.is_mine_id(user_id):
+                    self.uninteresting_user_set.add(user_id)
+                    continue
+
+                if self.store.get_if_app_services_interested_in_user(user_id):
+                    self.uninteresting_user_set.add(user_id)
+                    continue
+
+                event_id = current_state_ids[key]
+
+                res = self.member_map.get(event_id, None)
+                if res:
+                    user_id, state = res
+                    if state == Membership.JOIN:
+                        rules = self.rules_by_user.get(user_id, None)
+                        if rules:
+                            ret_rules_by_user[user_id] = rules
+                    continue
+
+                # If a user has left a room we remove their push rule. If they
+                # joined then we readd it later in _update_rules_with_member_event_ids
+                ret_rules_by_user.pop(user_id, None)
+                missing_member_event_ids[user_id] = event_id
+
+            if missing_member_event_ids:
+                # If we have some memebr events we haven't seen, look them up
+                # and fetch push rules for them if appropriate.
+                logger.debug("Found new member events %r", missing_member_event_ids)
+                yield self._update_rules_with_member_event_ids(
+                    ret_rules_by_user, missing_member_event_ids, state_group, event
+                )
+
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(
+                "Returning push rules for %r %r",
+                self.room_id, ret_rules_by_user.keys(),
+            )
+        defer.returnValue(ret_rules_by_user)
+
+    @defer.inlineCallbacks
+    def _update_rules_with_member_event_ids(self, ret_rules_by_user, member_event_ids,
+                                            state_group, event):
+        """Update the partially filled rules_by_user dict by fetching rules for
+        any newly joined users in the `member_event_ids` list.
+
+        Args:
+            ret_rules_by_user (dict): Partiallly filled dict of push rules. Gets
+                updated with any new rules.
+            member_event_ids (list): List of event ids for membership events that
+                have happened since the last time we filled rules_by_user
+            state_group: The state group we are currently computing push rules
+                for. Used when updating the cache.
+        """
+        sequence = self.sequence
+
+        rows = yield self.store._simple_select_many_batch(
+            table="room_memberships",
+            column="event_id",
+            iterable=member_event_ids.values(),
+            retcols=('user_id', 'membership', 'event_id'),
+            keyvalues={},
+            batch_size=500,
+            desc="_get_rules_for_member_event_ids",
+        )
+
+        members = {
+            row["event_id"]: (row["user_id"], row["membership"])
+            for row in rows
+        }
+
+        # If the event is a join event then it will be in current state evnts
+        # map but not in the DB, so we have to explicitly insert it.
+        if event.type == EventTypes.Member:
+            for event_id in member_event_ids.itervalues():
+                if event_id == event.event_id:
+                    members[event_id] = (event.state_key, event.membership)
+
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug("Found members %r: %r", self.room_id, members.values())
+
+        interested_in_user_ids = set(
+            user_id for user_id, membership in members.itervalues()
+            if membership == Membership.JOIN
+        )
+
+        logger.debug("Joined: %r", interested_in_user_ids)
+
+        if_users_with_pushers = yield self.store.get_if_users_have_pushers(
+            interested_in_user_ids,
+            on_invalidate=self.invalidate_all_cb,
+        )
+
+        user_ids = set(
+            uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher
+        )
+
+        logger.debug("With pushers: %r", user_ids)
+
+        users_with_receipts = yield self.store.get_users_with_read_receipts_in_room(
+            self.room_id, on_invalidate=self.invalidate_all_cb,
+        )
+
+        logger.debug("With receipts: %r", users_with_receipts)
+
+        # any users with pushers must be ours: they have pushers
+        for uid in users_with_receipts:
+            if uid in interested_in_user_ids:
+                user_ids.add(uid)
+
+        rules_by_user = yield self.store.bulk_get_push_rules(
+            user_ids, on_invalidate=self.invalidate_all_cb,
+        )
+
+        ret_rules_by_user.update(
+            item for item in rules_by_user.iteritems() if item[0] is not None
+        )
+
+        self.update_cache(sequence, members, ret_rules_by_user, state_group)
+
+    def invalidate_all(self):
+        # Note: Don't hand this function directly to an invalidation callback
+        # as it keeps a reference to self and will stop this instance from being
+        # GC'd if it gets dropped from the rules_to_user cache. Instead use
+        # `self.invalidate_all_cb`
+        logger.debug("Invalidating RulesForRoom for %r", self.room_id)
+        self.sequence += 1
+        self.state_group = object()
+        self.member_map = {}
+        self.rules_by_user = {}
+
+    def update_cache(self, sequence, members, rules_by_user, state_group):
+        if sequence == self.sequence:
+            self.member_map.update(members)
+            self.rules_by_user = rules_by_user
+            self.state_group = state_group
+
+
+class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
+    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
+    # which namedtuple does for us (i.e. two _CacheContext are the same if
+    # their caches and keys match). This is important in particular to
+    # dedupe when we add callbacks to lru cache nodes, otherwise the number
+    # of callbacks would grow.
+    def __call__(self):
+        rules = self.cache.get(self.room_id, None, update_metrics=False)
+        if rules:
+            rules.invalidate_all()
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index c7afd11111..a69dda7b09 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -21,7 +21,6 @@ import logging
 from synapse.util.metrics import Measure
 from synapse.util.logcontext import LoggingContext
 
-from mailer import Mailer
 
 logger = logging.getLogger(__name__)
 
@@ -56,8 +55,10 @@ class EmailPusher(object):
     This shares quite a bit of code with httpusher: it would be good to
     factor out the common parts
     """
-    def __init__(self, hs, pusherdict):
+    def __init__(self, hs, pusherdict, mailer):
         self.hs = hs
+        self.mailer = mailer
+
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
         self.pusher_id = pusherdict['id']
@@ -73,16 +74,6 @@ class EmailPusher(object):
 
         self.processing = False
 
-        if self.hs.config.email_enable_notifs:
-            if 'data' in pusherdict and 'brand' in pusherdict['data']:
-                app_name = pusherdict['data']['brand']
-            else:
-                app_name = self.hs.config.email_app_name
-
-            self.mailer = Mailer(self.hs, app_name)
-        else:
-            self.mailer = None
-
     @defer.inlineCallbacks
     def on_started(self):
         if self.mailer is not None:
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index f83aa7625c..b5cd9b426a 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -78,23 +78,17 @@ ALLOWED_ATTRS = {
 
 
 class Mailer(object):
-    def __init__(self, hs, app_name):
+    def __init__(self, hs, app_name, notif_template_html, notif_template_text):
         self.hs = hs
+        self.notif_template_html = notif_template_html
+        self.notif_template_text = notif_template_text
+
         self.store = self.hs.get_datastore()
         self.macaroon_gen = self.hs.get_macaroon_generator()
         self.state_handler = self.hs.get_state_handler()
-        loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
         self.app_name = app_name
+
         logger.info("Created Mailer for app_name %s" % app_name)
-        env = jinja2.Environment(loader=loader)
-        env.filters["format_ts"] = format_ts_filter
-        env.filters["mxc_to_http"] = self.mxc_to_http_filter
-        self.notif_template_html = env.get_template(
-            self.hs.config.email_notif_template_html
-        )
-        self.notif_template_text = env.get_template(
-            self.hs.config.email_notif_template_text
-        )
 
     @defer.inlineCallbacks
     def send_notification_mail(self, app_id, user_id, email_address,
@@ -481,28 +475,6 @@ class Mailer(object):
             urllib.urlencode(params),
         )
 
-    def mxc_to_http_filter(self, value, width, height, resize_method="crop"):
-        if value[0:6] != "mxc://":
-            return ""
-
-        serverAndMediaId = value[6:]
-        fragment = None
-        if '#' in serverAndMediaId:
-            (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1)
-            fragment = "#" + fragment
-
-        params = {
-            "width": width,
-            "height": height,
-            "method": resize_method,
-        }
-        return "%s_matrix/media/v1/thumbnail/%s?%s%s" % (
-            self.hs.config.public_baseurl,
-            serverAndMediaId,
-            urllib.urlencode(params),
-            fragment or "",
-        )
-
 
 def safe_markup(raw_html):
     return jinja2.Markup(bleach.linkify(bleach.clean(
@@ -543,3 +515,52 @@ def string_ordinal_total(s):
 
 def format_ts_filter(value, format):
     return time.strftime(format, time.localtime(value / 1000))
+
+
+def load_jinja2_templates(config):
+    """Load the jinja2 email templates from disk
+
+    Returns:
+        (notif_template_html, notif_template_text)
+    """
+    logger.info("loading jinja2")
+
+    loader = jinja2.FileSystemLoader(config.email_template_dir)
+    env = jinja2.Environment(loader=loader)
+    env.filters["format_ts"] = format_ts_filter
+    env.filters["mxc_to_http"] = _create_mxc_to_http_filter(config)
+
+    notif_template_html = env.get_template(
+        config.email_notif_template_html
+    )
+    notif_template_text = env.get_template(
+        config.email_notif_template_text
+    )
+
+    return notif_template_html, notif_template_text
+
+
+def _create_mxc_to_http_filter(config):
+    def mxc_to_http_filter(value, width, height, resize_method="crop"):
+        if value[0:6] != "mxc://":
+            return ""
+
+        serverAndMediaId = value[6:]
+        fragment = None
+        if '#' in serverAndMediaId:
+            (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1)
+            fragment = "#" + fragment
+
+        params = {
+            "width": width,
+            "height": height,
+            "method": resize_method,
+        }
+        return "%s_matrix/media/v1/thumbnail/%s?%s%s" % (
+            config.public_baseurl,
+            serverAndMediaId,
+            urllib.urlencode(params),
+            fragment or "",
+        )
+
+    return mxc_to_http_filter
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index de9c33b936..491f27bded 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -26,22 +26,54 @@ logger = logging.getLogger(__name__)
 # process works fine)
 try:
     from synapse.push.emailpusher import EmailPusher
+    from synapse.push.mailer import Mailer, load_jinja2_templates
 except:
     pass
 
 
-def create_pusher(hs, pusherdict):
-    logger.info("trying to create_pusher for %r", pusherdict)
+class PusherFactory(object):
+    def __init__(self, hs):
+        self.hs = hs
 
-    PUSHER_TYPES = {
-        "http": HttpPusher,
-    }
+        self.pusher_types = {
+            "http": HttpPusher,
+        }
 
-    logger.info("email enable notifs: %r", hs.config.email_enable_notifs)
-    if hs.config.email_enable_notifs:
-        PUSHER_TYPES["email"] = EmailPusher
-        logger.info("defined email pusher type")
+        logger.info("email enable notifs: %r", hs.config.email_enable_notifs)
+        if hs.config.email_enable_notifs:
+            self.mailers = {}  # app_name -> Mailer
 
-    if pusherdict['kind'] in PUSHER_TYPES:
-        logger.info("found pusher")
-        return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict)
+            templates = load_jinja2_templates(hs.config)
+            self.notif_template_html, self.notif_template_text = templates
+
+            self.pusher_types["email"] = self._create_email_pusher
+
+            logger.info("defined email pusher type")
+
+    def create_pusher(self, pusherdict):
+        logger.info("trying to create_pusher for %r", pusherdict)
+
+        if pusherdict['kind'] in self.pusher_types:
+            logger.info("found pusher")
+            return self.pusher_types[pusherdict['kind']](self.hs, pusherdict)
+
+    def _create_email_pusher(self, _hs, pusherdict):
+        app_name = self._app_name_from_pusherdict(pusherdict)
+        mailer = self.mailers.get(app_name)
+        if not mailer:
+            mailer = Mailer(
+                hs=self.hs,
+                app_name=app_name,
+                notif_template_html=self.notif_template_html,
+                notif_template_text=self.notif_template_text,
+            )
+            self.mailers[app_name] = mailer
+        return EmailPusher(self.hs, pusherdict, mailer)
+
+    def _app_name_from_pusherdict(self, pusherdict):
+        if 'data' in pusherdict and 'brand' in pusherdict['data']:
+            app_name = pusherdict['data']['brand']
+        else:
+            app_name = self.hs.config.email_app_name
+
+        return app_name
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 3837be523d..43cb6e9c01 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -16,7 +16,7 @@
 
 from twisted.internet import defer
 
-import pusher
+from .pusher import PusherFactory
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 from synapse.util.async import run_on_reactor
 
@@ -28,6 +28,7 @@ logger = logging.getLogger(__name__)
 class PusherPool:
     def __init__(self, _hs):
         self.hs = _hs
+        self.pusher_factory = PusherFactory(_hs)
         self.start_pushers = _hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
@@ -48,7 +49,7 @@ class PusherPool:
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
         # code path adding pushers.
-        pusher.create_pusher(self.hs, {
+        self.pusher_factory.create_pusher({
             "id": None,
             "user_name": user_id,
             "kind": kind,
@@ -186,7 +187,7 @@ class PusherPool:
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
             try:
-                p = pusher.create_pusher(self.hs, pusherdict)
+                p = self.pusher_factory.create_pusher(pusherdict)
             except:
                 logger.exception("Couldn't start a pusher: caught Exception")
                 continue
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 4d4a435471..7687867aee 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -16,6 +16,7 @@
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage import DataStore
+from synapse.storage.end_to_end_keys import EndToEndKeyStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 
@@ -45,6 +46,7 @@ class SlavedDeviceStore(BaseSlavedStore):
     _mark_as_sent_devices_by_remote_txn = (
         DataStore._mark_as_sent_devices_by_remote_txn.__func__
     )
+    count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"]
 
     def stream_positions(self):
         result = super(SlavedDeviceStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index fcaf58b93b..94ebbffc1b 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -108,6 +108,8 @@ class SlavedEventStore(BaseSlavedStore):
     get_current_state_ids = (
         StateStore.__dict__["get_current_state_ids"]
     )
+    get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
+    _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
     has_room_changed_since = DataStore.has_room_changed_since.__func__
 
     get_unread_push_actions_for_user_in_range_for_http = (
@@ -151,8 +153,7 @@ class SlavedEventStore(BaseSlavedStore):
     get_room_events_stream_for_rooms = (
         DataStore.get_room_events_stream_for_rooms.__func__
     )
-    is_host_joined = DataStore.is_host_joined.__func__
-    _is_host_joined = RoomMemberStore.__dict__["_is_host_joined"]
+    is_host_joined = RoomMemberStore.__dict__["is_host_joined"]
     get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
 
     _set_before_and_after = staticmethod(DataStore._set_before_and_after)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 8b2c4c3043..69c46911ec 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -67,6 +67,7 @@ class ReplicationStreamer(object):
         self.store = hs.get_datastore()
         self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
 
         # Current connections.
         self.connections = []
@@ -99,7 +100,7 @@ class ReplicationStreamer(object):
         if not hs.config.send_federation:
             self.federation_sender = hs.get_federation_sender()
 
-        hs.get_notifier().add_replication_callback(self.on_notifier_poke)
+        self.notifier.add_replication_callback(self.on_notifier_poke)
 
         # Keeps track of whether we are currently checking for updates
         self.is_looping = False
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index aa8d874f96..3d809d181b 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -51,6 +51,7 @@ from synapse.rest.client.v2_alpha import (
     devices,
     thirdparty,
     sendtodevice,
+    user_directory,
 )
 
 from synapse.http.server import JsonResource
@@ -100,3 +101,4 @@ class ClientRestResource(JsonResource):
         devices.register_servlets(hs, client_resource)
         thirdparty.register_servlets(hs, client_resource)
         sendtodevice.register_servlets(hs, client_resource)
+        user_directory.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 771e127ab9..83e209d18f 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -192,6 +192,7 @@ class SyncRestServlet(RestServlet):
                 "invite": invited,
                 "leave": archived,
             },
+            "device_one_time_keys_count": sync_result.device_one_time_keys_count,
             "next_batch": sync_result.next_batch.to_string(),
         }
 
diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py
new file mode 100644
index 0000000000..17d3dffc8f
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/user_directory.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from ._base import client_v2_patterns
+
+logger = logging.getLogger(__name__)
+
+
+class UserDirectorySearchRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/user_directory/search$")
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        super(UserDirectorySearchRestServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.user_directory_handler = hs.get_user_directory_handler()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        """Searches for users in directory
+
+        Returns:
+            dict of the form::
+
+                {
+                    "limited": <bool>,  # whether there were more results or not
+                    "results": [  # Ordered by best match first
+                        {
+                            "user_id": <user_id>,
+                            "display_name": <display_name>,
+                            "avatar_url": <avatar_url>
+                        }
+                    ]
+                }
+        """
+        yield self.auth.get_user_by_req(request, allow_guest=False)
+        body = parse_json_object_from_request(request)
+
+        limit = body.get("limit", 10)
+        limit = min(limit, 50)
+
+        try:
+            search_term = body["search_term"]
+        except:
+            raise SynapseError(400, "`search_term` is required field")
+
+        results = yield self.user_directory_handler.search_users(search_term, limit)
+
+        defer.returnValue((200, results))
+
+
+def register_servlets(hs, http_server):
+    UserDirectorySearchRestServlet(hs).register(http_server)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index caca96c222..bae2b4c757 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -184,6 +184,7 @@ class MediaRepository(object):
                     raise
                 except NotRetryingDestination:
                     logger.warn("Not retrying destination %r", server_name)
+                    raise SynapseError(502, "Failed to fetch remote media")
                 except Exception:
                     logger.exception("Failed to fetch remote media %s/%s",
                                      server_name, media_id)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 99760d622f..c680fddab5 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -434,6 +434,8 @@ def _calc_og(tree, media_uri):
                 for el in _iterate_over_text(tree.find("body"), *TAGS_TO_REMOVE)
             )
             og['og:description'] = summarize_paragraphs(text_nodes)
+    else:
+        og['og:description'] = summarize_paragraphs([og['og:description']])
 
     # TODO: delete the url downloads to stop diskfilling,
     # as we only ever cared about its OG
diff --git a/synapse/server.py b/synapse/server.py
index 12754c89ae..a38e5179e0 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -49,9 +49,11 @@ from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
 from synapse.handlers.receipts import ReceiptsHandler
 from synapse.handlers.read_marker import ReadMarkerHandler
+from synapse.handlers.user_directory import UserDirectoyHandler
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
+from synapse.push.action_generator import ActionGenerator
 from synapse.push.pusherpool import PusherPool
 from synapse.rest.media.v1.media_repository import MediaRepository
 from synapse.state import StateHandler
@@ -135,6 +137,8 @@ class HomeServer(object):
         'macaroon_generator',
         'tcp_replication',
         'read_marker_handler',
+        'action_generator',
+        'user_directory_handler',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -299,6 +303,12 @@ class HomeServer(object):
     def build_tcp_replication(self):
         raise NotImplementedError()
 
+    def build_action_generator(self):
+        return ActionGenerator(self)
+
+    def build_user_directory_handler(self):
+        return UserDirectoyHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/synapse/state.py b/synapse/state.py
index 02fee47f39..5b386e3183 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -170,9 +170,7 @@ class StateHandler(object):
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         logger.debug("calling resolve_state_groups from get_current_user_in_room")
         entry = yield self.resolve_state_groups(room_id, latest_event_ids)
-        joined_users = yield self.store.get_joined_users_from_state(
-            room_id, entry.state_id, entry.state
-        )
+        joined_users = yield self.store.get_joined_users_from_state(room_id, entry)
         defer.returnValue(joined_users)
 
     @defer.inlineCallbacks
@@ -181,9 +179,7 @@ class StateHandler(object):
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
         entry = yield self.resolve_state_groups(room_id, latest_event_ids)
-        joined_hosts = yield self.store.get_joined_hosts(
-            room_id, entry.state_id, entry.state
-        )
+        joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
         defer.returnValue(joined_hosts)
 
     @defer.inlineCallbacks
@@ -195,12 +191,12 @@ class StateHandler(object):
         Returns:
             synapse.events.snapshot.EventContext:
         """
-        context = EventContext()
 
         if event.internal_metadata.is_outlier():
             # If this is an outlier, then we know it shouldn't have any current
             # state. Certainly store.get_current_state won't return any, and
             # persisting the event won't store the state group.
+            context = EventContext()
             if old_state:
                 context.prev_state_ids = {
                     (s.type, s.state_key): s.event_id for s in old_state
@@ -219,6 +215,7 @@ class StateHandler(object):
             defer.returnValue(context)
 
         if old_state:
+            context = EventContext()
             context.prev_state_ids = {
                 (s.type, s.state_key): s.event_id for s in old_state
             }
@@ -239,19 +236,13 @@ class StateHandler(object):
             defer.returnValue(context)
 
         logger.debug("calling resolve_state_groups from compute_event_context")
-        if event.is_state():
-            entry = yield self.resolve_state_groups(
-                event.room_id, [e for e, _ in event.prev_events],
-                event_type=event.type,
-                state_key=event.state_key,
-            )
-        else:
-            entry = yield self.resolve_state_groups(
-                event.room_id, [e for e, _ in event.prev_events],
-            )
+        entry = yield self.resolve_state_groups(
+            event.room_id, [e for e, _ in event.prev_events],
+        )
 
         curr_state = entry.state
 
+        context = EventContext()
         context.prev_state_ids = curr_state
         if event.is_state():
             context.state_group = self.store.get_next_state_group()
@@ -264,10 +255,14 @@ class StateHandler(object):
             context.current_state_ids = dict(context.prev_state_ids)
             context.current_state_ids[key] = event.event_id
 
-            context.prev_group = entry.prev_group
-            context.delta_ids = entry.delta_ids
-            if context.delta_ids is not None:
-                context.delta_ids = dict(context.delta_ids)
+            if entry.state_group:
+                context.prev_group = entry.state_group
+                context.delta_ids = {
+                    key: event.event_id
+                }
+            elif entry.prev_group:
+                context.prev_group = entry.prev_group
+                context.delta_ids = dict(entry.delta_ids)
                 context.delta_ids[key] = event.event_id
         else:
             if entry.state_group is None:
@@ -284,7 +279,7 @@ class StateHandler(object):
 
     @defer.inlineCallbacks
     @log_function
-    def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""):
+    def resolve_state_groups(self, room_id, event_ids):
         """ Given a list of event_ids this method fetches the state at each
         event, resolves conflicts between them and returns them.
 
@@ -309,11 +304,13 @@ class StateHandler(object):
         if len(group_names) == 1:
             name, state_list = state_groups_ids.items().pop()
 
+            prev_group, delta_ids = yield self.store.get_state_group_delta(name)
+
             defer.returnValue(_StateCacheEntry(
                 state=state_list,
                 state_group=name,
-                prev_group=name,
-                delta_ids={},
+                prev_group=prev_group,
+                delta_ids=delta_ids,
             ))
 
         with (yield self.resolve_linearizer.queue(group_names)):
@@ -357,20 +354,18 @@ class StateHandler(object):
                 if new_state_event_ids == frozenset(e_id for e_id in events):
                     state_group = sg
                     break
-            if state_group is None:
-                # Worker instances don't have access to this method, but we want
-                # to set the state_group on the main instance to increase cache
-                # hits.
-                if hasattr(self.store, "get_next_state_group"):
-                    state_group = self.store.get_next_state_group()
+
+            # TODO: We want to create a state group for this set of events, to
+            # increase cache hits, but we need to make sure that it doesn't
+            # end up as a prev_group without being added to the database
 
             prev_group = None
             delta_ids = None
-            for old_group, old_ids in state_groups_ids.items():
-                if not set(new_state.iterkeys()) - set(old_ids.iterkeys()):
+            for old_group, old_ids in state_groups_ids.iteritems():
+                if not set(new_state) - set(old_ids):
                     n_delta_ids = {
                         k: v
-                        for k, v in new_state.items()
+                        for k, v in new_state.iteritems()
                         if old_ids.get(k) != v
                     }
                     if not delta_ids or len(n_delta_ids) < len(delta_ids):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 2970df138b..f119c5a758 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -49,6 +49,7 @@ from .tags import TagsStore
 from .account_data import AccountDataStore
 from .openid import OpenIdStore
 from .client_ips import ClientIpStore
+from .user_directory import UserDirectoryStore
 
 from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
 from .engines import PostgresEngine
@@ -86,6 +87,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 ClientIpStore,
                 DeviceStore,
                 DeviceInboxStore,
+                UserDirectoryStore,
                 ):
 
     def __init__(self, db_conn, hs):
@@ -221,11 +223,24 @@ class DataStore(RoomMemberStore, RoomStore,
             "DeviceListFederationStreamChangeCache", device_list_max,
         )
 
+        curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
+            db_conn, "current_state_delta_stream",
+            entity_column="room_id",
+            stream_column="stream_id",
+            max_value=events_max,  # As we share the stream id with events token
+            limit=1000,
+        )
+        self._curr_state_delta_stream_cache = StreamChangeCache(
+            "_curr_state_delta_stream_cache", min_curr_state_delta_id,
+            prefilled_cache=curr_state_delta_prefill,
+        )
+
         cur = LoggingTransaction(
             db_conn.cursor(),
             name="_find_stream_orderings_for_times_txn",
             database_engine=self.database_engine,
-            after_callbacks=[]
+            after_callbacks=[],
+            final_callbacks=[],
         )
         self._find_stream_orderings_for_times_txn(cur)
         cur.close()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 58b73af7d2..51730a88bf 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -52,13 +52,17 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name", "database_engine", "after_callbacks"]
+    __slots__ = [
+        "txn", "name", "database_engine", "after_callbacks", "final_callbacks",
+    ]
 
-    def __init__(self, txn, name, database_engine, after_callbacks):
+    def __init__(self, txn, name, database_engine, after_callbacks,
+                 final_callbacks):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
         object.__setattr__(self, "database_engine", database_engine)
         object.__setattr__(self, "after_callbacks", after_callbacks)
+        object.__setattr__(self, "final_callbacks", final_callbacks)
 
     def call_after(self, callback, *args, **kwargs):
         """Call the given callback on the main twisted thread after the
@@ -67,6 +71,9 @@ class LoggingTransaction(object):
         """
         self.after_callbacks.append((callback, args, kwargs))
 
+    def call_finally(self, callback, *args, **kwargs):
+        self.final_callbacks.append((callback, args, kwargs))
+
     def __getattr__(self, name):
         return getattr(self.txn, name)
 
@@ -217,8 +224,8 @@ class SQLBaseStore(object):
 
         self._clock.looping_call(loop, 10000)
 
-    def _new_transaction(self, conn, desc, after_callbacks, logging_context,
-                         func, *args, **kwargs):
+    def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
+                         logging_context, func, *args, **kwargs):
         start = time.time() * 1000
         txn_id = self._TXN_ID
 
@@ -237,7 +244,8 @@ class SQLBaseStore(object):
                 try:
                     txn = conn.cursor()
                     txn = LoggingTransaction(
-                        txn, name, self.database_engine, after_callbacks
+                        txn, name, self.database_engine, after_callbacks,
+                        final_callbacks,
                     )
                     r = func(txn, *args, **kwargs)
                     conn.commit()
@@ -298,6 +306,7 @@ class SQLBaseStore(object):
         start_time = time.time() * 1000
 
         after_callbacks = []
+        final_callbacks = []
 
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
@@ -309,7 +318,7 @@ class SQLBaseStore(object):
 
                 current_context.copy_to(context)
                 return self._new_transaction(
-                    conn, desc, after_callbacks, current_context,
+                    conn, desc, after_callbacks, final_callbacks, current_context,
                     func, *args, **kwargs
                 )
 
@@ -318,9 +327,13 @@ class SQLBaseStore(object):
                 result = yield self._db_pool.runWithConnection(
                     inner_func, *args, **kwargs
                 )
-        finally:
+
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
+        finally:
+            for after_callback, after_args, after_kwargs in final_callbacks:
+                after_callback(*after_args, **after_kwargs)
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -425,6 +438,11 @@ class SQLBaseStore(object):
 
         txn.execute(sql, vals)
 
+    def _simple_insert_many(self, table, values, desc):
+        return self.runInteraction(
+            desc, self._simple_insert_many_txn, table, values
+        )
+
     @staticmethod
     def _simple_insert_many_txn(txn, table, values):
         if not values:
@@ -936,7 +954,7 @@ class SQLBaseStore(object):
             # __exit__ called after the transaction finishes.
             ctx = self._cache_id_gen.get_next()
             stream_id = ctx.__enter__()
-            txn.call_after(ctx.__exit__, None, None, None)
+            txn.call_finally(ctx.__exit__, None, None, None)
             txn.call_after(self.hs.get_notifier().on_new_replication_data)
 
             self._simple_insert_txn(
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 514570561f..532df736a5 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import re
 import simplejson as json
 from twisted.internet import defer
 
@@ -36,16 +37,31 @@ class ApplicationServiceStore(SQLBaseStore):
             hs.config.app_service_config_files
         )
 
+        # We precompie a regex constructed from all the regexes that the AS's
+        # have registered for exclusive users.
+        exclusive_user_regexes = [
+            regex.pattern
+            for service in self.services_cache
+            for regex in service.get_exlusive_user_regexes()
+        ]
+        if exclusive_user_regexes:
+            exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
+            self.exclusive_user_regex = re.compile(exclusive_user_regex)
+        else:
+            # We handle this case specially otherwise the constructed regex
+            # will always match
+            self.exclusive_user_regex = None
+
     def get_app_services(self):
         return self.services_cache
 
     def get_if_app_services_interested_in_user(self, user_id):
-        """Check if the user is one associated with an app service
+        """Check if the user is one associated with an app service (exclusively)
         """
-        for service in self.services_cache:
-            if service.is_interested_in_user(user_id):
-                return True
-        return False
+        if self.exclusive_user_regex:
+            return bool(self.exclusive_user_regex.match(user_id))
+        else:
+            return False
 
     def get_app_service_by_user_id(self, user_id):
         """Retrieve an application service from their user ID.
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 747d2df622..014ab635b7 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -20,6 +20,8 @@ from twisted.internet import defer
 from ._base import Cache
 from . import background_updates
 
+import os
+
 logger = logging.getLogger(__name__)
 
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
@@ -28,12 +30,15 @@ logger = logging.getLogger(__name__)
 LAST_SEEN_GRANULARITY = 120 * 1000
 
 
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
 class ClientIpStore(background_updates.BackgroundUpdateStore):
     def __init__(self, hs):
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
-            max_entries=5000,
+            max_entries=50000 * CACHE_SIZE_FACTOR,
         )
 
         super(ClientIpStore, self).__init__(hs)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d9936c88bb..bb27fd1f70 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -368,7 +368,7 @@ class DeviceStore(SQLBaseStore):
 
         prev_sent_id_sql = """
             SELECT coalesce(max(stream_id), 0) as stream_id
-            FROM device_lists_outbound_pokes
+            FROM device_lists_outbound_last_success
             WHERE destination = ? AND user_id = ? AND stream_id <= ?
         """
 
@@ -510,32 +510,43 @@ class DeviceStore(SQLBaseStore):
         )
 
     def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
-        # First we DELETE all rows such that only the latest row for each
-        # (destination, user_id is left. We do this by selecting first and
-        # deleting.
+        # We update the device_lists_outbound_last_success with the successfully
+        # poked users. We do the join to see which users need to be inserted and
+        # which updated.
         sql = """
-            SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
-            WHERE destination = ? AND stream_id <= ?
+            SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL)
+            FROM device_lists_outbound_pokes as o
+            LEFT JOIN device_lists_outbound_last_success as s
+                USING (destination, user_id)
+            WHERE destination = ? AND o.stream_id <= ?
             GROUP BY user_id
-            HAVING count(*) > 1
         """
         txn.execute(sql, (destination, stream_id,))
         rows = txn.fetchall()
 
         sql = """
-            DELETE FROM device_lists_outbound_pokes
-            WHERE destination = ? AND user_id = ? AND stream_id < ?
+            UPDATE device_lists_outbound_last_success
+            SET stream_id = ?
+            WHERE destination = ? AND user_id = ?
         """
         txn.executemany(
-            sql, ((destination, row[0], row[1],) for row in rows)
+            sql, ((row[1], destination, row[0],) for row in rows if row[2])
         )
 
-        # Mark everything that is left as sent
         sql = """
-            UPDATE device_lists_outbound_pokes SET sent = ?
+            INSERT INTO device_lists_outbound_last_success
+            (destination, user_id, stream_id) VALUES (?, ?, ?)
+        """
+        txn.executemany(
+            sql, ((destination, row[0], row[1],) for row in rows if not row[2])
+        )
+
+        # Delete all sent outbound pokes
+        sql = """
+            DELETE FROM device_lists_outbound_pokes
             WHERE destination = ? AND stream_id <= ?
         """
-        txn.execute(sql, (True, destination, stream_id,))
+        txn.execute(sql, (destination, stream_id,))
 
     @defer.inlineCallbacks
     def get_user_whose_devices_changed(self, from_key):
@@ -670,6 +681,14 @@ class DeviceStore(SQLBaseStore):
                 )
             )
 
+            # Since we've deleted unsent deltas, we need to remove the entry
+            # of last successful sent so that the prev_ids are correctly set.
+            sql = """
+                DELETE FROM device_lists_outbound_last_success
+                WHERE destination = ? AND user_id = ?
+            """
+            txn.executemany(sql, ((row[0], row[1]) for row in rows))
+
             logger.info("Pruned %d device list outbound pokes", txn.rowcount)
 
         return self.runInteraction(
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index e00f31da2b..2cebb203c6 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -185,8 +185,8 @@ class EndToEndKeyStore(SQLBaseStore):
                     for algorithm, key_id, json_bytes in new_keys
                 ],
             )
-            txn.call_after(
-                self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
+            self._invalidate_cache_and_stream(
+                txn, self.count_e2e_one_time_keys, (user_id, device_id,)
             )
         yield self.runInteraction(
             "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
@@ -237,24 +237,29 @@ class EndToEndKeyStore(SQLBaseStore):
             )
             for user_id, device_id, algorithm, key_id in delete:
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
-                txn.call_after(
-                    self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
+                self._invalidate_cache_and_stream(
+                    txn, self.count_e2e_one_time_keys, (user_id, device_id,)
                 )
             return result
         return self.runInteraction(
             "claim_e2e_one_time_keys", _claim_e2e_one_time_keys
         )
 
-    @defer.inlineCallbacks
     def delete_e2e_keys_by_device(self, user_id, device_id):
-        yield self._simple_delete(
-            table="e2e_device_keys_json",
-            keyvalues={"user_id": user_id, "device_id": device_id},
-            desc="delete_e2e_device_keys_by_device"
-        )
-        yield self._simple_delete(
-            table="e2e_one_time_keys_json",
-            keyvalues={"user_id": user_id, "device_id": device_id},
-            desc="delete_e2e_one_time_keys_by_device"
+        def delete_e2e_keys_by_device_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="e2e_device_keys_json",
+                keyvalues={"user_id": user_id, "device_id": device_id},
+            )
+            self._simple_delete_txn(
+                txn,
+                table="e2e_one_time_keys_json",
+                keyvalues={"user_id": user_id, "device_id": device_id},
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.count_e2e_one_time_keys, (user_id, device_id,)
+            )
+        return self.runInteraction(
+            "delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
         )
-        self.count_e2e_one_time_keys.invalidate((user_id, device_id,))
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 519059c306..e8133de2fa 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -37,25 +37,55 @@ class EventFederationStore(SQLBaseStore):
     and backfilling from another server respectively.
     """
 
+    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
+
     def __init__(self, hs):
         super(EventFederationStore, self).__init__(hs)
 
+        self.register_background_update_handler(
+            self.EVENT_AUTH_STATE_ONLY,
+            self._background_delete_non_state_event_auth,
+        )
+
         hs.get_clock().looping_call(
             self._delete_old_forward_extrem_cache, 60 * 60 * 1000
         )
 
-    def get_auth_chain(self, event_ids):
-        return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
+    def get_auth_chain(self, event_ids, include_given=False):
+        """Get auth events for given event_ids. The events *must* be state events.
+
+        Args:
+            event_ids (list): state events
+            include_given (bool): include the given events in result
+
+        Returns:
+            list of events
+        """
+        return self.get_auth_chain_ids(
+            event_ids, include_given=include_given,
+        ).addCallback(self._get_events)
+
+    def get_auth_chain_ids(self, event_ids, include_given=False):
+        """Get auth events for given event_ids. The events *must* be state events.
+
+        Args:
+            event_ids (list): state events
+            include_given (bool): include the given events in result
 
-    def get_auth_chain_ids(self, event_ids):
+        Returns:
+            list of event_ids
+        """
         return self.runInteraction(
             "get_auth_chain_ids",
             self._get_auth_chain_ids_txn,
-            event_ids
+            event_ids, include_given
         )
 
-    def _get_auth_chain_ids_txn(self, txn, event_ids):
-        results = set()
+    def _get_auth_chain_ids_txn(self, txn, event_ids, include_given):
+        if include_given:
+            results = set(event_ids)
+        else:
+            results = set()
 
         base_sql = (
             "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
@@ -504,3 +534,52 @@ class EventFederationStore(SQLBaseStore):
 
         txn.execute(query, (room_id,))
         txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
+
+    @defer.inlineCallbacks
+    def _background_delete_non_state_event_auth(self, progress, batch_size):
+        def delete_event_auth(txn):
+            target_min_stream_id = progress.get("target_min_stream_id_inclusive")
+            max_stream_id = progress.get("max_stream_id_exclusive")
+
+            if not target_min_stream_id or not max_stream_id:
+                txn.execute("SELECT COALESCE(MIN(stream_ordering), 0) FROM events")
+                rows = txn.fetchall()
+                target_min_stream_id = rows[0][0]
+
+                txn.execute("SELECT COALESCE(MAX(stream_ordering), 0) FROM events")
+                rows = txn.fetchall()
+                max_stream_id = rows[0][0]
+
+            min_stream_id = max_stream_id - batch_size
+
+            sql = """
+                DELETE FROM event_auth
+                WHERE event_id IN (
+                    SELECT event_id FROM events
+                    LEFT JOIN state_events USING (room_id, event_id)
+                    WHERE ? <= stream_ordering AND stream_ordering < ?
+                        AND state_key IS null
+                )
+            """
+
+            txn.execute(sql, (min_stream_id, max_stream_id,))
+
+            new_progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_AUTH_STATE_ONLY, new_progress
+            )
+
+            return min_stream_id >= target_min_stream_id
+
+        result = yield self.runInteraction(
+            self.EVENT_AUTH_STATE_ONLY, delete_event_auth
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_AUTH_STATE_ONLY)
+
+        defer.returnValue(batch_size)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f29d71589d..f60ed889d5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -647,9 +647,10 @@ class EventsStore(SQLBaseStore):
                 list of the event ids which are the forward extremities.
 
         """
-        self._update_current_state_txn(txn, current_state_for_room)
-
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
+
+        self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+
         self._update_forward_extremities_txn(
             txn,
             new_forward_extremities=new_forward_extremeties,
@@ -712,7 +713,7 @@ class EventsStore(SQLBaseStore):
             backfilled=backfilled,
         )
 
-    def _update_current_state_txn(self, txn, state_delta_by_room):
+    def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in state_delta_by_room.iteritems():
                 to_delete, to_insert, _ = current_state_tuple
                 txn.executemany(
@@ -734,6 +735,29 @@ class EventsStore(SQLBaseStore):
                     ],
                 )
 
+                state_deltas = {key: None for key in to_delete}
+                state_deltas.update(to_insert)
+
+                self._simple_insert_many_txn(
+                    txn,
+                    table="current_state_delta_stream",
+                    values=[
+                        {
+                            "stream_id": max_stream_order,
+                            "room_id": room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": ev_id,
+                            "prev_event_id": to_delete.get(key, None),
+                        }
+                        for key, ev_id in state_deltas.iteritems()
+                    ]
+                )
+
+                self._curr_state_delta_stream_cache.entity_has_changed(
+                    room_id, max_stream_order,
+                )
+
                 # Invalidate the various caches
 
                 # Figure out the changes of membership to invalidate the
@@ -742,11 +766,7 @@ class EventsStore(SQLBaseStore):
                 # and which we have added, then we invlidate the caches for all
                 # those users.
                 members_changed = set(
-                    state_key for ev_type, state_key in to_delete.iterkeys()
-                    if ev_type == EventTypes.Member
-                )
-                members_changed.update(
-                    state_key for ev_type, state_key in to_insert.iterkeys()
+                    state_key for ev_type, state_key in state_deltas
                     if ev_type == EventTypes.Member
                 )
 
@@ -755,6 +775,11 @@ class EventsStore(SQLBaseStore):
                         txn, self.get_rooms_for_user, (member,)
                     )
 
+                for host in set(get_domain_from_id(u) for u in members_changed):
+                    self._invalidate_cache_and_stream(
+                        txn, self.is_host_joined, (room_id, host)
+                    )
+
                 self._invalidate_cache_and_stream(
                     txn, self.get_users_in_room, (room_id,)
                 )
@@ -1119,6 +1144,7 @@ class EventsStore(SQLBaseStore):
                 }
                 for event, _ in events_and_contexts
                 for auth_id, _ in event.auth_events
+                if event.is_state()
             ],
         )
 
@@ -1418,7 +1444,7 @@ class EventsStore(SQLBaseStore):
                 ]
 
                 rows = self._new_transaction(
-                    conn, "do_fetch", [], None, self._fetch_event_rows, event_ids
+                    conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
                 )
 
                 row_dict = {
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6e623843d5..eaba699e29 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 41
+SCHEMA_VERSION = 42
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 0a819d32c5..8758b1c0c7 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -49,7 +49,7 @@ def _load_rules(rawrules, enabled_map):
 
 
 class PushRuleStore(SQLBaseStore):
-    @cachedInlineCallbacks()
+    @cachedInlineCallbacks(max_entries=5000)
     def get_push_rules_for_user(self, user_id):
         rows = yield self._simple_select_list(
             table="push_rules",
@@ -73,7 +73,7 @@ class PushRuleStore(SQLBaseStore):
 
         defer.returnValue(rules)
 
-    @cachedInlineCallbacks()
+    @cachedInlineCallbacks(max_entries=5000)
     def get_push_rules_enabled_for_user(self, user_id):
         results = yield self._simple_select_list(
             table="push_rules_enable",
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index efb90c3c91..f42b8014c7 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -45,7 +45,9 @@ class ReceiptsStore(SQLBaseStore):
             return
 
         # Returns an ObservableDeferred
-        res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
+        res = self.get_users_with_read_receipts_in_room.cache.get(
+            room_id, None, update_metrics=False,
+        )
 
         if res:
             if isinstance(res, defer.Deferred) and res.called:
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0829ae5bee..457ca288d0 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from collections import namedtuple
 
 from ._base import SQLBaseStore
+from synapse.util.async import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.stringutils import to_ascii
@@ -392,7 +393,8 @@ class RoomMemberStore(SQLBaseStore):
             context=context,
         )
 
-    def get_joined_users_from_state(self, room_id, state_group, state_ids):
+    def get_joined_users_from_state(self, room_id, state_entry):
+        state_group = state_entry.state_group
         if not state_group:
             # If state_group is None it means it has yet to be assigned a
             # state group, i.e. we need to make sure that calls with a state_group
@@ -401,7 +403,7 @@ class RoomMemberStore(SQLBaseStore):
             state_group = object()
 
         return self._get_joined_users_from_context(
-            room_id, state_group, state_ids,
+            room_id, state_group, state_entry.state, context=state_entry,
         )
 
     @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True,
@@ -499,42 +501,40 @@ class RoomMemberStore(SQLBaseStore):
 
         defer.returnValue(users_in_room)
 
-    def is_host_joined(self, room_id, host, state_group, state_ids):
-        if not state_group:
-            # If state_group is None it means it has yet to be assigned a
-            # state group, i.e. we need to make sure that calls with a state_group
-            # of None don't hit previous cached calls with a None state_group.
-            # To do this we set the state_group to a new object as object() != object()
-            state_group = object()
+    @cachedInlineCallbacks(max_entries=10000)
+    def is_host_joined(self, room_id, host):
+        if '%' in host or '_' in host:
+            raise Exception("Invalid host name")
 
-        return self._is_host_joined(
-            room_id, host, state_group, state_ids
-        )
+        sql = """
+            SELECT state_key FROM current_state_events AS c
+            INNER JOIN room_memberships USING (event_id)
+            WHERE membership = 'join'
+                AND type = 'm.room.member'
+                AND c.room_id = ?
+                AND state_key LIKE ?
+            LIMIT 1
+        """
 
-    @cachedInlineCallbacks(num_args=3)
-    def _is_host_joined(self, room_id, host, state_group, current_state_ids):
-        # We don't use `state_group`, its there so that we can cache based
-        # on it. However, its important that its never None, since two current_state's
-        # with a state_group of None are likely to be different.
-        # See bulk_get_push_rules_for_room for how we work around this.
-        assert state_group is not None
+        # We do need to be careful to ensure that host doesn't have any wild cards
+        # in it, but we checked above for known ones and we'll check below that
+        # the returned user actually has the correct domain.
+        like_clause = "%:" + host
 
-        for (etype, state_key), event_id in current_state_ids.items():
-            if etype == EventTypes.Member:
-                try:
-                    if get_domain_from_id(state_key) != host:
-                        continue
-                except:
-                    logger.warn("state_key not user_id: %s", state_key)
-                    continue
+        rows = yield self._execute("is_host_joined", None, sql, room_id, like_clause)
 
-                event = yield self.get_event(event_id, allow_none=True)
-                if event and event.content["membership"] == Membership.JOIN:
-                    defer.returnValue(True)
+        if not rows:
+            defer.returnValue(False)
 
-        defer.returnValue(False)
+        user_id = rows[0][0]
+        if get_domain_from_id(user_id) != host:
+            # This can only happen if the host name has something funky in it
+            raise Exception("Invalid host name")
 
-    def get_joined_hosts(self, room_id, state_group, state_ids):
+        defer.returnValue(True)
+
+    def get_joined_hosts(self, room_id, state_entry):
+        state_group = state_entry.state_group
         if not state_group:
             # If state_group is None it means it has yet to be assigned a
             # state group, i.e. we need to make sure that calls with a state_group
@@ -543,33 +543,20 @@ class RoomMemberStore(SQLBaseStore):
             state_group = object()
 
         return self._get_joined_hosts(
-            room_id, state_group, state_ids
+            room_id, state_group, state_entry.state, state_entry=state_entry,
         )
 
     @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
-    def _get_joined_hosts(self, room_id, state_group, current_state_ids):
+    # @defer.inlineCallbacks
+    def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
         # We don't use `state_group`, its there so that we can cache based
         # on it. However, its important that its never None, since two current_state's
         # with a state_group of None are likely to be different.
         # See bulk_get_push_rules_for_room for how we work around this.
         assert state_group is not None
 
-        joined_hosts = set()
-        for etype, state_key in current_state_ids:
-            if etype == EventTypes.Member:
-                try:
-                    host = get_domain_from_id(state_key)
-                except:
-                    logger.warn("state_key not user_id: %s", state_key)
-                    continue
-
-                if host in joined_hosts:
-                    continue
-
-                event_id = current_state_ids[(etype, state_key)]
-                event = yield self.get_event(event_id, allow_none=True)
-                if event and event.content["membership"] == Membership.JOIN:
-                    joined_hosts.add(intern_string(host))
+        cache = self._get_joined_hosts_cache(room_id)
+        joined_hosts = yield cache.get_destinations(state_entry)
 
         defer.returnValue(joined_hosts)
 
@@ -647,3 +634,75 @@ class RoomMemberStore(SQLBaseStore):
             yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
 
         defer.returnValue(result)
+
+    @cached(max_entries=10000, iterable=True)
+    def _get_joined_hosts_cache(self, room_id):
+        return _JoinedHostsCache(self, room_id)
+
+
+class _JoinedHostsCache(object):
+    """Cache for joined hosts in a room that is optimised to handle updates
+    via state deltas.
+    """
+
+    def __init__(self, store, room_id):
+        self.store = store
+        self.room_id = room_id
+
+        self.hosts_to_joined_users = {}
+
+        self.state_group = object()
+
+        self.linearizer = Linearizer("_JoinedHostsCache")
+
+        self._len = 0
+
+    @defer.inlineCallbacks
+    def get_destinations(self, state_entry):
+        """Get set of destinations for a state entry
+
+        Args:
+            state_entry(synapse.state._StateCacheEntry)
+        """
+        if state_entry.state_group == self.state_group:
+            defer.returnValue(frozenset(self.hosts_to_joined_users))
+
+        with (yield self.linearizer.queue(())):
+            if state_entry.state_group == self.state_group:
+                pass
+            elif state_entry.prev_group == self.state_group:
+                for (typ, state_key), event_id in state_entry.delta_ids.iteritems():
+                    if typ != EventTypes.Member:
+                        continue
+
+                    host = intern_string(get_domain_from_id(state_key))
+                    user_id = state_key
+                    known_joins = self.hosts_to_joined_users.setdefault(host, set())
+
+                    event = yield self.store.get_event(event_id)
+                    if event.membership == Membership.JOIN:
+                        known_joins.add(user_id)
+                    else:
+                        known_joins.discard(user_id)
+
+                        if not known_joins:
+                            self.hosts_to_joined_users.pop(host, None)
+            else:
+                joined_users = yield self.store.get_joined_users_from_state(
+                    self.room_id, state_entry,
+                )
+
+                self.hosts_to_joined_users = {}
+                for user_id in joined_users:
+                    host = intern_string(get_domain_from_id(user_id))
+                    self.hosts_to_joined_users.setdefault(host, set()).add(user_id)
+
+            if state_entry.state_group:
+                self.state_group = state_entry.state_group
+            else:
+                self.state_group = object()
+            self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues())
+        defer.returnValue(frozenset(self.hosts_to_joined_users))
+
+    def __len__(self):
+        return self._len
diff --git a/synapse/storage/schema/delta/42/current_state_delta.sql b/synapse/storage/schema/delta/42/current_state_delta.sql
new file mode 100644
index 0000000000..d28851aff8
--- /dev/null
+++ b/synapse/storage/schema/delta/42/current_state_delta.sql
@@ -0,0 +1,26 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE current_state_delta_stream (
+    stream_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    event_id TEXT,  -- Is null if the key was removed
+    prev_event_id TEXT  -- Is null if the key was added
+);
+
+CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id);
diff --git a/synapse/storage/schema/delta/42/device_list_last_id.sql b/synapse/storage/schema/delta/42/device_list_last_id.sql
new file mode 100644
index 0000000000..9ab8c14fa3
--- /dev/null
+++ b/synapse/storage/schema/delta/42/device_list_last_id.sql
@@ -0,0 +1,33 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Table of last stream_id that we sent to destination for user_id. This is
+-- used to fill out the `prev_id` fields of outbound device list updates.
+CREATE TABLE device_lists_outbound_last_success (
+    destination TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    stream_id BIGINT NOT NULL
+);
+
+INSERT INTO device_lists_outbound_last_success
+    SELECT destination, user_id, coalesce(max(stream_id), 0) as stream_id
+        FROM device_lists_outbound_pokes
+        WHERE sent = (1 = 1)  -- sqlite doesn't have inbuilt boolean values
+        GROUP BY destination, user_id;
+
+CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success(
+    destination, user_id, stream_id
+);
diff --git a/synapse/storage/schema/delta/42/event_auth_state_only.sql b/synapse/storage/schema/delta/42/event_auth_state_only.sql
new file mode 100644
index 0000000000..b8821ac759
--- /dev/null
+++ b/synapse/storage/schema/delta/42/event_auth_state_only.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('event_auth_state_only', '{}');
diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py
new file mode 100644
index 0000000000..ea6a18196d
--- /dev/null
+++ b/synapse/storage/schema/delta/42/user_dir.py
@@ -0,0 +1,84 @@
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+logger = logging.getLogger(__name__)
+
+
+BOTH_TABLES = """
+CREATE TABLE user_directory_stream_pos (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id BIGINT,
+    CHECK (Lock='X')
+);
+
+INSERT INTO user_directory_stream_pos (stream_id) VALUES (null);
+
+CREATE TABLE user_directory (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,  -- A room_id that we know the user is joined to
+    display_name TEXT,
+    avatar_url TEXT
+);
+
+CREATE INDEX user_directory_room_idx ON user_directory(room_id);
+CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id);
+
+CREATE TABLE users_in_pubic_room (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL  -- A room_id that we know is public
+);
+
+CREATE INDEX users_in_pubic_room_room_idx ON users_in_pubic_room(room_id);
+CREATE UNIQUE INDEX users_in_pubic_room_user_idx ON users_in_pubic_room(user_id);
+"""
+
+
+POSTGRES_TABLE = """
+CREATE TABLE user_directory_search (
+    user_id TEXT NOT NULL,
+    vector tsvector
+);
+
+CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector);
+CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search(user_id);
+"""
+
+
+SQLITE_TABLE = """
+CREATE VIRTUAL TABLE user_directory_search
+    USING fts4 ( user_id, value );
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    for statement in get_statements(BOTH_TABLES.splitlines()):
+        cur.execute(statement)
+
+    if isinstance(database_engine, PostgresEngine):
+        for statement in get_statements(POSTGRES_TABLE.splitlines()):
+            cur.execute(statement)
+    elif isinstance(database_engine, Sqlite3Engine):
+        for statement in get_statements(SQLITE_TABLE.splitlines()):
+            cur.execute(statement)
+    else:
+        raise Exception("Unrecognized database engine")
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 85acf2ad1e..d1e679719b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -20,6 +20,7 @@ from synapse.util.stringutils import to_ascii
 from synapse.storage.engines import PostgresEngine
 
 from twisted.internet import defer
+from collections import namedtuple
 
 import logging
 
@@ -29,6 +30,16 @@ logger = logging.getLogger(__name__)
 MAX_STATE_DELTA_HOPS = 100
 
 
+class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delta_ids"))):
+    """Return type of get_state_group_delta that implements __len__, which lets
+    us use the itrable flag when caching
+    """
+    __slots__ = []
+
+    def __len__(self):
+        return len(self.delta_ids) if self.delta_ids else 0
+
+
 class StateStore(SQLBaseStore):
     """ Keeps track of the state at a given event.
 
@@ -98,6 +109,46 @@ class StateStore(SQLBaseStore):
             _get_current_state_ids_txn,
         )
 
+    @cached(max_entries=10000, iterable=True)
+    def get_state_group_delta(self, state_group):
+        """Given a state group try to return a previous group and a delta between
+        the old and the new.
+
+        Returns:
+            (prev_group, delta_ids), where both may be None.
+        """
+        def _get_state_group_delta_txn(txn):
+            prev_group = self._simple_select_one_onecol_txn(
+                txn,
+                table="state_group_edges",
+                keyvalues={
+                    "state_group": state_group,
+                },
+                retcol="prev_state_group",
+                allow_none=True,
+            )
+
+            if not prev_group:
+                return _GetStateGroupDelta(None, None)
+
+            delta_ids = self._simple_select_list_txn(
+                txn,
+                table="state_groups_state",
+                keyvalues={
+                    "state_group": state_group,
+                },
+                retcols=("type", "state_key", "event_id",)
+            )
+
+            return _GetStateGroupDelta(prev_group, {
+                (row["type"], row["state_key"]): row["event_id"]
+                for row in delta_ids
+            })
+        return self.runInteraction(
+            "get_state_group_delta",
+            _get_state_group_delta_txn,
+        )
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
@@ -184,6 +235,19 @@ class StateStore(SQLBaseStore):
             # We persist as a delta if we can, while also ensuring the chain
             # of deltas isn't tooo long, as otherwise read performance degrades.
             if context.prev_group:
+                is_in_db = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="state_groups",
+                    keyvalues={"id": context.prev_group},
+                    retcol="id",
+                    allow_none=True,
+                )
+                if not is_in_db:
+                    raise Exception(
+                        "Trying to persist state with unpersisted prev_group: %r"
+                        % (context.prev_group,)
+                    )
+
                 potential_hops = self._count_state_group_hops_txn(
                     txn, context.prev_group
                 )
@@ -563,20 +627,22 @@ class StateStore(SQLBaseStore):
                 where a `state_key` of `None` matches all state_keys for the
                 `type`.
         """
-        is_all, state_dict_ids = self._state_group_cache.get(group)
+        is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
 
         type_to_key = {}
         missing_types = set()
+
         for typ, state_key in types:
+            key = (typ, state_key)
             if state_key is None:
                 type_to_key[typ] = None
-                missing_types.add((typ, state_key))
+                missing_types.add(key)
             else:
                 if type_to_key.get(typ, object()) is not None:
                     type_to_key.setdefault(typ, set()).add(state_key)
 
-                if (typ, state_key) not in state_dict_ids:
-                    missing_types.add((typ, state_key))
+                if key not in state_dict_ids and key not in known_absent:
+                    missing_types.add(key)
 
         sentinel = object()
 
@@ -590,7 +656,7 @@ class StateStore(SQLBaseStore):
                 return True
             return False
 
-        got_all = not (missing_types or types is None)
+        got_all = is_all or not missing_types
 
         return {
             k: v for k, v in state_dict_ids.iteritems()
@@ -607,7 +673,7 @@ class StateStore(SQLBaseStore):
         Args:
             group: The state group to lookup
         """
-        is_all, state_dict_ids = self._state_group_cache.get(group)
+        is_all, _, state_dict_ids = self._state_group_cache.get(group)
 
         return state_dict_ids, is_all
 
@@ -624,7 +690,7 @@ class StateStore(SQLBaseStore):
         missing_groups = []
         if types is not None:
             for group in set(groups):
-                state_dict_ids, missing_types, got_all = self._get_some_state_from_cache(
+                state_dict_ids, _, got_all = self._get_some_state_from_cache(
                     group, types
                 )
                 results[group] = state_dict_ids
@@ -653,19 +719,7 @@ class StateStore(SQLBaseStore):
             # Now we want to update the cache with all the things we fetched
             # from the database.
             for group, group_state_dict in group_to_state_dict.iteritems():
-                if types:
-                    # We delibrately put key -> None mappings into the cache to
-                    # cache absence of the key, on the assumption that if we've
-                    # explicitly asked for some types then we will probably ask
-                    # for them again.
-                    state_dict = {
-                        (intern_string(etype), intern_string(state_key)): None
-                        for (etype, state_key) in types
-                    }
-                    state_dict.update(results[group])
-                    results[group] = state_dict
-                else:
-                    state_dict = results[group]
+                state_dict = results[group]
 
                 state_dict.update(
                     ((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
@@ -677,17 +731,9 @@ class StateStore(SQLBaseStore):
                     key=group,
                     value=state_dict,
                     full=(types is None),
+                    known_absent=types,
                 )
 
-        # Remove all the entries with None values. The None values were just
-        # used for bookkeeping in the cache.
-        for group, state_dict in results.iteritems():
-            results[group] = {
-                key: event_id
-                for key, event_id in state_dict.iteritems()
-                if event_id
-            }
-
         defer.returnValue(results)
 
     def get_next_state_group(self):
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
new file mode 100644
index 0000000000..137aca2881
--- /dev/null
+++ b/synapse/storage/user_directory.py
@@ -0,0 +1,506 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.api.constants import EventTypes, JoinRules
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.types import get_domain_from_id, get_localpart_from_id
+
+import re
+
+
+class UserDirectoryStore(SQLBaseStore):
+
+    @cachedInlineCallbacks(cache_context=True)
+    def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context):
+        """Check if the room is either world_readable or publically joinable
+        """
+        current_state_ids = yield self.get_current_state_ids(
+            room_id, on_invalidate=cache_context.invalidate
+        )
+
+        join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
+        if join_rules_id:
+            join_rule_ev = yield self.get_event(join_rules_id, allow_none=True)
+            if join_rule_ev:
+                if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
+                    defer.returnValue(True)
+
+        hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
+        if hist_vis_id:
+            hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True)
+            if hist_vis_ev:
+                if hist_vis_ev.content.get("history_visibility") == "world_readable":
+                    defer.returnValue(True)
+
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def add_users_to_public_room(self, room_id, user_ids):
+        """Add user to the list of users in public rooms
+
+        Args:
+            room_id (str): A room_id that all users are in that is world_readable
+                or publically joinable
+            user_ids (list(str)): Users to add
+        """
+        yield self._simple_insert_many(
+            table="users_in_pubic_room",
+            values=[
+                {
+                    "user_id": user_id,
+                    "room_id": room_id,
+                }
+                for user_id in user_ids
+            ],
+            desc="add_users_to_public_room"
+        )
+        for user_id in user_ids:
+            self.get_user_in_public_room.invalidate((user_id,))
+
+    def add_profiles_to_user_dir(self, room_id, users_with_profile):
+        """Add profiles to the user directory
+
+        Args:
+            room_id (str): A room_id that all users are joined to
+            users_with_profile (dict): Users to add to directory in the form of
+                mapping of user_id -> ProfileInfo
+        """
+        if isinstance(self.database_engine, PostgresEngine):
+            # We weight the loclpart most highly, then display name and finally
+            # server name
+            sql = """
+                INSERT INTO user_directory_search(user_id, vector)
+                VALUES (?,
+                    setweight(to_tsvector('english', ?), 'A')
+                    || setweight(to_tsvector('english', ?), 'D')
+                    || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                )
+            """
+            args = (
+                (
+                    user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
+                    profile.display_name,
+                )
+                for user_id, profile in users_with_profile.iteritems()
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = """
+                INSERT INTO user_directory_search(user_id, value)
+                VALUES (?,?)
+            """
+            args = (
+                (
+                    user_id,
+                    "%s %s" % (user_id, p.display_name,) if p.display_name else user_id
+                )
+                for user_id, p in users_with_profile.iteritems()
+            )
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        def _add_profiles_to_user_dir_txn(txn):
+            txn.executemany(sql, args)
+            self._simple_insert_many_txn(
+                txn,
+                table="user_directory",
+                values=[
+                    {
+                        "user_id": user_id,
+                        "room_id": room_id,
+                        "display_name": profile.display_name,
+                        "avatar_url": profile.avatar_url,
+                    }
+                    for user_id, profile in users_with_profile.iteritems()
+                ]
+            )
+            for user_id in users_with_profile:
+                txn.call_after(
+                    self.get_user_in_directory.invalidate, (user_id,)
+                )
+
+        return self.runInteraction(
+            "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
+        )
+
+    @defer.inlineCallbacks
+    def update_user_in_user_dir(self, user_id, room_id):
+        yield self._simple_update_one(
+            table="user_directory",
+            keyvalues={"user_id": user_id},
+            updatevalues={"room_id": room_id},
+            desc="update_user_in_user_dir",
+        )
+        self.get_user_in_directory.invalidate((user_id,))
+
+    def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id):
+        def _update_profile_in_user_dir_txn(txn):
+            new_entry = self._simple_upsert_txn(
+                txn,
+                table="user_directory",
+                keyvalues={"user_id": user_id},
+                insertion_values={"room_id": room_id},
+                values={"display_name": display_name, "avatar_url": avatar_url},
+                lock=False,  # We're only inserter
+            )
+
+            if isinstance(self.database_engine, PostgresEngine):
+                # We weight the loclpart most highly, then display name and finally
+                # server name
+                if new_entry:
+                    sql = """
+                        INSERT INTO user_directory_search(user_id, vector)
+                        VALUES (?,
+                            setweight(to_tsvector('english', ?), 'A')
+                            || setweight(to_tsvector('english', ?), 'D')
+                            || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                        )
+                    """
+                    txn.execute(
+                        sql,
+                        (
+                            user_id, get_localpart_from_id(user_id),
+                            get_domain_from_id(user_id), display_name,
+                        )
+                    )
+                else:
+                    sql = """
+                        UPDATE user_directory_search
+                        SET vector = setweight(to_tsvector('english', ?), 'A')
+                            || setweight(to_tsvector('english', ?), 'D')
+                            || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                        WHERE user_id = ?
+                    """
+                    txn.execute(
+                        sql,
+                        (
+                            get_localpart_from_id(user_id), get_domain_from_id(user_id),
+                            display_name, user_id,
+                        )
+                    )
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                value = "%s %s" % (user_id, display_name,) if display_name else user_id
+                self._simple_upsert_txn(
+                    txn,
+                    table="user_directory_search",
+                    keyvalues={"user_id": user_id},
+                    values={"value": value},
+                    lock=False,  # We're only inserter
+                )
+            else:
+                # This should be unreachable.
+                raise Exception("Unrecognized database engine")
+
+            txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
+
+        return self.runInteraction(
+            "update_profile_in_user_dir", _update_profile_in_user_dir_txn
+        )
+
+    @defer.inlineCallbacks
+    def update_user_in_public_user_list(self, user_id, room_id):
+        yield self._simple_update_one(
+            table="users_in_pubic_room",
+            keyvalues={"user_id": user_id},
+            updatevalues={"room_id": room_id},
+            desc="update_user_in_public_user_list",
+        )
+        self.get_user_in_public_room.invalidate((user_id,))
+
+    def remove_from_user_dir(self, user_id):
+        def _remove_from_user_dir_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="user_directory",
+                keyvalues={"user_id": user_id},
+            )
+            self._simple_delete_txn(
+                txn,
+                table="user_directory_search",
+                keyvalues={"user_id": user_id},
+            )
+            self._simple_delete_txn(
+                txn,
+                table="users_in_pubic_room",
+                keyvalues={"user_id": user_id},
+            )
+            txn.call_after(
+                self.get_user_in_directory.invalidate, (user_id,)
+            )
+            txn.call_after(
+                self.get_user_in_public_room.invalidate, (user_id,)
+            )
+        return self.runInteraction(
+            "remove_from_user_dir", _remove_from_user_dir_txn,
+        )
+
+    @defer.inlineCallbacks
+    def remove_from_user_in_public_room(self, user_id):
+        yield self._simple_delete(
+            table="users_in_pubic_room",
+            keyvalues={"user_id": user_id},
+            desc="remove_from_user_in_public_room",
+        )
+        self.get_user_in_public_room.invalidate((user_id,))
+
+    def get_users_in_public_due_to_room(self, room_id):
+        """Get all user_ids that are in the room directory becuase they're
+        in the given room_id
+        """
+        return self._simple_select_onecol(
+            table="users_in_pubic_room",
+            keyvalues={"room_id": room_id},
+            retcol="user_id",
+            desc="get_users_in_public_due_to_room",
+        )
+
+    def get_users_in_dir_due_to_room(self, room_id):
+        """Get all user_ids that are in the room directory becuase they're
+        in the given room_id
+        """
+        return self._simple_select_onecol(
+            table="user_directory",
+            keyvalues={"room_id": room_id},
+            retcol="user_id",
+            desc="get_users_in_dir_due_to_room",
+        )
+
+    def get_all_rooms(self):
+        """Get all room_ids we've ever known about
+        """
+        return self._simple_select_onecol(
+            table="current_state_events",
+            keyvalues={},
+            retcol="DISTINCT room_id",
+            desc="get_all_rooms",
+        )
+
+    def delete_all_from_user_dir(self):
+        """Delete the entire user directory
+        """
+        def _delete_all_from_user_dir_txn(txn):
+            txn.execute("DELETE FROM user_directory")
+            txn.execute("DELETE FROM user_directory_search")
+            txn.execute("DELETE FROM users_in_pubic_room")
+            txn.call_after(self.get_user_in_directory.invalidate_all)
+            txn.call_after(self.get_user_in_public_room.invalidate_all)
+        return self.runInteraction(
+            "delete_all_from_user_dir", _delete_all_from_user_dir_txn
+        )
+
+    @cached()
+    def get_user_in_directory(self, user_id):
+        return self._simple_select_one(
+            table="user_directory",
+            keyvalues={"user_id": user_id},
+            retcols=("room_id", "display_name", "avatar_url",),
+            allow_none=True,
+            desc="get_user_in_directory",
+        )
+
+    @cached()
+    def get_user_in_public_room(self, user_id):
+        return self._simple_select_one(
+            table="users_in_pubic_room",
+            keyvalues={"user_id": user_id},
+            retcols=("room_id",),
+            allow_none=True,
+            desc="get_user_in_public_room",
+        )
+
+    def get_user_directory_stream_pos(self):
+        return self._simple_select_one_onecol(
+            table="user_directory_stream_pos",
+            keyvalues={},
+            retcol="stream_id",
+            desc="get_user_directory_stream_pos",
+        )
+
+    def update_user_directory_stream_pos(self, stream_id):
+        return self._simple_update_one(
+            table="user_directory_stream_pos",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id},
+            desc="update_user_directory_stream_pos",
+        )
+
+    def get_current_state_deltas(self, prev_stream_id):
+        prev_stream_id = int(prev_stream_id)
+        if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
+            return []
+
+        def get_current_state_deltas_txn(txn):
+            # First we calculate the max stream id that will give us less than
+            # N results.
+            # We arbitarily limit to 100 stream_id entries to ensure we don't
+            # select toooo many.
+            sql = """
+                SELECT stream_id, count(*)
+                FROM current_state_delta_stream
+                WHERE stream_id > ?
+                GROUP BY stream_id
+                ORDER BY stream_id ASC
+                LIMIT 100
+            """
+            txn.execute(sql, (prev_stream_id,))
+
+            total = 0
+            max_stream_id = prev_stream_id
+            for max_stream_id, count in txn:
+                total += count
+                if total > 100:
+                    # We arbitarily limit to 100 entries to ensure we don't
+                    # select toooo many.
+                    break
+
+            # Now actually get the deltas
+            sql = """
+                SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(sql, (prev_stream_id, max_stream_id,))
+            return self.cursor_to_dict(txn)
+
+        return self.runInteraction(
+            "get_current_state_deltas", get_current_state_deltas_txn
+        )
+
+    def get_max_stream_id_in_current_state_deltas(self):
+        return self._simple_select_one_onecol(
+            table="current_state_delta_stream",
+            keyvalues={},
+            retcol="COALESCE(MAX(stream_id), -1)",
+            desc="get_max_stream_id_in_current_state_deltas",
+        )
+
+    @defer.inlineCallbacks
+    def search_user_dir(self, search_term, limit):
+        """Searches for users in directory
+
+        Returns:
+            dict of the form::
+
+                {
+                    "limited": <bool>,  # whether there were more results or not
+                    "results": [  # Ordered by best match first
+                        {
+                            "user_id": <user_id>,
+                            "display_name": <display_name>,
+                            "avatar_url": <avatar_url>
+                        }
+                    ]
+                }
+        """
+        if isinstance(self.database_engine, PostgresEngine):
+            full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
+
+            # We order by rank and then if they have profile info
+            # The ranking algorithm is hand tweaked for "best" results. Broadly
+            # the idea is we give a higher weight to exact matches.
+            # The array of numbers are the weights for the various part of the
+            # search: (domain, _, display name, localpart)
+            sql = """
+                SELECT user_id, display_name, avatar_url
+                FROM user_directory_search
+                INNER JOIN user_directory USING (user_id)
+                INNER JOIN users_in_pubic_room USING (user_id)
+                WHERE vector @@ to_tsquery('english', ?)
+                ORDER BY
+                    2 * ts_rank_cd(
+                        '{0.1, 0.1, 0.9, 1.0}',
+                        vector,
+                        to_tsquery('english', ?),
+                        8
+                    )
+                    + ts_rank_cd(
+                        '{0.1, 0.1, 0.9, 1.0}',
+                        vector,
+                        to_tsquery('english', ?),
+                        8
+                    )
+                    DESC,
+                    display_name IS NULL,
+                    avatar_url IS NULL
+                LIMIT ?
+            """
+            args = (full_query, exact_query, prefix_query, limit + 1,)
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            search_query = _parse_query_sqlite(search_term)
+
+            sql = """
+                SELECT user_id, display_name, avatar_url
+                FROM user_directory_search
+                INNER JOIN user_directory USING (user_id)
+                INNER JOIN users_in_pubic_room USING (user_id)
+                WHERE value MATCH ?
+                ORDER BY
+                    rank(matchinfo(user_directory_search)) DESC,
+                    display_name IS NULL,
+                    avatar_url IS NULL
+                LIMIT ?
+            """
+            args = (search_query, limit + 1)
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        results = yield self._execute(
+            "search_user_dir", self.cursor_to_dict, sql, *args
+        )
+
+        limited = len(results) > limit
+
+        defer.returnValue({
+            "limited": limited,
+            "results": results,
+        })
+
+
+def _parse_query_sqlite(search_term):
+    """Takes a plain unicode string from the user and converts it into a form
+    that can be passed to database.
+    We use this so that we can add prefix matching, which isn't something
+    that is supported by default.
+
+    We specifically add both a prefix and non prefix matching term so that
+    exact matches get ranked higher.
+    """
+
+    # Pull out the individual words, discarding any non-word characters.
+    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+    return " & ".join("(%s* | %s)" % (result, result,) for result in results)
+
+
+def _parse_query_postgres(search_term):
+    """Takes a plain unicode string from the user and converts it into a form
+    that can be passed to database.
+    We use this so that we can add prefix matching, which isn't something
+    that is supported by default.
+    """
+
+    # Pull out the individual words, discarding any non-word characters.
+    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+
+    both = " & ".join("(%s:* | %s)" % (result, result,) for result in results)
+    exact = " & ".join("%s" % (result,) for result in results)
+    prefix = " & ".join("%s:*" % (result,) for result in results)
+
+    return both, exact, prefix
diff --git a/synapse/types.py b/synapse/types.py
index 445bdcb4d7..111948540d 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -62,6 +62,13 @@ def get_domain_from_id(string):
     return string[idx + 1:]
 
 
+def get_localpart_from_id(string):
+    idx = string.find(":")
+    if idx == -1:
+        raise SynapseError(400, "Invalid ID: %r" % (string,))
+    return string[1:idx]
+
+
 class DomainSpecificString(
         namedtuple("DomainSpecificString", ("localpart", "domain"))
 ):
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 48dcbafeef..cbdff86596 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -404,6 +404,7 @@ class CacheDescriptor(_CacheDescriptorBase):
 
         wrapped.invalidate_all = cache.invalidate_all
         wrapped.cache = cache
+        wrapped.num_args = self.num_args
 
         obj.__dict__[self.orig.__name__] = wrapped
 
@@ -451,8 +452,9 @@ class CacheListDescriptor(_CacheDescriptorBase):
             )
 
     def __get__(self, obj, objtype=None):
-
-        cache = getattr(obj, self.cached_method_name).cache
+        cached_method = getattr(obj, self.cached_method_name)
+        cache = cached_method.cache
+        num_args = cached_method.num_args
 
         @functools.wraps(self.orig)
         def wrapped(*args, **kwargs):
@@ -469,12 +471,23 @@ class CacheListDescriptor(_CacheDescriptorBase):
             results = {}
             cached_defers = {}
             missing = []
-            for arg in list_args:
+
+            # If the cache takes a single arg then that is used as the key,
+            # otherwise a tuple is used.
+            if num_args == 1:
+                def cache_get(arg):
+                    return cache.get(arg, callback=invalidate_callback)
+            else:
                 key = list(keyargs)
-                key[self.list_pos] = arg
 
+                def cache_get(arg):
+                    key[self.list_pos] = arg
+                    return cache.get(tuple(key), callback=invalidate_callback)
+
+            for arg in list_args:
                 try:
-                    res = cache.get(tuple(key), callback=invalidate_callback)
+                    res = cache_get(arg)
+
                     if not isinstance(res, ObservableDeferred):
                         results[arg] = res
                     elif not res.has_succeeded():
@@ -505,17 +518,28 @@ class CacheListDescriptor(_CacheDescriptorBase):
 
                     observer = ObservableDeferred(observer)
 
-                    key = list(keyargs)
-                    key[self.list_pos] = arg
-                    cache.set(
-                        tuple(key), observer,
-                        callback=invalidate_callback
-                    )
-
-                    def invalidate(f, key):
-                        cache.invalidate(key)
-                        return f
-                    observer.addErrback(invalidate, tuple(key))
+                    if num_args == 1:
+                        cache.set(
+                            arg, observer,
+                            callback=invalidate_callback
+                        )
+
+                        def invalidate(f, key):
+                            cache.invalidate(key)
+                            return f
+                        observer.addErrback(invalidate, arg)
+                    else:
+                        key = list(keyargs)
+                        key[self.list_pos] = arg
+                        cache.set(
+                            tuple(key), observer,
+                            callback=invalidate_callback
+                        )
+
+                        def invalidate(f, key):
+                            cache.invalidate(key)
+                            return f
+                        observer.addErrback(invalidate, tuple(key))
 
                     res = observer.observe()
                     res.addCallback(lambda r, arg: (arg, r), arg)
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index cb6933c61c..d4105822b3 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -23,7 +23,17 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "value"))):
+class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "value"))):
+    """Returned when getting an entry from the cache
+
+    Attributes:
+        full (bool): Whether the cache has the full or dict or just some keys.
+            If not full then not all requested keys will necessarily be present
+            in `value`
+        known_absent (set): Keys that were looked up in the dict and were not
+            there.
+        value (dict): The full or partial dict value
+    """
     def __len__(self):
         return len(self.value)
 
@@ -58,21 +68,31 @@ class DictionaryCache(object):
                 )
 
     def get(self, key, dict_keys=None):
+        """Fetch an entry out of the cache
+
+        Args:
+            key
+            dict_key(list): If given a set of keys then return only those keys
+                that exist in the cache.
+
+        Returns:
+            DictionaryEntry
+        """
         entry = self.cache.get(key, self.sentinel)
         if entry is not self.sentinel:
             self.metrics.inc_hits()
 
             if dict_keys is None:
-                return DictionaryEntry(entry.full, dict(entry.value))
+                return DictionaryEntry(entry.full, entry.known_absent, dict(entry.value))
             else:
-                return DictionaryEntry(entry.full, {
+                return DictionaryEntry(entry.full, entry.known_absent, {
                     k: entry.value[k]
                     for k in dict_keys
                     if k in entry.value
                 })
 
         self.metrics.inc_misses()
-        return DictionaryEntry(False, {})
+        return DictionaryEntry(False, set(), {})
 
     def invalidate(self, key):
         self.check_thread()
@@ -87,19 +107,34 @@ class DictionaryCache(object):
         self.sequence += 1
         self.cache.clear()
 
-    def update(self, sequence, key, value, full=False):
+    def update(self, sequence, key, value, full=False, known_absent=None):
+        """Updates the entry in the cache
+
+        Args:
+            sequence
+            key
+            value (dict): The value to update the cache with.
+            full (bool): Whether the given value is the full dict, or just a
+                partial subset there of. If not full then any existing entries
+                for the key will be updated.
+            known_absent (set): Set of keys that we know don't exist in the full
+                dict.
+        """
         self.check_thread()
         if self.sequence == sequence:
             # Only update the cache if the caches sequence number matches the
             # number that the cache had before the SELECT was started (SYN-369)
+            if known_absent is None:
+                known_absent = set()
             if full:
-                self._insert(key, value)
+                self._insert(key, value, known_absent)
             else:
-                self._update_or_insert(key, value)
+                self._update_or_insert(key, value, known_absent)
 
-    def _update_or_insert(self, key, value):
-        entry = self.cache.setdefault(key, DictionaryEntry(False, {}))
+    def _update_or_insert(self, key, value, known_absent):
+        entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {}))
         entry.value.update(value)
+        entry.known_absent.update(known_absent)
 
-    def _insert(self, key, value):
-        self.cache[key] = DictionaryEntry(True, value)
+    def _insert(self, key, value, known_absent):
+        self.cache[key] = DictionaryEntry(True, known_absent, value)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 70fe00ce0b..609625b322 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -89,6 +89,21 @@ class StreamChangeCache(object):
 
         return result
 
+    def has_any_entity_changed(self, stream_pos):
+        """Returns if any entity has changed
+        """
+        assert type(stream_pos) is int
+
+        if stream_pos >= self._earliest_known_stream_pos:
+            self.metrics.inc_hits()
+            keys = self._cache.keys()
+            i = keys.bisect_right(stream_pos)
+
+            return i < len(keys)
+        else:
+            self.metrics.inc_misses()
+            return True
+
     def get_all_entities_changed(self, stream_pos):
         """Returns all entites that have had new things since the given
         position. If the position is too old it will return None.
diff --git a/tests/test_state.py b/tests/test_state.py
index 6454f994e3..feb84f3d48 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -143,6 +143,7 @@ class StateTestCase(unittest.TestCase):
                 "add_event_hashes",
                 "get_events",
                 "get_next_state_group",
+                "get_state_group_delta",
             ]
         )
         hs = Mock(spec_set=[
@@ -154,6 +155,7 @@ class StateTestCase(unittest.TestCase):
         hs.get_auth.return_value = Auth(hs)
 
         self.store.get_next_state_group.side_effect = Mock
+        self.store.get_state_group_delta.return_value = (None, None)
 
         self.state = StateHandler(hs)
         self.event_id = 0
diff --git a/tests/util/test_dict_cache.py b/tests/util/test_dict_cache.py
index 272b71034a..bc92f85fa6 100644
--- a/tests/util/test_dict_cache.py
+++ b/tests/util/test_dict_cache.py
@@ -28,7 +28,7 @@ class DictCacheTestCase(unittest.TestCase):
         key = "test_simple_cache_hit_full"
 
         v = self.cache.get(key)
-        self.assertEqual((False, {}), v)
+        self.assertEqual((False, set(), {}), v)
 
         seq = self.cache.sequence
         test_value = {"test": "test_simple_cache_hit_full"}
diff --git a/tests/utils.py b/tests/utils.py
index d3d6c8021d..4f7e32b3ab 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -55,6 +55,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
         config.password_providers = []
         config.worker_replication_url = ""
         config.worker_app = None
+        config.email_enable_notifs = False
 
     config.use_frozen_dicts = True
     config.database_config = {"name": "sqlite3"}